# Standard library imports.
import contextlib
import os
import platform
import shlex
import subprocess
import sys
import time
from glob import glob

# Third-party imports.
import psutil
import yaml
from slugify import slugify
def term():
    """Get the Terminal reference to make output pretty

    Returns:
        (blessings.Terminal): Returns
        a `blessings <https://blessings.readthedocs.io/en/latest>`_ terminal
        instance. If running in windows and not cygwin it will return an
        `intercessions <https://pypi.org/project/intercessions>`_ terminal
        instance instead
    """
    if hasattr(term, '_handle'):
        return term._handle
    # Native Windows (not cygwin) lacks blessings support.
    on_windows = platform.system() == 'Windows' and sys.platform != "cygwin"
    if on_windows:
        from intercessions import Terminal
    else:
        from blessings import Terminal
    term._handle = Terminal()
    return term._handle
@contextlib.contextmanager
def pushd(newDir):
    """Context manager: chdir into *newDir*, restoring the previous
    working directory on exit (even if the body raises)."""
    savedDir = os.getcwd()
    os.chdir(newDir)
    try:
        yield
    finally:
        os.chdir(savedDir)
class StateException(Exception):
    """Raised when a Job or Backup is driven from an invalid state."""
class BackupException(Exception):
    """Raised for configuration or engine errors while backing up."""
