#!/usr/bin/env python3.5
# Copyright 2017 Digital
#
# This file is part of DigiLib.
#
# DigiLib is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# DigiLib is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with DigiLib.  If not, see http://www.gnu.org/licenses/.

import atexit
import inspect
import logging
import logging.handlers
import os
import queue
import select
import socket
import sys
import threading
import time
import traceback

import blinker
import curio

import digilib.misc

lclient = logging.getLogger(__name__+".client")
lserver = logging.getLogger(__name__+".server")
lschat = logging.getLogger(__name__+".server.chat")
lcchat = logging.getLogger(__name__+".client.chat")

_tasks = []


class ConnHandlerBase(object):
    """
    Base class for per-connection handlers created by Server.

    A handler owns the (async) socket of one client connection and
    offers coroutines to receive, send and close. Subclasses override
    handle() and, optionally, welcome_client().
    """

    def __init__(self, socket, addr, server):
        """
        socket: the connection object returned by the server's accept()
        addr:   the peer address returned by accept()
        server: the Server instance which created this handler
        """
        self.status = "init"
        super(ConnHandlerBase, self).__init__()
        self.socket = socket
        self.addr = addr
        self.server = server
        # maximum number of bytes read by one recv() call
        self.block_size = 1024
        # welcome_client is a coroutine, it can't be awaited from __init__
        # self.welcome_client()
        self.status = "connected"

    async def welcome_client(self):
        """hook for greeting a new client, does nothing by default"""
        pass

    async def handle(self, data):
        """hook called with every decoded message, does nothing by default"""
        return

    async def recv(self):
        """
        waits for up to block_size bytes and returns them decoded as
        utf-8. an empty string means the peer closed the connection.
        """
        data_received = await self.socket.recv(self.block_size)
        data_decoded = data_received.decode("utf-8")
        return data_decoded

    async def send(self, msg, log_msg=False):
        """
        encodes msg as utf-8 and sends it to the client. if log_msg is
        given it is written to the chat log instead of msg. returns True
        on success and False if sending raised an exception.
        """
        msg_encoded = bytes(msg, "utf-8")
        if log_msg:
            lschat.info("server:"+log_msg)
        else:
            lschat.info("Server:"+msg)
        try:
            # sendall instead of send, send may transmit only a part of
            # the message
            await self.socket.sendall(msg_encoded)
            return True
        except Exception as e:
            lserver.error(e, exc_info=True)
            return False

    async def close(self):
        """
        shuts the socket down and closes it. errors are only logged,
        closing is best effort.
        """
        self.status = "closed"
        # except Exception instead of a bare except, so that
        # BaseException based signals are not swallowed
        try:
            await self.socket.shutdown(0)
        except Exception:
            lserver.debug("error during socket shutdown")
        try:
            await self.socket.close()
        except Exception:
            lserver.debug("error closing socket")


class ConnHandlerEcho(ConnHandlerBase):
    """
    Handler which forwards every received message to all other
    handlers listed in server.connection_handler.
    """

    def __init__(self, socket, addr, server):
        self.status = "init"
        self.super_class = super(ConnHandlerEcho, self)
        self.super_class.__init__(socket, addr, server)
        self.server = server

    async def welcome_client(self):
        """sends a greeting to the client"""
        # send is a coroutine, it has to be awaited or nothing is sent
        await self.send("welcome to the client")

    async def handle(self, data):
        """logs data and forwards it to every other connected handler"""
        lschat.info("Client:"+data)
        # iterate over a copy, handlers may disconnect while we send
        for h in list(self.server.connection_handler):
            if h is not self:
                await h.send(data)


