__init__.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627
  1. #!/usr/bin/env python3.5
  2. # Copyright 2017 Digital
  3. #
  4. # This file is part of DigiLib.
  5. #
  6. # DigiLib is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # DigiLib is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with DigiLib. If not, see <http://www.gnu.org/licenses/>.
  18. import atexit
  19. import logging
  20. import logging.handlers
  21. import os
  22. import queue
  23. import select
  24. import socket
  25. import sys
  26. import threading
  27. import time
  28. import traceback
  29. import blinker
  30. import curio
  31. import digilib.misc
  32. lclient = logging.getLogger(__name__+".client")
  33. lserver = logging.getLogger(__name__+".server")
  34. lschat = logging.getLogger(__name__+".server.chat")
  35. lcchat = logging.getLogger(__name__+".client.chat")
  36. _tasks = []
  37. def _append_task():
  38. cur_task = curio.current_task()
  39. _tasks.append(cur_task)
  40. def _remove_task():
  41. cur_task = curio.current_task()
  42. if cur_task in _tasks:
  43. _tasks.remove(cur_task)
  44. class ConnHandlerBase(object):
  45. def __init__(self, socket, addr, server):
  46. self.status = "init"
  47. super(ConnHandlerBase, self).__init__()
  48. self.socket = socket
  49. self.addr = addr
  50. self.server = server
  51. self.block_size = 1024
  52. # self.welcome_client()
  53. self.status="connected"
  54. async def welcome_client(self):
  55. pass
  56. async def handle(self, data):
  57. return
  58. async def recv(self):
  59. data_received = await self.socket.recv(self.block_size)
  60. data_decoded = data_received.decode("utf-8")
  61. return data_decoded
  62. async def send(self, msg, log_msg=False):
  63. msg_encoded = bytes(msg, "utf-8")
  64. if log_msg:
  65. lschat.info("server:"+log_msg)
  66. else:
  67. lschat.info("Server:"+msg)
  68. try:
  69. await self.socket.send(msg_encoded)
  70. return True
  71. except Exception as e:
  72. lserver.error(e, exc_info=True)
  73. return False
  74. async def close(self):
  75. self.status = "closed"
  76. try:
  77. await self.socket.shutdown(0)
  78. except:
  79. lserver.debug("error during socket shutdown")
  80. try:
  81. await self.socket.close()
  82. except:
  83. lserver.debug("error closing socket")
  84. class ConnHandlerEcho(ConnHandlerBase):
  85. def __init__(self, socket, addr, server):
  86. self.status = "init"
  87. self.super_class = super(ConnHandlerEcho, self)
  88. self.super_class.__init__(socket, addr, server)
  89. self.server = server
  90. def welcome_client(self):
  91. self.send("welcome to the client")
  92. def handle(self, data):
  93. lschat.info("Client:"+data)
  94. for h in list(set(self.server.connection_handler)-{self}):
  95. h.send(data)
  96. class Server(object):
  97. """docstring for SocketHandler"""
  98. def __init__(self,
  99. host,
  100. port=None,
  101. af_family="AF_INET",
  102. log_ip=False,
  103. max_allowed_clients=5,
  104. handler=None,
  105. handler_kwargs={},
  106. ):
  107. super(Server, self).__init__()
  108. self.exit_event = False
  109. self.host = host
  110. self.port = port
  111. self.af_family = af_family
  112. self.log_ip = log_ip
  113. self.handler_kwargs = handler_kwargs
  114. self.handler = handler
  115. self.max_allowed_clients = max_allowed_clients
  116. self.socket = self.make_socket()
  117. self.handle_tasks = curio.TaskGroup(name="tg_handle_clients")
  118. self.connection_handler = []
  119. self.conn_to_addr = {}
  120. self.addr_to_conn = {}
  121. self.conn_to_handler ={}
  122. self.handler_to_conn = {}
  123. self.read_sockets_expected = [self.socket]
  124. atexit.register(self.shutdown)
  125. def make_socket(self):
  126. lserver.debug("making a {} socket".format(self.af_family))
  127. if self.af_family == "AF_INET":
  128. s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  129. elif self.af_family == "AF_UNIX":
  130. s = socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
  131. else:
  132. raise ValueError(
  133. "AF_FAMILY '{}' not supported!".format(
  134. self.af_family
  135. )
  136. )
  137. s = curio.io.Socket(s)
  138. return s
  139. def make_handler(self, conn, addr):
  140. return self.handler(conn, addr, self, **self.handler_kwargs)
  141. def register_conn(self, conn, addr):
  142. # if self.log_ip:
  143. # lserver.info("New connection from {} on port {}".format(*addr))
  144. self.read_sockets_expected.append(conn)
  145. if addr:
  146. self.conn_to_addr[conn] = addr
  147. self.addr_to_conn[addr] = conn
  148. def unregister_conn(self, conn):
  149. self.read_sockets_expected.remove(conn)
  150. addr = self.conn_to_addr.get(conn, False)
  151. if addr:
  152. del self.addr_to_conn[addr]
  153. del self.conn_to_addr[conn]
  154. def register_handler(self, handler, conn):
  155. self.connection_handler.append(handler)
  156. self.conn_to_handler[conn] = handler
  157. self.handler_to_conn[handler] = conn
  158. def unregister_handler(self, handler, conn):
  159. self.connection_handler.remove(handler)
  160. del self.conn_to_handler[conn]
  161. del self.handler_to_conn[handler]
  162. def setup(self):
  163. lserver.info("setting up socket")
  164. if self.af_family == "AF_INET":
  165. self.socket.bind((self.host, self.port))
  166. elif self.af_family == "AF_UNIX":
  167. if os.path.exists(self.host):
  168. lserver.debug("file already exists")
  169. lserver.debug("attempting to remove it")
  170. os.remove(self.host)
  171. self.socket.bind(self.host)
  172. self.socket.listen(self.max_allowed_clients)
  173. # self.socket.settimeout(1)
  174. def shutdown(self):
  175. def error_handler(func,*args,log_text="error",**kwargs):
  176. try:
  177. func(*args,**kwargs)
  178. except Exception as exc:
  179. lserver.debug("error occured during "+log_text,exc_info=exc)
  180. # this function can be called by the user and we don't need to clean
  181. # up a second time when the program exits
  182. atexit.unregister(self.shutdown)
  183. lserver.info("shutting down server")
  184. error_handler(self.handle_tasks.cancel_remaining,log_text="handler cancel")
  185. if self.socket:
  186. error_handler(self.socket.shutdown,log_text="socket shutdown")
  187. error_handler(self.socket.close,log_text="socket close")
  188. # del self.socket
  189. # self.socket = None
  190. def start(self):
  191. # lserver.debug(dir(self))
  192. curio.run(self.run)
  193. async def run(self):
  194. self.setup()
  195. lserver.debug("entering main loop")
  196. while ( not self.exit_event ):
  197. lserver.debug("waiting for client to connect")
  198. conn,addr = await self.socket.accept()
  199. if self.log_ip:
  200. lserver.info(
  201. "new client connection, {}:{}!".format(*addr))
  202. else:
  203. lserver.info("a new client connected, let's handle it")
  204. handler = self.make_handler(conn, addr)
  205. self.register_conn(conn, addr)
  206. self.register_handler(handler, conn)
  207. await self.handle_tasks.spawn(self.handle_client(conn,handler))
  208. async def handle_client(self,socket,handler):
  209. # _append_task()
  210. while True:
  211. # for i in range(1):
  212. try:
  213. if self.log_ip:
  214. lserver.debug("waiting for {} to send something"
  215. .format(socket.getsockname()))
  216. else:
  217. lserver.debug("waiting for the client to send something")
  218. data = await handler.recv()
  219. if not data:
  220. if self.log_ip:
  221. lserver.info("the connection to {} was closed"
  222. .format(socket.getsockname()))
  223. else:
  224. lserver.info("the connection to the client was closed")
  225. self.unregister_handler(handler, socket)
  226. self.unregister_conn(socket)
  227. await handler.close()
  228. break
  229. else:
  230. lschat.info("Client:"+data.strip())
  231. coro = handler.handle(data)
  232. task = await curio.spawn(coro)
  233. _tasks.append(task)
  234. except Exception as e:
  235. lserver.error(e, exc_info=True)
  236. # saver to use time.sleep
  237. time.sleep(0.1)
  238. cur_task = await curio.current_task()
  239. await curio.ignore_after(0,cur_task.join)
  240. # _tasks.remove(cur_task)
  241. class Client(threading.Thread):
  242. """docstring for Client"""
  243. is_connecting = False
  244. is_connected = False
  245. status = "uninitialized"
  246. def __init__(self,
  247. host,
  248. port=None,
  249. af_family="AF_INET",
  250. handle_data_func=None,
  251. error_handler=None,
  252. block_size=1024,
  253. ):
  254. self.super_class = super(Client, self)
  255. self.super_class.__init__()
  256. self.name = "Client"
  257. self.exit_event = False
  258. self.host = host
  259. self.port = port
  260. self.af_family = af_family
  261. self.block_size = block_size
  262. self.handle_data_func = handle_data_func
  263. self.is_connected = False
  264. self.error_handler = error_handler
  265. # self.socket = self.make_socket()
  266. self.socket = None
  267. self.status = "disconnected"
  268. def connect(self):
  269. self.status = "connecting"
  270. self.socket = self.make_socket()
  271. lclient.info(
  272. "connecting to socket '{}' of type {}".format(
  273. self.host,
  274. self.af_family
  275. )
  276. )
  277. try:
  278. if self.af_family == "AF_INET":
  279. self.socket.connect((self.host, self.port))
  280. elif self.af_family == "AF_UNIX":
  281. if os.path.exists(self.host):
  282. self.socket.connect(self.host)
  283. else:
  284. lclient.warn("File not found. Aborting.")
  285. return
  286. self.is_connected = True
  287. self.status = "connected"
  288. lclient.info("connected")
  289. return True
  290. except Exception as e:
  291. lclient.debug(e, exc_info=True)
  292. if type(e) is ConnectionRefusedError:
  293. lclient.info("failed to connect to socket '{}'".format(self.host))
  294. self.disconnect()
  295. return False
  296. def disconnect(self):
  297. lclient.info("disconnecting from socket '{}'".format(self.host))
  298. self.is_connected = False
  299. self.status = "disconnected"
  300. if self.socket:
  301. try:
  302. self.socket.shutdown(socket.SHUT_RDWR)
  303. except Exception as e:
  304. lclient.error(e)
  305. try:
  306. self.socket.close()
  307. except Exception as e:
  308. lclient.error("error occured while closing the socket, " +
  309. "maybe it is already closed",exc_info=e)
  310. del self.socket
  311. self.socket = None
  312. def handle_data(self, data_received):
  313. data_decoded = data_received.decode("utf-8")
  314. lcchat.info("Server: "+data_decoded)
  315. if self.handle_data_func:
  316. try:
  317. self.handle_data_func(data_decoded)
  318. except Exception as e:
  319. lclient.error(e, exc_info=True)
  320. def is_running(self):
  321. return (self in threading.enumerate())
  322. def make_socket(self):
  323. lclient.info("creating a {} socket".format(self.af_family))
  324. if self.af_family == "AF_INET":
  325. s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  326. elif self.af_family == "AF_UNIX":
  327. s = socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
  328. else:
  329. raise ValueError(
  330. "AF_FAMILY '{}' not supported!".format(
  331. self.af_family
  332. )
  333. )
  334. return s
  335. def main_loop(self):
  336. lclient.debug("starting main loop")
  337. while ( not self.exit_event ):
  338. if not self.status in ["connected"]:
  339. time.sleep(0.1)
  340. continue
  341. # print(0)
  342. read_confirmed, write_confirmed, exc_confirmed \
  343. = select.select(
  344. [self.socket],
  345. [],
  346. [self.socket],
  347. 1
  348. )
  349. if self.socket in exc_confirmed:
  350. self.is_connected = False
  351. lclient.warning("socket is expected to corrupt, exiting")
  352. self.disconnect()
  353. # self.stop()
  354. break
  355. elif self.socket in read_confirmed:
  356. try:
  357. data_received = self.read_from_socket()
  358. if data_received == b'':
  359. lclient.info("connection is broken, closing socket exiting")
  360. self.disconnect()
  361. # self.stop()
  362. # break
  363. else:
  364. try:
  365. self.handle_data(data_received)
  366. except Exception as e:
  367. lserver.error(
  368. "Error while handling data",
  369. exc_info=e
  370. )
  371. except Exception as e:
  372. lclient.error(e, exc_info=True)
  373. if type(e) is OSError:
  374. self.is_connected = False
  375. lclient.warn("connection broken, exiting")
  376. self.disconnect()
  377. # self.stop()
  378. # break
  379. else:
  380. raise
  381. else:
  382. time.sleep(0.1)
  383. def read_from_socket(self):
  384. data_received = self.socket.recv(self.block_size)
  385. return data_received
  386. def run(self):
  387. # self.connect()
  388. if self.error_handler:
  389. self.error_handler(self.main_loop)
  390. else:
  391. self.main_loop()
  392. def send(self, msg):
  393. msg = msg.rstrip()
  394. msg_encoded = bytes(msg+"\r\n", "utf-8")
  395. try:
  396. lcchat.info("Client: "+msg)
  397. self.socket.send(msg_encoded)
  398. except Exception as e:
  399. self.is_connected = False
  400. lclient.error(e, exc_info=True)
  401. self.status = "shutdown"
  402. def setup(self):
  403. pass
  404. def stop(self,reason=None):
  405. self.disconnect()
  406. self.exit_event = True
  407. if reason:
  408. print(reason)
  409. class AsyncClient(object):
  410. """docstring for Client"""
  411. is_connecting = False
  412. is_connected = False
  413. status = "uninitialized"
  414. def __init__(self,
  415. host,
  416. port=None,
  417. af_family="AF_INET",
  418. handle_data_func=None,
  419. error_handler=None,
  420. block_size=1024,
  421. ):
  422. self.super_class = super(AsyncClient, self)
  423. self.super_class.__init__()
  424. self.name = "Client"
  425. self.exit_event = False
  426. self.host = host
  427. self.port = port
  428. self.af_family = af_family
  429. self.block_size = block_size
  430. self.handle_data_func = handle_data_func
  431. self.is_connected = False
  432. self.error_handler = error_handler
  433. self.socket = None
  434. self.status = "disconnected"
  435. def connect(self):
  436. self.status = "connecting"
  437. self.socket = self.make_socket()
  438. lclient.info("connecting to socket '{}' of type {}".format(
  439. self.host,self.af_family))
  440. try:
  441. if self.af_family == "AF_INET":
  442. self.socket.connect((self.host, self.port))
  443. elif self.af_family == "AF_UNIX":
  444. self.socket.connect(self.host)
  445. # if os.path.exists(self.host):
  446. # pass
  447. # else:
  448. # lclient.warn("File not found. Aborting.")
  449. # return
  450. self.is_connected = True
  451. self.status = "connected"
  452. lclient.info("connected")
  453. return True
  454. except Exception as e:
  455. lclient.debug(e, exc_info=True)
  456. if type(e) is ConnectionRefusedError:
  457. lclient.info("failed to connect to socket '{}'".format(self.host))
  458. self.disconnect()
  459. return False
  460. def disconnect(self):
  461. lclient.info("disconnecting from socket '{}'".format(self.host))
  462. self.is_connected = False
  463. self.status = "disconnected"
  464. if self.socket:
  465. try:
  466. self.socket.shutdown(socket.SHUT_RDWR)
  467. except Exception as e:
  468. lclient.error(e)
  469. try:
  470. self.socket.close()
  471. except Exception as e:
  472. lclient.error("error occured while closing the socket, " +
  473. "maybe it is already closed",exc_info=e)
  474. del self.socket
  475. self.socket = None
  476. def handle_data(self, data_received):
  477. data_decoded = data_received.decode("utf-8")
  478. lcchat.info("Server: "+data_decoded)
  479. if self.handle_data_func:
  480. try:
  481. self.handle_data_func(data_decoded)
  482. except Exception as e:
  483. lclient.error(e, exc_info=True)
  484. def is_running(self):
  485. return (self in threading.enumerate())
  486. def make_socket(self):
  487. lclient.info("creating a {} socket".format(self.af_family))
  488. if self.af_family == "AF_INET":
  489. s = trio.socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  490. elif self.af_family == "AF_UNIX":
  491. s = trio.socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
  492. else:
  493. raise ValueError(
  494. "AF_FAMILY '{}' not supported!".format(
  495. self.af_family
  496. )
  497. )
  498. return s
  499. def main_loop(self):
  500. lclient.debug("starting main loop")
  501. while ( not self.exit_event ):
  502. if not self.status in ["connected"]:
  503. time.sleep(0.1)
  504. continue
  505. # print(0)
  506. read_confirmed, write_confirmed, exc_confirmed \
  507. = select.select(
  508. [self.socket],
  509. [],
  510. [self.socket],
  511. 1
  512. )
  513. if self.socket in exc_confirmed:
  514. self.is_connected = False
  515. lclient.warning("socket is expected to corrupt, exiting")
  516. self.disconnect()
  517. # self.stop()
  518. break
  519. elif self.socket in read_confirmed:
  520. try:
  521. data_received = self.read_from_socket()
  522. if data_received == b'':
  523. lclient.info("connection is broken, closing socket exiting")
  524. self.disconnect()
  525. # self.stop()
  526. # break
  527. else:
  528. self.handle_data(data_received)
  529. except Exception as e:
  530. lclient.error(e, exc_info=True)
  531. if type(e) is OSError:
  532. self.is_connected = False
  533. lclient.warn("connection broken, exiting")
  534. self.disconnect()
  535. # self.stop()
  536. # break
  537. else:
  538. raise
  539. else:
  540. time.sleep(0.1)
  541. def read_from_socket(self):
  542. data_received = self.socket.recv(self.block_size)
  543. return data_received
  544. def run(self,connect=False):
  545. if connect:
  546. self.connect()
  547. if self.error_handler:
  548. self.error_handler(self.main_loop)
  549. else:
  550. self.main_loop()
  551. def send(self, msg):
  552. msg = msg.rstrip()
  553. msg_encoded = bytes(msg+"\r\n", "utf-8")
  554. try:
  555. lcchat.info("Client: "+msg)
  556. self.socket.send(msg_encoded)
  557. except Exception as e:
  558. self.is_connected = False
  559. lclient.error(e, exc_info=True)
  560. self.status = "shutdown"
  561. def setup(self):
  562. pass
  563. def stop(self,reason=None):
  564. self.disconnect()
  565. self.exit_event = True
  566. if reason:
  567. print(reason)
  568. #