123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376 |
- import atexit
- import logging
- import threading
- import traceback
- import blinker
- import curio
- import digilib.network
- import digilib.misc
# Module logger and a child logger dedicated to low-level GPIO traffic.
log = logging.getLogger(__name__)
lgpio = logging.getLogger("{}.gpio".format(__name__))

# Template for argument-count errors, filled positionally with:
# command name, count, argument kind, argument names.
ERROR_TAKES_ARGUMENTS = "{} takes {} {} argument(s): {}"

# Pin numbers collected by GPIOWrapper.output() and handed to
# GPIOWrapper.cleanup() when the interpreter exits.
_pins_for_cleanup = set()

# Placeholder; rebound to a GPIOWrapper instance after the class definition.
_gpio = None
class GPIOWrapper(object):
    """Facade over ``RPi.GPIO`` that falls back to logging-only behaviour.

    ``load_rpi_gpio`` tries to import ``RPi.GPIO``. When that fails,
    ``self.gpio`` stays ``False`` and every method only logs what it would
    have done (``input`` then reports ``-1`` for each pin).
    """

    # The imported RPi.GPIO module, False if the import failed, or None
    # before load_rpi_gpio() has run.
    gpio = None
    # Direction constants; load_rpi_gpio() replaces them with the values
    # from RPi.GPIO when the module is available.
    OUT = "out"
    IN = "in"

    pin_values = {}

    def __init__(self):
        super(GPIOWrapper, self).__init__()
        self.load_rpi_gpio()
        lgpio.debug("setting pin numbering to broadcom")
        if self.gpio:
            self.gpio.setmode(self.gpio.BCM)
        # Release the pins that were used when the interpreter exits.
        atexit.register(self.cleanup)

    @staticmethod
    def _as_pin_list(pins):
        """Return *pins* as a list, wrapping a single integer pin number."""
        if isinstance(pins, int):
            return [pins]
        return list(pins)

    def cleanup(self, *args):
        """Clean up every pin recorded in ``_pins_for_cleanup``.

        Extra positional arguments are accepted and only logged.
        """
        lgpio.debug("cleanup! ({})".format(args))
        if self.gpio:
            self.gpio.cleanup(list(_pins_for_cleanup))

    def load_rpi_gpio(self):
        """Import ``RPi.GPIO`` into ``self.gpio`` and adopt its IN/OUT constants.

        Any import failure is logged and leaves ``self.gpio`` set to ``False``
        so the wrapper keeps working without hardware access.
        """
        self.gpio = False
        try:
            # A non-empty fromlist makes __import__ return the GPIO submodule
            # rather than the top-level RPi package.
            self.gpio = __import__("RPi.GPIO", fromlist=["GPIO"])
        except ImportError:
            lgpio.debug("failed to import RPi.GPIO")
        except Exception as e:
            lgpio.debug("unknown error occured", exc_info=e)
        # getattr falls back to the current values when self.gpio is False.
        self.OUT = getattr(self.gpio, "OUT", self.OUT)
        self.IN = getattr(self.gpio, "IN", self.IN)

    def output(self, pins, state, *args):
        """Drive one pin (int) or several pins (iterable) to *state*.

        The pins are recorded so that ``cleanup`` can release them later.
        """
        lgpio.debug("setting pin(s) {} to value {}".format(
            pins, state))
        pin_list = self._as_pin_list(pins)
        _pins_for_cleanup.update(pin_list)
        if self.gpio:
            self.gpio.output(pin_list, state)

    def input(self, pins, *args):
        """Read one pin (int) or several pins (iterable).

        Returns:
            A list with one value per pin, in the order given; ``-1`` stands
            in for each pin when RPi.GPIO is not available.
        """
        pin_list = self._as_pin_list(pins)
        values = [self.gpio.input(p) if self.gpio else -1 for p in pin_list]
        lgpio.debug("reading pins {}: {}".format(
            pin_list, values))
        return values

    def setup(self, pins, value, *args):
        """Configure one pin (int) or several pins (iterable) with mode *value*.

        Configured pins are recorded for ``cleanup`` as well, so pins that
        were set up but never written are also released at exit.
        """
        lgpio.debug("setting pin(s) {} to {}".format(pins, value))
        pin_list = self._as_pin_list(pins)
        _pins_for_cleanup.update(pin_list)
        if self.gpio:
            self.gpio.setup(pin_list, value)
