#!/usr/bin/env python3.5
# Copyright 2017 Digital
#
# This file is part of DigiLib.
#
# DigiLib is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# DigiLib is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with DigiLib.  If not, see <http://www.gnu.org/licenses/>.
"""Socket server and client helpers.

The module provides:

* ``ConnHandlerBase`` / ``ConnHandlerEcho`` -- per-connection handlers used
  by ``Server``.
* ``Server`` -- a curio based server for AF_INET or AF_UNIX stream sockets.
* ``Client`` -- a ``threading.Thread`` that polls a blocking socket.
* ``AsyncClient`` -- the same polling client without the thread base class.
"""

import atexit
import logging
import logging.handlers
import os
import queue
import select
import socket
import sys
import threading
import time
import traceback

import blinker
import curio

import digilib.misc

lclient = logging.getLogger(__name__+".client")
lserver = logging.getLogger(__name__+".server")
lschat = logging.getLogger(__name__+".server.chat")
lcchat = logging.getLogger(__name__+".client.chat")

# tasks spawned for handler.handle() calls (see Server.handle_client)
_tasks = []


def _append_task():
    """Append the current task to the module level ``_tasks`` list."""
    # NOTE(review): Server.handle_client awaits curio.current_task(); here
    # the call is not awaited, so presumably a coroutine object is stored
    # instead of a task. The helper is unused in this file -- confirm before
    # relying on it.
    cur_task = curio.current_task()
    _tasks.append(cur_task)


def _remove_task():
    """Remove the current task from ``_tasks`` if it is listed there."""
    # NOTE(review): same un-awaited current_task() call as in _append_task.
    cur_task = curio.current_task()
    if cur_task in _tasks:
        _tasks.remove(cur_task)


class ConnHandlerBase(object):
    """Base class for the per-connection handlers created by ``Server``.

    Subclasses override the ``welcome_client`` and ``handle`` coroutines.

    Args:
        socket: the connected socket; its ``recv``, ``sendall``,
            ``shutdown`` and ``close`` methods are awaited.
        addr: the peer address reported by ``accept``.
        server: the ``Server`` that owns this handler.
    """

    def __init__(self, socket, addr, server):
        self.status = "init"
        super(ConnHandlerBase, self).__init__()
        self.socket = socket
        self.addr = addr
        self.server = server
        # maximum number of bytes requested per recv() call
        self.block_size = 1024
        # self.welcome_client()
        self.status = "connected"

    async def welcome_client(self):
        """Hook for greeting a new client. Does nothing by default."""
        pass

    async def handle(self, data):
        """Hook that processes one decoded chunk. Does nothing by default."""
        return

    async def recv(self):
        """Receive up to ``block_size`` bytes and return them as utf-8 text.

        An empty string means that the peer closed the connection.
        """
        data_received = await self.socket.recv(self.block_size)
        return data_received.decode("utf-8")

    async def send(self, msg, log_msg=False):
        """Send ``msg`` to the client.

        Args:
            msg: the text to send, encoded as utf-8.
            log_msg: optional text that is written to the chat log instead
                of ``msg``.

        Returns:
            True if the message was sent, False if sending raised.
        """
        msg_encoded = bytes(msg, "utf-8")
        if log_msg:
            lschat.info("server:"+log_msg)
        else:
            lschat.info("Server:"+msg)
        try:
            # sendall instead of send: send() may transmit only a part of
            # the buffer
            await self.socket.sendall(msg_encoded)
            return True
        except Exception as e:
            lserver.error(e, exc_info=True)
            return False

    async def close(self):
        """Shut down and close the socket, logging failures at debug level."""
        self.status = "closed"
        try:
            # 0 == socket.SHUT_RD
            await self.socket.shutdown(0)
        except Exception:
            lserver.debug("error during socket shutdown")
        try:
            await self.socket.close()
        except Exception:
            lserver.debug("error closing socket")


