#!/usr/bin/env python3.5
# Copyright 2017 Digital
#
# This file is part of DigiLib.
#
# DigiLib is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# DigiLib is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with DigiLib.  If not, see <http://www.gnu.org/licenses/>.

import atexit
import inspect
import logging
import logging.handlers
import os
import queue
import select
import socket
import sys
import threading
import time
import traceback

import blinker
import curio

import digilib.misc

lclient = logging.getLogger(__name__+".client")
lserver = logging.getLogger(__name__+".server")
lch = logging.getLogger(__name__+".chandler")
lschat = logging.getLogger(__name__+".server.chat")
lcchat = logging.getLogger(__name__+".client.chat")
lerr = logging.getLogger(__name__+".errhandler")


class ErrorHandler(object):
    """
    Run a callable that is likely to raise and log the traceback instead
    of letting the exception propagate.

    _object_ **logger** is the default logger used when a call does not
    pass its own.
    """

    def __init__(self, logger):
        super().__init__()
        self.logger = logger

    def run(self, func, args=(), kwargs=None, text=None, logger=None):
        """
        Call ``func(*args, **kwargs)``; any ``Exception`` is logged at
        debug level and swallowed. The return value of func is discarded.

        **text** describes the operation and is used in the log message.
        """
        logger = logger or self.logger
        if not text:
            print("ErrorHandler:WARNING:no text specified,"
                  " this is highly discouraged")
        try:
            # kwargs defaults to None so no dict is shared between calls
            func(*args, **(kwargs or {}))
        except Exception as exc:
            # lazy %-formatting: a missing text must not raise in here
            logger.debug("error occured during %s",
                         text or "an unnamed operation", exc_info=exc)

    async def arun(self, func, args=(), kwargs=None, text=None, logger=None):
        """
        Asynchronous variant of run(): awaits ``func(*args, **kwargs)``.
        """
        logger = logger or self.logger
        if not text:
            print("ErrorHandler:WARNING:no text specified,"
                  " this is highly discouraged")
        try:
            await func(*args, **(kwargs or {}))
        except Exception as exc:
            logger.debug("error occured during %s",
                         text or "an unnamed operation", exc_info=exc)


class ConnHandlerBase(object):
    """
    ConnHandlerBase is the base class for all connection handlers. It
    works on raw bytes and provides only basic methods. Consider
    inheriting from ConnHandler instead, as it provides better
    functionality.
    """
    addr = None
    block_size = 1024
    server = None

    def __init__(self, socket, addr, server):
        super(ConnHandlerBase, self).__init__()
        self.socket = socket
        self.addr = addr
        self.server = server

    async def _close_socket(self):
        """
        Best effort shutdown and close of the socket. Failures are only
        logged because the connection may already be gone.
        """
        try:
            # NOTE(review): 0 is SHUT_RD, only the read side is shut
            # down here -- confirm SHUT_RDWR was not intended.
            await self.socket.shutdown(0)
        except Exception:
            lch.debug("error during socket shutdown", exc_info=True)
        try:
            await self.socket.close()
        except Exception:
            lch.debug("error closing socket", exc_info=True)

    async def disconnect(self):
        """
        Explicitly disconnect the client: send an empty message, shut the
        socket down and close it. Errors are logged, never raised.
        """
        try:
            await self.send(bytes())
        except Exception:
            lch.debug("error during disconenct", exc_info=True)
        await self._close_socket()

    async def handle(self, data):
        """
        This method is called for every message the server receives from
        the client and should handle the data. Subclasses must override
        it; the base implementation raises NotImplementedError.
        """
        raise NotImplementedError()

    async def recv(self, block_size=None):
        """
        Wait for the client to send something and return it as bytes.

        **block_size** is the maximum number of bytes to read; it
        defaults to ``self.block_size`` so subclasses can override it.
        """
        if block_size is None:
            block_size = self.block_size
        data_received = await self.socket.recv(block_size)
        return data_received

    async def send(self, data, log_msg=None):
        """
        Send bytes to the client. If **log_msg** is given it is logged
        instead of the data. Returns False if an exception was raised
        during sending, otherwise True.
        """
        if log_msg:
            lschat.info("server:"+str(log_msg))
        else:
            lschat.info("Server:"+str(data))
        try:
            await self.socket.send(data)
            return True
        except Exception as e:
            lch.error(e, exc_info=True)
            return False


