12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768 |
- import atexit
- import logging
- import logging.handlers
- import os
- import queue
- import select
- import socket
- import sys
- import threading
- import time
- import traceback
- import blinker
- import curio
- import digilib.misc
# Module-level loggers, one per subsystem, all children of this module's
# logger so their verbosity can be tuned independently.
lclient = logging.getLogger(__name__ + ".client")
lserver = logging.getLogger(__name__ + ".server")
# Used by the connection handlers.
lch = logging.getLogger(__name__ + ".chandler")
# The "chat" loggers record the message traffic itself (what was sent and
# received) rather than connection state.
lschat = logging.getLogger(__name__ + ".server.chat")
lcchat = logging.getLogger(__name__ + ".client.chat")
lerr = logging.getLogger(__name__ + ".errhandler")
class ErrorHandler(object):
    """
    Execute a callable that is likely to raise and log the traceback
    instead of letting the exception propagate.

    Exceptions are logged at DEBUG level, either on the logger passed to
    the constructor or on the logger passed to the individual call.
    """

    # Used in the log message when the caller did not describe the operation.
    _DEFAULT_TEXT = "unnamed operation"

    def __init__(self, logger):
        super().__init__()
        self.logger = logger

    def _prepare(self, kwargs, text, logger):
        """Resolve the per-call defaults shared by run() and arun()."""
        if not logger:
            logger = self.logger
        if not text:
            print("ErrorHandler:WARNING:no text specified,"
                  " this is highly discouraged")
            # Without a fallback the log call in the except branch would
            # itself fail with a TypeError ("str" + None).
            text = self._DEFAULT_TEXT
        if kwargs is None:
            kwargs = {}
        return kwargs, text, logger

    def run(self, func, args=(), kwargs=None, text=None, logger=None):
        """
        Call ``func(*args, **kwargs)`` and log any Exception it raises.

        func:   the callable to execute.
        args:   positional arguments for func.
        kwargs: keyword arguments for func (None means no keyword arguments).
        text:   short description of the operation, used in the log message.
        logger: logger to use instead of the one given to the constructor.

        Returns None; the return value of func is discarded.
        """
        kwargs, text, logger = self._prepare(kwargs, text, logger)
        try:
            func(*args, **kwargs)
        except Exception as exc:
            logger.debug("error occured during " + text, exc_info=exc)

    async def arun(self, func, args=(), kwargs=None, text=None, logger=None):
        """
        Asynchronous variant of run(): awaits ``func(*args, **kwargs)`` and
        logs any Exception raised while doing so. Takes the same arguments
        as run().
        """
        kwargs, text, logger = self._prepare(kwargs, text, logger)
        try:
            await func(*args, **kwargs)
        except Exception as exc:
            logger.debug("error occured during " + text, exc_info=exc)
class ConnHandlerBase(object):
    """
    ConnHandlerBase is the base class for all connection handlers.
    It provides basic methods that work on raw bytes. Consider inheriting
    from ConnHandler instead, as it provides better functionality.
    """
    addr = None
    # Default number of bytes requested per recv() call.
    block_size = 1024
    server = None

    def __init__(self, socket, addr, server):
        super(ConnHandlerBase, self).__init__()
        self.socket = socket
        self.addr = addr
        self.server = server

    async def disconnect(self):
        """
        Explicitly disconnect the client, shut the socket down and close it.

        Every step is best effort: a failure is logged at DEBUG level and
        the remaining steps are still attempted.
        """
        # "except Exception" rather than a bare except, so that exceptions
        # not derived from Exception (task cancellation, KeyboardInterrupt)
        # are not swallowed here.
        try:
            await self.send(bytes())
        except Exception:
            lch.debug("error during disconenct")
        try:
            # 0 is the value of socket.SHUT_RD.
            await self.socket.shutdown(0)
        except Exception:
            lch.debug("error during socket shutdown")
        try:
            await self.socket.close()
        except Exception:
            lch.debug("error closing socket")

    async def handle(self, data):
        """
        This method is called for every message the server receives from the
        client. It should handle the data and must be overridden by
        subclasses.

        Raises NotImplementedError when not overridden.
        """
        raise NotImplementedError()

    async def recv(self, block_size=None):
        """
        Wait for the client to send something and return the received bytes.

        block_size: maximum number of bytes to read. None (the default)
        means self.block_size, looked up at call time so that subclasses and
        instances can override it.
        """
        if block_size is None:
            block_size = self.block_size
        data_received = await self.socket.recv(block_size)
        return data_received

    async def send(self, data, log_msg=None):
        """
        Send bytes to the client.

        data:    the bytes to send.
        log_msg: if given, it is written to the chat log instead of the data.

        Returns False if an exception was raised during sending, otherwise
        True.
        """
        if log_msg:
            lschat.info("server:" + str(log_msg))
        else:
            lschat.info("Server:" + str(data))
        try:
            await self.socket.send(data)
            return True
        except Exception as e:
            lch.error(e, exc_info=True)
            return False
