123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403 |
- import contextlib
- import os
- import platform
- import subprocess
- import sys
- import time
- import yaml
- from glob import glob
- from slugify import slugify
def term():
    """Get the Terminal reference to make output pretty

    Returns:
        (blessings.Terminal): Returns
        a `blessings <https://blessings.readthedocs.io/en/latest>`_ terminal
        instance. If running in windows and not cygwin it will return an
        `intercessions <https://pypi.org/project/intercessions>`_ terminal
        instance instead
    """
    if not hasattr(term, '_handle'):
        # Native Windows (anything that is not cygwin) needs the
        # intercessions shim; everywhere else blessings works directly.
        native_windows = (platform.system() == 'Windows'
                          and sys.platform != "cygwin")
        if native_windows:
            from intercessions import Terminal
        else:
            from blessings import Terminal
        # Memoise the instance on the function object itself.
        term._handle = Terminal()
    return term._handle
@contextlib.contextmanager
def pushd(newDir):
    """Temporarily change the working directory to *newDir*.

    The previous working directory is always restored on exit, even if
    the body raises.
    """
    savedDir = os.getcwd()
    os.chdir(newDir)
    try:
        yield
    finally:
        os.chdir(savedDir)
class StateException(Exception):
    """Raised when a Job or Backup is driven from an invalid state."""
class BackupException(Exception):
    """Raised for configuration problems (bad engine, bad log dir)."""
class Job(object):
    """One subprocess invocation belonging to a Backup.

    A job walks a small state machine:
    READY -> ACTIVE (queued in the shared pool) -> RUNNING (subprocess
    launched) -> FAILED or SUCCESSFUL (decided by the exit code).
    """

    # Shared FIFO of ACTIVE jobs waiting for a worker slot.
    pool = []
    # Upper bound on jobs started per initPool() call; main() may
    # override it from the config.
    maxthreads = 4
    # State constants (see the state property and __str__).
    READY = 0
    ACTIVE = 1
    RUNNING = 2
    FAILED = 3
    SUCCESSFUL = 4

    @staticmethod
    def initPool():
        """Start up to maxthreads jobs from the front of the pool.

        Called by Backup.start() and again whenever a running job is
        observed to have finished (see the state property), so finished
        jobs free slots for queued ones.
        """
        if Job.pool:
            maxthreads = Job.maxthreads
            if len(Job.pool) < maxthreads:
                maxthreads = len(Job.pool)
            for i in range(0, maxthreads):
                Job.pool.pop(0).start()

    @staticmethod
    def finished():
        """Return True when no queued jobs remain in the pool."""
        # The comprehension also forces a state poll on each job.
        return not [x.state for x in Job.pool]

    def __init__(self, backup, config):
        """Create a READY job.

        Args:
            backup (Backup): owning backup; its status is updated when
                this job starts.
            config: the job's entry from the backup's yaml definition.
        """
        self._config = config
        self._backup = backup
        self._state = Job.READY

    def __str__(self):
        return 'Backup {0} Job #{1} ({2})'.format(
            self._backup.name, self._backup.jobs.index(self), {
                Job.READY: 'ready',
                Job.ACTIVE: 'active',
                Job.RUNNING: 'running',
                Job.FAILED: 'failed',
                Job.SUCCESSFUL: 'successful',
            }[self.state])

    @property
    def config(self):
        """The job's raw configuration entry."""
        return self._config

    @property
    def state(self):
        """Current state; polls the subprocess when RUNNING.

        When a running process has exited, the state is finalised from
        its exit code and the pool is kicked to start waiting jobs.
        """
        if self._state is Job.RUNNING:
            code = self._process.poll()
            if code is not None:
                self._state = Job.FAILED if code else Job.SUCCESSFUL
                Job.initPool()
        return self._state

    def queue(self):
        """Move a READY job into the shared pool (state -> ACTIVE)."""
        if self.state is not Job.READY:
            raise StateException('{} not in state to queue'.format(self))
        if self in Job.pool:
            raise StateException('{} already in queued pool'.format(self))
        self._state = Job.ACTIVE
        Job.pool.append(self)

    @property
    def args(self):
        """Command line for the subprocess (a fresh copy of Backup.args)."""
        return Backup.args + []

    @property
    def logfile(self):
        """The owning backup's open log file handle."""
        return self._backup.logfile

    @property
    def process(self):
        """The underlying subprocess.Popen, once started."""
        return self._process

    def start(self):
        """Launch the subprocess; the job must be ACTIVE."""
        if self.state is not Job.ACTIVE:
            raise StateException('Invalid state to start {}'.format(self))
        self._process = subprocess.Popen(
            self.args, stdout=self.logfile, stderr=self.logfile,
            stdin=subprocess.DEVNULL, universal_newlines=True)
        self._state = Job.RUNNING
        # BUG FIX: Backup keeps its state in _status (see Backup.status);
        # the original wrote a stray _state attribute here, so a backup
        # never actually reported RUNNING.
        self._backup._status = Backup.RUNNING
