فهرست منبع

process making the server async

digital 7 سال پیش
والد
کامیت
8c2cd161c2
3فایلهای تغییر یافته به همراه34 افزوده شده و 35 حذف شده
  1. 6 0
      misc/__init__.py
  2. 26 35
      network/__init__.py
  3. 2 0
      pin/__init__.py

+ 6 - 0
misc/__init__.py

@@ -22,6 +22,8 @@ import sys
 import time
 import yaml
 
+import curio
+
 log = logging.getLogger(__name__+"")
 
 LOREM_IPSUM = """Lorem ipsum dolor sit amet, consectetur adipiscing elit.
@@ -56,6 +58,10 @@ config_files = [
     "config/commands.yaml",
 ]
 
+async def fake_wait_task():
+    cur_task = await curio.current_task()
+    await curio.ignore_after(0,cur_task.wait)
+
 def load_files(file_list,call):
     if not type(file_list) is list:
         file_list = [file_list]

+ 26 - 35
network/__init__.py

@@ -35,6 +35,17 @@ lserver = logging.getLogger(__name__+".server")
 lschat = logging.getLogger(__name__+".server.chat")
 lcchat = logging.getLogger(__name__+".client.chat")
 
+_tasks = []
+
+def _append_task():
+    cur_task = curio.current_task()
+    _tasks.append(cur_task)
+
+def _remove_task():
+    cur_task = curio.current_task()
+    if cur_task in _tasks:
+        _tasks.remove(cur_task)
+
 class ConnHandlerBase(object):
     def __init__(self, socket, addr, server):
         self.status = "init"
@@ -57,9 +68,12 @@ class ConnHandlerBase(object):
         data_decoded = data_received.decode("utf-8")
         return data_decoded
 
-    async def send(self, msg):
+    async def send(self, msg, log_msg=False):
         msg_encoded = bytes(msg, "utf-8")
-        lschat.info("Server:"+msg)
+        if log_msg:
+            lschat.info("server:"+log_msg)
+        else:
+            lschat.info("Server:"+msg)
         try:
             await self.socket.send(msg_encoded)
             return True
@@ -203,39 +217,12 @@ class Server(object):
             self.register_handler(handler, conn)
             await curio.spawn(self.wait_for_client(conn,handler))
 
-            # for s in read_sockets_confirmed:
-            #     socket_handler = self.conn_to_handler.get(s, None)
-            #     if ( s == self.socket ):
-            #         lserver.debug("handling new client")
-            #         conn, addr = self.socket.accept()
-            #         if self.log_ip:
-            #             lserver.info(
-            #                 "New connection from {} on port {}".format(*addr))
-            #         handler = self.make_handler(conn, addr)
-            #         self.register_conn(conn, addr)
-            #         self.register_handler(handler, conn)
-            #     elif ( socket_handler
-            #             and (socket_handler in self.connection_handler) ):
-            #         lserver.debug("handling client connection")
-            #         try:
-            #             data = socket_handler.recv()
-            #             if not data:
-            #                 lserver.info("connection {} closed".format(self.socket))
-            #                 self.unregister_handler(socket_handler, s)
-            #                 self.unregister_conn(conn)
-            #                 socket_handler.close()
-            #             else:
-            #                 lschat.info("Client:"+data.strip())
-            #                 socket_handler.handle(data)
-            #         except Exception as e:
-            #             lserver.error(e, exc_info=True)
-            #     else:
-            #         lserver.debug("else!")
-            #         lserver.debug(socket_handler)
-            #         time.sleep(1)
-
     async def wait_for_client(self,socket,handler):
-        while True:
+        _append_task()
+        # cur_task = curio.current_task()
+        # _tasks.append(cur_task)
+        # while True:
+        for i in range(1):
             try:
                 if self.log_ip:
                     lserver.debug(
@@ -258,11 +245,15 @@ class Server(object):
                     handler.close()
                 else:
                     lschat.info("Client:"+data.strip())
-                    await handler.handle(data)
+                    coro = handler.handle(data)
+                    task = await curio.spawn(coro)
+                    _tasks.append(task)
             except Exception as e:
                     lserver.error(e, exc_info=True)
                     # saver to use time.sleep
                     time.sleep(0.1)
+        digilib.misc.fake_wait_task()
+        _tasks.remove(cur_task)
 
 
 class Client(threading.Thread):

+ 2 - 0
pin/__init__.py

@@ -338,6 +338,8 @@ class DebugPinController(PinControllerBase):
         raise Exception("Test Exception")
 
     async def araise_exc(self,args=[],command=None,respond=None):
+        state = digilib.misc.parse_to_int_list("1,2,3,4")
+        a = 1+2
         raise Exception("Test Async Exception")
 
 if __name__ == "__main__":