123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626 |
- #!/usr/bin/env python3.5
- # Copyright 2017 Digital
- #
- # This file is part of DigiLib.
- #
- # DigiLib is free software: you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # DigiLib is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with DigiLib. If not, see <http://www.gnu.org/licenses/>.
- import logging
- import logging.handlers
- import os
- import queue
- import select
- import socket
- import sys
- import threading
- import time
- import traceback
- import blinker
- import curio
# Module-level loggers: one per role (client / server), each with a ".chat"
# child that the classes below use for the raw message traffic.
lclient = logging.getLogger("{}.client".format(__name__))
lserver = logging.getLogger("{}.server".format(__name__))
lschat = logging.getLogger("{}.server.chat".format(__name__))
lcchat = logging.getLogger("{}.client.chat".format(__name__))
class ConnHandlerBase(object):
    """Base class for the per-connection handlers created by the server.

    An instance wraps one accepted client socket. Subclasses override
    ``welcome_client`` and ``handle``; both are coroutines, as are ``recv``
    and ``send``.

    Attributes:
        socket: the accepted client socket (its ``recv``/``sendall`` are
            awaited, so it must be an async socket).
        addr: the peer address as returned by ``accept()``.
        server: the server object that created this handler.
        block_size: maximum number of bytes read by one ``recv`` call.
        status: ``"init"``, ``"connected"`` or ``"closed"``.
    """

    def __init__(self, socket, addr, server):
        # NOTE: the ``socket`` parameter shadows the ``socket`` module, but
        # only inside this method.
        self.status = "init"
        super(ConnHandlerBase, self).__init__()
        self.socket = socket
        self.addr = addr
        self.server = server
        self.block_size = 1024
        # welcome_client() is a coroutine and therefore cannot be called
        # from this synchronous constructor.
        self.status = "connected"

    async def welcome_client(self):
        """Greet a freshly connected client. The base class does nothing."""
        pass

    async def handle(self, data):
        """Process one decoded message. The base class ignores it."""
        return

    async def recv(self):
        """Read up to ``block_size`` bytes and return them decoded as UTF-8.

        An empty string means the socket returned no data.

        Raises:
            UnicodeDecodeError: if the received bytes are not valid UTF-8.
        """
        data_received = await self.socket.recv(self.block_size)
        return data_received.decode("utf-8")

    async def send(self, msg):
        """Send the string ``msg`` to the client, UTF-8 encoded.

        Returns:
            True if the message was handed to the socket, False if sending
            raised (the exception is logged, not propagated).
        """
        msg_encoded = msg.encode("utf-8")
        lschat.info("Server:"+msg)
        try:
            # sendall() keeps writing until the whole payload is out;
            # send() may stop after a partial write.
            await self.socket.sendall(msg_encoded)
            return True
        except Exception as e:
            lserver.error(e, exc_info=True)
            return False

    def close(self):
        """Mark the handler closed and release the socket (best effort).

        Errors from ``shutdown``/``close`` are logged at debug level and
        otherwise ignored; ``KeyboardInterrupt`` and ``SystemExit`` are no
        longer swallowed.
        """
        # NOTE(review): if ``self.socket`` is a curio socket, shutdown() and
        # close() may be coroutines that need awaiting -- confirm against the
        # curio version in use.
        self.status = "closed"
        try:
            self.socket.shutdown(socket.SHUT_RDWR)
        except Exception:
            lserver.debug("error during socket shutdown")
        try:
            self.socket.close()
        except Exception:
            lserver.debug("error closing socket")
class ConnHandlerEcho(ConnHandlerBase):
    """Handler that relays every received message to all other clients.

    The list of peers is taken from ``server.connection_handler``.
    """

    def __init__(self, socket, addr, server):
        self.status = "init"
        # The bound super proxy is kept on the instance for backward
        # compatibility with code that may read ``super_class``.
        self.super_class = super(ConnHandlerEcho, self)
        self.super_class.__init__(socket, addr, server)
        self.server = server

    async def welcome_client(self):
        """Send the greeting to this client."""
        # send() is a coroutine; without ``await`` nothing would be sent.
        await self.send("welcome to the client")

    async def handle(self, data):
        """Forward ``data`` to every registered handler except this one.

        This must be a coroutine because the server awaits its result.
        """
        lschat.info("Client:"+data)
        # Snapshot the peers first: the server's list can change while this
        # coroutine is suspended in ``await``.
        others = [h for h in self.server.connection_handler if h is not self]
        for h in others:
            await h.send(data)
