|
@@ -25,7 +25,8 @@ import blinker
|
|
|
import curio
|
|
|
import digilib.network
|
|
|
|
|
|
-lpin = logging.getLogger(__name__+".pin")
|
|
|
+log = logging.getLogger(__name__+"")
|
|
|
+lgpio = logging.getLogger(__name__+".gpio")
|
|
|
|
|
|
_pins_for_cleanup = set()
|
|
|
|
|
@@ -34,81 +35,154 @@ ERROR_TAKES_ARGUMENTS = "{} takes {} {} argument(s): {}"
|
|
|
# respond(ERROR_TAKES_ARGUMENTS.format(
|
|
|
# command, "one", "positional", "<name>"))
|
|
|
|
|
|
-def _parse_to_int_list(parse):
|
|
|
- if type(parse) is list:
|
|
|
- return parse
|
|
|
- elif type(parse) is int:
|
|
|
- return [parse]
|
|
|
- elif type(parse) is bool:
|
|
|
- return [int(parse)]
|
|
|
- elif type(parse) is not str:
|
|
|
- raise ValueError("Failed to parse value {}".format(parse))
|
|
|
- if parse.lower() in ["true","high"]:
|
|
|
- return [1]
|
|
|
- elif parse.lower() in ["false","low"]:
|
|
|
- return [0]
|
|
|
- parse = parse.split(".")
|
|
|
- for part in parse.copy():
|
|
|
- parse.remove(part)
|
|
|
- parse.append(int(part))
|
|
|
- return parse
|
|
|
-
|
|
|
-def _output(pins,state):
|
|
|
- lpin.debug("setting pin(s) {:0>2} to value {}".format(
|
|
|
- pins,state))
|
|
|
- if type(pins) is int:
|
|
|
- pins = [pins]
|
|
|
- _pins_for_cleanup.update(pins)
|
|
|
- gpio.output(pins,state)
|
|
|
-
|
|
|
-def _read(self,pin):
|
|
|
- state = gpio.read(pin)
|
|
|
- lpin.debug("reading pin {}: {}".format(
|
|
|
- pin,state))
|
|
|
-
|
|
|
-class FakeGPIO(object):
|
|
|
+# def _parse_to_int_list(parse):
|
|
|
+# if type(parse) is list:
|
|
|
+# return parse
|
|
|
+# elif type(parse) is int:
|
|
|
+# return [parse]
|
|
|
+# elif type(parse) is bool:
|
|
|
+# return [int(parse)]
|
|
|
+# elif type(parse) is not str:
|
|
|
+# raise ValueError("Failed to parse value {}".format(parse))
|
|
|
+# if parse.lower() in ["true","high"]:
|
|
|
+# return [1]
|
|
|
+# elif parse.lower() in ["false","low"]:
|
|
|
+# return [0]
|
|
|
+# parse = parse.split(".")
|
|
|
+# for part in parse.copy():
|
|
|
+# parse.remove(part)
|
|
|
+# parse.append(int(part))
|
|
|
+# return parse
|
|
|
+#
|
|
|
+# def _setup()
|
|
|
+#
|
|
|
+# def _output(pins,state):
|
|
|
+# lgpio.debug("setting pin(s) {:0>2} to value {}".format(
|
|
|
+# pins,state))
|
|
|
+# if type(pins) is int:
|
|
|
+# pins = [pins]
|
|
|
+# _pins_for_cleanup.update(pins)
|
|
|
+# gpio.output(pins,state)
|
|
|
+#
|
|
|
+# def _read(self,pin):
|
|
|
+# state = gpio.read(pin)
|
|
|
+# lgpio.debug("reading pin {}: {}".format(
|
|
|
+# pin,state))
|
|
|
+
|
|
|
+# class FakeGPIO(object):
|
|
|
+# OUT = "out"
|
|
|
+# IN = "in"
|
|
|
+# BCM = "broadcom"
|
|
|
+# pin_values = {}
|
|
|
+# def cleanup(self,*args):
|
|
|
+# lgpio.debug("cleanup with these args '{}'!".format(args))
|
|
|
+# def setup(self,pins,value):
|
|
|
+# lgpio.debug("pin(s) {} set to {}".format(pins,value))
|
|
|
+# def setmode(self,mode):
|
|
|
+# lgpio.debug("mode set to {}".format(mode))
|
|
|
+# def output(self,pins,value):
|
|
|
+# lgpio.debug("setting pin(s) {} to value {}".format(
|
|
|
+# pins, value
|
|
|
+# ))
|
|
|
+# if type(pins) == int:
|
|
|
+# pins = [pins]
|
|
|
+# for p in pins:
|
|
|
+# self.pin_values[p] = value
|
|
|
+# def read(self,pin):
|
|
|
+# return self.pin_values[pin]
|
|
|
+#
|
|
|
+# try:
|
|
|
+# import RPi.GPIO as gpio
|
|
|
+# except:
|
|
|
+# # print("Using FakeGPIO because RPi.GPIO was not found")
|
|
|
+# gpio = FakeGPIO()
|
|
|
+#
|
|
|
+# def call_gpio_cleanup(*args):
|
|
|
+# log.info(args)
|
|
|
+# gpio.cleanup(_pins_for_cleanup)
|
|
|
+
|
|
|
+
|
|
|
+class GPIOWrapper(object):
|
|
|
+ gpio = None
|
|
|
OUT = "out"
|
|
|
IN = "in"
|
|
|
BCM = "broadcom"
|
|
|
pin_values = {}
|
|
|
+
|
|
|
+ def __init__(self):
|
|
|
+ super(GPIOWrapper,self).__init__()
|
|
|
+ self.load_rpi_gpio()
|
|
|
+ lgpio.debug("setting pin numbering to broadcom")
|
|
|
+ if self.gpio:
|
|
|
+ self.gpio.setmode(self.gpio.BCM)
|
|
|
+ sig_exit = blinker.signal("global-exit")
|
|
|
+ sig_exit.connect(self.cleanup)
|
|
|
+
|
|
|
def cleanup(self,*args):
|
|
|
- lpin.debug("cleanup with these args '{}'!".format(args))
|
|
|
- def setup(self,pins,value):
|
|
|
- lpin.debug("pin(s) {} set to {}".format(pins,value))
|
|
|
- def setmode(self,mode):
|
|
|
- lpin.debug("mode set to {}".format(mode))
|
|
|
- def output(self,pins,value):
|
|
|
- lpin.debug("setting pin(s) {} to value {}".format(
|
|
|
- pins, value
|
|
|
- ))
|
|
|
- if type(pins) == int:
|
|
|
+ lgpio.debug("cleanup! ({})".format(args))
|
|
|
+ if self.gpio:
|
|
|
+ self.gpio.cleanup(_pins_for_cleanup)
|
|
|
+
|
|
|
+ def load_rpi_gpio(self):
|
|
|
+ try:
|
|
|
+ self.gpio = __import__("RPi.GPIO")
|
|
|
+ except ImportError as e:
|
|
|
+ lgpio.debug("failed to import RPi.GPIO")
|
|
|
+ except Exception as e:
|
|
|
+ lgpio.debug("unknown error occured", exc_info=e)
|
|
|
+ finally:
|
|
|
+ self.gpio = False
|
|
|
+ self.OUT = getattr(self.gpio,self.OUT)
|
|
|
+ self.IN = getattr(self.gpio,self.IN)
|
|
|
+ self.BCM = getattr(self.gpio,self.BCM)
|
|
|
+
|
|
|
+ def output(pins,state,*args):
|
|
|
+ lgpio.debug("setting pin(s) {:0>2} to value {}".format(
|
|
|
+ pins,state))
|
|
|
+ if type(pins) is int:
|
|
|
pins = [pins]
|
|
|
- for p in pins:
|
|
|
- self.pin_values[p] = value
|
|
|
- def read(self,pin):
|
|
|
- return self.pin_values[pin]
|
|
|
-
|
|
|
-try:
|
|
|
- import RPi.GPIO as gpio
|
|
|
-except:
|
|
|
- # print("Using FakeGPIO because RPi.GPIO was not found")
|
|
|
- gpio = FakeGPIO()
|
|
|
-
|
|
|
-def call_gpio_cleanup(*args):
|
|
|
- log.info(args)
|
|
|
- gpio.cleanup(_pins_for_cleanup)
|
|
|
-
|
|
|
-sig_exit = blinker.signal("global-exit")
|
|
|
-sig_exit.connect(call_gpio_cleanup)
|
|
|
-gpio.setmode(gpio.BCM)
|
|
|
-log = logging.getLogger(__name__+"")
|
|
|
-lpin = logging.getLogger(__name__+".pin")
|
|
|
+ _pins_for_cleanup.update(pins)
|
|
|
+ if self.gpio:
|
|
|
+ self.gpio.output(pins,state)
|
|
|
+
|
|
|
+ def parse_to_int_list(parse,*args):
|
|
|
+ if type(parse) is list:
|
|
|
+ return parse
|
|
|
+ elif type(parse) is int:
|
|
|
+ return [parse]
|
|
|
+ elif type(parse) is bool:
|
|
|
+ return [int(parse)]
|
|
|
+ elif type(parse) is not str:
|
|
|
+ raise ValueError("Failed to parse value {}".format(parse))
|
|
|
+ if parse.lower() in ["true","high"]:
|
|
|
+ return [1]
|
|
|
+ elif parse.lower() in ["false","low"]:
|
|
|
+ return [0]
|
|
|
+ parse = parse.split(".")
|
|
|
+ for part in parse.copy():
|
|
|
+ parse.remove(part)
|
|
|
+ parse.append(int(part))
|
|
|
+ return parse
|
|
|
+
|
|
|
+ def read(self,pin,*args):
|
|
|
+ if self.gpio:
|
|
|
+ state = self.gpio.read(pin)
|
|
|
+ else:
|
|
|
+ state = -1
|
|
|
+ lgpio.debug("reading pin {:0>2}: {}".format(
|
|
|
+ pin,state))
|
|
|
+ return state
|
|
|
+
|
|
|
+ def setup(self,pins,value,*args):
|
|
|
+ lgpio.debug("setting pin(s) {} to {}".format(pins,value))
|
|
|
+ if self.gpio:
|
|
|
+ self.gpio.setup(pins,value)
|
|
|
|
|
|
class PinBase(object):
|
|
|
"""PinBase is the base class for all classes representing a gpio pin"""
|
|
|
pin_number = None
|
|
|
value = None
|
|
|
- def __init__(self,pin_number):
|
|
|
+ def __init__(self,pin_number,mode):
|
|
|
super(PinBase,self).__init__()
|
|
|
self.pin_number = pin_number
|
|
|
self.value = self.value_low
|
|
@@ -125,8 +199,8 @@ class PinBase(object):
|
|
|
class DigitalPin(PinBase):
|
|
|
value_high = True
|
|
|
value_low = False
|
|
|
- def __init__(self,pin_number):
|
|
|
- super(DigitalPin,self).__init__(pin_number)
|
|
|
+ def __init__(self,pin_number,mode):
|
|
|
+ super(DigitalPin,self).__init__(pin_number,mode)
|
|
|
|
|
|
class AnalogPin(PinBase):
|
|
|
value_high = 1
|
|
@@ -167,11 +241,11 @@ class PCEngine(PinControllerBase):
|
|
|
super(PCEngine, self).__init__([pin_on_off,pin_analog])
|
|
|
# self.pins.append(pin_on_off)
|
|
|
# self.pins.append(pin_analog)
|
|
|
- self.pin_on_off = DigitalPin(pin_on_off)
|
|
|
- self.pin_analog = DigitalPin(pin_analog)
|
|
|
+ self.pin_on_off = DigitalPin(pin_on_off,gpio.OUT)
|
|
|
+ self.pin_analog = DigitalPin(pin_analog,gpio.OUT)
|
|
|
# self.pins.append(self.pin_on_off)
|
|
|
# self.pins.append(self.pin_analog)
|
|
|
- gpio.setup(self.pins,gpio.OUT)
|
|
|
+ # gpio.setup(self.pins,gpio.OUT)
|
|
|
# gpio.setup(self.pin_analog,gpio.OUT)
|
|
|
self.set_speed(1)
|
|
|
self.set_state(1)
|
|
@@ -196,7 +270,7 @@ class EnginesController(PinAPIBase):
|
|
|
self.engine_left = self.make_engine(*left)
|
|
|
def __enter__(self):
|
|
|
if self.action_in_process:
|
|
|
- lpin.debug("action already in process, not entering")
|
|
|
+ lgpio.debug("action already in process, not entering")
|
|
|
return False
|
|
|
def make_engine(self,*args):
|
|
|
return digilib.pin.PCEngine(*args)
|
|
@@ -227,9 +301,9 @@ class EnginesController(PinAPIBase):
|
|
|
respond("one missing argument: direction")
|
|
|
return
|
|
|
[direction] = args
|
|
|
- # lpin.debug(threading.current_thread())
|
|
|
+ # lgpio.debug(threading.current_thread())
|
|
|
# with self ad
|
|
|
- lpin.info("turning {}".format(direction))
|
|
|
+ lgpio.info("turning {}".format(direction))
|
|
|
right_state = self.engine_right.is_on
|
|
|
right_speed = self.engine_right.speed
|
|
|
left_state = self.engine_left.is_on
|
|
@@ -248,14 +322,13 @@ class EnginesController(PinAPIBase):
|
|
|
self.engine_right.set_speed(right_speed)
|
|
|
self.engine_left.set_state(left_state)
|
|
|
self.engine_left.set_speed(left_speed)
|
|
|
- lpin.info("done turning {}".format(direction))
|
|
|
+ lgpio.info("done turning {}".format(direction))
|
|
|
# set engines to previous values
|
|
|
|
|
|
|
|
|
class LED(DigitalPin):
|
|
|
def __init__(self,pin):
|
|
|
- super(DigitalPin,self).__init__(pin)
|
|
|
- gpio.setup(pin,gpio.OUT)
|
|
|
+ super(DigitalPin,self).__init__(pin,gpio.OUT)
|
|
|
self.on()
|
|
|
def on(self,args=[],command=None,respond=None):
|
|
|
self.output(True)
|