|
@@ -0,0 +1,319 @@
|
|
|
|
+# Copyright 2017 Digital
|
|
|
|
+#
|
|
|
|
+# This file is part of DigiLib.
|
|
|
|
+#
|
|
|
|
+# DigiLib is free software: you can redistribute it and/or modify
|
|
|
|
+# it under the terms of the GNU General Public License as published by
|
|
|
|
+# the Free Software Foundation, either version 3 of the License, or
|
|
|
|
+# (at your option) any later version.
|
|
|
|
+#
|
|
|
|
+# DigiLib is distributed in the hope that it will be useful,
|
|
|
|
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
|
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
|
+# GNU General Public License for more details.
|
|
|
|
+#
|
|
|
|
+# You should have received a copy of the GNU General Public License
|
|
|
|
+# along with DigiLib. If not, see <http://www.gnu.org/licen
|
|
|
|
+
|
|
|
|
+import logging
|
|
|
|
+log = logging.getLogger(__name__+"")
|
|
|
|
+lctrl = logging.getLogger(__name__+".ctrl")
|
|
|
|
+
|
|
|
|
+import time
|
|
|
|
+
|
|
|
|
+import curio
|
|
|
|
+import digilib.network
|
|
|
|
+import digilib.misc
|
|
|
|
+import digilib.gpio.wrapper
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class ControllerBase(object):
|
|
|
|
+ """
|
|
|
|
+ ControllerBase is the baseclass for Controller. All collectors need to inherit from CollectorBase or provide the same methods.
|
|
|
|
+
|
|
|
|
+ A collector collects information form sensors and puts them in a pipe, so the CtrlManager can access it. The minutely, hourly and daily methods are easy to use but there execution time depends on when the core was started. use curio's execute_at feature to execute a function at a specific time.
|
|
|
|
+
|
|
|
|
+ Parameters
|
|
|
|
+ ----------
|
|
|
|
+ """
|
|
|
|
+ def __init__(self):
|
|
|
|
+ """
|
|
|
|
+ during initilazion, a Controller should register its methods.
|
|
|
|
+ """
|
|
|
|
+ super().__init__()
|
|
|
|
+
|
|
|
|
+ async def collect_loadcell_data(self):
|
|
|
|
+ """
|
|
|
|
+ This method collects data from a sensor.
|
|
|
|
+ """
|
|
|
|
+
|
|
|
|
+ async def daily(self):
|
|
|
|
+ """
|
|
|
|
+ This method is called once every day by the core.
|
|
|
|
+ """
|
|
|
|
+
|
|
|
|
+ async def hourly(self):
|
|
|
|
+ """
|
|
|
|
+ This method is called once every hour by the core.
|
|
|
|
+ """
|
|
|
|
+
|
|
|
|
+ async def main_loop(self):
|
|
|
|
+ """
|
|
|
|
+ This is the main loop of the Controller class.
|
|
|
|
+ """
|
|
|
|
+
|
|
|
|
+ async def minutely(self):
|
|
|
|
+ """
|
|
|
|
+ This method is called once every minute by the core.
|
|
|
|
+ """
|
|
|
|
+
|
|
|
|
+ async def on_startup(self):
|
|
|
|
+ """
|
|
|
|
+ This method is called by the core when it starts. This is a good entry point for a Controller class, however the main_loop should be in a different method wich can be called here.
|
|
|
|
+ """
|
|
|
|
+ # self.main_loop()
|
|
|
|
+
|
|
|
|
+ async def on_shutdown(self):
|
|
|
|
+ """
|
|
|
|
+ This method is called by the core when it shuts down.
|
|
|
|
+ """
|
|
|
|
+
|
|
|
|
+class ButtonController(object):
|
|
|
|
+ """
|
|
|
|
+ ButtonController can be used with a hardware push button. It provides events you can register a callback to, join it or test the buttons state.
|
|
|
|
+
|
|
|
|
+ .. A collector collects information form sensors and puts them in a pipe, so the CtrlManager can access it. The minutely, hourly and daily methods are easy to use but there execution time depends on when the core was started. use curio's execute_at feature to execute a function at a specific time.
|
|
|
|
+
|
|
|
|
+ Parameters
|
|
|
|
+ ----------
|
|
|
|
+ pin_num: int
|
|
|
|
+ the number of the pin wich is connected to the button and to logical HIGH through an appropriate resistor.
|
|
|
|
+ time_short_press: int
|
|
|
|
+ the maximum time the button needs to be pressed to be registered as a short press.
|
|
|
|
+ time_long_press: int
|
|
|
|
+ the maximum time the button needs to be pressed to be registered as a long press. if ``time_short_press`` is greater the long press feature is disabled.
|
|
|
|
+
|
|
|
|
+ Attributes
|
|
|
|
+ ----------
|
|
|
|
+ STATE_PRESSED: int
|
|
|
|
+ the value returned from gpio.read if the button is pressed
|
|
|
|
+ STATE_RELEASED: int
|
|
|
|
+ the value returned from gpio.read if the button is released
|
|
|
|
+ """
|
|
|
|
+ def __init__(self,pin_num,time_short_press,time_long_press,):
|
|
|
|
+ """
|
|
|
|
+ """
|
|
|
|
+ super().__init__()
|
|
|
|
+ self.pin_num = pin_num
|
|
|
|
+ self.time_short_press = time_short_press
|
|
|
|
+ self.time_long_press = time_long_press
|
|
|
|
+ digilib.gpio.wrapper.setup(self.pin_num,digilib.gpio.wrapper.OUT)
|
|
|
|
+
|
|
|
|
+ async def read_button_state(self):
|
|
|
|
+ """
|
|
|
|
+ This method reads the current state from the button's gpio pin.
|
|
|
|
+ """
|
|
|
|
+ return digilib.gpio.wrapper.read(self.pin_num)
|
|
|
|
+
|
|
|
|
+ async def daily(self):
|
|
|
|
+ """
|
|
|
|
+ This method is called once every day by the core.
|
|
|
|
+ """
|
|
|
|
+
|
|
|
|
+ async def hourly(self):
|
|
|
|
+ """
|
|
|
|
+ This method is called once every hour by the core.
|
|
|
|
+ """
|
|
|
|
+
|
|
|
|
+ async def main_loop(self):
|
|
|
|
+ """
|
|
|
|
+ The main loop executes registered callbacks if the button state was changed, was pressed for ``self.time_short_press`` or ``self.time_long_press``.
|
|
|
|
+
|
|
|
|
+ Attributes
|
|
|
|
+ ----------
|
|
|
|
+ prev_state: int
|
|
|
|
+ the state of the button during the last loop.
|
|
|
|
+ time_pressed: int
|
|
|
|
+ the time when the button was pressed, taken from time.time()
|
|
|
|
+ time_released: int
|
|
|
|
+ the time when the button was released, taken from time.time()
|
|
|
|
+ """
|
|
|
|
+ prev_state = None
|
|
|
|
+ time_pressed = 0
|
|
|
|
+ time_released = 0
|
|
|
|
+ try:
|
|
|
|
+ while True:
|
|
|
|
+ state = self.read_button_state()
|
|
|
|
+ if state != prev_state:
|
|
|
|
+ if state = self.STATE_PRESSED:
|
|
|
|
+ # the button was pressed just now
|
|
|
|
+ time_pressed = time.time()
|
|
|
|
+ # TODO execute registered methods
|
|
|
|
+ else:
|
|
|
|
+ # the button was released just now
|
|
|
|
+ time_released = time.time()
|
|
|
|
+ # TODO execute registered methods
|
|
|
|
+ if ( time_released - time_pressed
|
|
|
|
+ >= self.time_short_press ):
|
|
|
|
+ # the button was pressed for a short time.
|
|
|
|
+ # TODO execute registered methods
|
|
|
|
+ pass
|
|
|
|
+ elif ( time_released - time_pressed
|
|
|
|
+ >= self.time_long_press ):
|
|
|
|
+ # the button was pressed for a short time.
|
|
|
|
+ # TODO execute registered methods
|
|
|
|
+ pass
|
|
|
|
+ prev_state = state
|
|
|
|
+ await curio.sleep(0.1)
|
|
|
|
+
|
|
|
|
+ async def minutely(self):
|
|
|
|
+ """
|
|
|
|
+ This method is called once every minute by the core.
|
|
|
|
+ """
|
|
|
|
+
|
|
|
|
+ async def on_startup(self):
|
|
|
|
+ """
|
|
|
|
+ This method is called by the core when it starts. This is a good entry point for a Controller class, however the main_loop should be in a different method wich can be called here.
|
|
|
|
+ """
|
|
|
|
+ # self.main_loop()
|
|
|
|
+
|
|
|
|
+ async def on_shutdown(self):
|
|
|
|
+ """
|
|
|
|
+ This method is called by the core when it shuts down.
|
|
|
|
+ """
|
|
|
|
+
|
|
|
|
+class LED(ControllerBase):
|
|
|
|
+ """
|
|
|
|
+ Controllerbase controlls a normal LED.
|
|
|
|
+
|
|
|
|
+ Parameters
|
|
|
|
+ ----------
|
|
|
|
+ pin_num: int
|
|
|
|
+ number of the led's pin
|
|
|
|
+
|
|
|
|
+ Attributes
|
|
|
|
+ ----------
|
|
|
|
+ lock: threading.Lock
|
|
|
|
+ The lock used to protect gpio operations.
|
|
|
|
+ """
|
|
|
|
+ lock = None
|
|
|
|
+
|
|
|
|
+ def __init__(self,pin_num):
|
|
|
|
+ super().__init__()
|
|
|
|
+ self.pin = pin.DigitalPin(pin_num,_gpio.OUT)
|
|
|
|
+ self.lock = threading.Lock()
|
|
|
|
+ self.on()
|
|
|
|
+
|
|
|
|
+ async def on(self,args=[],command=None,respond=None):
|
|
|
|
+ if self.lock.locked():
|
|
|
|
+ response("This LED is already in use")
|
|
|
|
+ return
|
|
|
|
+ with self.lock:
|
|
|
|
+ self.write(True)
|
|
|
|
+
|
|
|
|
+ async def off(self,args=[],command=None,respond=None):
|
|
|
|
+ if self.lock.locked():
|
|
|
|
+ respond("This LED is already in use")
|
|
|
|
+ return
|
|
|
|
+ with self.lock:
|
|
|
|
+ self.write(False)
|
|
|
|
+
|
|
|
|
+ async def set(self,args=[],command=None,respond=None):
|
|
|
|
+ if len(args) != 1:
|
|
|
|
+ respond("one missing argument: state")
|
|
|
|
+ return
|
|
|
|
+ [state] = args
|
|
|
|
+ if self.lock.locked():
|
|
|
|
+ response("This LED is already in use")
|
|
|
|
+ return
|
|
|
|
+ with self.lock:
|
|
|
|
+ self.write(state)
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+class StatusLED(PinControllerBase):
|
|
|
|
+
|
|
|
|
+ def __init__(self,pin_red,pin_green):
|
|
|
|
+ super(StatusLED,self).__init__([pin_red,pin_green])
|
|
|
|
+ self.pin_red = DigitalPin(pin_red,_gpio.OUT)
|
|
|
|
+ self.pin_green = DigitalPin(pin_green,_gpio.OUT)
|
|
|
|
+ self.green()
|
|
|
|
+
|
|
|
|
+ def red(self,args=[],command=None,respond=None):
|
|
|
|
+ if len(args) > 1:
|
|
|
|
+ respond(errmsg.args(command,"one","optional","<state>"))
|
|
|
|
+ return
|
|
|
|
+ elif len(args) == 1:
|
|
|
|
+ state = digilib.misc.parse_to_int_list(*args)
|
|
|
|
+ else:
|
|
|
|
+ state = 1
|
|
|
|
+ self.pin_red.write(state)
|
|
|
|
+ self.pin_green.write(int(not state))
|
|
|
|
+
|
|
|
|
+ def green(self,args=[],command=None,respond=None):
|
|
|
|
+ if len(args) > 1:
|
|
|
|
+ respond(ERROR_TAKES_ARGUMENTS.format(
|
|
|
|
+ command,"one","optional","<state>"))
|
|
|
|
+ return
|
|
|
|
+ elif len(args) == 1:
|
|
|
|
+ state = int(*args)
|
|
|
|
+ else:
|
|
|
|
+ state = 1
|
|
|
|
+ self.pin_green.write(state)
|
|
|
|
+ self.pin_red.write(int(not state))
|
|
|
|
+
|
|
|
|
+class DebugPinController(PinControllerBase):
|
|
|
|
+
|
|
|
|
+ def write(self,args=[],command=None,respond=None):
|
|
|
|
+ if len(args) != 2:
|
|
|
|
+ respond(ERROR_TAKES_ARGUMENTS.format(
|
|
|
|
+ command, "two", "positional", "<name>"))
|
|
|
|
+ return False
|
|
|
|
+ pins = digilib.misc.parse_to_int_list(args[0])
|
|
|
|
+ [state] = digilib.misc.parse_to_int_list(args[1])
|
|
|
|
+ _gpio.write(pins,state)
|
|
|
|
+
|
|
|
|
+ def read(self,args=[],command=None,respond=None):
|
|
|
|
+ if len(args) != 2:
|
|
|
|
+ respond(ERROR_TAKES_ARGUMENTS.format(
|
|
|
|
+ command, "two", "positional", "<name>"))
|
|
|
|
+ return False
|
|
|
|
+ pins = digilib.misc.parse_to_int_list(args[0])
|
|
|
|
+ [state] = digilib.misc.parse_to_int_list(args[1])
|
|
|
|
+ rv = _gpio.read(pins,state)
|
|
|
|
+ lgpio.debug(rv)
|
|
|
|
+ respond(str(rv))
|
|
|
|
+
|
|
|
|
+ def raise_exc(self,args=[],command=None,respond=None):
|
|
|
|
+ raise Exception("Test Exception")
|
|
|
|
+
|
|
|
|
+ async def araise_exc(self,args=[],command=None,respond=None):
|
|
|
|
+ state = digilib.misc.parse_to_int_list("1,2,3,4")
|
|
|
|
+ a = 1+2
|
|
|
|
+ raise Exception("Test Async Exception")
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+#
|