class ConnHandler(ConnHandlerBase):
    """
    More advanced connection handler than ConnHandlerBase: send() takes a
    string and encodes it, recv() decodes the client's message and
    returns a string, and on_welcome() is called by the server before it
    starts receiving from the client.
    """

    def __init__(self, socket, addr, server):
        super(ConnHandler, self).__init__(socket, addr, server)

    async def disconnect(self):
        """
        Explicitly disconnect the client: send an empty string, perform
        the shutdown on the socket and close it. Errors are logged, never
        raised.
        """
        try:
            await self.send("")
        except Exception:
            lch.debug("error during disconenct", exc_info=True)
        await self._close_socket()

    async def handle(self, data):
        """Default handler: ignore the data."""
        return

    async def on_welcome(self):
        """
        This method can be used to send a welcome message to the client.
        """
        await self.send("Welcome client, this is the server sending!")

    async def recv(self, block_size=None):
        """
        Wait for the client to send something, decode it as utf-8 and
        return a string.
        """
        if block_size is None:
            block_size = self.block_size
        data_received = await self.socket.recv(block_size)
        data_decoded = data_received.decode("utf-8")
        return data_decoded

    async def send(self, data, log_msg=False):
        """
        Take a string, encode it as utf-8 and send it to the client. If
        **log_msg** is given it is logged instead of the data. Returns
        False if an exception was raised during sending, otherwise True.
        """
        if log_msg:
            lschat.info("server:"+log_msg)
        else:
            lschat.info("Server:"+data)
        data_encoded = bytes(data, "utf-8")
        try:
            await self.socket.send(data_encoded)
            return True
        except Exception as e:
            lch.error(e, exc_info=True)
            return False


class ConnHandlerEcho(ConnHandler):
    """
    A connection handler which sends everything it receives to every
    other client connected to the server.
    """

    def __init__(self, socket, addr, server):
        super(ConnHandlerEcho, self).__init__(socket, addr, server)

    async def handle(self, data):
        """
        Forward data to every other handler registered on the server.
        """
        # must be a coroutine: the server awaits handle(), and send() is
        # a coroutine too. iterate over a copy because handlers may
        # register/deregister while we are suspended in send().
        for h in list(self.server.connection_handler):
            if h is not self:
                await h.send(data)