class Server(object):
    """Curio-based socket server that runs one task per client connection.

    ``start()`` runs the accept loop. Every accepted connection gets a
    handler built by ``handler(conn, addr, server, **handler_kwargs)`` and a
    task running ``wait_for_client`` that feeds received data to it.

    Args:
        host: host name/IP for ``AF_INET``, or the socket file path for
            ``AF_UNIX``.
        port: TCP port (only used for ``AF_INET``).
        af_family: ``"AF_INET"`` or ``"AF_UNIX"``.
        log_ip: if true, peer addresses are included in log messages.
        max_allowed_clients: backlog value passed to ``listen()``.
        handler: callable creating the per-connection handler.
        handler_kwargs: extra keyword arguments for ``handler``.

    Raises:
        ValueError: if ``af_family`` is not supported.
    """

    def __init__(self,
                 host,
                 port=None,
                 af_family="AF_INET",
                 log_ip=False,
                 max_allowed_clients=5,
                 handler=None,
                 handler_kwargs=None,
                 ):
        super(Server, self).__init__()
        self.exit_event = False
        self.host = host
        self.port = port
        self.af_family = af_family
        self.log_ip = log_ip
        # A ``{}`` default argument would be one dict shared by every Server
        # created without explicit kwargs.
        self.handler_kwargs = {} if handler_kwargs is None else handler_kwargs
        self.handler = handler
        self.max_allowed_clients = max_allowed_clients
        self.socket = self.make_socket()
        self.connection_handler = []
        self.conn_to_addr = {}
        self.addr_to_conn = {}
        self.conn_to_handler = {}
        self.handler_to_conn = {}
        self.read_sockets_expected = [self.socket]

    @staticmethod
    def _format_addr(addr):
        """Render a peer address for logging without assuming its shape."""
        if isinstance(addr, tuple) and len(addr) >= 2:
            return "{}:{}".format(addr[0], addr[1])
        # AF_UNIX peers report a (possibly empty) string instead of a tuple.
        return str(addr)

    def cleanup(self):
        """Hook for subclasses; the base implementation does nothing."""
        pass

    def make_socket(self):
        """Create the listening socket wrapped in a curio ``Socket``.

        Raises:
            ValueError: if ``self.af_family`` is not supported.
        """
        if self.af_family not in ("AF_INET", "AF_UNIX"):
            raise ValueError(
                "AF_FAMILY '{}' not supported!".format(
                    self.af_family
                )
            )
        lserver.debug("making a {} socket".format(self.af_family))
        # Resolved lazily so AF_UNIX is only looked up when requested.
        family = getattr(socket, self.af_family)
        s = socket.socket(family, socket.SOCK_STREAM)
        return curio.io.Socket(s)

    def make_handler(self, conn, addr):
        """Build the handler object for a new connection."""
        return self.handler(conn, addr, self, **self.handler_kwargs)

    def register_conn(self, conn, addr):
        """Remember ``conn`` and, if ``addr`` is truthy, its address."""
        self.read_sockets_expected.append(conn)
        if addr:
            self.conn_to_addr[conn] = addr
            self.addr_to_conn[addr] = conn

    def unregister_conn(self, conn):
        """Forget ``conn`` and its address mapping.

        Raises:
            ValueError: if ``conn`` was never registered.
        """
        self.read_sockets_expected.remove(conn)
        addr = self.conn_to_addr.pop(conn, None)
        if addr:
            self.addr_to_conn.pop(addr, None)

    def register_handler(self, handler, conn):
        """Associate ``handler`` with ``conn`` in both directions."""
        self.connection_handler.append(handler)
        self.conn_to_handler[conn] = handler
        self.handler_to_conn[handler] = conn

    def unregister_handler(self, handler, conn):
        """Remove the association created by ``register_handler``."""
        self.connection_handler.remove(handler)
        del self.conn_to_handler[conn]
        del self.handler_to_conn[handler]

    def setup(self):
        """Bind the listening socket and start listening.

        For ``AF_UNIX`` an existing file at ``self.host`` is removed first.
        """
        lserver.info("setting up socket")
        if self.af_family == "AF_INET":
            self.socket.bind((self.host, self.port))
        elif self.af_family == "AF_UNIX":
            if os.path.exists(self.host):
                lserver.debug("file already exists")
                lserver.debug("attempting to remove it")
                os.remove(self.host)
            self.socket.bind(self.host)
        self.socket.listen(self.max_allowed_clients)

    def start(self):
        """Run the server under curio; blocks until ``run`` returns."""
        curio.run(self.run)

    async def run(self):
        """Accept clients until ``exit_event`` is set.

        ``exit_event`` is only checked between two accepted connections.
        """
        self.setup()
        lserver.debug("entering main loop")
        while not self.exit_event:
            lserver.debug("waiting for client to connect")
            conn, addr = await self.socket.accept()
            if self.log_ip:
                lserver.info(
                    "new client connection, {}!".format(
                        self._format_addr(addr)))
            else:
                lserver.info("a new client connected, let's handle it")
            handler = self.make_handler(conn, addr)
            self.register_conn(conn, addr)
            self.register_handler(handler, conn)
            await curio.spawn(self.wait_for_client(conn, handler))

    async def wait_for_client(self, socket, handler):
        """Feed data received on one connection to its handler.

        The task ends when the peer closes the connection (``recv`` returns
        nothing) or the socket fails with ``OSError``; the connection is then
        unregistered and the handler closed. Errors raised by the handler
        are logged and the loop continues.
        """
        # ``socket`` is the connection here and shadows the module. The peer
        # address comes from the registry: getsockname() would report the
        # local address instead.
        peer = self._format_addr(self.conn_to_addr.get(socket, "<unknown>"))
        while True:
            if self.log_ip:
                lserver.debug(
                    "waiting for {} to send something".format(peer))
            else:
                lserver.debug(
                    "waiting for the client to send something")
            try:
                data = await handler.recv()
            except OSError as e:
                # A failing socket will not recover; treat it as closed.
                lserver.error(e, exc_info=True)
                data = None
            except Exception as e:
                lserver.error(e, exc_info=True)
                await curio.sleep(0.1)
                continue
            if not data:
                if self.log_ip:
                    lserver.info(
                        "the connection to {} was closed".format(peer))
                else:
                    lserver.info(
                        "the connection to the client was closed")
                try:
                    self.unregister_handler(handler, socket)
                    self.unregister_conn(socket)
                    handler.close()
                except Exception as e:
                    lserver.error(e, exc_info=True)
                # Leave the loop; reading a closed socket again would only
                # produce an endless stream of errors.
                return
            try:
                lschat.info("Client:"+data.strip())
                await handler.handle(data)
            except Exception as e:
                lserver.error(e, exc_info=True)
            # curio.sleep yields to other tasks; time.sleep would freeze
            # every connection for the duration of the pause.
            await curio.sleep(0.1)