class ConnHandler(ConnHandlerBase):
    """
    More advanced connection handler than ConnHandlerBase: send() takes a
    string and encodes it as UTF-8, recv() decodes the client's data and
    returns a string, and on_welcome() can be used to greet a new client.
    """

    def __init__(self, socket, addr, server):
        super(ConnHandler, self).__init__(socket, addr, server)

    async def disconnect(self):
        """
        Explicitly disconnect the client, perform a shutdown on the socket
        and close it.

        Every step is best effort: a failure is logged at DEBUG level and
        the remaining steps are still attempted.
        """
        # "except Exception" rather than a bare except, so that exceptions
        # not derived from Exception (task cancellation, KeyboardInterrupt)
        # are not swallowed here.
        try:
            await self.send("")
        except Exception:
            lch.debug("error during disconenct")
        try:
            # 0 is the value of socket.SHUT_RD.
            await self.socket.shutdown(0)
        except Exception:
            lch.debug("error during socket shutdown")
        try:
            await self.socket.close()
        except Exception:
            lch.debug("error closing socket")

    async def handle(self, data):
        """Do nothing with the received string; override in subclasses."""
        return

    async def on_welcome(self):
        """
        This method can be used to send a welcome message to the client.
        """
        await self.send("Welcome client, this is the server sending!")

    async def recv(self, block_size=None):
        """
        Wait for the client to send something, decode it as UTF-8 and return
        a string.

        block_size: maximum number of bytes to read; None means
        self.block_size.
        """
        if block_size is None:
            block_size = self.block_size
        data_received = await self.socket.recv(block_size)
        return data_received.decode("utf-8")

    async def send(self, data, log_msg=False):
        """
        Take a string, encode it as UTF-8 and send it to the client.

        data:    the string to send.
        log_msg: if given, it is written to the chat log instead of the data.

        Returns False if an exception was raised during sending, otherwise
        True.
        """
        if log_msg:
            lschat.info("server:" + log_msg)
        else:
            lschat.info("Server:" + data)
        data_encoded = data.encode("utf-8")
        try:
            await self.socket.send(data_encoded)
            return True
        except Exception as e:
            lch.error(e, exc_info=True)
            return False
class ConnHandlerEcho(ConnHandler):
    """
    A connection handler which sends everything it receives to every other
    handler listed in the server's connection_handler list.
    """

    def __init__(self, socket, addr, server):
        super(ConnHandlerEcho, self).__init__(socket, addr, server)

    async def handle(self, data):
        """
        Forward data to all other connection handlers of the server.

        This must be a coroutine: the server awaits handle(), and send() is
        itself a coroutine that does nothing unless it is awaited.
        """
        # Iterate over a copy: the list may change while a send is awaited.
        for h in list(self.server.connection_handler):
            if h is not self:
                await h.send(data)
