123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- # Copyright 2017 Digital
- #
- # This file is part of DigiLib.
- #
- # DigiLib is free software: you can redistribute it and/or modify
- # it under the terms of the GNU General Public License as published by
- # the Free Software Foundation, either version 3 of the License, or
- # (at your option) any later version.
- #
- # DigiLib is distributed in the hope that it will be useful,
- # but WITHOUT ANY WARRANTY; without even the implied warranty of
- # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- # GNU General Public License for more details.
- #
- # You should have received a copy of the GNU General Public License
- # along with DigiLib. If not, see <http://www.gnu.org/licenses/>.
class CtrlManager(object):
    """
    The CtrlManager periodically calls the collectors' timed methods
    (``minutely``, ``hourly``, ``daily``) and dispatches calls to controllers.

    The constructor takes no arguments.

    Attributes
    ----------
    timed_tg: curio.TaskGroup
        A ``curio.TaskGroup`` in which all timed tasks are spawned
    """

    # curio.sleep() takes its duration in seconds
    _MINUTE = 60
    _HOUR = 60 * _MINUTE
    _DAY = 24 * _HOUR

    def __init__(self):
        super().__init__()
        self.timed_tg = curio.TaskGroup(name="timed tasks")

    async def astart(self):
        """
        Asynchronous start method. Spawns the minutely, hourly and daily
        loops inside ``self.timed_tg`` and returns without waiting for them.
        """
        # lets run the timed callers
        await self.timed_tg.spawn(self.minutely())
        await self.timed_tg.spawn(self.hourly())
        await self.timed_tg.spawn(self.daily())

    async def astop(self):
        """
        Asynchronous stop method. Cancels the timed tasks, joins the task
        group and then calls every collector's ``shutdown`` method.
        """
        lcore.debug("canceling remaining timed tasks")
        try:
            # both are coroutines; without the await nothing gets cancelled
            await self.timed_tg.cancel_remaining()
            await self.timed_tg.join()
        except Exception as exc:
            lcore.error("an error occured in a timed caller:", exc_info=exc)
        # call the shutdown handlers
        await self.exec_coll_methods("shutdown")

    async def call(self, ctrl_name, func_name, kwargs, respond):
        """
        Look up ``func_name`` on the controller ``ctrl_name`` and await it
        with ``kwargs``. The function is always treated as asynchronous. If
        the controller or the function is missing, or the function raises,
        the problem is reported through ``respond`` instead of being raised.

        Parameters
        ----------
        ctrl_name: str
            name of the controller
        func_name: str
            name of the function to call
        kwargs: dict
            the keyword arguments for the function
        respond: function or method
            a function to send error messages. It is awaited here.
            NOTE(review): confirm every caller passes a coroutine function.
        """
        ctrl = beewatch._controllers.get(ctrl_name, False)
        if not ctrl:
            await respond(
                "can't call function '{}' of controller '{}', "
                "there is no such controller!".format(func_name, ctrl_name))
            return
        func = getattr(ctrl, func_name, False)
        if not func:
            await respond(
                "can't call function '{}' of controller '{}', "
                "the controller doesn have this function!"
                .format(func_name, ctrl_name))
            return
        try:
            await func(**kwargs)
        except Exception as e:
            lch.error(
                "an error was raised when calling func {}:".format(func),
                exc_info=e)
            tb = traceback.format_exc()
            # NOTE(review): __cause__ is None unless the exception was raised
            # with "raise ... from ..."; ``e`` itself may have been intended.
            await respond(
                tb, log_msg="traceback of '{}'".format(e.__cause__))

    async def _timed_loop(self, name, interval, label):
        """
        Call every collector's ``name`` method, sleep ``interval`` seconds
        and repeat until the task is cancelled.

        Parameters
        ----------
        name: str
            name of the collector method to call on every iteration
        interval: int or float
            pause between two iterations, in seconds
        label: str
            capitalised loop name used in the cancellation log message
        """
        try:
            while True:
                await self.exec_coll_methods(name)
                await curio.sleep(interval)
        except TaskCancelled:
            # we catch this so when we join the timed_tg later we only get
            # unexpected exceptions
            lcore.debug("{} loop was canceled".format(label))

    async def daily(self):
        """
        This method calls the collectors' daily method once every day
        """
        await self._timed_loop("daily", self._DAY, "Daily")

    async def exec_coll_methods(self, name):
        """
        This method calls, on every collector in ``beewatch._collectors``,
        the method whose name is given by the ``name`` parameter. An error in
        one collector is logged and does not stop the remaining collectors;
        a task cancellation is re-raised.

        Parameters
        ----------
        name: str
            The name of the collectors' method which is to be called.
        """
        lcore.debug("executing every collector's {} function!".format(name))
        for c in beewatch._collectors:
            try:
                method = getattr(c, name)
                await method()
            except TaskCancelled:
                # cancellation must reach the surrounding timed loop
                raise
            except Exception as exc:
                lcore.error(
                    "an error occured when calling {}'s {} method!"
                    .format(repr(c), name),
                    exc_info=exc)

    async def hourly(self):
        """
        This method calls the collectors' hourly method once every hour
        """
        await self._timed_loop("hourly", self._HOUR, "Hourly")

    async def minutely(self):
        """
        This method calls the collectors' minutely method once every minute
        """
        await self._timed_loop("minutely", self._MINUTE, "Minutely")

    def start(self):
        """
        synchronous start method which runs astart with ``curio.run``.
        """
        curio.run(self.astart)

    def stop(self):
        """
        synchronous stop method which runs astop with ``curio.run``.
        """
        curio.run(self.astop)
|