|
@@ -24,6 +24,7 @@ import traceback
|
|
|
import blinker
|
|
|
import curio
|
|
|
import digilib.network
|
|
|
+import digilib.misc
|
|
|
|
|
|
log = logging.getLogger(__name__+"")
|
|
|
lgpio = logging.getLogger(__name__+".gpio")
|
|
@@ -84,25 +85,6 @@ class GPIOWrapper(object):
|
|
|
lgpio.debug("no gpio module")
|
|
|
lgpio.debug("gpio module: {}".format(self.gpio))
|
|
|
|
|
|
- def parse_to_int_list(self,parse,*args):
|
|
|
- if type(parse) is list:
|
|
|
- return parse
|
|
|
- elif type(parse) is int:
|
|
|
- return [parse]
|
|
|
- elif type(parse) is bool:
|
|
|
- return [int(parse)]
|
|
|
- elif type(parse) is not str:
|
|
|
- raise ValueError("Failed to parse value {}".format(parse))
|
|
|
- if parse.lower() in ["true","high"]:
|
|
|
- return [1]
|
|
|
- elif parse.lower() in ["false","low"]:
|
|
|
- return [0]
|
|
|
- parse = parse.split(".")
|
|
|
- for part in parse.copy():
|
|
|
- parse.remove(part)
|
|
|
- parse.append(int(part))
|
|
|
- return parse
|
|
|
-
|
|
|
def read(self,pin,*args):
|
|
|
if self.gpio:
|
|
|
state = self.gpio.read(pin)
|
|
@@ -130,23 +112,25 @@ class PinBase(object):
|
|
|
_gpio.setup(self.pin_number,_gpio.OUT)
|
|
|
|
|
|
def output(self,value):
|
|
|
- [value] = _gpio.parse_to_int_list(value)
|
|
|
+ [value] = digilib.misc.parse_to_int_list(value)
|
|
|
self.value = value
|
|
|
_gpio.output(self.pin_number,value)
|
|
|
|
|
|
def read(self):
|
|
|
- value = _gpio.ead(self.pin_number,value)
|
|
|
+ value = _gpio.read(self.pin_number,value)
|
|
|
return value
|
|
|
|
|
|
class DigitalPin(PinBase):
|
|
|
value_high = True
|
|
|
value_low = False
|
|
|
+
|
|
|
def __init__(self,pin_number,mode):
|
|
|
super(DigitalPin,self).__init__(pin_number,mode)
|
|
|
|
|
|
class AnalogPin(PinBase):
|
|
|
value_high = 1
|
|
|
value_low = 0
|
|
|
+
|
|
|
def __init__(self,pin_number):
|
|
|
super(AnalogPin,self).__init__(pin_number)
|
|
|
|
|
@@ -156,6 +140,7 @@ class PinControllerBase(object):
|
|
|
|
|
|
"""
|
|
|
pins = []
|
|
|
+
|
|
|
def __init__(self,pins):
|
|
|
super(PinControllerBase, self).__init__()
|
|
|
self.pins.extend(pins)
|
|
@@ -170,6 +155,7 @@ class PinAPIBase(object):
|
|
|
PinController.
|
|
|
"""
|
|
|
controllers = []
|
|
|
+
|
|
|
def __init__(self):
|
|
|
super(PinAPIBase, self).__init__()
|
|
|
|
|
@@ -179,6 +165,7 @@ class PCEngine(PinControllerBase):
|
|
|
speed = 0
|
|
|
turn_on_speed = 1
|
|
|
is_on = False
|
|
|
+
|
|
|
def __init__(self,pin_on_off,pin_analog):
|
|
|
super(PCEngine, self).__init__([pin_on_off,pin_analog])
|
|
|
# self.pins.append(pin_on_off)
|
|
@@ -191,9 +178,11 @@ class PCEngine(PinControllerBase):
|
|
|
# gpio.setup(self.pin_analog,_gpio.OUT)
|
|
|
self.set_speed(1)
|
|
|
self.set_state(1)
|
|
|
+
|
|
|
def set_speed(self,speed):
|
|
|
self.pin_analog.output(speed)
|
|
|
self.speed = speed
|
|
|
+
|
|
|
def set_state(self,state):
|
|
|
"""state is integer and specifies if the engine should be turned on (1) or turned off (0) """
|
|
|
if state == self.is_on:
|
|
@@ -206,17 +195,16 @@ class EnginesController(PinAPIBase):
|
|
|
engine_left = None
|
|
|
engine_right = None
|
|
|
action_in_process = False
|
|
|
+
|
|
|
def __init__(self,left,right):
|
|
|
super(EnginesController,self).__init__()
|
|
|
self.engine_right = self.make_engine(*right)
|
|
|
self.engine_left = self.make_engine(*left)
|
|
|
- def __enter__(self):
|
|
|
- if self.action_in_process:
|
|
|
- lgpio.debug("action already in process, not entering")
|
|
|
- return False
|
|
|
+
|
|
|
def make_engine(self,*args):
|
|
|
return digilib.pin.PCEngine(*args)
|
|
|
- def set_engine_state(self,args=[],command=None,respond=None):
|
|
|
+
|
|
|
+ def set_engine_state(self,args=[],command=None,respond=None):
|
|
|
if len(args) != 2:
|
|
|
respond("missing argument(s): <engine> <state>")
|
|
|
return
|
|
@@ -227,6 +215,7 @@ class EnginesController(PinAPIBase):
|
|
|
self.engine_left.set_state(state)
|
|
|
else:
|
|
|
respond("unknown engine. use 'left' or 'right'")
|
|
|
+
|
|
|
def set_engine_speed(self,args=[],command=None,respond=None):
|
|
|
if len(args) != 2:
|
|
|
respond("missing argument(s): <engine> <speed>")
|
|
@@ -238,6 +227,7 @@ class EnginesController(PinAPIBase):
|
|
|
self.engine_left.set_speed(speed)
|
|
|
else:
|
|
|
respond("unknown engine. use 'left' or 'right'")
|
|
|
+
|
|
|
async def turn(self,args=[],command=None,respond=None):
|
|
|
if len(args) != 1:
|
|
|
respond("one missing argument: direction")
|
|
@@ -269,13 +259,17 @@ class EnginesController(PinAPIBase):
|
|
|
|
|
|
|
|
|
class LED(DigitalPin):
|
|
|
+
|
|
|
def __init__(self,pin):
|
|
|
super(DigitalPin,self).__init__(pin,_gpio.OUT)
|
|
|
self.on()
|
|
|
+
|
|
|
def on(self,args=[],command=None,respond=None):
|
|
|
self.output(True)
|
|
|
+
|
|
|
def off(self,args=[],command=None,respond=None):
|
|
|
self.output(False)
|
|
|
+
|
|
|
def set(self,args=[],command=None,respond=None):
|
|
|
if len(args) != 1:
|
|
|
respond("one missing argument: state")
|
|
@@ -283,48 +277,57 @@ class LED(DigitalPin):
|
|
|
[state] = args
|
|
|
self.output(state)
|
|
|
|
|
|
-class LEDRGB(PinControllerBase):
|
|
|
+
|
|
|
+class StatusLED(PinControllerBase):
|
|
|
+
|
|
|
def __init__(self,pin_red,pin_green):
|
|
|
- super(LEDRGB,self).__init__([pin_red,pin_green])
|
|
|
+ super(StatusLED,self).__init__([pin_red,pin_green])
|
|
|
self.pin_red = DigitalPin(pin_red,_gpio.OUT)
|
|
|
self.pin_green = DigitalPin(pin_green,_gpio.OUT)
|
|
|
- self.red()
|
|
|
+ self.green()
|
|
|
+
|
|
|
def red(self,args=[],command=None,respond=None):
|
|
|
if len(args) > 1:
|
|
|
- respond("{} takes one optinal agument: <state>".foramt(command))
|
|
|
+ respond(ERROR_TAKES_ARGUMENTS.format(
|
|
|
+ command,"one","optional","<state>"))
|
|
|
return
|
|
|
elif len(args) == 1:
|
|
|
- state = int(*args)
|
|
|
+ state = digilib.misc.parse_to_int_list(*args)
|
|
|
else:
|
|
|
state = 1
|
|
|
self.pin_red.output(state)
|
|
|
+ self.pin_green.output(int(not state))
|
|
|
+
|
|
|
def green(self,args=[],command=None,respond=None):
|
|
|
if len(args) > 1:
|
|
|
- respond("{} takes one optinal agument: <state>".foramt(command))
|
|
|
+ respond(ERROR_TAKES_ARGUMENTS.format(
|
|
|
+ command,"one","optional","<state>"))
|
|
|
return
|
|
|
elif len(args) == 1:
|
|
|
state = int(*args)
|
|
|
else:
|
|
|
state = 1
|
|
|
self.pin_green.output(state)
|
|
|
+ self.pin_red.output(int(not state))
|
|
|
|
|
|
class DebugPinController(PinControllerBase):
|
|
|
+
|
|
|
def output(self,args=[],command=None,respond=None):
|
|
|
if len(args) != 2:
|
|
|
respond(ERROR_TAKES_ARGUMENTS.format(
|
|
|
command, "two", "positional", "<name>"))
|
|
|
return False
|
|
|
- pins = _gpio.arse_to_int_list(args[0])
|
|
|
- [state] = _gpio.arse_to_int_list(args[1])
|
|
|
+ pins = digilib.misc.parse_to_int_list(args[0])
|
|
|
+ [state] = digilib.misc.parse_to_int_list(args[1])
|
|
|
gpio.output(pins,state)
|
|
|
def read(self,args=[],command=None,respond=None):
|
|
|
if len(args) != 2:
|
|
|
respond(ERROR_TAKES_ARGUMENTS.format(
|
|
|
command, "two", "positional", "<name>"))
|
|
|
return False
|
|
|
- pins = _gpio.arse_to_int_list(args[0])
|
|
|
- [state] = _gpio.arse_to_int_list(args[1])
|
|
|
- return _gpio.ead(pins,state)
|
|
|
+ pins = digilib.misc.parse_to_int_list(args[0])
|
|
|
+ [state] = digilib.misc.parse_to_int_list(args[1])
|
|
|
+ return _gpio.read(pins,state)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
pass
|