class Server(object):
    """
    Server opens either a unix or an inet connection. For every new client
    a new connection handler object is created.

    _string_ **host** and _int_ **port** are the filename, hostname or ip
    address and the port respectively on which the connection will be opened.
    If you make an AF_UNIX socket, port is ignored so simply pass None.

    _object_ **handler_class** is the class (not an object of the class) used
    for making connection handlers.

    _dict_ **handler_kwargs** is a dict which will be passed as keyword
    arguments to the __init__ function of handler_class when creating a new
    connection handler. Default is an empty dict.

    _string_ **af_family** specifies the AF_FAMILY socket type, valid options
    are: "AF_INET" for an inet socket and "AF_UNIX" for a unix (file) socket.
    Default is "AF_INET".

    _bool_ **log_ip** specifies whether the ip address of the server/client
    is logged. Default is False.

    _int_ **max_allowed_clients** is passed to listen() as the backlog.
    Default is 5.
    """

    # The accept loop in run() stops before the next accept once this is set.
    exit_event = False

    def __init__(
            self,
            host,
            port,
            handler_class,
            handler_kwargs=None,
            af_family="AF_INET",
            log_ip=False,
            max_allowed_clients=5,
            ):
        super(Server, self).__init__()
        self.host = host
        self.port = port
        self.handler_class = handler_class
        # None instead of a shared mutable default dict.
        self.handler_kwargs = {} if handler_kwargs is None else handler_kwargs
        self.af_family = af_family
        self.log_ip = log_ip
        self.max_allowed_clients = max_allowed_clients

        self.safe_handle = ErrorHandler(lserver)
        self.socket = None
        # One task per connected client, see handle_client().
        self.handle_tasks = curio.TaskGroup(name="tg_handle_clients")
        # Handlers of the currently connected clients.
        self.connection_handler = []

        atexit.register(self.shutdown)

    def make_socket(self):
        """
        Factory method for sockets: makes a normal stream socket of the
        configured family and wraps it in a curio.io.Socket.

        Raises ValueError if af_family is not supported.
        """
        lserver.debug("making a {} socket".format(self.af_family))
        if self.af_family == "AF_INET":
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        elif self.af_family == "AF_UNIX":
            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        else:
            raise ValueError(
                "AF_FAMILY '{}' not supported!".format(self.af_family))
        return curio.io.Socket(s)

    def make_handler(self, conn, addr):
        """
        Factory method for handlers: creates a handler object from
        self.handler_class and self.handler_kwargs.
        """
        return self.handler_class(conn, addr, self, **self.handler_kwargs)

    def setup(self):
        """
        Create a socket, bind it and start listening for clients. This
        method does not block, it returns after starting to listen.
        """
        lserver.info("setting up server")
        self.socket = self.make_socket()
        if self.af_family == "AF_INET":
            self.socket.bind((self.host, self.port))
        elif self.af_family == "AF_UNIX":
            # A leftover socket file would make bind() fail.
            if os.path.exists(self.host):
                lserver.debug("file already exists")
                lserver.debug("attempting to remove it")
                os.remove(self.host)
            self.socket.bind(self.host)
        self.socket.listen(self.max_allowed_clients)

    def shutdown(self):
        """
        Synchronous entry point which runs the asynchronous shutdown method.
        """
        curio.run(self.async_shutdown)

    async def async_shutdown(self):
        """
        Cancel the client tasks, then shut the listening socket down and
        close it. Unregisters shutdown() from atexit so it is not executed
        a second time at exit after a manual call.
        """
        atexit.unregister(self.shutdown)
        lserver.info("shutting down server")
        await self.safe_handle.arun(
            self.handle_tasks.cancel_remaining, text="handler cancel")
        try:
            await curio.ignore_after(0.01, self.handle_tasks.join)
        except Exception as exc:
            lserver.error("error joining tasks:", exc_info=exc)

        if self.socket:
            await self.safe_handle.arun(
                self.socket.shutdown,
                args=[socket.SHUT_RDWR],
                text="socket shutdown")
            await self.safe_handle.arun(
                self.socket.close, text="socket close")

    def start(self):
        """
        Set the server up and run its main loop. This method is blocking.
        """
        self.setup()
        curio.run(self.run)

    async def run(self):
        """
        Main loop of the server: waits for new client connections and spawns
        a handle_client() task for each of them. It does not receive or send
        anything itself.
        """
        lserver.debug("entering main loop")
        while not self.exit_event:
            lserver.debug("waiting for client to connect")
            conn, addr = await self.socket.accept()
            if not self.log_ip:
                lserver.info("a new client connected, let's handle it")
            elif isinstance(addr, tuple) and len(addr) >= 2:
                lserver.info("new client connection, {}:{}!".format(*addr))
            else:
                # AF_UNIX addresses are not (host, port) tuples.
                lserver.info("new client connection, {}!".format(addr))
            handler = self.make_handler(conn, addr)
            # Register the handler so other handlers can reach it through
            # server.connection_handler; handle_client() removes it again.
            self.connection_handler.append(handler)
            await self.handle_tasks.spawn(self.handle_client(conn, handler))

    @staticmethod
    def _peer_address(conn, handler):
        """Return the remote address of a client for log messages."""
        addr = getattr(handler, "addr", None)
        if addr is not None:
            return addr
        try:
            return conn.getpeername()
        except OSError:
            return "<unknown>"

    async def handle_client(self, socket, handler):
        """
        Wait for the client to send something and call the handler's handle
        method with it. One handle_client task runs for each connected
        client. The loop ends when the client closes the connection or the
        socket fails.
        """
        # getsockname() would give the server's own address, so the peer
        # address is used for the log messages instead.
        who = self._peer_address(socket, handler) if self.log_ip \
            else "the client"
        try:
            if hasattr(handler, "on_welcome"):
                await handler.on_welcome()
            while True:
                lserver.debug("waiting for %s to send something", who)
                try:
                    data = await handler.recv()
                except OSError as e:
                    # A broken socket will not recover; stop instead of
                    # logging the same error forever.
                    lserver.error(e, exc_info=True)
                    await self.safe_handle.arun(
                        handler.disconnect, text="client disconnect")
                    break
                except Exception as e:
                    lserver.error(e, exc_info=True)
                    await curio.sleep(0.01)
                    continue

                # An empty read means the peer closed the connection.
                if not data:
                    lserver.info("the connection to %s was closed", who)
                    await self.safe_handle.arun(
                        handler.disconnect, text="client disconnect")
                    break

                try:
                    # %s formatting works for both str and bytes payloads.
                    lschat.info("Client:%s", data.rstrip())
                    await handler.handle(data)
                except Exception as e:
                    lserver.error(e, exc_info=True)
                await curio.sleep(0.01)
        finally:
            if handler in self.connection_handler:
                self.connection_handler.remove(handler)

        # NOTE(review): the task waits on itself here for at most 0.01s;
        # kept from the original, the intent is not visible in this file.
        cur_task = await curio.current_task()
        await curio.ignore_after(0.01, cur_task.wait)
