|
@@ -100,36 +100,61 @@ class ConnHandlerEcho(ConnHandlerBase):
|
|
|
h.send(data)
|
|
|
|
|
|
class Server(object):
|
|
|
- """docstring for SocketHandler"""
|
|
|
+ """
|
|
|
+ Server opens either an unix or an inet connection. for every client which connects a new ClientHandler class is created.
|
|
|
+ """
|
|
|
def __init__(self,
|
|
|
host,
|
|
|
port=None,
|
|
|
af_family="AF_INET",
|
|
|
log_ip=False,
|
|
|
max_allowed_clients=5,
|
|
|
- handler=None,
|
|
|
+ handler_class=None,
|
|
|
handler_kwargs={},
|
|
|
):
|
|
|
super(Server, self).__init__()
|
|
|
+ # set to true when the server shuts down, for instance after a
|
|
|
+ # fatal exception
|
|
|
self.exit_event = False
|
|
|
+ # on which hostname or ip address the connection will be opened
|
|
|
self.host = host
|
|
|
+ # on which port the connection will be opened
|
|
|
self.port = port
|
|
|
+ # what AF_INET family to use, either AF_INET or AF_UNIX (file socket)
|
|
|
self.af_family = af_family
|
|
|
+ # whether ip addresses will be logged
|
|
|
self.log_ip = log_ip
|
|
|
- self.handler_kwargs = handler_kwargs
|
|
|
- self.handler = handler
|
|
|
+ # number of maximum client connection at a time
|
|
|
self.max_allowed_clients = max_allowed_clients
|
|
|
- self.socket = self.make_socket()
|
|
|
+ # this handler class will be used when creating a new handler
|
|
|
+ self.handler_class = handler_class
|
|
|
+ # the kwargs passed to the handler's __init__ function
|
|
|
+ self.handler_kwargs = handler_kwargs
|
|
|
+ # don't make the socket yet, we don't need right now. will be created
|
|
|
+ # when the start method is called
|
|
|
+ # self.socket = self.make_socket()
|
|
|
+ self.socket = None
|
|
|
+ # create a task group for handlers, so we can easily cancel/terminate
|
|
|
+ # them all at once
|
|
|
self.handle_tasks = curio.TaskGroup(name="tg_handle_clients")
|
|
|
+ # list of all active connection handlers
|
|
|
self.connection_handler = []
|
|
|
- self.conn_to_addr = {}
|
|
|
- self.addr_to_conn = {}
|
|
|
- self.conn_to_handler ={}
|
|
|
- self.handler_to_conn = {}
|
|
|
- self.read_sockets_expected = [self.socket]
|
|
|
+ # these aren't actually used, I will remove them soon
|
|
|
+ # self.conn_to_addr = {}
|
|
|
+ # self.addr_to_conn = {}
|
|
|
+ # self.conn_to_handler ={}
|
|
|
+ # self.handler_to_conn = {}
|
|
|
+ # this was used for the select.select approach. we use the async approach now, let's remove it
|
|
|
+ # self.read_sockets_expected = [self.socket]
|
|
|
+ # register our cleanup method to be executed when the program exits.
|
|
|
+ # the cleanup function unregisters itself, so it won't get executed twice when the user called it befor the program exites
|
|
|
atexit.register(self.shutdown)
|
|
|
|
|
|
def make_socket(self):
|
|
|
+ """
|
|
|
+ factory method for sockets.
|
|
|
+ this method makes a normal socket and wraps it in a curi.io.Socket wrapper
|
|
|
+ """
|
|
|
lserver.debug("making a {} socket".format(self.af_family))
|
|
|
if self.af_family == "AF_INET":
|
|
|
s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
|
|
@@ -145,34 +170,43 @@ class Server(object):
|
|
|
return s
|
|
|
|
|
|
def make_handler(self, conn, addr):
|
|
|
- return self.handler(conn, addr, self, **self.handler_kwargs)
|
|
|
-
|
|
|
- def register_conn(self, conn, addr):
|
|
|
- # if self.log_ip:
|
|
|
- # lserver.info("New connection from {} on port {}".format(*addr))
|
|
|
- self.read_sockets_expected.append(conn)
|
|
|
- if addr:
|
|
|
- self.conn_to_addr[conn] = addr
|
|
|
- self.addr_to_conn[addr] = conn
|
|
|
-
|
|
|
- def unregister_conn(self, conn):
|
|
|
- self.read_sockets_expected.remove(conn)
|
|
|
- addr = self.conn_to_addr.get(conn, False)
|
|
|
- if addr:
|
|
|
- del self.addr_to_conn[addr]
|
|
|
- del self.conn_to_addr[conn]
|
|
|
-
|
|
|
- def register_handler(self, handler, conn):
|
|
|
- self.connection_handler.append(handler)
|
|
|
- self.conn_to_handler[conn] = handler
|
|
|
- self.handler_to_conn[handler] = conn
|
|
|
-
|
|
|
- def unregister_handler(self, handler, conn):
|
|
|
- self.connection_handler.remove(handler)
|
|
|
- del self.conn_to_handler[conn]
|
|
|
- del self.handler_to_conn[handler]
|
|
|
+ """
|
|
|
+ factory method for handlers.
|
|
|
+ this method creates a handler object from self.handler_class and self.handler_kwargs
|
|
|
+ """
|
|
|
+ return self.handler_class(conn, addr, self, **self.handler_kwargs)
|
|
|
+
|
|
|
+ # DEPRECATED
|
|
|
+ # def register_conn(self, conn, addr):
|
|
|
+ # # if self.log_ip:
|
|
|
+ # # lserver.info("New connection from {} on port {}".format(*addr))
|
|
|
+ # self.read_sockets_expected.append(conn)
|
|
|
+ # if addr:
|
|
|
+ # self.conn_to_addr[conn] = addr
|
|
|
+ # self.addr_to_conn[addr] = conn
|
|
|
+ #
|
|
|
+ # def unregister_conn(self, conn):
|
|
|
+ # self.read_sockets_expected.remove(conn)
|
|
|
+ # addr = self.conn_to_addr.get(conn, False)
|
|
|
+ # if addr:
|
|
|
+ # del self.addr_to_conn[addr]
|
|
|
+ # del self.conn_to_addr[conn]
|
|
|
+ #
|
|
|
+ # def register_handler(self, handler, conn):
|
|
|
+ # self.connection_handler.append(handler)
|
|
|
+ # self.conn_to_handler[conn] = handler
|
|
|
+ # self.handler_to_conn[handler] = conn
|
|
|
+ #
|
|
|
+ # def unregister_handler(self, handler, conn):
|
|
|
+ # self.connection_handler.remove(handler)
|
|
|
+ # del self.conn_to_handler[conn]
|
|
|
+ # del self.handler_to_conn[handler]
|
|
|
|
|
|
def setup(self):
|
|
|
+ """
|
|
|
+ opens the connection and starts listening for clients. this method
|
|
|
+ does not block, it returns after starting to listen.
|
|
|
+ """
|
|
|
lserver.info("setting up socket")
|
|
|
if self.af_family == "AF_INET":
|
|
|
self.socket.bind((self.host, self.port))
|
|
@@ -186,31 +220,43 @@ class Server(object):
|
|
|
# self.socket.settimeout(1)
|
|
|
|
|
|
def shutdown(self):
|
|
|
+ """
|
|
|
+ This method properly shuts down the sockets and closes them.
|
|
|
+ it unregisters itself from atexit, so it doesn't get executed twice
|
|
|
+ when it was manually called before to program exits.
|
|
|
+ """
|
|
|
def error_handler(func,*args,log_text="error",**kwargs):
|
|
|
try:
|
|
|
func(*args,**kwargs)
|
|
|
except Exception as exc:
|
|
|
lserver.debug("error occured during "+log_text,exc_info=exc)
|
|
|
- # this function can be called by the user and we don't need to clean
|
|
|
- # up a second time when the program exits
|
|
|
atexit.unregister(self.shutdown)
|
|
|
lserver.info("shutting down server")
|
|
|
- error_handler(self.handle_tasks.cancel_remaining,log_text="handler cancel")
|
|
|
+ error_handler(
|
|
|
+ self.handle_tasks.cancel_remaining,log_text="handler cancel")
|
|
|
+ # check if there is actually a socket. if the shutdown method is
|
|
|
+ # executed before the start method, there is no socket.
|
|
|
if self.socket:
|
|
|
error_handler(self.socket.shutdown,log_text="socket shutdown")
|
|
|
error_handler(self.socket.close,log_text="socket close")
|
|
|
- # del self.socket
|
|
|
- # self.socket = None
|
|
|
|
|
|
def start(self):
|
|
|
- # lserver.debug(dir(self))
|
|
|
+ """
|
|
|
+ this method starts the server. it is blocking.
|
|
|
+ """
|
|
|
+ self.setup()
|
|
|
curio.run(self.run)
|
|
|
|
|
|
async def run(self):
|
|
|
- self.setup()
|
|
|
+ """
|
|
|
+ this method is the main loop of the Server. it waits for new client
|
|
|
+ connections and creates a handle task for each of them. it does not
|
|
|
+ receive or send anything.
|
|
|
+ """
|
|
|
lserver.debug("entering main loop")
|
|
|
while ( not self.exit_event ):
|
|
|
lserver.debug("waiting for client to connect")
|
|
|
+ # wait for a client to connect
|
|
|
conn,addr = await self.socket.accept()
|
|
|
if self.log_ip:
|
|
|
lserver.info(
|
|
@@ -218,44 +264,56 @@ class Server(object):
|
|
|
else:
|
|
|
lserver.info("a new client connected, let's handle it")
|
|
|
handler = self.make_handler(conn, addr)
|
|
|
- self.register_conn(conn, addr)
|
|
|
- self.register_handler(handler, conn)
|
|
|
+ # self.register_conn(conn, addr)
|
|
|
+ # self.register_handler(handler, conn)
|
|
|
await self.handle_tasks.spawn(self.handle_client(conn,handler))
|
|
|
|
|
|
async def handle_client(self,socket,handler):
|
|
|
- # _append_task()
|
|
|
+ """
|
|
|
+ This method waits for the client to send something and calls the
|
|
|
+ ClientHandler's handle method. there is a handle_client method running for each client connected.
|
|
|
+ """
|
|
|
while True:
|
|
|
- # for i in range(1):
|
|
|
try:
|
|
|
if self.log_ip:
|
|
|
lserver.debug("waiting for {} to send something"
|
|
|
.format(socket.getsockname()))
|
|
|
else:
|
|
|
lserver.debug("waiting for the client to send something")
|
|
|
+ # wait for the client to send something
|
|
|
data = await handler.recv()
|
|
|
+ # if there is no data the client disconnected. this is a
|
|
|
+ # tcp protocoll specification.
|
|
|
if not data:
|
|
|
if self.log_ip:
|
|
|
lserver.info("the connection to {} was closed"
|
|
|
.format(socket.getsockname()))
|
|
|
else:
|
|
|
lserver.info("the connection to the client was closed")
|
|
|
- self.unregister_handler(handler, socket)
|
|
|
- self.unregister_conn(socket)
|
|
|
+ # self.unregister_handler(handler, socket)
|
|
|
+ # self.unregister_conn(socket)
|
|
|
await handler.close()
|
|
|
+ # break out of the loop. don't return because we need to
|
|
|
+ # do cleanup
|
|
|
break
|
|
|
else:
|
|
|
- lschat.info("Client:"+data.strip())
|
|
|
+ # don't strip the data of its whitespaces, since they may
|
|
|
+ # be important.
|
|
|
+ lschat.info("Client:"+data.rstrip())
|
|
|
coro = handler.handle(data)
|
|
|
task = await curio.spawn(coro)
|
|
|
- _tasks.append(task)
|
|
|
except Exception as e:
|
|
|
lserver.error(e, exc_info=True)
|
|
|
- # saver to use time.sleep
|
|
|
- time.sleep(0.1)
|
|
|
+ # let's sleep a bit, in case something is broken and the
|
|
|
+ # loop throws an exception every time
|
|
|
+ await async.sleep(0.01)
|
|
|
+ # if a task exits and hasn't been joined, curio prints a warning.
|
|
|
+ # we don't want the warning, so let's join the current task for 0
|
|
|
+ # seconds. instead of task.join() we use task.wait(). the only
|
|
|
+ # difference is that wait doesn't throw a exception if the task was
|
|
|
+ # stopped or crashed
|
|
|
cur_task = await curio.current_task()
|
|
|
await curio.ignore_after(0,cur_task.wait)
|
|
|
- # _tasks.remove(cur_task)
|
|
|
-
|
|
|
|
|
|
class Client(threading.Thread):
|
|
|
"""docstring for Client"""
|