ctrl_manager.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. # Copyright 2017 Digital
  2. #
  3. # This file is part of DigiLib.
  4. #
  5. # DigiLib is free software: you can redistribute it and/or modify
  6. # it under the terms of the GNU General Public License as published by
  7. # the Free Software Foundation, either version 3 of the License, or
  8. # (at your option) any later version.
  9. #
  10. # DigiLib is distributed in the hope that it will be useful,
  11. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. # GNU General Public License for more details.
  14. #
  15. # You should have received a copy of the GNU General Public License
  16. # along with DigiLib. If not, see <http://www.gnu.org/licen
  17. class CtrlManager(object):
  18. """
  19. The CtrlManager collects data from Collectors and analyses it.
  20. Parameters
  21. ----------
  22. config: dict
  23. dictionary holding config information. for more information see :doc:`/configure`
  24. Attributes
  25. ----------
  26. self.timed_tg: curio.TaskGroup
  27. A ``curio.TaskGroup`` in which all timed tasks will be spawned
  28. """
  29. def __init__(self):
  30. super().__init__()
  31. self.timed_tg = curio.TaskGroup(name="timed tasks")
  32. async def astart(self):
  33. """
  34. This method starts the core. It does the setup and calls the run method.
  35. """
  36. # lets run the timed callers
  37. await self.timed_tg.run(self.minutely())
  38. await self.timed_tg.run(self.hourly())
  39. await self.timed_tg.run(self.daily())
  40. async def astop(self):
  41. """
  42. asynchronous stop method. cancels timed tasks and calls stop handlers
  43. """
  44. lcore.debug("canceling remaining timed tasks")
  45. try:
  46. self.timed_tg.cancel_remaining()
  47. self.timed_tg.join()
  48. except Exception as exc:
  49. lcore.error("an error occured in a timed caller:",exc_info=exc)
  50. # call the shutdown handlers
  51. self.exec_coll_methods("shutdown")
  52. async def call(self, ctrl_name, func_name, kwargs, respond):
  53. """
  54. The call method takes commands and calls the corresponding function with ``args`` and ``respond``. It treats all functions as asynchronous and logs the traceback if an exception occured.
  55. Parameters
  56. ----------
  57. ctrl_name: str
  58. name of the controller
  59. func_name: str
  60. name of the function to call
  61. kwargs: dict
  62. the keyword arguments for the function
  63. respond: function or method
  64. a function to send error messages
  65. """
  66. ctrl = beewatch._controllers.get(ctrl_name,False)
  67. if not ctrl:
  68. self.respond("can't call function '{}' of controller '{}', "
  69. "there is no such controller!")
  70. return
  71. func = getattr(ctrl,func_name,False)
  72. if not func:
  73. self.respond("can't call function '{}' of controller '{}', "
  74. "the controller doesn have this function!")
  75. return
  76. try:
  77. await func(**kwargs)
  78. except Exception as e:
  79. lch.error(
  80. "an error was raised when calling func {}:".format(func),
  81. exc_info=e)
  82. tb = traceback.format_exc()
  83. await self.respond(tb,log_msg="traceback of '{}'"
  84. .format(e.__cause__))
  85. # # task joins iself to suppress the "task not joined" warning
  86. # cur_task = await curio.current_task()
  87. # await curio.ignore_after(0,cur_task.wait)
  88. async def daily(self):
  89. """
  90. This method is calls the collectors daily method once every day
  91. """
  92. try:
  93. while True:
  94. await self.exec_coll_methods("daily")
  95. # sleep one day
  96. await curio.sleep(24*60*60*1000)
  97. except TaskCancelled as exc:
  98. # we catch this so when we join the timed_tg later we only get
  99. # unexpected exceptions
  100. lcore.debug("Daily loop was canceled")
  101. async def exec_coll_methods(self,name):
  102. """
  103. This method calls the method of every controller with the name inside th name parameter
  104. Parameters
  105. ----------
  106. name: str
  107. The name of the controllers method which is to be called.
  108. """
  109. lcore.debug("executing every collector's {} function!".format(name))
  110. for c in beewatch._collectors:
  111. try:
  112. method = getattr(c,name)
  113. await method()
  114. except TaskCancelled as exc:
  115. raise
  116. except Exception as exc:
  117. lcore.error(
  118. "an error occured when calling {}'s {} method!"
  119. .format(repr(c),name))
  120. async def hourly(self):
  121. """
  122. This method is calls the collectors hourly method once every hour
  123. """
  124. try:
  125. while True:
  126. await self.exec_coll_methods("hourly")
  127. # sleep one hour
  128. await curio.sleep(60*60*1000)
  129. except TaskCancelled as exc:
  130. # we catch this so when we join the timed_tg later we only get
  131. # unexpected exceptions
  132. lcore.debug("Hourly loop was canceled")
  133. async def minutely(self):
  134. """
  135. This method is calls the collectors minutely method once every minute
  136. """
  137. try:
  138. while True:
  139. await self.exec_coll_methods("minutely")
  140. # sleep one minute
  141. await curio.sleep(60*1000)
  142. except TaskCancelled as exc:
  143. # we catch this so when we join the timed_tg later we only get
  144. # unexpected exceptions
  145. lcore.debug("Minutely loop was canceled")
  146. def start(self):
  147. """
  148. synchronous start method wich calls astart asynchronously.
  149. """
  150. curio.run(self.async_start)
  151. def stop(self):
  152. """
  153. synchronous stop method wich calls astop asynchronously.
  154. """
  155. curio.run(self.async_stop)