import contextlib
import os
import platform
import subprocess
import sys
import time
from glob import glob

import yaml
from slugify import slugify


def term():
    """Get the Terminal reference to make output pretty

    Returns:
        (blessings.Terminal): Returns a `blessings `_ terminal instance.
            If running in windows and not cygwin it will return an
            `intercessions `_ terminal instance instead
    """
    # Lazily created singleton stored on the function object itself.
    if not hasattr(term, '_handle'):
        if sys.platform != "cygwin" and platform.system() == 'Windows':
            # Native Windows console: blessings does not work there.
            from intercessions import Terminal
        else:
            from blessings import Terminal
        term._handle = Terminal()
    return term._handle


@contextlib.contextmanager
def pushd(newDir):
    """Context manager: chdir into *newDir*, restoring the previous cwd on exit."""
    previousDir = os.getcwd()
    os.chdir(newDir)
    try:
        yield
    finally:
        os.chdir(previousDir)


class StateException(Exception):
    """Raised when an operation is attempted from an invalid lifecycle state."""
    pass


class BackupException(Exception):
    """Raised for configuration errors (unknown engine, bad log directory...)."""
    pass


class Job(object):
    """One backup job: a single engine (rdiff-backup) invocation.

    Jobs are queued into the class-level ``pool`` and started in FIFO order,
    at most ``maxthreads`` at a time, by :meth:`initPool`.
    """

    # Class-level queue of ACTIVE jobs waiting for a slot.
    pool = []
    maxthreads = 4

    # Lifecycle states.
    READY = 0        # created, not yet queued
    ACTIVE = 1       # queued in Job.pool, waiting to start
    RUNNING = 2      # subprocess launched
    FAILED = 3       # subprocess exited non-zero
    SUCCESSFUL = 4   # subprocess exited zero

    @staticmethod
    def initPool():
        """Start up to ``maxthreads`` queued jobs from the head of the pool.

        NOTE(review): this is re-invoked every time a running job is observed
        to finish (see :attr:`state`), each time starting up to ``maxthreads``
        more jobs — so total concurrency is not strictly capped at
        ``maxthreads``. Preserved as-is; confirm whether that is intended.
        """
        if Job.pool:
            # Clamp to however many jobs are actually queued.
            for _ in range(min(Job.maxthreads, len(Job.pool))):
                Job.pool.pop(0).start()

    @staticmethod
    def finished():
        """Return True when no jobs remain queued in the pool."""
        # Pooled jobs are READY/ACTIVE, so no state polling is needed here.
        return not Job.pool

    def __init__(self, backup, config):
        """Bind this job to its parent *backup* and its job *config* dict."""
        self._config = config
        self._backup = backup
        self._state = Job.READY

    def __str__(self):
        return 'Backup {0} Job #{1} ({2})'.format(
            self._backup.name, self._backup.jobs.index(self), {
                Job.READY: 'ready',
                Job.ACTIVE: 'active',
                Job.RUNNING: 'running',
                Job.FAILED: 'failed',
                Job.SUCCESSFUL: 'successful',
            }[self.state])

    @property
    def config(self):
        """The job's configuration mapping (filters, from/to paths)."""
        return self._config

    @property
    def state(self):
        """Current lifecycle state; polls the subprocess when RUNNING.

        Side effect: when a running subprocess has exited, transitions to
        FAILED/SUCCESSFUL and kicks :meth:`initPool` to start waiting jobs.
        """
        if self._state is Job.RUNNING:
            code = self._process.poll()
            if code is not None:
                self._state = Job.FAILED if code else Job.SUCCESSFUL
                Job.initPool()
        return self._state

    def queue(self):
        """Move this READY job into the shared pool (state becomes ACTIVE).

        Raises:
            StateException: if the job is not READY or is already pooled.
        """
        if self.state is not Job.READY:
            raise StateException('{} not in state to queue'.format(self))
        if self in Job.pool:
            raise StateException('{} already in queued pool'.format(self))
        self._state = Job.ACTIVE
        Job.pool.append(self)

    @property
    def args(self):
        """Build the engine command line for this job.

        Raises:
            BackupException: if a filter entry has neither include nor exclude.
            StateException: if ``Backup.engine`` is not a known engine.
        """
        if Backup.engine == "rdiff-backup":
            args = ['rdiff-backup']
            for item in self.config['filters']:
                if 'include' in item:
                    args += ['--include', item['include']]
                elif 'exclude' in item:
                    args += ['--exclude', item['exclude']]
                else:
                    raise BackupException(
                        '{0} has an invalid filter {1}'.format(self, item))
            return args + [self.fromPath, self.toPath]
        raise StateException('Invalid backup engine {}'.format(Backup.engine))

    @property
    def logfile(self):
        """The parent backup's open log file handle."""
        return self._backup.logfile

    @property
    def fromPath(self):
        """Source path, prefixed with the backup's 'from' root when present."""
        if not hasattr(self, '_fromPath'):
            fromPath = self.config['from']
            if 'roots' in self._backup.config:
                roots = self._backup.config['roots']
                if 'from' in roots:
                    fromPath = os.path.join(roots['from'], fromPath)
            self._fromPath = fromPath
        return self._fromPath

    @property
    def toPath(self):
        """Destination path, prefixed with the backup's 'to' root when present."""
        if not hasattr(self, '_toPath'):
            toPath = self.config['to']
            if 'roots' in self._backup.config:
                roots = self._backup.config['roots']
                if 'to' in roots:
                    toPath = os.path.join(roots['to'], toPath)
            self._toPath = toPath
        return self._toPath

    @property
    def process(self):
        """The launched subprocess (only set once RUNNING)."""
        return self._process

    def start(self):
        """Launch the engine subprocess; the job and its backup become RUNNING.

        Raises:
            StateException: if the job is not ACTIVE.
        """
        if self.state is not Job.ACTIVE:
            raise StateException('Invalid state to start {}'.format(self))
        self._process = subprocess.Popen(
            self.args, stdout=self.logfile, stderr=self.logfile,
            stdin=subprocess.DEVNULL, universal_newlines=True)
        self._state = Job.RUNNING
        self._backup._state = Backup.RUNNING