class Job(object):
    """A single backup job driven as a one-way state machine.

    Phases: READY -> QUEUED -> STARTING (pre command) -> RUNNING (engine)
    -> ENDING (post command) -> SUCCESSFUL or FAILED.  Each active phase
    runs as a child process whose output is captured in a per-job log.
    """

    # Shared pool of queued/running jobs across every Backup.
    pool = []
    # Upper bound on concurrently running jobs.
    maxthreads = 4
    verbosity = 3
    logTransitions = False

    # State constants; FAILED and SUCCESSFUL are terminal.
    READY = 0
    QUEUED = 1
    STARTING = 2
    RUNNING = 3
    ENDING = 4
    FAILED = 5
    SUCCESSFUL = 6

    @staticmethod
    def initPool():
        """Start queued jobs while free slots remain.

        NOTE(review): when jobs are already running, this can start up to
        ``maxthreads`` additional jobs (the running count is not
        subtracted before the loop), oversubscribing the pool — confirm
        whether that is intended.
        """
        if Job.pool:
            maxthreads = Job.maxthreads
            running = len(Job.running())
            if maxthreads > running:
                queued = Job.queued()
                # Never try to start more jobs than are actually queued.
                if len(queued) < maxthreads - running:
                    maxthreads = len(queued)
                if maxthreads:
                    for i in range(0, maxthreads):
                        if len(queued) > i:
                            queued[i].start()

    @staticmethod
    def finished():
        """Jobs that reached a terminal state."""
        return [x for x in Job.pool if x.state in (Job.FAILED, Job.SUCCESSFUL)]

    @staticmethod
    def pending():
        """Jobs that have not yet reached a terminal state."""
        return [x for x in Job.pool
                if x.state not in (Job.FAILED, Job.SUCCESSFUL)]

    @staticmethod
    def running():
        """Jobs with a live child process (any active phase)."""
        return [x for x in Job.pool
                if x.state in (Job.STARTING, Job.RUNNING, Job.ENDING)]

    @staticmethod
    def queued():
        """Jobs waiting in the pool for a free slot."""
        return [x for x in Job.pending() if x.state is Job.QUEUED]

    def __init__(self, backup, config):
        """Args:
            backup (Backup): the backup that owns this job.
            config (dict): the job's section of the backup's YAML config.
        """
        self._config = config
        self._backup = backup
        self._state = Job.READY

    def __str__(self):
        return 'Backup {0} Job #{1} ({2})'.format(
            self._backup.name, self.index, self.getState())

    @property
    def config(self):
        # Raw job configuration dict.
        return self._config

    @property
    def state(self):
        """Current state.

        Reading this property polls the child process and advances the
        state machine as a side effect (it may start the next phase or
        refill the pool).
        """
        if self._state is Job.READY and self in Job.pool:
            self.setState(Job.QUEUED)
        elif self._state in (Job.STARTING, Job.RUNNING, Job.ENDING):
            # poll() is None while the child is alive; fall back to
            # returncode in case poll() reported a falsy 0.
            code = self._process.poll() or self._process.returncode
            if code is None and not psutil.pid_exists(self._process.pid):
                # Child vanished without delivering an exit status.
                code = -1
            if code is not None:
                if code:
                    self.setState(Job.FAILED)
                    Job.initPool()
                elif self._state is Job.ENDING:
                    self.setState(Job.SUCCESSFUL)
                    Job.initPool()
                else:
                    # Current phase succeeded; launch the next one.
                    self.start()
        return self._state

    def queue(self):
        """Move READY -> QUEUED and join the shared pool.

        Raises:
            StateException: if not READY, or already pooled.
        """
        if self.state is not Job.READY:
            raise StateException('{} not in state to queue'.format(self))
        if self in Job.pool:
            raise StateException('{} already in queued pool'.format(self))
        self.setState(Job.QUEUED)
        Job.pool.append(self)

    @property
    def args(self):
        """Argument vector for the current phase's child process.

        Raises:
            BackupException: on a filter entry with neither include
                nor exclude.
            StateException: on an unknown engine or non-active state.
        """
        if self._state is Job.STARTING:
            return shlex.split(self.pre)
        elif self._state is Job.RUNNING:
            if Backup.engine == "rdiff-backup":
                args = ['rdiff-backup', '-v{}'.format(Backup.verbosity)]
                if 'filters' in self.config:
                    for item in self.config['filters']:
                        if 'include' in item:
                            args += ['--include', item['include']]
                        elif 'exclude' in item:
                            args += ['--exclude', item['exclude']]
                        else:
                            raise BackupException(
                                '{0} has an invalid filter {1}'.format(self, item))
                return args + [self.fromPath, self.toPath]
            else:
                raise StateException(
                    'Invalid backup engine {}'.format(Backup.engine))
        elif self._state is Job.ENDING:
            return shlex.split(self.post)
        else:
            raise StateException('Invalid state {}'.format(self.getState()))

    @property
    def logfile(self):
        """Lazily opened log file handle for this job."""
        if not hasattr(self, '_logfile'):
            path = os.path.dirname(self.logpath)
            if not os.path.exists(path):
                os.makedirs(path, exist_ok=True)
            self._logfile = open(self.logpath, 'w')
        return self._logfile

    @property
    def logpath(self):
        """Log path: job<index>.log beside the owning backup's log."""
        if not hasattr(self, '_logpath'):
            self._logpath = os.path.join(os.path.dirname(
                self._backup.logpath), 'job{}.log'.format(self.index))
        return self._logpath

    @property
    def fromPath(self):
        """Source path, optionally prefixed by the backup's roots.from.

        A root containing '::' looks like an rdiff-backup remote spec
        (host::path), so the join is textual rather than os.path.join —
        presumably to keep the remote prefix intact (confirm against
        rdiff-backup remote syntax).
        """
        if not hasattr(self, '_fromPath'):
            fromPath = self.config['from']
            if 'roots' in self._backup.config:
                roots = self._backup.config['roots']
                if 'from' in roots:
                    if '::' in roots['from']:
                        if roots['from'].endswith('::'):
                            fromPath = roots['from'] + fromPath
                        else:
                            fromPath = roots['from'] + os.sep + fromPath
                    else:
                        fromPath = os.path.join(roots['from'], fromPath)
            self._fromPath = fromPath
        return self._fromPath

    @property
    def toPath(self):
        """Destination path; same roots/remote handling as fromPath."""
        if not hasattr(self, '_toPath'):
            toPath = self.config['to']
            if 'roots' in self._backup.config:
                roots = self._backup.config['roots']
                if 'to' in roots:
                    if '::' in roots['to']:
                        if roots['to'].endswith('::'):
                            toPath = roots['to'] + toPath
                        else:
                            toPath = roots['to'] + os.sep + toPath
                    else:
                        toPath = os.path.join(roots['to'], toPath)
            self._toPath = toPath
        return self._toPath

    @property
    def process(self):
        # Child process of the current phase (set by start()).
        return self._process

    @property
    def index(self):
        # Position of this job within its backup's job list.
        if not hasattr(self, '_index'):
            self._index = self._backup.jobs.index(self)
        return self._index

    @property
    def pre(self):
        # Optional command line to run before the engine phase.
        if not hasattr(self, '_pre'):
            self._pre = self.config['pre'] if 'pre' in self.config else None
        return self._pre

    @property
    def post(self):
        # Optional command line to run after the engine phase.
        if not hasattr(self, '_post'):
            self._post = self.config['post'] if 'post' in self.config else None
        return self._post

    def log(self, text):
        """Write a tagged line to stdout and the owning backup's log."""
        text = '[Backup {0} Job #{1}] {2}\n'.format(
            self._backup.name, self.index, text)
        print(text, end='')
        self._backup.logfile.write(text)
        self._backup.logfile.flush()

    def start(self):
        """Advance to the next phase and spawn its child process.

        QUEUED -> STARTING (or straight to RUNNING when there is no pre
        command); STARTING -> RUNNING; RUNNING -> ENDING (or SUCCESSFUL
        when there is no post command, in which case nothing is spawned).

        Raises:
            StateException: when called in any other state.
        """
        if self._state is self.QUEUED:
            self._backup.setStatus(Backup.RUNNING)
            self.setState(Job.STARTING)
            if self.pre is None:
                # No pre command: skip straight to the engine phase.
                self.setState(Job.RUNNING)
        elif self._state is self.STARTING:
            self.setState(Job.RUNNING)
        elif self._state is self.RUNNING:
            self.setState(Job.ENDING)
            if self.post is None:
                # No post command: the job is already complete.
                self.setState(Job.SUCCESSFUL)
                return
        else:
            raise StateException('Invalid state to start {}'.format(self))
        args = self.args
        # Record the exact command line in the job log before running it.
        self.logfile.write(' '.join([shlex.quote(x) for x in args]) + '\n')
        self.logfile.flush()
        self._process = subprocess.Popen(
            args, stdout=self.logfile, stderr=subprocess.STDOUT,
            stdin=subprocess.DEVNULL, universal_newlines=True, bufsize=1)

    def setState(self, state):
        """Transition to *state*, logging the change; the job log is
        closed once a terminal state is reached."""
        if self._state != state:
            self.log('{0} -> {1}'.format(
                self.getState(self._state), self.getState(state)))
            self._state = state
            if state in (Job.SUCCESSFUL, Job.FAILED):
                self.logfile.close()

    def getState(self, state=None):
        """Human-readable name for *state* (default: the current state;
        note that reading the current state may advance the machine)."""
        return {
            Job.READY: 'ready',
            Job.QUEUED: 'queued',
            Job.STARTING: 'starting',
            Job.RUNNING: 'running',
            Job.ENDING: 'ending',
            Job.FAILED: 'failed',
            Job.SUCCESSFUL: 'successful',
        }[self.state if state is None else state]