# Module-wide GPIOWrapper instance used by the pin classes in this file.
# Instantiating it runs GPIOWrapper.__init__ at import time.
_gpio = GPIOWrapper()
class PinBase(object):
    """PinBase is the base class for all classes representing a gpio pin.

    ``__init__`` reads ``self.value_low`` for the initial cached value, so
    subclasses are expected to define ``value_low`` (and ``value_high``).
    """

    pin_number = None
    # Last value written through output(); not updated by input().
    value = None

    def __init__(self, pin_number, mode=None):
        """Configure *pin_number* through the module-level ``_gpio`` wrapper.

        Args:
            pin_number: number of the pin this object controls.
            mode: direction constant passed to ``_gpio.setup``; defaults to
                ``_gpio.OUT`` when omitted.
        """
        super(PinBase, self).__init__()
        self.pin_number = pin_number
        self.value = self.value_low
        if mode is None:
            mode = _gpio.OUT
        _gpio.setup(self.pin_number, mode)

    def output(self, value):
        """Write *value* to the pin and remember it in ``self.value``.

        *value* is passed through ``digilib.misc.parse_to_int_list`` and must
        yield exactly one element.
        """
        [value] = digilib.misc.parse_to_int_list(value)
        self.value = value
        _gpio.output(self.pin_number, value)

    def input(self):
        """Read the pin and return its single value."""
        # _gpio.input works on a sequence of pins and returns one value per
        # pin, so wrap the pin number and unpack the single result.
        [value] = _gpio.input([self.pin_number])
        return value
class DigitalPin(PinBase):
    """Pin whose low and high levels are the booleans False and True."""

    value_low = False
    value_high = True

    def __init__(self, pin_number, mode):
        # Nothing digital-specific to set up; defer entirely to the base.
        super().__init__(pin_number, mode)
class AnalogPin(PinBase):
    """Pin whose low and high levels are the integers 0 and 1."""

    value_high = 1
    value_low = 0

    def __init__(self, pin_number, mode=None):
        """Configure *pin_number*.

        Args:
            pin_number: number of the pin this object controls.
            mode: direction constant for the pin; defaults to ``_gpio.OUT``.
        """
        # The base constructor needs a mode; the original call omitted it
        # and therefore always raised TypeError.
        if mode is None:
            mode = _gpio.OUT
        super(AnalogPin, self).__init__(pin_number, mode)
class PinControllerBase(object):
    """Base class for controllers of devices connected to a gpio header.

    PinControllerBase is the base class for all classes controlling one or
    more physical devices connected to a gpio header.
    """

    # Class-level default only; each instance gets its own list in __init__.
    pins = []

    def __init__(self, pins):
        """Remember the pin numbers this controller uses.

        Args:
            pins: iterable of pin numbers; it is copied, not stored.
        """
        super(PinControllerBase, self).__init__()
        # Build a per-instance list. Extending the class attribute in place
        # would leak every controller's pins into all other controllers.
        self.pins = list(pins)
-
-
-
-
class PinAPIBase:
    """Common ancestor of classes that expose an API over several
    PinController objects.
    """

    # Declared at class level; nothing in this class adds to it.
    controllers = []

    def __init__(self):
        super().__init__()
class PCEngine(PinControllerBase):
    """Test controller for an engine driven by an on/off pin and a speed pin."""

    max_speed = 1
    speed = 0
    turn_on_speed = 1
    is_on = False

    def __init__(self, pin_on_off, pin_analog):
        super().__init__([pin_on_off, pin_analog])
        # Both pins are created as DigitalPin outputs.
        self.pin_on_off = DigitalPin(pin_on_off, _gpio.OUT)
        self.pin_analog = DigitalPin(pin_analog, _gpio.OUT)
        # Start with speed 1 and the engine switched on.
        self.set_speed(1)
        self.set_state(1)

    def set_speed(self, speed):
        """Write *speed* to the speed pin and remember it in ``self.speed``."""
        self.pin_analog.output(speed)
        self.speed = speed

    def set_state(self, state):
        """Turn the engine on (1) or off (0).

        Nothing is written when *state* equals the remembered ``is_on``.
        """
        if state != self.is_on:
            self.pin_on_off.output(state)
            self.is_on = state