class Server(object):
    """
    Server opens either a unix or an inet connection. For every new
    client a new connection handler object is created.

    _string_ **host** and _int_ **port** are the filename, hostname or ip
    address and the port respectively on which the connection will be
    opened. If you make an AF_UNIX socket, port is ignored so simply pass
    None.

    _object_ **handler_class** is the class (not an object of the class)
    used for making connection handlers.

    _dict_ **handler_kwargs** is a dict which will be passed as keyword
    arguments to the __init__ function of handler_class when creating a
    new connection handler. Default is an empty dict.

    _string_ **af_family** specifies the AF_FAMILY socket type, valid
    options are: "AF_INET" for an inet socket and "AF_UNIX" for a unix
    (file) socket. Default is "AF_INET".

    _bool_ **log_ip** specifies whether the ip address of the
    server/client is logged. Default is False.

    _int_ **max_allowed_clients** is passed to listen() as the backlog.
    Default is 5 (for now. this will change in the future).
    """
    # set to true when the server shuts down, for instance after a
    # fatal exception
    exit_event = False

    def __init__(
            self,
            host,
            port,
            handler_class,
            handler_kwargs=None,
            af_family="AF_INET",
            log_ip=False,
            max_allowed_clients=5,
            ):
        super(Server, self).__init__()
        self.host = host
        self.port = port
        self.handler_class = handler_class
        # None instead of {} as default so instances never share a dict
        self.handler_kwargs = {} if handler_kwargs is None else handler_kwargs
        self.af_family = af_family
        self.log_ip = log_ip
        self.max_allowed_clients = max_allowed_clients
        # the error handler used by, for instance server.shutdown
        self.safe_handle = ErrorHandler(lserver)
        # don't make the socket yet, we don't need it right now. it will
        # be created when the start method is called
        self.socket = None
        # create a task group for handlers, so we can easily
        # cancel/terminate them all at once
        self.handle_tasks = curio.TaskGroup(name="tg_handle_clients")
        # list of all active connection handlers
        self.connection_handler = []
        # register our cleanup method to be executed when the program
        # exits. the cleanup function unregisters itself, so it won't get
        # executed twice when the user called it before the program exits
        atexit.register(self.shutdown)

    def make_socket(self):
        """
        Factory method for sockets. Makes a normal socket and wraps it in
        a curio.io.Socket wrapper. Raises ValueError for an unsupported
        af_family.
        """
        lserver.debug("making a {} socket".format(self.af_family))
        if self.af_family == "AF_INET":
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        elif self.af_family == "AF_UNIX":
            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        else:
            raise ValueError(
                "AF_FAMILY '{}' not supported!".format(
                    self.af_family
                )
            )
        s = curio.io.Socket(s)
        return s

    def make_handler(self, conn, addr):
        """
        Factory method for handlers. Creates a handler object from
        self.handler_class and self.handler_kwargs.
        """
        return self.handler_class(conn, addr, self, **self.handler_kwargs)

    def setup(self):
        """
        Creates a socket, binds it and starts listening for clients. This
        method does not block, it returns after starting to listen.
        """
        lserver.info("setting up server")
        self.socket = self.make_socket()
        if self.af_family == "AF_INET":
            self.socket.bind((self.host, self.port))
        elif self.af_family == "AF_UNIX":
            # a stale socket file would make bind() fail
            if os.path.exists(self.host):
                lserver.debug("file already exists")
                lserver.debug("attempting to remove it")
                os.remove(self.host)
            self.socket.bind(self.host)
        self.socket.listen(self.max_allowed_clients)

    def shutdown(self):
        """
        This method can be called synchronously and runs the asynchronous
        shutdown method.
        """
        curio.run(self.async_shutdown)

    async def async_shutdown(self):
        """
        Shuts the sockets down and closes them. It unregisters shutdown()
        from atexit, so it doesn't get executed twice when it was
        manually called before the program exits.
        """
        atexit.unregister(self.shutdown)
        lserver.info("shutting down server")
        # tell the main loop to stop accepting new clients
        self.exit_event = True
        await self.safe_handle.arun(
            self.handle_tasks.cancel_remaining, text="handler cancel")
        try:
            await curio.ignore_after(0.01, self.handle_tasks.join)
        except Exception as exc:
            lserver.error("error joining tasks:", exc_info=exc)
        # check if there is actually a socket. if the shutdown method is
        # executed before the start method, there is no socket.
        if self.socket:
            await self.safe_handle.arun(
                self.socket.shutdown,
                args=[socket.SHUT_RDWR, ], text="socket shutdown")
            await self.safe_handle.arun(
                self.socket.close, text="socket close")

    def start(self):
        """
        This method starts the server. It is blocking.
        """
        self.setup()
        curio.run(self.run)

    async def run(self):
        """
        Main loop of the Server. It waits for new client connections and
        spawns a handle task for each of them. It does not receive or
        send anything.
        """
        lserver.debug("entering main loop")
        while not self.exit_event:
            lserver.debug("waiting for client to connect")
            # wait for a client to connect
            conn, addr = await self.socket.accept()
            if self.log_ip:
                lserver.info(
                    "new client connection, {}:{}!".format(*addr))
            else:
                lserver.info("a new client connected, let's handle it")
            handler = self.make_handler(conn, addr)
            await self.handle_tasks.spawn(self.handle_client(conn, handler))

    async def handle_client(self, socket, handler):
        """
        Waits for the client to send something and calls the handler's
        handle method. There is one handle_client task running for each
        client connected. The handler is listed in
        self.connection_handler for as long as this task serves it.
        """
        self.connection_handler.append(handler)
        try:
            if hasattr(handler, "on_welcome"):
                await handler.on_welcome()
            while True:
                try:
                    if self.log_ip:
                        lserver.debug("waiting for {} to send something"
                                      .format(socket.getsockname()))
                    else:
                        lserver.debug(
                            "waiting for the client to send something")
                    # wait for the client to send something
                    data = await handler.recv()
                    # if there is no data the client disconnected. this
                    # is a tcp protocol specification.
                    if not data:
                        if self.log_ip:
                            lserver.info("the connection to {} was closed"
                                         .format(socket.getsockname()))
                        else:
                            lserver.info(
                                "the connection to the client was closed")
                        await handler.disconnect()
                        # break out of the loop. don't return because we
                        # need to do cleanup
                        break
                    else:
                        # only trailing whitespace is removed for the
                        # log, the handler gets the unmodified data.
                        # %-style args work for both str and bytes.
                        lschat.info("Client:%s", data.rstrip())
                        await handler.handle(data)
                except Exception as e:
                    lserver.error(e, exc_info=True)
                    # let's sleep a bit, in case something is broken and
                    # the loop throws an exception every time
                    await curio.sleep(0.01)
        finally:
            if handler in self.connection_handler:
                self.connection_handler.remove(handler)
        # if a task exits and hasn't been joined, curio prints a warning.
        # we don't want the warning, so let's join the current task for a
        # moment. instead of task.join() we use task.wait(). the only
        # difference is that wait doesn't throw an exception if the task
        # was stopped or crashed
        cur_task = await curio.current_task()
        await curio.ignore_after(0.01, cur_task.wait)


