@@ -0,0 +1,224 @@
+#!/usr/bin/env python3.5
+# Copyright 2017 Digital
+# This file is part of DigiLib.
+# DigiLib is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+# DigiLib is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# GNU General Public License for more details.
+# You should have received a copy of the GNU General Public License
+# along with DigiLib. If not, see <http://www.gnu.org/licenses/>.
+# Python modules
+import atexit
+import logging
+import threading
+import traceback
+# Third party modules
+import blinker
+import curio
+import digilib.network
+import digilib.misc
+log = logging.getLogger(__name__+"")
+lgpio = logging.getLogger(__name__+".gpio")
+# Error messages
+ERROR_TAKES_ARGUMENTS = "{} takes {} {} argument(s): {}"
+# respond(ERROR_TAKES_ARGUMENTS.format(
+# command, "one", "positional", "<name>"))
+_pins_for_cleanup = set()
+_gpio = None
+class PinBase(object):
+ """
+ PinBase is the base class for all classes representing a gpio pin
+ Parameters
+ ----------
+ pin_number: int
+ number of the pin
+ mode: gpio.OUT or gpio.IN
+ ``gpio.IN`` if the pin is an read pin or ``gpio.OUT`` if the pin is an write pin
+ """
+ pin_number = None
+ def __init__(self,pin_number,mode):
+ super(PinBase,self).__init__()
+ self.pin_number = pin_number
+ _gpio.setup(self.pin_number,_gpio.OUT)
+ def write(self,value):
+ [value] = digilib.misc.parse_to_int_list(value)
+ _gpio.write(self.pin_number,value)
+ def read(self):
+ value = _gpio.read(self.pin_number,value)
+ return value
+class DigitalPin(PinBase):
+ def __init__(self,pin_number,mode):
+ super(DigitalPin,self).__init__(pin_number,mode)
+class AnalogPin(PinBase):
+ def __init__(self,pin_number):
+ super(AnalogPin,self).__init__(pin_number)
+class ControllerBase(object):
+ """
+ PinControllerBase is the base class for all classes controlling one or more physical devices connected to a gpio header or other controllers
+ """
+ def __init__(self):
+ super().__init__()
+class LED(DigitalPin):
+ """
+ Controlls a LED
+ Parameters
+ ----------
+ pin: int
+ number of the led's pin
+ Attributes
+ ----------
+ lock: threading.Lock
+ The lock used to protect gpio operations.
+ """
+ lock = None
+ def __init__(self,pin):
+ super(DigitalPin,self).__init__(pin,_gpio.OUT)
+ self.lock = threading.Lock()
+ self.on()
+ async def on(self,args=[],command=None,respond=None):
+ if self.lock.locked():
+ response("This LED is already in use")
+ return
+ with self.lock:
+ self.write(True)
+ async def off(self,args=[],command=None,respond=None):
+ if self.lock.locked():
+ respond("This LED is already in use")
+ return
+ with self.lock:
+ self.write(False)
+ async def set(self,args=[],command=None,respond=None):
+ if len(args) != 1:
+ respond("one missing argument: state")
+ return
+ [state] = args
+ if self.lock.locked():
+ response("This LED is already in use")
+ return
+ with self.lock:
+ self.write(state)
+class StatusLED(PinControllerBase):
+ def __init__(self,pin_red,pin_green):
+ super(StatusLED,self).__init__([pin_red,pin_green])
+ self.pin_red = DigitalPin(pin_red,_gpio.OUT)
+ self.pin_green = DigitalPin(pin_green,_gpio.OUT)
+ self.green()
+ def red(self,args=[],command=None,respond=None):
+ if len(args) > 1:
+ respond(ERROR_TAKES_ARGUMENTS.format(
+ command,"one","optional","<state>"))
+ return
+ elif len(args) == 1:
+ state = digilib.misc.parse_to_int_list(*args)
+ else:
+ state = 1
+ self.pin_red.write(state)
+ self.pin_green.write(int(not state))
+ def green(self,args=[],command=None,respond=None):
+ if len(args) > 1:
+ respond(ERROR_TAKES_ARGUMENTS.format(
+ command,"one","optional","<state>"))
+ return
+ elif len(args) == 1:
+ state = int(*args)
+ else:
+ state = 1
+ self.pin_green.write(state)
+ self.pin_red.write(int(not state))
+class DebugPinController(PinControllerBase):
+ def write(self,args=[],command=None,respond=None):
+ if len(args) != 2:
+ respond(ERROR_TAKES_ARGUMENTS.format(
+ command, "two", "positional", "<name>"))
+ return False
+ pins = digilib.misc.parse_to_int_list(args[0])
+ [state] = digilib.misc.parse_to_int_list(args[1])
+ _gpio.write(pins,state)
+ def read(self,args=[],command=None,respond=None):
+ if len(args) != 2:
+ respond(ERROR_TAKES_ARGUMENTS.format(
+ command, "two", "positional", "<name>"))
+ return False
+ pins = digilib.misc.parse_to_int_list(args[0])
+ [state] = digilib.misc.parse_to_int_list(args[1])
+ rv = _gpio.read(pins,state)
+ lgpio.debug(rv)
+ respond(str(rv))
+ def raise_exc(self,args=[],command=None,respond=None):
+ raise Exception("Test Exception")
+ async def araise_exc(self,args=[],command=None,respond=None):
+ state = digilib.misc.parse_to_int_list("1,2,3,4")
+ a = 1+2
+ raise Exception("Test Async Exception")
+if __name__ == "__main__":
+ pass