class Server(object):
    """
    Server opens either an unix or an inet connection. for every
    client which connects a new handler object is created from
    handler_class.
    """

    def __init__(self,
                 host,
                 port=None,
                 af_family="AF_INET",
                 log_ip=False,
                 max_allowed_clients=5,
                 handler_class=None,
                 handler_kwargs=None,
                 ):
        """
        host:                hostname/ip address (AF_INET) or file path
                             (AF_UNIX) the socket is bound to
        port:                port to bind to, only used for AF_INET
        af_family:           "AF_INET" or "AF_UNIX"
        log_ip:              whether client addresses are logged
        max_allowed_clients: passed to listen() as backlog
        handler_class:       class used to create connection handlers
        handler_kwargs:      extra keyword arguments for the handler's
                             __init__, defaults to no extra arguments
        """
        super(Server, self).__init__()
        # set to true when the server shuts down, for instance after a
        # fatal exception
        self.exit_event = False
        # on which hostname or ip address the connection will be opened
        self.host = host
        # on which port the connection will be opened
        self.port = port
        # what address family to use, either AF_INET or AF_UNIX
        # (file socket)
        self.af_family = af_family
        # whether ip addresses will be logged
        self.log_ip = log_ip
        # number of maximum client connection at a time
        self.max_allowed_clients = max_allowed_clients
        # this handler class will be used when creating a new handler
        self.handler_class = handler_class
        # the kwargs passed to the handler's __init__ function. a fresh
        # dict is created per instance, a dict as default argument would
        # be shared between all servers
        self.handler_kwargs = {} if handler_kwargs is None else handler_kwargs
        # don't make the socket yet, we don't need it right now. it will
        # be created when the start method is called
        self.socket = None
        # the plain socket wrapped by self.socket. it is kept so that
        # shutdown can close it synchronously (e.g. from atexit)
        self._raw_socket = None
        # create a task group for handlers, so we can easily
        # cancel/terminate them all at once
        self.handle_tasks = curio.TaskGroup(name="tg_handle_clients")
        # list of all active connection handlers
        self.connection_handler = []
        # register our cleanup method to be executed when the program
        # exits. the cleanup function unregisters itself, so it won't get
        # executed twice when the user called it before the program exits
        atexit.register(self.shutdown)

    @staticmethod
    def _format_addr(addr):
        """returns a printable form of an address returned by accept()"""
        # inet addresses are (host, port, ...) tuples, unix socket
        # addresses are plain strings
        if isinstance(addr, tuple) and len(addr) >= 2:
            return "{}:{}".format(*addr)
        return str(addr)

    def make_socket(self):
        """
        factory method for sockets. this method makes a normal socket
        and wraps it in a curio.io.Socket wrapper.

        raises ValueError if af_family is neither AF_INET nor AF_UNIX
        """
        lserver.debug("making a {} socket".format(self.af_family))
        if self.af_family == "AF_INET":
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        elif self.af_family == "AF_UNIX":
            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        else:
            raise ValueError(
                "AF_FAMILY '{}' not supported!".format(
                    self.af_family
                )
            )
        self._raw_socket = s
        return curio.io.Socket(s)

    def make_handler(self, conn, addr):
        """
        factory method for handlers. this method creates a handler
        object from self.handler_class and self.handler_kwargs
        """
        return self.handler_class(conn, addr, self, **self.handler_kwargs)

    def setup(self):
        """
        creates a socket, opens the connection and starts listening for
        clients. this method does not block, it returns after starting
        to listen.
        """
        lserver.info("setting up server")
        self.socket = self.make_socket()
        if self.af_family == "AF_INET":
            self.socket.bind((self.host, self.port))
        elif self.af_family == "AF_UNIX":
            # a stale socket file would make bind fail
            if os.path.exists(self.host):
                lserver.debug("file already exists")
                lserver.debug("attempting to remove it")
                os.remove(self.host)
            self.socket.bind(self.host)
        self.socket.listen(self.max_allowed_clients)

    def shutdown(self):
        """
        This method shuts down the listening socket and closes it. it
        unregisters itself from atexit, so it doesn't get executed twice
        when it was manually called before the program exits.
        """
        def error_handler(func, *args, log_text="error", **kwargs):
            try:
                func(*args, **kwargs)
            except Exception as exc:
                lserver.debug(
                    "error occured during "+log_text, exc_info=exc)

        atexit.unregister(self.shutdown)
        lserver.info("shutting down server")
        # this method is synchronous, so the coroutine
        # handle_tasks.cancel_remaining() can't be awaited here. ask the
        # main loop to stop instead, run() cancels the handlers when it
        # leaves its loop.
        self.exit_event = True
        # check if there is actually a socket. if the shutdown method is
        # executed before the start method, there is no socket. the plain
        # socket is used, because the methods of the curio wrapper are
        # coroutines and would never run when called from here.
        raw_socket = self._raw_socket
        if raw_socket is not None:
            self._raw_socket = None
            error_handler(raw_socket.shutdown, socket.SHUT_RDWR,
                          log_text="socket shutdown")
            error_handler(raw_socket.close, log_text="socket close")

    def start(self):
        """
        this method starts the server. it is blocking.
        """
        self.setup()
        curio.run(self.run)

    async def run(self):
        """
        this method is the main loop of the Server. it waits for new
        client connections and creates a handle task for each of them.
        it does not receive or send anything.
        """
        lserver.debug("entering main loop")
        while not self.exit_event:
            lserver.debug("waiting for client to connect")
            # wait for a client to connect
            conn, addr = await self.socket.accept()
            if self.log_ip:
                lserver.info(
                    "new client connection, {}!".format(
                        self._format_addr(addr)))
            else:
                lserver.info("a new client connected, let's handle it")
            handler = self.make_handler(conn, addr)
            # make the handler visible to the other handlers, e.g. for
            # ConnHandlerEcho which forwards messages to all of them
            self.connection_handler.append(handler)
            await self.handle_tasks.spawn(self.handle_client(conn, handler))
        # the server is stopping, stop the handler tasks as well
        await self.handle_tasks.cancel_remaining()

    async def handle_client(self, socket, handler):
        """
        This method waits for the client to send something and calls
        the handler's handle method. there is a handle_client method
        running for each client connected.
        """
        peer = None
        if self.log_ip:
            # look the peer address up once, it is not available anymore
            # after the connection was closed
            try:
                peer = self._format_addr(socket.getpeername())
            except Exception:
                peer = self._format_addr(getattr(handler, "addr", None))
        try:
            while True:
                try:
                    if self.log_ip:
                        lserver.debug(
                            "waiting for {} to send something".format(peer))
                    else:
                        lserver.debug(
                            "waiting for the client to send something")
                    # wait for the client to send something
                    data = await handler.recv()
                    # if there is no data the client disconnected. this
                    # is a tcp protocol specification.
                    if not data:
                        if self.log_ip:
                            lserver.info(
                                "the connection to {} was closed".format(
                                    peer))
                        else:
                            lserver.info(
                                "the connection to the client was closed")
                        await handler.close()
                        # break out of the loop. don't return because we
                        # need to do cleanup
                        break
                    # only trailing whitespace is removed for the log.
                    # the handler gets the unmodified data, since
                    # whitespace may be important.
                    lschat.info("Client:"+data.rstrip())
                    await curio.spawn(handler.handle(data))
                except OSError as e:
                    # the connection is broken (e.g. reset by peer).
                    # retrying would fail again and again, so give up
                    lserver.error(e, exc_info=True)
                    await handler.close()
                    break
                except Exception as e:
                    lserver.error(e, exc_info=True)
                    # let's sleep a bit, in case something is broken and
                    # the loop throws an exception every time
                    await curio.sleep(0.01)
        finally:
            if handler in self.connection_handler:
                self.connection_handler.remove(handler)
        # if a task exits and hasn't been joined, curio prints a warning.
        # we don't want the warning, so let's join the current task for 0
        # seconds. instead of task.join() we use task.wait(). the only
        # difference is that wait doesn't throw an exception if the task
        # was stopped or crashed
        cur_task = await curio.current_task()
        await curio.ignore_after(0, cur_task.wait)