class ConnHandlerEcho(ConnHandlerBase):
    """Handler that forwards everything it receives to all other handlers."""

    def __init__(self, socket, addr, server):
        self.status = "init"
        self.super_class = super(ConnHandlerEcho, self)
        self.super_class.__init__(socket, addr, server)
        self.server = server

    async def welcome_client(self):
        """Send a greeting to the client."""
        # send() is a coroutine; without the await nothing would be sent
        await self.send("welcome to the client")

    async def handle(self, data):
        """Log ``data`` and send it to every other registered handler."""
        lschat.info("Client:"+data)
        # iterate over a copy, the server may change the list while we wait
        others = [h for h in self.server.connection_handler if h is not self]
        for h in others:
            await h.send(data)


class Server(object):
    """Curio based socket server that dispatches clients to handlers.

    Args:
        host: host name for AF_INET or the socket file path for AF_UNIX.
        port: port to bind to, only used for AF_INET.
        af_family: "AF_INET" or "AF_UNIX".
        log_ip: if True, client addresses are written to the log.
        max_allowed_clients: backlog value passed to ``listen``.
        handler: callable invoked as
            ``handler(conn, addr, server, **handler_kwargs)`` for every new
            connection.
        handler_kwargs: extra keyword arguments for ``handler``; ``None``
            means no extra arguments.

    Raises:
        ValueError: if ``af_family`` is not supported.
    """

    def __init__(
            self,
            host,
            port=None,
            af_family="AF_INET",
            log_ip=False,
            max_allowed_clients=5,
            handler=None,
            handler_kwargs=None,
            ):
        super(Server, self).__init__()
        self.exit_event = False
        self.host = host
        self.port = port
        self.af_family = af_family
        self.log_ip = log_ip
        # a fresh dict per instance, a dict as default value would be
        # shared by all servers
        self.handler_kwargs = {} if handler_kwargs is None else handler_kwargs
        self.handler = handler
        self.max_allowed_clients = max_allowed_clients
        self.socket = self.make_socket()
        self.handle_tasks = curio.TaskGroup(name="tg_handle_clients")
        self.connection_handler = []
        self.conn_to_addr = {}
        self.addr_to_conn = {}
        self.conn_to_handler = {}
        self.handler_to_conn = {}
        self.read_sockets_expected = [self.socket]
        atexit.register(self.shutdown)

    def make_socket(self):
        """Create the listening socket wrapped in a ``curio.io.Socket``.

        Raises:
            ValueError: if ``self.af_family`` is not supported.
        """
        lserver.debug("making a {} socket".format(self.af_family))
        if self.af_family == "AF_INET":
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        elif self.af_family == "AF_UNIX":
            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        else:
            raise ValueError(
                "AF_FAMILY '{}' not supported!".format(
                    self.af_family
                )
            )
        return curio.io.Socket(s)

    def make_handler(self, conn, addr):
        """Create a handler for ``conn`` using ``self.handler``."""
        return self.handler(conn, addr, self, **self.handler_kwargs)

    def register_conn(self, conn, addr):
        """Remember ``conn`` and, if ``addr`` is truthy, its address."""
        # if self.log_ip:
        #     lserver.info("New connection from {} on port {}".format(*addr))
        self.read_sockets_expected.append(conn)
        if addr:
            self.conn_to_addr[conn] = addr
            self.addr_to_conn[addr] = conn

    def unregister_conn(self, conn):
        """Forget ``conn`` and the address stored for it."""
        self.read_sockets_expected.remove(conn)
        addr = self.conn_to_addr.get(conn, False)
        if addr:
            del self.addr_to_conn[addr]
            del self.conn_to_addr[conn]

    def register_handler(self, handler, conn):
        """Store the ``handler`` <-> ``conn`` association."""
        self.connection_handler.append(handler)
        self.conn_to_handler[conn] = handler
        self.handler_to_conn[handler] = conn

    def unregister_handler(self, handler, conn):
        """Remove the ``handler`` <-> ``conn`` association."""
        self.connection_handler.remove(handler)
        del self.conn_to_handler[conn]
        del self.handler_to_conn[handler]

    def setup(self):
        """Bind the socket and start listening.

        For AF_UNIX an already existing socket file is removed first.
        """
        lserver.info("setting up socket")
        if self.af_family == "AF_INET":
            self.socket.bind((self.host, self.port))
        elif self.af_family == "AF_UNIX":
            if os.path.exists(self.host):
                lserver.debug("file already exists")
                lserver.debug("attempting to remove it")
                os.remove(self.host)
            self.socket.bind(self.host)
        self.socket.listen(self.max_allowed_clients)
        # self.socket.settimeout(1)

    def shutdown(self):
        """Best-effort cleanup; errors are logged at debug level only.

        Registered with ``atexit`` in ``__init__`` and unregistered here so
        that the cleanup does not run a second time at interpreter exit.
        """
        def error_handler(func, *args, log_text="error", **kwargs):
            try:
                func(*args, **kwargs)
            except Exception as exc:
                lserver.debug(
                    "error occured during "+log_text, exc_info=exc)
        # this function can be called by the user and we don't need to clean
        # up a second time when the program exits
        atexit.unregister(self.shutdown)
        lserver.info("shutting down server")
        # NOTE(review): the three callables below appear to be coroutine
        # functions in curio (ConnHandlerBase.close awaits the socket
        # equivalents), and shutdown() is called without its "how"
        # argument. Calling them synchronously presumably neither cancels
        # the tasks nor closes the socket -- confirm against the curio
        # version in use.
        error_handler(
            self.handle_tasks.cancel_remaining, log_text="handler cancel")
        if self.socket:
            error_handler(self.socket.shutdown, log_text="socket shutdown")
            error_handler(self.socket.close, log_text="socket close")
            # del self.socket
            # self.socket = None

    def start(self):
        """Run the server inside a new curio kernel. Blocks until done."""
        # lserver.debug(dir(self))
        curio.run(self.run)

    async def run(self):
        """Accept clients until ``exit_event`` is set.

        A handler is created and registered for every connection, and a
        ``handle_client`` task is spawned for it in ``self.handle_tasks``.
        """
        self.setup()
        lserver.debug("entering main loop")
        while not self.exit_event:
            lserver.debug("waiting for client to connect")
            conn, addr = await self.socket.accept()
            if self.log_ip:
                lserver.info(
                    "new client connection, {}:{}!".format(*addr))
            else:
                lserver.info("a new client connected, let's handle it")
            handler = self.make_handler(conn, addr)
            self.register_conn(conn, addr)
            self.register_handler(handler, conn)
            await self.handle_tasks.spawn(self.handle_client(conn, handler))

    async def handle_client(self, socket, handler):
        """Receive data from one client until it disconnects.

        Every received chunk is passed to ``handler.handle`` in a newly
        spawned task. An empty chunk means that the connection was closed:
        handler and connection are unregistered and the handler is closed.

        Args:
            socket: the connection returned by ``accept``.
            handler: the handler created for this connection.
        """
        # _append_task()
        while True:
            try:
                if self.log_ip:
                    lserver.debug("waiting for {} to send something"
                                  .format(socket.getsockname()))
                else:
                    lserver.debug("waiting for the client to send something")
                data = await handler.recv()
                if not data:
                    if self.log_ip:
                        lserver.info("the connection to {} was closed"
                                     .format(socket.getsockname()))
                    else:
                        lserver.info(
                            "the connection to the client was closed")
                    self.unregister_handler(handler, socket)
                    self.unregister_conn(socket)
                    await handler.close()
                    break
                lschat.info("Client:"+data.strip())
                task = await curio.spawn(handler.handle(data))
                _tasks.append(task)
            except Exception as e:
                lserver.error(e, exc_info=True)
                # time.sleep was chosen deliberately ("safer") by the
                # original author.
                # NOTE(review): it blocks the calling thread and therefore
                # every other task for 0.1s.
                time.sleep(0.1)
        cur_task = await curio.current_task()
        # NOTE(review): this joins the current task with a timeout of 0;
        # the purpose is not visible from this file.
        await curio.ignore_after(0, cur_task.join)
        # _tasks.remove(cur_task)


