|
@@ -0,0 +1,169 @@
|
|
|
+# Copyright 2017 Digital
|
|
|
+#
|
|
|
+# This file is part of DigiLib.
|
|
|
+#
|
|
|
+# DigiLib is free software: you can redistribute it and/or modify
|
|
|
+# it under the terms of the GNU General Public License as published by
|
|
|
+# the Free Software Foundation, either version 3 of the License, or
|
|
|
+# (at your option) any later version.
|
|
|
+#
|
|
|
+# DigiLib is distributed in the hope that it will be useful,
|
|
|
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
+# GNU General Public License for more details.
|
|
|
+#
|
|
|
+# You should have received a copy of the GNU General Public License
|
|
|
+# along with DigiLib. If not, see <http://www.gnu.org/licen
|
|
|
+
|
|
|
+class CtrlManager(object):
|
|
|
+ """
|
|
|
+ The CtrlManager collects data from Collectors and analyses it.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ config: dict
|
|
|
+ dictionary holding config information. for more information see :doc:`/configure`
|
|
|
+
|
|
|
+ Attributes
|
|
|
+ ----------
|
|
|
+ self.timed_tg: curio.TaskGroup
|
|
|
+ A ``curio.TaskGroup`` in which all timed tasks will be spawned
|
|
|
+ """
|
|
|
+ def __init__(self):
|
|
|
+ super().__init__()
|
|
|
+ self.timed_tg = curio.TaskGroup(name="timed tasks")
|
|
|
+
|
|
|
+ async def astart(self):
|
|
|
+ """
|
|
|
+ This method starts the core. It does the setup and calls the run method.
|
|
|
+ """
|
|
|
+ # lets run the timed callers
|
|
|
+ await self.timed_tg.run(self.minutely())
|
|
|
+ await self.timed_tg.run(self.hourly())
|
|
|
+ await self.timed_tg.run(self.daily())
|
|
|
+
|
|
|
+ async def astop(self):
|
|
|
+ """
|
|
|
+ asynchronous stop method. cancels timed tasks and calls stop handlers
|
|
|
+ """
|
|
|
+ lcore.debug("canceling remaining timed tasks")
|
|
|
+ try:
|
|
|
+ self.timed_tg.cancel_remaining()
|
|
|
+ self.timed_tg.join()
|
|
|
+ except Exception as exc:
|
|
|
+ lcore.error("an error occured in a timed caller:",exc_info=exc)
|
|
|
+ # call the shutdown handlers
|
|
|
+ self.exec_coll_methods("shutdown")
|
|
|
+
|
|
|
+ async def call(self, ctrl_name, func_name, kwargs, respond):
|
|
|
+ """
|
|
|
+ The call method takes commands and calls the corresponding function with ``args`` and ``respond``. It treats all functions as asynchronous and logs the traceback if an exception occured.
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ ctrl_name: str
|
|
|
+ name of the controller
|
|
|
+ func_name: str
|
|
|
+ name of the function to call
|
|
|
+ kwargs: dict
|
|
|
+ the keyword arguments for the function
|
|
|
+ respond: function or method
|
|
|
+ a function to send error messages
|
|
|
+ """
|
|
|
+ ctrl = beewatch._controllers.get(ctrl_name,False)
|
|
|
+ if not ctrl:
|
|
|
+ self.respond("can't call function '{}' of controller '{}', "
|
|
|
+ "there is no such controller!")
|
|
|
+ return
|
|
|
+ func = getattr(ctrl,func_name,False)
|
|
|
+ if not func:
|
|
|
+ self.respond("can't call function '{}' of controller '{}', "
|
|
|
+ "the controller doesn have this function!")
|
|
|
+ return
|
|
|
+ try:
|
|
|
+ await func(**kwargs)
|
|
|
+ except Exception as e:
|
|
|
+ lch.error(
|
|
|
+ "an error was raised when calling func {}:".format(func),
|
|
|
+ exc_info=e)
|
|
|
+ tb = traceback.format_exc()
|
|
|
+ await self.respond(tb,log_msg="traceback of '{}'"
|
|
|
+ .format(e.__cause__))
|
|
|
+ # # task joins iself to suppress the "task not joined" warning
|
|
|
+ # cur_task = await curio.current_task()
|
|
|
+ # await curio.ignore_after(0,cur_task.wait)
|
|
|
+
|
|
|
+ async def daily(self):
|
|
|
+ """
|
|
|
+ This method is calls the collectors daily method once every day
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ while True:
|
|
|
+ await self.exec_coll_methods("daily")
|
|
|
+ # sleep one day
|
|
|
+ await curio.sleep(24*60*60*1000)
|
|
|
+ except TaskCancelled as exc:
|
|
|
+ # we catch this so when we join the timed_tg later we only get
|
|
|
+ # unexpected exceptions
|
|
|
+ lcore.debug("Daily loop was canceled")
|
|
|
+
|
|
|
+ async def exec_coll_methods(self,name):
|
|
|
+ """
|
|
|
+ This method calls the method of every controller with the name inside th name parameter
|
|
|
+
|
|
|
+ Parameters
|
|
|
+ ----------
|
|
|
+ name: str
|
|
|
+ The name of the controllers method which is to be called.
|
|
|
+ """
|
|
|
+ lcore.debug("executing every collector's {} function!".format(name))
|
|
|
+ for c in beewatch._collectors:
|
|
|
+ try:
|
|
|
+ method = getattr(c,name)
|
|
|
+ await method()
|
|
|
+ except TaskCancelled as exc:
|
|
|
+ raise
|
|
|
+ except Exception as exc:
|
|
|
+ lcore.error(
|
|
|
+ "an error occured when calling {}'s {} method!"
|
|
|
+ .format(repr(c),name))
|
|
|
+
|
|
|
+ async def hourly(self):
|
|
|
+ """
|
|
|
+ This method is calls the collectors hourly method once every hour
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ while True:
|
|
|
+ await self.exec_coll_methods("hourly")
|
|
|
+ # sleep one hour
|
|
|
+ await curio.sleep(60*60*1000)
|
|
|
+ except TaskCancelled as exc:
|
|
|
+ # we catch this so when we join the timed_tg later we only get
|
|
|
+ # unexpected exceptions
|
|
|
+ lcore.debug("Hourly loop was canceled")
|
|
|
+
|
|
|
+ async def minutely(self):
|
|
|
+ """
|
|
|
+ This method is calls the collectors minutely method once every minute
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ while True:
|
|
|
+ await self.exec_coll_methods("minutely")
|
|
|
+ # sleep one minute
|
|
|
+ await curio.sleep(60*1000)
|
|
|
+ except TaskCancelled as exc:
|
|
|
+ # we catch this so when we join the timed_tg later we only get
|
|
|
+ # unexpected exceptions
|
|
|
+ lcore.debug("Minutely loop was canceled")
|
|
|
+
|
|
|
+ def start(self):
|
|
|
+ """
|
|
|
+ synchronous start method wich calls astart asynchronously.
|
|
|
+ """
|
|
|
+ curio.run(self.async_start)
|
|
|
+
|
|
|
+ def stop(self):
|
|
|
+ """
|
|
|
+ synchronous stop method wich calls astop asynchronously.
|
|
|
+ """
|
|
|
+ curio.run(self.async_stop)
|