123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319 |
# Copyright 2017 Digital
#
# This file is part of DigiLib.
#
# DigiLib is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# DigiLib is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with DigiLib. If not, see <http://www.gnu.org/licenses/>.
import logging
import threading
import time

import curio

import digilib.gpio.wrapper
import digilib.misc
import digilib.network

log = logging.getLogger(__name__+"")
lctrl = logging.getLogger(__name__+".ctrl")
class ControllerBase(object):
    """
    ControllerBase is the base class for controllers.

    All controllers need to inherit from ControllerBase or provide the
    same methods. Every hook below is a coroutine that does nothing by
    default; subclasses override the ones they need.

    The minutely, hourly and daily hooks are easy to use, but their
    execution time depends on when the core was started.

    .. NOTE(review): the original text recommended curio's ``execute_at``
       feature for running something at a specific time -- confirm that
       this feature exists before relying on it.
    """

    def __init__(self):
        """
        During initialization, a controller should register its methods.
        """
        super().__init__()

    async def collect_loadcell_data(self):
        """
        This method collects data from a sensor.
        """

    async def daily(self):
        """
        This method is called once every day by the core.
        """

    async def hourly(self):
        """
        This method is called once every hour by the core.
        """

    async def main_loop(self):
        """
        This is the main loop of the controller.
        """

    async def minutely(self):
        """
        This method is called once every minute by the core.
        """

    async def on_startup(self):
        """
        This method is called by the core when it starts.

        This is a good entry point for a controller, however the main
        loop should live in a different method (``main_loop``) which can
        be started from here.
        """

    async def on_shutdown(self):
        """
        This method is called by the core when it shuts down.
        """
class ButtonController(object):
    """
    ButtonController can be used with a hardware push button.

    It polls the button's gpio pin and detects state changes as well as
    short and long presses. Executing registered callbacks for those
    events is not implemented yet (see the TODO markers in
    ``main_loop``).

    Parameters
    ----------
    pin_num: int
        the number of the pin which is connected to the button and to
        logical HIGH through an appropriate resistor.
    time_short_press: int
        the maximum time the button needs to be pressed to be registered
        as a short press.
    time_long_press: int
        the maximum time the button needs to be pressed to be registered
        as a long press. If ``time_short_press`` is greater, the long
        press feature is disabled.

    Attributes
    ----------
    STATE_PRESSED: int
        the value returned from gpio.read if the button is pressed
    STATE_RELEASED: int
        the value returned from gpio.read if the button is released
    POLL_INTERVAL: float
        seconds to sleep between two reads of the button state
    """

    # The pin idles at logical HIGH through the resistor described above,
    # so a pressed button is assumed to read LOW.
    # NOTE(review): confirm against the wiring and the value type that
    # digilib.gpio.wrapper.read returns.
    STATE_PRESSED = 0
    STATE_RELEASED = 1
    POLL_INTERVAL = 0.1

    def __init__(self,pin_num,time_short_press,time_long_press,):
        """
        Store the configuration and set up the gpio pin.
        """
        super().__init__()
        self.pin_num = pin_num
        self.time_short_press = time_short_press
        self.time_long_press = time_long_press
        # NOTE(review): the pin is configured as OUT although it is only
        # ever read; an input mode is probably intended -- confirm which
        # constant digilib.gpio.wrapper offers before changing it.
        digilib.gpio.wrapper.setup(self.pin_num,digilib.gpio.wrapper.OUT)

    async def read_button_state(self):
        """
        This method reads the current state from the button's gpio pin.
        """
        return digilib.gpio.wrapper.read(self.pin_num)

    def _classify_press(self, duration):
        """
        Classify a completed press by how long the button was held.

        Parameters
        ----------
        duration: float
            seconds between pressing and releasing the button.

        Returns
        -------
        str or None
            ``"short"`` if ``duration`` does not exceed
            ``time_short_press``, ``"long"`` if it does not exceed
            ``time_long_press``, otherwise ``None``.
        """
        # Both thresholds are maximum durations. Checking the short one
        # first means a long threshold below the short one never
        # matches, which disables long presses as documented.
        if duration <= self.time_short_press:
            return "short"
        if duration <= self.time_long_press:
            return "long"
        return None

    async def daily(self):
        """
        This method is called once every day by the core.
        """

    async def hourly(self):
        """
        This method is called once every hour by the core.
        """

    async def main_loop(self):
        """
        Poll the button forever and detect presses and releases.

        The loop is meant to execute registered callbacks if the button
        state changed or the button was pressed for at most
        ``self.time_short_press`` or ``self.time_long_press``; the
        callback execution itself is still a TODO.

        The loop never returns on its own.
        """
        prev_state = None
        # None until a press has been observed, so that a release seen on
        # the very first iteration is not measured against a bogus start.
        time_pressed = None
        while True:
            # read_button_state is a coroutine and has to be awaited,
            # otherwise ``state`` would be a coroutine object.
            state = await self.read_button_state()
            if state != prev_state:
                if state == self.STATE_PRESSED:
                    # the button was pressed just now
                    # monotonic clock: durations stay correct even if the
                    # wall clock is adjusted while the button is held
                    time_pressed = time.monotonic()
                    # TODO execute registered methods
                else:
                    # the button was released just now
                    time_released = time.monotonic()
                    # TODO execute registered methods
                    if time_pressed is not None:
                        press_kind = self._classify_press(
                            time_released - time_pressed)
                        time_pressed = None
                        if press_kind == "short":
                            # the button was pressed for a short time.
                            # TODO execute registered methods
                            pass
                        elif press_kind == "long":
                            # the button was pressed for a long time.
                            # TODO execute registered methods
                            pass
                prev_state = state
            await curio.sleep(self.POLL_INTERVAL)

    async def minutely(self):
        """
        This method is called once every minute by the core.
        """

    async def on_startup(self):
        """
        This method is called by the core when it starts.

        This is a good entry point for a controller, however the main
        loop should live in a different method (``main_loop``) which can
        be started from here.
        """

    async def on_shutdown(self):
        """
        This method is called by the core when it shuts down.
        """