class Client(threading.Thread):
    """Threaded socket client.

    The thread polls the socket with ``select`` and passes received data,
    decoded as utf-8, to ``handle_data_func``.

    Args:
        host: host name for AF_INET or the socket file path for AF_UNIX.
        port: port to connect to, only used for AF_INET.
        af_family: "AF_INET" or "AF_UNIX".
        handle_data_func: optional callable invoked with each decoded chunk.
        error_handler: optional callable that is given ``main_loop`` and is
            expected to call it.
        block_size: maximum number of bytes requested per ``recv`` call.
    """
    is_connecting = False
    is_connected = False
    status = "uninitialized"

    def __init__(
            self,
            host,
            port=None,
            af_family="AF_INET",
            handle_data_func=None,
            error_handler=None,
            block_size=1024,
            ):
        self.super_class = super(Client, self)
        self.super_class.__init__()
        self.name = "Client"
        self.exit_event = False
        self.host = host
        self.port = port
        self.af_family = af_family
        self.block_size = block_size
        self.handle_data_func = handle_data_func
        self.is_connected = False
        self.error_handler = error_handler
        # self.socket = self.make_socket()
        self.socket = None
        self.status = "disconnected"

    def connect(self):
        """Create a socket and connect it to the server.

        Returns:
            True on success, False otherwise. On failure the socket is
            closed and ``status`` is reset to "disconnected".
        """
        self.status = "connecting"
        self.socket = self.make_socket()
        lclient.info(
            "connecting to socket '{}' of type {}".format(
                self.host, self.af_family
            )
        )
        try:
            if self.af_family == "AF_INET":
                self.socket.connect((self.host, self.port))
            elif self.af_family == "AF_UNIX":
                if not os.path.exists(self.host):
                    lclient.warning("File not found. Aborting.")
                    # release the socket and reset the status instead of
                    # staying in "connecting" forever
                    self.disconnect()
                    return False
                self.socket.connect(self.host)
            self.is_connected = True
            self.status = "connected"
            lclient.info("connected")
            return True
        except Exception as e:
            lclient.debug(e, exc_info=True)
            if isinstance(e, ConnectionRefusedError):
                lclient.info(
                    "failed to connect to socket '{}'".format(self.host))
            self.disconnect()
            return False

    def disconnect(self):
        """Shut down and close the socket; errors are logged, not raised."""
        lclient.info("disconnecting from socket '{}'".format(self.host))
        self.is_connected = False
        self.status = "disconnected"
        if self.socket:
            try:
                self.socket.shutdown(socket.SHUT_RDWR)
            except Exception as e:
                lclient.error(e)
            try:
                self.socket.close()
            except Exception as e:
                lclient.error("error occured while closing the socket, " +
                              "maybe it is already closed", exc_info=e)
            self.socket = None

    def handle_data(self, data_received):
        """Decode ``data_received`` and pass it to ``handle_data_func``.

        Exceptions raised by ``handle_data_func`` are logged and swallowed.
        """
        data_decoded = data_received.decode("utf-8")
        lcchat.info("Server: "+data_decoded)
        if self.handle_data_func:
            try:
                self.handle_data_func(data_decoded)
            except Exception as e:
                lclient.error(e, exc_info=True)

    def is_running(self):
        """Return True if this thread is listed by threading.enumerate()."""
        return (self in threading.enumerate())

    def make_socket(self):
        """Create a stream socket of the configured address family.

        Raises:
            ValueError: if ``self.af_family`` is not supported.
        """
        lclient.info("creating a {} socket".format(self.af_family))
        if self.af_family == "AF_INET":
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        elif self.af_family == "AF_UNIX":
            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        else:
            raise ValueError(
                "AF_FAMILY '{}' not supported!".format(
                    self.af_family
                )
            )
        return s

    def main_loop(self):
        """Poll the socket until ``exit_event`` is set.

        While the status is not "connected" the loop only sleeps. Received
        data is passed to ``handle_data``; an empty read or an ``OSError``
        disconnects the client. Other exceptions are logged and re-raised.
        """
        lclient.debug("starting main loop")
        while not self.exit_event:
            if self.status != "connected":
                time.sleep(0.1)
                continue
            read_confirmed, write_confirmed, exc_confirmed \
                = select.select(
                    [self.socket],
                    [],
                    [self.socket],
                    1
                )
            if self.socket in exc_confirmed:
                self.is_connected = False
                lclient.warning("socket is expected to corrupt, exiting")
                self.disconnect()
                # self.stop()
                break
            elif self.socket in read_confirmed:
                try:
                    data_received = self.read_from_socket()
                    if data_received == b'':
                        lclient.info(
                            "connection is broken, closing socket exiting")
                        self.disconnect()
                        # self.stop()
                        # break
                    else:
                        try:
                            self.handle_data(data_received)
                        except Exception as e:
                            lclient.error(
                                "Error while handling data",
                                exc_info=e
                            )
                except Exception as e:
                    lclient.error(e, exc_info=True)
                    # isinstance also matches the OSError subclasses, e.g.
                    # ConnectionResetError
                    if isinstance(e, OSError):
                        self.is_connected = False
                        lclient.warning("connection broken, exiting")
                        self.disconnect()
                        # self.stop()
                        # break
                    else:
                        raise
            else:
                time.sleep(0.1)

    def read_from_socket(self):
        """Return up to ``block_size`` bytes read from the socket."""
        return self.socket.recv(self.block_size)

    def run(self):
        """Thread entry point: run ``main_loop``, via ``error_handler`` if set."""
        # self.connect()
        if self.error_handler:
            self.error_handler(self.main_loop)
        else:
            self.main_loop()

    def send(self, msg):
        r"""Send ``msg`` with trailing whitespace replaced by "\r\n".

        If sending fails the error is logged and ``status`` is set to
        "shutdown".
        """
        msg = msg.rstrip()
        msg_encoded = bytes(msg+"\r\n", "utf-8")
        try:
            lcchat.info("Client: "+msg)
            # sendall instead of send: send() may transmit only a part of
            # the buffer
            self.socket.sendall(msg_encoded)
        except Exception as e:
            self.is_connected = False
            lclient.error(e, exc_info=True)
            self.status = "shutdown"

    def setup(self):
        """Hook for subclasses. Does nothing."""
        pass

    def stop(self, reason=None):
        """Disconnect and make ``main_loop`` exit.

        Args:
            reason: optional text that is printed.
        """
        self.disconnect()
        self.exit_event = True
        if reason:
            print(reason)


