|
@@ -19,6 +19,8 @@ import logging
|
|
|
log = logging.getLogger(__name__+"")
|
|
|
lctrl = logging.getLogger(__name__+".ctrl")
|
|
|
|
|
|
+import time
|
|
|
+
|
|
|
import curio
|
|
|
import digilib.network
|
|
|
import digilib.misc
|
|
@@ -86,12 +88,25 @@ class ButtonController(object):
|
|
|
----------
|
|
|
pin_num: int
|
|
|
the number of the pin wich is connected to the button and to logical HIGH through an appropriate resistor.
|
|
|
+ time_short_press: int
|
|
|
+ the maximum time the button needs to be pressed to be registered as a short press.
|
|
|
+ time_long_press: int
|
|
|
+ the maximum time the button needs to be pressed to be registered as a long press. if ``time_short_press`` is greater the long press feature is disabled.
|
|
|
+
|
|
|
+ Attributes
|
|
|
+ ----------
|
|
|
+ STATE_PRESSED: int
|
|
|
+ the value returned from gpio.read if the button is pressed
|
|
|
+ STATE_RELEASED: int
|
|
|
+ the value returned from gpio.read if the button is released
|
|
|
"""
|
|
|
- def __init__(self,pin_num):
|
|
|
+ def __init__(self,pin_num,time_short_press,time_long_press,):
|
|
|
"""
|
|
|
"""
|
|
|
super().__init__()
|
|
|
self.pin_num = pin_num
|
|
|
+ self.time_short_press = time_short_press
|
|
|
+ self.time_long_press = time_long_press
|
|
|
digilib.gpio.wrapper.setup(self.pin_num,digilib.gpio.wrapper.OUT)
|
|
|
|
|
|
async def read_button_state(self):
|
|
@@ -112,8 +127,44 @@ class ButtonController(object):
|
|
|
|
|
|
async def main_loop(self):
|
|
|
"""
|
|
|
- This is the main loop of the Controller class.
|
|
|
- """
|
|
|
+ The main loop executes registered callbacks if the button state was changed, was pressed for ``self.time_short_press`` or ``self.time_long_press``.
|
|
|
+
|
|
|
+ Attributes
|
|
|
+ ----------
|
|
|
+ prev_state: int
|
|
|
+ the state of the button during the last loop.
|
|
|
+ time_pressed: int
|
|
|
+ the time when the button was pressed, taken from time.time()
|
|
|
+ time_released: int
|
|
|
+ the time when the button was released, taken from time.time()
|
|
|
+ """
|
|
|
+ prev_state = None
|
|
|
+ time_pressed = 0
|
|
|
+ time_released = 0
|
|
|
+ try:
|
|
|
+ while True:
|
|
|
+ state = self.read_button_state()
|
|
|
+ if state != prev_state:
|
|
|
+ if state = self.STATE_PRESSED:
|
|
|
+ # the button was pressed just now
|
|
|
+ time_pressed = time.time()
|
|
|
+ # TODO execute registered methods
|
|
|
+ else:
|
|
|
+ # the button was released just now
|
|
|
+ time_released = time.time()
|
|
|
+ # TODO execute registered methods
|
|
|
+ if ( time_released - time_pressed
|
|
|
+ >= self.time_short_press ):
|
|
|
+ # the button was pressed for a short time.
|
|
|
+ # TODO execute registered methods
|
|
|
+ pass
|
|
|
+ elif ( time_released - time_pressed
|
|
|
+ >= self.time_long_press ):
|
|
|
+ # the button was pressed for a short time.
|
|
|
+ # TODO execute registered methods
|
|
|
+ pass
|
|
|
+ prev_state = state
|
|
|
+ await curio.sleep(0.1)
|
|
|
|
|
|
async def minutely(self):
|
|
|
"""
|
|
@@ -131,9 +182,114 @@ class ButtonController(object):
|
|
|
This method is called by the core when it shuts down.
|
|
|
"""
|
|
|
|
|
|
+class LED(ControllerBase):
|
|
|
+ """
|
|
|
+ Controllerbase controlls a normal LED.
|
|
|
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ pin_num: int
|
|
|
+ number of the led's pin
|
|
|
|
|
|
+ Attributes
|
|
|
+ ----------
|
|
|
+ lock: threading.Lock
|
|
|
+ The lock used to protect gpio operations.
|
|
|
+ """
|
|
|
+ lock = None
|
|
|
|
|
|
+ def __init__(self,pin_num):
|
|
|
+ super().__init__()
|
|
|
+ self.pin = pin.DigitalPin(pin_num,_gpio.OUT)
|
|
|
+ self.lock = threading.Lock()
|
|
|
+ self.on()
|
|
|
+
|
|
|
+ async def on(self,args=[],command=None,respond=None):
|
|
|
+ if self.lock.locked():
|
|
|
+ response("This LED is already in use")
|
|
|
+ return
|
|
|
+ with self.lock:
|
|
|
+ self.write(True)
|
|
|
+
|
|
|
+ async def off(self,args=[],command=None,respond=None):
|
|
|
+ if self.lock.locked():
|
|
|
+ respond("This LED is already in use")
|
|
|
+ return
|
|
|
+ with self.lock:
|
|
|
+ self.write(False)
|
|
|
+
|
|
|
+ async def set(self,args=[],command=None,respond=None):
|
|
|
+ if len(args) != 1:
|
|
|
+ respond("one missing argument: state")
|
|
|
+ return
|
|
|
+ [state] = args
|
|
|
+ if self.lock.locked():
|
|
|
+ response("This LED is already in use")
|
|
|
+ return
|
|
|
+ with self.lock:
|
|
|
+ self.write(state)
|
|
|
+
|
|
|
+
|
|
|
+class StatusLED(PinControllerBase):
|
|
|
+
|
|
|
+ def __init__(self,pin_red,pin_green):
|
|
|
+ super(StatusLED,self).__init__([pin_red,pin_green])
|
|
|
+ self.pin_red = DigitalPin(pin_red,_gpio.OUT)
|
|
|
+ self.pin_green = DigitalPin(pin_green,_gpio.OUT)
|
|
|
+ self.green()
|
|
|
+
|
|
|
+ def red(self,args=[],command=None,respond=None):
|
|
|
+ if len(args) > 1:
|
|
|
+ respond(errmsg.args(command,"one","optional","<state>"))
|
|
|
+ return
|
|
|
+ elif len(args) == 1:
|
|
|
+ state = digilib.misc.parse_to_int_list(*args)
|
|
|
+ else:
|
|
|
+ state = 1
|
|
|
+ self.pin_red.write(state)
|
|
|
+ self.pin_green.write(int(not state))
|
|
|
+
|
|
|
+ def green(self,args=[],command=None,respond=None):
|
|
|
+ if len(args) > 1:
|
|
|
+ respond(ERROR_TAKES_ARGUMENTS.format(
|
|
|
+ command,"one","optional","<state>"))
|
|
|
+ return
|
|
|
+ elif len(args) == 1:
|
|
|
+ state = int(*args)
|
|
|
+ else:
|
|
|
+ state = 1
|
|
|
+ self.pin_green.write(state)
|
|
|
+ self.pin_red.write(int(not state))
|
|
|
+
|
|
|
+class DebugPinController(PinControllerBase):
|
|
|
+
|
|
|
+ def write(self,args=[],command=None,respond=None):
|
|
|
+ if len(args) != 2:
|
|
|
+ respond(ERROR_TAKES_ARGUMENTS.format(
|
|
|
+ command, "two", "positional", "<name>"))
|
|
|
+ return False
|
|
|
+ pins = digilib.misc.parse_to_int_list(args[0])
|
|
|
+ [state] = digilib.misc.parse_to_int_list(args[1])
|
|
|
+ _gpio.write(pins,state)
|
|
|
+
|
|
|
+ def read(self,args=[],command=None,respond=None):
|
|
|
+ if len(args) != 2:
|
|
|
+ respond(ERROR_TAKES_ARGUMENTS.format(
|
|
|
+ command, "two", "positional", "<name>"))
|
|
|
+ return False
|
|
|
+ pins = digilib.misc.parse_to_int_list(args[0])
|
|
|
+ [state] = digilib.misc.parse_to_int_list(args[1])
|
|
|
+ rv = _gpio.read(pins,state)
|
|
|
+ lgpio.debug(rv)
|
|
|
+ respond(str(rv))
|
|
|
+
|
|
|
+ def raise_exc(self,args=[],command=None,respond=None):
|
|
|
+ raise Exception("Test Exception")
|
|
|
+
|
|
|
+ async def araise_exc(self,args=[],command=None,respond=None):
|
|
|
+ state = digilib.misc.parse_to_int_list("1,2,3,4")
|
|
|
+ a = 1+2
|
|
|
+ raise Exception("Test Async Exception")
|
|
|
|
|
|
|
|
|
|