|
- #!/usr/bin/env python3.5
- # Copyright 2017 Digital
- #
- # This file is part of DigiLib.
- #
- # DigiLib is free software: you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # DigiLib is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with DigiLib. If not, see <http://www.gnu.org/licenses/>.
- import atexit
- import logging
- import logging.handlers
- import os
- import queue
- import select
- import socket
- import sys
- import threading
- import time
- import traceback
- import blinker
- import curio
- import digilib.misc
# Module-level loggers, one per concern, all namespaced under this module's
# dotted name so they can be configured individually or as a group.
lclient = logging.getLogger("{}.client".format(__name__))
lserver = logging.getLogger("{}.server".format(__name__))
lch = logging.getLogger("{}.chandler".format(__name__))
lschat = logging.getLogger("{}.server.chat".format(__name__))
lcchat = logging.getLogger("{}.client.chat".format(__name__))
lerr = logging.getLogger("{}.errhandler".format(__name__))
class ErrorHandler(object):
    """
    Execute a callable that is likely to raise and log the traceback
    instead of letting the exception propagate.

    ``logger`` is the default logger; every call may override it with its
    own ``logger`` argument.
    """

    def __init__(self, logger):
        super().__init__()
        self.logger = logger

    @staticmethod
    def _label(func, text):
        """
        Return the text used in the log message. When the caller gave no
        text, print the usual warning and fall back to the callable's name,
        so that building the log message can never fail.
        """
        if text:
            return text
        print("ErrorHandler:WARNING:no text specified,"
              " this is highly discouraged")
        return getattr(func, "__name__", repr(func))

    def run(self, func, args=(), kwargs=None, text=None, logger=None):
        """
        Call ``func(*args, **kwargs)``. Any ``Exception`` it raises is
        logged at debug level (with traceback) and swallowed. Returns None.

        ``text`` describes the action and ends up in the log message;
        ``logger`` overrides the default logger for this call.
        """
        logger = logger or self.logger
        text = self._label(func, text)
        # a None default avoids sharing one dict between all calls
        call_kwargs = {} if kwargs is None else kwargs
        try:
            func(*args, **call_kwargs)
        except Exception as exc:
            logger.debug("error occured during " + text, exc_info=exc)

    async def arun(self, func, args=(), kwargs=None, text=None, logger=None):
        """
        Asynchronous counterpart of run(): awaits ``func(*args, **kwargs)``
        and logs instead of propagating any ``Exception``.
        """
        logger = logger or self.logger
        text = self._label(func, text)
        call_kwargs = {} if kwargs is None else kwargs
        try:
            await func(*args, **call_kwargs)
        except Exception as exc:
            logger.debug("error occured during " + text, exc_info=exc)
class ConnHandlerBase(object):
    """
    ConnHandlerBase is the base class for all connection handlers.
    It provides the basic methods and works on raw bytes. Consider
    inheriting from ConnHandler instead, as it provides better
    functionality.
    """
    addr = None
    # default number of bytes requested per recv() call
    block_size = 1024
    server = None

    def __init__(self, socket, addr, server):
        super(ConnHandlerBase, self).__init__()
        self.socket = socket
        self.addr = addr
        self.server = server

    async def disconnect(self):
        """
        Explicitly disconnect the client: send an empty message, shut the
        socket down and close it. Every step is best effort; a failure is
        logged (with traceback) and the remaining steps still run.
        """
        try:
            await self.send(bytes())
        except Exception:
            lch.debug("error during disconenct", exc_info=True)
        try:
            # 0 is the value that was always passed here
            # (presumably socket.SHUT_RD -- confirm)
            await self.socket.shutdown(0)
        except Exception:
            lch.debug("error during socket shutdown", exc_info=True)
        try:
            await self.socket.close()
        except Exception:
            lch.debug("error closing socket", exc_info=True)

    async def handle(self, data):
        """
        This method is called for every message the server receives from
        the client and must be overridden by subclasses.

        Raises NotImplementedError when not overridden.
        """
        raise NotImplementedError()

    async def recv(self, block_size=None):
        """
        Wait for the client to send something and return the received
        bytes. ``block_size`` defaults to ``self.block_size``, so overriding
        the attribute on a subclass or an instance takes effect.
        """
        if block_size is None:
            block_size = self.block_size
        return await self.socket.recv(block_size)

    async def send(self, data, log_msg=None):
        """
        Send bytes to the client. ``log_msg``, when given, is logged in
        place of the data. Returns False if an exception was raised during
        sending, otherwise True.
        """
        if log_msg:
            lschat.info("server:"+str(log_msg))
        else:
            lschat.info("Server:"+str(data))
        try:
            await self.socket.send(data)
        except Exception as e:
            lch.error(e, exc_info=True)
            return False
        return True
