|
- #!/usr/bin/env python3.5
- # Copyright 2017 Digital
- #
- # This file is part of DigiLib.
- #
- # DigiLib is free software: you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # DigiLib is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with DigiLib. If not, see <http://www.gnu.org/licenses/>.
- import logging
- import logging.handlers
- import os
- import queue
- import select
- import socket
- import sys
- import threading
- import time
- import traceback
- import blinker
- import curio
- lclient = logging.getLogger(__name__+".client")
- lserver = logging.getLogger(__name__+".server")
- lschat = logging.getLogger(__name__+".server.chat")
- lcchat = logging.getLogger(__name__+".client.chat")
- class ConnHandlerBase(object):
- def __init__(self, socket, addr, server):
- self.status = "init"
- super(ConnHandlerBase, self).__init__()
- self.socket = socket
- self.addr = addr
- self.server = server
- self.block_size = 1024
- # self.welcome_client()
- self.status="connected"
- async def welcome_client(self):
- pass
- async def handle(self, data):
- return
- async def recv(self):
- data_received = await self.socket.recv(self.block_size)
- data_decoded = data_received.decode("utf-8")
- return data_decoded
- async def send(self, msg):
- msg_encoded = bytes(msg, "utf-8")
- lschat.info("Server:"+msg)
- try:
- await self.socket.send(msg_encoded)
- return True
- except Exception as e:
- lserver.error(e, exc_info=True)
- return False
- def close(self):
- self.status = "closed"
- try:
- self.socket.shutdown(0)
- except:
- lserver.debug("error during socket shutdown")
- try:
- self.socket.close()
- except:
- lserver.debug("error closing socket")
- class ConnHandlerEcho(ConnHandlerBase):
- def __init__(self, socket, addr, server):
- self.status = "init"
- self.super_class = super(ConnHandlerEcho, self)
- self.super_class.__init__(socket, addr, server)
- self.server = server
- def welcome_client(self):
- self.send("welcome to the client")
- def handle(self, data):
- lschat.info("Client:"+data)
- for h in list(set(self.server.connection_handler)-{self}):
- h.send(data)
- class Server(object):
- """docstring for SocketHandler"""
- def __init__(self,
- host,
- port=None,
- af_family="AF_INET",
- log_ip=False,
- max_allowed_clients=5,
- handler=None,
- handler_kwargs={},
- ):
- super(Server, self).__init__()
- self.exit_event = False
- self.host=host
- self.port=port
- self.af_family = af_family
- self.log_ip = log_ip
- self.handler_kwargs = handler_kwargs
- self.handler = handler
- self.max_allowed_clients = max_allowed_clients
- self.socket = self.make_socket()
- self.connection_handler = []
- self.conn_to_addr = {}
- self.addr_to_conn = {}
- self.conn_to_handler ={}
- self.handler_to_conn = {}
- self.read_sockets_expected = [self.socket]
- def cleanup(self):
- pass
- def make_socket(self):
- lserver.debug("making a {} socket".format(self.af_family))
- if self.af_family == "AF_INET":
- s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
- elif self.af_family == "AF_UNIX":
- s = socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
- else:
- raise ValueError(
- "AF_FAMILY '{}' not supported!".format(
- self.af_family
- )
- )
- s = curio.io.Socket(s)
- return s
- def make_handler(self, conn, addr):
- return self.handler(conn, addr, self, **self.handler_kwargs)
- def register_conn(self, conn, addr):
- # if self.log_ip:
- # lserver.info("New connection from {} on port {}".format(*addr))
- self.read_sockets_expected.append(conn)
- if addr:
- self.conn_to_addr[conn] = addr
- self.addr_to_conn[addr] = conn
- def unregister_conn(self, conn):
- self.read_sockets_expected.remove(conn)
- addr = self.conn_to_addr.get(conn, False)
- if addr:
- del self.addr_to_conn[addr]
- del self.conn_to_addr[conn]
- def register_handler(self, handler, conn):
- self.connection_handler.append(handler)
- self.conn_to_handler[conn] = handler
- self.handler_to_conn[handler] = conn
- def unregister_handler(self, handler, conn):
- self.connection_handler.remove(handler)
- del self.conn_to_handler[conn]
- del self.handler_to_conn[handler]
- def setup(self):
- lserver.info("setting up socket")
- if self.af_family == "AF_INET":
- self.socket.bind((self.host, self.port))
- elif self.af_family == "AF_UNIX":
- if os.path.exists(self.host):
- lserver.debug("file already exists")
- lserver.debug("attempting to remove it")
- os.remove(self.host)
- self.socket.bind(self.host)
- self.socket.listen(self.max_allowed_clients)
- # self.socket.settimeout(1)
- def start(self):
- # lserver.debug(dir(self))
- curio.run(self.run)
- async def run(self):
- self.setup()
- lserver.debug("entering main loop")
- while ( not self.exit_event ):
- # lserver.debug(self.read_sockets_expected)
- # lserver.debug(self.write_sockets_expected)
- # lserver.debug(self.exc_sockets_expected)
- # read_sockets = select.select(self.read_sockets_expected,[],[])
- lserver.debug("waiting for client to connect")
- conn,addr = await self.socket.accept()
- if self.log_ip:
- lserver.info(
- "new client connection, {}:{}!".format(*addr))
- else:
- lserver.info("a new client connected, let's handle it")
- handler = self.make_handler(conn, addr)
- self.register_conn(conn, addr)
- self.register_handler(handler, conn)
- await curio.spawn(self.wait_for_client(conn,handler))
- # for s in read_sockets_confirmed:
- # socket_handler = self.conn_to_handler.get(s, None)
- # if ( s == self.socket ):
- # lserver.debug("handling new client")
- # conn, addr = self.socket.accept()
- # if self.log_ip:
- # lserver.info(
- # "New connection from {} on port {}".format(*addr))
- # handler = self.make_handler(conn, addr)
- # self.register_conn(conn, addr)
- # self.register_handler(handler, conn)
- # elif ( socket_handler
- # and (socket_handler in self.connection_handler) ):
- # lserver.debug("handling client connection")
- # try:
- # data = socket_handler.recv()
- # if not data:
- # lserver.info("connection {} closed".format(self.socket))
- # self.unregister_handler(socket_handler, s)
- # self.unregister_conn(conn)
- # socket_handler.close()
- # else:
- # lschat.info("Client:"+data.strip())
- # socket_handler.handle(data)
- # except Exception as e:
- # lserver.error(e, exc_info=True)
- # else:
- # lserver.debug("else!")
- # lserver.debug(socket_handler)
- # time.sleep(1)
- async def wait_for_client(self,socket,handler):
- while True:
- try:
- if self.log_ip:
- lserver.debug(
- "waiting for {} to send something"
- .format(socket.getsockname()))
- else:
- lserver.debug(
- "waiting for the client to send something")
- data = await handler.recv()
- if not data:
- if self.log_ip:
- lserver.info(
- "the connection to {} was closed"
- .format(socket.getsockname()))
- else:
- lserver.info(
- "the connection to the client was closed")
- self.unregister_handler(handler, socket)
- self.unregister_conn(socket)
- handler.close()
- else:
- lschat.info("Client:"+data.strip())
- await handler.handle(data)
- except Exception as e:
- lserver.error(e, exc_info=True)
- # saver to use time.sleep
- time.sleep(0.1)
- class Client(threading.Thread):
- """docstring for Client"""
- is_connecting = False
- is_connected = False
- status = "uninitialized"
- def __init__(self,
- host,
- port=None,
- af_family="AF_INET",
- handle_data_func=None,
- error_handler=None,
- block_size=1024,
- ):
- self.super_class = super(Client, self)
- self.super_class.__init__()
- self.name = "Client"
- self.exit_event = False
- self.host = host
- self.port = port
- self.af_family = af_family
- self.block_size = block_size
- self.handle_data_func = handle_data_func
- self.is_connected = False
- self.error_handler = error_handler
- # self.socket = self.make_socket()
- self.socket = None
- self.status = "disconnected"
- def connect(self):
- self.status = "connecting"
- self.socket = self.make_socket()
- lclient.info(
- "connecting to socket '{}' of type {}".format(
- self.host,
- self.af_family
- )
- )
- try:
- if self.af_family == "AF_INET":
- self.socket.connect((self.host, self.port))
- elif self.af_family == "AF_UNIX":
- if os.path.exists(self.host):
- self.socket.connect(self.host)
- else:
- lclient.warn("File not found. Aborting.")
- return
- self.is_connected = True
- self.status = "connected"
- lclient.info("connected")
- return True
- except Exception as e:
- lclient.debug(e, exc_info=True)
- if type(e) is ConnectionRefusedError:
- lclient.info("failed to connect to socket '{}'".format(self.host))
- self.disconnect()
- return False
- def disconnect(self):
- lclient.info("disconnecting from socket '{}'".format(self.host))
- self.is_connected = False
- self.status = "disconnected"
- if self.socket:
- try:
- self.socket.shutdown(socket.SHUT_RDWR)
- except Exception as e:
- lclient.error(e)
- try:
- self.socket.close()
- except Exception as e:
- lclient.error("error occured while closing the socket, " +
- "maybe it is already closed",exc_info=e)
- del self.socket
- self.socket = None
- def handle_data(self, data_received):
- data_decoded = data_received.decode("utf-8")
- lcchat.info("Server: "+data_decoded)
- if self.handle_data_func:
- try:
- self.handle_data_func(data_decoded)
- except Exception as e:
- lclient.error(e, exc_info=True)
- def is_running(self):
- return (self in threading.enumerate())
- def make_socket(self):
- lclient.info("creating a {} socket".format(self.af_family))
- if self.af_family == "AF_INET":
- s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
- elif self.af_family == "AF_UNIX":
- s = socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
- else:
- raise ValueError(
- "AF_FAMILY '{}' not supported!".format(
- self.af_family
- )
- )
- return s
- def main_loop(self):
- lclient.debug("starting main loop")
- while ( not self.exit_event ):
- if not self.status in ["connected"]:
- time.sleep(0.1)
- continue
- # print(0)
- read_confirmed, write_confirmed, exc_confirmed \
- = select.select(
- [self.socket],
- [],
- [self.socket],
- 1
- )
- if self.socket in exc_confirmed:
- self.is_connected = False
- lclient.warning("socket is expected to corrupt, exiting")
- self.disconnect()
- # self.stop()
- break
- elif self.socket in read_confirmed:
- try:
- data_received = self.read_from_socket()
- if data_received == b'':
- lclient.info("connection is broken, closing socket exiting")
- self.disconnect()
- # self.stop()
- # break
- else:
- try:
- self.handle_data(data_received)
- except Exception as e:
- lserver.error(
- "Error while handling data",
- exc_info=e
- )
- except Exception as e:
- lclient.error(e, exc_info=True)
- if type(e) is OSError:
- self.is_connected = False
- lclient.warn("connection broken, exiting")
- self.disconnect()
- # self.stop()
- # break
- else:
- raise
- else:
- time.sleep(0.1)
- def read_from_socket(self):
- data_received = self.socket.recv(self.block_size)
- return data_received
- def run(self):
- # self.connect()
- if self.error_handler:
- self.error_handler(self.main_loop)
- else:
- self.main_loop()
- def send(self, msg):
- msg = msg.rstrip()
- msg_encoded = bytes(msg+"\r\n", "utf-8")
- try:
- lcchat.info("Client: "+msg)
- self.socket.send(msg_encoded)
- except Exception as e:
- self.is_connected = False
- lclient.error(e, exc_info=True)
- self.status = "shutdown"
- def setup(self):
- pass
- def stop(self,reason=None):
- self.disconnect()
- self.exit_event = True
- if reason:
- print(reason)
- class AsyncClient(object):
- """docstring for Client"""
- is_connecting = False
- is_connected = False
- status = "uninitialized"
- def __init__(self,
- host,
- port=None,
- af_family="AF_INET",
- handle_data_func=None,
- error_handler=None,
- block_size=1024,
- ):
- self.super_class = super(AsyncClient, self)
- self.super_class.__init__()
- self.name = "Client"
- self.exit_event = False
- self.host = host
- self.port = port
- self.af_family = af_family
- self.block_size = block_size
- self.handle_data_func = handle_data_func
- self.is_connected = False
- self.error_handler = error_handler
- self.socket = None
- self.status = "disconnected"
- def connect(self):
- self.status = "connecting"
- self.socket = self.make_socket()
- lclient.info("connecting to socket '{}' of type {}".format(
- self.host,self.af_family))
- try:
- if self.af_family == "AF_INET":
- self.socket.connect((self.host, self.port))
- elif self.af_family == "AF_UNIX":
- self.socket.connect(self.host)
- # if os.path.exists(self.host):
- # pass
- # else:
- # lclient.warn("File not found. Aborting.")
- # return
- self.is_connected = True
- self.status = "connected"
- lclient.info("connected")
- return True
- except Exception as e:
- lclient.debug(e, exc_info=True)
- if type(e) is ConnectionRefusedError:
- lclient.info("failed to connect to socket '{}'".format(self.host))
- self.disconnect()
- return False
- def disconnect(self):
- lclient.info("disconnecting from socket '{}'".format(self.host))
- self.is_connected = False
- self.status = "disconnected"
- if self.socket:
- try:
- self.socket.shutdown(socket.SHUT_RDWR)
- except Exception as e:
- lclient.error(e)
- try:
- self.socket.close()
- except Exception as e:
- lclient.error("error occured while closing the socket, " +
- "maybe it is already closed",exc_info=e)
- del self.socket
- self.socket = None
- def handle_data(self, data_received):
- data_decoded = data_received.decode("utf-8")
- lcchat.info("Server: "+data_decoded)
- if self.handle_data_func:
- try:
- self.handle_data_func(data_decoded)
- except Exception as e:
- lclient.error(e, exc_info=True)
- def is_running(self):
- return (self in threading.enumerate())
- def make_socket(self):
- lclient.info("creating a {} socket".format(self.af_family))
- if self.af_family == "AF_INET":
- s = trio.socket.socket(socket.AF_INET,socket.SOCK_STREAM)
- elif self.af_family == "AF_UNIX":
- s = trio.socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
- else:
- raise ValueError(
- "AF_FAMILY '{}' not supported!".format(
- self.af_family
- )
- )
- return s
- def main_loop(self):
- lclient.debug("starting main loop")
- while ( not self.exit_event ):
- if not self.status in ["connected"]:
- time.sleep(0.1)
- continue
- # print(0)
- read_confirmed, write_confirmed, exc_confirmed \
- = select.select(
- [self.socket],
- [],
- [self.socket],
- 1
- )
- if self.socket in exc_confirmed:
- self.is_connected = False
- lclient.warning("socket is expected to corrupt, exiting")
- self.disconnect()
- # self.stop()
- break
- elif self.socket in read_confirmed:
- try:
- data_received = self.read_from_socket()
- if data_received == b'':
- lclient.info("connection is broken, closing socket exiting")
- self.disconnect()
- # self.stop()
- # break
- else:
- self.handle_data(data_received)
- except Exception as e:
- lclient.error(e, exc_info=True)
- if type(e) is OSError:
- self.is_connected = False
- lclient.warn("connection broken, exiting")
- self.disconnect()
- # self.stop()
- # break
- else:
- raise
- else:
- time.sleep(0.1)
- def read_from_socket(self):
- data_received = self.socket.recv(self.block_size)
- return data_received
- def run(self,connect=False):
- if connect:
- self.connect()
- if self.error_handler:
- self.error_handler(self.main_loop)
- else:
- self.main_loop()
- def send(self, msg):
- msg = msg.rstrip()
- msg_encoded = bytes(msg+"\r\n", "utf-8")
- try:
- lcchat.info("Client: "+msg)
- self.socket.send(msg_encoded)
- except Exception as e:
- self.is_connected = False
- lclient.error(e, exc_info=True)
- self.status = "shutdown"
- def setup(self):
- pass
- def stop(self,reason=None):
- self.disconnect()
- self.exit_event = True
- if reason:
- print(reason)
- #
|