#!/usr/bin/env python3.5 # Copyright 2017 Digital # # This file is part of DigiLib. # # DigiLib is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # DigiLib is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with DigiLib. If not, see . # Python modules import atexit import logging import threading import traceback # Third party modules import blinker import curio import digilib.network import digilib.misc log = logging.getLogger(__name__+"") lgpio = logging.getLogger(__name__+".gpio") # Error messages ERROR_TAKES_ARGUMENTS = "{} takes {} {} argument(s): {}" # respond(ERROR_TAKES_ARGUMENTS.format( # command, "one", "positional", "")) _pins_for_cleanup = set() _gpio = None class GPIOWrapper(object): gpio = None OUT = "out" IN = "in" # BCM = "broadcom" pin_values = {} def __init__(self): super(GPIOWrapper,self).__init__() self.load_rpi_gpio() lgpio.debug("setting pin numbering to broadcom") if self.gpio: self.gpio.setmode(self.gpio.BCM) atexit.register(self.cleanup) def cleanup(self,*args): lgpio.debug("cleanup! ({})".format(args)) if self.gpio: # gpio.cleanup wants a list or tuple, but _pins_for_cleanup is # a set. we have to convert it first. self.gpio.cleanup(list(_pins_for_cleanup)) def load_rpi_gpio(self): try: self.gpio = False self.gpio = __import__("RPi.GPIO",fromlist="GPIO") except ImportError as e: lgpio.debug("failed to import RPi.GPIO") print("134") except Exception as e: lgpio.debug("unknown error occured", exc_info=e) print("137") finally: self.OUT = getattr(self.gpio,"OUT",self.OUT) self.IN = getattr(self.gpio,"IN",self.IN) # self.BCM = getattr(self.gpio,self.BCM) def output(self,pins,state,*args): lgpio.debug("setting pin(s) {} to value {}".format( pins,state)) if type(pins) is int: pins = [pins] _pins_for_cleanup.update(pins) if self.gpio: self.gpio.output(pins,state) def input(self,pins,*args): values = [] for p in pins: if self.gpio: values.append(self.gpio.input(p)) else: values.append(-1) lgpio.debug("reading pins {}: {}".format( pins,values)) return values def setup(self,pins,value,*args): lgpio.debug("setting pin(s) {} to {}".format(pins,value)) if self.gpio: self.gpio.setup(pins,value) _gpio = GPIOWrapper() class PinBase(object): """PinBase is the base class for all classes representing a gpio pin""" pin_number = None value = None def __init__(self,pin_number,mode): super(PinBase,self).__init__() self.pin_number = pin_number self.value = self.value_low _gpio.setup(self.pin_number,_gpio.OUT) def output(self,value): [value] = digilib.misc.parse_to_int_list(value) self.value = value _gpio.output(self.pin_number,value) def input(self): value = _gpio.input(self.pin_number,value) return value class DigitalPin(PinBase): value_high = True value_low = False def __init__(self,pin_number,mode): super(DigitalPin,self).__init__(pin_number,mode) class AnalogPin(PinBase): value_high = 1 value_low = 0 def __init__(self,pin_number): super(AnalogPin,self).__init__(pin_number) class PinControllerBase(object): """docstring for PinControllerBase. PinControllerBase is the base class for all classes controlling one or more physical devices connected to a gpio header """ pins = [] def __init__(self,pins): super(PinControllerBase, self).__init__() self.pins.extend(pins) # def make_digital_pin(self,*args): # return DigitalPin(*args) # def make_analog_pin(self,*args): # return AnalogPin(*args) class PinAPIBase(object): """docstring for PinAPI. PinAPIBase is the base class for all classes providing an api to multiple PinController. """ controllers = [] def __init__(self): super(PinAPIBase, self).__init__() class PCEngine(PinControllerBase): """Test Class""" max_speed=1 speed = 0 turn_on_speed = 1 is_on = False def __init__(self,pin_on_off,pin_analog): super(PCEngine, self).__init__([pin_on_off,pin_analog]) # self.pins.append(pin_on_off) # self.pins.append(pin_analog) self.pin_on_off = DigitalPin(pin_on_off,_gpio.OUT) self.pin_analog = DigitalPin(pin_analog,_gpio.OUT) # self.pins.append(self.pin_on_off) # self.pins.append(self.pin_analog) # gpio.setup(self.pins,_gpio.OUT) # gpio.setup(self.pin_analog,_gpio.OUT) self.set_speed(1) self.set_state(1) def set_speed(self,speed): self.pin_analog.output(speed) self.speed = speed def set_state(self,state): """state is integer and specifies if the engine should be turned on (1) or turned off (0) """ if state == self.is_on: return # self.set_speed(speed) self.pin_on_off.output(state) self.is_on = state class EnginesController(PinAPIBase): engine_left = None engine_right = None action_in_process = False def __init__(self,left,right): super(EnginesController,self).__init__() self.engine_right = self.make_engine(*right) self.engine_left = self.make_engine(*left) def make_engine(self,*args): return digilib.pin.PCEngine(*args) def set_engine_state(self,args=[],command=None,respond=None): if len(args) != 2: respond("missing argument(s): ") return [engine,state] = args if engine == "right": self.engine_right.set_state(state) elif engine == "left": self.engine_left.set_state(state) else: respond("unknown engine. use 'left' or 'right'") def set_engine_speed(self,args=[],command=None,respond=None): if len(args) != 2: respond("missing argument(s): ") return [engine,speed] = args if engine == "right": self.engine_right.set_speed(speed) elif engine == "left": self.engine_left.set_speed(speed) else: respond("unknown engine. use 'left' or 'right'") async def turn(self,args=[],command=None,respond=None): if len(args) != 1: respond("one missing argument: direction") return [direction] = args # lgpio.debug(threading.current_thread()) # with self ad lgpio.info("turning {}".format(direction)) right_state = self.engine_right.is_on right_speed = self.engine_right.speed left_state = self.engine_left.is_on left_speed = self.engine_left.speed if direction == "right": self.engine_right.set_state(False) self.engine_left.set_state(True) self.engine_left.set_speed(1) elif direction == "left": self.engine_right.set_state(True) self.engine_right.set_speed(1) self.engine_left.set_state(False) await curio.sleep(2) # time.sleep(2) self.engine_right.set_state(right_state) self.engine_right.set_speed(right_speed) self.engine_left.set_state(left_state) self.engine_left.set_speed(left_speed) lgpio.info("done turning {}".format(direction)) # set engines to previous values class LED(DigitalPin): def __init__(self,pin): super(DigitalPin,self).__init__(pin,_gpio.OUT) self.on() def on(self,args=[],command=None,respond=None): self.output(True) def off(self,args=[],command=None,respond=None): self.output(False) def set(self,args=[],command=None,respond=None): if len(args) != 1: respond("one missing argument: state") return [state] = args self.output(state) class StatusLED(PinControllerBase): def __init__(self,pin_red,pin_green): super(StatusLED,self).__init__([pin_red,pin_green]) self.pin_red = DigitalPin(pin_red,_gpio.OUT) self.pin_green = DigitalPin(pin_green,_gpio.OUT) self.green() def red(self,args=[],command=None,respond=None): if len(args) > 1: respond(ERROR_TAKES_ARGUMENTS.format( command,"one","optional","")) return elif len(args) == 1: state = digilib.misc.parse_to_int_list(*args) else: state = 1 self.pin_red.output(state) self.pin_green.output(int(not state)) def green(self,args=[],command=None,respond=None): if len(args) > 1: respond(ERROR_TAKES_ARGUMENTS.format( command,"one","optional","")) return elif len(args) == 1: state = int(*args) else: state = 1 self.pin_green.output(state) self.pin_red.output(int(not state)) class DebugPinController(PinControllerBase): def output(self,args=[],command=None,respond=None): if len(args) != 2: respond(ERROR_TAKES_ARGUMENTS.format( command, "two", "positional", "")) return False pins = digilib.misc.parse_to_int_list(args[0]) [state] = digilib.misc.parse_to_int_list(args[1]) _gpio.output(pins,state) def input(self,args=[],command=None,respond=None): if len(args) != 2: respond(ERROR_TAKES_ARGUMENTS.format( command, "two", "positional", "")) return False pins = digilib.misc.parse_to_int_list(args[0]) [state] = digilib.misc.parse_to_int_list(args[1]) rv = _gpio.input(pins,state) lgpio.debug(rv) respond(str(rv)) def raise_exc(self,args=[],command=None,respond=None): raise Exception("Test Exception") async def araise_exc(self,args=[],command=None,respond=None): state = digilib.misc.parse_to_int_list("1,2,3,4") a = 1+2 raise Exception("Test Async Exception") if __name__ == "__main__": pass #