class EnginesController(PinAPIBase):
    """Command-style API over a left and a right engine.

    The public command methods share the signature
    ``(args, command, respond)``: *args* is a sequence of argument values
    and *respond*, if given, is called with a message when the arguments
    are invalid. *command* is not used by the methods of this class.
    """

    engine_left = None
    engine_right = None
    action_in_process = False
    # Seconds that turn() keeps the engines in the turning configuration.
    turn_duration = 2

    def __init__(self, left, right):
        """Create both engines.

        Args:
            left: argument sequence unpacked into ``make_engine`` for the
                left engine.
            right: same, for the right engine.
        """
        super(EnginesController, self).__init__()
        self.engine_right = self.make_engine(*right)
        self.engine_left = self.make_engine(*left)

    def make_engine(self, *args):
        """Build one engine object from *args*."""
        # NOTE(review): looked up as digilib.pin.PCEngine, although only
        # digilib.network and digilib.misc are imported in the visible file;
        # presumably this module is digilib.pin itself - confirm.
        return digilib.pin.PCEngine(*args)

    @staticmethod
    def _reply(respond, message):
        """Send *message* through *respond* unless no callback was given."""
        if respond is not None:
            respond(message)

    def _engine_by_name(self, name):
        """Return the engine called 'left' or 'right', or None if unknown."""
        if name == "right":
            return self.engine_right
        if name == "left":
            return self.engine_left
        return None

    def set_engine_state(self, args=(), command=None, respond=None):
        """Switch one engine on or off; *args* is ``(<engine>, <state>)``."""
        if len(args) != 2:
            self._reply(respond, "missing argument(s): <engine> <state>")
            return
        [name, state] = args
        engine = self._engine_by_name(name)
        if engine is None:
            self._reply(respond, "unknown engine. use 'left' or 'right'")
            return
        engine.set_state(state)

    def set_engine_speed(self, args=(), command=None, respond=None):
        """Set the speed of one engine; *args* is ``(<engine>, <speed>)``."""
        if len(args) != 2:
            self._reply(respond, "missing argument(s): <engine> <speed>")
            return
        [name, speed] = args
        engine = self._engine_by_name(name)
        if engine is None:
            self._reply(respond, "unknown engine. use 'left' or 'right'")
            return
        engine.set_speed(speed)

    async def turn(self, args=(), command=None, respond=None):
        """Turn 'left' or 'right' for ``turn_duration`` seconds.

        The engine on the side being turned towards is switched off and the
        opposite engine runs at speed 1. Afterwards both engines get back
        the state and speed they had before the call.
        """
        if len(args) != 1:
            self._reply(respond, "one missing argument: direction")
            return
        [direction] = args
        if direction not in ("left", "right"):
            # Previously an unknown direction slept and "restored" silently.
            self._reply(respond, "unknown direction. use 'left' or 'right'")
            return

        lgpio.info("turning {}".format(direction))
        # Snapshot the current settings so they can be restored afterwards.
        right_state = self.engine_right.is_on
        right_speed = self.engine_right.speed
        left_state = self.engine_left.is_on
        left_speed = self.engine_left.speed
        try:
            if direction == "right":
                self.engine_right.set_state(False)
                self.engine_left.set_state(True)
                self.engine_left.set_speed(1)
            else:
                self.engine_right.set_state(True)
                self.engine_right.set_speed(1)
                self.engine_left.set_state(False)
            await curio.sleep(self.turn_duration)
        finally:
            # Restore even if the sleep is interrupted by an exception, so
            # the engines are not left in the turning configuration.
            self.engine_right.set_state(right_state)
            self.engine_right.set_speed(right_speed)
            self.engine_left.set_state(left_state)
            self.engine_left.set_speed(left_speed)
        lgpio.info("done turning {}".format(direction))
