123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679 |
- #!/usr/bin/env python3.5
- # Copyright 2017 Digital
- #
- # This file is part of DigiLib.
- #
- # DigiLib is free software: you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # DigiLib is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with DigiLib. If not, see <http://www.gnu.org/licenses/>.
- import atexit
- import logging
- import logging.handlers
- import os
- import queue
- import select
- import socket
- import sys
- import threading
- import time
- import traceback
- import blinker
- import curio
- import digilib.misc
- lclient = logging.getLogger(__name__+".client")
- lserver = logging.getLogger(__name__+".server")
- lschat = logging.getLogger(__name__+".server.chat")
- lcchat = logging.getLogger(__name__+".client.chat")
- _tasks = []
- class ConnHandlerBase(object):
- def __init__(self, socket, addr, server):
- self.status = "init"
- super(ConnHandlerBase, self).__init__()
- self.socket = socket
- self.addr = addr
- self.server = server
- self.block_size = 1024
- # self.welcome_client()
- self.status="connected"
- async def welcome_client(self):
- pass
- async def handle(self, data):
- return
- async def recv(self):
- data_received = await self.socket.recv(self.block_size)
- data_decoded = data_received.decode("utf-8")
- return data_decoded
- async def send(self, msg, log_msg=False):
- msg_encoded = bytes(msg, "utf-8")
- if log_msg:
- lschat.info("server:"+log_msg)
- else:
- lschat.info("Server:"+msg)
- try:
- await self.socket.send(msg_encoded)
- return True
- except Exception as e:
- lserver.error(e, exc_info=True)
- return False
- async def close(self):
- self.status = "closed"
- try:
- await self.socket.shutdown(0)
- except:
- lserver.debug("error during socket shutdown")
- try:
- await self.socket.close()
- except:
- lserver.debug("error closing socket")
- class ConnHandlerEcho(ConnHandlerBase):
- def __init__(self, socket, addr, server):
- self.status = "init"
- self.super_class = super(ConnHandlerEcho, self)
- self.super_class.__init__(socket, addr, server)
- self.server = server
- def welcome_client(self):
- self.send("welcome to the client")
- def handle(self, data):
- lschat.info("Client:"+data)
- for h in list(set(self.server.connection_handler)-{self}):
- h.send(data)
- class Server(object):
- """
- Server opens either an unix or an inet connection. for every client which connects a new ClientHandler class is created.
- """
- def __init__(self,
- host,
- port=None,
- af_family="AF_INET",
- log_ip=False,
- max_allowed_clients=5,
- handler_class=None,
- handler_kwargs={},
- ):
- super(Server, self).__init__()
- # set to true when the server shuts down, for instance after a
- # fatal exception
- self.exit_event = False
- # on which hostname or ip address the connection will be opened
- self.host = host
- # on which port the connection will be opened
- self.port = port
- # what AF_INET family to use, either AF_INET or AF_UNIX (file socket)
- self.af_family = af_family
- # whether ip addresses will be logged
- self.log_ip = log_ip
- # number of maximum client connection at a time
- self.max_allowed_clients = max_allowed_clients
- # this handler class will be used when creating a new handler
- self.handler_class = handler_class
- # the kwargs passed to the handler's __init__ function
- self.handler_kwargs = handler_kwargs
- # don't make the socket yet, we don't need right now. will be created
- # when the start method is called
- # self.socket = self.make_socket()
- self.socket = None
- # create a task group for handlers, so we can easily cancel/terminate
- # them all at once
- self.handle_tasks = curio.TaskGroup(name="tg_handle_clients")
- # list of all active connection handlers
- self.connection_handler = []
- # these aren't actually used, I will remove them soon
- # self.conn_to_addr = {}
- # self.addr_to_conn = {}
- # self.conn_to_handler ={}
- # self.handler_to_conn = {}
- # this was used for the select.select approach. we use the async approach now, let's remove it
- # self.read_sockets_expected = [self.socket]
- # register our cleanup method to be executed when the program exits.
- # the cleanup function unregisters itself, so it won't get executed twice when the user called it befor the program exites
- atexit.register(self.shutdown)
- def make_socket(self):
- """
- factory method for sockets.
- this method makes a normal socket and wraps it in a curi.io.Socket wrapper
- """
- lserver.debug("making a {} socket".format(self.af_family))
- if self.af_family == "AF_INET":
- s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
- elif self.af_family == "AF_UNIX":
- s = socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
- else:
- raise ValueError(
- "AF_FAMILY '{}' not supported!".format(
- self.af_family
- )
- )
- s = curio.io.Socket(s)
- return s
- def make_handler(self, conn, addr):
- """
- factory method for handlers.
- this method creates a handler object from self.handler_class and self.handler_kwargs
- """
- return self.handler_class(conn, addr, self, **self.handler_kwargs)
- # DEPRECATED
- # def register_conn(self, conn, addr):
- # # if self.log_ip:
- # # lserver.info("New connection from {} on port {}".format(*addr))
- # self.read_sockets_expected.append(conn)
- # if addr:
- # self.conn_to_addr[conn] = addr
- # self.addr_to_conn[addr] = conn
- #
- # def unregister_conn(self, conn):
- # self.read_sockets_expected.remove(conn)
- # addr = self.conn_to_addr.get(conn, False)
- # if addr:
- # del self.addr_to_conn[addr]
- # del self.conn_to_addr[conn]
- #
- # def register_handler(self, handler, conn):
- # self.connection_handler.append(handler)
- # self.conn_to_handler[conn] = handler
- # self.handler_to_conn[handler] = conn
- #
- # def unregister_handler(self, handler, conn):
- # self.connection_handler.remove(handler)
- # del self.conn_to_handler[conn]
- # del self.handler_to_conn[handler]
- def setup(self):
- """
- opens the connection and starts listening for clients. this method
- does not block, it returns after starting to listen.
- """
- lserver.info("setting up socket")
- if self.af_family == "AF_INET":
- self.socket.bind((self.host, self.port))
- elif self.af_family == "AF_UNIX":
- if os.path.exists(self.host):
- lserver.debug("file already exists")
- lserver.debug("attempting to remove it")
- os.remove(self.host)
- self.socket.bind(self.host)
- self.socket.listen(self.max_allowed_clients)
- # self.socket.settimeout(1)
- def shutdown(self):
- """
- This method properly shuts down the sockets and closes them.
- it unregisters itself from atexit, so it doesn't get executed twice
- when it was manually called before to program exits.
- """
- def error_handler(func,*args,log_text="error",**kwargs):
- try:
- func(*args,**kwargs)
- except Exception as exc:
- lserver.debug("error occured during "+log_text,exc_info=exc)
- atexit.unregister(self.shutdown)
- lserver.info("shutting down server")
- error_handler(
- self.handle_tasks.cancel_remaining,log_text="handler cancel")
- # check if there is actually a socket. if the shutdown method is
- # executed before the start method, there is no socket.
- if self.socket:
- error_handler(self.socket.shutdown,log_text="socket shutdown")
- error_handler(self.socket.close,log_text="socket close")
- def start(self):
- """
- this method starts the server. it is blocking.
- """
- self.setup()
- curio.run(self.run)
- async def run(self):
- """
- this method is the main loop of the Server. it waits for new client
- connections and creates a handle task for each of them. it does not
- receive or send anything.
- """
- lserver.debug("entering main loop")
- while ( not self.exit_event ):
- lserver.debug("waiting for client to connect")
- # wait for a client to connect
- conn,addr = await self.socket.accept()
- if self.log_ip:
- lserver.info(
- "new client connection, {}:{}!".format(*addr))
- else:
- lserver.info("a new client connected, let's handle it")
- handler = self.make_handler(conn, addr)
- # self.register_conn(conn, addr)
- # self.register_handler(handler, conn)
- await self.handle_tasks.spawn(self.handle_client(conn,handler))
- async def handle_client(self,socket,handler):
- """
- This method waits for the client to send something and calls the
- ClientHandler's handle method. there is a handle_client method running for each client connected.
- """
- while True:
- try:
- if self.log_ip:
- lserver.debug("waiting for {} to send something"
- .format(socket.getsockname()))
- else:
- lserver.debug("waiting for the client to send something")
- # wait for the client to send something
- data = await handler.recv()
- # if there is no data the client disconnected. this is a
- # tcp protocoll specification.
- if not data:
- if self.log_ip:
- lserver.info("the connection to {} was closed"
- .format(socket.getsockname()))
- else:
- lserver.info("the connection to the client was closed")
- # self.unregister_handler(handler, socket)
- # self.unregister_conn(socket)
- await handler.close()
- # break out of the loop. don't return because we need to
- # do cleanup
- break
- else:
- # don't strip the data of its whitespaces, since they may
- # be important.
- lschat.info("Client:"+data.rstrip())
- coro = handler.handle(data)
- task = await curio.spawn(coro)
- except Exception as e:
- lserver.error(e, exc_info=True)
- # let's sleep a bit, in case something is broken and the
- # loop throws an exception every time
- await async.sleep(0.01)
- # if a task exits and hasn't been joined, curio prints a warning.
- # we don't want the warning, so let's join the current task for 0
- # seconds. instead of task.join() we use task.wait(). the only
- # difference is that wait doesn't throw a exception if the task was
- # stopped or crashed
- cur_task = await curio.current_task()
- await curio.ignore_after(0,cur_task.wait)
- class Client(threading.Thread):
- """docstring for Client"""
- is_connecting = False
- is_connected = False
- status = "uninitialized"
- def __init__(self,
- host,
- port=None,
- af_family="AF_INET",
- handle_data_func=None,
- error_handler=None,
- block_size=1024,
- ):
- self.super_class = super(Client, self)
- self.super_class.__init__()
- self.name = "Client"
- self.exit_event = False
- self.host = host
- self.port = port
- self.af_family = af_family
- self.block_size = block_size
- self.handle_data_func = handle_data_func
- self.is_connected = False
- self.error_handler = error_handler
- # self.socket = self.make_socket()
- self.socket = None
- self.status = "disconnected"
- def connect(self):
- self.status = "connecting"
- self.socket = self.make_socket()
- lclient.info(
- "connecting to socket '{}' of type {}".format(
- self.host,
- self.af_family
- )
- )
- try:
- if self.af_family == "AF_INET":
- self.socket.connect((self.host, self.port))
- elif self.af_family == "AF_UNIX":
- if os.path.exists(self.host):
- self.socket.connect(self.host)
- else:
- lclient.warn("File not found. Aborting.")
- return
- self.is_connected = True
- self.status = "connected"
- lclient.info("connected")
- return True
- except Exception as e:
- lclient.debug(e, exc_info=True)
- if type(e) is ConnectionRefusedError:
- lclient.info("failed to connect to socket '{}'".format(self.host))
- self.disconnect()
- return False
- def disconnect(self):
- lclient.info("disconnecting from socket '{}'".format(self.host))
- self.is_connected = False
- self.status = "disconnected"
- if self.socket:
- try:
- self.socket.shutdown(socket.SHUT_RDWR)
- except Exception as e:
- lclient.error(e)
- try:
- self.socket.close()
- except Exception as e:
- lclient.error("error occured while closing the socket, " +
- "maybe it is already closed",exc_info=e)
- del self.socket
- self.socket = None
- def handle_data(self, data_received):
- data_decoded = data_received.decode("utf-8")
- lcchat.info("Server: "+data_decoded)
- if self.handle_data_func:
- try:
- self.handle_data_func(data_decoded)
- except Exception as e:
- lclient.error(e, exc_info=True)
- def is_running(self):
- return (self in threading.enumerate())
- def make_socket(self):
- lclient.info("creating a {} socket".format(self.af_family))
- if self.af_family == "AF_INET":
- s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
- elif self.af_family == "AF_UNIX":
- s = socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
- else:
- raise ValueError(
- "AF_FAMILY '{}' not supported!".format(
- self.af_family
- )
- )
- return s
- def main_loop(self):
- lclient.debug("starting main loop")
- while ( not self.exit_event ):
- if not self.status in ["connected"]:
- time.sleep(0.1)
- continue
- # print(0)
- read_confirmed, write_confirmed, exc_confirmed \
- = select.select(
- [self.socket],
- [],
- [self.socket],
- 1
- )
- if self.socket in exc_confirmed:
- self.is_connected = False
- lclient.warning("socket is expected to corrupt, exiting")
- self.disconnect()
- # self.stop()
- break
- elif self.socket in read_confirmed:
- try:
- data_received = self.read_from_socket()
- if data_received == b'':
- lclient.info("connection is broken, closing socket exiting")
- self.disconnect()
- # self.stop()
- # break
- else:
- try:
- self.handle_data(data_received)
- except Exception as e:
- lserver.error(
- "Error while handling data",
- exc_info=e
- )
- except Exception as e:
- lclient.error(e, exc_info=True)
- if type(e) is OSError:
- self.is_connected = False
- lclient.warn("connection broken, exiting")
- self.disconnect()
- # self.stop()
- # break
- else:
- raise
- else:
- time.sleep(0.1)
- def read_from_socket(self):
- data_received = self.socket.recv(self.block_size)
- return data_received
- def run(self):
- # self.connect()
- if self.error_handler:
- self.error_handler(self.main_loop)
- else:
- self.main_loop()
- def send(self, msg):
- msg = msg.rstrip()
- msg_encoded = bytes(msg+"\r\n", "utf-8")
- try:
- lcchat.info("Client: "+msg)
- self.socket.send(msg_encoded)
- except Exception as e:
- self.is_connected = False
- lclient.error(e, exc_info=True)
- self.status = "shutdown"
- def setup(self):
- pass
- def stop(self,reason=None):
- self.disconnect()
- self.exit_event = True
- if reason:
- print(reason)
- class AsyncClient(object):
- """docstring for Client"""
- is_connecting = False
- is_connected = False
- status = "uninitialized"
- def __init__(self,
- host,
- port=None,
- af_family="AF_INET",
- handle_data_func=None,
- error_handler=None,
- block_size=1024,
- ):
- self.super_class = super(AsyncClient, self)
- self.super_class.__init__()
- self.name = "Client"
- self.exit_event = False
- self.host = host
- self.port = port
- self.af_family = af_family
- self.block_size = block_size
- self.handle_data_func = handle_data_func
- self.is_connected = False
- self.error_handler = error_handler
- self.socket = None
- self.status = "disconnected"
- def connect(self):
- self.status = "connecting"
- self.socket = self.make_socket()
- lclient.info("connecting to socket '{}' of type {}".format(
- self.host,self.af_family))
- try:
- if self.af_family == "AF_INET":
- self.socket.connect((self.host, self.port))
- elif self.af_family == "AF_UNIX":
- self.socket.connect(self.host)
- self.is_connected = True
- self.status = "connected"
- lclient.info("connected")
- return True
- except Exception as e:
- lclient.debug(e, exc_info=True)
- if type(e) is ConnectionRefusedError:
- lclient.info("failed to connect to socket '{}'".format(self.host))
- self.disconnect()
- return False
- def disconnect(self):
- lclient.info("disconnecting from socket '{}'".format(self.host))
- self.is_connected = False
- self.status = "disconnected"
- if self.socket:
- try:
- self.socket.shutdown(socket.SHUT_RDWR)
- except Exception as e:
- lclient.error(e)
- try:
- self.socket.close()
- except Exception as e:
- lclient.error("error occured while closing the socket, " +
- "maybe it is already closed",exc_info=e)
- del self.socket
- self.socket = None
- def handle_data(self, data_received):
- data_decoded = data_received.decode("utf-8")
- lcchat.info("Server: "+data_decoded)
- if self.handle_data_func:
- try:
- self.handle_data_func(data_decoded)
- except Exception as e:
- lclient.error(e, exc_info=True)
- def is_running(self):
- return (self in threading.enumerate())
- def make_socket(self):
- lclient.info("creating a {} socket".format(self.af_family))
- if self.af_family == "AF_INET":
- s = trio.socket.socket(socket.AF_INET,socket.SOCK_STREAM)
- elif self.af_family == "AF_UNIX":
- s = trio.socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
- else:
- raise ValueError(
- "AF_FAMILY '{}' not supported!".format(
- self.af_family
- )
- )
- return s
- async def read_from_socket(self):
- data_received = await self.socket.recv(self.block_size)
- return data_received
- async def recv(self):
- data_received = await self.socket.recv(self.block_size)
- data_decoded = data_received.decode("utf-8")
- return data_decoded
- async def run(self):
- lclient.debug("starting main loop")
- while ( not self.exit_event ):
- if not self.status in ["connected"]:
- time.sleep(0.1)
- continue
- # if self.socket in exc_confirmed:
- # self.is_connected = False
- # lclient.warning("socket is expected to corrupt, exiting")
- # self.disconnect()
- # # self.stop()
- # break
- # elif self.socket in read_confirmed:
- try:
- data = await self.read_from_socket()
- if not data:
- lclient.info("connection closed")
- self.disconnect()
- else:
- self.handle_data(data)
- except Exception as e:
- lclient.error(e, exc_info=True)
- if type(e) is OSError:
- self.is_connected = False
- lclient.warn("connection broken, exiting")
- self.disconnect()
- else:
- raise
- async def start(self,connect=False):
- if connect:
- self.connect()
- if self.error_handler:
- self.error_handler(self.run)
- else:
- self.run()
- def send(self, msg):
- msg = msg.rstrip()
- msg_encoded = bytes(msg+"\r\n", "utf-8")
- try:
- lcchat.info("Client: "+msg)
- self.socket.send(msg_encoded)
- except Exception as e:
- self.is_connected = False
- lclient.error(e, exc_info=True)
- self.disconnect()
- # self.exit_event = True
- # self.status = "shutdown"
- def setup(self):
- pass
- def stop(self,reason=None):
- self.disconnect()
- self.exit_event = True
- if reason:
- print(reason)
- #
|