class Backup(object):
    """A configured backup source: a set of Jobs plus dependencies on
    other backups, driven BLOCKED/READY -> QUEUED -> RUNNING and finally
    FAILED or SUCCESSFUL."""

    # All Backup instances, keyed by config file realpath.
    instances = {}
    # Backups currently queued for processing (drained by wait()).
    _queue = []
    logTransitions = False
    # Backup engine used by jobs; validated/overridden in main().
    engine = 'rdiff-backup'
    # Root directory for per-backup log files; overridden in main().
    logdir = '/var/log/backup.d'
    # BUGFIX: default engine verbosity.  Job.args reads Backup.verbosity,
    # but it was previously only assigned in main() when the config had a
    # 'verbosity' key, so every other run raised AttributeError.  Default
    # matches Job.verbosity.
    verbosity = 3

    # Status constants; FAILED and SUCCESSFUL are terminal.
    BLOCKED = 0
    READY = 1
    QUEUED = 2
    RUNNING = 3
    FAILED = 4
    SUCCESSFUL = 5

    @staticmethod
    def _log():
        """Print a status summary of every backup and its pending jobs."""
        print('Backup status:')
        for backup in Backup.instances.values():
            print('  {}'.format(backup))
            for job in backup.pending:
                print('    {}'.format(job))

    @staticmethod
    def load(paths):
        """Instantiate a Backup for every config path in *paths*."""
        for path in paths:
            Backup.get(path)

    @staticmethod
    def blocked():
        """Backups still waiting on unfinished dependencies."""
        return [x for x in Backup.instances.values()
                if x.status is Backup.BLOCKED]

    @staticmethod
    def get(path):
        """Return the Backup for *path*, creating it on first use."""
        if path not in Backup.instances:
            Backup(sources()[path])
        return Backup.instances[path]

    @staticmethod
    def start():
        """Queue every loaded backup and start filling the job pool."""
        for backup in Backup.instances.values():
            backup.queue()
        Job.initPool()

    @staticmethod
    def wait(log=False):
        """Block until the queue drains.

        Polls once per second, queueing dependencies of blocked backups
        as they become READY; optionally prints a status summary.
        """
        while Backup._queue:
            for backup in Backup._queue:
                if backup.status is Backup.BLOCKED:
                    for dependency in backup.blocking:
                        if dependency.status is Backup.READY:
                            dependency.queue()
            if log:
                Backup._log()
            time.sleep(1)

    def __init__(self, config):
        """Args:
            config (dict): parsed YAML source description; must contain
                'path' (injected by sources()).
        """
        self._config = config
        self._path = self._config['path']
        # Slug of the config path, used to name the log directory.
        self._name = slugify(self._path, max_length=255)
        self._logpath = os.path.realpath(os.path.join(
            Backup.logdir, self.name, 'backup.log'))
        self._status = Backup.READY
        if self.blocking:
            self.setStatus(Backup.BLOCKED)
        Backup.instances[self._path] = self

    def __str__(self):
        return 'Backup {0} ({1}, {2} jobs)'.format(
            self.name, self.getStatus(), len([x for x in self.jobs
                if x.state not in (Job.FAILED, Job.SUCCESSFUL)]))

    @property
    def config(self):
        # Raw backup configuration dict.
        return self._config

    @property
    def name(self):
        # Slugified identifier derived from the config path.
        return self._name

    @property
    def path(self):
        # Realpath of the YAML config file describing this backup.
        return self._path

    @property
    def logpath(self):
        # Absolute path of this backup's log file.
        return self._logpath

    @property
    def logfile(self):
        """Lazily opened log file handle for this backup."""
        if not hasattr(self, '_logfile'):
            path = os.path.dirname(self.logpath)
            if not os.path.exists(path):
                os.makedirs(path, exist_ok=True)
            self._logfile = open(self.logpath, 'w+')
        return self._logfile

    def log(self, text):
        """Write a tagged line to stdout and this backup's log file."""
        text = '[Backup {0}] {1}\n'.format(self.name, text)
        print(text, end='')
        self.logfile.write(text)
        self.logfile.flush()

    @property
    def status(self):
        """Current status.

        Reading this property advances the machine as a side effect:
        it propagates dependency failures, unblocks/queues jobs when
        dependencies finish, settles terminal states once no jobs remain
        pending, and removes finished backups from the queue.
        """
        if self.blocking:
            if [x for x in self.blocking if x.status is Backup.FAILED]:
                # A dependency failed; this backup cannot proceed.
                self.setStatus(Backup.FAILED)
            elif self._status is not Backup.BLOCKED:
                self.setStatus(Backup.BLOCKED)
        elif self._status is Backup.BLOCKED:
            if self not in Backup._queue:
                self.setStatus(Backup.READY)
            else:
                # Dependencies finished while queued: start our jobs.
                self.setStatus(Backup.QUEUED)
                for job in self.ready:
                    job.queue()
                Job.initPool()
        elif self._status is Backup.QUEUED and self not in Backup._queue:
            self.setStatus(Backup.READY)
        if self._status in (Backup.RUNNING, Backup.QUEUED) and not self.pending:
            self.setStatus(Backup.FAILED if self.failed else Backup.SUCCESSFUL)
        if self._status in (Backup.FAILED, Backup.SUCCESSFUL) \
                and self in Backup._queue:
            Backup._queue.remove(self)
        return self._status

    @property
    def blocking(self):
        """Dependencies that have not yet completed successfully."""
        return [x for x in self.depends
                if x.status is not Backup.SUCCESSFUL]

    @property
    def depends(self):
        """Backups this one depends on (created on first access)."""
        if not hasattr(self, '_depends'):
            self._depends = []
            for path in self.config["depends"]:
                if path not in Backup.instances:
                    Backup(sources()[path])
                self._depends.append(Backup.instances[path])
        return self._depends

    @property
    def jobs(self):
        """Job objects built from the config's 'jobs' list (lazy)."""
        if not hasattr(self, '_jobs'):
            self._jobs = []
            if 'jobs' in self.config:
                for job in self.config['jobs']:
                    self._jobs.append(Job(self, job))
        return self._jobs

    @property
    def pending(self):
        """Jobs not yet in a terminal state."""
        return [x for x in self.jobs if x.state not in (Job.FAILED, Job.SUCCESSFUL)]

    @property
    def ready(self):
        """Jobs still in READY state."""
        return [x for x in self.jobs if x.state is Job.READY]

    @property
    def failed(self):
        """Jobs that failed."""
        return [x for x in self.jobs if x.state is Job.FAILED]

    def setStatus(self, status):
        """Transition to *status*, logging the change; the backup log is
        closed once a terminal status is reached."""
        if self._status != status:
            self.log('{0} -> {1}'.format(
                self.getStatus(self._status), self.getStatus(status)))
            self._status = status
            if status in (Backup.SUCCESSFUL, Backup.FAILED):
                self.logfile.close()

    def getStatus(self, status=None):
        """Human-readable name for *status* (default: current status;
        note that reading the current status may advance the machine)."""
        return {
            Backup.BLOCKED: 'blocked',
            Backup.READY: 'ready',
            Backup.QUEUED: 'queued',
            Backup.RUNNING: 'running',
            Backup.FAILED: 'failed',
            Backup.SUCCESSFUL: 'successful'
        }[self.status if status is None else status]

    def queue(self):
        """Add this backup to the work queue and queue its jobs unless
        it is blocked by dependencies.

        Raises:
            StateException: if already queued.
        """
        if self in Backup._queue:
            raise StateException('Backup already queued')
        Backup._queue.append(self)
        self.setStatus(Backup.QUEUED)
        if self.status is not Backup.BLOCKED:
            for job in self.ready:
                job.queue()
            Job.initPool()
