123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223 |
- #!/usr/bin/env python3.5
- # Copyright 2017 Digital
- #
- # This file is part of DigiLib.
- #
- # DigiLib is free software: you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # DigiLib is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with DigiLib. If not, see <http://www.gnu.org/licenses/>.
- # Python modules
- import atexit
- import logging
- import threading
- import traceback
- # Third party modules
- import curio
- import digilib.network
- import digilib.misc
# Module-level loggers: the general one is named after this module, the
# gpio one is its ".gpio" child.
log = logging.getLogger(__name__)
lgpio = logging.getLogger("{}.gpio".format(__name__))

# Error messages
# Placeholders, in order: command name, argument count, argument kind and
# argument names. Example:
#     respond(ERROR_TAKES_ARGUMENTS.format(
#         command, "one", "positional", "<name>"))
ERROR_TAKES_ARGUMENTS = "{} takes {} {} argument(s): {}"

# NOTE(review): nothing in this file adds to this set; presumably it is
# meant to hold pins that need releasing at exit -- confirm.
_pins_for_cleanup = set()
# NOTE(review): nothing in this file assigns the gpio backend; presumably
# it is injected from outside before any pin object is created -- confirm.
_gpio = None
class PinBase(object):
    """
    PinBase is the base class for all classes representing a gpio pin.

    Parameters
    ----------
    pin_number: int
        number of the pin
    mode: gpio.OUT or gpio.IN
        ``gpio.IN`` if the pin is a read pin or ``gpio.OUT`` if the pin is
        a write pin

    Attributes
    ----------
    pin_number: int
        number of the pin this object was created for
    """
    pin_number = None

    def __init__(self, pin_number, mode):
        super(PinBase, self).__init__()
        self.pin_number = pin_number
        # Honour the requested direction; the previous code ignored
        # ``mode`` and always configured the pin as an output.
        _gpio.setup(self.pin_number, mode)

    def write(self, value):
        """
        Write ``value`` to the pin.

        ``value`` is passed through ``digilib.misc.parse_to_int_list`` and
        the result must contain exactly one element, otherwise the
        unpacking raises ``ValueError``.
        """
        [value] = digilib.misc.parse_to_int_list(value)
        _gpio.write(self.pin_number, value)

    def read(self):
        """
        Read the pin and return whatever the gpio backend reports.
        """
        # The previous code passed the not-yet-assigned local ``value`` as
        # a second argument, which raised UnboundLocalError on every call.
        # NOTE(review): assumes the backend's read() accepts just the pin
        # number -- confirm against the gpio backend.
        return _gpio.read(self.pin_number)
class DigitalPin(PinBase):
    """
    A gpio pin handled through the PinBase interface.

    Parameters
    ----------
    pin_number: int
        number of the pin
    mode: gpio.OUT or gpio.IN
        forwarded unchanged to PinBase
    """

    def __init__(self, pin_number, mode):
        # Nothing to add yet: construction is delegated entirely to PinBase.
        PinBase.__init__(self, pin_number, mode)
class AnalogPin(PinBase):
    """
    A gpio pin handled through the PinBase interface.

    Parameters
    ----------
    pin_number: int
        number of the pin
    mode: gpio.OUT or gpio.IN, optional
        direction forwarded to PinBase; see the note in ``__init__`` for
        the default
    """

    def __init__(self, pin_number, mode=None):
        # PinBase.__init__ requires a mode; the previous code did not pass
        # one, so every AnalogPin(...) call raised TypeError.
        if mode is None:
            # NOTE(review): defaulting to input is an assumption -- confirm
            # which direction analog pins are expected to use.
            mode = _gpio.IN
        super(AnalogPin, self).__init__(pin_number, mode)
class ControllerBase(object):
    """
    ControllerBase is the base class for all classes controlling one or
    more physical devices connected to a gpio header or other controllers.
    """

    def __init__(self):
        # Takes no arguments of its own; just continues the constructor
        # chain.
        super(ControllerBase, self).__init__()
class LED(DigitalPin):
    """
    Controls a LED.

    Parameters
    ----------
    pin: int
        number of the led's pin

    Attributes
    ----------
    lock: threading.Lock
        The lock used to protect gpio operations.
    """
    lock = None

    def __init__(self, pin):
        # super(LED, ...) so that DigitalPin.__init__ is not skipped.
        super(LED, self).__init__(pin, _gpio.OUT)
        self.lock = threading.Lock()
        # on() is a coroutine function: calling it here would only create
        # a coroutine object that is never awaited, so the LED would stay
        # untouched. Switch it on synchronously instead.
        with self.lock:
            self.write(True)

    def _write_exclusive(self, value, respond):
        """Write ``value`` unless another operation currently holds the lock."""
        # A non-blocking acquire checks and takes the lock in one step, so
        # no other thread can slip in between the check and the write.
        if not self.lock.acquire(blocking=False):
            if respond is not None:
                respond("This LED is already in use")
            return
        try:
            self.write(value)
        finally:
            self.lock.release()

    async def on(self, args=[], command=None, respond=None):
        """
        Switch the LED on.

        ``args`` and ``command`` are accepted for a uniform handler
        signature but are not used. ``respond``, if given, is called with
        a message when the LED is busy.
        """
        self._write_exclusive(True, respond)

    async def off(self, args=[], command=None, respond=None):
        """
        Switch the LED off.

        ``args`` and ``command`` are accepted for a uniform handler
        signature but are not used. ``respond``, if given, is called with
        a message when the LED is busy.
        """
        self._write_exclusive(False, respond)

    async def set(self, args=[], command=None, respond=None):
        """
        Set the LED to the state given as the single element of ``args``.

        ``respond``, if given, is called with a message when the number of
        arguments is wrong or when the LED is busy.
        """
        if len(args) != 1:
            if respond is not None:
                respond("one missing argument: state")
            return
        [state] = args
        self._write_exclusive(state, respond)