class Client(threading.Thread):
    """
    Blocking socket client which runs its receive loop in a thread.
    Received data is decoded and passed to handle_data_func.
    """
    is_connecting = False
    is_connected = False
    status = "uninitialized"

    def __init__(self,
                 host,
                 port=None,
                 af_family="AF_INET",
                 handle_data_func=None,
                 error_handler=None,
                 block_size=1024,
                 ):
        """
        host:             hostname/ip address (AF_INET) or file path
                          (AF_UNIX) to connect to
        port:             port to connect to, only used for AF_INET
        af_family:        "AF_INET" or "AF_UNIX"
        handle_data_func: called with every decoded message
        error_handler:    if set, run() calls it with main_loop as
                          argument instead of calling main_loop directly
        block_size:       maximum number of bytes read by one recv()
        """
        self.super_class = super(Client, self)
        self.super_class.__init__()
        self.name = "Client"
        self.exit_event = False
        self.host = host
        self.port = port
        self.af_family = af_family
        self.block_size = block_size
        self.handle_data_func = handle_data_func
        self.is_connected = False
        self.error_handler = error_handler
        self.socket = None
        self.status = "disconnected"

    def connect(self):
        """
        creates a socket and connects it. returns True on success and
        False otherwise.
        """
        self.status = "connecting"
        self.socket = self.make_socket()
        lclient.info(
            "connecting to socket '{}' of type {}".format(
                self.host, self.af_family
            )
        )
        try:
            if self.af_family == "AF_INET":
                self.socket.connect((self.host, self.port))
            elif self.af_family == "AF_UNIX":
                if os.path.exists(self.host):
                    self.socket.connect(self.host)
                else:
                    lclient.warning("File not found. Aborting.")
                    # don't leave a dangling socket and a "connecting"
                    # status behind
                    self.socket.close()
                    self.socket = None
                    self.status = "disconnected"
                    return False
            self.is_connected = True
            self.status = "connected"
            lclient.info("connected")
            return True
        except Exception as e:
            lclient.debug(e, exc_info=True)
            if isinstance(e, ConnectionRefusedError):
                lclient.info(
                    "failed to connect to socket '{}'".format(self.host))
            self.disconnect()
            return False

    def disconnect(self):
        """shuts the socket down and closes it, errors are only logged"""
        lclient.info("disconnecting from socket '{}'".format(self.host))
        self.is_connected = False
        self.status = "disconnected"
        if self.socket:
            try:
                self.socket.shutdown(socket.SHUT_RDWR)
            except Exception as e:
                lclient.error(e)
            try:
                self.socket.close()
            except Exception as e:
                lclient.error("error occured while closing the socket, " +
                              "maybe it is already closed", exc_info=e)
        self.socket = None

    def handle_data(self, data_received):
        """
        decodes the received bytes as utf-8, logs them and passes them
        to handle_data_func. exceptions of handle_data_func are logged.
        """
        data_decoded = data_received.decode("utf-8")
        lcchat.info("Server: "+data_decoded)
        if self.handle_data_func:
            try:
                self.handle_data_func(data_decoded)
            except Exception as e:
                lclient.error(e, exc_info=True)

    def is_running(self):
        """returns whether this thread is currently alive"""
        return (self in threading.enumerate())

    def make_socket(self):
        """
        factory method for sockets.

        raises ValueError if af_family is neither AF_INET nor AF_UNIX
        """
        lclient.info("creating a {} socket".format(self.af_family))
        if self.af_family == "AF_INET":
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        elif self.af_family == "AF_UNIX":
            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        else:
            raise ValueError(
                "AF_FAMILY '{}' not supported!".format(
                    self.af_family
                )
            )
        return s

    def main_loop(self):
        """
        receive loop. while connected it waits for data and passes it to
        handle_data. while not connected it idles, so the client can be
        (re)connected from another thread. it runs until exit_event is
        set or the socket reports an exceptional condition.
        """
        lclient.debug("starting main loop")
        while not self.exit_event:
            sock = self.socket
            if self.status != "connected" or sock is None:
                time.sleep(0.1)
                continue
            read_confirmed, write_confirmed, exc_confirmed \
                = select.select([sock], [], [sock], 1)
            if sock in exc_confirmed:
                self.is_connected = False
                lclient.warning("socket is expected to corrupt, exiting")
                self.disconnect()
                break
            elif sock in read_confirmed:
                try:
                    data_received = self.read_from_socket()
                    if data_received == b'':
                        lclient.info(
                            "connection is broken, closing socket exiting")
                        self.disconnect()
                    else:
                        try:
                            self.handle_data(data_received)
                        except Exception as e:
                            lclient.error(
                                "Error while handling data",
                                exc_info=e
                            )
                except OSError as e:
                    # this includes the subclasses of OSError, like
                    # ConnectionResetError
                    lclient.error(e, exc_info=True)
                    self.is_connected = False
                    lclient.warning("connection broken, exiting")
                    self.disconnect()
                except Exception as e:
                    lclient.error(e, exc_info=True)
                    raise
            else:
                time.sleep(0.1)

    def read_from_socket(self):
        """reads up to block_size bytes from the socket"""
        data_received = self.socket.recv(self.block_size)
        return data_received

    def run(self):
        """thread entry point, runs main_loop (through error_handler)"""
        if self.error_handler:
            self.error_handler(self.main_loop)
        else:
            self.main_loop()

    def send(self, msg):
        """
        strips trailing whitespace from msg, appends "\\r\\n" and sends
        it utf-8 encoded. errors are logged, they mark the client as
        not connected.
        """
        msg = msg.rstrip()
        msg_encoded = bytes(msg+"\r\n", "utf-8")
        try:
            lcchat.info("Client: "+msg)
            # sendall instead of send, send may transmit only a part of
            # the message
            self.socket.sendall(msg_encoded)
        except Exception as e:
            self.is_connected = False
            lclient.error(e, exc_info=True)
            self.status = "shutdown"

    def setup(self):
        pass

    def stop(self, reason=None):
        """disconnects and asks main_loop to exit, prints reason if given"""
        self.disconnect()
        self.exit_event = True
        if reason:
            print(reason)