class AsyncClient(object):
    """Socket client with the interface of ``Client`` but without a thread.

    ``run`` executes the polling loop in the calling thread.

    Args:
        host: host name for AF_INET or the socket file path for AF_UNIX.
        port: port to connect to, only used for AF_INET.
        af_family: "AF_INET" or "AF_UNIX".
        handle_data_func: optional callable invoked with each decoded chunk.
        error_handler: optional callable that is given ``main_loop`` and is
            expected to call it.
        block_size: maximum number of bytes requested per ``recv`` call.
    """
    is_connecting = False
    is_connected = False
    status = "uninitialized"

    def __init__(
            self,
            host,
            port=None,
            af_family="AF_INET",
            handle_data_func=None,
            error_handler=None,
            block_size=1024,
            ):
        self.super_class = super(AsyncClient, self)
        self.super_class.__init__()
        self.name = "Client"
        self.exit_event = False
        self.host = host
        self.port = port
        self.af_family = af_family
        self.block_size = block_size
        self.handle_data_func = handle_data_func
        self.is_connected = False
        self.error_handler = error_handler
        self.socket = None
        self.status = "disconnected"

    def connect(self):
        """Create a socket and connect it to the server.

        Returns:
            True on success, False otherwise. On failure the socket is
            closed and ``status`` is reset to "disconnected".
        """
        self.status = "connecting"
        self.socket = self.make_socket()
        lclient.info("connecting to socket '{}' of type {}".format(
            self.host, self.af_family))
        try:
            if self.af_family == "AF_INET":
                self.socket.connect((self.host, self.port))
            elif self.af_family == "AF_UNIX":
                self.socket.connect(self.host)
                # if os.path.exists(self.host):
                #     pass
                # else:
                #     lclient.warn("File not found. Aborting.")
                #     return
            self.is_connected = True
            self.status = "connected"
            lclient.info("connected")
            return True
        except Exception as e:
            lclient.debug(e, exc_info=True)
            if isinstance(e, ConnectionRefusedError):
                lclient.info(
                    "failed to connect to socket '{}'".format(self.host))
            self.disconnect()
            return False

    def disconnect(self):
        """Shut down and close the socket; errors are logged, not raised."""
        lclient.info("disconnecting from socket '{}'".format(self.host))
        self.is_connected = False
        self.status = "disconnected"
        if self.socket:
            try:
                self.socket.shutdown(socket.SHUT_RDWR)
            except Exception as e:
                lclient.error(e)
            try:
                self.socket.close()
            except Exception as e:
                lclient.error("error occured while closing the socket, " +
                              "maybe it is already closed", exc_info=e)
            self.socket = None

    def handle_data(self, data_received):
        """Decode ``data_received`` and pass it to ``handle_data_func``.

        Exceptions raised by ``handle_data_func`` are logged and swallowed.
        """
        data_decoded = data_received.decode("utf-8")
        lcchat.info("Server: "+data_decoded)
        if self.handle_data_func:
            try:
                self.handle_data_func(data_decoded)
            except Exception as e:
                lclient.error(e, exc_info=True)

    def is_running(self):
        """Return True if this object is listed by threading.enumerate()."""
        return (self in threading.enumerate())

    def make_socket(self):
        """Create a stream socket of the configured address family.

        Raises:
            ValueError: if ``self.af_family`` is not supported.
        """
        lclient.info("creating a {} socket".format(self.af_family))
        # The original referenced trio.socket, but trio is not imported in
        # this module, and connect/recv/send below are plain blocking calls
        # combined with select. The standard library socket is used.
        if self.af_family == "AF_INET":
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        elif self.af_family == "AF_UNIX":
            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        else:
            raise ValueError(
                "AF_FAMILY '{}' not supported!".format(
                    self.af_family
                )
            )
        return s

    def main_loop(self):
        """Poll the socket until ``exit_event`` is set.

        While the status is not "connected" the loop only sleeps. Received
        data is passed to ``handle_data``; an empty read or an ``OSError``
        disconnects the client. Other exceptions are logged and re-raised.
        """
        lclient.debug("starting main loop")
        while not self.exit_event:
            if self.status != "connected":
                time.sleep(0.1)
                continue
            read_confirmed, write_confirmed, exc_confirmed \
                = select.select(
                    [self.socket],
                    [],
                    [self.socket],
                    1
                )
            if self.socket in exc_confirmed:
                self.is_connected = False
                lclient.warning("socket is expected to corrupt, exiting")
                self.disconnect()
                # self.stop()
                break
            elif self.socket in read_confirmed:
                try:
                    data_received = self.read_from_socket()
                    if data_received == b'':
                        lclient.info(
                            "connection is broken, closing socket exiting")
                        self.disconnect()
                        # self.stop()
                        # break
                    else:
                        self.handle_data(data_received)
                except Exception as e:
                    lclient.error(e, exc_info=True)
                    # isinstance also matches the OSError subclasses, e.g.
                    # ConnectionResetError
                    if isinstance(e, OSError):
                        self.is_connected = False
                        lclient.warning("connection broken, exiting")
                        self.disconnect()
                        # self.stop()
                        # break
                    else:
                        raise
            else:
                time.sleep(0.1)

    def read_from_socket(self):
        """Return up to ``block_size`` bytes read from the socket."""
        return self.socket.recv(self.block_size)

    def run(self, connect=False):
        """Run ``main_loop``, via ``error_handler`` if one is set.

        Args:
            connect: if True, ``connect`` is called before the loop starts.
        """
        if connect:
            self.connect()
        if self.error_handler:
            self.error_handler(self.main_loop)
        else:
            self.main_loop()

    def send(self, msg):
        r"""Send ``msg`` with trailing whitespace replaced by "\r\n".

        If sending fails the error is logged and ``status`` is set to
        "shutdown".
        """
        msg = msg.rstrip()
        msg_encoded = bytes(msg+"\r\n", "utf-8")
        try:
            lcchat.info("Client: "+msg)
            # sendall instead of send: send() may transmit only a part of
            # the buffer
            self.socket.sendall(msg_encoded)
        except Exception as e:
            self.is_connected = False
            lclient.error(e, exc_info=True)
            self.status = "shutdown"

    def setup(self):
        """Hook for subclasses. Does nothing."""
        pass

    def stop(self, reason=None):
        """Disconnect and make ``main_loop`` exit.

        Args:
            reason: optional text that is printed.
        """
        self.disconnect()
        self.exit_event = True
        if reason:
            print(reason)