class ConnHandler(ConnHandlerBase):
    """
    More advanced connection handler than ConnHandlerBase: send() takes a
    string and encodes it, recv() decodes the client's data and returns a
    string, and on_welcome() can greet a freshly connected client.
    """

    def __init__(self, socket, addr, server):
        super(ConnHandler, self).__init__(socket, addr, server)

    async def disconnect(self):
        """
        Explicitly disconnect the client: send an empty string, shut the
        socket down and close it. Every step is best effort; a failure is
        logged (with traceback) and the remaining steps still run.
        """
        try:
            # send() of this class expects a string, hence "" and not bytes()
            await self.send("")
        except Exception:
            lch.debug("error during disconenct", exc_info=True)
        try:
            await self.socket.shutdown(0)
        except Exception:
            lch.debug("error during socket shutdown", exc_info=True)
        try:
            await self.socket.close()
        except Exception:
            lch.debug("error closing socket", exc_info=True)

    async def handle(self, data):
        """
        Default handler: ignore the received data. Override in subclasses.
        """
        return

    async def on_welcome(self):
        """
        This method can be used to send a welcome message to the client.
        """
        await self.send("Welcome client, this is the server sending!")

    async def recv(self, block_size=None):
        """
        Wait for the client to send something, decode it as UTF-8 and
        return a string. ``block_size`` defaults to ``self.block_size``.
        """
        if block_size is None:
            block_size = self.block_size
        data_received = await self.socket.recv(block_size)
        return data_received.decode("utf-8")

    async def send(self, data, log_msg=False):
        """
        Take a string, encode it as UTF-8 and send it to the client.
        ``log_msg``, when truthy, is logged in place of the data.
        Returns False if an exception was raised during sending, otherwise
        True.
        """
        if log_msg:
            # str() so that a non-string log_msg cannot break the send
            lschat.info("server:"+str(log_msg))
        else:
            lschat.info("Server:"+data)
        data_encoded = bytes(data, "utf-8")
        try:
            await self.socket.send(data_encoded)
        except Exception as e:
            lch.error(e, exc_info=True)
            return False
        return True
class ConnHandlerEcho(ConnHandler):
    """
    A connection handler which sends everything it receives to every other
    handler listed in ``server.connection_handler``.
    """

    def __init__(self, socket, addr, server):
        super(ConnHandlerEcho, self).__init__(socket, addr, server)

    async def handle(self, data):
        """
        Forward ``data`` to all other connection handlers of the server.

        This has to be a coroutine: the server awaits handle(), and send()
        is itself a coroutine that only does something once awaited.
        """
        # iterate over a snapshot, the list may change while we await
        for h in list(self.server.connection_handler):
            if h is not self:
                await h.send(data)