-
class LED(DigitalPin):
    """A single LED on an output pin; it is switched on when created.

    ``on``, ``off`` and ``set`` share the signature
    ``(args, command, respond)``; only ``set`` reads *args* and *respond*.
    """

    def __init__(self, pin):
        # Name this class in super() so DigitalPin.__init__ is not skipped.
        super(LED, self).__init__(pin, _gpio.OUT)
        self.on()

    def on(self, args=(), command=None, respond=None):
        """Switch the LED on."""
        self.output(True)

    def off(self, args=(), command=None, respond=None):
        """Switch the LED off."""
        self.output(False)

    def set(self, args=(), command=None, respond=None):
        """Write the single state given in *args* to the LED.

        When *args* does not hold exactly one value, an error message is
        sent through *respond* (if one was given) and nothing is written.
        """
        if len(args) != 1:
            if respond is not None:
                respond("one missing argument: state")
            return
        [state] = args
        self.output(state)
class StatusLED(PinControllerBase):
    """Two-colour status indicator on a red and a green pin.

    ``red`` and ``green`` write the requested state to their own pin and
    the logical inverse to the other pin. The indicator starts green.
    """

    def __init__(self, pin_red, pin_green):
        super(StatusLED, self).__init__([pin_red, pin_green])
        self.pin_red = DigitalPin(pin_red, _gpio.OUT)
        self.pin_green = DigitalPin(pin_green, _gpio.OUT)
        self.green()

    @staticmethod
    def _reject_extra_args(command, respond):
        """Report that the command takes at most one optional <state>."""
        if respond is not None:
            respond(ERROR_TAKES_ARGUMENTS.format(
                command, "one", "optional", "<state>"))

    @staticmethod
    def _show(primary, secondary, state):
        """Write *state* to *primary* and its inverse to *secondary*."""
        primary.output(state)
        secondary.output(int(not state))

    def red(self, args=(), command=None, respond=None):
        """Set the red pin to the optional state in *args* (default 1)."""
        if len(args) > 1:
            self._reject_extra_args(command, respond)
            return
        if len(args) == 1:
            # parse_to_int_list returns a list. Unpack its single element so
            # that `not state` below tests the value, not list emptiness.
            [state] = digilib.misc.parse_to_int_list(args[0])
        else:
            state = 1
        self._show(self.pin_red, self.pin_green, state)

    def green(self, args=(), command=None, respond=None):
        """Set the green pin to the optional state in *args* (default 1)."""
        if len(args) > 1:
            self._reject_extra_args(command, respond)
            return
        if len(args) == 1:
            state = int(args[0])
        else:
            state = 1
        self._show(self.pin_green, self.pin_red, state)
class DebugPinController(PinControllerBase):
    """Debugging commands that talk to the module-level ``_gpio`` directly.

    All commands share the signature ``(args, command, respond)``;
    *respond*, when given, receives usage errors and results.
    """

    @staticmethod
    def _reply(respond, message):
        """Send *message* through *respond* unless no callback was given."""
        if respond is not None:
            respond(message)

    def output(self, args=(), command=None, respond=None):
        """Write a state to pins; *args* is ``(<pins>, <state>)``.

        Both values go through ``digilib.misc.parse_to_int_list``; the state
        must yield exactly one element. Returns False on a usage error.
        """
        if len(args) != 2:
            self._reply(respond, ERROR_TAKES_ARGUMENTS.format(
                command, "two", "positional", "<pins> <state>"))
            return False
        pins = digilib.misc.parse_to_int_list(args[0])
        [state] = digilib.misc.parse_to_int_list(args[1])
        _gpio.output(pins, state)

    def input(self, args=(), command=None, respond=None):
        """Read pins and send the values back; *args* is ``(<pins>,)``.

        A second argument is still accepted, because older callers had to
        pass a state, but it is ignored. Returns False on a usage error.
        """
        if len(args) not in (1, 2):
            self._reply(respond, ERROR_TAKES_ARGUMENTS.format(
                command, "one", "positional", "<pins>"))
            return False
        pins = digilib.misc.parse_to_int_list(args[0])
        rv = _gpio.input(pins)
        lgpio.debug(rv)
        self._reply(respond, str(rv))

    def raise_exc(self, args=(), command=None, respond=None):
        """Raise a test exception from a regular method."""
        raise Exception("Test Exception")

    async def araise_exc(self, args=(), command=None, respond=None):
        """Raise a test exception from a coroutine."""
        # Kept from the original so the helper still runs, and can still
        # fail, before the test exception; its result is not needed.
        digilib.misc.parse_to_int_list("1,2,3,4")
        raise Exception("Test Async Exception")
# Running the module as a script does nothing beyond the import-time setup.
if __name__ == "__main__":
    pass
|