class Backup(object):
    """One backup definition (one yaml file) and the jobs it spawns."""

    # Every Backup ever created, keyed by the realpath of its yaml file.
    instances = {}
    # Backups that have been queued and not yet finished.
    _queue = []
    # Base command line for jobs; main() may replace it with the
    # configured engine.
    args = ['rdiff-backup']
    # Directory log files are written to; main() may override it.
    logdir = '/var/log/backup.d'
    # Status constants (see the status property and __str__).
    BLOCKED = 0
    READY = 1
    QUEUED = 2
    RUNNING = 3
    FAILED = 4
    SUCCESSFUL = 5

    @staticmethod
    def _log():
        # Print a status summary of every backup and its pending jobs.
        print('Backup status:')
        for backup in Backup.instances.values():
            print(' {}'.format(backup))
            for job in backup.pending:
                print(' {}'.format(job))

    @staticmethod
    def load(paths):
        """Instantiate (or look up) a Backup for every path given."""
        for path in paths:
            Backup.get(path)

    @staticmethod
    def blocked():
        """Return all backups currently blocked on a dependency."""
        return [x for x in Backup.instances.values()
                if x.status is Backup.BLOCKED]

    @staticmethod
    def get(path):
        """Return the Backup for *path*, creating it on first use.

        The constructor registers itself in Backup.instances, which is
        why the bare Backup(...) call here is not dead code.
        """
        if path not in Backup.instances:
            Backup(sources()[path])
        return Backup.instances[path]

    @staticmethod
    def start():
        """Queue every known backup, then kick off the job pool."""
        for backup in Backup.instances.values():
            backup.queue()
        Job.initPool()

    @staticmethod
    def wait(log=False):
        """Poll once a second until the queue drains.

        While waiting, queue any READY dependency of a BLOCKED backup
        so it can make progress; when *log* is true, print the status
        summary on each pass.
        """
        while Backup._queue:
            for backup in Backup._queue:
                if backup.status is Backup.BLOCKED:
                    for dependency in backup.blocking:
                        if dependency.status is Backup.READY:
                            dependency.queue()
            if log:
                Backup._log()
            time.sleep(1)

    def __init__(self, config):
        """Build and register a backup from one parsed yaml definition.

        Args:
            config (dict): parsed definition; expected to carry 'path',
                'jobs' and 'depends' entries (see sources()).
        """
        self._config = config
        self._path = self._config['path']
        self._name = os.path.basename(self._path)
        self._logpath = os.path.realpath(os.path.join(
            Backup.logdir, '{}.log'.format(slugify(self._path))))
        # Evaluating self.blocking here also instantiates any dependency
        # Backups that do not exist yet (see the depends property).
        self._status = Backup.BLOCKED if self.blocking else Backup.READY
        Backup.instances[self._path] = self

    def __str__(self):
        return 'Backup {0} ({1}, {2} jobs)'.format(
            self.name, {
                Backup.BLOCKED: 'blocked',
                Backup.READY: 'ready',
                Backup.QUEUED: 'queued',
                Backup.RUNNING: 'running',
                Backup.FAILED: 'failed',
                Backup.SUCCESSFUL: 'successful'
            }[self.status], len([x for x in self.jobs
                                 if x.state not in (Job.FAILED, Job.SUCCESSFUL)]))

    @property
    def config(self):
        """The raw parsed yaml definition."""
        return self._config

    @property
    def name(self):
        """Basename of the yaml definition file."""
        return self._name

    @property
    def path(self):
        """Realpath of the yaml definition file."""
        return self._path

    @property
    def logpath(self):
        """Absolute path of this backup's log file."""
        return self._logpath

    @property
    def logfile(self):
        """Lazily-opened write handle for the log file.

        NOTE(review): the handle is opened in 'w' mode on first access
        and is never explicitly closed; it lives for the process
        lifetime so job subprocesses can write to it.
        """
        if not hasattr(self, '_logfile'):
            self._logfile = open(self.logpath, 'w')
        return self._logfile

    def log(self, text):
        # NOTE(review): writes through _logfile directly, so this
        # raises AttributeError if the logfile property was never
        # accessed first — presumably callers touch logfile before
        # logging; verify against callers.
        self._logfile.write(text)

    @property
    def status(self):
        """Current status; evaluating it drives the state machine.

        Side effects: may queue READY jobs when this backup is in the
        queue, finalises FAILED/SUCCESSFUL when no jobs remain pending,
        and removes finished backups from Backup._queue.
        """
        if self.blocking:
            # A failed dependency fails us outright; otherwise we are
            # (re-)marked blocked until dependencies succeed.
            if [x for x in self.blocking if x.status is Backup.FAILED]:
                self._status = Backup.FAILED
            elif self._status is not Backup.BLOCKED:
                self._status = Backup.BLOCKED
        elif self._status is Backup.BLOCKED:
            # Dependencies just cleared: become READY, or QUEUED (and
            # queue our jobs) if we were already placed in the queue.
            if self not in Backup._queue:
                self._status = Backup.READY
            else:
                self._status = Backup.QUEUED
                for job in self.ready:
                    job.queue()
        elif self._status is Backup.QUEUED and self not in Backup._queue:
            self._status = Backup.READY
        # All jobs done -> collapse to a terminal status.
        if self._status in (Backup.RUNNING, Backup.QUEUED) and not self.pending:
            self._status = Backup.FAILED if self.failed else Backup.SUCCESSFUL
        if self._status in (Backup.FAILED, Backup.SUCCESSFUL) \
                and self in Backup._queue:
            Backup._queue.remove(self)
        return self._status

    @property
    def blocking(self):
        """Dependencies that have not (yet) succeeded."""
        return [x for x in self.depends
                if x.status is not Backup.SUCCESSFUL]

    @property
    def depends(self):
        """Dependency Backups, instantiated lazily on first access."""
        if not hasattr(self, '_depends'):
            self._depends = []
            for path in self.config["depends"]:
                if path not in Backup.instances:
                    Backup(sources()[path])
                self._depends.append(Backup.instances[path])
        return self._depends

    @property
    def jobs(self):
        """Job objects, one per 'jobs' entry, created lazily."""
        if not hasattr(self, '_jobs'):
            self._jobs = []
            for job in self.config['jobs']:
                self._jobs.append(Job(self, job))
        return self._jobs

    @property
    def pending(self):
        """Jobs that have not reached a terminal state."""
        return [x for x in self.jobs if x.state not in (Job.FAILED, Job.SUCCESSFUL)]

    @property
    def ready(self):
        """Jobs still in the READY state."""
        return [x for x in self.jobs if x.state is Job.READY]

    @property
    def failed(self):
        """Jobs that ended in the FAILED state."""
        return [x for x in self.jobs if x.state is Job.FAILED]

    def setStatus(self, state):
        # NOTE(review): None/False/True are not Backup.* constants,
        # though False == 0 == BLOCKED and True == 1 == READY by int
        # equality — the intent of this guard is unclear; confirm
        # against callers before relying on it.
        if state in (None, False, True):
            self._status = state

    def queue(self):
        """Place this backup in the queue and queue its jobs.

        Jobs are only queued immediately when the backup is not
        BLOCKED; Backup.wait() queues blocked backups' dependencies
        later.
        """
        if self in Backup._queue:
            raise StateException('Backup already queued')
        Backup._queue.append(self)
        self._status = Backup.QUEUED
        if self.status is not Backup.BLOCKED:
            for job in self.ready:
                job.queue()