class LED(ControllerBase):
    """
    LED controls a normal LED connected to a single gpio pin.

    Parameters
    ----------
    pin_num: int
        number of the led's pin

    Attributes
    ----------
    lock: threading.Lock
        The lock used to protect gpio operations.
    pin:
        the pin object created in ``__init__``; all writes go through
        its ``write`` method.
    """
    lock = None

    def __init__(self,pin_num):
        super().__init__()
        # NOTE(review): ``pin`` and ``_gpio`` are not imported in the
        # visible part of this file -- confirm where they come from.
        self.pin = pin.DigitalPin(pin_num,_gpio.OUT)
        self.lock = threading.Lock()
        # ``on`` is a coroutine; calling it here would only create a
        # coroutine object that is never run. Switch the LED on directly.
        self._write_exclusive(True)

    @staticmethod
    def _reply(respond, message):
        """Send ``message`` through ``respond`` if a callback was given."""
        if respond is not None:
            respond(message)

    def _write_exclusive(self, state, respond=None):
        """
        Write ``state`` to the pin unless the LED is already in use.

        The lock is taken without blocking, so "is it free?" and "take
        it" are one atomic step. If the lock is held elsewhere, the
        caller is informed through ``respond`` and nothing is written.
        """
        if not self.lock.acquire(blocking=False):
            self._reply(respond, "This LED is already in use")
            return
        try:
            self.pin.write(state)
        finally:
            self.lock.release()

    async def on(self,args=(),command=None,respond=None):
        """
        Switch the LED on.

        Parameters
        ----------
        args: sequence
            unused.
        command:
            unused.
        respond: callable, optional
            called with a message if the LED is already in use.
        """
        self._write_exclusive(True, respond)

    async def off(self,args=(),command=None,respond=None):
        """
        Switch the LED off.

        Parameters
        ----------
        args: sequence
            unused.
        command:
            unused.
        respond: callable, optional
            called with a message if the LED is already in use.
        """
        self._write_exclusive(False, respond)

    async def set(self,args=(),command=None,respond=None):
        """
        Set the LED to the state given as the only element of ``args``.

        Parameters
        ----------
        args: sequence
            exactly one element, the new state. It is converted with
            ``int``, like ``StatusLED`` does with its state argument.
        command:
            unused.
        respond: callable, optional
            called with a message if the argument count is wrong or the
            LED is already in use.

        Raises
        ------
        ValueError, TypeError
            if the state can not be converted to an int.
        """
        if len(args) != 1:
            self._reply(respond, "one missing argument: state")
            return
        [state] = args
        # NOTE(review): presumably states arrive as strings from a
        # command line, where "0" would otherwise count as truthy --
        # confirm what pin.DigitalPin.write expects.
        self._write_exclusive(int(state), respond)
