# backup.py
  1. import contextlib
  2. import os
  3. import platform
  4. import subprocess
  5. import sys
  6. import time
  7. import yaml
  8. from glob import glob
  9. from slugify import slugify
  10. def term():
  11. """Get the Terminal reference to make output pretty
  12. Returns:
  13. (blessings.Terminal): Returns
  14. a `blessings <https://blessings.readthedocs.io/en/latest>`_ terminal
  15. instance. If running in windows and not cygwin it will return an
  16. `intercessions <https://pypi.org/project/intercessions>`_ terminal
  17. instance instead
  18. """
  19. if not hasattr(term, '_handle'):
  20. if sys.platform != "cygwin" and platform.system() == 'Windows':
  21. from intercessions import Terminal
  22. else:
  23. from blessings import Terminal
  24. term._handle = Terminal()
  25. return term._handle
  26. @contextlib.contextmanager
  27. def pushd(newDir):
  28. previousDir = os.getcwd()
  29. os.chdir(newDir)
  30. try:
  31. yield
  32. finally:
  33. os.chdir(previousDir)
  34. class StateException(Exception):
  35. pass
  36. class BackupException(Exception):
  37. pass
  38. class Job(object):
  39. pool = []
  40. maxthreads = 4
  41. READY = 0
  42. ACTIVE = 1
  43. RUNNING = 2
  44. FAILED = 3
  45. SUCCESSFUL = 4
  46. @staticmethod
  47. def initPool():
  48. if Job.pool:
  49. maxthreads = Job.maxthreads
  50. if len(Job.pool) < maxthreads:
  51. maxthreads = len(Job.pool)
  52. for i in range(0, maxthreads):
  53. Job.pool.pop(0).start()
  54. @staticmethod
  55. def finished():
  56. return not [x.state for x in Job.pool]
  57. def __init__(self, backup, config):
  58. self._config = config
  59. self._backup = backup
  60. self._state = Job.READY
  61. def __str__(self):
  62. return 'Backup {0} Job #{1} ({2})'.format(
  63. self._backup.name, self._backup.jobs.index(self), {
  64. Job.READY: 'ready',
  65. Job.ACTIVE: 'active',
  66. Job.RUNNING: 'running',
  67. Job.FAILED: 'failed',
  68. Job.SUCCESSFUL: 'successful',
  69. }[self.state])
  70. @property
  71. def config(self):
  72. return self._config
  73. @property
  74. def state(self):
  75. if self._state is Job.RUNNING:
  76. code = self._process.poll()
  77. if code is not None:
  78. self._state = Job.FAILED if code else Job.SUCCESSFUL
  79. Job.initPool()
  80. return self._state
  81. def queue(self):
  82. if self.state is not Job.READY:
  83. raise StateException('{} not in state to queue'.format(self))
  84. if self in Job.pool:
  85. raise StateException('{} already in queued pool'.format(self))
  86. self._state = Job.ACTIVE
  87. Job.pool.append(self)
  88. @property
  89. def args(self):
  90. if Backup.engine == "rdiff-backup":
  91. args = ['rdiff-backup']
  92. for item in self.config['filters']:
  93. if 'include' in item:
  94. args += ['--include', item['include']]
  95. elif 'exclude' in item:
  96. args += ['--exclude', item['exclude']]
  97. else:
  98. raise BackupException(
  99. '{0} has an invalid filter {1}'.format(self, item))
  100. return args + [self.fromPath, self.toPath]
  101. raise StateException('Invalid backup engine {}'.format(Backup.engine))
  102. @property
  103. def logfile(self):
  104. return self._backup.logfile
  105. @property
  106. def fromPath(self):
  107. if not hasattr(self, '_fromPath'):
  108. fromPath = self.config['from']
  109. if 'roots' in self._backup.config:
  110. roots = self._backup.config['roots']
  111. if 'from' in roots:
  112. fromPath = os.path.join(roots['from'], fromPath)
  113. self._fromPath = fromPath
  114. return self._fromPath
  115. @property
  116. def toPath(self):
  117. if not hasattr(self, '_toPath'):
  118. toPath = self.config['to']
  119. if 'roots' in self._backup.config:
  120. roots = self._backup.config['roots']
  121. if 'to' in roots:
  122. toPath = os.path.join(roots['to'], toPath)
  123. self._toPath = toPath
  124. return self._toPath
  125. @property
  126. def process(self):
  127. return self._process
  128. def start(self):
  129. if self.state is not Job.ACTIVE:
  130. raise StateException('Invalid state to start {}'.format(self))
  131. self._process = subprocess.Popen(
  132. self.args, stdout=self.logfile, stderr=self.logfile,
  133. stdin=subprocess.DEVNULL, universal_newlines=True)
  134. self._state = Job.RUNNING
  135. self._backup._state = Backup.RUNNING
  136. class Backup(object):
  137. instances = {}
  138. _queue = []
  139. engine = 'rdiff-backup'
  140. logdir = '/var/log/backup.d'
  141. BLOCKED = 0
  142. READY = 1
  143. QUEUED = 2
  144. RUNNING = 3
  145. FAILED = 4
  146. SUCCESSFUL = 5
  147. @staticmethod
  148. def _log():
  149. print('Backup status:')
  150. for backup in Backup.instances.values():
  151. print(' {}'.format(backup))
  152. for job in backup.pending:
  153. print(' {}'.format(job))
  154. @staticmethod
  155. def load(paths):
  156. for path in paths:
  157. Backup.get(path)
  158. @staticmethod
  159. def blocked():
  160. return [x for x in Backup.instances.values()
  161. if x.status is Backup.BLOCKED]
  162. @staticmethod
  163. def get(path):
  164. if path not in Backup.instances:
  165. Backup(sources()[path])
  166. return Backup.instances[path]
  167. @staticmethod
  168. def start():
  169. for backup in Backup.instances.values():
  170. backup.queue()
  171. Job.initPool()
  172. @staticmethod
  173. def wait(log=False):
  174. while Backup._queue:
  175. for backup in Backup._queue:
  176. if backup.status is Backup.BLOCKED:
  177. for dependency in backup.blocking:
  178. if dependency.status is Backup.READY:
  179. dependency.queue()
  180. if log:
  181. Backup._log()
  182. time.sleep(1)
  183. def __init__(self, config):
  184. self._config = config
  185. self._path = self._config['path']
  186. self._name = os.path.basename(self._path)
  187. self._logpath = os.path.realpath(os.path.join(
  188. Backup.logdir, '{}.log'.format(slugify(self._path))))
  189. self._status = Backup.BLOCKED if self.blocking else Backup.READY
  190. Backup.instances[self._path] = self
  191. def __str__(self):
  192. return 'Backup {0} ({1}, {2} jobs)'.format(
  193. self.name, {
  194. Backup.BLOCKED: 'blocked',
  195. Backup.READY: 'ready',
  196. Backup.QUEUED: 'queued',
  197. Backup.RUNNING: 'running',
  198. Backup.FAILED: 'failed',
  199. Backup.SUCCESSFUL: 'successful'
  200. }[self.status], len([x for x in self.jobs
  201. if x.state not in (Job.FAILED, Job.SUCCESSFUL)]))
  202. @property
  203. def config(self):
  204. return self._config
  205. @property
  206. def name(self):
  207. return self._name
  208. @property
  209. def path(self):
  210. return self._path
  211. @property
  212. def logpath(self):
  213. return self._logpath
  214. @property
  215. def logfile(self):
  216. if not hasattr(self, '_logfile'):
  217. self._logfile = open(self.logpath, 'w')
  218. return self._logfile
  219. def log(self, text):
  220. self._logfile.write(text)
  221. @property
  222. def status(self):
  223. if self.blocking:
  224. if [x for x in self.blocking if x.status is Backup.FAILED]:
  225. self._status = Backup.FAILED
  226. elif self._status is not Backup.BLOCKED:
  227. self._status = Backup.BLOCKED
  228. elif self._status is Backup.BLOCKED:
  229. if self not in Backup._queue:
  230. self._status = Backup.READY
  231. else:
  232. self._status = Backup.QUEUED
  233. for job in self.ready:
  234. job.queue()
  235. elif self._status is Backup.QUEUED and self not in Backup._queue:
  236. self._status = Backup.READY
  237. if self._status in (Backup.RUNNING, Backup.QUEUED) and not self.pending:
  238. self._status = Backup.FAILED if self.failed else Backup.SUCCESSFUL
  239. if self._status in (Backup.FAILED, Backup.SUCCESSFUL) \
  240. and self in Backup._queue:
  241. Backup._queue.remove(self)
  242. return self._status
  243. @property
  244. def blocking(self):
  245. return [x for x in self.depends
  246. if x.status is not Backup.SUCCESSFUL]
  247. @property
  248. def depends(self):
  249. if not hasattr(self, '_depends'):
  250. self._depends = []
  251. for path in self.config["depends"]:
  252. if path not in Backup.instances:
  253. Backup(sources()[path])
  254. self._depends.append(Backup.instances[path])
  255. return self._depends
  256. @property
  257. def jobs(self):
  258. if not hasattr(self, '_jobs'):
  259. self._jobs = []
  260. for job in self.config['jobs']:
  261. self._jobs.append(Job(self, job))
  262. return self._jobs
  263. @property
  264. def pending(self):
  265. return [x for x in self.jobs if x.state not in (Job.FAILED, Job.SUCCESSFUL)]
  266. @property
  267. def ready(self):
  268. return [x for x in self.jobs if x.state is Job.READY]
  269. @property
  270. def failed(self):
  271. return [x for x in self.jobs if x.state is Job.FAILED]
  272. def setStatus(self, state):
  273. if state in (None, False, True):
  274. self._status = state
  275. def queue(self):
  276. if self in Backup._queue:
  277. raise StateException('Backup already queued')
  278. Backup._queue.append(self)
  279. self._status = Backup.QUEUED
  280. if self.status is not Backup.BLOCKED:
  281. for job in self.ready:
  282. job.queue()
  283. def config():
  284. if hasattr(config, '_handle'):
  285. return config._handle
  286. with pushd('etc/backup.d'):
  287. with open("backup.yml") as f:
  288. config._handle = yaml.load(f, Loader=yaml.SafeLoader)
  289. return config._handle
  290. def sources():
  291. if hasattr(sources, '_handle'):
  292. return sources._handle
  293. sources._handle = {}
  294. with pushd('etc/backup.d'):
  295. for source in config()['sources']:
  296. source = os.path.realpath(source)
  297. for path in glob('{}/*.yml'.format(source)):
  298. path = os.path.realpath(path)
  299. with pushd(os.path.dirname(path)), open(path) as f:
  300. data = yaml.load(f, Loader=yaml.SafeLoader)
  301. if "active" in data and data["active"]:
  302. data['path'] = path
  303. if "depends" not in data:
  304. data["depends"] = []
  305. for i in range(0, len(data["depends"])):
  306. data["depends"][i] = os.path.realpath(
  307. '{}.yml'.format(data["depends"][i]))
  308. sources._handle[path] = data
  309. return sources._handle
  310. def main(args):
  311. if 'engine' in config():
  312. engine = config()["engine"]
  313. if engine not in ("rdiff-backup"):
  314. raise BackupException('Unknown backup engine: {}'.format(engine))
  315. Backup.engine = engine
  316. if 'logdir' in config():
  317. logdir = config()['logdir']
  318. os.makedirs(logdir, exist_ok=True)
  319. if not os.path.exists(logdir):
  320. raise BackupException(
  321. 'Unable to create logging directory: {}'.format(logdir))
  322. Backup.logdir = logdir
  323. if 'maxthreads' in config():
  324. Job.maxthreads = config()['maxthreads']
  325. Backup.load(sources().keys())
  326. Backup._log()
  327. Backup.start()
  328. Backup._log()
  329. Backup.wait(True)
  330. if __name__ == '__main__':
  331. try:
  332. main(sys.argv[1:])
  333. except BackupException as ex:
  334. print(ex)
  335. sys.exit(1)
  336. except Exception:
  337. from traceback import format_exc
  338. msg = "Error encountered:\n" + format_exc().strip()
  339. print(msg)
  340. sys.exit(1)