class Backup(object):
    """A named backup: a set of jobs plus dependencies on other backups."""

    # All loaded backups, keyed by their source .yml path.
    instances = {}
    # Backups currently queued or running.
    _queue = []
    engine = 'rdiff-backup'
    logdir = '/var/log/backup.d'

    # Lifecycle states.
    BLOCKED = 0      # waiting on dependencies
    READY = 1
    QUEUED = 2
    RUNNING = 3
    FAILED = 4
    SUCCESSFUL = 5

    @staticmethod
    def _log():
        """Print a status summary of every backup and its pending jobs."""
        print('Backup status:')
        for backup in Backup.instances.values():
            print(' {}'.format(backup))
            for job in backup.pending:
                print(' {}'.format(job))

    @staticmethod
    def load(paths):
        """Instantiate (or fetch) a Backup for each source path."""
        for path in paths:
            Backup.get(path)

    @staticmethod
    def blocked():
        """Return all backups currently blocked on dependencies."""
        return [x for x in Backup.instances.values()
                if x.status is Backup.BLOCKED]

    @staticmethod
    def get(path):
        """Return the Backup for *path*, creating it from sources() if needed."""
        if path not in Backup.instances:
            Backup(sources()[path])
        return Backup.instances[path]

    @staticmethod
    def start():
        """Queue every loaded backup and start the job pool."""
        for backup in Backup.instances.values():
            backup.queue()
        Job.initPool()

    @staticmethod
    def wait(log=False):
        """Poll until the queue drains, unblocking dependent backups.

        Args:
            log (bool): when True, print a status summary each second.
        """
        while Backup._queue:
            for backup in Backup._queue:
                if backup.status is Backup.BLOCKED:
                    # Queue any dependency that is now ready to run.
                    for dependency in backup.blocking:
                        if dependency.status is Backup.READY:
                            dependency.queue()
            if log:
                Backup._log()
            time.sleep(1)

    def __init__(self, config):
        """Create a backup from its parsed source config and register it."""
        self._config = config
        self._path = self._config['path']
        self._name = os.path.basename(self._path)
        # One log file per backup, named after the slugified source path.
        self._logpath = os.path.realpath(os.path.join(
            Backup.logdir, '{}.log'.format(slugify(self._path))))
        self._status = Backup.BLOCKED if self.blocking else Backup.READY
        Backup.instances[self._path] = self

    def __str__(self):
        return 'Backup {0} ({1}, {2} jobs)'.format(
            self.name, {
                Backup.BLOCKED: 'blocked',
                Backup.READY: 'ready',
                Backup.QUEUED: 'queued',
                Backup.RUNNING: 'running',
                Backup.FAILED: 'failed',
                Backup.SUCCESSFUL: 'successful'
            }[self.status],
            len([x for x in self.jobs
                 if x.state not in (Job.FAILED, Job.SUCCESSFUL)]))

    @property
    def config(self):
        """The backup's parsed YAML configuration mapping."""
        return self._config

    @property
    def name(self):
        """Basename of the backup's source .yml file."""
        return self._name

    @property
    def path(self):
        """Absolute path of the backup's source .yml file."""
        return self._path

    @property
    def logpath(self):
        """Absolute path of this backup's log file."""
        return self._logpath

    @property
    def logfile(self):
        """Lazily opened (write-mode) log file handle."""
        if not hasattr(self, '_logfile'):
            self._logfile = open(self.logpath, 'w')
        return self._logfile

    def log(self, text):
        """Append *text* to the backup's log file."""
        # Use the property so the file is opened on first use (the raw
        # attribute does not exist until then).
        self.logfile.write(text)

    @property
    def status(self):
        """Current status; re-derives state from dependencies and jobs.

        Side effects: queues ready jobs when this backup is QUEUED, and
        removes the backup from the class queue once FAILED/SUCCESSFUL.
        """
        if self.blocking:
            # Still waiting on dependencies; fail fast if any of them failed.
            if [x for x in self.blocking if x.status is Backup.FAILED]:
                self._status = Backup.FAILED
            elif self._status is not Backup.BLOCKED:
                self._status = Backup.BLOCKED
        elif self._status is Backup.BLOCKED:
            # Dependencies cleared since last check.
            if self not in Backup._queue:
                self._status = Backup.READY
            else:
                self._status = Backup.QUEUED
                for job in self.ready:
                    job.queue()
        elif self._status is Backup.QUEUED and self not in Backup._queue:
            self._status = Backup.READY
        if self._status in (Backup.RUNNING, Backup.QUEUED) \
                and not self.pending:
            # All jobs have finished; outcome depends on any failures.
            self._status = Backup.FAILED \
                if self.failed else Backup.SUCCESSFUL
        if self._status in (Backup.FAILED, Backup.SUCCESSFUL) \
                and self in Backup._queue:
            Backup._queue.remove(self)
        return self._status

    @property
    def blocking(self):
        """Dependencies that have not yet completed successfully."""
        return [x for x in self.depends
                if x.status is not Backup.SUCCESSFUL]

    @property
    def depends(self):
        """Backups this one depends on, loaded lazily from config."""
        if not hasattr(self, '_depends'):
            self._depends = []
            for path in self.config["depends"]:
                if path not in Backup.instances:
                    Backup(sources()[path])
                self._depends.append(Backup.instances[path])
        return self._depends

    @property
    def jobs(self):
        """This backup's Job instances, created lazily from config."""
        if not hasattr(self, '_jobs'):
            self._jobs = []
            for job in self.config['jobs']:
                self._jobs.append(Job(self, job))
        return self._jobs

    @property
    def pending(self):
        """Jobs that have not yet finished (neither failed nor successful)."""
        return [x for x in self.jobs
                if x.state not in (Job.FAILED, Job.SUCCESSFUL)]

    @property
    def ready(self):
        """Jobs still in the READY state."""
        return [x for x in self.jobs if x.state is Job.READY]

    @property
    def failed(self):
        """Jobs that have failed."""
        return [x for x in self.jobs if x.state is Job.FAILED]

    def setStatus(self, state):
        """Force the stored status.

        NOTE(review): the guard ``state in (None, False, True)`` also matches
        the integers 0 (BLOCKED) and 1 (READY) because bools compare equal to
        ints — presumably intentional, but confirm. Behavior preserved.
        """
        if state in (None, False, True):
            self._status = state

    def queue(self):
        """Add this backup to the class queue and queue its ready jobs.

        Raises:
            StateException: if the backup is already queued.
        """
        if self in Backup._queue:
            raise StateException('Backup already queued')
        Backup._queue.append(self)
        self._status = Backup.QUEUED
        if self.status is not Backup.BLOCKED:
            for job in self.ready:
                job.queue()