class Client(threading.Thread):
    """Socket client whose receive loop runs in its own thread.

    ``run()`` (the thread body) executes ``main_loop``, which idles until
    ``status`` is ``"connected"`` and then passes received data to
    ``handle_data``. ``connect()`` is not called automatically.

    Args:
        host: host name/IP for ``AF_INET``, or the socket file path for
            ``AF_UNIX``.
        port: TCP port (only used for ``AF_INET``).
        af_family: ``"AF_INET"`` or ``"AF_UNIX"``.
        handle_data_func: optional callable invoked with each received,
            UTF-8 decoded chunk.
        error_handler: optional callable that is given ``main_loop`` and is
            expected to call it.
        block_size: maximum number of bytes read per ``recv``.
    """

    is_connecting = False
    is_connected = False
    status = "uninitialized"

    def __init__(self,
                 host,
                 port=None,
                 af_family="AF_INET",
                 handle_data_func=None,
                 error_handler=None,
                 block_size=1024,
                 ):
        # The bound super proxy is kept on the instance for backward
        # compatibility with code that may read ``super_class``.
        self.super_class = super(Client, self)
        self.super_class.__init__()
        self.name = "Client"
        self.exit_event = False
        self.host = host
        self.port = port
        self.af_family = af_family
        self.block_size = block_size
        self.handle_data_func = handle_data_func
        self.is_connected = False
        self.error_handler = error_handler
        self.socket = None
        self.status = "disconnected"

    def connect(self):
        """Create a socket and connect it to the configured endpoint.

        Returns:
            True on success, False otherwise. On failure the socket is
            released and ``status`` is ``"disconnected"`` again.

        Raises:
            ValueError: if ``af_family`` is not supported.
        """
        self.status = "connecting"
        self.socket = self.make_socket()
        lclient.info(
            "connecting to socket '{}' of type {}".format(
                self.host,
                self.af_family
            )
        )
        try:
            if self.af_family == "AF_INET":
                self.socket.connect((self.host, self.port))
            elif self.af_family == "AF_UNIX":
                if not os.path.exists(self.host):
                    lclient.warning("File not found. Aborting.")
                    # Nothing was connected: release the socket and leave
                    # the "connecting" state instead of staying stuck in it.
                    self.socket.close()
                    self.socket = None
                    self.status = "disconnected"
                    return False
                self.socket.connect(self.host)
            self.is_connected = True
            self.status = "connected"
            lclient.info("connected")
            return True
        except Exception as e:
            lclient.debug(e, exc_info=True)
            if isinstance(e, ConnectionRefusedError):
                lclient.info(
                    "failed to connect to socket '{}'".format(self.host))
            self.disconnect()
            return False

    def disconnect(self):
        """Shut down and close the socket, if any (best effort).

        Errors are logged and never propagated.
        """
        lclient.info("disconnecting from socket '{}'".format(self.host))
        self.is_connected = False
        self.status = "disconnected"
        # Swap the attribute to None first so it always exists for other
        # threads; ``del self.socket`` would briefly remove it.
        sock, self.socket = self.socket, None
        if sock is None:
            return
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except Exception as e:
            lclient.error(e)
        try:
            sock.close()
        except Exception as e:
            lclient.error("error occured while closing the socket, " +
                          "maybe it is already closed", exc_info=e)

    def handle_data(self, data_received):
        """Decode ``data_received`` as UTF-8 and pass it to the callback.

        Exceptions raised by ``handle_data_func`` are logged and swallowed.

        Raises:
            UnicodeDecodeError: if the bytes are not valid UTF-8.
        """
        data_decoded = data_received.decode("utf-8")
        lcchat.info("Server: "+data_decoded)
        if self.handle_data_func:
            try:
                self.handle_data_func(data_decoded)
            except Exception as e:
                lclient.error(e, exc_info=True)

    def is_running(self):
        """Return True while the client thread is alive."""
        return self.is_alive()

    def make_socket(self):
        """Return a new, unconnected stream socket of ``af_family``.

        Raises:
            ValueError: if ``af_family`` is not supported.
        """
        if self.af_family not in ("AF_INET", "AF_UNIX"):
            raise ValueError(
                "AF_FAMILY '{}' not supported!".format(
                    self.af_family
                )
            )
        lclient.info("creating a {} socket".format(self.af_family))
        # Resolved lazily so AF_UNIX is only looked up when requested.
        family = getattr(socket, self.af_family)
        return socket.socket(family, socket.SOCK_STREAM)

    def main_loop(self):
        """Receive loop; runs until ``exit_event`` is set.

        While not connected the loop idles. A socket reported in select's
        exceptional set ends the loop. A closed or failing connection only
        triggers ``disconnect()``, so the loop keeps idling and a later
        ``connect()`` resumes reception.
        """
        lclient.debug("starting main loop")
        while not self.exit_event:
            # Use one snapshot per iteration: another thread may call
            # disconnect() and reset ``self.socket`` at any time.
            sock = self.socket
            if self.status != "connected" or sock is None:
                time.sleep(0.1)
                continue
            try:
                read_confirmed, _, exc_confirmed = select.select(
                    [sock], [], [sock], 1)
            except (OSError, ValueError) as e:
                # The socket was closed between the check and the select.
                lclient.debug(e, exc_info=True)
                continue
            if sock in exc_confirmed:
                self.is_connected = False
                lclient.warning("socket is expected to corrupt, exiting")
                self.disconnect()
                break
            if sock not in read_confirmed:
                time.sleep(0.1)
                continue
            try:
                data_received = self.read_from_socket()
                if data_received == b'':
                    lclient.info(
                        "connection is broken, closing socket exiting")
                    self.disconnect()
                else:
                    try:
                        self.handle_data(data_received)
                    except Exception as e:
                        lclient.error(
                            "Error while handling data",
                            exc_info=e
                        )
            except Exception as e:
                lclient.error(e, exc_info=True)
                # isinstance also covers OSError subclasses such as
                # ConnectionResetError.
                if not isinstance(e, OSError):
                    raise
                self.is_connected = False
                lclient.warning("connection broken, exiting")
                self.disconnect()

    def read_from_socket(self):
        """Return up to ``block_size`` bytes read from the socket."""
        return self.socket.recv(self.block_size)

    def run(self):
        """Thread body: run ``main_loop``, via ``error_handler`` if set."""
        if self.error_handler:
            self.error_handler(self.main_loop)
        else:
            self.main_loop()

    def send(self, msg):
        """Send ``msg`` with trailing whitespace replaced by CRLF.

        On any failure the error is logged, ``is_connected`` is cleared and
        ``status`` becomes ``"shutdown"``; nothing is raised.
        """
        msg = msg.rstrip()
        msg_encoded = (msg + "\r\n").encode("utf-8")
        try:
            lcchat.info("Client: "+msg)
            # sendall() retries until everything is written; send() may
            # transmit only part of the message.
            self.socket.sendall(msg_encoded)
        except Exception as e:
            self.is_connected = False
            lclient.error(e, exc_info=True)
            self.status = "shutdown"

    def setup(self):
        """Hook for subclasses; the base implementation does nothing."""
        pass

    def stop(self, reason=None):
        """Disconnect and ask ``main_loop`` to end; print ``reason`` if set."""
        self.disconnect()
        self.exit_event = True
        if reason:
            print(reason)
