
Rework to be class based

Nathaniel van Diepen 4 years ago
parent
commit
1db4c3528b
3 changed files with 291 additions and 120 deletions
  1. .gitignore (+1 -0)
  2. backup.py (+288 -119)
  3. etc/backup.d/backup.yml (+2 -1)

+ 1 - 0
.gitignore

@@ -58,3 +58,4 @@ docs/_build/
 # PyBuilder
 target/
 
+var/

+ 288 - 119
backup.py

@@ -7,6 +7,7 @@ import time
 import yaml
 
 from glob import glob
+from slugify import slugify
 
 
 def term():
@@ -42,7 +43,7 @@ def pushd(newDir):
         os.chdir(previousDir)
 
 
-class DependencyException(Exception):
+class StateException(Exception):
     pass
 
 
@@ -50,70 +51,277 @@ class BackupException(Exception):
     pass
 
 
-def deptree(sources):
-    deps = []
-    deferred = []
-    for name in sources.keys():
-        source = sources[name]
-        if "depends" in source and not source["depends"]:
-            deferred.append(name)
-
-        else:
-            deps.append(name)
-
-    while deferred:
-        name = deferred.pop()
-        depends = sources[name]["depends"]
-        # todo - detect dependency loop
-        if name in depends:
-            raise DependencyException('Source {} depends upon itself'.format(name))
-
-        elif set(depends).issubset(set(deps)):
-            deps.append(name)
-
-        elif not set(depends).issubset(set(deps)):
-            missing = ', '.join(set(depends).difference(set(deps)))
-            raise DependencyException(
-                    'Source {0} has missing dependencies: {1}'.format(name, missing))
-
-        else:
-            deferred.append(name)
-
-    return deps
-
-
-def status(name, value=None):
-    if not hasattr(status, "_handle"):
-        status._handle = {}
+class Job(object):
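+    # class-level queue of jobs waiting for a free execution slot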
+    pool = []
+    maxthreads = 4
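+    # job lifecycle states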
+    READY = 0
+    ACTIVE = 1
+    RUNNING = 2
+    FAILED = 3
+    SUCCESSFUL = 4
+
+    @staticmethod
+    def initPool(count=None):
+        # start up to `count` queued jobs; by default fill every slot.
+        # Callers reacting to a single finished job should pass count=1
+        # so the number of concurrent processes never exceeds maxthreads.
+        if count is None:
+            count = Job.maxthreads
+
+        for i in range(0, min(count, len(Job.pool))):
+            Job.pool.pop(0).start()
+
+    @staticmethod
+    def finished():
+        # the pool only holds jobs that are still waiting to start
+        return not Job.pool
+
+    def __init__(self, backup, config):
+        self._config = config
+        self._backup = backup
+        self._state = Job.READY
+
+    def __str__(self):
+        return 'Backup {0} Job #{1} ({2})'.format(
+                self._backup.name, self._backup.jobs.index(self), {
+                    Job.READY: 'ready',
+                    Job.ACTIVE: 'active',
+                    Job.RUNNING: 'running',
+                    Job.FAILED: 'failed',
+                    Job.SUCCESSFUL: 'successful',
+                }[self.state])
+
+    @property
+    def config(self):
+        return self._config
+
+    @property
+    def state(self):
+        if self._state is Job.RUNNING:
+            code = self._process.poll()
+            if code is not None:
+                self._state = Job.FAILED if code else Job.SUCCESSFUL
+                # a slot has freed up; start at most one replacement job
+                Job.initPool(1)
+
+        return self._state
+
+    def queue(self):
+        if self.state is not Job.READY:
+            raise StateException('{} not in state to queue'.format(self))
+
+        if self in Job.pool:
+            raise StateException('{} already in queued pool'.format(self))
+
+        self._state = Job.ACTIVE
+        Job.pool.append(self)
+
+    @property
+    def args(self):
+        # copy the shared base command so callers can append safely
+        return list(Backup.args)
+
+    @property
+    def logfile(self):
+        return self._backup.logfile
+
+    @property
+    def process(self):
+        return self._process
+
+    def start(self):
+        if self.state is not Job.ACTIVE:
+            raise StateException('Invalid state to start {}'.format(self))
+
+        self._process = subprocess.Popen(
+                self.args, stdout=self.logfile, stderr=self.logfile,
+                stdin=subprocess.DEVNULL, universal_newlines=True)
+        self._state = Job.RUNNING
+        self._backup._status = Backup.RUNNING
+
+
+class Backup(object):
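+    # all Backup instances, keyed by source path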
+    instances = {}
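+    # backups that have been queued and are not yet finished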
+    _queue = []
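+    # base command for every job; main() may swap in the configured engine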
+    args = ['rdiff-backup']
+    logdir = '/var/log/backup.d'
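+    # backup lifecycle states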
+    BLOCKED = 0
+    READY = 1
+    QUEUED = 2
+    RUNNING = 3
+    FAILED = 4
+    SUCCESSFUL = 5
+
+    @staticmethod
+    def _log():
+        print('Backup status:')
+        for backup in Backup.instances.values():
+            print(' {}'.format(backup))
+            for job in backup.pending:
+                print('  {}'.format(job))
+
+    @staticmethod
+    def load(paths):
+        for path in paths:
+            Backup.get(path)
+
+    @staticmethod
+    def blocked():
+        return [x for x in Backup.instances.values()
+                if x.status is Backup.BLOCKED]
+
+    @staticmethod
+    def get(path):
+        if path not in Backup.instances:
+            Backup(sources()[path])
+
+        return Backup.instances[path]
+
+    @staticmethod
+    def start():
+        for backup in Backup.instances.values():
+            backup.queue()
+
+        Job.initPool()
+
+    @staticmethod
+    def wait(log=False):
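+        # poll until every queued backup has finished, queueing the
+        # dependencies of blocked backups as they become ready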
+        while Backup._queue:
+            # iterate over a copy; reading status can remove finished
+            # backups from the queue
+            for backup in list(Backup._queue):
+                if backup.status is Backup.BLOCKED:
+                    for dependency in backup.blocking:
+                        if dependency.status is Backup.READY:
+                            dependency.queue()
+
+            if log:
+                Backup._log()
 
-    if value is None:
-        return status._handle[name] if name in status._handle else None
+            time.sleep(1)
 
-    status._handle[name] = value
+    def __init__(self, config):
+        self._config = config
+        self._path = self._config['path']
+        self._name = os.path.basename(self._path)
+        self._logpath = os.path.realpath(os.path.join(
+            Backup.logdir, '{}.log'.format(slugify(self._path))))
+        self._status = Backup.BLOCKED if self.blocking else Backup.READY
+        Backup.instances[self._path] = self
+
+    def __str__(self):
+        return 'Backup {0} ({1}, {2} jobs)'.format(
+                self.name, {
+                    Backup.BLOCKED: 'blocked',
+                    Backup.READY: 'ready',
+                    Backup.QUEUED: 'queued',
+                    Backup.RUNNING: 'running',
+                    Backup.FAILED: 'failed',
+                    Backup.SUCCESSFUL: 'successful'
+                }[self.status], len(self.pending))
+
+    @property
+    def config(self):
+        return self._config
+
+    @property
+    def name(self):
+        return self._name
+
+    @property
+    def path(self):
+        return self._path
+
+    @property
+    def logpath(self):
+        return self._logpath
+
+    @property
+    def logfile(self):
+        if not hasattr(self, '_logfile'):
+            self._logfile = open(self.logpath, 'w')
+
+        return self._logfile
+
+    def log(self, text):
+        # go through the property so the logfile is opened on first use
+        self.logfile.write(text)
+
+    @property
+    def status(self):
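+        # derive the effective status from the dependencies, the queue
+        # and this backup's jobs, caching the result in self._status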
+        if self.blocking:
+            if [x for x in self.blocking if x.status is Backup.FAILED]:
+                self._status = Backup.FAILED
+
+            elif self._status is not Backup.BLOCKED:
+                self._status = Backup.BLOCKED
+
+        elif self._status is Backup.BLOCKED:
+            if self not in Backup._queue:
+                self._status = Backup.READY
+
+            else:
+                self._status = Backup.QUEUED
+                for job in self.ready:
+                    job.queue()
+
+        elif self._status is Backup.QUEUED and self not in Backup._queue:
+            self._status = Backup.READY
+
+        if self._status in (Backup.RUNNING, Backup.QUEUED) and not self.pending:
+            self._status = Backup.FAILED if self.failed else Backup.SUCCESSFUL
+
+        if self._status in (Backup.FAILED, Backup.SUCCESSFUL) \
+                and self in Backup._queue:
+            Backup._queue.remove(self)
+
+        return self._status
+
+    @property
+    def blocking(self):
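+        # dependencies that have not yet completed successfully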
+        return [x for x in self.depends
+                if x.status is not Backup.SUCCESSFUL]
+
+    @property
+    def depends(self):
+        if not hasattr(self, '_depends'):
+            self._depends = []
+            for path in self.config['depends']:
+                if path not in Backup.instances:
+                    Backup(sources()[path])
+
+                self._depends.append(Backup.instances[path])
+
+        return self._depends
+
+    @property
+    def jobs(self):
+        if not hasattr(self, '_jobs'):
+            self._jobs = []
+            for job in self.config['jobs']:
+                self._jobs.append(Job(self, job))
+
+        return self._jobs
+
+    @property
+    def pending(self):
+        return [x for x in self.jobs
+                if x.state not in (Job.FAILED, Job.SUCCESSFUL)]
+
+    @property
+    def ready(self):
+        return [x for x in self.jobs if x.state is Job.READY]
+
+    @property
+    def failed(self):
+        return [x for x in self.jobs if x.state is Job.FAILED]
 
+    def setStatus(self, state):
+        # accepts the boolean-style statuses (None/False/True) stored by
+        # the old status() helper this rework replaces
+        if state in (None, False, True):
+            self._status = state
 
-def backup(source):
-    failed = [x for x in source["depends"] if not status(x)]
-    path = source['path']
-    if failed:
-        process = BackupException(
-                "Unable to backup {0} due to incomplete backups: {1}".format(
-                    path, ', '.join(failed)))
-        active = False
+    def queue(self):
+        if self in Backup._queue:
+            raise StateException('Backup already queued')
 
-    else:
-        args = []
-        process = subprocess.Popen(
-                ['rdiff-backup'] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
-                stdin=subprocess.DEVNULL, universal_newlines=True)
-        active = True
-
-    return {
-        'active': active,
-        'name': os.path.basename(path),
-        'path': path,
-        'process': process
-    }
+        Backup._queue.append(self)
+        self._status = Backup.QUEUED
+        if self.status is not Backup.BLOCKED:
+            for job in self.ready:
+                job.queue()
 
 
 def config():
@@ -153,76 +361,37 @@ def sources():
     return sources._handle
 
 
-def logStatus(tasks):
-    errors = []
-    t = term()
-    for task in list(tasks):
-        print(task['name'], end=': ')
-        code = 1 if isinstance(
-            task['process'], BackupException) else task['process'].poll()
-
-        if code is None:
-            print(t.blue('...'))
-            continue
-
-        if code:
-            print(t.red('fail'))
-
-        else:
-            print(t.green('ok'))
-
-        if task['active']:
-            if code:
-                errors.append(task['name'])
+def main(args):
+    if 'engine' in config():
+        engine = config()['engine']
+        if engine not in ('rdiff-backup',):
+            raise BackupException('Unknown backup engine: {}'.format(engine))
 
-            task['active'] = False
-            status(task['path'], not code)
+        Backup.args = [engine]
 
-    return errors
+    if 'logdir' in config():
+        logdir = config()['logdir']
+        os.makedirs(logdir, exist_ok=True)
+        if not os.path.exists(logdir):
+            raise BackupException(
+                'Unable to create logging directory: {}'.format(logdir))
 
+        Backup.logdir = logdir
 
-def main(args):
-    engine = config()["engine"]
-    if engine not in ("rdiff-backup"):
-        raise BackupException('Unknown backup engine: {}'.format(engine))
-
-    tasks = []
-    errors = []
-    maxconcurrent = config()["maxconcurrent"]
-    tree = deptree(sources())
-    t = term()
-    with t.fullscreen(), t.hidden_cursor():
-        while len(tree):
-            if len([x for x in tasks if x['active']]) < maxconcurrent:
-                path = tree.pop(0)
-                source = sources()[path]
-                if not [x for x in source["depends"] if status(x) is None]:
-                    tasks.append(backup(source))
-
-                else:
-                    tree.append(path)
-
-            print(t.clear + t.move(0, 0), end='')
-            errors += logStatus(tasks)
-
-        while len([x for x in tasks if x['active']]):
-            print(t.clear + t.move(0, 0), end='')
-            errors += logStatus(tasks)
-            time.sleep(1)
+    if 'maxthreads' in config():
+        Job.maxthreads = config()['maxthreads']
 
-    errors += logStatus(tasks)
-    if errors:
-        raise BackupException("At least one backup failed")
+    Backup.load(sources().keys())
+    Backup._log()
+    Backup.start()
+    Backup._log()
+    Backup.wait(True)
 
 
 if __name__ == '__main__':
     try:
         main(sys.argv[1:])
 
-    except DependencyException as ex:
-        print(ex)
-        sys.exit(1)
-
     except BackupException as ex:
         print(ex)
         sys.exit(1)
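
A note on the dropped dependency check: the old deptree() raised DependencyException when a source depended on itself and carried a "todo - detect dependency loop" comment, while in the class-based version a cycle in depends recurses without bound when Backup.depends instantiates its dependencies. A minimal sketch of a guard that could run before Backup.load(); detect_cycle() is a hypothetical helper, not part of this commit:

def detect_cycle(sources):
    # depth-first walk over the "depends" edges; raises on any loop,
    # including a source that depends on itself
    visiting, done = set(), set()

    def visit(path):
        if path in done:
            return

        if path in visiting:
            raise BackupException('Dependency loop involving {}'.format(path))

        visiting.add(path)
        for dep in sources[path].get('depends', []):
            visit(dep)

        visiting.remove(path)
        done.add(path)

    for path in sources:
        visit(path)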

+ 2 - 1
etc/backup.d/backup.yml

@@ -1,5 +1,6 @@
 ---
 engine: rdiff-backup
-maxconcurrent: 4
+maxthreads: 4
+logdir: var/log/backup.d
 sources:
  - sources.d
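
For reference, each entry loaded via sources() is expected to carry the keys the new classes read: path (also used as the instance key), depends (a list of other source paths) and jobs (stored on Job.config unchanged). A hypothetical sketch of one such definition (the exact shape of a job entry is not pinned down by this commit):

path: /srv/backups/example
depends:
  - /srv/backups/database
jobs:
  - {}  # job entries are opaque here; Job only stores them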