def config():
    """Load and cache etc/backup.d/backup.yml as the global configuration."""
    if hasattr(config, '_handle'):
        return config._handle
    with pushd('etc/backup.d'):
        with open("backup.yml") as f:
            config._handle = yaml.load(f, Loader=yaml.SafeLoader)
    return config._handle


def sources():
    """Load and cache every active source .yml under the configured dirs.

    Returns:
        dict: mapping of real source path -> parsed config dict, with
        ``path`` injected and ``depends`` normalized to real .yml paths.
    """
    if hasattr(sources, '_handle'):
        return sources._handle
    sources._handle = {}
    with pushd('etc/backup.d'):
        for source in config()['sources']:
            source = os.path.realpath(source)
            for path in glob('{}/*.yml'.format(source)):
                path = os.path.realpath(path)
                # Resolve relative paths in each file from its own directory.
                with pushd(os.path.dirname(path)), open(path) as f:
                    data = yaml.load(f, Loader=yaml.SafeLoader)
                    if "active" in data and data["active"]:
                        data['path'] = path
                        if "depends" not in data:
                            data["depends"] = []
                        for i in range(0, len(data["depends"])):
                            data["depends"][i] = os.path.realpath(
                                '{}.yml'.format(data["depends"][i]))
                        sources._handle[path] = data
    return sources._handle


