Browse Source

Get concurrent process tasks working

Nathaniel van Diepen 4 years ago
parent
commit
8538254dff
2 changed files with 125 additions and 28 deletions
  1. 124 28
      backup.py
  2. 1 0
      etc/backup.d/backup.yml

+ 124 - 28
backup.py

@@ -1,11 +1,36 @@
 import contextlib
 import os
-import yaml
+import platform
+import subprocess
 import sys
+import time
+import yaml
 
 from glob import glob
 
 
+def term():
+    """Get the Terminal reference to make output pretty
+
+    Returns:
+        (blessings.Terminal): Returns
+        a `blessings <https://blessings.readthedocs.io/en/latest>`_ terminal
+        instance. If running in windows and not cygwin it will return an
+        `intercessions <https://pypi.org/project/intercessions>`_ terminal
+        instance instead
+    """
+    if not hasattr(term, '_handle'):
+        if sys.platform != "cygwin" and platform.system() == 'Windows':
+            from intercessions import Terminal
+
+        else:
+            from blessings import Terminal
+
+        term._handle = Terminal()
+
+    return term._handle
+
+
 @contextlib.contextmanager
 def pushd(newDir):
     previousDir = os.getcwd()
@@ -25,10 +50,8 @@ class BackupException(Exception):
     pass
 
 
-def deptree(sources, deps=None):
-    if deps is None:
-        deps = []
-
+def deptree(sources):
+    deps = []
     deferred = []
     for name in sources.keys():
         source = sources[name]
@@ -69,32 +92,55 @@ def status(name, value=None):
     status._handle[name] = value
 
 
-def backup(name):
-    source = main.sources[name]
+def backup(source):
     failed = [x for x in source["depends"] if not status(x)]
+    path = source['path']
     if failed:
-        raise BackupException(
-                "Unable to backup {0} due to ncomplete backups: {1}".format(
-                    name, failed))
-
-    # TODO - handle explicit failures with false
-    # status(name, False)
-    status(name, True)
-
+        process = BackupException(
+                "Unable to backup {0} due to incomplete backups: {1}".format(
+                    path, ', '.join(failed)))
+        active = False
+
+    else:
+        args = []
+        process = subprocess.Popen(
+                ['rdiff-backup'] + args, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+                stdin=subprocess.DEVNULL, universal_newlines=True)
+        active = True
+
+    return {
+        'active': active,
+        'name': os.path.basename(path),
+        'path': path,
+        'process': process
+    }
+
+
+def config():
+    if hasattr(config, '_handle'):
+        return config._handle
 
-def main(args):
     with pushd('etc/backup.d'):
         with open("backup.yml") as f:
-            main.config = yaml.load(f)
+            config._handle = yaml.load(f)
+
+    return config._handle
+
 
-        sources = {}
-        for source in main.config['sources']:
+def sources():
+    if hasattr(sources, '_handle'):
+        return sources._handle
+
+    sources._handle = {}
+    with pushd('etc/backup.d'):
+        for source in config()['sources']:
             source = os.path.realpath(source)
             for path in glob('{}/*.yml'.format(source)):
                 path = os.path.realpath(path)
                 with pushd(os.path.dirname(path)), open(path) as f:
                     data = yaml.load(f)
                     if "active" in data and data["active"]:
+                        data['path'] = path
                         if "depends" not in data:
                             data["depends"] = []
 
@@ -102,19 +148,69 @@ def main(args):
                             data["depends"][i] = os.path.realpath(
                                     '{}.yml'.format(data["depends"][i]))
 
-                        sources[path] = data
+                        sources._handle[path] = data
+
+    return sources._handle
+
 
-    main.sources = sources
-    main.deptree = deptree(sources)
+def logStatus(tasks):
     errors = []
-    for name in main.deptree:
-        try:
-            backup(name)
+    t = term()
+    for task in list(tasks):
+        print(task['name'], end=': ')
+        code = 1 if isinstance(
+            task['process'], BackupException) else task['process'].poll()
+
+        if code is None:
+            print(t.blue('...'))
+            continue
+
+        if code:
+            print(t.red('fail'))
+
+        else:
+            print(t.green('ok'))
+
+        if task['active']:
+            if code:
+                errors.append(task['name'])
+
+            task['active'] = False
+            status(task['path'], not code)
+
+    return errors
 
-        except BackupException as ex:
-            print(ex)
-            errors.append(ex)
 
+def main(args):
+    engine = config()["engine"]
+    if engine not in ("rdiff-backup"):
+        raise BackupException('Unknown backup engine: {}'.format(engine))
+
+    tasks = []
+    errors = []
+    maxconcurrent = config()["maxconcurrent"]
+    tree = deptree(sources())
+    t = term()
+    with t.fullscreen(), t.hidden_cursor():
+        while len(tree):
+            if len([x for x in tasks if x['active']]) < maxconcurrent:
+                path = tree.pop(0)
+                source = sources()[path]
+                if not [x for x in source["depends"] if status(x) is None]:
+                    tasks.append(backup(source))
+
+                else:
+                    tree.append(path)
+
+            print(t.clear + t.move(0, 0), end='')
+            errors += logStatus(tasks)
+
+        while len([x for x in tasks if x['active']]):
+            print(t.clear + t.move(0, 0), end='')
+            errors += logStatus(tasks)
+            time.sleep(1)
+
+    errors += logStatus(tasks)
     if errors:
         raise BackupException("At least one backup failed")
 

+ 1 - 0
etc/backup.d/backup.yml

@@ -1,4 +1,5 @@
 ---
 engine: rdiff-backup
+maxconcurrent: 4
 sources:
  - sources.d