class Server(object):
    """
    Server opens either a unix or an inet connection. For every new client
    a new connection handler object is created.

    _string_ **host** and _int_ **port** are the filename, hostname or ip
    address and the port respectively on which the connection will be
    opened. If you make an AF_UNIX socket, port is ignored so simply pass
    None.

    _object_ **handler_class** is the class (not an object of the class)
    used for making connection handlers.

    _dict_ **handler_kwargs** is a dict which will be passed as keyword
    arguments to the __init__ function of handler_class when creating a new
    connection handler. Default is an empty dict.

    _string_ **af_family** specifies the AF_FAMILY socket type, valid
    options are: "AF_INET" for an inet socket and "AF_UNIX" for a unix
    (file) socket. Default is "AF_INET".

    _bool_ **log_ip** specifies whether the address of the client is
    logged. Default is False.

    _int_ **max_allowed_clients** is passed to listen() as the backlog
    value. Default is 5.
    """
    # set to true when the server shuts down, for instance after a
    # fatal exception. the accept loop in run() checks it before each accept
    exit_event = False

    def __init__(
            self,
            host,
            port,
            handler_class,
            handler_kwargs=None,
            af_family="AF_INET",
            log_ip=False,
            max_allowed_clients=5,
            ):
        super(Server, self).__init__()
        self.host = host
        self.port = port
        self.handler_class = handler_class
        # a None default avoids sharing one dict between all servers
        self.handler_kwargs = {} if handler_kwargs is None else handler_kwargs
        self.af_family = af_family
        self.log_ip = log_ip
        self.max_allowed_clients = max_allowed_clients
        # the error handler used by, for instance, async_shutdown
        self.safe_handle = ErrorHandler(lserver)
        # don't make the socket yet, we don't need it right now. it will be
        # created when the setup method is called
        self.socket = None
        # create a task group for handlers, so we can easily cancel/terminate
        # them all at once
        self.handle_tasks = curio.TaskGroup(name="tg_handle_clients")
        # list of all active connection handlers, maintained by handle_client
        self.connection_handler = []
        # register our cleanup method to be executed when the program exits.
        # the cleanup function unregisters itself, so it won't get executed
        # twice when the user called it before the program exits
        atexit.register(self.shutdown)

    def make_socket(self):
        """
        factory method for sockets.
        this method makes a normal socket and wraps it in a curio.io.Socket.
        Raises ValueError for an unsupported af_family.
        """
        lserver.debug("making a {} socket".format(self.af_family))
        if self.af_family == "AF_INET":
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        elif self.af_family == "AF_UNIX":
            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        else:
            raise ValueError(
                "AF_FAMILY '{}' not supported!".format(self.af_family))
        return curio.io.Socket(s)

    def make_handler(self, conn, addr):
        """
        factory method for handlers.
        this method creates a handler object from self.handler_class and
        self.handler_kwargs
        """
        return self.handler_class(conn, addr, self, **self.handler_kwargs)

    def setup(self):
        """
        creates a socket, binds it and starts listening for clients. this
        method does not block, it returns after starting to listen.
        """
        lserver.info("setting up server")
        self.socket = self.make_socket()
        if self.af_family == "AF_INET":
            self.socket.bind((self.host, self.port))
        elif self.af_family == "AF_UNIX":
            # a leftover socket file would make bind() fail
            if os.path.exists(self.host):
                lserver.debug("file already exists")
                lserver.debug("attempting to remove it")
                os.remove(self.host)
            self.socket.bind(self.host)
        self.socket.listen(self.max_allowed_clients)

    def shutdown(self):
        """
        this method can be called synchronously and runs the asynchronous
        shutdown method via curio.run.
        """
        curio.run(self.async_shutdown)

    async def async_shutdown(self):
        """
        This method cancels the client tasks, shuts the server socket down
        and closes it. it unregisters shutdown from atexit, so it doesn't
        get executed twice when it was manually called before the program
        exits.
        """
        atexit.unregister(self.shutdown)
        lserver.info("shutting down server")
        await self.safe_handle.arun(
            self.handle_tasks.cancel_remaining, text="handler cancel")
        try:
            await curio.ignore_after(0.01, self.handle_tasks.join)
        except Exception as exc:
            lserver.error("error joining tasks:", exc_info=exc)
        # check if there is actually a socket. if the shutdown method is
        # executed before the setup method, there is no socket.
        if self.socket:
            await self.safe_handle.arun(
                self.socket.shutdown,
                args=[socket.SHUT_RDWR], text="socket shutdown")
            await self.safe_handle.arun(
                self.socket.close, text="socket close")

    def start(self):
        """
        this method starts the server. it is blocking.
        """
        self.setup()
        curio.run(self.run)

    async def run(self):
        """
        this method is the main loop of the Server. it waits for new client
        connections and creates a handle task for each of them. it does not
        receive or send anything.
        """
        lserver.debug("entering main loop")
        while not self.exit_event:
            lserver.debug("waiting for client to connect")
            # wait for a client to connect
            conn, addr = await self.socket.accept()
            if self.log_ip:
                lserver.info(
                    "new client connection, {}:{}!".format(*addr))
            else:
                lserver.info("a new client connected, let's handle it")
            handler = self.make_handler(conn, addr)
            await self.handle_tasks.spawn(self.handle_client(conn, handler))

    @staticmethod
    def _peer_name(conn):
        """
        Return the address of the remote end of ``conn`` for log messages,
        or a placeholder if it cannot be determined.
        """
        try:
            return conn.getpeername()
        except (OSError, AttributeError):
            return "<unknown>"

    async def handle_client(self, socket, handler):
        """
        This method waits for the client to send something and calls the
        handler's handle method. there is a handle_client task running for
        each client connected.

        The handler is listed in self.connection_handler for as long as
        this task serves it, so handlers can reach each other.
        """
        # look the peer up once; asking again after the connection broke
        # could itself fail inside the loop
        peer = self._peer_name(socket) if self.log_ip else None
        self.connection_handler.append(handler)
        try:
            if hasattr(handler, "on_welcome"):
                await handler.on_welcome()
            while True:
                try:
                    if self.log_ip:
                        lserver.debug("waiting for {} to send something"
                            .format(peer))
                    else:
                        lserver.debug(
                            "waiting for the client to send something")
                    # wait for the client to send something
                    data = await handler.recv()
                    # an empty result means the client closed the
                    # connection
                    if not data:
                        if self.log_ip:
                            lserver.info("the connection to {} was closed"
                                .format(peer))
                        else:
                            lserver.info(
                                "the connection to the client was closed")
                        await handler.disconnect()
                        # break out of the loop. don't return because we
                        # need to do cleanup
                        break
                    # %s formatting works for str and bytes handlers alike;
                    # "Client:"+data raised for bytes and skipped handle()
                    lschat.info("Client:%s", data.rstrip())
                    await handler.handle(data)
                except curio.CancelledError:
                    # never swallow a cancellation, otherwise the task
                    # keeps looping after the server asked it to stop
                    raise
                except Exception as e:
                    lserver.error(e, exc_info=True)
                    # let's sleep a bit, in case something is broken and
                    # the loop throws an exception every time
                    await curio.sleep(0.01)
        finally:
            if handler in self.connection_handler:
                self.connection_handler.remove(handler)
        # if a task exits and hasn't been joined, curio prints a warning.
        # we don't want the warning, so let's wait on the current task for
        # a very short time. instead of task.join() we use task.wait(). the
        # only difference is that wait doesn't throw an exception if the
        # task was stopped or crashed
        cur_task = await curio.current_task()
        await curio.ignore_after(0.01, cur_task.wait)
