backup.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486
  1. import contextlib
  2. import os
  3. import platform
  4. import subprocess
  5. import sys
  6. import time
  7. import yaml
  8. from glob import glob
  9. from slugify import slugify
  10. def term():
  11. """Get the Terminal reference to make output pretty
  12. Returns:
  13. (blessings.Terminal): Returns
  14. a `blessings <https://blessings.readthedocs.io/en/latest>`_ terminal
  15. instance. If running in windows and not cygwin it will return an
  16. `intercessions <https://pypi.org/project/intercessions>`_ terminal
  17. instance instead
  18. """
  19. if not hasattr(term, '_handle'):
  20. if sys.platform != "cygwin" and platform.system() == 'Windows':
  21. from intercessions import Terminal
  22. else:
  23. from blessings import Terminal
  24. term._handle = Terminal()
  25. return term._handle
  26. @contextlib.contextmanager
  27. def pushd(newDir):
  28. previousDir = os.getcwd()
  29. os.chdir(newDir)
  30. try:
  31. yield
  32. finally:
  33. os.chdir(previousDir)
  34. class StateException(Exception):
  35. pass
  36. class BackupException(Exception):
  37. pass
  38. class Job(object):
  39. pool = []
  40. maxthreads = 4
  41. verbosity = 4
  42. logTransitions = False
  43. READY = 0
  44. QUEUED = 1
  45. RUNNING = 2
  46. FAILED = 3
  47. SUCCESSFUL = 4
  48. @staticmethod
  49. def initPool():
  50. if Job.pool:
  51. maxthreads = Job.maxthreads
  52. if len(Job.pool) < maxthreads:
  53. maxthreads = len(Job.pool)
  54. for i in range(0, maxthreads):
  55. Job.pool.pop(0).start()
  56. @staticmethod
  57. def finished():
  58. return not [x.state for x in Job.pool]
  59. def __init__(self, backup, config):
  60. self._config = config
  61. self._backup = backup
  62. self._state = Job.READY
  63. def __str__(self):
  64. return 'Backup {0} Job #{1} ({2})'.format(
  65. self._backup.name, self.index, self.getState())
  66. @property
  67. def config(self):
  68. return self._config
  69. @property
  70. def state(self):
  71. if self._state is Job.READY and self in Job.pool:
  72. self.setState(Job.QUEUED)
  73. elif self._state is Job.RUNNING:
  74. code = self._process.poll()
  75. if code is not None:
  76. self.setState(Job.FAILED if code else Job.SUCCESSFUL)
  77. Job.initPool()
  78. return self._state
  79. def queue(self):
  80. if self.state is not Job.READY:
  81. raise StateException('{} not in state to queue'.format(self))
  82. if self in Job.pool:
  83. raise StateException('{} already in queued pool'.format(self))
  84. self.setState(Job.QUEUED)
  85. Job.pool.append(self)
  86. @property
  87. def args(self):
  88. if Backup.engine == "rdiff-backup":
  89. args = ['rdiff-backup', '-v{}'.format(Backup.verbosity)]
  90. if 'filters' in self.config:
  91. for item in self.config['filters']:
  92. if 'include' in item:
  93. args += ['--include', item['include']]
  94. elif 'exclude' in item:
  95. args += ['--exclude', item['exclude']]
  96. else:
  97. raise BackupException(
  98. '{0} has an invalid filter {1}'.format(self, item))
  99. return args + [self.fromPath, self.toPath]
  100. raise StateException('Invalid backup engine {}'.format(Backup.engine))
  101. @property
  102. def logfile(self):
  103. return self._backup.logfile
  104. @property
  105. def fromPath(self):
  106. if not hasattr(self, '_fromPath'):
  107. fromPath = self.config['from']
  108. if 'roots' in self._backup.config:
  109. roots = self._backup.config['roots']
  110. if 'from' in roots:
  111. fromPath = os.path.join(roots['from'], fromPath)
  112. self._fromPath = fromPath
  113. return self._fromPath
  114. @property
  115. def toPath(self):
  116. if not hasattr(self, '_toPath'):
  117. toPath = self.config['to']
  118. if 'roots' in self._backup.config:
  119. roots = self._backup.config['roots']
  120. if 'to' in roots:
  121. toPath = os.path.join(roots['to'], toPath)
  122. self._toPath = toPath
  123. return self._toPath
  124. @property
  125. def process(self):
  126. return self._process
  127. @property
  128. def index(self):
  129. if not hasattr(self, '_index'):
  130. self._index = self._backup.jobs.index(self)
  131. return self._index
  132. def log(self, text):
  133. text = '[Backup {0} Job #{1}] {2}\n'.format(
  134. self._backup.name, self.index, text)
  135. print(text, end='')
  136. self.logfile.write(text)
  137. self.logfile.flush()
  138. def start(self):
  139. if self.state is not Job.QUEUED:
  140. raise StateException('Invalid state to start {}'.format(self))
  141. self._backup.setStatus(Backup.RUNNING)
  142. self.setState(Job.RUNNING)
  143. self._process = subprocess.Popen(
  144. self.args, stdout=self.logfile, stderr=subprocess.STDOUT,
  145. stdin=subprocess.DEVNULL, universal_newlines=True, bufsize=1)
  146. def setState(self, state):
  147. self._state = state
  148. self.log('{0} -> {1}'.format(
  149. self.getState(self._state), self.getState(state)))
  150. def getState(self, state=None):
  151. return {
  152. Job.READY: 'ready',
  153. Job.QUEUED: 'queued',
  154. Job.RUNNING: 'running',
  155. Job.FAILED: 'failed',
  156. Job.SUCCESSFUL: 'successful',
  157. }[self.state if state is None else state]
  158. class Backup(object):
  159. instances = {}
  160. _queue = []
  161. logTransitions = False
  162. engine = 'rdiff-backup'
  163. logdir = '/var/log/backup.d'
  164. BLOCKED = 0
  165. READY = 1
  166. QUEUED = 2
  167. RUNNING = 3
  168. FAILED = 4
  169. SUCCESSFUL = 5
  170. @staticmethod
  171. def _log():
  172. print('Backup status:')
  173. for backup in Backup.instances.values():
  174. print(' {}'.format(backup))
  175. for job in backup.pending:
  176. print(' {}'.format(job))
  177. @staticmethod
  178. def load(paths):
  179. for path in paths:
  180. Backup.get(path)
  181. @staticmethod
  182. def blocked():
  183. return [x for x in Backup.instances.values()
  184. if x.status is Backup.BLOCKED]
  185. @staticmethod
  186. def get(path):
  187. if path not in Backup.instances:
  188. Backup(sources()[path])
  189. return Backup.instances[path]
  190. @staticmethod
  191. def start():
  192. for backup in Backup.instances.values():
  193. backup.queue()
  194. Job.initPool()
  195. @staticmethod
  196. def wait(log=False):
  197. while Backup._queue:
  198. for backup in Backup._queue:
  199. if backup.status is Backup.BLOCKED:
  200. for dependency in backup.blocking:
  201. if dependency.status is Backup.READY:
  202. dependency.queue()
  203. if log:
  204. Backup._log()
  205. time.sleep(1)
  206. def __init__(self, config):
  207. self._config = config
  208. self._path = self._config['path']
  209. self._name = os.path.basename(self._path)
  210. self._logpath = os.path.realpath(os.path.join(
  211. Backup.logdir, '{}.log'.format(slugify(self._path))))
  212. self._status = Backup.READY
  213. if self.blocking:
  214. self.setStatus(Backup.BLOCKED)
  215. Backup.instances[self._path] = self
  216. def __str__(self):
  217. return 'Backup {0} ({1}, {2} jobs)'.format(
  218. self.name, self.getStatus(), len([x for x in self.jobs
  219. if x.state not in (Job.FAILED, Job.SUCCESSFUL)]))
  220. @property
  221. def config(self):
  222. return self._config
  223. @property
  224. def name(self):
  225. return self._name
  226. @property
  227. def path(self):
  228. return self._path
  229. @property
  230. def logpath(self):
  231. return self._logpath
  232. @property
  233. def logfile(self):
  234. if not hasattr(self, '_logfile'):
  235. self._logfile = open(self.logpath, 'w+')
  236. return self._logfile
  237. def log(self, text):
  238. text = '[Backup {0}] {1}\n'.format(self.name, text)
  239. print(text, end='')
  240. self.logfile.write(text)
  241. self.logfile.flush()
  242. @property
  243. def status(self):
  244. if self.blocking:
  245. if [x for x in self.blocking if x.status is Backup.FAILED]:
  246. self.setStatus(Backup.FAILED)
  247. elif self._status is not Backup.BLOCKED:
  248. self.setStatus(Backup.BLOCKED)
  249. elif self._status is Backup.BLOCKED:
  250. if self not in Backup._queue:
  251. self.setStatus(Backup.READY)
  252. else:
  253. self.setStatus(Backup.QUEUED)
  254. for job in self.ready:
  255. job.queue()
  256. elif self._status is Backup.QUEUED and self not in Backup._queue:
  257. self.setStatus(Backup.READY)
  258. if self._status in (Backup.RUNNING, Backup.QUEUED) and not self.pending:
  259. self.setStatus(Backup.FAILED if self.failed else Backup.SUCCESSFUL)
  260. if self._status in (Backup.FAILED, Backup.SUCCESSFUL) \
  261. and self in Backup._queue:
  262. Backup._queue.remove(self)
  263. return self._status
  264. @property
  265. def blocking(self):
  266. return [x for x in self.depends
  267. if x.status is not Backup.SUCCESSFUL]
  268. @property
  269. def depends(self):
  270. if not hasattr(self, '_depends'):
  271. self._depends = []
  272. for path in self.config["depends"]:
  273. if path not in Backup.instances:
  274. Backup(sources()[path])
  275. self._depends.append(Backup.instances[path])
  276. return self._depends
  277. @property
  278. def jobs(self):
  279. if not hasattr(self, '_jobs'):
  280. self._jobs = []
  281. for job in self.config['jobs']:
  282. self._jobs.append(Job(self, job))
  283. return self._jobs
  284. @property
  285. def pending(self):
  286. return [x for x in self.jobs if x.state not in (Job.FAILED, Job.SUCCESSFUL)]
  287. @property
  288. def ready(self):
  289. return [x for x in self.jobs if x.state is Job.READY]
  290. @property
  291. def failed(self):
  292. return [x for x in self.jobs if x.state is Job.FAILED]
  293. def setStatus(self, status):
  294. self.log('{0} -> {1}'.format(
  295. self.getStatus(self._status), self.getStatus(status)))
  296. self._status = status
  297. def getStatus(self, status=None):
  298. return {
  299. Backup.BLOCKED: 'blocked',
  300. Backup.READY: 'ready',
  301. Backup.QUEUED: 'queued',
  302. Backup.RUNNING: 'running',
  303. Backup.FAILED: 'failed',
  304. Backup.SUCCESSFUL: 'successful'
  305. }[self.status if status is None else status]
  306. def queue(self):
  307. if self in Backup._queue:
  308. raise StateException('Backup already queued')
  309. Backup._queue.append(self)
  310. self.setStatus(Backup.QUEUED)
  311. if self.status is not Backup.BLOCKED:
  312. for job in self.ready:
  313. job.queue()
  314. def config():
  315. if hasattr(config, '_handle'):
  316. return config._handle
  317. with pushd('etc/backup.d'):
  318. with open("backup.yml") as f:
  319. config._handle = yaml.load(f, Loader=yaml.SafeLoader)
  320. return config._handle
  321. def sources():
  322. if hasattr(sources, '_handle'):
  323. return sources._handle
  324. sources._handle = {}
  325. with pushd('etc/backup.d'):
  326. for source in config()['sources']:
  327. source = os.path.realpath(source)
  328. for path in glob('{}/*.yml'.format(source)) + \
  329. glob('{}/*.yaml'.format(source)):
  330. path = os.path.realpath(path)
  331. with pushd(os.path.dirname(path)), open(path) as f:
  332. data = yaml.load(f, Loader=yaml.SafeLoader)
  333. if "active" in data and data["active"]:
  334. data['path'] = path
  335. if "depends" not in data:
  336. data["depends"] = []
  337. for i in range(0, len(data["depends"])):
  338. data["depends"][i] = os.path.realpath(
  339. '{}.yml'.format(data["depends"][i]))
  340. sources._handle[path] = data
  341. return sources._handle
  342. def main(args):
  343. if 'engine' in config():
  344. engine = config()["engine"]
  345. if engine not in ("rdiff-backup"):
  346. raise BackupException('Unknown backup engine: {}'.format(engine))
  347. Backup.engine = engine
  348. if 'logdir' in config():
  349. logdir = config()['logdir']
  350. os.makedirs(logdir, exist_ok=True)
  351. if not os.path.exists(logdir):
  352. raise BackupException(
  353. 'Unable to create logging directory: {}'.format(logdir))
  354. Backup.logdir = logdir
  355. if 'maxthreads' in config():
  356. Job.maxthreads = config()['maxthreads']
  357. if 'verbosity' in config():
  358. Backup.verbosity = config()['verbosity']
  359. Backup.logTransitions = Job.logTransitions = True
  360. Backup.load(sources().keys())
  361. Backup.start()
  362. Backup.wait()
  363. if __name__ == '__main__':
  364. try:
  365. main(sys.argv[1:])
  366. except BackupException as ex:
  367. print(ex)
  368. sys.exit(1)
  369. except Exception:
  370. from traceback import format_exc
  371. msg = "Error encountered:\n" + format_exc().strip()
  372. print(msg)
  373. sys.exit(1)