class AsyncClient(object):
    """Socket client with the same interface as the threaded client.

    Despite the name, every method here is blocking: the receive loop uses
    ``select`` and ``time.sleep`` and calls the socket methods directly. The
    loop runs in whichever thread calls ``run()``.

    Args:
        host: host name/IP for ``AF_INET``, or the socket file path for
            ``AF_UNIX``.
        port: TCP port (only used for ``AF_INET``).
        af_family: ``"AF_INET"`` or ``"AF_UNIX"``.
        handle_data_func: optional callable invoked with each received,
            UTF-8 decoded chunk.
        error_handler: optional callable that is given ``main_loop`` and is
            expected to call it.
        block_size: maximum number of bytes read per ``recv``.
    """

    is_connecting = False
    is_connected = False
    status = "uninitialized"
    # True while main_loop() is executing; see is_running().
    _running = False

    def __init__(self,
                 host,
                 port=None,
                 af_family="AF_INET",
                 handle_data_func=None,
                 error_handler=None,
                 block_size=1024,
                 ):
        # The bound super proxy is kept on the instance for backward
        # compatibility with code that may read ``super_class``.
        self.super_class = super(AsyncClient, self)
        self.super_class.__init__()
        self.name = "Client"
        self.exit_event = False
        self.host = host
        self.port = port
        self.af_family = af_family
        self.block_size = block_size
        self.handle_data_func = handle_data_func
        self.is_connected = False
        self.error_handler = error_handler
        self.socket = None
        self.status = "disconnected"

    def connect(self):
        """Create a socket and connect it to the configured endpoint.

        Returns:
            True on success, False otherwise. On failure the socket is
            released and ``status`` is ``"disconnected"`` again.

        Raises:
            ValueError: if ``af_family`` is not supported.
        """
        self.status = "connecting"
        self.socket = self.make_socket()
        lclient.info("connecting to socket '{}' of type {}".format(
            self.host, self.af_family))
        try:
            if self.af_family == "AF_INET":
                self.socket.connect((self.host, self.port))
            elif self.af_family == "AF_UNIX":
                self.socket.connect(self.host)
            self.is_connected = True
            self.status = "connected"
            lclient.info("connected")
            return True
        except Exception as e:
            lclient.debug(e, exc_info=True)
            if isinstance(e, ConnectionRefusedError):
                lclient.info(
                    "failed to connect to socket '{}'".format(self.host))
            self.disconnect()
            return False

    def disconnect(self):
        """Shut down and close the socket, if any (best effort).

        Errors are logged and never propagated.
        """
        lclient.info("disconnecting from socket '{}'".format(self.host))
        self.is_connected = False
        self.status = "disconnected"
        # Swap to None instead of ``del`` so the attribute always exists.
        sock, self.socket = self.socket, None
        if sock is None:
            return
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except Exception as e:
            lclient.error(e)
        try:
            sock.close()
        except Exception as e:
            lclient.error("error occured while closing the socket, " +
                          "maybe it is already closed", exc_info=e)

    def handle_data(self, data_received):
        """Decode ``data_received`` as UTF-8 and pass it to the callback.

        Exceptions raised by ``handle_data_func`` are logged and swallowed.

        Raises:
            UnicodeDecodeError: if the bytes are not valid UTF-8.
        """
        data_decoded = data_received.decode("utf-8")
        lcchat.info("Server: "+data_decoded)
        if self.handle_data_func:
            try:
                self.handle_data_func(data_decoded)
            except Exception as e:
                lclient.error(e, exc_info=True)

    def is_running(self):
        """Return True while ``main_loop`` is executing.

        This object is not a thread, so looking it up in
        ``threading.enumerate()`` could never succeed; a flag maintained by
        ``main_loop`` is used instead.
        """
        return self._running

    def make_socket(self):
        """Return a new, unconnected stream socket of ``af_family``.

        A standard-library socket is used because the rest of the class
        calls the socket methods synchronously.

        Raises:
            ValueError: if ``af_family`` is not supported.
        """
        if self.af_family not in ("AF_INET", "AF_UNIX"):
            raise ValueError(
                "AF_FAMILY '{}' not supported!".format(
                    self.af_family
                )
            )
        lclient.info("creating a {} socket".format(self.af_family))
        # Resolved lazily so AF_UNIX is only looked up when requested.
        family = getattr(socket, self.af_family)
        return socket.socket(family, socket.SOCK_STREAM)

    def main_loop(self):
        """Receive loop; runs until ``exit_event`` is set.

        While not connected the loop idles. A socket reported in select's
        exceptional set ends the loop. A closed or failing connection only
        triggers ``disconnect()``, so the loop keeps idling and a later
        ``connect()`` resumes reception.
        """
        lclient.debug("starting main loop")
        self._running = True
        try:
            while not self.exit_event:
                # One snapshot per iteration: disconnect() may reset
                # ``self.socket`` at any time.
                sock = self.socket
                if self.status != "connected" or sock is None:
                    time.sleep(0.1)
                    continue
                try:
                    read_confirmed, _, exc_confirmed = select.select(
                        [sock], [], [sock], 1)
                except (OSError, ValueError) as e:
                    # The socket was closed between check and select.
                    lclient.debug(e, exc_info=True)
                    continue
                if sock in exc_confirmed:
                    self.is_connected = False
                    lclient.warning(
                        "socket is expected to corrupt, exiting")
                    self.disconnect()
                    break
                if sock not in read_confirmed:
                    time.sleep(0.1)
                    continue
                try:
                    data_received = self.read_from_socket()
                    if data_received == b'':
                        lclient.info(
                            "connection is broken, closing socket exiting")
                        self.disconnect()
                    else:
                        self.handle_data(data_received)
                except Exception as e:
                    lclient.error(e, exc_info=True)
                    # isinstance also covers OSError subclasses such as
                    # ConnectionResetError.
                    if not isinstance(e, OSError):
                        raise
                    self.is_connected = False
                    lclient.warning("connection broken, exiting")
                    self.disconnect()
        finally:
            self._running = False

    def read_from_socket(self):
        """Return up to ``block_size`` bytes read from the socket."""
        return self.socket.recv(self.block_size)

    def run(self, connect=False):
        """Run ``main_loop``, optionally connecting first.

        If ``error_handler`` is set it is called with ``main_loop`` instead
        of the loop being called directly.
        """
        if connect:
            self.connect()
        if self.error_handler:
            self.error_handler(self.main_loop)
        else:
            self.main_loop()

    def send(self, msg):
        """Send ``msg`` with trailing whitespace replaced by CRLF.

        On any failure the error is logged, ``is_connected`` is cleared and
        ``status`` becomes ``"shutdown"``; nothing is raised.
        """
        msg = msg.rstrip()
        msg_encoded = (msg + "\r\n").encode("utf-8")
        try:
            lcchat.info("Client: "+msg)
            # sendall() retries until everything is written; send() may
            # transmit only part of the message.
            self.socket.sendall(msg_encoded)
        except Exception as e:
            self.is_connected = False
            lclient.error(e, exc_info=True)
            self.status = "shutdown"

    def setup(self):
        """Hook for subclasses; the base implementation does nothing."""
        pass

    def stop(self, reason=None):
        """Disconnect and ask ``main_loop`` to end; print ``reason`` if set."""
        self.disconnect()
        self.exit_event = True
        if reason:
            print(reason)
- #
|