class Client(threading.Thread):
    """
    Threaded, blocking socket client.

    _string_ **host** and _int_ **port** identify the server; for an
    AF_UNIX socket host is the socket file and port is ignored.
    _string_ **af_family** is "AF_INET" (default) or "AF_UNIX".
    _callable_ **handle_data_func** is called with every decoded message
    received from the server.
    _callable_ **error_handler**, if given, is called with the main loop
    function and is expected to run it.
    _int_ **block_size** is the number of bytes requested per recv call.

    connect() has to be called explicitly; the thread's main loop idles
    while the status is not "connected".
    """
    status = "uninitialized"
    exit_event = False

    def __init__(
            self,
            host,
            port=None,
            af_family="AF_INET",
            handle_data_func=None,
            error_handler=None,
            block_size=1024,
            ):
        super().__init__()
        self.name = "Client"
        self.host = host
        self.port = port
        self.af_family = af_family
        self.block_size = block_size
        self.handle_data_func = handle_data_func
        self.error_handler = error_handler
        self.socket = None
        self.status = "disconnected"

    def connect(self):
        """
        Create a socket and connect it to the server. Returns True on
        success; on failure the client is disconnected and False is
        returned.
        """
        self.status = "connecting"
        self.socket = self.make_socket()
        lclient.info("connecting to socket '{}' of type {}"
            .format(self.host, self.af_family))
        try:
            if self.af_family == "AF_INET":
                self.socket.connect((self.host, self.port))
            elif self.af_family == "AF_UNIX":
                self.socket.connect(self.host)
        except Exception as e:
            lclient.debug(e, exc_info=True)
            if isinstance(e, ConnectionRefusedError):
                lclient.info(
                    "failed to connect to socket '{}'".format(self.host))
            self.disconnect()
            return False
        self.status = "connected"
        lclient.info("connected")
        return True

    def disconnect(self):
        """
        Shut the socket down and close it (both best effort) and set the
        status to "disconnected". Safe to call without a socket.
        """
        lclient.info("disconnecting from socket '{}'".format(self.host))
        self.status = "disconnected"
        if self.socket:
            try:
                self.socket.shutdown(socket.SHUT_RDWR)
            except Exception as e:
                lclient.error(e)
            try:
                self.socket.close()
            except Exception as e:
                lclient.error("error occured while closing the socket, " +
                    "maybe it is already closed", exc_info=e)
        self.socket = None

    def handle_data(self, data_received):
        """
        Decode the received bytes as UTF-8, log them and pass the string to
        handle_data_func, if one was given. Exceptions raised by
        handle_data_func are logged, not propagated.
        """
        data_decoded = data_received.decode("utf-8")
        lcchat.info("Server: "+data_decoded)
        if self.handle_data_func:
            try:
                self.handle_data_func(data_decoded)
            except Exception as e:
                lclient.error(e, exc_info=True)

    def is_running(self):
        """Return True if this thread is currently alive."""
        return (self in threading.enumerate())

    def make_socket(self):
        """
        factory method for sockets. Raises ValueError for an unsupported
        af_family.
        """
        lclient.info("creating a {} socket".format(self.af_family))
        if self.af_family == "AF_INET":
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        elif self.af_family == "AF_UNIX":
            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        else:
            raise ValueError(
                "AF_FAMILY '{}' not supported!".format(self.af_family))
        return s

    def main_loop(self):
        """
        Poll the socket until exit_event is set. Received data is passed to
        handle_data(). A closed or broken connection disconnects the client
        but keeps the loop alive; only a socket reported in select's
        exceptional set ends the loop.
        """
        lclient.debug("starting main loop")
        while not self.exit_event:
            # work on a snapshot: disconnect() may reset self.socket from
            # another thread between the check and the select call
            sock = self.socket
            if self.status != "connected" or sock is None:
                time.sleep(0.1)
                continue
            readable, _, errored = select.select([sock], [], [sock], 1)
            if sock in errored:
                lclient.warning("socket is expected to corrupt, exiting")
                self.disconnect()
                break
            if sock not in readable:
                time.sleep(0.1)
                continue
            try:
                data_received = self.read_from_socket()
                if data_received == b'':
                    lclient.info(
                        "connection is broken, closing socket exiting")
                    self.disconnect()
                else:
                    try:
                        self.handle_data(data_received)
                    except Exception as e:
                        lclient.error(
                            "Error while handling data", exc_info=e)
            except OSError as e:
                # also covers subclasses such as ConnectionResetError
                lclient.error(e, exc_info=True)
                lclient.warning("connection broken, exiting")
                self.disconnect()
            except Exception as e:
                lclient.error(e, exc_info=True)
                raise

    def read_from_socket(self):
        """Receive up to block_size bytes from the socket."""
        return self.socket.recv(self.block_size)

    def run(self):
        """
        Thread entry point: run the main loop, through error_handler if one
        was given.
        """
        if self.error_handler:
            self.error_handler(self.main_loop)
        else:
            self.main_loop()

    def send(self, msg):
        """
        Strip trailing whitespace from msg, append CRLF, encode as UTF-8
        and send it. On failure the error is logged and the status is set
        to "shutdown".
        """
        msg = msg.rstrip()
        msg_encoded = bytes(msg+"\r\n", "utf-8")
        try:
            lcchat.info("Client: "+msg)
            self.socket.send(msg_encoded)
        except Exception as e:
            lclient.error(e, exc_info=True)
            self.status = "shutdown"

    def setup(self):
        """Hook for subclasses; does nothing by default."""
        pass

    def stop(self, reason=None):
        """
        Disconnect and ask the main loop to exit. ``reason``, if given, is
        printed.
        """
        self.disconnect()
        self.exit_event = True
        if reason:
            print(reason)