def config():
    """Load etc/backup.d/backup.yml, parsing it only on the first call.

    The parsed document is memoised on the function object and returned
    verbatim on every subsequent call.
    """
    if not hasattr(config, '_handle'):
        with pushd('etc/backup.d'):
            with open("backup.yml") as f:
                config._handle = yaml.load(f, Loader=yaml.SafeLoader)
    return config._handle
def sources():
    """Collect active backup definitions from every configured source.

    Scans each directory listed under config()['sources'] for *.yml
    files, keeps those marked active, normalises their 'depends' lists
    to realpaths (resolved relative to the definition file's own
    directory), and caches the resulting {path: data} mapping on the
    function object.
    """
    if hasattr(sources, '_handle'):
        return sources._handle
    sources._handle = {}
    with pushd('etc/backup.d'):
        for sourceDir in config()['sources']:
            sourceDir = os.path.realpath(sourceDir)
            for path in glob('{}/*.yml'.format(sourceDir)):
                path = os.path.realpath(path)
                # pushd so relative 'depends' entries resolve against
                # the definition file's directory.
                with pushd(os.path.dirname(path)), open(path) as f:
                    data = yaml.load(f, Loader=yaml.SafeLoader)
                    if not ("active" in data and data["active"]):
                        continue
                    data['path'] = path
                    data.setdefault("depends", [])
                    data["depends"] = [
                        os.path.realpath('{}.yml'.format(dep))
                        for dep in data["depends"]]
                    sources._handle[path] = data
    return sources._handle
