123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- import curio
- import logging
- import logging.handlers
- import digilib.network
- import os
- import queue
- import select
- import socket
- import sys
- import threading
- import time
- import traceback
- import digilib.pin
- import digilib.network
- import beewatch.pinapi
# Module-level loggers: one for the module itself, plus two children named
# "<module>.api" and "<module>.chat".
log = logging.getLogger(__name__)
lapi = log.getChild("api")
lchat = log.getChild("chat")
def log_something():
    """Emit a fixed INFO record on the API logger (logging smoke test)."""
    lapi.log(logging.INFO, "logging works")
class BeeWatchServer(digilib.network.Server):
    """Server that always uses ``ConnHandlerBeeWatch`` as its handler.

    Positional and keyword arguments are forwarded unchanged to
    ``digilib.network.Server``; only the ``handler`` keyword is fixed here.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to the base server, adding ``handler``.

        Passing ``handler`` in ``kwargs`` as well raises ``TypeError``
        (duplicate keyword argument).
        """
        # The handler class is resolved through the ``beewatch.server``
        # dotted path at call time rather than through this module's globals.
        # NOTE(review): only ``beewatch.pinapi`` is imported explicitly in the
        # visible imports -- presumably this file is ``beewatch.server``
        # itself; confirm.
        handler_cls = beewatch.server.ConnHandlerBeeWatch
        super().__init__(*args, handler=handler_cls, **kwargs)
class ConnHandlerBeeWatch(digilib.network.ConnHandlerBase):
    """Connection handler that dispatches text commands to engine actions.

    A received line is split into a command word and its arguments
    (``handle``); the command is looked up in ``self.text_to_func`` and the
    matching callable is executed through ``curio.run`` (``call``).
    """

    def __init__(self, socket, addr, server):
        """Initialise the base handler, the engine controller and command table.

        Args:
            socket: Forwarded to ``ConnHandlerBase.__init__``.
            addr: Forwarded to ``ConnHandlerBase.__init__``.
            server: Forwarded to ``ConnHandlerBase.__init__``.
        """
        super().__init__(socket, addr, server)
        lchat.info('hey')
        # NOTE(review): the number pairs are presumably the pin numbers of the
        # left/right engines -- confirm against digilib.pin.EnginesController.
        self.engine_controller = digilib.pin.EnginesController(
            left=[4, 17],
            right=[27, 22],
        )
        # Command word -> callable.  ``call`` hands the callable's return
        # value to ``curio.run``, so each entry is expected to produce a
        # coroutine.
        self.text_to_func = {
            "turn": self.engine_controller.turn,
        }

    def call(self, func_name, args, respond_method, kwargs):
        """Run the command registered under ``func_name``.

        Args:
            func_name: Command word to look up in ``self.text_to_func``.
            args: Positional arguments forwarded to the command.
            respond_method: Callable taking one text argument; receives
                "Unknown command" for unregistered names and the formatted
                traceback if the command fails.  It is also passed on to the
                command as the ``respond_method`` keyword.
            kwargs: Extra keyword arguments forwarded to the command.
        """
        try:
            func = self.text_to_func[func_name]
        except KeyError:
            respond_method("Unknown command")
            return

        try:
            curio.run(func(*args, respond_method=respond_method, **kwargs))
        except Exception:
            # Deliberately not a bare ``except``: KeyboardInterrupt and
            # SystemExit must propagate so the server can be stopped.
            # NOTE(review): the full traceback is sent back to the client,
            # which exposes internals -- confirm this is intended.
            tb = traceback.format_exc()
            print(tb)
            respond_method(tb)

    def welcome_client(self):
        """Send the greeting text to the client via ``self.send``."""
        self.send("this is the server speaking, hello new client!")

    def handle(self, data_decoded):
        """Parse one received line and dispatch it through ``call``.

        The first whitespace-separated token is the command, the remaining
        tokens are its positional arguments.  Any whitespace separates
        tokens, so tab-separated input is parsed the same way as
        space-separated input.

        Args:
            data_decoded: The received text.
        """
        data_decoded = data_decoded.strip()
        lchat.info("Client:%s", data_decoded)
        tokens = data_decoded.split()
        if tokens:
            cmd, *args = tokens
        else:
            # Empty (or whitespace-only) input: dispatch the empty command so
            # the client still receives the "Unknown command" reply.
            cmd, args = data_decoded, ()
        self.call(
            cmd,
            respond_method=self.respond,
            args=args,
            kwargs={},
        )

    def do(self, func_num, *args, respond_method):
        """Invoke one of the numbered ``bee_api.do_something_*`` functions.

        Args:
            func_num: One of the strings "1", "2" or "3"; any other value
                raises ``KeyError``.
            *args: Positional arguments forwarded to the selected function.
            respond_method: Forwarded as the ``respond_method`` keyword.
        """
        # NOTE(review): ``self.bee_api`` is not assigned in this class's
        # ``__init__``; unless the base class provides it this raises
        # AttributeError.  This method is also not registered in
        # ``text_to_func``.
        arg_to_func = {
            "1": self.bee_api.do_something_1,
            "2": self.bee_api.do_something_2,
            "3": self.bee_api.do_something_3,
        }
        arg_to_func[func_num](*args, respond_method=respond_method)

    def echo(self, *args, **kwargs):
        """Log the received positional and keyword arguments on the chat logger."""
        lchat.info(str(args))
        lchat.info(str(kwargs))

    def speedcheck(self, respond_method):
        """Log and send the speeds reported for the "left" and "right" sides.

        Args:
            respond_method: Accepted for signature compatibility with other
                commands.  NOTE(review): it is currently ignored and the
                reply always goes through ``self.respond`` -- confirm whether
                the passed callable should be used instead.
        """
        # NOTE(review): ``self.bee_api`` is not assigned in this class's
        # ``__init__`` -- see ``do``.
        movement = self.bee_api.c.movement_control
        left_speed = str(movement.get_speed("left"))
        right_speed = str(movement.get_speed("right"))
        result = "left: " + left_speed + ", right: " + right_speed
        lchat.info(result)
        self.respond(result)

    def respond(self, text, *args):
        """Send ``text`` to the client; extra positional arguments are ignored."""
        self.send(text)
|