def config():
    """Load backup.yml from the configuration root, memoising the
    parsed result on the function object."""
    if not hasattr(config, '_handle'):
        with pushd(config._root):
            with open("backup.yml") as f:
                config._handle = yaml.load(f, Loader=yaml.SafeLoader)
    return config._handle
def sources():
    """Scan the configured source directories for *.yml/*.yaml backup
    descriptions and memoise the active ones.

    Returns:
        dict: realpath of each active config file -> parsed data, with
        'path' injected and 'depends' entries normalised to realpaths.
    """
    if hasattr(sources, '_handle'):
        return sources._handle
    sources._handle = {}
    with pushd(config._root):
        for source in config()['sources']:
            source = os.path.realpath(source)
            for path in glob('{}/*.yml'.format(source)) + \
                    glob('{}/*.yaml'.format(source)):
                path = os.path.realpath(path)
                # Parse relative to the file's own directory so that
                # relative 'depends' entries resolve against it.
                with pushd(os.path.dirname(path)), open(path) as f:
                    data = yaml.load(f, Loader=yaml.SafeLoader)
                    # Only sources explicitly marked active are kept.
                    if "active" in data and data["active"]:
                        data['path'] = path
                        if "depends" not in data:
                            data["depends"] = []
                        for i in range(0, len(data["depends"])):
                            # Dependencies are named without extension.
                            data["depends"][i] = os.path.realpath(
                                '{}.yml'.format(data["depends"][i]))
                        sources._handle[path] = data
    return sources._handle
