__init__.py 13 KB


  1. #!/usr/bin/env python3.5
  2. # Copyright 2017 Digital
  3. #
  4. # This file is part of DigiLib.
  5. #
  6. # DigiLib is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # DigiLib is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with DigiLib. If not, see <http://www.gnu.org/licenses/>.
  18. # Python modules
  19. import logging
  20. import threading
  21. import traceback
  22. # Third party modules
  23. import blinker
  24. import curio
  25. import digilib.network
  # Module-wide loggers: ``log`` for general messages, ``lgpio`` for every
  # GPIO related action (pin setup, output, read, cleanup).
  log = logging.getLogger(__name__)
  lgpio = logging.getLogger(__name__ + ".gpio")

  # Error messages
  # Placeholders, in order: command name, count word, argument kind, usage.
  ERROR_TAKES_ARGUMENTS = "{} takes {} {} argument(s): {}"

  # Pin numbers that were driven through GPIOWrapper.output(); they are
  # handed to the GPIO library's cleanup() by GPIOWrapper.cleanup().
  _pins_for_cleanup = set()
  31. # gpio = GPIOWrapper()
  32. # respond(ERROR_TAKES_ARGUMENTS.format(
  33. # command, "one", "positional", "<name>"))
  34. # def gpio.parse_to_int_list(parse):
  35. # if type(parse) is list:
  36. # return parse
  37. # elif type(parse) is int:
  38. # return [parse]
  39. # elif type(parse) is bool:
  40. # return [int(parse)]
  41. # elif type(parse) is not str:
  42. # raise ValueError("Failed to parse value {}".format(parse))
  43. # if parse.lower() in ["true","high"]:
  44. # return [1]
  45. # elif parse.lower() in ["false","low"]:
  46. # return [0]
  47. # parse = parse.split(".")
  48. # for part in parse.copy():
  49. # parse.remove(part)
  50. # parse.append(int(part))
  51. # return parse
  52. #
  53. # def _setup()
  54. #
  55. # def gpio.output(pins,state):
  56. # lgpio.debug("setting pin(s) {:0>2} to value {}".format(
  57. # pins,state))
  58. # if type(pins) is int:
  59. # pins = [pins]
  60. # _pins_for_cleanup.update(pins)
  61. # gpio.output(pins,state)
  62. #
  63. # def gpio.read(self,pin):
  64. # state = gpio.read(pin)
  65. # lgpio.debug("reading pin {}: {}".format(
  66. # pin,state))
  67. # class FakeGPIO(object):
  68. # OUT = "out"
  69. # IN = "in"
  70. # BCM = "broadcom"
  71. # pin_values = {}
  72. # def cleanup(self,*args):
  73. # lgpio.debug("cleanup with these args '{}'!".format(args))
  74. # def setup(self,pins,value):
  75. # lgpio.debug("pin(s) {} set to {}".format(pins,value))
  76. # def setmode(self,mode):
  77. # lgpio.debug("mode set to {}".format(mode))
  78. # def output(self,pins,value):
  79. # lgpio.debug("setting pin(s) {} to value {}".format(
  80. # pins, value
  81. # ))
  82. # if type(pins) == int:
  83. # pins = [pins]
  84. # for p in pins:
  85. # self.pin_values[p] = value
  86. # def read(self,pin):
  87. # return self.pin_values[pin]
  88. #
  89. # try:
  90. # import RPi.GPIO as gpio
  91. # except:
  92. # # print("Using FakeGPIO because RPi.GPIO was not found")
  93. # gpio = FakeGPIO()
  94. #
  95. # def call_gpio_cleanup(*args):
  96. # log.info(args)
  97. # gpio.cleanup(_pins_for_cleanup)
  class GPIOWrapper(object):
      """Wrapper around ``RPi.GPIO`` that falls back to logging only.

      When ``RPi.GPIO`` cannot be imported, ``self.gpio`` is ``False`` and
      every hardware call is skipped; the wrapper then only writes debug
      log messages. ``OUT`` and ``IN`` hold the constants of the real
      module when it is available and the string placeholders otherwise.
      """

      gpio = None
      OUT = "out"
      IN = "in"
      # BCM = "broadcom"
      pin_values = {}

      def __init__(self):
          """Load the GPIO module, select BCM numbering and hook cleanup.

          ``cleanup`` is connected to the blinker signal ``"global-exit"``.
          """
          super(GPIOWrapper, self).__init__()
          self.load_rpi_gpio()
          lgpio.debug("setting pin numbering to broadcom")
          if self.gpio:
              self.gpio.setmode(self.gpio.BCM)
          sig_exit = blinker.signal("global-exit")
          sig_exit.connect(self.cleanup)

      def cleanup(self, *args):
          """Release every pin that was driven through ``output``.

          Extra positional arguments (e.g. the signal sender) are only
          logged.
          """
          lgpio.debug("cleanup! ({})".format(args))
          if self.gpio and _pins_for_cleanup:
              # RPi.GPIO documents cleanup() for a channel number, a list or
              # a tuple, so the set is converted before it is handed over.
              self.gpio.cleanup(sorted(_pins_for_cleanup))

      def load_rpi_gpio(self):
          """Try to import ``RPi.GPIO`` and store the result in ``self.gpio``.

          On any failure ``self.gpio`` is set to ``False``. ``OUT`` and
          ``IN`` are refreshed from the loaded module when it has them.
          """
          import importlib

          try:
              # import_module returns the submodule itself, whereas
              # __import__("RPi.GPIO") would return the top-level package.
              module = importlib.import_module("RPi.GPIO")
          except ImportError:
              lgpio.debug("failed to import RPi.GPIO")
              module = False
          except Exception as e:
              lgpio.debug("unknown error occured", exc_info=e)
              module = False
          # Only a failed import disables the hardware; a successfully
          # imported module must be kept.
          self.gpio = module
          self.OUT = getattr(self.gpio, "OUT", self.OUT)
          self.IN = getattr(self.gpio, "IN", self.IN)
          # self.BCM = getattr(self.gpio,self.BCM)

      def output(self, pins, state, *args):
          """Set one pin or a list of pins to ``state``.

          ``pins`` may be a single int or an iterable of ints. All pins are
          remembered for ``cleanup``.
          """
          # "!s" converts to str first so that a list of pins can be padded
          # as well; a bare "{:0>2}" raises TypeError for lists.
          lgpio.debug("setting pin(s) {!s:0>2} to value {}".format(
              pins, state))
          if isinstance(pins, int):
              pins = [pins]
          _pins_for_cleanup.update(pins)
          if self.gpio:
              self.gpio.output(pins, state)
          else:
              lgpio.debug("no gpio module")

      def parse_to_int_list(self, parse, *args):
          """Convert ``parse`` into a list of ints.

          * a list is returned unchanged
          * a bool or int becomes a one element list
          * ``"true"``/``"high"`` become ``[1]``, ``"false"``/``"low"``
            become ``[0]`` (case-insensitive)
          * any other string is split at ``"."`` and every part is passed
            to ``int``

          Raises:
              ValueError: if ``parse`` has an unsupported type or a string
                  part is not an integer.
          """
          if isinstance(parse, list):
              return parse
          # bool is a subclass of int, so it has to be checked first.
          if isinstance(parse, bool):
              return [int(parse)]
          if isinstance(parse, int):
              return [parse]
          if not isinstance(parse, str):
              raise ValueError("Failed to parse value {}".format(parse))
          keyword = parse.lower()
          if keyword in ("true", "high"):
              return [1]
          if keyword in ("false", "low"):
              return [0]
          return [int(part) for part in parse.split(".")]

      def read(self, pin, *args):
          """Return the state of ``pin``, or ``-1`` without a GPIO module."""
          if self.gpio:
              # RPi.GPIO reads a channel with input(); it has no read().
              state = self.gpio.input(pin)
          else:
              state = -1
          lgpio.debug("reading pin {!s:0>2}: {}".format(
              pin, state))
          return state

      def setup(self, pins, value, *args):
          """Configure ``pins`` with mode ``value`` (e.g. ``self.OUT``)."""
          lgpio.debug("setting pin(s) {} to {}".format(pins, value))
          if self.gpio:
              self.gpio.setup(pins, value)
  class PinBase(object):
      """PinBase is the base class for all classes representing a gpio pin.

      Subclasses have to define ``value_low`` (and usually ``value_high``);
      ``value_low`` is used as the initial ``value`` of a new pin.
      """

      pin_number = None
      value = None
      mode = None

      def __init__(self, pin_number, mode):
          """Remember ``pin_number`` and ``mode`` and start at ``value_low``.

          If ``mode`` is not ``None`` the pin is configured through the
          module-level ``gpio`` wrapper.
          """
          super(PinBase, self).__init__()
          self.pin_number = pin_number
          self.mode = mode
          self.value = self.value_low
          if mode is not None:
              # NOTE(review): RPi.GPIO expects a channel to be set up before
              # it is written or read - confirm on real hardware.
              gpio.setup(pin_number, mode)

      def output(self, value):
          """Parse ``value`` to a single int and write it to the pin.

          Raises:
              ValueError: if ``value`` does not parse to exactly one int.
          """
          [value] = gpio.parse_to_int_list(value)
          self.value = value
          gpio.output(self.pin_number, value)

      def read(self):
          """Return the state reported by the ``gpio`` wrapper for this pin."""
          return gpio.read(self.pin_number)
  class DigitalPin(PinBase):
      """Pin whose low and high levels are expressed as booleans."""

      value_low = False
      value_high = True

      def __init__(self, pin_number, mode):
          """Forward ``pin_number`` and ``mode`` to ``PinBase``."""
          super().__init__(pin_number, mode)
  class AnalogPin(PinBase):
      """Pin whose low and high levels are expressed as the ints 0 and 1."""

      value_high = 1
      value_low = 0

      def __init__(self, pin_number, mode=None):
          """Create the pin.

          ``mode`` is optional so that ``AnalogPin(pin_number)`` keeps
          working; it is forwarded to ``PinBase``, whose constructor
          requires it.
          """
          super(AnalogPin, self).__init__(pin_number, mode)
  class PinControllerBase(object):
      """Base class for controllers of devices connected to a gpio header.

      A controller drives one or more physical devices; ``pins`` lists the
      pin numbers the controller was created with.
      """

      # Class-level placeholder only; every instance gets its own list.
      pins = []

      def __init__(self, pins):
          """Store a private copy of ``pins`` on the instance."""
          super(PinControllerBase, self).__init__()
          # Extending the class attribute in place would share one list
          # between all controllers, so a fresh list is bound per instance.
          self.pins = list(pins)
  203. # def make_digital_pin(self,*args):
  204. # return DigitalPin(*args)
  205. # def make_analog_pin(self,*args):
  206. # return AnalogPin(*args)
  class PinAPIBase(object):
      """Base class for classes providing an api to multiple PinControllers."""

      # Class-level placeholder only; every instance gets its own list.
      controllers = []

      def __init__(self):
          """Give the instance its own, initially empty ``controllers`` list."""
          super(PinAPIBase, self).__init__()
          # Avoids appending to the list shared through the class attribute.
          self.controllers = []
  class PCEngine(PinControllerBase):
      """Test Class: an engine driven by an on/off pin and a speed pin."""

      max_speed = 1
      speed = 0
      turn_on_speed = 1
      is_on = False

      def __init__(self, pin_on_off, pin_analog):
          """Create both pins as outputs, then set speed 1 and switch on."""
          super(PCEngine, self).__init__([pin_on_off, pin_analog])
          self.pin_on_off = DigitalPin(pin_on_off, gpio.OUT)
          # NOTE(review): the speed pin is a DigitalPin as well - confirm
          # whether AnalogPin was intended here.
          self.pin_analog = DigitalPin(pin_analog, gpio.OUT)
          self.set_speed(1)
          self.set_state(1)

      def set_speed(self, speed):
          """Write ``speed`` to the speed pin and remember it."""
          self.pin_analog.output(speed)
          self.speed = speed

      def set_state(self, state):
          """Turn the engine on (1) or off (0).

          ``state`` may be anything ``gpio.parse_to_int_list`` accepts
          (int, bool, ``"1"``, ``"high"``, ...). It is normalised before it
          is compared and stored, so ``is_on`` is always a bool and a
          repeated request for the current state does not touch the pin.
          """
          [level] = gpio.parse_to_int_list(state)
          requested = bool(level)
          if requested == self.is_on:
              return
          self.pin_on_off.output(level)
          self.is_on = requested
  class EnginesController(PinAPIBase):
      """Command api for a left and a right engine.

      The command methods share the signature
      ``(args, command, respond)``: ``args`` is the sequence of command
      arguments and ``respond`` an optional callable that receives error
      messages.
      """

      engine_left = None
      engine_right = None
      action_in_process = False

      def __init__(self, left, right):
          """Create both engines.

          ``left`` and ``right`` are the argument sequences handed to
          ``make_engine``.
          """
          super(EnginesController, self).__init__()
          self.engine_right = self.make_engine(*right)
          self.engine_left = self.make_engine(*left)

      def __enter__(self):
          # NOTE(review): no __exit__ is defined, so the class cannot be
          # used in a ``with`` statement yet; behaviour is left unchanged.
          if self.action_in_process:
              lgpio.debug("action already in process, not entering")
              return False

      def make_engine(self, *args):
          """Return a new ``PCEngine`` built from ``args``."""
          # PCEngine is defined in this module; referring to it directly
          # does not depend on the module being reachable as digilib.pin.
          return PCEngine(*args)

      @staticmethod
      def _reply(respond, message):
          """Send ``message`` through ``respond``, or log it if there is none."""
          if respond is None:
              lgpio.warning(message)
          else:
              respond(message)

      def _engine_by_name(self, name):
          """Return the engine called ``"left"``/``"right"``, else ``None``."""
          if name == "right":
              return self.engine_right
          if name == "left":
              return self.engine_left
          return None

      def set_engine_state(self, args=(), command=None, respond=None):
          """Command ``<engine> <state>``: switch one engine on or off."""
          if len(args) != 2:
              self._reply(respond, "missing argument(s): <engine> <state>")
              return
          [engine_name, state] = args
          engine = self._engine_by_name(engine_name)
          if engine is None:
              self._reply(respond, "unknown engine. use 'left' or 'right'")
              return
          engine.set_state(state)

      def set_engine_speed(self, args=(), command=None, respond=None):
          """Command ``<engine> <speed>``: set the speed of one engine."""
          if len(args) != 2:
              self._reply(respond, "missing argument(s): <engine> <speed>")
              return
          [engine_name, speed] = args
          engine = self._engine_by_name(engine_name)
          if engine is None:
              self._reply(respond, "unknown engine. use 'left' or 'right'")
              return
          engine.set_speed(speed)

      async def turn(self, args=(), command=None, respond=None):
          """Command ``<direction>``: turn for two seconds.

          For ``"right"`` the right engine is switched off and the left one
          runs at speed 1; ``"left"`` is the mirror image. Afterwards both
          engines are set back to the state and speed they had before.
          """
          if len(args) != 1:
              self._reply(respond, "one missing argument: direction")
              return
          [direction] = args
          if direction not in ("left", "right"):
              # Reject early instead of sleeping two seconds for nothing.
              self._reply(respond, "unknown direction. use 'left' or 'right'")
              return
          lgpio.info("turning {}".format(direction))
          right = self.engine_right
          left = self.engine_left
          right_state, right_speed = right.is_on, right.speed
          left_state, left_speed = left.is_on, left.speed
          try:
              if direction == "right":
                  right.set_state(False)
                  left.set_state(True)
                  left.set_speed(1)
              else:
                  right.set_state(True)
                  right.set_speed(1)
                  left.set_state(False)
              await curio.sleep(2)
          finally:
              # set engines to previous values, also when the sleep is
              # interrupted by an exception
              right.set_state(right_state)
              right.set_speed(right_speed)
              left.set_state(left_state)
              left.set_speed(left_speed)
          lgpio.info("done turning {}".format(direction))
  class LED(DigitalPin):
      """A single LED on an output pin; it is switched on when created."""

      def __init__(self, pin):
          """Configure ``pin`` as output and turn the LED on."""
          # super() must name LED itself; naming DigitalPin would skip
          # DigitalPin.__init__ in the method resolution order.
          super(LED, self).__init__(pin, gpio.OUT)
          self.on()

      def on(self, args=(), command=None, respond=None):
          """Command: switch the LED on. The arguments are ignored."""
          self.output(True)

      def off(self, args=(), command=None, respond=None):
          """Command: switch the LED off. The arguments are ignored."""
          self.output(False)

      def set(self, args=(), command=None, respond=None):
          """Command ``<state>``: write ``state`` to the LED pin.

          If the number of arguments is not exactly one, an error message
          is sent through ``respond`` (when given) and nothing is written.
          """
          if len(args) != 1:
              if respond is not None:
                  respond("one missing argument: state")
              return
          [state] = args
          self.output(state)
  class LEDRGB(PinControllerBase):
      """LED with a red and a green channel; red is switched on at creation."""

      def __init__(self, pin_red, pin_green):
          """Create both channel pins as outputs and switch red on."""
          super(LEDRGB, self).__init__([pin_red, pin_green])
          self.pin_red = DigitalPin(pin_red, gpio.OUT)
          self.pin_green = DigitalPin(pin_green, gpio.OUT)
          self.red()

      def _set_channel(self, pin, args, command, respond):
          """Write the optional ``<state>`` argument (default 1) to ``pin``."""
          if len(args) > 1:
              if respond is not None:
                  respond("{} takes one optinal agument: <state>".format(
                      command))
              return
          state = int(args[0]) if args else 1
          pin.output(state)

      def red(self, args=(), command=None, respond=None):
          """Command ``[<state>]``: set the red channel, on by default."""
          self._set_channel(self.pin_red, args, command, respond)

      def green(self, args=(), command=None, respond=None):
          """Command ``[<state>]``: set the green channel, on by default."""
          self._set_channel(self.pin_green, args, command, respond)
  class DebugPinController(PinControllerBase):
      """Commands for writing and reading arbitrary pins for debugging."""

      def output(self, args=(), command=None, respond=None):
          """Command ``<pins> <state>``: set the given pins to ``state``.

          ``<pins>`` and ``<state>`` are parsed with
          ``gpio.parse_to_int_list``, so several pins can be written as
          ``"1.2.3"``. Returns ``False`` when the argument count is wrong.
          """
          if len(args) != 2:
              if respond is not None:
                  respond(ERROR_TAKES_ARGUMENTS.format(
                      command, "two", "positional", "<pins> <state>"))
              return False
          pins = gpio.parse_to_int_list(args[0])
          [state] = gpio.parse_to_int_list(args[1])
          gpio.output(pins, state)

      def read(self, args=(), command=None, respond=None):
          """Command ``<pins>``: return a list with the state of every pin.

          A second argument is still accepted for backward compatibility
          but ignored, because reading does not need a state. Returns
          ``False`` when the argument count is wrong.
          """
          if len(args) not in (1, 2):
              if respond is not None:
                  respond(ERROR_TAKES_ARGUMENTS.format(
                      command, "one", "positional", "<pins>"))
              return False
          pins = gpio.parse_to_int_list(args[0])
          # The wrapper reads one pin per call, so the list is read pin by
          # pin instead of being passed as a whole.
          return [gpio.read(pin) for pin in pins]
  # Module-level wrapper instance used by all pin and controller classes
  # in this file.
  gpio = GPIOWrapper()

  if __name__ == "__main__":
      # Nothing happens when the file is executed directly.
      pass
  365. #