__init__.py 19 KB


#!/usr/bin/env python3.5
# Copyright 2017 Digital
#
# This file is part of DigiLib.
#
# DigiLib is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# DigiLib is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with DigiLib. If not, see <http://www.gnu.org/licenses/>.
  18. import logging
  19. import logging.handlers
  20. import os
  21. import queue
  22. import select
  23. import socket
  24. import sys
  25. import threading
  26. import time
  27. import traceback
  28. lclient = logging.getLogger(__name__+".client")
  29. lserver = logging.getLogger(__name__+".server")
  30. lschat = logging.getLogger(__name__+".server.chat")
  31. lcchat = logging.getLogger(__name__+".client.chat")
  32. class ConnHandlerBase(object):
  33. def __init__(self, socket, addr, server):
  34. self.status = "init"
  35. super(ConnHandlerBase, self).__init__()
  36. self.socket = socket
  37. self.addr = addr
  38. self.server = server
  39. self.block_size = 1024
  40. self.welcome_client()
  41. self.status="connected"
  42. def welcome_client(self):
  43. pass
  44. def handle(self, data_decoded):
  45. return
  46. def recv(self):
  47. try:
  48. data_received = self.socket.recv(self.block_size)
  49. data_decoded = data_received.decode("utf-8")
  50. if data_decoded:
  51. lschat.info("Client:"+data_decoded.strip())
  52. self.handle(data_decoded)
  53. return True
  54. else:
  55. lserver.debug("connection corrupted")
  56. return False
  57. except Exception as e:
  58. lserver.error(e, exc_info=True)
  59. return False
  60. def send(self, msg):
  61. msg_encoded = bytes(msg, "utf-8")
  62. lschat.info("Server:"+msg)
  63. try:
  64. self.socket.send(msg_encoded)
  65. return True
  66. except Exception as e:
  67. lserver.error(e, exc_info=True)
  68. return False
  69. def close(self):
  70. self.status = "closed"
  71. try:
  72. self.socket.shutdown(0)
  73. except:
  74. lserver.error("error during socket shutdown, ignoring")
  75. try:
  76. self.socket.close()
  77. except:
  78. lserver.error("error closing socket, maybe already closed")
  79. class ConnHandlerEcho(ConnHandlerBase):
  80. def __init__(self, socket, addr, server):
  81. self.status = "init"
  82. self.super_class = super(ConnHandlerEcho, self)
  83. self.super_class.__init__(socket, addr, server)
  84. self.server = server
  85. def welcome_client(self):
  86. self.send("welcome to the client")
  87. def handle(self, data_decoded):
  88. lschat.info("Client:"+data_decoded)
  89. for h in list(set(self.server.connection_handler)-{self}):
  90. h.send(data_decoded)
  91. class Server(object):
  92. """docstring for SocketHandler"""
  93. def __init__(self,
  94. host,
  95. port=None,
  96. af_family="AF_INET",
  97. max_allowed_clients=5,
  98. handler=None,
  99. handler_kwargs={},
  100. ):
  101. super(Server, self).__init__()
  102. self.exit_event = False
  103. self.host=host
  104. self.port=port
  105. self.af_family = af_family
  106. self.handler_kwargs = handler_kwargs
  107. self.handler = handler
  108. self.max_allowed_clients = max_allowed_clients
  109. self.socket = self.make_socket()
  110. self.connection_handler = []
  111. self.conn_to_addr = {}
  112. self.addr_to_conn = {}
  113. self.conn_to_handler ={}
  114. self.handler_to_conn = {}
  115. self.read_sockets_expected = [self.socket]
  116. self.write_sockets_expected = []
  117. self.exc_sockets_expected = []
  118. def make_socket(self):
  119. lserver.debug("making a {} socket".format(self.af_family))
  120. if self.af_family == "AF_INET":
  121. return socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  122. elif self.af_family == "AF_UNIX":
  123. return socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
  124. else:
  125. raise ValueError(
  126. "AF_FAMILY '{}' not supported!".format(
  127. self.af_family
  128. )
  129. )
  130. def make_handler(self, conn, addr):
  131. return self.handler(conn, addr, self, **self.handler_kwargs)
  132. def register_conn(self, conn, addr):
  133. lserver.info(
  134. "New Connection, addr: '{}', socket: '{}'".format(addr,conn)
  135. )
  136. self.read_sockets_expected.append(conn)
  137. if addr:
  138. self.conn_to_addr[conn] = addr
  139. self.addr_to_conn[addr] = conn
  140. def unregister_conn(self, conn):
  141. self.read_sockets_expected.remove(conn)
  142. addr = self.conn_to_addr.get(conn, False)
  143. if addr:
  144. del self.addr_to_conn[addr]
  145. del self.conn_to_addr[conn]
  146. def register_handler(self, handler, conn):
  147. self.connection_handler.append(handler)
  148. self.conn_to_handler[conn] = handler
  149. self.handler_to_conn[handler] = conn
  150. def unregister_handler(self, handler, conn):
  151. self.connection_handler.remove(handler)
  152. del self.conn_to_handler[conn]
  153. del self.handler_to_conn[handler]
  154. def setup(self):
  155. lserver.info("setting up socket")
  156. if self.af_family == "AF_INET":
  157. self.socket.bind((self.host, self.port))
  158. elif self.af_family == "AF_UNIX":
  159. if os.path.exists(self.host):
  160. lserver.debug("file already exists")
  161. lserver.debug("attempting to remove it")
  162. os.remove(self.host)
  163. self.socket.bind(self.host)
  164. self.socket.listen(self.max_allowed_clients)
  165. # self.socket.settimeout(1)
  166. def run(self):
  167. self.setup()
  168. lserver.debug("entering main loop")
  169. while ( not self.exit_event ):
  170. # lserver.debug(self.read_sockets_expected)
  171. # lserver.debug(self.write_sockets_expected)
  172. # lserver.debug(self.exc_sockets_expected)
  173. read_sockets_confirmed, \
  174. write_sockets_confirmed, \
  175. exc_sockets_confirmed \
  176. = select.select(self.read_sockets_expected,
  177. self.write_sockets_expected,
  178. self.exc_sockets_expected,
  179. )
  180. for s in read_sockets_confirmed:
  181. socket_handler = self.conn_to_handler.get(s, None)
  182. if ( s == self.socket ):
  183. lserver.debug("handling new client")
  184. conn, addr = self.socket.accept()
  185. handler = self.make_handler(conn, addr)
  186. self.register_conn(conn, addr)
  187. self.register_handler(handler, conn)
  188. elif ( socket_handler
  189. and (socket_handler in self.connection_handler) ):
  190. lserver.debug("handling client connection")
  191. try:
  192. if not socket_handler.recv():
  193. lserver.info("connection is broken, closing socket and removing it")
  194. self.unregister_handler(socket_handler, s)
  195. self.unregister_conn(conn)
  196. socket_handler.close()
  197. except Exception as e:
  198. lserver.error(e, exc_info=True)
  199. else:
  200. lserver.debug("else!")
  201. lserver.debug(socket_handler)
  202. time.sleep(1)
  203. def cleanup(self):
  204. pass
  205. class Client(threading.Thread):
  206. """docstring for Client"""
  207. is_connecting = False
  208. is_connected = False
  209. status = "uninitialized"
  210. def __init__(self,
  211. host,
  212. port=None,
  213. af_family="AF_INET",
  214. handle_data_func=None,
  215. error_handler=None,
  216. block_size=1024,
  217. ):
  218. self.super_class = super(Client, self)
  219. self.super_class.__init__()
  220. self.name = "Client"
  221. self.exit_event = False
  222. self.host = host
  223. self.port = port
  224. self.af_family = af_family
  225. self.block_size = block_size
  226. self.handle_data_func = handle_data_func
  227. self.is_connected = False
  228. self.error_handler = error_handler
  229. # self.socket = self.make_socket()
  230. self.socket = None
  231. self.status = "disconnected"
  232. def connect(self):
  233. self.status = "connecting"
  234. self.socket = self.make_socket()
  235. lclient.info(
  236. "connecting to socket '{}' of type {}".format(
  237. self.host,
  238. self.af_family
  239. )
  240. )
  241. try:
  242. if self.af_family == "AF_INET":
  243. self.socket.connect((self.host, self.port))
  244. elif self.af_family == "AF_UNIX":
  245. if os.path.exists(self.host):
  246. self.socket.connect(self.host)
  247. else:
  248. lclient.warn("File not found. Aborting.")
  249. return
  250. self.is_connected = True
  251. self.status = "connected"
  252. lclient.info("connected")
  253. return True
  254. except Exception as e:
  255. lclient.debug(e, exc_info=True)
  256. if type(e) is ConnectionRefusedError:
  257. lclient.info("failed to connect to socket '{}'".format(self.host))
  258. self.disconnect()
  259. return False
  260. def disconnect(self):
  261. lclient.info("disconnecting from socket '{}'".format(self.host))
  262. self.is_connected = False
  263. self.status = "disconnected"
  264. if self.socket:
  265. try:
  266. self.socket.shutdown(socket.SHUT_RDWR)
  267. except Exception as e:
  268. lclient.error(e)
  269. try:
  270. self.socket.close()
  271. except Exception as e:
  272. lclient.error("error occured while closing the socket, " +
  273. "maybe it is already closed",exc_info=e)
  274. del self.socket
  275. self.socket = None
  276. def handle_data(self, data_received):
  277. data_decoded = data_received.decode("utf-8")
  278. lcchat.info("Server: "+data_decoded)
  279. if self.handle_data_func:
  280. try:
  281. self.handle_data_func(data_decoded)
  282. except Exception as e:
  283. lclient.error(e, exc_info=True)
  284. def is_running(self):
  285. return (self in threading.enumerate())
  286. def make_socket(self):
  287. lclient.info("creating a {} socket".format(self.af_family))
  288. if self.af_family == "AF_INET":
  289. s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  290. elif self.af_family == "AF_UNIX":
  291. s = socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
  292. else:
  293. raise ValueError(
  294. "AF_FAMILY '{}' not supported!".format(
  295. self.af_family
  296. )
  297. )
  298. return s
  299. def main_loop(self):
  300. lclient.debug("starting main loop")
  301. while ( not self.exit_event ):
  302. if not self.status in ["connected"]:
  303. time.sleep(0.1)
  304. continue
  305. # print(0)
  306. read_confirmed, write_confirmed, exc_confirmed \
  307. = select.select(
  308. [self.socket],
  309. [],
  310. [self.socket],
  311. 1
  312. )
  313. if self.socket in exc_confirmed:
  314. self.is_connected = False
  315. lclient.warning("socket is expected to corrupt, exiting")
  316. self.disconnect()
  317. # self.stop()
  318. break
  319. elif self.socket in read_confirmed:
  320. try:
  321. data_received = self.read_from_socket()
  322. if data_received == b'':
  323. lclient.info("connection is broken, closing socket exiting")
  324. self.disconnect()
  325. # self.stop()
  326. # break
  327. else:
  328. try:
  329. self.handle_data(data_received)
  330. except Exception as e:
  331. lserver.error(
  332. "Error while handling data",
  333. exc_info=e
  334. )
  335. except Exception as e:
  336. lclient.error(e, exc_info=True)
  337. if type(e) is OSError:
  338. self.is_connected = False
  339. lclient.warn("connection broken, exiting")
  340. self.disconnect()
  341. # self.stop()
  342. # break
  343. else:
  344. raise
  345. else:
  346. time.sleep(0.1)
  347. def read_from_socket(self):
  348. data_received = self.socket.recv(self.block_size)
  349. return data_received
  350. def run(self):
  351. # self.connect()
  352. if self.error_handler:
  353. self.error_handler(self.main_loop)
  354. else:
  355. self.main_loop()
  356. def send(self, msg):
  357. msg = msg.rstrip()
  358. msg_encoded = bytes(msg+"\r\n", "utf-8")
  359. try:
  360. lcchat.info("Client: "+msg)
  361. self.socket.send(msg_encoded)
  362. except Exception as e:
  363. self.is_connected = False
  364. lclient.error(e, exc_info=True)
  365. self.status = "shutdown"
  366. def setup(self):
  367. pass
  368. def stop(self,reason=None):
  369. self.disconnect()
  370. self.exit_event = True
  371. if reason:
  372. print(reason)
  373. class AsyncClient(object):
  374. """docstring for Client"""
  375. is_connecting = False
  376. is_connected = False
  377. status = "uninitialized"
  378. def __init__(self,
  379. host,
  380. port=None,
  381. af_family="AF_INET",
  382. handle_data_func=None,
  383. error_handler=None,
  384. block_size=1024,
  385. ):
  386. self.super_class = super(AsyncClient, self)
  387. self.super_class.__init__()
  388. self.name = "Client"
  389. self.exit_event = False
  390. self.host = host
  391. self.port = port
  392. self.af_family = af_family
  393. self.block_size = block_size
  394. self.handle_data_func = handle_data_func
  395. self.is_connected = False
  396. self.error_handler = error_handler
  397. self.socket = None
  398. self.status = "disconnected"
  399. def connect(self):
  400. self.status = "connecting"
  401. self.socket = self.make_socket()
  402. lclient.info("connecting to socket '{}' of type {}".format(
  403. self.host,self.af_family))
  404. try:
  405. if self.af_family == "AF_INET":
  406. self.socket.connect((self.host, self.port))
  407. elif self.af_family == "AF_UNIX":
  408. self.socket.connect(self.host)
  409. # if os.path.exists(self.host):
  410. # pass
  411. # else:
  412. # lclient.warn("File not found. Aborting.")
  413. # return
  414. self.is_connected = True
  415. self.status = "connected"
  416. lclient.info("connected")
  417. return True
  418. except Exception as e:
  419. lclient.debug(e, exc_info=True)
  420. if type(e) is ConnectionRefusedError:
  421. lclient.info("failed to connect to socket '{}'".format(self.host))
  422. self.disconnect()
  423. return False
  424. def disconnect(self):
  425. lclient.info("disconnecting from socket '{}'".format(self.host))
  426. self.is_connected = False
  427. self.status = "disconnected"
  428. if self.socket:
  429. try:
  430. self.socket.shutdown(socket.SHUT_RDWR)
  431. except Exception as e:
  432. lclient.error(e)
  433. try:
  434. self.socket.close()
  435. except Exception as e:
  436. lclient.error("error occured while closing the socket, " +
  437. "maybe it is already closed",exc_info=e)
  438. del self.socket
  439. self.socket = None
  440. def handle_data(self, data_received):
  441. data_decoded = data_received.decode("utf-8")
  442. lcchat.info("Server: "+data_decoded)
  443. if self.handle_data_func:
  444. try:
  445. self.handle_data_func(data_decoded)
  446. except Exception as e:
  447. lclient.error(e, exc_info=True)
  448. def is_running(self):
  449. return (self in threading.enumerate())
  450. def make_socket(self):
  451. lclient.info("creating a {} socket".format(self.af_family))
  452. if self.af_family == "AF_INET":
  453. s = trio.socket.socket(socket.AF_INET,socket.SOCK_STREAM)
  454. elif self.af_family == "AF_UNIX":
  455. s = trio.socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
  456. else:
  457. raise ValueError(
  458. "AF_FAMILY '{}' not supported!".format(
  459. self.af_family
  460. )
  461. )
  462. return s
  463. def main_loop(self):
  464. lclient.debug("starting main loop")
  465. while ( not self.exit_event ):
  466. if not self.status in ["connected"]:
  467. time.sleep(0.1)
  468. continue
  469. # print(0)
  470. read_confirmed, write_confirmed, exc_confirmed \
  471. = select.select(
  472. [self.socket],
  473. [],
  474. [self.socket],
  475. 1
  476. )
  477. if self.socket in exc_confirmed:
  478. self.is_connected = False
  479. lclient.warning("socket is expected to corrupt, exiting")
  480. self.disconnect()
  481. # self.stop()
  482. break
  483. elif self.socket in read_confirmed:
  484. try:
  485. data_received = self.read_from_socket()
  486. if data_received == b'':
  487. lclient.info("connection is broken, closing socket exiting")
  488. self.disconnect()
  489. # self.stop()
  490. # break
  491. else:
  492. self.handle_data(data_received)
  493. except Exception as e:
  494. lclient.error(e, exc_info=True)
  495. if type(e) is OSError:
  496. self.is_connected = False
  497. lclient.warn("connection broken, exiting")
  498. self.disconnect()
  499. # self.stop()
  500. # break
  501. else:
  502. raise
  503. else:
  504. time.sleep(0.1)
  505. def read_from_socket(self):
  506. data_received = self.socket.recv(self.block_size)
  507. return data_received
  508. def run(self,connect=False):
  509. if connect:
  510. self.connect()
  511. if self.error_handler:
  512. self.error_handler(self.main_loop)
  513. else:
  514. self.main_loop()
  515. def send(self, msg):
  516. msg = msg.rstrip()
  517. msg_encoded = bytes(msg+"\r\n", "utf-8")
  518. try:
  519. lcchat.info("Client: "+msg)
  520. self.socket.send(msg_encoded)
  521. except Exception as e:
  522. self.is_connected = False
  523. lclient.error(e, exc_info=True)
  524. self.status = "shutdown"
  525. def setup(self):
  526. pass
  527. def stop(self,reason=None):
  528. self.disconnect()
  529. self.exit_event = True
  530. if reason:
  531. print(reason)
  532. #