class Client(threading.Thread):
    """
    Threaded, blocking socket client.

    The thread's run() executes main_loop(), which reads from the socket
    while status is "connected" and passes the decoded data to
    handle_data_func.

    host, port:       address to connect to; for "AF_UNIX" host is the
                      socket file and port is ignored.
    af_family:        "AF_INET" or "AF_UNIX".
    handle_data_func: optional callable invoked with every decoded message.
    error_handler:    optional callable; if given, run() calls
                      error_handler(self.main_loop) instead of calling
                      main_loop() directly.
    block_size:       maximum number of bytes read per recv() call.
    """
    status = "uninitialized"
    exit_event = False

    def __init__(
            self,
            host,
            port=None,
            af_family="AF_INET",
            handle_data_func=None,
            error_handler=None,
            block_size=1024,
            ):
        super().__init__()
        self.name = "Client"
        self.host = host
        self.port = port
        self.af_family = af_family
        self.block_size = block_size
        self.handle_data_func = handle_data_func
        self.error_handler = error_handler
        self.socket = None
        self.status = "disconnected"

    def connect(self):
        """
        Create a socket and connect it to the configured address.

        Returns True on success. On failure the error is logged, the socket
        is cleaned up via disconnect() and False is returned.
        """
        self.status = "connecting"
        self.socket = self.make_socket()
        lclient.info("connecting to socket '{}' of type {}"
                     .format(self.host, self.af_family))
        try:
            if self.af_family == "AF_INET":
                self.socket.connect((self.host, self.port))
            elif self.af_family == "AF_UNIX":
                self.socket.connect(self.host)
            self.status = "connected"
            lclient.info("connected")
            return True
        except Exception as e:
            lclient.debug(e, exc_info=True)
            if isinstance(e, ConnectionRefusedError):
                lclient.info(
                    "failed to connect to socket '{}'".format(self.host))
            self.disconnect()
            return False

    def disconnect(self):
        """
        Shut the socket down, close it and set status to "disconnected".
        Errors during shutdown/close are logged, not raised.
        """
        lclient.info("disconnecting from socket '{}'".format(self.host))
        self.status = "disconnected"
        if self.socket:
            try:
                self.socket.shutdown(socket.SHUT_RDWR)
            except Exception as e:
                lclient.error(e)
            try:
                self.socket.close()
            except Exception as e:
                lclient.error("error occured while closing the socket, " +
                              "maybe it is already closed", exc_info=e)
        self.socket = None

    def handle_data(self, data_received):
        """
        Decode received bytes as UTF-8, log them and pass the string to
        handle_data_func if one was given. Exceptions raised by
        handle_data_func are logged, not propagated.
        """
        data_decoded = data_received.decode("utf-8")
        lcchat.info("Server: " + data_decoded)
        if self.handle_data_func:
            try:
                self.handle_data_func(data_decoded)
            except Exception as e:
                lclient.error(e, exc_info=True)

    def is_running(self):
        """Return True while this thread is alive."""
        return self.is_alive()

    def make_socket(self):
        """
        Factory method for sockets: returns a stream socket of the
        configured family. Raises ValueError for an unsupported af_family.
        """
        lclient.info("creating a {} socket".format(self.af_family))
        if self.af_family == "AF_INET":
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        elif self.af_family == "AF_UNIX":
            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        else:
            raise ValueError(
                "AF_FAMILY '{}' not supported!".format(self.af_family))
        return s

    def main_loop(self):
        """
        Poll the socket until exit_event is set and dispatch received data
        to handle_data(). While not connected the loop just sleeps.
        """
        lclient.debug("starting main loop")
        while not self.exit_event:
            if self.status != "connected":
                time.sleep(0.1)
                continue

            # Wait up to one second for the socket to become readable or to
            # report an exceptional condition.
            read_confirmed, _, exc_confirmed = select.select(
                [self.socket], [], [self.socket], 1)

            if self.socket in exc_confirmed:
                lclient.warning("socket is expected to corrupt, exiting")
                self.disconnect()
                break

            if self.socket not in read_confirmed:
                time.sleep(0.1)
                continue

            try:
                data_received = self.read_from_socket()
            except OSError as e:
                # isinstance-style match: subclasses such as
                # ConnectionResetError also mean the connection is broken.
                lclient.error(e, exc_info=True)
                lclient.warning("connection broken, exiting")
                self.disconnect()
                continue
            except Exception as e:
                lclient.error(e, exc_info=True)
                raise

            # An empty read means the peer closed the connection.
            if data_received == b'':
                lclient.info("connection is broken, closing socket exiting")
                self.disconnect()
                continue

            try:
                self.handle_data(data_received)
            except Exception as e:
                lclient.error("Error while handling data", exc_info=e)

    def read_from_socket(self):
        """Read up to block_size bytes from the socket."""
        return self.socket.recv(self.block_size)

    def run(self):
        """Thread entry point: run main_loop(), via error_handler if set."""
        if self.error_handler:
            self.error_handler(self.main_loop)
        else:
            self.main_loop()

    def send(self, msg):
        """
        Send msg (a string) to the server. Trailing whitespace is stripped
        and "\\r\\n" is appended before encoding as UTF-8. On failure the
        error is logged and status is set to "shutdown".
        """
        msg = msg.rstrip()
        msg_encoded = (msg + "\r\n").encode("utf-8")
        try:
            lcchat.info("Client: " + msg)
            # sendall: a plain send() may transmit only part of the message.
            self.socket.sendall(msg_encoded)
        except Exception as e:
            lclient.error(e, exc_info=True)
            self.status = "shutdown"

    def setup(self):
        """Hook for subclasses; does nothing."""
        pass

    def stop(self, reason=None):
        """
        Disconnect and ask main_loop() to exit. reason, if given, is
        printed.
        """
        self.disconnect()
        self.exit_event = True
        if reason:
            print(reason)
