import contextlib import os import platform import psutil import shlex import subprocess import sys import time import yaml from datetime import datetime from glob import glob from slugify import slugify def term(): """Get the Terminal reference to make output pretty Returns: (blessings.Terminal): Returns a `blessings `_ terminal instance. If running in windows and not cygwin it will return an `intercessions `_ terminal instance instead """ if not hasattr(term, '_handle'): if sys.platform != "cygwin" and platform.system() == 'Windows': from intercessions import Terminal else: from blessings import Terminal term._handle = Terminal() return term._handle @contextlib.contextmanager def pushd(newDir): previousDir = os.getcwd() os.chdir(newDir) try: yield finally: os.chdir(previousDir) class StateException(Exception): pass class BackupException(Exception): pass class Job(object): pool = [] maxthreads = 4 verbosity = 3 logTransitions = False READY = 0 QUEUED = 1 STARTING = 2 RUNNING = 3 ENDING = 4 FAILED = 5 SUCCESSFUL = 6 @staticmethod def initPool(): if Job.pool: maxthreads = Job.maxthreads running = len(Job.running()) if maxthreads > running: queued = Job.queued() if len(queued) < maxthreads - running: maxthreads = len(queued) if maxthreads: for i in range(0, maxthreads): if len(queued) > i: queued[i].start() @staticmethod def finished(): return [x for x in Job.pool if x.state in (Job.FAILED, Job.SUCCESSFUL)] @staticmethod def pending(): return [x for x in Job.pool if x.state not in (Job.FAILED, Job.SUCCESSFUL)] @staticmethod def running(): return [x for x in Job.pool if x.state in (Job.STARTING, Job.RUNNING, Job.ENDING)] @staticmethod def queued(): return [x for x in Job.pending() if x.state is Job.QUEUED] def __init__(self, backup, config): self._config = config self._backup = backup self._state = Job.READY def __str__(self): return 'Backup {0} Job #{1} ({2})'.format( self._backup.name, self.index, self.getState()) @property def config(self): return self._config @property def state(self): if self._state is Job.READY and self in Job.pool: self.setState(Job.QUEUED) elif self._state in (Job.STARTING, Job.RUNNING, Job.ENDING): code = self._process.poll() or self._process.returncode if code is None and not psutil.pid_exists(self._process.pid): code = -1 if code is not None: if code: self.setState(Job.FAILED) Job.initPool() elif self._state is Job.ENDING: self.setState(Job.SUCCESSFUL) Job.initPool() else: self.start() return self._state def queue(self): if self.state is not Job.READY: raise StateException('{} not in state to queue'.format(self)) if self in Job.pool: raise StateException('{} already in queued pool'.format(self)) self.setState(Job.QUEUED) Job.pool.append(self) @property def args(self): if self._state is Job.STARTING: return shlex.split(self.pre) elif self._state is Job.RUNNING: if Backup.engine == "rdiff-backup": args = ['rdiff-backup', '-v{}'.format(Backup.verbosity)] if 'filters' in self.config: for item in self.config['filters']: if 'include' in item: args += ['--include', item['include']] elif 'exclude' in item: args += ['--exclude', item['exclude']] else: raise BackupException( '{0} has an invalid filter {1}'.format(self, item)) return args + [self.fromPath, self.toPath] else: raise StateException( 'Invalid backup engine {}'.format(Backup.engine)) elif self._state is Job.ENDING: return shlex.split(self.post) else: raise StateException('Invalid state {}'.format(self.getState())) @property def logfile(self): if not hasattr(self, '_logfile'): path = os.path.dirname(self.logpath) if not os.path.exists(path): os.makedirs(path, exist_ok=True) self._logfile = open(self.logpath, 'w' if Backup.truncateLogs else 'a') return self._logfile @property def logpath(self): if not hasattr(self, '_logpath'): self._logpath = os.path.join(os.path.dirname( self._backup.logpath), 'job{}.log'.format(self.index)) return self._logpath @property def fromPath(self): if not hasattr(self, '_fromPath'): fromPath = self.config['from'] if 'roots' in self._backup.config: roots = self._backup.config['roots'] if 'from' in roots: if '::' in roots['from']: if roots['from'].endswith('::'): fromPath = roots['from'] + fromPath else: fromPath = roots['from'] + os.sep + fromPath else: fromPath = os.path.join(roots['from'], fromPath) self._fromPath = fromPath return self._fromPath @property def toPath(self): if not hasattr(self, '_toPath'): toPath = self.config['to'] if 'roots' in self._backup.config: roots = self._backup.config['roots'] if 'to' in roots: if '::' in roots['to']: if roots['to'].endswith('::'): toPath = roots['to'] + toPath else: toPath = roots['to'] + os.sep + toPath else: toPath = os.path.join(roots['to'], toPath) self._toPath = toPath return self._toPath @property def process(self): return self._process @property def index(self): if not hasattr(self, '_index'): self._index = self._backup.jobs.index(self) return self._index @property def pre(self): if not hasattr(self, '_pre'): self._pre = self.config['pre'] if 'pre' in self.config else None return self._pre @property def post(self): if not hasattr(self, '_post'): self._post = self.config['post'] if 'post' in self.config else None return self._post def log(self, text): text = '[{0}] (Backup {1} Job #{2}) {3}\n'.format( datetime.now().isoformat(), self._backup.name, self.index, text) print(text, end='') self._backup.logfile.write(text) self._backup.logfile.flush() def start(self): if self._state is self.QUEUED: self._backup.setStatus(Backup.RUNNING) self.setState(Job.STARTING) if self.pre is None: self.setState(Job.RUNNING) elif self._state is self.STARTING: self.setState(Job.RUNNING) elif self._state is self.RUNNING: self.setState(Job.ENDING) if self.post is None: self.setState(Job.SUCCESSFUL) return else: raise StateException('Invalid state to start {}'.format(self)) args = self.args self.logfile.write( "[{0}] {1} &\n".format( datetime.now().isoformat(), ' '.join([shlex.quote(x) for x in args]))) self.logfile.flush() self._process = subprocess.Popen( args, stdout=self.logfile, stderr=subprocess.STDOUT, stdin=subprocess.DEVNULL, universal_newlines=True, bufsize=1) def setState(self, state): if self._state != state: self.log('{0} -> {1}'.format( self.getState(self._state), self.getState(state))) self._state = state if state in (Job.SUCCESSFUL, Job.FAILED): self.logfile.close() def getState(self, state=None): return { Job.READY: 'ready', Job.QUEUED: 'queued', Job.STARTING: 'starting', Job.RUNNING: 'running', Job.ENDING: 'ending', Job.FAILED: 'failed', Job.SUCCESSFUL: 'successful', }[self.state if state is None else state] class Backup(object): instances = {} _queue = [] logTransitions = False truncateLogs = True engine = 'rdiff-backup' logdir = '/var/log/backup.d' BLOCKED = 0 READY = 1 QUEUED = 2 RUNNING = 3 FAILED = 4 SUCCESSFUL = 5 @staticmethod def _log(): print('Backup status:') for backup in Backup.instances.values(): print(' {}'.format(backup)) for job in backup.pending: print(' {}'.format(job)) @staticmethod def load(paths): for path in paths: Backup.get(path) @staticmethod def blocked(): return [x for x in Backup.instances.values() if x.status is Backup.BLOCKED] @staticmethod def get(path): if path not in Backup.instances: Backup(sources()[path]) return Backup.instances[path] @staticmethod def start(): for backup in Backup.instances.values(): backup.queue() Job.initPool() @staticmethod def wait(log=False): while Backup._queue: for backup in Backup._queue: if backup.status is Backup.BLOCKED: for dependency in backup.blocking: if dependency.status is Backup.READY: dependency.queue() if log: Backup._log() time.sleep(1) def __init__(self, config): self._config = config self._path = self._config['path'] self._name = slugify(self._path, max_length=255) self._logpath = os.path.realpath(os.path.join( Backup.logdir, self.name, 'backup.log')) self._status = Backup.READY if self.blocking: self.setStatus(Backup.BLOCKED) Backup.instances[self._path] = self def __str__(self): return 'Backup {0} ({1}, {2} jobs)'.format( self.name, self.getStatus(), len([x for x in self.jobs if x.state not in (Job.FAILED, Job.SUCCESSFUL)])) @property def config(self): return self._config @property def name(self): return self._name @property def path(self): return self._path @property def logpath(self): return self._logpath @property def logfile(self): if not hasattr(self, '_logfile'): path = os.path.dirname(self.logpath) if not os.path.exists(path): os.makedirs(path, exist_ok=True) self._logfile = open(self.logpath, 'w' if Backup.truncateLogs else 'a') return self._logfile def log(self, text): text = '[{0}] (Backup {1}) {2}\n'.format( datetime.now().isoformat(), self.name, text) print(text, end='') self.logfile.write(text) self.logfile.flush() @property def status(self): if self.blocking: if [x for x in self.blocking if x.status is Backup.FAILED]: self.setStatus(Backup.FAILED) elif self._status is not Backup.BLOCKED: self.setStatus(Backup.BLOCKED) elif self._status is Backup.BLOCKED: if self not in Backup._queue: self.setStatus(Backup.READY) else: self.setStatus(Backup.QUEUED) for job in self.ready: job.queue() Job.initPool() elif self._status is Backup.QUEUED and self not in Backup._queue: self.setStatus(Backup.READY) if self._status in (Backup.RUNNING, Backup.QUEUED) and not self.pending: self.setStatus(Backup.FAILED if self.failed else Backup.SUCCESSFUL) if self._status in (Backup.FAILED, Backup.SUCCESSFUL) \ and self in Backup._queue: Backup._queue.remove(self) return self._status @property def blocking(self): return [x for x in self.depends if x.status is not Backup.SUCCESSFUL] @property def depends(self): if not hasattr(self, '_depends'): self._depends = [] for path in self.config["depends"]: if path not in Backup.instances: Backup(sources()[path]) self._depends.append(Backup.instances[path]) return self._depends @property def jobs(self): if not hasattr(self, '_jobs'): self._jobs = [] if 'jobs' in self.config: for job in self.config['jobs']: self._jobs.append(Job(self, job)) return self._jobs @property def pending(self): return [x for x in self.jobs if x.state not in (Job.FAILED, Job.SUCCESSFUL)] @property def ready(self): return [x for x in self.jobs if x.state is Job.READY] @property def failed(self): return [x for x in self.jobs if x.state is Job.FAILED] def setStatus(self, status): if self._status != status: self.log('{0} -> {1}'.format( self.getStatus(self._status), self.getStatus(status))) self._status = status if status in (Backup.SUCCESSFUL, Backup.FAILED): self.logfile.close() def getStatus(self, status=None): return { Backup.BLOCKED: 'blocked', Backup.READY: 'ready', Backup.QUEUED: 'queued', Backup.RUNNING: 'running', Backup.FAILED: 'failed', Backup.SUCCESSFUL: 'successful' }[self.status if status is None else status] def queue(self): if self in Backup._queue: raise StateException('Backup already queued') Backup._queue.append(self) self.setStatus(Backup.QUEUED) if self.status is not Backup.BLOCKED: for job in self.ready: job.queue() Job.initPool() def config(): if hasattr(config, '_handle'): return config._handle with pushd(config._root): with open("backup.yml") as f: config._handle = yaml.load(f, Loader=yaml.SafeLoader) return config._handle def sources(): if hasattr(sources, '_handle'): return sources._handle sources._handle = {} with pushd(config._root): for source in config()['sources']: source = os.path.realpath(source) for path in glob('{}/*.yml'.format(source)) + \ glob('{}/*.yaml'.format(source)): path = os.path.realpath(path) with pushd(os.path.dirname(path)), open(path) as f: data = yaml.load(f, Loader=yaml.SafeLoader) if "active" in data and data["active"]: data['path'] = path if "depends" not in data: data["depends"] = [] for i in range(0, len(data["depends"])): data["depends"][i] = os.path.realpath( '{}.yml'.format(data["depends"][i])) sources._handle[path] = data return sources._handle def main(args): try: config._root = args[0] if len(args) else '/etc/backup.d' if not os.path.exists(config._root): raise BackupException( 'Configuration files missing from {}'.format(config._root)) if 'engine' in config(): engine = config()["engine"] if engine not in ("rdiff-backup"): raise BackupException('Unknown backup engine: {}'.format(engine)) Backup.engine = engine if 'logdir' in config(): logdir = config()['logdir'] os.makedirs(logdir, exist_ok=True) if not os.path.exists(logdir): raise BackupException( 'Unable to create logging directory: {}'.format(logdir)) Backup.logdir = logdir if 'maxthreads' in config(): Job.maxthreads = config()['maxthreads'] if 'verbosity' in config(): Backup.verbosity = config()['verbosity'] Backup.logTransitions = Job.logTransitions = True Backup.truncateLogs = "truncateLogs" in config() and config["truncateLogs"] Backup.load(sources().keys()) Backup.start() Backup.wait() except BackupException as ex: print(ex) sys.exit(1) except Exception: from traceback import format_exc msg = "Error encountered:\n" + format_exc().strip() print(msg) sys.exit(1) if __name__ == '__main__': main(sys.argv[1:])