class StatusLED(ControllerBase):
    """
    Controls a red and a green pin as a pair: whenever one of them is
    written, the other one receives the logical inverse.

    Parameters
    ----------
    pin_red: int
        number of the red pin
    pin_green: int
        number of the green pin
    """

    def __init__(self, pin_red, pin_green):
        # The base class referenced before (PinControllerBase) does not
        # exist in this module, and ControllerBase.__init__ takes no
        # arguments.
        super(StatusLED, self).__init__()
        self.pin_red = DigitalPin(pin_red, _gpio.OUT)
        self.pin_green = DigitalPin(pin_green, _gpio.OUT)
        self.green()

    @staticmethod
    def _parse_state(args, command, respond):
        """Return the requested state (1 if none given), or None after reporting bad usage."""
        if len(args) > 1:
            if respond is not None:
                respond(ERROR_TAKES_ARGUMENTS.format(
                    command, "one", "optional", "<state>"))
            return None
        if not args:
            return 1
        # Unpack the single parsed value. Keeping the whole list (as red()
        # used to) made ``not state`` False for every non-empty list.
        [state] = digilib.misc.parse_to_int_list(args[0])
        return state

    def red(self, args=[], command=None, respond=None):
        """
        Write the state to the red pin and its inverse to the green pin.

        ``args`` may hold one optional state; without it the state is 1.
        """
        state = self._parse_state(args, command, respond)
        if state is None:
            return
        self.pin_red.write(state)
        self.pin_green.write(int(not state))

    def green(self, args=[], command=None, respond=None):
        """
        Write the state to the green pin and its inverse to the red pin.

        ``args`` may hold one optional state; without it the state is 1.
        """
        state = self._parse_state(args, command, respond)
        if state is None:
            return
        self.pin_green.write(state)
        self.pin_red.write(int(not state))
class DebugPinController(ControllerBase):
    """
    Debugging helper giving direct access to the gpio backend's write and
    read calls, plus two handlers that raise on purpose.
    """
    # The base class referenced before (PinControllerBase) does not exist
    # in this module; ControllerBase is the controller base defined here.

    @staticmethod
    def _report_usage(command, respond):
        """Send the 'takes two positional arguments' message if a responder is available."""
        if respond is not None:
            respond(ERROR_TAKES_ARGUMENTS.format(
                command, "two", "positional", "<pins> <state>"))

    def write(self, args=[], command=None, respond=None):
        """
        Write a state to one or more pins.

        ``args`` must hold exactly two items: the pins and the state, both
        parsed with ``digilib.misc.parse_to_int_list``. Returns ``False``
        when the argument count is wrong.
        """
        if len(args) != 2:
            self._report_usage(command, respond)
            return False
        pins = digilib.misc.parse_to_int_list(args[0])
        [state] = digilib.misc.parse_to_int_list(args[1])
        _gpio.write(pins, state)

    def read(self, args=[], command=None, respond=None):
        """
        Read one or more pins, log the result and send it to ``respond``.

        ``args`` must hold exactly two items, parsed the same way as in
        ``write``. Returns ``False`` when the argument count is wrong.
        """
        if len(args) != 2:
            self._report_usage(command, respond)
            return False
        pins = digilib.misc.parse_to_int_list(args[0])
        [state] = digilib.misc.parse_to_int_list(args[1])
        # NOTE(review): the meaning of the second argument of the backend's
        # read() is not visible from this file -- confirm it is intended.
        rv = _gpio.read(pins, state)
        lgpio.debug(rv)
        if respond is not None:
            respond(str(rv))

    def raise_exc(self, args=[], command=None, respond=None):
        """Raise an exception on purpose."""
        raise Exception("Test Exception")

    async def araise_exc(self, args=[], command=None, respond=None):
        """Raise an exception on purpose from inside a coroutine."""
        # NOTE(review): these locals are never used; presumably they only
        # give the failing frame some content to inspect -- confirm.
        state = digilib.misc.parse_to_int_list("1,2,3,4")
        a = 1+2
        raise Exception("Test Async Exception")
# Running this module as a script is a no-op.
if __name__ == "__main__":
    pass
- #
|