class AsyncClient(object):
    """
    Socket client whose receive loop is a coroutine for curio.

    connect, disconnect, send and stop are plain methods, they operate
    on the plain socket. only receiving goes through the curio wrapper
    stored in self.socket.
    """
    is_connecting = False
    is_connected = False
    status = "uninitialized"

    def __init__(self,
                 host,
                 port=None,
                 af_family="AF_INET",
                 handle_data_func=None,
                 error_handler=None,
                 block_size=1024,
                 ):
        """
        host:             hostname/ip address (AF_INET) or file path
                          (AF_UNIX) to connect to
        port:             port to connect to, only used for AF_INET
        af_family:        "AF_INET" or "AF_UNIX"
        handle_data_func: called with every decoded message
        error_handler:    if set, start() calls it with the run
                          coroutine function as argument
        block_size:       maximum number of bytes read by one recv()
        """
        self.super_class = super(AsyncClient, self)
        self.super_class.__init__()
        self.name = "Client"
        self.exit_event = False
        self.host = host
        self.port = port
        self.af_family = af_family
        self.block_size = block_size
        self.handle_data_func = handle_data_func
        self.is_connected = False
        self.error_handler = error_handler
        # curio wrapper used for receiving, set while connected
        self.socket = None
        # the plain socket wrapped by self.socket
        self._raw_socket = None
        self.status = "disconnected"

    def connect(self):
        """
        creates a socket and connects it. the connect itself is
        blocking. returns True on success and False otherwise.
        """
        self.status = "connecting"
        raw_socket = self.make_socket()
        self._raw_socket = raw_socket
        lclient.info("connecting to socket '{}' of type {}".format(
            self.host, self.af_family))
        try:
            if self.af_family == "AF_INET":
                raw_socket.connect((self.host, self.port))
            elif self.af_family == "AF_UNIX":
                raw_socket.connect(self.host)
            # wrap the socket after connecting. the wrapper is the same
            # one the Server uses, its recv is a coroutine.
            self.socket = curio.io.Socket(raw_socket)
            self.is_connected = True
            self.status = "connected"
            lclient.info("connected")
            return True
        except Exception as e:
            lclient.debug(e, exc_info=True)
            if isinstance(e, ConnectionRefusedError):
                lclient.info(
                    "failed to connect to socket '{}'".format(self.host))
            self.disconnect()
            return False

    def disconnect(self):
        """shuts the socket down and closes it, errors are only logged"""
        lclient.info("disconnecting from socket '{}'".format(self.host))
        self.is_connected = False
        self.status = "disconnected"
        # the plain socket is used, shutdown and close of the curio
        # wrapper are coroutines and can't be awaited from here
        raw_socket = self._raw_socket
        if raw_socket is not None:
            try:
                raw_socket.shutdown(socket.SHUT_RDWR)
            except Exception as e:
                lclient.error(e)
            try:
                raw_socket.close()
            except Exception as e:
                lclient.error("error occured while closing the socket, " +
                              "maybe it is already closed", exc_info=e)
        self._raw_socket = None
        self.socket = None

    def handle_data(self, data_received):
        """
        decodes the received bytes as utf-8, logs them and passes them
        to handle_data_func. exceptions of handle_data_func are logged.
        """
        data_decoded = data_received.decode("utf-8")
        lcchat.info("Server: "+data_decoded)
        if self.handle_data_func:
            try:
                self.handle_data_func(data_decoded)
            except Exception as e:
                lclient.error(e, exc_info=True)

    def is_running(self):
        # NOTE(review): AsyncClient is not a thread, so this looks like
        # it is always False. kept unchanged for compatibility, confirm
        # whether it is still needed
        return (self in threading.enumerate())

    def make_socket(self):
        """
        factory method for sockets. returns a plain socket, connect()
        wraps it for curio once it is connected.

        raises ValueError if af_family is neither AF_INET nor AF_UNIX
        """
        lclient.info("creating a {} socket".format(self.af_family))
        if self.af_family == "AF_INET":
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        elif self.af_family == "AF_UNIX":
            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        else:
            raise ValueError(
                "AF_FAMILY '{}' not supported!".format(
                    self.af_family
                )
            )
        return s

    async def read_from_socket(self):
        """waits for up to block_size bytes and returns them"""
        data_received = await self.socket.recv(self.block_size)
        return data_received

    async def recv(self):
        """waits for up to block_size bytes, returns them utf-8 decoded"""
        data_received = await self.socket.recv(self.block_size)
        data_decoded = data_received.decode("utf-8")
        return data_decoded

    async def run(self):
        """
        receive loop. while connected it waits for data and passes it to
        handle_data. while not connected it idles. it runs until
        exit_event is set.
        """
        lclient.debug("starting main loop")
        while not self.exit_event:
            if self.status != "connected":
                # curio.sleep instead of time.sleep, time.sleep would
                # block every other task as well
                await curio.sleep(0.1)
                continue
            try:
                data = await self.read_from_socket()
                if not data:
                    lclient.info("connection closed")
                    self.disconnect()
                else:
                    self.handle_data(data)
            except OSError as e:
                # this includes the subclasses of OSError, like
                # ConnectionResetError
                lclient.error(e, exc_info=True)
                self.is_connected = False
                lclient.warning("connection broken, exiting")
                self.disconnect()
            except Exception as e:
                lclient.error(e, exc_info=True)
                raise

    async def start(self, connect=False):
        """
        runs the receive loop, connects first if connect is true.
        """
        if connect:
            self.connect()
        if self.error_handler:
            result = self.error_handler(self.run)
            # the error handler may hand back the coroutine (or be a
            # coroutine function itself), it has to be awaited then
            if inspect.isawaitable(result):
                await result
        else:
            await self.run()

    def send(self, msg):
        """
        strips trailing whitespace from msg, appends "\\r\\n" and sends
        it utf-8 encoded. on errors the client is disconnected.
        """
        msg = msg.rstrip()
        msg_encoded = bytes(msg+"\r\n", "utf-8")
        try:
            lcchat.info("Client: "+msg)
            # NOTE(review): the plain socket is used because this method
            # is synchronous. the socket is non-blocking once it is
            # wrapped, so sending raises if the send buffer is full.
            # that is handled like any other send error below.
            self._raw_socket.sendall(msg_encoded)
        except Exception as e:
            self.is_connected = False
            lclient.error(e, exc_info=True)
            self.disconnect()

    def setup(self):
        pass

    def stop(self, reason=None):
        """disconnects and asks run() to exit, prints reason if given"""
        self.disconnect()
        self.exit_event = True
        if reason:
            print(reason)