__init__.py 26 KB


  1. #!/usr/bin/env python3.5
  2. # Copyright 2017 Digital
  3. #
  4. # This file is part of DigiLib.
  5. #
  6. # DigiLib is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # DigiLib is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with DigiLib. If not, see <http://www.gnu.org/licenses/>.
  18. import atexit
  19. import logging
  20. import logging.handlers
  21. import os
  22. import queue
  23. import select
  24. import socket
  25. import sys
  26. import threading
  27. import time
  28. import traceback
  29. import blinker
  30. import curio
  31. import digilib.misc
  32. lclient = logging.getLogger(__name__+".client")
  33. lserver = logging.getLogger(__name__+".server")
  34. lch = logging.getLogger(__name__+".chandler")
  35. lschat = logging.getLogger(__name__+".server.chat")
  36. lcchat = logging.getLogger(__name__+".client.chat")
  37. lerr = logging.getLogger(__name__+".errhandler")
  38. class ErrorHandler(object):
  39. """
  40. This Class provides a easy way to execute a function which is likely to
  41. raise an exception and log the traceback.
  42. """
  43. def __init__(self,logger):
  44. super().__init__()
  45. self.logger = logger
  46. def run(self,func,args=(),kwargs={},text=None,logger=None):
  47. if not logger:
  48. logger = self.logger
  49. if not text:
  50. print("ErrorHandler:WARNING:no text specified,"
  51. " this is highly discouraged")
  52. try:
  53. func(*args,**kwargs)
  54. except Exception as exc:
  55. logger.debug("error occured during "+text,exc_info=exc)
  56. async def arun(self,func,args=(),kwargs={},text=None,logger=None):
  57. if not logger:
  58. logger = self.logger
  59. if not text:
  60. print("ErrorHandler:WARNING:no text specified,"
  61. " this is highly discouraged")
  62. try:
  63. await func(*args,**kwargs)
  64. except Exception as exc:
  65. logger.debug("error occured during "+text,exc_info=exc)
  66. class ConnHandlerBase(object):
  67. """
  68. ConnectionHandlerBase is the base class for all connection handlers.
  69. It provides basic methodes. Consider inheriting form ConnectionHandler
  70. instead, as it provides better functionality.
  71. """
  72. addr = None
  73. block_size = 1024
  74. server = None
  75. def __init__(self, socket, addr, server):
  76. super(ConnHandlerBase, self).__init__()
  77. self.socket = socket
  78. self.addr = addr
  79. self.server = server
  80. async def disconnect(self):
  81. """
  82. disconenct explicitely disconnects the client, shuts the socket down
  83. and closes it.
  84. """
  85. try:
  86. await self.send(bytes())
  87. except:
  88. lch.debug("error during disconenct")
  89. try:
  90. await self.socket.shutdown(0)
  91. except:
  92. lch.debug("error during socket shutdown")
  93. try:
  94. await self.socket.close()
  95. except:
  96. lch.debug("error closing socket")
  97. async def handle(self, data):
  98. """
  99. This method is called for every message the server receives from the
  100. client. It should handle the data. Performing asynchronous blocking
  101. actions is ok, as the client loop does not wait for this method to
  102. finish. therefore, this method can be called multiple times at once,
  103. use curio locks if appropriate.
  104. """
  105. raise NotImplemented()
  106. async def recv(self,block_size=block_size):
  107. """
  108. This method waits for the client to send something and returns bytes!
  109. """
  110. data_received = await self.socket.recv(block_size)
  111. return data_received
  112. async def send(self, data, log_msg=None):
  113. """
  114. This method sends bytes to the client. Returns False if an exception
  115. was raised during sending, otherwise True.
  116. """
  117. if log_msg:
  118. lschat.info("server:"+str(log_msg))
  119. else:
  120. lschat.info("Server:"+str(data))
  121. try:
  122. await self.socket.send(data)
  123. return True
  124. except Exception as e:
  125. lch.error(e, exc_info=True)
  126. return False
  127. class ConnHandler(ConnHandlerBase):
  128. """
  129. More advanced connection handler than ConnectionHandlerBase. For
  130. instance, sends() takes a string and encodes it, recv() decodes
  131. the client's and returns a string and welcome_client is called after
  132. the ConnHandler is initialized (Not after the inheriting class is
  133. initialized though!)
  134. """
  135. def __init__(self, socket, addr, server):
  136. super(ConnHandler, self).__init__(socket,addr,server)
  137. async def disconnect(self):
  138. """
  139. disconenct() explicitely disconnects the client, performes a proper
  140. the shutdow on the socket and closes it.
  141. """
  142. try:
  143. await self.send("")
  144. except:
  145. lch.debug("error during disconenct")
  146. try:
  147. await self.socket.shutdown(0)
  148. except:
  149. lch.debug("error during socket shutdown")
  150. try:
  151. await self.socket.close()
  152. except:
  153. lch.debug("error closing socket")
  154. async def handle(self, data):
  155. return
  156. async def on_welcome(self):
  157. """
  158. This method can be used to send a welcome message to the client.
  159. """
  160. await self.send("Welcome client, this is the server sending!")
  161. async def recv(self,block_size=None):
  162. """
  163. This method waits for the client to send something, decodes it and
  164. returns a string.
  165. """
  166. if block_size == None:
  167. block_size = self.block_size
  168. data_received = await self.socket.recv(block_size)
  169. data_decoded = data_received.decode("utf-8")
  170. return data_decoded
  171. async def send(self, data, log_msg=False):
  172. """
  173. This method takes a string, encodes it and sends it to the client.
  174. Returns False if an exception was raised during sending, otherwise True.
  175. """
  176. if log_msg:
  177. lschat.info("server:"+log_msg)
  178. else:
  179. lschat.info("Server:"+data)
  180. data_encoded = bytes(data, "utf-8")
  181. try:
  182. await self.socket.send(data_encoded)
  183. return True
  184. except Exception as e:
  185. lch.error(e, exc_info=True)
  186. return False
  187. class ConnHandlerEcho(ConnHandler):
  188. """
  189. A Conn handler which sends everything it receives to every other client
  190. connected to the server
  191. """
  192. def __init__(self, socket, addr, server):
  193. super(ConnHandlerEcho, self).__init__(socket, addr, server)
  194. def handle(self, data):
  195. for h in self.server.connection_handler:
  196. if not h is self:
  197. h.send(data)
  198. class Server(object):
  199. """
  200. Server opens either an unix or an inet connection. For every new client
  201. a new ClientHandler object is created.
  202. _string_ **host** and _int_ **port** are the filename, hostname or ip
  203. address and the port respectively on which the connection will be opened.
  204. If you make an AF_UNIX socket, port is ignored so simply pass None.
  205. _object_ **handler_class** is the class (not an object of the class) used
  206. for making connection handlers.
  207. _dict_ **handler_kwargs** is a dict which will be passed as keyword
  208. argumetns to the __init__ function of handler_class when creating a new
  209. connection handler. Default is an emtpy dict.
  210. _string_ **af_family** specifies the AF_FAMILY socket type, valid options
  211. are: "AF_INET" for a inet socket and "AF_UNIX" for a unix (file) socket.
  212. Default is "AF_INET".
  213. _bool_ **log_ip** specifies wether the ip address of the server/client is logged.
  214. Default is False.
  215. _int_ **max_allowed_clients** specifies the maximum amount of clients
  216. connected to the server at once. Default is 5 (for now. this will change
  217. in the future).
  218. """
  219. # set to true when the server shuts down, for instance after a
  220. # fatal exception
  221. exit_event = False
  222. def __init__(
  223. self,
  224. host,
  225. port,
  226. handler_class,
  227. handler_kwargs={},
  228. af_family="AF_INET",
  229. log_ip=False,
  230. max_allowed_clients=5,
  231. ):
  232. super(Server, self).__init__()
  233. self.host = host
  234. self.port = port
  235. self.handler_class = handler_class
  236. self.handler_kwargs = handler_kwargs
  237. self.af_family = af_family
  238. self.log_ip = log_ip
  239. self.max_allowed_clients = max_allowed_clients
  240. # the error handler used by, for instance server.shutdown
  241. self.safe_handle = ErrorHandler(lserver)
  242. # don't make the socket yet, we don't need right now. will be created
  243. # when the start method is called
  244. self.socket = None
  245. # create a task group for handlers, so we can easily cancel/terminate
  246. # them all at once
  247. self.handle_tasks = curio.TaskGroup(name="tg_handle_clients")
  248. # list of all active connection handlers
  249. self.connection_handler = []
  250. # register our cleanup method to be executed when the program exits.
  251. # the cleanup function unregisters itself, so it won't get executed
  252. # twice when the user called it befor the program exites
  253. atexit.register(self.shutdown)
  254. def make_socket(self):
  255. """
  256. factory method for sockets.
  257. this method makes a normal socket and wraps it in a curi.io.Socket wrapper
  258. """
  259. lserver.debug("making a {} socket".format(self.af_family))
  260. if self.af_family == "AF_INET":
  261. s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  262. elif self.af_family == "AF_UNIX":
  263. s = socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
  264. else:
  265. raise ValueError(
  266. "AF_FAMILY '{}' not supported!".format(
  267. self.af_family
  268. )
  269. )
  270. s = curio.io.Socket(s)
  271. return s
  272. def make_handler(self, conn, addr):
  273. """
  274. factory method for handlers.
  275. this method creates a handler object from self.handler_class and self.handler_kwargs
  276. """
  277. return self.handler_class(conn, addr, self, **self.handler_kwargs)
  278. def setup(self):
  279. """
  280. creates a scoket, opens the connection and starts listening for
  281. clients. this method does not block, it returns after starting to
  282. listen.
  283. """
  284. lserver.info("setting up server")
  285. self.socket = self.make_socket()
  286. if self.af_family == "AF_INET":
  287. self.socket.bind((self.host, self.port))
  288. elif self.af_family == "AF_UNIX":
  289. if os.path.exists(self.host):
  290. lserver.debug("file already exists")
  291. lserver.debug("attempting to remove it")
  292. os.remove(self.host)
  293. self.socket.bind(self.host)
  294. self.socket.listen(self.max_allowed_clients)
  295. def shutdown(self):
  296. """
  297. this method can be called synchronous and calls the asynchronous
  298. shutdown method.
  299. """
  300. curio.run(self.async_shutdown)
  301. async def async_shutdown(self):
  302. """
  303. This method properly shuts down the sockets and closes them.
  304. it unregisters itself from atexit, so it doesn't get executed twice
  305. when it was manually called before to program exits.
  306. """
  307. atexit.unregister(self.shutdown)
  308. lserver.info("shutting down server")
  309. await self.safe_handle.arun(
  310. self.handle_tasks.cancel_remaining,text="handler cancel")
  311. try:
  312. await curio.ignore_after(0.01,self.handle_tasks.join)
  313. except Exception as exc:
  314. lserver.error("error joining tasks:",exc_info=exc)
  315. # check if there is actually a socket. if the shutdown method is
  316. # executed before the start method, there is no socket.
  317. if self.socket:
  318. await self.safe_handle.arun(self.socket.shutdown,
  319. args=[socket.SHUT_RDWR,],text="socket shutdown")
  320. await self.safe_handle.arun(self.socket.close,text="socket close")
  321. def start(self):
  322. """
  323. this method starts the server. it is blocking.
  324. """
  325. self.setup()
  326. curio.run(self.run)
  327. async def run(self):
  328. """
  329. this method is the main loop of the Server. it waits for new client
  330. connections and creates a handle task for each of them. it does not
  331. receive or send anything.
  332. """
  333. lserver.debug("entering main loop")
  334. while ( not self.exit_event ):
  335. lserver.debug("waiting for client to connect")
  336. # wait for a client to connect
  337. conn,addr = await self.socket.accept()
  338. if self.log_ip:
  339. lserver.info(
  340. "new client connection, {}:{}!".format(*addr))
  341. else:
  342. lserver.info("a new client connected, let's handle it")
  343. handler = self.make_handler(conn, addr)
  344. # self.register_conn(conn, addr)
  345. # self.register_handler(handler, conn)
  346. await self.handle_tasks.spawn(self.handle_client(conn,handler))
  347. async def handle_client(self,socket,handler):
  348. """
  349. This method waits for the client to send something and calls the
  350. ClientHandler's handle method. there is a handle_client method running for each client connected.
  351. """
  352. if hasattr(handler,"on_welcome"):
  353. await handler.on_welcome()
  354. while True:
  355. try:
  356. if self.log_ip:
  357. lserver.debug("waiting for {} to send something"
  358. .format(socket.getsockname()))
  359. else:
  360. lserver.debug("waiting for the client to send something")
  361. # wait for the client to send something
  362. data = await handler.recv()
  363. # if there is no data the client disconnected. this is a
  364. # tcp protocoll specification.
  365. if not data:
  366. if self.log_ip:
  367. lserver.info("the connection to {} was closed"
  368. .format(socket.getsockname()))
  369. else:
  370. lserver.info("the connection to the client was closed")
  371. await handler.disconnect()
  372. # break out of the loop. don't return because we need to
  373. # do cleanup
  374. break
  375. else:
  376. # don't strip the data of its whitespaces, since they may
  377. # be important.
  378. lschat.info("Client:"+data.rstrip())
  379. await handler.handle(data)
  380. except Exception as e:
  381. lserver.error(e, exc_info=True)
  382. # let's sleep a bit, in case something is broken and the
  383. # loop throws an exception every time
  384. await curio.sleep(0.01)
  385. # if a task exits and hasn't been joined, curio prints a warning.
  386. # we don't want the warning, so let's join the current task for 0
  387. # seconds. instead of task.join() we use task.wait(). the only
  388. # difference is that wait doesn't throw a exception if the task was
  389. # stopped or crashed
  390. cur_task = await curio.current_task()
  391. await curio.ignore_after(0.01,cur_task.wait)
  392. class Client(threading.Thread):
  393. """docstring for Client"""
  394. status = "uninitialized"
  395. exit_event = False
  396. def __init__(
  397. self,
  398. host,
  399. port=None,
  400. af_family="AF_INET",
  401. handle_data_func=None,
  402. error_handler=None,
  403. block_size=1024,
  404. ):
  405. super().__init__()
  406. self.name = "Client"
  407. self.host = host
  408. self.port = port
  409. self.af_family = af_family
  410. self.block_size = block_size
  411. self.handle_data_func = handle_data_func
  412. self.error_handler = error_handler
  413. self.socket = None
  414. self.status = "disconnected"
  415. def connect(self):
  416. self.status = "connecting"
  417. self.socket = self.make_socket()
  418. lclient.info("connecting to socket '{}' of type {}"
  419. .format(self.host,self.af_family))
  420. try:
  421. if self.af_family == "AF_INET":
  422. self.socket.connect((self.host, self.port))
  423. elif self.af_family == "AF_UNIX":
  424. self.socket.connect(self.host)
  425. self.status = "connected"
  426. lclient.info("connected")
  427. return True
  428. except Exception as e:
  429. lclient.debug(e, exc_info=True)
  430. if type(e) is ConnectionRefusedError:
  431. lclient.info(
  432. "failed to connect to socket '{}'".format(self.host))
  433. self.disconnect()
  434. return False
  435. def disconnect(self):
  436. lclient.info("disconnecting from socket '{}'".format(self.host))
  437. self.status = "disconnected"
  438. if self.socket:
  439. try:
  440. self.socket.shutdown(socket.SHUT_RDWR)
  441. except Exception as e:
  442. lclient.error(e)
  443. try:
  444. self.socket.close()
  445. except Exception as e:
  446. lclient.error("error occured while closing the socket, " +
  447. "maybe it is already closed",exc_info=e)
  448. del self.socket
  449. self.socket = None
  450. def handle_data(self, data_received):
  451. data_decoded = data_received.decode("utf-8")
  452. lcchat.info("Server: "+data_decoded)
  453. if self.handle_data_func:
  454. try:
  455. self.handle_data_func(data_decoded)
  456. except Exception as e:
  457. lclient.error(e, exc_info=True)
  458. def is_running(self):
  459. return (self in threading.enumerate())
  460. def make_socket(self):
  461. lclient.info("creating a {} socket".format(self.af_family))
  462. if self.af_family == "AF_INET":
  463. s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  464. elif self.af_family == "AF_UNIX":
  465. s = socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
  466. else:
  467. raise ValueError(
  468. "AF_FAMILY '{}' not supported!".format(
  469. self.af_family
  470. )
  471. )
  472. return s
  473. def main_loop(self):
  474. lclient.debug("starting main loop")
  475. while ( not self.exit_event ):
  476. if not self.status in ["connected"]:
  477. time.sleep(0.1)
  478. continue
  479. # print(0)
  480. read_confirmed, write_confirmed, exc_confirmed \
  481. = select.select(
  482. [self.socket],
  483. [],
  484. [self.socket],
  485. 1
  486. )
  487. if self.socket in exc_confirmed:
  488. lclient.warning("socket is expected to corrupt, exiting")
  489. self.disconnect()
  490. # self.stop()
  491. break
  492. elif self.socket in read_confirmed:
  493. try:
  494. data_received = self.read_from_socket()
  495. if data_received == b'':
  496. lclient.info("connection is broken, closing socket exiting")
  497. self.disconnect()
  498. # self.stop()
  499. # break
  500. else:
  501. try:
  502. self.handle_data(data_received)
  503. except Exception as e:
  504. lserver.error(
  505. "Error while handling data",
  506. exc_info=e
  507. )
  508. except Exception as e:
  509. lclient.error(e, exc_info=True)
  510. if type(e) is OSError:
  511. lclient.warn("connection broken, exiting")
  512. self.disconnect()
  513. # self.stop()
  514. # break
  515. else:
  516. raise
  517. else:
  518. time.sleep(0.1)
  519. def read_from_socket(self):
  520. data_received = self.socket.recv(self.block_size)
  521. return data_received
  522. def run(self):
  523. # self.connect()
  524. if self.error_handler:
  525. self.error_handler(self.main_loop)
  526. else:
  527. self.main_loop()
  528. def send(self, msg):
  529. msg = msg.rstrip()
  530. msg_encoded = bytes(msg+"\r\n", "utf-8")
  531. try:
  532. lcchat.info("Client: "+msg)
  533. self.socket.send(msg_encoded)
  534. except Exception as e:
  535. lclient.error(e, exc_info=True)
  536. self.status = "shutdown"
  537. def setup(self):
  538. pass
  539. def stop(self,reason=None):
  540. self.disconnect()
  541. self.exit_event = True
  542. if reason:
  543. print(reason)
  544. class AsyncClient(object):
  545. """docstring for Client"""
  546. block_size = 1024
  547. status = "uninitialized"
  548. exit_event = False
  549. def __init__(
  550. self,
  551. host,
  552. port=None,
  553. af_family="AF_INET",
  554. handle_data_func=None,
  555. error_handler=None,
  556. block_size=1024,
  557. ):
  558. super().__init__()
  559. self.host = host
  560. self.port = port
  561. self.af_family = af_family
  562. self.block_size = block_size
  563. self.handle_data_func = handle_data_func
  564. self.error_handler = error_handler
  565. self.socket = None
  566. self.status = "disconnected"
  567. def connect(self):
  568. self.status = "connecting"
  569. self.socket = self.make_socket()
  570. lclient.info("connecting to socket '{}' of type {}".format(
  571. self.host,self.af_family))
  572. try:
  573. if self.af_family == "AF_INET":
  574. self.socket.connect((self.host, self.port))
  575. elif self.af_family == "AF_UNIX":
  576. self.socket.connect(self.host)
  577. self.is_connected = True
  578. self.status = "connected"
  579. lclient.info("connected")
  580. return True
  581. except Exception as e:
  582. lclient.debug(e, exc_info=True)
  583. if type(e) is ConnectionRefusedError:
  584. lclient.info("failed to connect to socket '{}'".format(self.host))
  585. self.disconnect()
  586. return False
  587. def disconnect(self):
  588. lclient.info("disconnecting from socket '{}'".format(self.host))
  589. self.is_connected = False
  590. self.status = "disconnected"
  591. if self.socket:
  592. try:
  593. self.socket.shutdown(socket.SHUT_RDWR)
  594. except Exception as e:
  595. lclient.error(e)
  596. try:
  597. self.socket.close()
  598. except Exception as e:
  599. lclient.error("error occured while closing the socket, " +
  600. "maybe it is already closed",exc_info=e)
  601. del self.socket
  602. self.socket = None
  603. def handle_data(self, data_received):
  604. data_decoded = data_received.decode("utf-8")
  605. lcchat.info("Server: "+data_decoded)
  606. if self.handle_data_func:
  607. try:
  608. self.handle_data_func(data_decoded)
  609. except Exception as e:
  610. lclient.error(e, exc_info=True)
  611. def is_running(self):
  612. return (self in threading.enumerate())
  613. def make_socket(self):
  614. lclient.info("creating a {} socket".format(self.af_family))
  615. if self.af_family == "AF_INET":
  616. s = trio.socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  617. elif self.af_family == "AF_UNIX":
  618. s = trio.socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
  619. else:
  620. raise ValueError(
  621. "AF_FAMILY '{}' not supported!".format(
  622. self.af_family
  623. )
  624. )
  625. return s
  626. async def read_from_socket(self):
  627. data_received = await self.socket.recv(self.block_size)
  628. return data_received
  629. async def recv(self):
  630. data_received = await self.socket.recv(self.block_size)
  631. data_decoded = data_received.decode("utf-8")
  632. return data_decoded
  633. async def run(self):
  634. lclient.debug("starting main loop")
  635. while ( not self.exit_event ):
  636. if not self.status in ["connected"]:
  637. time.sleep(0.1)
  638. continue
  639. # if self.socket in exc_confirmed:
  640. # self.is_connected = False
  641. # lclient.warning("socket is expected to corrupt, exiting")
  642. # self.disconnect()
  643. # # self.stop()
  644. # break
  645. # elif self.socket in read_confirmed:
  646. try:
  647. data = await self.read_from_socket()
  648. if not data:
  649. lclient.info("connection closed")
  650. self.disconnect()
  651. else:
  652. self.handle_data(data)
  653. except Exception as e:
  654. lclient.error(e, exc_info=True)
  655. if type(e) is OSError:
  656. self.is_connected = False
  657. lclient.warn("connection broken, exiting")
  658. self.disconnect()
  659. else:
  660. raise
  661. async def start(self,connect=False):
  662. if connect:
  663. self.connect()
  664. if self.error_handler:
  665. self.error_handler(self.run)
  666. else:
  667. self.run()
  668. def send(self, msg):
  669. msg = msg.rstrip()
  670. msg_encoded = bytes(msg+"\r\n", "utf-8")
  671. try:
  672. lcchat.info("Client: "+msg)
  673. self.socket.send(msg_encoded)
  674. except Exception as e:
  675. self.is_connected = False
  676. lclient.error(e, exc_info=True)
  677. self.disconnect()
  678. # self.exit_event = True
  679. # self.status = "shutdown"
  680. def setup(self):
  681. pass
  682. def stop(self,reason=None):
  683. self.disconnect()
  684. self.exit_event = True
  685. if reason:
  686. print(reason)
  687. #