|
@@ -0,0 +1,223 @@
|
|
|
+#!/usr/bin/env python3.5
|
|
|
+# Copyright 2017 Digital
|
|
|
+#
|
|
|
+# This file is part of DigiLib.
|
|
|
+#
|
|
|
+# DigiLib is free software: you can redistribute it and/or modify
|
|
|
+# it under the terms of the GNU General Public License as published by
|
|
|
+# the Free Software Foundation, either version 3 of the License, or
|
|
|
+# (at your option) any later version.
|
|
|
+#
|
|
|
+# DigiLib is distributed in the hope that it will be useful,
|
|
|
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
+# GNU General Public License for more details.
|
|
|
+#
|
|
|
+# You should have received a copy of the GNU General Public License
|
|
|
+# along with DigiLib. If not, see <http://www.gnu.org/licenses/>.
|
|
|
+
|
|
|
+# Python modules
|
|
|
+import atexit
|
|
|
+import logging
|
|
|
+import threading
|
|
|
+import traceback
|
|
|
+# Third party modules
|
|
|
+import curio
|
|
|
+import digilib.network
|
|
|
+import digilib.misc
|
|
|
+
|
|
|
+log = logging.getLogger(__name__+"")
|
|
|
+lgpio = logging.getLogger(__name__+".gpio")
|
|
|
+
|
|
|
+# Error messages
|
|
|
+ERROR_TAKES_ARGUMENTS = "{} takes {} {} argument(s): {}"
|
|
|
+# respond(ERROR_TAKES_ARGUMENTS.format(
|
|
|
+# command, "one", "positional", "<name>"))
|
|
|
+
|
|
|
+_pins_for_cleanup = set()
|
|
|
+_gpio = None
|
|
|
+
|
|
|
+class PinBase(object):
|
|
|
+ """
|
|
|
+ PinBase is the base class for all classes representing a gpio pin
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ pin_number: int
|
|
|
+ number of the pin
|
|
|
+ mode: gpio.OUT or gpio.IN
|
|
|
+ ``gpio.IN`` if the pin is an read pin or ``gpio.OUT`` if the pin is an write pin
|
|
|
+ """
|
|
|
+ pin_number = None
|
|
|
+ def __init__(self,pin_number,mode):
|
|
|
+ super(PinBase,self).__init__()
|
|
|
+ self.pin_number = pin_number
|
|
|
+ _gpio.setup(self.pin_number,_gpio.OUT)
|
|
|
+
|
|
|
+ def write(self,value):
|
|
|
+ [value] = digilib.misc.parse_to_int_list(value)
|
|
|
+ _gpio.write(self.pin_number,value)
|
|
|
+
|
|
|
+ def read(self):
|
|
|
+ value = _gpio.read(self.pin_number,value)
|
|
|
+ return value
|
|
|
+
|
|
|
+class DigitalPin(PinBase):
|
|
|
+ def __init__(self,pin_number,mode):
|
|
|
+ super(DigitalPin,self).__init__(pin_number,mode)
|
|
|
+
|
|
|
+class AnalogPin(PinBase):
|
|
|
+ def __init__(self,pin_number):
|
|
|
+ super(AnalogPin,self).__init__(pin_number)
|
|
|
+
|
|
|
+class ControllerBase(object):
|
|
|
+ """
|
|
|
+ PinControllerBase is the base class for all classes controlling one or more physical devices connected to a gpio header or other controllers
|
|
|
+ """
|
|
|
+
|
|
|
+ def __init__(self):
|
|
|
+ super().__init__()
|
|
|
+
|
|
|
+
|
|
|
+class LED(DigitalPin):
|
|
|
+ """
|
|
|
+ Controlls a LED
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ pin: int
|
|
|
+ number of the led's pin
|
|
|
+
|
|
|
+ Attributes
|
|
|
+ ----------
|
|
|
+ lock: threading.Lock
|
|
|
+ The lock used to protect gpio operations.
|
|
|
+ """
|
|
|
+ lock = None
|
|
|
+
|
|
|
+ def __init__(self,pin):
|
|
|
+ super(DigitalPin,self).__init__(pin,_gpio.OUT)
|
|
|
+ self.lock = threading.Lock()
|
|
|
+ self.on()
|
|
|
+
|
|
|
+ async def on(self,args=[],command=None,respond=None):
|
|
|
+ if self.lock.locked():
|
|
|
+ response("This LED is already in use")
|
|
|
+ return
|
|
|
+ with self.lock:
|
|
|
+ self.write(True)
|
|
|
+
|
|
|
+ async def off(self,args=[],command=None,respond=None):
|
|
|
+ if self.lock.locked():
|
|
|
+ respond("This LED is already in use")
|
|
|
+ return
|
|
|
+ with self.lock:
|
|
|
+ self.write(False)
|
|
|
+
|
|
|
+ async def set(self,args=[],command=None,respond=None):
|
|
|
+ if len(args) != 1:
|
|
|
+ respond("one missing argument: state")
|
|
|
+ return
|
|
|
+ [state] = args
|
|
|
+ if self.lock.locked():
|
|
|
+ response("This LED is already in use")
|
|
|
+ return
|
|
|
+ with self.lock:
|
|
|
+ self.write(state)
|
|
|
+
|
|
|
+
|
|
|
+class StatusLED(PinControllerBase):
|
|
|
+
|
|
|
+ def __init__(self,pin_red,pin_green):
|
|
|
+ super(StatusLED,self).__init__([pin_red,pin_green])
|
|
|
+ self.pin_red = DigitalPin(pin_red,_gpio.OUT)
|
|
|
+ self.pin_green = DigitalPin(pin_green,_gpio.OUT)
|
|
|
+ self.green()
|
|
|
+
|
|
|
+ def red(self,args=[],command=None,respond=None):
|
|
|
+ if len(args) > 1:
|
|
|
+ respond(ERROR_TAKES_ARGUMENTS.format(
|
|
|
+ command,"one","optional","<state>"))
|
|
|
+ return
|
|
|
+ elif len(args) == 1:
|
|
|
+ state = digilib.misc.parse_to_int_list(*args)
|
|
|
+ else:
|
|
|
+ state = 1
|
|
|
+ self.pin_red.write(state)
|
|
|
+ self.pin_green.write(int(not state))
|
|
|
+
|
|
|
+ def green(self,args=[],command=None,respond=None):
|
|
|
+ if len(args) > 1:
|
|
|
+ respond(ERROR_TAKES_ARGUMENTS.format(
|
|
|
+ command,"one","optional","<state>"))
|
|
|
+ return
|
|
|
+ elif len(args) == 1:
|
|
|
+ state = int(*args)
|
|
|
+ else:
|
|
|
+ state = 1
|
|
|
+ self.pin_green.write(state)
|
|
|
+ self.pin_red.write(int(not state))
|
|
|
+
|
|
|
+class DebugPinController(PinControllerBase):
|
|
|
+
|
|
|
+ def write(self,args=[],command=None,respond=None):
|
|
|
+ if len(args) != 2:
|
|
|
+ respond(ERROR_TAKES_ARGUMENTS.format(
|
|
|
+ command, "two", "positional", "<name>"))
|
|
|
+ return False
|
|
|
+ pins = digilib.misc.parse_to_int_list(args[0])
|
|
|
+ [state] = digilib.misc.parse_to_int_list(args[1])
|
|
|
+ _gpio.write(pins,state)
|
|
|
+
|
|
|
+ def read(self,args=[],command=None,respond=None):
|
|
|
+ if len(args) != 2:
|
|
|
+ respond(ERROR_TAKES_ARGUMENTS.format(
|
|
|
+ command, "two", "positional", "<name>"))
|
|
|
+ return False
|
|
|
+ pins = digilib.misc.parse_to_int_list(args[0])
|
|
|
+ [state] = digilib.misc.parse_to_int_list(args[1])
|
|
|
+ rv = _gpio.read(pins,state)
|
|
|
+ lgpio.debug(rv)
|
|
|
+ respond(str(rv))
|
|
|
+
|
|
|
+ def raise_exc(self,args=[],command=None,respond=None):
|
|
|
+ raise Exception("Test Exception")
|
|
|
+
|
|
|
+ async def araise_exc(self,args=[],command=None,respond=None):
|
|
|
+ state = digilib.misc.parse_to_int_list("1,2,3,4")
|
|
|
+ a = 1+2
|
|
|
+ raise Exception("Test Async Exception")
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ pass
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+#
|