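"""Dependency-aware backup runner.

Loads a top-level ``backup.yml`` plus one YAML file per backup source, builds
``Backup`` objects (one per source) and ``Job`` objects (one per job entry),
resolves dependencies between backups, and drives the jobs through
``rdiff-backup`` subprocesses while writing per-backup and per-job log files.
"""
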
import contextlib
import os
import platform
import psutil
import shlex
import subprocess
import sys
import time
import yaml
from glob import glob
from slugify import slugify


def term():
    """Get the Terminal reference used to make output pretty.

    Returns:
        (blessings.Terminal): A
            `blessings <https://blessings.readthedocs.io/en/latest>`_ terminal
            instance. On Windows (outside Cygwin) an
            `intercessions <https://pypi.org/project/intercessions>`_ terminal
            instance is returned instead.
    """
    if not hasattr(term, '_handle'):
        if sys.platform != "cygwin" and platform.system() == 'Windows':
            from intercessions import Terminal
        else:
            from blessings import Terminal
        term._handle = Terminal()
    return term._handle


@contextlib.contextmanager
def pushd(newDir):
    """Temporarily switch the working directory for the duration of the block."""
    previousDir = os.getcwd()
    os.chdir(newDir)
    try:
        yield
    finally:
        os.chdir(previousDir)


class StateException(Exception):
    pass


class BackupException(Exception):
    pass
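

# The Job class below acts as a small process pool: jobs move through the
# states READY -> QUEUED -> RUNNING -> FAILED/SUCCESSFUL, and Job.initPool()
# is intended to start queued jobs only while fewer than Job.maxthreads of
# them are running.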
class Job(object):
    pool = []
    maxthreads = 4
    verbosity = 3
    logTransitions = False
    READY = 0
    QUEUED = 1
    RUNNING = 2
    FAILED = 3
    SUCCESSFUL = 4

    @staticmethod
    def initPool():
        if Job.pool:
            maxthreads = Job.maxthreads
            running = len(Job.running())
            if maxthreads > running:
                queued = Job.queued()
                if len(queued) < maxthreads - running:
                    maxthreads = len(queued)
                if maxthreads:
                    for i in range(0, maxthreads):
                        if len(queued) > i:
                            queued[i].start()

    @staticmethod
    def finished():
        return [x for x in Job.pool if x.state in (Job.FAILED, Job.SUCCESSFUL)]

    @staticmethod
    def pending():
        return [x for x in Job.pool
                if x.state not in (Job.FAILED, Job.SUCCESSFUL)]

    @staticmethod
    def running():
        return [x for x in Job.pool
                if x.state is Job.RUNNING]

    @staticmethod
    def queued():
        return [x for x in Job.pending() if x.state is Job.QUEUED]

    def __init__(self, backup, config):
        self._config = config
        self._backup = backup
        self._state = Job.READY

    def __str__(self):
        return 'Backup {0} Job #{1} ({2})'.format(
            self._backup.name, self.index, self.getState())

    @property
    def config(self):
        return self._config

    @property
    def state(self):
        if self._state is Job.READY and self in Job.pool:
            self.setState(Job.QUEUED)
        elif self._state is Job.RUNNING:
            code = self._process.poll() or self._process.returncode
            if code is None and not psutil.pid_exists(self._process.pid):
                code = -1
            if code is not None:
                self.setState(Job.FAILED if code else Job.SUCCESSFUL)
                Job.initPool()
        return self._state

    def queue(self):
        if self.state is not Job.READY:
            raise StateException('{} not in state to queue'.format(self))
        if self in Job.pool:
            raise StateException('{} already in queued pool'.format(self))
        self.setState(Job.QUEUED)
        Job.pool.append(self)

    @property
    def args(self):
        if not hasattr(self, '_args'):
            if Backup.engine == "rdiff-backup":
                args = ['rdiff-backup', '-v{}'.format(Job.verbosity)]
                if 'filters' in self.config:
                    for item in self.config['filters']:
                        if 'include' in item:
                            args += ['--include', item['include']]
                        elif 'exclude' in item:
                            args += ['--exclude', item['exclude']]
                        else:
                            raise BackupException(
                                '{0} has an invalid filter {1}'.format(
                                    self, item))
                self._args = args + [self.fromPath, self.toPath]
            else:
                raise StateException(
                    'Invalid backup engine {}'.format(Backup.engine))
        return self._args

    @property
    def logfile(self):
        if not hasattr(self, '_logfile'):
            path = os.path.dirname(self.logpath)
            if not os.path.exists(path):
                os.makedirs(path, exist_ok=True)
            self._logfile = open(self.logpath, 'w')
        return self._logfile

    @property
    def logpath(self):
        if not hasattr(self, '_logpath'):
            self._logpath = os.path.join(os.path.dirname(
                self._backup.logpath), 'job{}.log'.format(self.index))
        return self._logpath

    @property
    def fromPath(self):
        if not hasattr(self, '_fromPath'):
            fromPath = self.config['from']
            if 'roots' in self._backup.config:
                roots = self._backup.config['roots']
                if 'from' in roots:
                    # Roots containing '::' look like rdiff-backup remote
                    # specs (host::path), so they are joined by hand rather
                    # than with os.path.join.
                    if '::' in roots['from']:
                        if roots['from'].endswith('::'):
                            fromPath = roots['from'] + fromPath
                        else:
                            fromPath = roots['from'] + os.sep + fromPath
                    else:
                        fromPath = os.path.join(roots['from'], fromPath)
            self._fromPath = fromPath
        return self._fromPath

    @property
    def toPath(self):
        if not hasattr(self, '_toPath'):
            toPath = self.config['to']
            if 'roots' in self._backup.config:
                roots = self._backup.config['roots']
                if 'to' in roots:
                    if '::' in roots['to']:
                        if roots['to'].endswith('::'):
                            toPath = roots['to'] + toPath
                        else:
                            toPath = roots['to'] + os.sep + toPath
                    else:
                        toPath = os.path.join(roots['to'], toPath)
            self._toPath = toPath
        return self._toPath

    @property
    def process(self):
        return self._process

    @property
    def index(self):
        if not hasattr(self, '_index'):
            self._index = self._backup.jobs.index(self)
        return self._index

    def log(self, text):
        text = '[Backup {0} Job #{1}] {2}\n'.format(
            self._backup.name, self.index, text)
        print(text, end='')
        self._backup.logfile.write(text)
        self._backup.logfile.flush()

    def start(self):
        if self.state is not Job.QUEUED:
            raise StateException('Invalid state to start {}'.format(self))
        self._backup.setStatus(Backup.RUNNING)
        self.setState(Job.RUNNING)
        self.logfile.write(
            ' '.join([shlex.quote(x) for x in self.args]) + '\n')
        self.logfile.flush()
        self._process = subprocess.Popen(
            self.args, stdout=self.logfile, stderr=subprocess.STDOUT,
            stdin=subprocess.DEVNULL, universal_newlines=True, bufsize=1)

    def setState(self, state):
        if self._state != state:
            self.log('{0} -> {1}'.format(
                self.getState(self._state), self.getState(state)))
            self._state = state
            if state in (Job.SUCCESSFUL, Job.FAILED):
                self.logfile.close()

    def getState(self, state=None):
        return {
            Job.READY: 'ready',
            Job.QUEUED: 'queued',
            Job.RUNNING: 'running',
            Job.FAILED: 'failed',
            Job.SUCCESSFUL: 'successful',
        }[self.state if state is None else state]
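

# A Backup aggregates the jobs defined in one source file. Its status mirrors
# the Job states plus BLOCKED, which is held while any dependency (see
# ``depends``/``blocking`` below) has not yet completed successfully.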
class Backup(object):
    instances = {}
    _queue = []
    logTransitions = False
    engine = 'rdiff-backup'
    logdir = '/var/log/backup.d'
    BLOCKED = 0
    READY = 1
    QUEUED = 2
    RUNNING = 3
    FAILED = 4
    SUCCESSFUL = 5

    @staticmethod
    def _log():
        print('Backup status:')
        for backup in Backup.instances.values():
            print(' {}'.format(backup))
            for job in backup.pending:
                print(' {}'.format(job))

    @staticmethod
    def load(paths):
        for path in paths:
            Backup.get(path)

    @staticmethod
    def blocked():
        return [x for x in Backup.instances.values()
                if x.status is Backup.BLOCKED]

    @staticmethod
    def get(path):
        if path not in Backup.instances:
            Backup(sources()[path])
        return Backup.instances[path]

    @staticmethod
    def start():
        for backup in Backup.instances.values():
            backup.queue()
        Job.initPool()

    @staticmethod
    def wait(log=False):
        while Backup._queue:
            # Iterate over a snapshot: reading ``status`` can remove finished
            # backups from Backup._queue mid-loop.
            for backup in list(Backup._queue):
                if backup.status is Backup.BLOCKED:
                    for dependency in backup.blocking:
                        if dependency.status is Backup.READY:
                            dependency.queue()
            if log:
                Backup._log()
            time.sleep(1)

    def __init__(self, config):
        self._config = config
        self._path = self._config['path']
        self._name = slugify(self._path, max_length=255)
        self._logpath = os.path.realpath(os.path.join(
            Backup.logdir, self.name, 'backup.log'))
        self._status = Backup.READY
        if self.blocking:
            self.setStatus(Backup.BLOCKED)
        Backup.instances[self._path] = self

    def __str__(self):
        return 'Backup {0} ({1}, {2} jobs)'.format(
            self.name, self.getStatus(),
            len([x for x in self.jobs
                 if x.state not in (Job.FAILED, Job.SUCCESSFUL)]))

    @property
    def config(self):
        return self._config

    @property
    def name(self):
        return self._name

    @property
    def path(self):
        return self._path

    @property
    def logpath(self):
        return self._logpath

    @property
    def logfile(self):
        if not hasattr(self, '_logfile'):
            path = os.path.dirname(self.logpath)
            if not os.path.exists(path):
                os.makedirs(path, exist_ok=True)
            self._logfile = open(self.logpath, 'w+')
        return self._logfile

    def log(self, text):
        text = '[Backup {0}] {1}\n'.format(self.name, text)
        print(text, end='')
        self.logfile.write(text)
        self.logfile.flush()

    @property
    def status(self):
        if self.blocking:
            # A failed dependency fails this backup; otherwise it stays (or
            # becomes) blocked until every dependency succeeds.
            if [x for x in self.blocking if x.status is Backup.FAILED]:
                self.setStatus(Backup.FAILED)
            elif self._status is not Backup.BLOCKED:
                self.setStatus(Backup.BLOCKED)
        elif self._status is Backup.BLOCKED:
            # Dependencies are done: go back to READY, or if already queued,
            # queue the jobs and let the pool pick them up.
            if self not in Backup._queue:
                self.setStatus(Backup.READY)
            else:
                self.setStatus(Backup.QUEUED)
                for job in self.ready:
                    job.queue()
                Job.initPool()
        elif self._status is Backup.QUEUED and self not in Backup._queue:
            self.setStatus(Backup.READY)
        if self._status in (Backup.RUNNING, Backup.QUEUED) and not self.pending:
            self.setStatus(Backup.FAILED if self.failed else Backup.SUCCESSFUL)
        if self._status in (Backup.FAILED, Backup.SUCCESSFUL) \
                and self in Backup._queue:
            Backup._queue.remove(self)
        return self._status

    @property
    def blocking(self):
        return [x for x in self.depends
                if x.status is not Backup.SUCCESSFUL]

    @property
    def depends(self):
        if not hasattr(self, '_depends'):
            self._depends = []
            for path in self.config["depends"]:
                if path not in Backup.instances:
                    Backup(sources()[path])
                self._depends.append(Backup.instances[path])
        return self._depends

    @property
    def jobs(self):
        if not hasattr(self, '_jobs'):
            self._jobs = []
            if 'jobs' in self.config:
                for job in self.config['jobs']:
                    self._jobs.append(Job(self, job))
        return self._jobs

    @property
    def pending(self):
        return [x for x in self.jobs
                if x.state not in (Job.FAILED, Job.SUCCESSFUL)]

    @property
    def ready(self):
        return [x for x in self.jobs if x.state is Job.READY]

    @property
    def failed(self):
        return [x for x in self.jobs if x.state is Job.FAILED]

    def setStatus(self, status):
        if self._status != status:
            self.log('{0} -> {1}'.format(
                self.getStatus(self._status), self.getStatus(status)))
            self._status = status
            if status in (Backup.SUCCESSFUL, Backup.FAILED):
                self.logfile.close()

    def getStatus(self, status=None):
        return {
            Backup.BLOCKED: 'blocked',
            Backup.READY: 'ready',
            Backup.QUEUED: 'queued',
            Backup.RUNNING: 'running',
            Backup.FAILED: 'failed',
            Backup.SUCCESSFUL: 'successful'
        }[self.status if status is None else status]

    def queue(self):
        if self in Backup._queue:
            raise StateException('Backup already queued')
        Backup._queue.append(self)
        self.setStatus(Backup.QUEUED)
        if self.status is not Backup.BLOCKED:
            for job in self.ready:
                job.queue()
            Job.initPool()
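

# config() below reads <config root>/backup.yml. The exact schema is not
# documented in this file; the sketch below is inferred from the keys the
# code reads (``sources``, ``engine``, ``logdir``, ``maxthreads``,
# ``verbosity``) and every value in it is illustrative only:
#
#   engine: rdiff-backup
#   logdir: /var/log/backup.d
#   maxthreads: 4
#   verbosity: 5
#   sources:
#     - sources.d        # directories scanned for *.yml / *.yaml source files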
def config():
    if hasattr(config, '_handle'):
        return config._handle
    with pushd(config._root):
        with open("backup.yml") as f:
            config._handle = yaml.load(f, Loader=yaml.SafeLoader)
    return config._handle
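

# sources() collects one YAML file per backup from each directory listed
# under ``sources``. The layout below is likewise inferred from the keys the
# code reads (``active``, ``depends``, ``roots``, ``jobs`` with ``from``,
# ``to`` and ``filters``); the paths and names are purely illustrative:
#
#   active: true
#   depends: []              # other source files, given without the .yml suffix
#   roots:
#     from: /home
#     to: backup@nas::home   # '::' marks an rdiff-backup remote path
#   jobs:
#     - from: alice
#       to: alice
#       filters:
#         - exclude: /home/alice/.cache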
def sources():
    if hasattr(sources, '_handle'):
        return sources._handle
    sources._handle = {}
    with pushd(config._root):
        for source in config()['sources']:
            source = os.path.realpath(source)
            for path in glob('{}/*.yml'.format(source)) + \
                    glob('{}/*.yaml'.format(source)):
                path = os.path.realpath(path)
                with pushd(os.path.dirname(path)), open(path) as f:
                    data = yaml.load(f, Loader=yaml.SafeLoader)
                    if "active" in data and data["active"]:
                        data['path'] = path
                        if "depends" not in data:
                            data["depends"] = []
                        for i in range(0, len(data["depends"])):
                            data["depends"][i] = os.path.realpath(
                                '{}.yml'.format(data["depends"][i]))
                        sources._handle[path] = data
    return sources._handle


def main(args):
    config._root = args[0] if len(args) else '/etc/backup.d'
    if not os.path.exists(config._root):
        raise BackupException(
            'Configuration files missing from {}'.format(config._root))
    if 'engine' in config():
        engine = config()["engine"]
        if engine not in ("rdiff-backup",):
            raise BackupException('Unknown backup engine: {}'.format(engine))
        Backup.engine = engine
    if 'logdir' in config():
        logdir = config()['logdir']
        os.makedirs(logdir, exist_ok=True)
        if not os.path.exists(logdir):
            raise BackupException(
                'Unable to create logging directory: {}'.format(logdir))
        Backup.logdir = logdir
    if 'maxthreads' in config():
        Job.maxthreads = config()['maxthreads']
    if 'verbosity' in config():
        Job.verbosity = config()['verbosity']
    Backup.logTransitions = Job.logTransitions = True
    Backup.load(sources().keys())
    Backup.start()
    Backup.wait()


if __name__ == '__main__':
    try:
        main(sys.argv[1:])
    except BackupException as ex:
        print(ex)
        sys.exit(1)
    except Exception:
        from traceback import format_exc
        msg = "Error encountered:\n" + format_exc().strip()
        print(msg)
        sys.exit(1)
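
# Typical invocation (a sketch; the optional argument is the configuration
# root, which defaults to /etc/backup.d):
#
#   python __init__.py /path/to/backup.d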