# __init__.py (11 KB)


#!/usr/bin/env python3.5
# Copyright 2017 Digital
#
# This file is part of DigiLib.
#
# DigiLib is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# DigiLib is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with DigiLib. If not, see <http://www.gnu.org/licenses/>.
# Python modules
import atexit
import importlib
import logging
import threading
import traceback

# Third party modules
import blinker
import curio

import digilib.network
import digilib.misc
  28. log = logging.getLogger(__name__+"")
  29. lgpio = logging.getLogger(__name__+".gpio")
  30. # Error messages
  31. ERROR_TAKES_ARGUMENTS = "{} takes {} {} argument(s): {}"
  32. # respond(ERROR_TAKES_ARGUMENTS.format(
  33. # command, "one", "positional", "<name>"))
  34. _pins_for_cleanup = set()
  35. _gpio = None
  36. class GPIOWrapper(object):
  37. gpio = None
  38. OUT = "out"
  39. IN = "in"
  40. # BCM = "broadcom"
  41. pin_values = {}
  42. def __init__(self):
  43. super(GPIOWrapper,self).__init__()
  44. self.load_rpi_gpio()
  45. lgpio.debug("setting pin numbering to broadcom")
  46. if self.gpio:
  47. self.gpio.setmode(self.gpio.BCM)
  48. atexit.register(self.cleanup)
  49. def cleanup(self,*args):
  50. lgpio.debug("cleanup! ({})".format(args))
  51. if self.gpio:
  52. # gpio.cleanup wants a list or tuple, but _pins_for_cleanup is
  53. # a set. we have to convert it first.
  54. self.gpio.cleanup(list(_pins_for_cleanup))
  55. def load_rpi_gpio(self):
  56. try:
  57. self.gpio = False
  58. self.gpio = __import__("RPi.GPIO",fromlist="GPIO")
  59. except ImportError as e:
  60. lgpio.debug("failed to import RPi.GPIO")
  61. print("134")
  62. except Exception as e:
  63. lgpio.debug("unknown error occured", exc_info=e)
  64. print("137")
  65. finally:
  66. self.OUT = getattr(self.gpio,"OUT",self.OUT)
  67. self.IN = getattr(self.gpio,"IN",self.IN)
  68. # self.BCM = getattr(self.gpio,self.BCM)
  69. def output(self,pins,state,*args):
  70. lgpio.debug("setting pin(s) {} to value {}".format(
  71. pins,state))
  72. if type(pins) is int:
  73. pins = [pins]
  74. _pins_for_cleanup.update(pins)
  75. if self.gpio:
  76. self.gpio.output(pins,state)
  77. else:
  78. lgpio.debug("no gpio module")
  79. lgpio.debug("gpio module: {}".format(self.gpio))
  80. def input(self,pins,*args):
  81. values = []
  82. for p in pins:
  83. if self.gpio:
  84. values.append(self.gpio.input(p))
  85. else:
  86. values.append(-1)
  87. lgpio.debug("reading pins {}: {}".format(
  88. pins,values))
  89. return values
  90. def setup(self,pins,value,*args):
  91. lgpio.debug("setting pin(s) {} to {}".format(pins,value))
  92. if self.gpio:
  93. self.gpio.setup(pins,value)
  94. _gpio = GPIOWrapper()
  95. class PinBase(object):
  96. """PinBase is the base class for all classes representing a gpio pin"""
  97. pin_number = None
  98. value = None
  99. def __init__(self,pin_number,mode):
  100. super(PinBase,self).__init__()
  101. self.pin_number = pin_number
  102. self.value = self.value_low
  103. _gpio.setup(self.pin_number,_gpio.OUT)
  104. def output(self,value):
  105. [value] = digilib.misc.parse_to_int_list(value)
  106. self.value = value
  107. _gpio.output(self.pin_number,value)
  108. def input(self):
  109. value = _gpio.input(self.pin_number,value)
  110. return value
  111. class DigitalPin(PinBase):
  112. value_high = True
  113. value_low = False
  114. def __init__(self,pin_number,mode):
  115. super(DigitalPin,self).__init__(pin_number,mode)
  116. class AnalogPin(PinBase):
  117. value_high = 1
  118. value_low = 0
  119. def __init__(self,pin_number):
  120. super(AnalogPin,self).__init__(pin_number)
  121. class PinControllerBase(object):
  122. """docstring for PinControllerBase.
  123. PinControllerBase is the base class for all classes controlling one or more physical devices connected to a gpio header
  124. """
  125. pins = []
  126. def __init__(self,pins):
  127. super(PinControllerBase, self).__init__()
  128. self.pins.extend(pins)
  129. # def make_digital_pin(self,*args):
  130. # return DigitalPin(*args)
  131. # def make_analog_pin(self,*args):
  132. # return AnalogPin(*args)
  133. class PinAPIBase(object):
  134. """docstring for PinAPI.
  135. PinAPIBase is the base class for all classes providing an api to multiple
  136. PinController.
  137. """
  138. controllers = []
  139. def __init__(self):
  140. super(PinAPIBase, self).__init__()
  141. class PCEngine(PinControllerBase):
  142. """Test Class"""
  143. max_speed=1
  144. speed = 0
  145. turn_on_speed = 1
  146. is_on = False
  147. def __init__(self,pin_on_off,pin_analog):
  148. super(PCEngine, self).__init__([pin_on_off,pin_analog])
  149. # self.pins.append(pin_on_off)
  150. # self.pins.append(pin_analog)
  151. self.pin_on_off = DigitalPin(pin_on_off,_gpio.OUT)
  152. self.pin_analog = DigitalPin(pin_analog,_gpio.OUT)
  153. # self.pins.append(self.pin_on_off)
  154. # self.pins.append(self.pin_analog)
  155. # gpio.setup(self.pins,_gpio.OUT)
  156. # gpio.setup(self.pin_analog,_gpio.OUT)
  157. self.set_speed(1)
  158. self.set_state(1)
  159. def set_speed(self,speed):
  160. self.pin_analog.output(speed)
  161. self.speed = speed
  162. def set_state(self,state):
  163. """state is integer and specifies if the engine should be turned on (1) or turned off (0) """
  164. if state == self.is_on:
  165. return
  166. # self.set_speed(speed)
  167. self.pin_on_off.output(state)
  168. self.is_on = state
  169. class EnginesController(PinAPIBase):
  170. engine_left = None
  171. engine_right = None
  172. action_in_process = False
  173. def __init__(self,left,right):
  174. super(EnginesController,self).__init__()
  175. self.engine_right = self.make_engine(*right)
  176. self.engine_left = self.make_engine(*left)
  177. def make_engine(self,*args):
  178. return digilib.pin.PCEngine(*args)
  179. def set_engine_state(self,args=[],command=None,respond=None):
  180. if len(args) != 2:
  181. respond("missing argument(s): <engine> <state>")
  182. return
  183. [engine,state] = args
  184. if engine == "right":
  185. self.engine_right.set_state(state)
  186. elif engine == "left":
  187. self.engine_left.set_state(state)
  188. else:
  189. respond("unknown engine. use 'left' or 'right'")
  190. def set_engine_speed(self,args=[],command=None,respond=None):
  191. if len(args) != 2:
  192. respond("missing argument(s): <engine> <speed>")
  193. return
  194. [engine,speed] = args
  195. if engine == "right":
  196. self.engine_right.set_speed(speed)
  197. elif engine == "left":
  198. self.engine_left.set_speed(speed)
  199. else:
  200. respond("unknown engine. use 'left' or 'right'")
  201. async def turn(self,args=[],command=None,respond=None):
  202. if len(args) != 1:
  203. respond("one missing argument: direction")
  204. return
  205. [direction] = args
  206. # lgpio.debug(threading.current_thread())
  207. # with self ad
  208. lgpio.info("turning {}".format(direction))
  209. right_state = self.engine_right.is_on
  210. right_speed = self.engine_right.speed
  211. left_state = self.engine_left.is_on
  212. left_speed = self.engine_left.speed
  213. if direction == "right":
  214. self.engine_right.set_state(False)
  215. self.engine_left.set_state(True)
  216. self.engine_left.set_speed(1)
  217. elif direction == "left":
  218. self.engine_right.set_state(True)
  219. self.engine_right.set_speed(1)
  220. self.engine_left.set_state(False)
  221. await curio.sleep(2)
  222. # time.sleep(2)
  223. self.engine_right.set_state(right_state)
  224. self.engine_right.set_speed(right_speed)
  225. self.engine_left.set_state(left_state)
  226. self.engine_left.set_speed(left_speed)
  227. lgpio.info("done turning {}".format(direction))
  228. # set engines to previous values
  229. class LED(DigitalPin):
  230. def __init__(self,pin):
  231. super(DigitalPin,self).__init__(pin,_gpio.OUT)
  232. self.on()
  233. def on(self,args=[],command=None,respond=None):
  234. self.output(True)
  235. def off(self,args=[],command=None,respond=None):
  236. self.output(False)
  237. def set(self,args=[],command=None,respond=None):
  238. if len(args) != 1:
  239. respond("one missing argument: state")
  240. return
  241. [state] = args
  242. self.output(state)
  243. class StatusLED(PinControllerBase):
  244. def __init__(self,pin_red,pin_green):
  245. super(StatusLED,self).__init__([pin_red,pin_green])
  246. self.pin_red = DigitalPin(pin_red,_gpio.OUT)
  247. self.pin_green = DigitalPin(pin_green,_gpio.OUT)
  248. self.green()
  249. def red(self,args=[],command=None,respond=None):
  250. if len(args) > 1:
  251. respond(ERROR_TAKES_ARGUMENTS.format(
  252. command,"one","optional","<state>"))
  253. return
  254. elif len(args) == 1:
  255. state = digilib.misc.parse_to_int_list(*args)
  256. else:
  257. state = 1
  258. self.pin_red.output(state)
  259. self.pin_green.output(int(not state))
  260. def green(self,args=[],command=None,respond=None):
  261. if len(args) > 1:
  262. respond(ERROR_TAKES_ARGUMENTS.format(
  263. command,"one","optional","<state>"))
  264. return
  265. elif len(args) == 1:
  266. state = int(*args)
  267. else:
  268. state = 1
  269. self.pin_green.output(state)
  270. self.pin_red.output(int(not state))
  271. class DebugPinController(PinControllerBase):
  272. def output(self,args=[],command=None,respond=None):
  273. if len(args) != 2:
  274. respond(ERROR_TAKES_ARGUMENTS.format(
  275. command, "two", "positional", "<name>"))
  276. return False
  277. pins = digilib.misc.parse_to_int_list(args[0])
  278. [state] = digilib.misc.parse_to_int_list(args[1])
  279. _gpio.output(pins,state)
  280. def input(self,args=[],command=None,respond=None):
  281. if len(args) != 2:
  282. respond(ERROR_TAKES_ARGUMENTS.format(
  283. command, "two", "positional", "<name>"))
  284. return False
  285. pins = digilib.misc.parse_to_int_list(args[0])
  286. [state] = digilib.misc.parse_to_int_list(args[1])
  287. rv = _gpio.input(pins,state)
  288. lgpio.debug(rv)
  289. respond(str(rv))
  290. def raise_exc(self,args=[],command=None,respond=None):
  291. raise Exception("Test Exception")
  292. async def araise_exc(self,args=[],command=None,respond=None):
  293. state = digilib.misc.parse_to_int_list("1,2,3,4")
  294. a = 1+2
  295. raise Exception("Test Async Exception")
  296. if __name__ == "__main__":
  297. pass
  298. #