__init__.py 20 KB


  1. #!/usr/bin/env python3.5
  2. # Copyright 2017 Digital
  3. #
  4. # This file is part of DigiLib.
  5. #
  6. # DigiLib is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # DigiLib is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with DigiLib. If not, see <http://www.gnu.org/licenses/>.
  18. import logging
  19. import logging.handlers
  20. import os
  21. import queue
  22. import select
  23. import socket
  24. import sys
  25. import threading
  26. import time
  27. import traceback
  28. import blinker
  29. import curio
  30. lclient = logging.getLogger(__name__+".client")
  31. lserver = logging.getLogger(__name__+".server")
  32. lschat = logging.getLogger(__name__+".server.chat")
  33. lcchat = logging.getLogger(__name__+".client.chat")
  34. _tasks = []
  35. def _append_task():
  36. cur_task = curio.current_task()
  37. _tasks.append(cur_task)
  38. def _remove_task():
  39. cur_task = curio.current_task()
  40. if cur_task in _tasks:
  41. _tasks.remove(cur_task)
  42. class ConnHandlerBase(object):
  43. def __init__(self, socket, addr, server):
  44. self.status = "init"
  45. super(ConnHandlerBase, self).__init__()
  46. self.socket = socket
  47. self.addr = addr
  48. self.server = server
  49. self.block_size = 1024
  50. # self.welcome_client()
  51. self.status="connected"
  52. async def welcome_client(self):
  53. pass
  54. async def handle(self, data):
  55. return
  56. async def recv(self):
  57. data_received = await self.socket.recv(self.block_size)
  58. data_decoded = data_received.decode("utf-8")
  59. return data_decoded
  60. async def send(self, msg, log_msg=False):
  61. msg_encoded = bytes(msg, "utf-8")
  62. if log_msg:
  63. lschat.info("server:"+log_msg)
  64. else:
  65. lschat.info("Server:"+msg)
  66. try:
  67. await self.socket.send(msg_encoded)
  68. return True
  69. except Exception as e:
  70. lserver.error(e, exc_info=True)
  71. return False
  72. def close(self):
  73. self.status = "closed"
  74. try:
  75. self.socket.shutdown(0)
  76. except:
  77. lserver.debug("error during socket shutdown")
  78. try:
  79. self.socket.close()
  80. except:
  81. lserver.debug("error closing socket")
  82. class ConnHandlerEcho(ConnHandlerBase):
  83. def __init__(self, socket, addr, server):
  84. self.status = "init"
  85. self.super_class = super(ConnHandlerEcho, self)
  86. self.super_class.__init__(socket, addr, server)
  87. self.server = server
  88. def welcome_client(self):
  89. self.send("welcome to the client")
  90. def handle(self, data):
  91. lschat.info("Client:"+data)
  92. for h in list(set(self.server.connection_handler)-{self}):
  93. h.send(data)
  94. class Server(object):
  95. """docstring for SocketHandler"""
  96. def __init__(self,
  97. host,
  98. port=None,
  99. af_family="AF_INET",
  100. log_ip=False,
  101. max_allowed_clients=5,
  102. handler=None,
  103. handler_kwargs={},
  104. ):
  105. super(Server, self).__init__()
  106. self.exit_event = False
  107. self.host=host
  108. self.port=port
  109. self.af_family = af_family
  110. self.log_ip = log_ip
  111. self.handler_kwargs = handler_kwargs
  112. self.handler = handler
  113. self.max_allowed_clients = max_allowed_clients
  114. self.socket = self.make_socket()
  115. self.connection_handler = []
  116. self.conn_to_addr = {}
  117. self.addr_to_conn = {}
  118. self.conn_to_handler ={}
  119. self.handler_to_conn = {}
  120. self.read_sockets_expected = [self.socket]
  121. def cleanup(self):
  122. pass
  123. def make_socket(self):
  124. lserver.debug("making a {} socket".format(self.af_family))
  125. if self.af_family == "AF_INET":
  126. s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  127. elif self.af_family == "AF_UNIX":
  128. s = socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
  129. else:
  130. raise ValueError(
  131. "AF_FAMILY '{}' not supported!".format(
  132. self.af_family
  133. )
  134. )
  135. s = curio.io.Socket(s)
  136. return s
  137. def make_handler(self, conn, addr):
  138. return self.handler(conn, addr, self, **self.handler_kwargs)
  139. def register_conn(self, conn, addr):
  140. # if self.log_ip:
  141. # lserver.info("New connection from {} on port {}".format(*addr))
  142. self.read_sockets_expected.append(conn)
  143. if addr:
  144. self.conn_to_addr[conn] = addr
  145. self.addr_to_conn[addr] = conn
  146. def unregister_conn(self, conn):
  147. self.read_sockets_expected.remove(conn)
  148. addr = self.conn_to_addr.get(conn, False)
  149. if addr:
  150. del self.addr_to_conn[addr]
  151. del self.conn_to_addr[conn]
  152. def register_handler(self, handler, conn):
  153. self.connection_handler.append(handler)
  154. self.conn_to_handler[conn] = handler
  155. self.handler_to_conn[handler] = conn
  156. def unregister_handler(self, handler, conn):
  157. self.connection_handler.remove(handler)
  158. del self.conn_to_handler[conn]
  159. del self.handler_to_conn[handler]
  160. def setup(self):
  161. lserver.info("setting up socket")
  162. if self.af_family == "AF_INET":
  163. self.socket.bind((self.host, self.port))
  164. elif self.af_family == "AF_UNIX":
  165. if os.path.exists(self.host):
  166. lserver.debug("file already exists")
  167. lserver.debug("attempting to remove it")
  168. os.remove(self.host)
  169. self.socket.bind(self.host)
  170. self.socket.listen(self.max_allowed_clients)
  171. # self.socket.settimeout(1)
  172. def start(self):
  173. # lserver.debug(dir(self))
  174. curio.run(self.run)
  175. async def run(self):
  176. self.setup()
  177. lserver.debug("entering main loop")
  178. while ( not self.exit_event ):
  179. # lserver.debug(self.read_sockets_expected)
  180. # lserver.debug(self.write_sockets_expected)
  181. # lserver.debug(self.exc_sockets_expected)
  182. # read_sockets = select.select(self.read_sockets_expected,[],[])
  183. lserver.debug("waiting for client to connect")
  184. conn,addr = await self.socket.accept()
  185. if self.log_ip:
  186. lserver.info(
  187. "new client connection, {}:{}!".format(*addr))
  188. else:
  189. lserver.info("a new client connected, let's handle it")
  190. handler = self.make_handler(conn, addr)
  191. self.register_conn(conn, addr)
  192. self.register_handler(handler, conn)
  193. await curio.spawn(self.wait_for_client(conn,handler))
  194. async def wait_for_client(self,socket,handler):
  195. _append_task()
  196. # cur_task = curio.current_task()
  197. # _tasks.append(cur_task)
  198. # while True:
  199. for i in range(1):
  200. try:
  201. if self.log_ip:
  202. lserver.debug(
  203. "waiting for {} to send something"
  204. .format(socket.getsockname()))
  205. else:
  206. lserver.debug(
  207. "waiting for the client to send something")
  208. data = await handler.recv()
  209. if not data:
  210. if self.log_ip:
  211. lserver.info(
  212. "the connection to {} was closed"
  213. .format(socket.getsockname()))
  214. else:
  215. lserver.info(
  216. "the connection to the client was closed")
  217. self.unregister_handler(handler, socket)
  218. self.unregister_conn(socket)
  219. handler.close()
  220. else:
  221. lschat.info("Client:"+data.strip())
  222. coro = handler.handle(data)
  223. task = await curio.spawn(coro)
  224. _tasks.append(task)
  225. except Exception as e:
  226. lserver.error(e, exc_info=True)
  227. # saver to use time.sleep
  228. time.sleep(0.1)
  229. digilib.misc.fake_wait_task()
  230. _tasks.remove(cur_task)
  231. class Client(threading.Thread):
  232. """docstring for Client"""
  233. is_connecting = False
  234. is_connected = False
  235. status = "uninitialized"
  236. def __init__(self,
  237. host,
  238. port=None,
  239. af_family="AF_INET",
  240. handle_data_func=None,
  241. error_handler=None,
  242. block_size=1024,
  243. ):
  244. self.super_class = super(Client, self)
  245. self.super_class.__init__()
  246. self.name = "Client"
  247. self.exit_event = False
  248. self.host = host
  249. self.port = port
  250. self.af_family = af_family
  251. self.block_size = block_size
  252. self.handle_data_func = handle_data_func
  253. self.is_connected = False
  254. self.error_handler = error_handler
  255. # self.socket = self.make_socket()
  256. self.socket = None
  257. self.status = "disconnected"
  258. def connect(self):
  259. self.status = "connecting"
  260. self.socket = self.make_socket()
  261. lclient.info(
  262. "connecting to socket '{}' of type {}".format(
  263. self.host,
  264. self.af_family
  265. )
  266. )
  267. try:
  268. if self.af_family == "AF_INET":
  269. self.socket.connect((self.host, self.port))
  270. elif self.af_family == "AF_UNIX":
  271. if os.path.exists(self.host):
  272. self.socket.connect(self.host)
  273. else:
  274. lclient.warn("File not found. Aborting.")
  275. return
  276. self.is_connected = True
  277. self.status = "connected"
  278. lclient.info("connected")
  279. return True
  280. except Exception as e:
  281. lclient.debug(e, exc_info=True)
  282. if type(e) is ConnectionRefusedError:
  283. lclient.info("failed to connect to socket '{}'".format(self.host))
  284. self.disconnect()
  285. return False
  286. def disconnect(self):
  287. lclient.info("disconnecting from socket '{}'".format(self.host))
  288. self.is_connected = False
  289. self.status = "disconnected"
  290. if self.socket:
  291. try:
  292. self.socket.shutdown(socket.SHUT_RDWR)
  293. except Exception as e:
  294. lclient.error(e)
  295. try:
  296. self.socket.close()
  297. except Exception as e:
  298. lclient.error("error occured while closing the socket, " +
  299. "maybe it is already closed",exc_info=e)
  300. del self.socket
  301. self.socket = None
  302. def handle_data(self, data_received):
  303. data_decoded = data_received.decode("utf-8")
  304. lcchat.info("Server: "+data_decoded)
  305. if self.handle_data_func:
  306. try:
  307. self.handle_data_func(data_decoded)
  308. except Exception as e:
  309. lclient.error(e, exc_info=True)
  310. def is_running(self):
  311. return (self in threading.enumerate())
  312. def make_socket(self):
  313. lclient.info("creating a {} socket".format(self.af_family))
  314. if self.af_family == "AF_INET":
  315. s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  316. elif self.af_family == "AF_UNIX":
  317. s = socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
  318. else:
  319. raise ValueError(
  320. "AF_FAMILY '{}' not supported!".format(
  321. self.af_family
  322. )
  323. )
  324. return s
  325. def main_loop(self):
  326. lclient.debug("starting main loop")
  327. while ( not self.exit_event ):
  328. if not self.status in ["connected"]:
  329. time.sleep(0.1)
  330. continue
  331. # print(0)
  332. read_confirmed, write_confirmed, exc_confirmed \
  333. = select.select(
  334. [self.socket],
  335. [],
  336. [self.socket],
  337. 1
  338. )
  339. if self.socket in exc_confirmed:
  340. self.is_connected = False
  341. lclient.warning("socket is expected to corrupt, exiting")
  342. self.disconnect()
  343. # self.stop()
  344. break
  345. elif self.socket in read_confirmed:
  346. try:
  347. data_received = self.read_from_socket()
  348. if data_received == b'':
  349. lclient.info("connection is broken, closing socket exiting")
  350. self.disconnect()
  351. # self.stop()
  352. # break
  353. else:
  354. try:
  355. self.handle_data(data_received)
  356. except Exception as e:
  357. lserver.error(
  358. "Error while handling data",
  359. exc_info=e
  360. )
  361. except Exception as e:
  362. lclient.error(e, exc_info=True)
  363. if type(e) is OSError:
  364. self.is_connected = False
  365. lclient.warn("connection broken, exiting")
  366. self.disconnect()
  367. # self.stop()
  368. # break
  369. else:
  370. raise
  371. else:
  372. time.sleep(0.1)
  373. def read_from_socket(self):
  374. data_received = self.socket.recv(self.block_size)
  375. return data_received
  376. def run(self):
  377. # self.connect()
  378. if self.error_handler:
  379. self.error_handler(self.main_loop)
  380. else:
  381. self.main_loop()
  382. def send(self, msg):
  383. msg = msg.rstrip()
  384. msg_encoded = bytes(msg+"\r\n", "utf-8")
  385. try:
  386. lcchat.info("Client: "+msg)
  387. self.socket.send(msg_encoded)
  388. except Exception as e:
  389. self.is_connected = False
  390. lclient.error(e, exc_info=True)
  391. self.status = "shutdown"
  392. def setup(self):
  393. pass
  394. def stop(self,reason=None):
  395. self.disconnect()
  396. self.exit_event = True
  397. if reason:
  398. print(reason)
  399. class AsyncClient(object):
  400. """docstring for Client"""
  401. is_connecting = False
  402. is_connected = False
  403. status = "uninitialized"
  404. def __init__(self,
  405. host,
  406. port=None,
  407. af_family="AF_INET",
  408. handle_data_func=None,
  409. error_handler=None,
  410. block_size=1024,
  411. ):
  412. self.super_class = super(AsyncClient, self)
  413. self.super_class.__init__()
  414. self.name = "Client"
  415. self.exit_event = False
  416. self.host = host
  417. self.port = port
  418. self.af_family = af_family
  419. self.block_size = block_size
  420. self.handle_data_func = handle_data_func
  421. self.is_connected = False
  422. self.error_handler = error_handler
  423. self.socket = None
  424. self.status = "disconnected"
  425. def connect(self):
  426. self.status = "connecting"
  427. self.socket = self.make_socket()
  428. lclient.info("connecting to socket '{}' of type {}".format(
  429. self.host,self.af_family))
  430. try:
  431. if self.af_family == "AF_INET":
  432. self.socket.connect((self.host, self.port))
  433. elif self.af_family == "AF_UNIX":
  434. self.socket.connect(self.host)
  435. # if os.path.exists(self.host):
  436. # pass
  437. # else:
  438. # lclient.warn("File not found. Aborting.")
  439. # return
  440. self.is_connected = True
  441. self.status = "connected"
  442. lclient.info("connected")
  443. return True
  444. except Exception as e:
  445. lclient.debug(e, exc_info=True)
  446. if type(e) is ConnectionRefusedError:
  447. lclient.info("failed to connect to socket '{}'".format(self.host))
  448. self.disconnect()
  449. return False
  450. def disconnect(self):
  451. lclient.info("disconnecting from socket '{}'".format(self.host))
  452. self.is_connected = False
  453. self.status = "disconnected"
  454. if self.socket:
  455. try:
  456. self.socket.shutdown(socket.SHUT_RDWR)
  457. except Exception as e:
  458. lclient.error(e)
  459. try:
  460. self.socket.close()
  461. except Exception as e:
  462. lclient.error("error occured while closing the socket, " +
  463. "maybe it is already closed",exc_info=e)
  464. del self.socket
  465. self.socket = None
  466. def handle_data(self, data_received):
  467. data_decoded = data_received.decode("utf-8")
  468. lcchat.info("Server: "+data_decoded)
  469. if self.handle_data_func:
  470. try:
  471. self.handle_data_func(data_decoded)
  472. except Exception as e:
  473. lclient.error(e, exc_info=True)
  474. def is_running(self):
  475. return (self in threading.enumerate())
  476. def make_socket(self):
  477. lclient.info("creating a {} socket".format(self.af_family))
  478. if self.af_family == "AF_INET":
  479. s = trio.socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  480. elif self.af_family == "AF_UNIX":
  481. s = trio.socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
  482. else:
  483. raise ValueError(
  484. "AF_FAMILY '{}' not supported!".format(
  485. self.af_family
  486. )
  487. )
  488. return s
  489. def main_loop(self):
  490. lclient.debug("starting main loop")
  491. while ( not self.exit_event ):
  492. if not self.status in ["connected"]:
  493. time.sleep(0.1)
  494. continue
  495. # print(0)
  496. read_confirmed, write_confirmed, exc_confirmed \
  497. = select.select(
  498. [self.socket],
  499. [],
  500. [self.socket],
  501. 1
  502. )
  503. if self.socket in exc_confirmed:
  504. self.is_connected = False
  505. lclient.warning("socket is expected to corrupt, exiting")
  506. self.disconnect()
  507. # self.stop()
  508. break
  509. elif self.socket in read_confirmed:
  510. try:
  511. data_received = self.read_from_socket()
  512. if data_received == b'':
  513. lclient.info("connection is broken, closing socket exiting")
  514. self.disconnect()
  515. # self.stop()
  516. # break
  517. else:
  518. self.handle_data(data_received)
  519. except Exception as e:
  520. lclient.error(e, exc_info=True)
  521. if type(e) is OSError:
  522. self.is_connected = False
  523. lclient.warn("connection broken, exiting")
  524. self.disconnect()
  525. # self.stop()
  526. # break
  527. else:
  528. raise
  529. else:
  530. time.sleep(0.1)
  531. def read_from_socket(self):
  532. data_received = self.socket.recv(self.block_size)
  533. return data_received
  534. def run(self,connect=False):
  535. if connect:
  536. self.connect()
  537. if self.error_handler:
  538. self.error_handler(self.main_loop)
  539. else:
  540. self.main_loop()
  541. def send(self, msg):
  542. msg = msg.rstrip()
  543. msg_encoded = bytes(msg+"\r\n", "utf-8")
  544. try:
  545. lcchat.info("Client: "+msg)
  546. self.socket.send(msg_encoded)
  547. except Exception as e:
  548. self.is_connected = False
  549. lclient.error(e, exc_info=True)
  550. self.status = "shutdown"
  551. def setup(self):
  552. pass
  553. def stop(self,reason=None):
  554. self.disconnect()
  555. self.exit_event = True
  556. if reason:
  557. print(reason)
  558. #