pin.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. #!/usr/bin/env python3.5
  2. # Copyright 2017 Digital
  3. #
  4. # This file is part of DigiLib.
  5. #
  6. # DigiLib is free software: you can redistribute it and/or modify
  7. # it under the terms of the GNU General Public License as published by
  8. # the Free Software Foundation, either version 3 of the License, or
  9. # (at your option) any later version.
  10. #
  11. # DigiLib is distributed in the hope that it will be useful,
  12. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. # GNU General Public License for more details.
  15. #
  16. # You should have received a copy of the GNU General Public License
  17. # along with DigiLib. If not, see <http://www.gnu.org/licenses/>.
  18. # Python modules
  19. import atexit
  20. import logging
  21. import threading
  22. import traceback
  23. # Third party modules
  24. import curio
  25. import digilib.network
  26. import digilib.misc
  27. log = logging.getLogger(__name__+"")
  28. lgpio = logging.getLogger(__name__+".gpio")
  29. # Error messages
  30. ERROR_TAKES_ARGUMENTS = "{} takes {} {} argument(s): {}"
  31. # respond(ERROR_TAKES_ARGUMENTS.format(
  32. # command, "one", "positional", "<name>"))
  33. _pins_for_cleanup = set()
  34. _gpio = None
  35. class PinBase(object):
  36. """
  37. PinBase is the base class for all classes representing a gpio pin
  38. Parameters
  39. ----------
  40. pin_number: int
  41. number of the pin
  42. mode: gpio.OUT or gpio.IN
  43. ``gpio.IN`` if the pin is an read pin or ``gpio.OUT`` if the pin is an write pin
  44. """
  45. pin_number = None
  46. def __init__(self,pin_number,mode):
  47. super(PinBase,self).__init__()
  48. self.pin_number = pin_number
  49. _gpio.setup(self.pin_number,_gpio.OUT)
  50. def write(self,value):
  51. [value] = digilib.misc.parse_to_int_list(value)
  52. _gpio.write(self.pin_number,value)
  53. def read(self):
  54. value = _gpio.read(self.pin_number,value)
  55. return value
  56. class DigitalPin(PinBase):
  57. def __init__(self,pin_number,mode):
  58. super(DigitalPin,self).__init__(pin_number,mode)
  59. class AnalogPin(PinBase):
  60. def __init__(self,pin_number):
  61. super(AnalogPin,self).__init__(pin_number)
  62. class ControllerBase(object):
  63. """
  64. PinControllerBase is the base class for all classes controlling one or more physical devices connected to a gpio header or other controllers
  65. """
  66. def __init__(self):
  67. super().__init__()
  68. class LED(DigitalPin):
  69. """
  70. Controlls a LED
  71. Parameters
  72. ----------
  73. pin: int
  74. number of the led's pin
  75. Attributes
  76. ----------
  77. lock: threading.Lock
  78. The lock used to protect gpio operations.
  79. """
  80. lock = None
  81. def __init__(self,pin):
  82. super(DigitalPin,self).__init__(pin,_gpio.OUT)
  83. self.lock = threading.Lock()
  84. self.on()
  85. async def on(self,args=[],command=None,respond=None):
  86. if self.lock.locked():
  87. response("This LED is already in use")
  88. return
  89. with self.lock:
  90. self.write(True)
  91. async def off(self,args=[],command=None,respond=None):
  92. if self.lock.locked():
  93. respond("This LED is already in use")
  94. return
  95. with self.lock:
  96. self.write(False)
  97. async def set(self,args=[],command=None,respond=None):
  98. if len(args) != 1:
  99. respond("one missing argument: state")
  100. return
  101. [state] = args
  102. if self.lock.locked():
  103. response("This LED is already in use")
  104. return
  105. with self.lock:
  106. self.write(state)
  107. class StatusLED(PinControllerBase):
  108. def __init__(self,pin_red,pin_green):
  109. super(StatusLED,self).__init__([pin_red,pin_green])
  110. self.pin_red = DigitalPin(pin_red,_gpio.OUT)
  111. self.pin_green = DigitalPin(pin_green,_gpio.OUT)
  112. self.green()
  113. def red(self,args=[],command=None,respond=None):
  114. if len(args) > 1:
  115. respond(ERROR_TAKES_ARGUMENTS.format(
  116. command,"one","optional","<state>"))
  117. return
  118. elif len(args) == 1:
  119. state = digilib.misc.parse_to_int_list(*args)
  120. else:
  121. state = 1
  122. self.pin_red.write(state)
  123. self.pin_green.write(int(not state))
  124. def green(self,args=[],command=None,respond=None):
  125. if len(args) > 1:
  126. respond(ERROR_TAKES_ARGUMENTS.format(
  127. command,"one","optional","<state>"))
  128. return
  129. elif len(args) == 1:
  130. state = int(*args)
  131. else:
  132. state = 1
  133. self.pin_green.write(state)
  134. self.pin_red.write(int(not state))
  135. class DebugPinController(PinControllerBase):
  136. def write(self,args=[],command=None,respond=None):
  137. if len(args) != 2:
  138. respond(ERROR_TAKES_ARGUMENTS.format(
  139. command, "two", "positional", "<name>"))
  140. return False
  141. pins = digilib.misc.parse_to_int_list(args[0])
  142. [state] = digilib.misc.parse_to_int_list(args[1])
  143. _gpio.write(pins,state)
  144. def read(self,args=[],command=None,respond=None):
  145. if len(args) != 2:
  146. respond(ERROR_TAKES_ARGUMENTS.format(
  147. command, "two", "positional", "<name>"))
  148. return False
  149. pins = digilib.misc.parse_to_int_list(args[0])
  150. [state] = digilib.misc.parse_to_int_list(args[1])
  151. rv = _gpio.read(pins,state)
  152. lgpio.debug(rv)
  153. respond(str(rv))
  154. def raise_exc(self,args=[],command=None,respond=None):
  155. raise Exception("Test Exception")
  156. async def araise_exc(self,args=[],command=None,respond=None):
  157. state = digilib.misc.parse_to_int_list("1,2,3,4")
  158. a = 1+2
  159. raise Exception("Test Async Exception")
  160. if __name__ == "__main__":
  161. pass
  162. #