# backup.py

import contextlib
import os
import platform
import shlex
import subprocess
import sys
import time

import yaml
from glob import glob
from slugify import slugify


def term():
    """Get the Terminal reference to make output pretty

    Returns:
        (blessings.Terminal): Returns a
            `blessings <https://blessings.readthedocs.io/en/latest>`_ terminal
            instance. If running on Windows (and not under Cygwin), an
            `intercessions <https://pypi.org/project/intercessions>`_ terminal
            instance is returned instead.
    """
    if not hasattr(term, '_handle'):
        if sys.platform != "cygwin" and platform.system() == 'Windows':
            from intercessions import Terminal
        else:
            from blessings import Terminal
        term._handle = Terminal()
    return term._handle
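
# term() is not referenced elsewhere in this module; callers would use it for
# styled output, e.g. print(term().bold('Backup complete')) (example only).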


@contextlib.contextmanager
def pushd(newDir):
    previousDir = os.getcwd()
    os.chdir(newDir)
    try:
        yield
    finally:
        os.chdir(previousDir)


class StateException(Exception):
    pass


class BackupException(Exception):
    pass


class Job(object):
    pool = []
    maxthreads = 4
    verbosity = 3
    logTransitions = False
    READY = 0
    QUEUED = 1
    RUNNING = 2
    FAILED = 3
    SUCCESSFUL = 4

    @staticmethod
    def initPool():
        if Job.pool:
            maxthreads = Job.maxthreads
            if len(Job.pool) < maxthreads:
                maxthreads = len(Job.pool)
            for i in range(0, maxthreads):
                Job.pool.pop(0).start()

    @staticmethod
    def finished():
        return not [x.state for x in Job.pool]

    def __init__(self, backup, config):
        self._config = config
        self._backup = backup
        self._state = Job.READY

    def __str__(self):
        return 'Backup {0} Job #{1} ({2})'.format(
            self._backup.name, self.index, self.getState())

    @property
    def config(self):
        return self._config

    @property
    def state(self):
        if self._state is Job.READY and self in Job.pool:
            self.setState(Job.QUEUED)
        elif self._state is Job.RUNNING:
            code = self._process.poll()
            if code is not None:
                self.setState(Job.FAILED if code else Job.SUCCESSFUL)
                Job.initPool()
        return self._state

    def queue(self):
        if self.state is not Job.READY:
            raise StateException('{} not in state to queue'.format(self))
        if self in Job.pool:
            raise StateException('{} already in queued pool'.format(self))
        self.setState(Job.QUEUED)
        Job.pool.append(self)

    @property
    def args(self):
        if not hasattr(self, '_args'):
            if Backup.engine == "rdiff-backup":
                args = ['rdiff-backup', '-v{}'.format(Backup.verbosity)]
                if 'filters' in self.config:
                    for item in self.config['filters']:
                        if 'include' in item:
                            args += ['--include', item['include']]
                        elif 'exclude' in item:
                            args += ['--exclude', item['exclude']]
                        else:
                            raise BackupException(
                                '{0} has an invalid filter {1}'.format(
                                    self, item))
                self._args = args + [self.fromPath, self.toPath]
            else:
                raise StateException(
                    'Invalid backup engine {}'.format(Backup.engine))
        return self._args
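
    # Illustrative sketch of the command this property assembles for the
    # rdiff-backup engine; the paths and filter values below are hypothetical:
    #   rdiff-backup -v3 --include /home/user/docs --exclude /home/user/tmp \
    #       /home/user /mnt/backups/home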

    @property
    def logfile(self):
        if not hasattr(self, '_logfile'):
            path = os.path.dirname(self.logpath)
            if not os.path.exists(path):
                os.makedirs(path, exist_ok=True)
            self._logfile = open(self.logpath, 'w')
        return self._logfile

    @property
    def logpath(self):
        if not hasattr(self, '_logpath'):
            self._logpath = os.path.join(os.path.dirname(
                self._backup.logpath), 'job{}.log'.format(self.index))
        return self._logpath

    @property
    def fromPath(self):
        if not hasattr(self, '_fromPath'):
            fromPath = self.config['from']
            if 'roots' in self._backup.config:
                roots = self._backup.config['roots']
                if 'from' in roots:
                    if '::' in roots['from']:
                        if roots['from'].endswith('::'):
                            fromPath = roots['from'] + fromPath
                        else:
                            fromPath = roots['from'] + os.sep + fromPath
                    else:
                        fromPath = os.path.join(roots['from'], fromPath)
            self._fromPath = fromPath
        return self._fromPath

    @property
    def toPath(self):
        if not hasattr(self, '_toPath'):
            toPath = self.config['to']
            if 'roots' in self._backup.config:
                roots = self._backup.config['roots']
                if 'to' in roots:
                    if '::' in roots['to']:
                        if roots['to'].endswith('::'):
                            toPath = roots['to'] + toPath
                        else:
                            toPath = roots['to'] + os.sep + toPath
                    else:
                        toPath = os.path.join(roots['to'], toPath)
            self._toPath = toPath
        return self._toPath

    @property
    def process(self):
        return self._process

    @property
    def index(self):
        if not hasattr(self, '_index'):
            self._index = self._backup.jobs.index(self)
        return self._index

    def log(self, text):
        text = '[Backup {0} Job #{1}] {2}\n'.format(
            self._backup.name, self.index, text)
        print(text, end='')
        self._backup.logfile.write(text)
        self._backup.logfile.flush()

    def start(self):
        if self.state is not Job.QUEUED:
            raise StateException('Invalid state to start {}'.format(self))
        self._backup.setStatus(Backup.RUNNING)
        self.setState(Job.RUNNING)
        self.logfile.write(
            ' '.join([shlex.quote(x) for x in self.args]) + '\n')
        self.logfile.flush()
        self._process = subprocess.Popen(
            self.args, stdout=self.logfile, stderr=subprocess.STDOUT,
            stdin=subprocess.DEVNULL, universal_newlines=True, bufsize=1)

    def setState(self, state):
        if self._state != state:
            self.log('{0} -> {1}'.format(
                self.getState(self._state), self.getState(state)))
        self._state = state

    def getState(self, state=None):
        return {
            Job.READY: 'ready',
            Job.QUEUED: 'queued',
            Job.RUNNING: 'running',
            Job.FAILED: 'failed',
            Job.SUCCESSFUL: 'successful',
        }[self.state if state is None else state]


class Backup(object):
    instances = {}
    _queue = []
    logTransitions = False
    engine = 'rdiff-backup'
    logdir = '/var/log/backup.d'
    # Default rdiff-backup verbosity, referenced by Job.args; main() overrides
    # it when backup.yml provides a 'verbosity' key.
    verbosity = 3
    BLOCKED = 0
    READY = 1
    QUEUED = 2
    RUNNING = 3
    FAILED = 4
    SUCCESSFUL = 5

    @staticmethod
    def _log():
        print('Backup status:')
        for backup in Backup.instances.values():
            print(' {}'.format(backup))
            for job in backup.pending:
                print(' {}'.format(job))

    @staticmethod
    def load(paths):
        for path in paths:
            Backup.get(path)

    @staticmethod
    def blocked():
        return [x for x in Backup.instances.values()
                if x.status is Backup.BLOCKED]

    @staticmethod
    def get(path):
        if path not in Backup.instances:
            Backup(sources()[path])
        return Backup.instances[path]

    @staticmethod
    def start():
        for backup in Backup.instances.values():
            backup.queue()
        Job.initPool()

    @staticmethod
    def wait(log=False):
        while Backup._queue:
            for backup in Backup._queue:
                if backup.status is Backup.BLOCKED:
                    for dependency in backup.blocking:
                        if dependency.status is Backup.READY:
                            dependency.queue()
            if log:
                Backup._log()
            time.sleep(1)

    def __init__(self, config):
        self._config = config
        self._path = self._config['path']
        self._name = os.path.basename(self._path)
        self._logpath = os.path.realpath(os.path.join(
            Backup.logdir, slugify(self._path), 'backup.log'))
        self._status = Backup.READY
        if self.blocking:
            self.setStatus(Backup.BLOCKED)
        Backup.instances[self._path] = self

    def __str__(self):
        return 'Backup {0} ({1}, {2} jobs)'.format(
            self.name, self.getStatus(),
            len([x for x in self.jobs
                 if x.state not in (Job.FAILED, Job.SUCCESSFUL)]))

    @property
    def config(self):
        return self._config

    @property
    def name(self):
        return self._name

    @property
    def path(self):
        return self._path

    @property
    def logpath(self):
        return self._logpath

    @property
    def logfile(self):
        if not hasattr(self, '_logfile'):
            path = os.path.dirname(self.logpath)
            if not os.path.exists(path):
                os.makedirs(path, exist_ok=True)
            self._logfile = open(self.logpath, 'w+')
        return self._logfile

    def log(self, text):
        text = '[Backup {0}] {1}\n'.format(self.name, text)
        print(text, end='')
        self.logfile.write(text)
        self.logfile.flush()

    @property
    def status(self):
        if self.blocking:
            if [x for x in self.blocking if x.status is Backup.FAILED]:
                self.setStatus(Backup.FAILED)
            elif self._status is not Backup.BLOCKED:
                self.setStatus(Backup.BLOCKED)
        elif self._status is Backup.BLOCKED:
            if self not in Backup._queue:
                self.setStatus(Backup.READY)
            else:
                self.setStatus(Backup.QUEUED)
                for job in self.ready:
                    job.queue()
        elif self._status is Backup.QUEUED and self not in Backup._queue:
            self.setStatus(Backup.READY)
        if self._status in (Backup.RUNNING, Backup.QUEUED) \
                and not self.pending:
            self.setStatus(Backup.FAILED if self.failed else Backup.SUCCESSFUL)
        if self._status in (Backup.FAILED, Backup.SUCCESSFUL) \
                and self in Backup._queue:
            Backup._queue.remove(self)
        return self._status

    @property
    def blocking(self):
        return [x for x in self.depends
                if x.status is not Backup.SUCCESSFUL]

    @property
    def depends(self):
        if not hasattr(self, '_depends'):
            self._depends = []
            for path in self.config["depends"]:
                if path not in Backup.instances:
                    Backup(sources()[path])
                self._depends.append(Backup.instances[path])
        return self._depends

    @property
    def jobs(self):
        if not hasattr(self, '_jobs'):
            self._jobs = []
            for job in self.config['jobs']:
                self._jobs.append(Job(self, job))
        return self._jobs

    @property
    def pending(self):
        return [x for x in self.jobs
                if x.state not in (Job.FAILED, Job.SUCCESSFUL)]

    @property
    def ready(self):
        return [x for x in self.jobs if x.state is Job.READY]

    @property
    def failed(self):
        return [x for x in self.jobs if x.state is Job.FAILED]

    def setStatus(self, status):
        if self._status != status:
            self.log('{0} -> {1}'.format(
                self.getStatus(self._status), self.getStatus(status)))
        self._status = status

    def getStatus(self, status=None):
        return {
            Backup.BLOCKED: 'blocked',
            Backup.READY: 'ready',
            Backup.QUEUED: 'queued',
            Backup.RUNNING: 'running',
            Backup.FAILED: 'failed',
            Backup.SUCCESSFUL: 'successful'
        }[self.status if status is None else status]

    def queue(self):
        if self in Backup._queue:
            raise StateException('Backup already queued')
        Backup._queue.append(self)
        self.setStatus(Backup.QUEUED)
        if self.status is not Backup.BLOCKED:
            for job in self.ready:
                job.queue()


def config():
    if hasattr(config, '_handle'):
        return config._handle
    with pushd('etc/backup.d'):
        with open("backup.yml") as f:
            config._handle = yaml.load(f, Loader=yaml.SafeLoader)
    return config._handle


def sources():
    if hasattr(sources, '_handle'):
        return sources._handle
    sources._handle = {}
    with pushd('etc/backup.d'):
        for source in config()['sources']:
            source = os.path.realpath(source)
            for path in glob('{}/*.yml'.format(source)) + \
                    glob('{}/*.yaml'.format(source)):
                path = os.path.realpath(path)
                with pushd(os.path.dirname(path)), open(path) as f:
                    data = yaml.load(f, Loader=yaml.SafeLoader)
                    if "active" in data and data["active"]:
                        data['path'] = path
                        if "depends" not in data:
                            data["depends"] = []
                        for i in range(0, len(data["depends"])):
                            data["depends"][i] = os.path.realpath(
                                '{}.yml'.format(data["depends"][i]))
                        sources._handle[path] = data
    return sources._handle
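

# Illustrative sketch of the configuration layout that config() and sources()
# expect. The key names match what the code above reads; every path and value
# shown here is hypothetical.
#
#   etc/backup.d/backup.yml:
#       engine: rdiff-backup          # optional; only rdiff-backup is handled
#       logdir: /var/log/backup.d     # optional
#       maxthreads: 4                 # optional
#       verbosity: 3                  # optional
#       sources:
#           - sources-enabled         # directories scanned for *.yml / *.yaml
#
#   etc/backup.d/sources-enabled/home.yml:
#       active: true
#       depends:
#           - database                # resolved to database.yml in this dir
#       roots:
#           from: /home
#           to: backups@example.org::/srv/backups
#       jobs:
#           - from: user
#             to: user
#             filters:
#                 - exclude: user/.cache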


def main(args):
    if 'engine' in config():
        engine = config()["engine"]
        if engine not in ("rdiff-backup",):
            raise BackupException('Unknown backup engine: {}'.format(engine))
        Backup.engine = engine
    if 'logdir' in config():
        logdir = config()['logdir']
        os.makedirs(logdir, exist_ok=True)
        if not os.path.exists(logdir):
            raise BackupException(
                'Unable to create logging directory: {}'.format(logdir))
        Backup.logdir = logdir
    if 'maxthreads' in config():
        Job.maxthreads = config()['maxthreads']
    if 'verbosity' in config():
        Backup.verbosity = config()['verbosity']
    Backup.logTransitions = Job.logTransitions = True
    Backup.load(sources().keys())
    Backup.start()
    Backup.wait()


if __name__ == '__main__':
    try:
        main(sys.argv[1:])
    except BackupException as ex:
        print(ex)
        sys.exit(1)
    except Exception:
        from traceback import format_exc
        msg = "Error encountered:\n" + format_exc().strip()
        print(msg)
        sys.exit(1)