class AsyncClient(object):
    """
    Socket client whose receive loop (run) is a coroutine.

    connect(), disconnect() and send() are synchronous; they operate on the
    raw socket obtained through curio's Socket.blocking() context manager.
    run(), recv() and read_from_socket() are coroutines reading from the
    curio socket.

    host, port:       address to connect to; for "AF_UNIX" host is the
                      socket file and port is ignored.
    af_family:        "AF_INET" or "AF_UNIX".
    handle_data_func: optional callable invoked with every decoded message.
    error_handler:    optional callable; if given, start() calls
                      error_handler(self.run) and awaits the result if it is
                      awaitable.
    block_size:       maximum number of bytes read per recv() call.
    """
    block_size = 1024
    status = "uninitialized"
    exit_event = False

    def __init__(
            self,
            host,
            port=None,
            af_family="AF_INET",
            handle_data_func=None,
            error_handler=None,
            block_size=1024,
            ):
        super().__init__()
        self.host = host
        self.port = port
        self.af_family = af_family
        self.block_size = block_size
        self.handle_data_func = handle_data_func
        self.error_handler = error_handler
        self.socket = None
        self.is_connected = False
        # True while the run() coroutine is executing.
        self._running = False
        self.status = "disconnected"

    def connect(self):
        """
        Create a socket and connect it to the configured address.

        Returns True on success. On failure the error is logged, the socket
        is cleaned up via disconnect() and False is returned.
        """
        self.status = "connecting"
        self.socket = self.make_socket()
        lclient.info("connecting to socket '{}' of type {}".format(
            self.host, self.af_family))
        try:
            if self.af_family == "AF_INET":
                address = (self.host, self.port)
            else:
                address = self.host
            # The curio socket's own connect() is a coroutine and cannot be
            # used from this synchronous method, so connect on the raw
            # socket while it is temporarily in blocking mode.
            with self.socket.blocking() as raw:
                raw.connect(address)
            self.is_connected = True
            self.status = "connected"
            lclient.info("connected")
            return True
        except Exception as e:
            lclient.debug(e, exc_info=True)
            if isinstance(e, ConnectionRefusedError):
                lclient.info(
                    "failed to connect to socket '{}'".format(self.host))
            self.disconnect()
            return False

    def disconnect(self):
        """
        Shut the socket down, close it and set status to "disconnected".
        Errors during shutdown/close are logged, not raised.
        """
        lclient.info("disconnecting from socket '{}'".format(self.host))
        self.is_connected = False
        self.status = "disconnected"
        sock, self.socket = self.socket, None
        if sock is None:
            return
        raw = None
        try:
            with sock.blocking() as raw:
                raw.shutdown(socket.SHUT_RDWR)
        except Exception as e:
            lclient.error(e)
        try:
            # Close outside the blocking() context: on exit it switches the
            # socket back to non-blocking mode, which needs an open socket.
            if raw is not None:
                raw.close()
        except Exception as e:
            lclient.error("error occured while closing the socket, " +
                          "maybe it is already closed", exc_info=e)

    def handle_data(self, data_received):
        """
        Decode received bytes as UTF-8, log them and pass the string to
        handle_data_func if one was given. Exceptions raised by
        handle_data_func are logged, not propagated.
        """
        data_decoded = data_received.decode("utf-8")
        lcchat.info("Server: " + data_decoded)
        if self.handle_data_func:
            try:
                self.handle_data_func(data_decoded)
            except Exception as e:
                lclient.error(e, exc_info=True)

    def is_running(self):
        """Return True while the run() coroutine is executing."""
        # This class is not a thread, so a threading.enumerate() membership
        # test could never be True.
        return self._running

    def make_socket(self):
        """
        Factory method for sockets: makes a normal stream socket of the
        configured family and wraps it in a curio.io.Socket.

        Raises ValueError for an unsupported af_family.
        """
        lclient.info("creating a {} socket".format(self.af_family))
        if self.af_family == "AF_INET":
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        elif self.af_family == "AF_UNIX":
            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        else:
            raise ValueError(
                "AF_FAMILY '{}' not supported!".format(self.af_family))
        return curio.io.Socket(s)

    async def read_from_socket(self):
        """Wait for data and return up to block_size raw bytes."""
        return await self.socket.recv(self.block_size)

    async def recv(self):
        """Wait for data and return it decoded as a UTF-8 string."""
        data_received = await self.socket.recv(self.block_size)
        return data_received.decode("utf-8")

    async def run(self):
        """
        Receive loop: until exit_event is set, read from the socket and
        dispatch the data to handle_data(). While not connected the loop
        sleeps without blocking the event loop.
        """
        lclient.debug("starting main loop")
        self._running = True
        try:
            while not self.exit_event:
                if self.status != "connected":
                    await curio.sleep(0.1)
                    continue
                try:
                    data = await self.read_from_socket()
                    if not data:
                        # An empty read means the peer closed the connection.
                        lclient.info("connection closed")
                        self.disconnect()
                    else:
                        self.handle_data(data)
                except OSError as e:
                    # Also matches subclasses such as ConnectionResetError.
                    lclient.error(e, exc_info=True)
                    self.is_connected = False
                    lclient.warning("connection broken, exiting")
                    self.disconnect()
                except Exception as e:
                    lclient.error(e, exc_info=True)
                    raise
        finally:
            self._running = False

    async def start(self, connect=False):
        """
        Optionally connect, then execute the receive loop, through
        error_handler if one was given.
        """
        if connect:
            self.connect()
        if self.error_handler:
            outcome = self.error_handler(self.run)
        else:
            outcome = self.run()
        # run() returns a coroutine, which does nothing until awaited. An
        # error handler may return an awaitable or a plain value.
        if hasattr(outcome, "__await__"):
            await outcome

    def send(self, msg):
        """
        Send msg (a string) to the server. Trailing whitespace is stripped
        and "\\r\\n" is appended before encoding as UTF-8. On failure the
        error is logged and the client disconnects.
        """
        msg = msg.rstrip()
        msg_encoded = (msg + "\r\n").encode("utf-8")
        try:
            lcchat.info("Client: " + msg)
            with self.socket.blocking() as raw:
                raw.sendall(msg_encoded)
        except Exception as e:
            self.is_connected = False
            lclient.error(e, exc_info=True)
            self.disconnect()

    def setup(self):
        """Hook for subclasses; does nothing."""
        pass

    def stop(self, reason=None):
        """
        Disconnect and ask run() to exit. reason, if given, is printed.
        """
        self.disconnect()
        self.exit_event = True
        if reason:
            print(reason)
|