class Client(threading.Thread):
    """
    Threaded socket client. The thread's run() executes main_loop(),
    which polls the socket with select() while status is "connected" and
    passes received data to handle_data().

    _string_ **host** / _int_ **port**: address to connect to (port is
    ignored for AF_UNIX).
    _string_ **af_family**: "AF_INET" or "AF_UNIX".
    _callable_ **handle_data_func**: called with every decoded message.
    _callable_ **error_handler**: if given, called with main_loop instead
    of calling main_loop directly.
    _int_ **block_size**: maximum number of bytes read per recv().
    """
    status = "uninitialized"
    exit_event = False

    def __init__(
            self,
            host,
            port=None,
            af_family="AF_INET",
            handle_data_func=None,
            error_handler=None,
            block_size=1024,
            ):
        super().__init__()
        self.name = "Client"
        self.host = host
        self.port = port
        self.af_family = af_family
        self.block_size = block_size
        self.handle_data_func = handle_data_func
        self.error_handler = error_handler
        self.socket = None
        self.status = "disconnected"

    def connect(self):
        """
        Create a socket and connect it. Returns True on success; on
        failure the socket is closed again and False is returned.
        """
        self.status = "connecting"
        self.socket = self.make_socket()
        lclient.info("connecting to socket '{}' of type {}"
                     .format(self.host, self.af_family))
        try:
            if self.af_family == "AF_INET":
                self.socket.connect((self.host, self.port))
            elif self.af_family == "AF_UNIX":
                self.socket.connect(self.host)
            self.status = "connected"
            lclient.info("connected")
            return True
        except Exception as e:
            lclient.debug(e, exc_info=True)
            if isinstance(e, ConnectionRefusedError):
                lclient.info(
                    "failed to connect to socket '{}'".format(self.host))
            self.disconnect()
            return False

    def disconnect(self):
        """
        Shut down and close the socket (if any) and set status to
        "disconnected". Errors are logged, never raised.
        """
        lclient.info("disconnecting from socket '{}'".format(self.host))
        self.status = "disconnected"
        if self.socket:
            try:
                self.socket.shutdown(socket.SHUT_RDWR)
            except Exception as e:
                lclient.error(e)
            try:
                self.socket.close()
            except Exception as e:
                lclient.error("error occured while closing the socket, " +
                              "maybe it is already closed", exc_info=e)
            self.socket = None

    def handle_data(self, data_received):
        """
        Decode received bytes as utf-8, log them and pass the string to
        handle_data_func. Exceptions raised by the callback are logged.
        """
        data_decoded = data_received.decode("utf-8")
        lcchat.info("Server: "+data_decoded)
        if self.handle_data_func:
            try:
                self.handle_data_func(data_decoded)
            except Exception as e:
                lclient.error(e, exc_info=True)

    def is_running(self):
        """Return True while this thread is alive."""
        return self.is_alive()

    def make_socket(self):
        """
        Factory method for sockets. Raises ValueError for an unsupported
        af_family.
        """
        lclient.info("creating a {} socket".format(self.af_family))
        if self.af_family == "AF_INET":
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        elif self.af_family == "AF_UNIX":
            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        else:
            raise ValueError(
                "AF_FAMILY '{}' not supported!".format(
                    self.af_family
                )
            )
        return s

    def main_loop(self):
        """
        Poll the socket until exit_event is set. While not connected the
        loop idles. An exceptional socket condition ends the loop; an
        empty read or an OSError disconnects but keeps the loop idling;
        any other exception is logged and re-raised.
        """
        lclient.debug("starting main loop")
        while not self.exit_event:
            if self.status != "connected":
                time.sleep(0.1)
                continue
            read_confirmed, write_confirmed, exc_confirmed \
                = select.select(
                    [self.socket],
                    [],
                    [self.socket],
                    1
                )
            if self.socket in exc_confirmed:
                lclient.warning("socket is expected to corrupt, exiting")
                self.disconnect()
                break
            elif self.socket in read_confirmed:
                try:
                    data_received = self.read_from_socket()
                    if data_received == b'':
                        lclient.info(
                            "connection is broken, closing socket exiting")
                        self.disconnect()
                    else:
                        try:
                            self.handle_data(data_received)
                        except Exception as e:
                            lclient.error(
                                "Error while handling data",
                                exc_info=e
                            )
                except Exception as e:
                    lclient.error(e, exc_info=True)
                    # isinstance so that subclasses such as
                    # ConnectionResetError count as a broken connection
                    if isinstance(e, OSError):
                        lclient.warning("connection broken, exiting")
                        self.disconnect()
                    else:
                        raise
            else:
                time.sleep(0.1)

    def read_from_socket(self):
        """Read at most block_size bytes from the socket."""
        data_received = self.socket.recv(self.block_size)
        return data_received

    def run(self):
        """Thread entry point: run main_loop, via error_handler if set."""
        if self.error_handler:
            self.error_handler(self.main_loop)
        else:
            self.main_loop()

    def send(self, msg):
        """
        Strip trailing whitespace from msg, append "\\r\\n", encode as
        utf-8 and send it. On failure the error is logged and status is
        set to "shutdown".
        """
        msg = msg.rstrip()
        msg_encoded = bytes(msg+"\r\n", "utf-8")
        try:
            lcchat.info("Client: "+msg)
            # sendall: plain send() may transmit only part of the data
            self.socket.sendall(msg_encoded)
        except Exception as e:
            lclient.error(e, exc_info=True)
            self.status = "shutdown"

    def setup(self):
        pass

    def stop(self, reason=None):
        """Disconnect and ask main_loop to exit; print reason if given."""
        self.disconnect()
        self.exit_event = True
        if reason:
            print(reason)