def main(args):
    """CLI entry point: read configuration, load sources, run backups.

    Args:
        args (list): argv[1:]; args[0], when given, overrides the
            configuration root (default /etc/backup.d).

    Exits with status 1 on any error.
    """
    try:
        config._root = args[0] if len(args) else '/etc/backup.d'
        if not os.path.exists(config._root):
            raise BackupException(
                'Configuration files missing from {}'.format(config._root))
        if 'engine' in config():
            engine = config()["engine"]
            # BUGFIX: ("rdiff-backup") is a plain string, so the original
            # `not in` performed a substring test that accepted bogus
            # engines such as "diff"; membership must be against a
            # one-element tuple.
            if engine not in ("rdiff-backup",):
                raise BackupException('Unknown backup engine: {}'.format(engine))
            Backup.engine = engine
        if 'logdir' in config():
            logdir = config()['logdir']
            os.makedirs(logdir, exist_ok=True)
            if not os.path.exists(logdir):
                raise BackupException(
                    'Unable to create logging directory: {}'.format(logdir))
            Backup.logdir = logdir
        if 'maxthreads' in config():
            Job.maxthreads = config()['maxthreads']
        if 'verbosity' in config():
            Backup.verbosity = config()['verbosity']
        Backup.logTransitions = Job.logTransitions = True
        Backup.load(sources().keys())
        Backup.start()
        Backup.wait()
    except BackupException as ex:
        # Expected configuration errors: message only, no traceback.
        print(ex)
        sys.exit(1)
    except Exception:
        from traceback import format_exc
        msg = "Error encountered:\n" + format_exc().strip()
        print(msg)
        sys.exit(1)
if __name__ == '__main__':
    # Pass CLI arguments through (argv[1] may override the config root).
    main(sys.argv[1:])