class AsyncClient(object):
    """
    Asynchronous (curio) socket client.

    _string_ **host** and _int_ **port** identify the server; for an
    AF_UNIX socket host is the socket file and port is ignored.
    _string_ **af_family** is "AF_INET" (default) or "AF_UNIX".
    _callable_ **handle_data_func** is called with every decoded message
    received from the server.
    _callable_ **error_handler**, if given, is called with the run
    coroutine function by start().
    _int_ **block_size** is the number of bytes requested per recv call.

    The socket is a curio.io.Socket, the same wrapper the Server uses, so
    its I/O methods have to be awaited. Use async_connect, async_disconnect,
    async_send and async_stop from coroutines. The synchronous connect,
    disconnect, send and stop are kept for backward compatibility only.
    """
    block_size = 1024
    status = "uninitialized"
    exit_event = False

    def __init__(
            self,
            host,
            port=None,
            af_family="AF_INET",
            handle_data_func=None,
            error_handler=None,
            block_size=1024,
            ):
        super().__init__()
        self.host = host
        self.port = port
        self.af_family = af_family
        self.block_size = block_size
        self.handle_data_func = handle_data_func
        self.error_handler = error_handler
        self.socket = None
        # defined up front so it can be read before the first connect
        self.is_connected = False
        self.status = "disconnected"

    def _address(self):
        """Return the address to connect to for the configured family."""
        if self.af_family == "AF_INET":
            return (self.host, self.port)
        return self.host

    def connect(self):
        """
        Synchronous connect, kept for backward compatibility.

        NOTE(review): with the socket returned by make_socket, connect()
        presumably returns a coroutine that is not awaited here -- use
        async_connect() from coroutines.
        """
        self.status = "connecting"
        self.socket = self.make_socket()
        lclient.info("connecting to socket '{}' of type {}".format(
            self.host, self.af_family))
        try:
            if self.af_family == "AF_INET":
                self.socket.connect((self.host, self.port))
            elif self.af_family == "AF_UNIX":
                self.socket.connect(self.host)
            self.is_connected = True
            self.status = "connected"
            lclient.info("connected")
            return True
        except Exception as e:
            lclient.debug(e, exc_info=True)
            if isinstance(e, ConnectionRefusedError):
                lclient.info(
                    "failed to connect to socket '{}'".format(self.host))
            self.disconnect()
            return False

    async def async_connect(self):
        """
        Create a socket and connect it to the server. Returns True on
        success; on failure the client is disconnected and False is
        returned.
        """
        self.status = "connecting"
        self.socket = self.make_socket()
        lclient.info("connecting to socket '{}' of type {}".format(
            self.host, self.af_family))
        try:
            await self.socket.connect(self._address())
        except Exception as e:
            lclient.debug(e, exc_info=True)
            if isinstance(e, ConnectionRefusedError):
                lclient.info(
                    "failed to connect to socket '{}'".format(self.host))
            await self.async_disconnect()
            return False
        self.is_connected = True
        self.status = "connected"
        lclient.info("connected")
        return True

    def disconnect(self):
        """
        Synchronous disconnect, kept for backward compatibility.

        NOTE(review): shutdown() and close() of the socket are not awaited
        here -- use async_disconnect() from coroutines.
        """
        lclient.info("disconnecting from socket '{}'".format(self.host))
        self.is_connected = False
        self.status = "disconnected"
        if self.socket:
            try:
                self.socket.shutdown(socket.SHUT_RDWR)
            except Exception as e:
                lclient.error(e)
            try:
                self.socket.close()
            except Exception as e:
                lclient.error("error occured while closing the socket, " +
                    "maybe it is already closed", exc_info=e)
        self.socket = None

    async def async_disconnect(self):
        """
        Shut the socket down and close it (both best effort) and set the
        status to "disconnected". Safe to call without a socket.
        """
        lclient.info("disconnecting from socket '{}'".format(self.host))
        self.is_connected = False
        self.status = "disconnected"
        sock, self.socket = self.socket, None
        if not sock:
            return
        try:
            await sock.shutdown(socket.SHUT_RDWR)
        except Exception as e:
            lclient.error(e)
        try:
            await sock.close()
        except Exception as e:
            lclient.error("error occured while closing the socket, " +
                "maybe it is already closed", exc_info=e)

    def handle_data(self, data_received):
        """
        Decode the received bytes as UTF-8, log them and pass the string to
        handle_data_func, if one was given. Exceptions raised by
        handle_data_func are logged, not propagated.
        """
        data_decoded = data_received.decode("utf-8")
        lcchat.info("Server: "+data_decoded)
        if self.handle_data_func:
            try:
                self.handle_data_func(data_decoded)
            except Exception as e:
                lclient.error(e, exc_info=True)

    def is_running(self):
        """
        Return True if this object is among the running threads.

        NOTE(review): AsyncClient is not a Thread, so this looks like it
        always returns False -- confirm intended semantics.
        """
        return (self in threading.enumerate())

    def make_socket(self):
        """
        factory method for sockets: makes a normal socket and wraps it in a
        curio.io.Socket, like Server.make_socket does. Raises ValueError for
        an unsupported af_family.
        """
        lclient.info("creating a {} socket".format(self.af_family))
        if self.af_family == "AF_INET":
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        elif self.af_family == "AF_UNIX":
            s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        else:
            raise ValueError(
                "AF_FAMILY '{}' not supported!".format(self.af_family))
        return curio.io.Socket(s)

    async def read_from_socket(self):
        """Receive up to block_size bytes from the socket."""
        return await self.socket.recv(self.block_size)

    async def recv(self):
        """
        Receive up to block_size bytes, decode them as UTF-8 and return a
        string.
        """
        data_received = await self.socket.recv(self.block_size)
        return data_received.decode("utf-8")

    async def run(self):
        """
        Main loop: receive data until exit_event is set and pass it to
        handle_data(). A closed or broken connection disconnects the client
        but keeps the loop alive; any other exception is logged and
        re-raised.
        """
        lclient.debug("starting main loop")
        while not self.exit_event:
            if self.status != "connected":
                # sleep cooperatively; time.sleep would block every task
                await curio.sleep(0.1)
                continue
            try:
                data = await self.read_from_socket()
                if not data:
                    lclient.info("connection closed")
                    await self.async_disconnect()
                else:
                    self.handle_data(data)
            except OSError as e:
                # also covers subclasses such as ConnectionResetError
                lclient.error(e, exc_info=True)
                self.is_connected = False
                lclient.warning("connection broken, exiting")
                await self.async_disconnect()
            except Exception as e:
                lclient.error(e, exc_info=True)
                raise

    async def start(self, connect=False):
        """
        Optionally connect, then run the main loop -- through error_handler
        if one was given. If error_handler returns an awaitable (as an
        async handler does), it is awaited.
        """
        if connect:
            await self.async_connect()
        if self.error_handler:
            result = self.error_handler(self.run)
            if hasattr(result, "__await__"):
                await result
        else:
            await self.run()

    def send(self, msg):
        """
        Synchronous send, kept for backward compatibility.

        NOTE(review): the socket's send() is not awaited here -- use
        async_send() from coroutines.
        """
        msg = msg.rstrip()
        msg_encoded = bytes(msg+"\r\n", "utf-8")
        try:
            lcchat.info("Client: "+msg)
            self.socket.send(msg_encoded)
        except Exception as e:
            self.is_connected = False
            lclient.error(e, exc_info=True)
            self.disconnect()

    async def async_send(self, msg):
        """
        Strip trailing whitespace from msg, append CRLF, encode as UTF-8
        and send it. On failure the error is logged and the client is
        disconnected.
        """
        msg = msg.rstrip()
        msg_encoded = bytes(msg+"\r\n", "utf-8")
        try:
            lcchat.info("Client: "+msg)
            await self.socket.send(msg_encoded)
        except Exception as e:
            self.is_connected = False
            lclient.error(e, exc_info=True)
            await self.async_disconnect()

    def setup(self):
        """Hook for subclasses; does nothing by default."""
        pass

    def stop(self, reason=None):
        """
        Synchronous stop, kept for backward compatibility: disconnect and
        ask the main loop to exit. ``reason``, if given, is printed.
        """
        self.disconnect()
        self.exit_event = True
        if reason:
            print(reason)

    async def async_stop(self, reason=None):
        """
        Disconnect (awaiting the socket teardown) and ask the main loop to
        exit. ``reason``, if given, is printed.
        """
        await self.async_disconnect()
        self.exit_event = True
        if reason:
            print(reason)
- #
|