def main(args):
    """Entry point: validate config, load sources, run and await backups.

    Raises:
        BackupException: on unknown engine or uncreatable log directory.
    """
    if 'engine' in config():
        engine = config()["engine"]
        # Must be a 1-tuple: a bare ("rdiff-backup") is a plain string, and
        # "not in" on a string is a substring test that would accept any
        # substring (e.g. "rdiff") as a valid engine.
        if engine not in ("rdiff-backup",):
            raise BackupException('Unknown backup engine: {}'.format(engine))
        Backup.engine = engine
    if 'logdir' in config():
        logdir = config()['logdir']
        os.makedirs(logdir, exist_ok=True)
        if not os.path.exists(logdir):
            raise BackupException(
                'Unable to create logging directory: {}'.format(logdir))
        Backup.logdir = logdir
    if 'maxthreads' in config():
        Job.maxthreads = config()['maxthreads']
    Backup.load(sources().keys())
    Backup._log()
    Backup.start()
    Backup._log()
    Backup.wait(True)


if __name__ == '__main__':
    try:
        main(sys.argv[1:])
    except BackupException as ex:
        print(ex)
        sys.exit(1)
    except Exception:
        from traceback import format_exc
        msg = "Error encountered:\n" + format_exc().strip()
        print(msg)
        sys.exit(1)