def main(args):
    """Entry point: apply global settings, then run every backup.

    Args:
        args (list): command-line arguments (currently unused).

    Raises:
        BackupException: for an unknown engine or an unusable log
            directory.
    """
    if 'engine' in config():
        engine = config()["engine"]
        # BUG FIX: membership must be tested against a tuple — the
        # original ("rdiff-backup") is just a parenthesised string, so
        # any substring (e.g. "diff") passed validation.
        if engine not in ("rdiff-backup",):
            raise BackupException('Unknown backup engine: {}'.format(engine))
        Backup.args = [engine]
    if 'logdir' in config():
        logdir = config()['logdir']
        os.makedirs(logdir, exist_ok=True)
        if not os.path.exists(logdir):
            raise BackupException(
                'Unable to create logging directory: {}'.format(logdir))
        Backup.logdir = logdir
    if 'maxthreads' in config():
        Job.maxthreads = config()['maxthreads']
    Backup.load(sources().keys())
    Backup._log()
    Backup.start()
    Backup._log()
    Backup.wait(True)
if __name__ == '__main__':
    try:
        main(sys.argv[1:])
    except BackupException as err:
        # Configuration problems get a clean one-line message.
        print(err)
        sys.exit(1)
    except Exception:
        # Anything unexpected: dump the traceback and fail.
        from traceback import format_exc
        print("Error encountered:\n" + format_exc().strip())
        sys.exit(1)
|