__init__.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680
  1. #!/usr/bin/env python3.5
  2. # Copyright 2017 Digital
  3. #
  4. # This file is part of DigiLib.
  5. #
  6. # DigiLib is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # DigiLib is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with DigiLib. If not, see <http://www.gnu.org/licenses/>.
  18. import atexit
  19. import logging
  20. import logging.handlers
  21. import os
  22. import queue
  23. import select
  24. import socket
  25. import sys
  26. import threading
  27. import time
  28. import traceback
  29. import blinker
  30. import curio
  31. import digilib.misc
  32. lclient = logging.getLogger(__name__+".client")
  33. lserver = logging.getLogger(__name__+".server")
  34. lschat = logging.getLogger(__name__+".server.chat")
  35. lcchat = logging.getLogger(__name__+".client.chat")
  36. _tasks = []
  37. class ConnHandlerBase(object):
  38. def __init__(self, socket, addr, server):
  39. self.status = "init"
  40. super(ConnHandlerBase, self).__init__()
  41. self.socket = socket
  42. self.addr = addr
  43. self.server = server
  44. self.block_size = 1024
  45. # self.welcome_client()
  46. self.status="connected"
  47. async def welcome_client(self):
  48. pass
  49. async def handle(self, data):
  50. return
  51. async def recv(self):
  52. data_received = await self.socket.recv(self.block_size)
  53. data_decoded = data_received.decode("utf-8")
  54. return data_decoded
  55. async def send(self, msg, log_msg=False):
  56. msg_encoded = bytes(msg, "utf-8")
  57. if log_msg:
  58. lschat.info("server:"+log_msg)
  59. else:
  60. lschat.info("Server:"+msg)
  61. try:
  62. await self.socket.send(msg_encoded)
  63. return True
  64. except Exception as e:
  65. lserver.error(e, exc_info=True)
  66. return False
  67. async def close(self):
  68. self.status = "closed"
  69. try:
  70. await self.socket.shutdown(0)
  71. except:
  72. lserver.debug("error during socket shutdown")
  73. try:
  74. await self.socket.close()
  75. except:
  76. lserver.debug("error closing socket")
  77. class ConnHandlerEcho(ConnHandlerBase):
  78. def __init__(self, socket, addr, server):
  79. self.status = "init"
  80. self.super_class = super(ConnHandlerEcho, self)
  81. self.super_class.__init__(socket, addr, server)
  82. self.server = server
  83. def welcome_client(self):
  84. self.send("welcome to the client")
  85. def handle(self, data):
  86. lschat.info("Client:"+data)
  87. for h in list(set(self.server.connection_handler)-{self}):
  88. h.send(data)
  89. class Server(object):
  90. """
  91. Server opens either an unix or an inet connection. for every client which connects a new ClientHandler class is created.
  92. """
  93. def __init__(self,
  94. host,
  95. port=None,
  96. af_family="AF_INET",
  97. log_ip=False,
  98. max_allowed_clients=5,
  99. handler_class=None,
  100. handler_kwargs={},
  101. ):
  102. super(Server, self).__init__()
  103. # set to true when the server shuts down, for instance after a
  104. # fatal exception
  105. self.exit_event = False
  106. # on which hostname or ip address the connection will be opened
  107. self.host = host
  108. # on which port the connection will be opened
  109. self.port = port
  110. # what AF_INET family to use, either AF_INET or AF_UNIX (file socket)
  111. self.af_family = af_family
  112. # whether ip addresses will be logged
  113. self.log_ip = log_ip
  114. # number of maximum client connection at a time
  115. self.max_allowed_clients = max_allowed_clients
  116. # this handler class will be used when creating a new handler
  117. self.handler_class = handler_class
  118. # the kwargs passed to the handler's __init__ function
  119. self.handler_kwargs = handler_kwargs
  120. # don't make the socket yet, we don't need right now. will be created
  121. # when the start method is called
  122. # self.socket = self.make_socket()
  123. self.socket = None
  124. # create a task group for handlers, so we can easily cancel/terminate
  125. # them all at once
  126. self.handle_tasks = curio.TaskGroup(name="tg_handle_clients")
  127. # list of all active connection handlers
  128. self.connection_handler = []
  129. # these aren't actually used, I will remove them soon
  130. # self.conn_to_addr = {}
  131. # self.addr_to_conn = {}
  132. # self.conn_to_handler ={}
  133. # self.handler_to_conn = {}
  134. # this was used for the select.select approach. we use the async approach now, let's remove it
  135. # self.read_sockets_expected = [self.socket]
  136. # register our cleanup method to be executed when the program exits.
  137. # the cleanup function unregisters itself, so it won't get executed twice when the user called it befor the program exites
  138. atexit.register(self.shutdown)
  139. def make_socket(self):
  140. """
  141. factory method for sockets.
  142. this method makes a normal socket and wraps it in a curi.io.Socket wrapper
  143. """
  144. lserver.debug("making a {} socket".format(self.af_family))
  145. if self.af_family == "AF_INET":
  146. s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  147. elif self.af_family == "AF_UNIX":
  148. s = socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
  149. else:
  150. raise ValueError(
  151. "AF_FAMILY '{}' not supported!".format(
  152. self.af_family
  153. )
  154. )
  155. s = curio.io.Socket(s)
  156. return s
  157. def make_handler(self, conn, addr):
  158. """
  159. factory method for handlers.
  160. this method creates a handler object from self.handler_class and self.handler_kwargs
  161. """
  162. return self.handler_class(conn, addr, self, **self.handler_kwargs)
  163. # DEPRECATED
  164. # def register_conn(self, conn, addr):
  165. # # if self.log_ip:
  166. # # lserver.info("New connection from {} on port {}".format(*addr))
  167. # self.read_sockets_expected.append(conn)
  168. # if addr:
  169. # self.conn_to_addr[conn] = addr
  170. # self.addr_to_conn[addr] = conn
  171. #
  172. # def unregister_conn(self, conn):
  173. # self.read_sockets_expected.remove(conn)
  174. # addr = self.conn_to_addr.get(conn, False)
  175. # if addr:
  176. # del self.addr_to_conn[addr]
  177. # del self.conn_to_addr[conn]
  178. #
  179. # def register_handler(self, handler, conn):
  180. # self.connection_handler.append(handler)
  181. # self.conn_to_handler[conn] = handler
  182. # self.handler_to_conn[handler] = conn
  183. #
  184. # def unregister_handler(self, handler, conn):
  185. # self.connection_handler.remove(handler)
  186. # del self.conn_to_handler[conn]
  187. # del self.handler_to_conn[handler]
  188. def setup(self):
  189. """
  190. creates a scoket, opens the connection and starts listening for
  191. clients. this method does not block, it returns after starting to
  192. listen.
  193. """
  194. lserver.info("setting up server")
  195. self.socket = self.make_socket()
  196. if self.af_family == "AF_INET":
  197. self.socket.bind((self.host, self.port))
  198. elif self.af_family == "AF_UNIX":
  199. if os.path.exists(self.host):
  200. lserver.debug("file already exists")
  201. lserver.debug("attempting to remove it")
  202. os.remove(self.host)
  203. self.socket.bind(self.host)
  204. self.socket.listen(self.max_allowed_clients)
  205. def shutdown(self):
  206. """
  207. This method properly shuts down the sockets and closes them.
  208. it unregisters itself from atexit, so it doesn't get executed twice
  209. when it was manually called before to program exits.
  210. """
  211. def error_handler(func,*args,log_text="error",**kwargs):
  212. try:
  213. func(*args,**kwargs)
  214. except Exception as exc:
  215. lserver.debug("error occured during "+log_text,exc_info=exc)
  216. atexit.unregister(self.shutdown)
  217. lserver.info("shutting down server")
  218. error_handler(
  219. self.handle_tasks.cancel_remaining,log_text="handler cancel")
  220. # check if there is actually a socket. if the shutdown method is
  221. # executed before the start method, there is no socket.
  222. if self.socket:
  223. error_handler(self.socket.shutdown,log_text="socket shutdown")
  224. error_handler(self.socket.close,log_text="socket close")
  225. def start(self):
  226. """
  227. this method starts the server. it is blocking.
  228. """
  229. self.setup()
  230. curio.run(self.run)
  231. async def run(self):
  232. """
  233. this method is the main loop of the Server. it waits for new client
  234. connections and creates a handle task for each of them. it does not
  235. receive or send anything.
  236. """
  237. lserver.debug("entering main loop")
  238. while ( not self.exit_event ):
  239. lserver.debug("waiting for client to connect")
  240. # wait for a client to connect
  241. conn,addr = await self.socket.accept()
  242. if self.log_ip:
  243. lserver.info(
  244. "new client connection, {}:{}!".format(*addr))
  245. else:
  246. lserver.info("a new client connected, let's handle it")
  247. handler = self.make_handler(conn, addr)
  248. # self.register_conn(conn, addr)
  249. # self.register_handler(handler, conn)
  250. await self.handle_tasks.spawn(self.handle_client(conn,handler))
  251. async def handle_client(self,socket,handler):
  252. """
  253. This method waits for the client to send something and calls the
  254. ClientHandler's handle method. there is a handle_client method running for each client connected.
  255. """
  256. while True:
  257. try:
  258. if self.log_ip:
  259. lserver.debug("waiting for {} to send something"
  260. .format(socket.getsockname()))
  261. else:
  262. lserver.debug("waiting for the client to send something")
  263. # wait for the client to send something
  264. data = await handler.recv()
  265. # if there is no data the client disconnected. this is a
  266. # tcp protocoll specification.
  267. if not data:
  268. if self.log_ip:
  269. lserver.info("the connection to {} was closed"
  270. .format(socket.getsockname()))
  271. else:
  272. lserver.info("the connection to the client was closed")
  273. # self.unregister_handler(handler, socket)
  274. # self.unregister_conn(socket)
  275. await handler.close()
  276. # break out of the loop. don't return because we need to
  277. # do cleanup
  278. break
  279. else:
  280. # don't strip the data of its whitespaces, since they may
  281. # be important.
  282. lschat.info("Client:"+data.rstrip())
  283. coro = handler.handle(data)
  284. task = await curio.spawn(coro)
  285. except Exception as e:
  286. lserver.error(e, exc_info=True)
  287. # let's sleep a bit, in case something is broken and the
  288. # loop throws an exception every time
  289. await curio.sleep(0.01)
  290. # if a task exits and hasn't been joined, curio prints a warning.
  291. # we don't want the warning, so let's join the current task for 0
  292. # seconds. instead of task.join() we use task.wait(). the only
  293. # difference is that wait doesn't throw a exception if the task was
  294. # stopped or crashed
  295. cur_task = await curio.current_task()
  296. await curio.ignore_after(0,cur_task.wait)
  297. class Client(threading.Thread):
  298. """docstring for Client"""
  299. is_connecting = False
  300. is_connected = False
  301. status = "uninitialized"
  302. def __init__(self,
  303. host,
  304. port=None,
  305. af_family="AF_INET",
  306. handle_data_func=None,
  307. error_handler=None,
  308. block_size=1024,
  309. ):
  310. self.super_class = super(Client, self)
  311. self.super_class.__init__()
  312. self.name = "Client"
  313. self.exit_event = False
  314. self.host = host
  315. self.port = port
  316. self.af_family = af_family
  317. self.block_size = block_size
  318. self.handle_data_func = handle_data_func
  319. self.is_connected = False
  320. self.error_handler = error_handler
  321. # self.socket = self.make_socket()
  322. self.socket = None
  323. self.status = "disconnected"
  324. def connect(self):
  325. self.status = "connecting"
  326. self.socket = self.make_socket()
  327. lclient.info(
  328. "connecting to socket '{}' of type {}".format(
  329. self.host,
  330. self.af_family
  331. )
  332. )
  333. try:
  334. if self.af_family == "AF_INET":
  335. self.socket.connect((self.host, self.port))
  336. elif self.af_family == "AF_UNIX":
  337. if os.path.exists(self.host):
  338. self.socket.connect(self.host)
  339. else:
  340. lclient.warn("File not found. Aborting.")
  341. return
  342. self.is_connected = True
  343. self.status = "connected"
  344. lclient.info("connected")
  345. return True
  346. except Exception as e:
  347. lclient.debug(e, exc_info=True)
  348. if type(e) is ConnectionRefusedError:
  349. lclient.info("failed to connect to socket '{}'".format(self.host))
  350. self.disconnect()
  351. return False
  352. def disconnect(self):
  353. lclient.info("disconnecting from socket '{}'".format(self.host))
  354. self.is_connected = False
  355. self.status = "disconnected"
  356. if self.socket:
  357. try:
  358. self.socket.shutdown(socket.SHUT_RDWR)
  359. except Exception as e:
  360. lclient.error(e)
  361. try:
  362. self.socket.close()
  363. except Exception as e:
  364. lclient.error("error occured while closing the socket, " +
  365. "maybe it is already closed",exc_info=e)
  366. del self.socket
  367. self.socket = None
  368. def handle_data(self, data_received):
  369. data_decoded = data_received.decode("utf-8")
  370. lcchat.info("Server: "+data_decoded)
  371. if self.handle_data_func:
  372. try:
  373. self.handle_data_func(data_decoded)
  374. except Exception as e:
  375. lclient.error(e, exc_info=True)
  376. def is_running(self):
  377. return (self in threading.enumerate())
  378. def make_socket(self):
  379. lclient.info("creating a {} socket".format(self.af_family))
  380. if self.af_family == "AF_INET":
  381. s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  382. elif self.af_family == "AF_UNIX":
  383. s = socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
  384. else:
  385. raise ValueError(
  386. "AF_FAMILY '{}' not supported!".format(
  387. self.af_family
  388. )
  389. )
  390. return s
  391. def main_loop(self):
  392. lclient.debug("starting main loop")
  393. while ( not self.exit_event ):
  394. if not self.status in ["connected"]:
  395. time.sleep(0.1)
  396. continue
  397. # print(0)
  398. read_confirmed, write_confirmed, exc_confirmed \
  399. = select.select(
  400. [self.socket],
  401. [],
  402. [self.socket],
  403. 1
  404. )
  405. if self.socket in exc_confirmed:
  406. self.is_connected = False
  407. lclient.warning("socket is expected to corrupt, exiting")
  408. self.disconnect()
  409. # self.stop()
  410. break
  411. elif self.socket in read_confirmed:
  412. try:
  413. data_received = self.read_from_socket()
  414. if data_received == b'':
  415. lclient.info("connection is broken, closing socket exiting")
  416. self.disconnect()
  417. # self.stop()
  418. # break
  419. else:
  420. try:
  421. self.handle_data(data_received)
  422. except Exception as e:
  423. lserver.error(
  424. "Error while handling data",
  425. exc_info=e
  426. )
  427. except Exception as e:
  428. lclient.error(e, exc_info=True)
  429. if type(e) is OSError:
  430. self.is_connected = False
  431. lclient.warn("connection broken, exiting")
  432. self.disconnect()
  433. # self.stop()
  434. # break
  435. else:
  436. raise
  437. else:
  438. time.sleep(0.1)
  439. def read_from_socket(self):
  440. data_received = self.socket.recv(self.block_size)
  441. return data_received
  442. def run(self):
  443. # self.connect()
  444. if self.error_handler:
  445. self.error_handler(self.main_loop)
  446. else:
  447. self.main_loop()
  448. def send(self, msg):
  449. msg = msg.rstrip()
  450. msg_encoded = bytes(msg+"\r\n", "utf-8")
  451. try:
  452. lcchat.info("Client: "+msg)
  453. self.socket.send(msg_encoded)
  454. except Exception as e:
  455. self.is_connected = False
  456. lclient.error(e, exc_info=True)
  457. self.status = "shutdown"
  458. def setup(self):
  459. pass
  460. def stop(self,reason=None):
  461. self.disconnect()
  462. self.exit_event = True
  463. if reason:
  464. print(reason)
  465. class AsyncClient(object):
  466. """docstring for Client"""
  467. is_connecting = False
  468. is_connected = False
  469. status = "uninitialized"
  470. def __init__(self,
  471. host,
  472. port=None,
  473. af_family="AF_INET",
  474. handle_data_func=None,
  475. error_handler=None,
  476. block_size=1024,
  477. ):
  478. self.super_class = super(AsyncClient, self)
  479. self.super_class.__init__()
  480. self.name = "Client"
  481. self.exit_event = False
  482. self.host = host
  483. self.port = port
  484. self.af_family = af_family
  485. self.block_size = block_size
  486. self.handle_data_func = handle_data_func
  487. self.is_connected = False
  488. self.error_handler = error_handler
  489. self.socket = None
  490. self.status = "disconnected"
  491. def connect(self):
  492. self.status = "connecting"
  493. self.socket = self.make_socket()
  494. lclient.info("connecting to socket '{}' of type {}".format(
  495. self.host,self.af_family))
  496. try:
  497. if self.af_family == "AF_INET":
  498. self.socket.connect((self.host, self.port))
  499. elif self.af_family == "AF_UNIX":
  500. self.socket.connect(self.host)
  501. self.is_connected = True
  502. self.status = "connected"
  503. lclient.info("connected")
  504. return True
  505. except Exception as e:
  506. lclient.debug(e, exc_info=True)
  507. if type(e) is ConnectionRefusedError:
  508. lclient.info("failed to connect to socket '{}'".format(self.host))
  509. self.disconnect()
  510. return False
  511. def disconnect(self):
  512. lclient.info("disconnecting from socket '{}'".format(self.host))
  513. self.is_connected = False
  514. self.status = "disconnected"
  515. if self.socket:
  516. try:
  517. self.socket.shutdown(socket.SHUT_RDWR)
  518. except Exception as e:
  519. lclient.error(e)
  520. try:
  521. self.socket.close()
  522. except Exception as e:
  523. lclient.error("error occured while closing the socket, " +
  524. "maybe it is already closed",exc_info=e)
  525. del self.socket
  526. self.socket = None
  527. def handle_data(self, data_received):
  528. data_decoded = data_received.decode("utf-8")
  529. lcchat.info("Server: "+data_decoded)
  530. if self.handle_data_func:
  531. try:
  532. self.handle_data_func(data_decoded)
  533. except Exception as e:
  534. lclient.error(e, exc_info=True)
  535. def is_running(self):
  536. return (self in threading.enumerate())
  537. def make_socket(self):
  538. lclient.info("creating a {} socket".format(self.af_family))
  539. if self.af_family == "AF_INET":
  540. s = trio.socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  541. elif self.af_family == "AF_UNIX":
  542. s = trio.socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
  543. else:
  544. raise ValueError(
  545. "AF_FAMILY '{}' not supported!".format(
  546. self.af_family
  547. )
  548. )
  549. return s
  550. async def read_from_socket(self):
  551. data_received = await self.socket.recv(self.block_size)
  552. return data_received
  553. async def recv(self):
  554. data_received = await self.socket.recv(self.block_size)
  555. data_decoded = data_received.decode("utf-8")
  556. return data_decoded
  557. async def run(self):
  558. lclient.debug("starting main loop")
  559. while ( not self.exit_event ):
  560. if not self.status in ["connected"]:
  561. time.sleep(0.1)
  562. continue
  563. # if self.socket in exc_confirmed:
  564. # self.is_connected = False
  565. # lclient.warning("socket is expected to corrupt, exiting")
  566. # self.disconnect()
  567. # # self.stop()
  568. # break
  569. # elif self.socket in read_confirmed:
  570. try:
  571. data = await self.read_from_socket()
  572. if not data:
  573. lclient.info("connection closed")
  574. self.disconnect()
  575. else:
  576. self.handle_data(data)
  577. except Exception as e:
  578. lclient.error(e, exc_info=True)
  579. if type(e) is OSError:
  580. self.is_connected = False
  581. lclient.warn("connection broken, exiting")
  582. self.disconnect()
  583. else:
  584. raise
  585. async def start(self,connect=False):
  586. if connect:
  587. self.connect()
  588. if self.error_handler:
  589. self.error_handler(self.run)
  590. else:
  591. self.run()
  592. def send(self, msg):
  593. msg = msg.rstrip()
  594. msg_encoded = bytes(msg+"\r\n", "utf-8")
  595. try:
  596. lcchat.info("Client: "+msg)
  597. self.socket.send(msg_encoded)
  598. except Exception as e:
  599. self.is_connected = False
  600. lclient.error(e, exc_info=True)
  601. self.disconnect()
  602. # self.exit_event = True
  603. # self.status = "shutdown"
  604. def setup(self):
  605. pass
  606. def stop(self,reason=None):
  607. self.disconnect()
  608. self.exit_event = True
  609. if reason:
  610. print(reason)
  611. #