#!/usr/bin/env python3.5
# Copyright 2017 Digital
#
# This file is part of DigiLib.
#
# DigiLib is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# DigiLib is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with DigiLib. If not, see .
# Python modules
import atexit
import logging
import threading
import traceback
# Third party modules
import blinker
import curio
import digilib.network
import digilib.misc
log = logging.getLogger(__name__+"")
lgpio = logging.getLogger(__name__+".gpio")
# Error messages
ERROR_TAKES_ARGUMENTS = "{} takes {} {} argument(s): {}"
# respond(ERROR_TAKES_ARGUMENTS.format(
# command, "one", "positional", ""))
_pins_for_cleanup = set()
_gpio = None
class GPIOWrapper(object):
gpio = None
OUT = "out"
IN = "in"
# BCM = "broadcom"
pin_values = {}
def __init__(self):
super(GPIOWrapper,self).__init__()
self.load_rpi_gpio()
lgpio.debug("setting pin numbering to broadcom")
if self.gpio:
self.gpio.setmode(self.gpio.BCM)
atexit.register(self.cleanup)
def cleanup(self,*args):
lgpio.debug("cleanup! ({})".format(args))
if self.gpio:
# gpio.cleanup wants a list or tuple, but _pins_for_cleanup is
# a set. we have to convert it first.
self.gpio.cleanup(list(_pins_for_cleanup))
def load_rpi_gpio(self):
try:
self.gpio = False
self.gpio = __import__("RPi.GPIO",fromlist="GPIO")
except ImportError as e:
lgpio.debug("failed to import RPi.GPIO")
print("134")
except Exception as e:
lgpio.debug("unknown error occured", exc_info=e)
print("137")
finally:
self.OUT = getattr(self.gpio,"OUT",self.OUT)
self.IN = getattr(self.gpio,"IN",self.IN)
# self.BCM = getattr(self.gpio,self.BCM)
def output(self,pins,state,*args):
lgpio.debug("setting pin(s) {} to value {}".format(
pins,state))
if type(pins) is int:
pins = [pins]
_pins_for_cleanup.update(pins)
if self.gpio:
self.gpio.output(pins,state)
def input(self,pins,*args):
values = []
for p in pins:
if self.gpio:
values.append(self.gpio.input(p))
else:
values.append(-1)
lgpio.debug("reading pins {}: {}".format(
pins,values))
return values
def setup(self,pins,value,*args):
lgpio.debug("setting pin(s) {} to {}".format(pins,value))
if self.gpio:
self.gpio.setup(pins,value)
_gpio = GPIOWrapper()
class PinBase(object):
"""PinBase is the base class for all classes representing a gpio pin"""
pin_number = None
value = None
def __init__(self,pin_number,mode):
super(PinBase,self).__init__()
self.pin_number = pin_number
self.value = self.value_low
_gpio.setup(self.pin_number,_gpio.OUT)
def output(self,value):
[value] = digilib.misc.parse_to_int_list(value)
self.value = value
_gpio.output(self.pin_number,value)
def input(self):
value = _gpio.input(self.pin_number,value)
return value
class DigitalPin(PinBase):
value_high = True
value_low = False
def __init__(self,pin_number,mode):
super(DigitalPin,self).__init__(pin_number,mode)
class AnalogPin(PinBase):
value_high = 1
value_low = 0
def __init__(self,pin_number):
super(AnalogPin,self).__init__(pin_number)
class PinControllerBase(object):
"""docstring for PinControllerBase.
PinControllerBase is the base class for all classes controlling one or more physical devices connected to a gpio header
"""
pins = []
def __init__(self,pins):
super(PinControllerBase, self).__init__()
self.pins.extend(pins)
# def make_digital_pin(self,*args):
# return DigitalPin(*args)
# def make_analog_pin(self,*args):
# return AnalogPin(*args)
class PinAPIBase(object):
"""docstring for PinAPI.
PinAPIBase is the base class for all classes providing an api to multiple
PinController.
"""
controllers = []
def __init__(self):
super(PinAPIBase, self).__init__()
class PCEngine(PinControllerBase):
"""Test Class"""
max_speed=1
speed = 0
turn_on_speed = 1
is_on = False
def __init__(self,pin_on_off,pin_analog):
super(PCEngine, self).__init__([pin_on_off,pin_analog])
# self.pins.append(pin_on_off)
# self.pins.append(pin_analog)
self.pin_on_off = DigitalPin(pin_on_off,_gpio.OUT)
self.pin_analog = DigitalPin(pin_analog,_gpio.OUT)
# self.pins.append(self.pin_on_off)
# self.pins.append(self.pin_analog)
# gpio.setup(self.pins,_gpio.OUT)
# gpio.setup(self.pin_analog,_gpio.OUT)
self.set_speed(1)
self.set_state(1)
def set_speed(self,speed):
self.pin_analog.output(speed)
self.speed = speed
def set_state(self,state):
"""state is integer and specifies if the engine should be turned on (1) or turned off (0) """
if state == self.is_on:
return
# self.set_speed(speed)
self.pin_on_off.output(state)
self.is_on = state
class EnginesController(PinAPIBase):
engine_left = None
engine_right = None
action_in_process = False
def __init__(self,left,right):
super(EnginesController,self).__init__()
self.engine_right = self.make_engine(*right)
self.engine_left = self.make_engine(*left)
def make_engine(self,*args):
return digilib.pin.PCEngine(*args)
def set_engine_state(self,args=[],command=None,respond=None):
if len(args) != 2:
respond("missing argument(s): ")
return
[engine,state] = args
if engine == "right":
self.engine_right.set_state(state)
elif engine == "left":
self.engine_left.set_state(state)
else:
respond("unknown engine. use 'left' or 'right'")
def set_engine_speed(self,args=[],command=None,respond=None):
if len(args) != 2:
respond("missing argument(s): ")
return
[engine,speed] = args
if engine == "right":
self.engine_right.set_speed(speed)
elif engine == "left":
self.engine_left.set_speed(speed)
else:
respond("unknown engine. use 'left' or 'right'")
async def turn(self,args=[],command=None,respond=None):
if len(args) != 1:
respond("one missing argument: direction")
return
[direction] = args
# lgpio.debug(threading.current_thread())
# with self ad
lgpio.info("turning {}".format(direction))
right_state = self.engine_right.is_on
right_speed = self.engine_right.speed
left_state = self.engine_left.is_on
left_speed = self.engine_left.speed
if direction == "right":
self.engine_right.set_state(False)
self.engine_left.set_state(True)
self.engine_left.set_speed(1)
elif direction == "left":
self.engine_right.set_state(True)
self.engine_right.set_speed(1)
self.engine_left.set_state(False)
await curio.sleep(2)
# time.sleep(2)
self.engine_right.set_state(right_state)
self.engine_right.set_speed(right_speed)
self.engine_left.set_state(left_state)
self.engine_left.set_speed(left_speed)
lgpio.info("done turning {}".format(direction))
# set engines to previous values
class LED(DigitalPin):
def __init__(self,pin):
super(DigitalPin,self).__init__(pin,_gpio.OUT)
self.on()
def on(self,args=[],command=None,respond=None):
self.output(True)
def off(self,args=[],command=None,respond=None):
self.output(False)
def set(self,args=[],command=None,respond=None):
if len(args) != 1:
respond("one missing argument: state")
return
[state] = args
self.output(state)
class StatusLED(PinControllerBase):
def __init__(self,pin_red,pin_green):
super(StatusLED,self).__init__([pin_red,pin_green])
self.pin_red = DigitalPin(pin_red,_gpio.OUT)
self.pin_green = DigitalPin(pin_green,_gpio.OUT)
self.green()
def red(self,args=[],command=None,respond=None):
if len(args) > 1:
respond(ERROR_TAKES_ARGUMENTS.format(
command,"one","optional",""))
return
elif len(args) == 1:
state = digilib.misc.parse_to_int_list(*args)
else:
state = 1
self.pin_red.output(state)
self.pin_green.output(int(not state))
def green(self,args=[],command=None,respond=None):
if len(args) > 1:
respond(ERROR_TAKES_ARGUMENTS.format(
command,"one","optional",""))
return
elif len(args) == 1:
state = int(*args)
else:
state = 1
self.pin_green.output(state)
self.pin_red.output(int(not state))
class DebugPinController(PinControllerBase):
def output(self,args=[],command=None,respond=None):
if len(args) != 2:
respond(ERROR_TAKES_ARGUMENTS.format(
command, "two", "positional", ""))
return False
pins = digilib.misc.parse_to_int_list(args[0])
[state] = digilib.misc.parse_to_int_list(args[1])
_gpio.output(pins,state)
def input(self,args=[],command=None,respond=None):
if len(args) != 2:
respond(ERROR_TAKES_ARGUMENTS.format(
command, "two", "positional", ""))
return False
pins = digilib.misc.parse_to_int_list(args[0])
[state] = digilib.misc.parse_to_int_list(args[1])
rv = _gpio.input(pins,state)
lgpio.debug(rv)
respond(str(rv))
def raise_exc(self,args=[],command=None,respond=None):
raise Exception("Test Exception")
async def araise_exc(self,args=[],command=None,respond=None):
state = digilib.misc.parse_to_int_list("1,2,3,4")
a = 1+2
raise Exception("Test Async Exception")
if __name__ == "__main__":
pass
#