class StatusLED(PinControllerBase):
    """
    StatusLED drives a two-colour status indicator on two gpio pins.

    Switching one colour on switches the other one off and vice versa.
    After construction the indicator shows green.

    Parameters
    ----------
    pin_red: int
        number of the pin driving the red LED
    pin_green: int
        number of the pin driving the green LED
    """

    def __init__(self,pin_red,pin_green):
        super(StatusLED,self).__init__([pin_red,pin_green])
        # NOTE(review): ``DigitalPin`` and ``_gpio`` are not imported in
        # the visible part of this file -- confirm where they come from.
        self.pin_red = DigitalPin(pin_red,_gpio.OUT)
        self.pin_green = DigitalPin(pin_green,_gpio.OUT)
        self.green()

    def _show(self, lit_pin, other_pin, args, command, respond):
        """
        Write the requested state to ``lit_pin`` and its inverse to
        ``other_pin``.

        ``args`` may hold one optional state, converted with ``int``;
        without it the state is 1. More than one argument is reported
        through ``respond`` (if given) and nothing is written.
        """
        if len(args) > 1:
            if respond is not None:
                # NOTE(review): ERROR_TAKES_ARGUMENTS is not defined in
                # the visible part of this file -- confirm its origin.
                respond(ERROR_TAKES_ARGUMENTS.format(
                    command,"one","optional","<state>"))
            return
        state = int(args[0]) if args else 1
        lit_pin.write(state)
        other_pin.write(int(not state))

    def red(self,args=(),command=None,respond=None):
        """
        Show red (or green if the optional state argument is 0).

        Parameters
        ----------
        args: sequence
            at most one element, the state for the red pin (default 1).
        command:
            the command name, used in the error message only.
        respond: callable, optional
            called with an error message if too many arguments are given.

        Raises
        ------
        ValueError, TypeError
            if the state can not be converted to an int.
        """
        self._show(self.pin_red, self.pin_green, args, command, respond)

    def green(self,args=(),command=None,respond=None):
        """
        Show green (or red if the optional state argument is 0).

        Parameters
        ----------
        args: sequence
            at most one element, the state for the green pin (default 1).
        command:
            the command name, used in the error message only.
        respond: callable, optional
            called with an error message if too many arguments are given.

        Raises
        ------
        ValueError, TypeError
            if the state can not be converted to an int.
        """
        self._show(self.pin_green, self.pin_red, args, command, respond)
class DebugPinController(PinControllerBase):
    """
    DebugPinController exposes raw gpio reads and writes as commands and
    offers two commands which raise on purpose.

    NOTE(review): ``_gpio``, ``ERROR_TAKES_ARGUMENTS`` and
    ``PinControllerBase`` are not defined in the visible part of this
    file -- confirm where they come from.
    """

    @staticmethod
    def _reply(respond, message):
        """Send ``message`` through ``respond`` if a callback was given."""
        if respond is not None:
            respond(message)

    def write(self,args=(),command=None,respond=None):
        """
        Write a state to one or more pins.

        Parameters
        ----------
        args: sequence
            two elements: the pins and the state, each parsed with
            ``digilib.misc.parse_to_int_list``. The state has to parse
            to exactly one value.
        command:
            the command name, used in the error message only.
        respond: callable, optional
            called with an error message if the argument count is wrong.

        Returns
        -------
        False if the argument count is wrong, otherwise None.
        """
        if len(args) != 2:
            self._reply(respond, ERROR_TAKES_ARGUMENTS.format(
                command, "two", "positional", "<pins> <state>"))
            return False
        pins = digilib.misc.parse_to_int_list(args[0])
        [state] = digilib.misc.parse_to_int_list(args[1])
        _gpio.write(pins,state)

    def read(self,args=(),command=None,respond=None):
        """
        Read one or more pins, log the result and send it to the caller.

        Parameters
        ----------
        args: sequence
            two elements: the pins and a state, each parsed with
            ``digilib.misc.parse_to_int_list``. The state has to parse
            to exactly one value.
        command:
            the command name, used in the error message only.
        respond: callable, optional
            called with the stringified result, or with an error message
            if the argument count is wrong.

        Returns
        -------
        False if the argument count is wrong, otherwise None.
        """
        if len(args) != 2:
            self._reply(respond, ERROR_TAKES_ARGUMENTS.format(
                command, "two", "positional", "<pins> <state>"))
            return False
        pins = digilib.misc.parse_to_int_list(args[0])
        [state] = digilib.misc.parse_to_int_list(args[1])
        # NOTE(review): passing a state to a read looks like a copy of
        # ``write``; kept unchanged because the signature of _gpio.read
        # is not visible here -- confirm and drop the argument if unused.
        rv = _gpio.read(pins,state)
        # ``lctrl`` is the controller logger defined at the top of this
        # file; the previously used ``lgpio`` is not defined in it.
        lctrl.debug(rv)
        self._reply(respond, str(rv))

    def raise_exc(self,args=(),command=None,respond=None):
        """
        Raise an exception on purpose.

        Raises
        ------
        Exception
            always.
        """
        raise Exception("Test Exception")

    async def araise_exc(self,args=(),command=None,respond=None):
        """
        Raise an exception on purpose from inside a coroutine.

        Raises
        ------
        Exception
            always.
        """
        raise Exception("Test Async Exception")
- #
|