class AsyncClient(object):
    """
    Curio based socket client. connect(), disconnect() and send() are
    synchronous and operate on the plain socket; run(), recv() and
    read_from_socket() are coroutines using a curio.io.Socket wrapper
    around the same socket, which is created once connected.

    The parameters are the same as for Client.
    """
    block_size = 1024
    status = "uninitialized"
    exit_event = False
    is_connected = False
    # the plain socket behind self.socket, used by the synchronous methods
    _raw_socket = None

    def __init__(
            self,
            host,
            port=None,
            af_family="AF_INET",
            handle_data_func=None,
            error_handler=None,
            block_size=1024,
            ):
        super().__init__()
        self.host = host
        self.port = port
        self.af_family = af_family
        self.block_size = block_size
        self.handle_data_func = handle_data_func
        self.error_handler = error_handler
        self.socket = None
        self._raw_socket = None
        self.is_connected = False
        self.status = "disconnected"

    def connect(self):
        """
        Create a socket, connect it (blocking) and wrap it for curio.
        Returns True on success; on failure the socket is closed again
        and False is returned.
        """
        self.status = "connecting"
        self._raw_socket = self.make_socket()
        lclient.info("connecting to socket '{}' of type {}".format(
            self.host, self.af_family))
        try:
            if self.af_family == "AF_INET":
                self._raw_socket.connect((self.host, self.port))
            elif self.af_family == "AF_UNIX":
                self._raw_socket.connect(self.host)
            # the wrapper puts the socket into non-blocking mode, so wrap
            # only after the blocking connect has finished
            self.socket = curio.io.Socket(self._raw_socket)
            self.is_connected = True
            self.status = "connected"
            lclient.info("connected")
            return True
        except Exception as e:
            lclient.debug(e, exc_info=True)
            if isinstance(e, ConnectionRefusedError):
                lclient.info(
                    "failed to connect to socket '{}'".format(self.host))
            self.disconnect()
            return False

    def disconnect(self):
        """
        Shut down and close the socket (if any) and set status to
        "disconnected". Errors are logged, never raised.
        """
        lclient.info("disconnecting from socket '{}'".format(self.host))
        self.is_connected = False
        self.status = "disconnected"
        raw = self._raw_socket
        if raw is not None:
            try:
                raw.shutdown(socket.SHUT_RDWR)
            except Exception as e:
                lclient.error(e)
            try:
                raw.close()
            except Exception as e:
                lclient.error("error occured while closing the socket, " +
                              "maybe it is already closed", exc_info=e)
        self._raw_socket = None
        self.socket = None

    def handle_data(self, data_received):
        """
        Decode received bytes as utf-8, log them and pass the string to
        handle_data_func. Exceptions raised by the callback are logged.
        """
        data_decoded = data_received.decode("utf-8")
        lcchat.info("Server: "+data_decoded)
        if self.handle_data_func:
            try:
                self.handle_data_func(data_decoded)
            except Exception as e:
                lclient.error(e, exc_info=True)

    def is_running(self):
        # NOTE(review): AsyncClient is not a Thread, so it is never in
        # threading.enumerate() and this always returns False -- confirm
        # what callers expect before changing it.
        return (self in threading.enumerate())

    def make_socket(self):
        """
        Factory method for the plain (blocking) socket. Raises ValueError
        for an unsupported af_family.
        """
        lclient.info("creating a {} socket".format(self.af_family))
        if self.af_family == "AF_INET":
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        elif self.af_family == "AF_UNIX":
            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        else:
            raise ValueError(
                "AF_FAMILY '{}' not supported!".format(
                    self.af_family
                )
            )
        return s

    async def read_from_socket(self):
        """Wait for data and return at most block_size bytes."""
        data_received = await self.socket.recv(self.block_size)
        return data_received

    async def recv(self):
        """Wait for data, decode it as utf-8 and return a string."""
        data_received = await self.socket.recv(self.block_size)
        data_decoded = data_received.decode("utf-8")
        return data_decoded

    async def run(self):
        """
        Main loop: receive until exit_event is set. While not connected
        the loop idles. An empty read or an OSError disconnects; any
        other exception is logged and re-raised.
        """
        lclient.debug("starting main loop")
        while not self.exit_event:
            if self.status != "connected":
                # curio.sleep instead of time.sleep: don't block the
                # event loop while idling
                await curio.sleep(0.1)
                continue
            try:
                data = await self.read_from_socket()
                if not data:
                    lclient.info("connection closed")
                    self.disconnect()
                else:
                    self.handle_data(data)
            except Exception as e:
                lclient.error(e, exc_info=True)
                if isinstance(e, OSError):
                    self.is_connected = False
                    lclient.warning("connection broken, exiting")
                    self.disconnect()
                else:
                    raise

    async def start(self, connect=False):
        """
        Optionally connect, then run the main loop, via error_handler if
        one was given. If the error handler returns an awaitable (for
        instance ErrorHandler.arun), it is awaited.
        """
        if connect:
            self.connect()
        if self.error_handler:
            result = self.error_handler(self.run)
            if inspect.isawaitable(result):
                await result
        else:
            await self.run()

    def send(self, msg):
        """
        Strip trailing whitespace from msg, append "\\r\\n", encode as
        utf-8 and send it (blocking). On failure the error is logged and
        the client disconnects.
        """
        msg = msg.rstrip()
        msg_encoded = bytes(msg+"\r\n", "utf-8")
        try:
            lcchat.info("Client: "+msg)
            raw = self._raw_socket
            # the curio wrapper keeps the socket non-blocking; switch to
            # blocking mode just for this synchronous send
            raw.setblocking(True)
            try:
                raw.sendall(msg_encoded)
            finally:
                raw.setblocking(False)
        except Exception as e:
            self.is_connected = False
            lclient.error(e, exc_info=True)
            self.disconnect()

    def setup(self):
        pass

    def stop(self, reason=None):
        """Disconnect and ask run() to exit; print reason if given."""
        self.disconnect()
        self.exit_event = True
        if reason:
            print(reason)