__init__.py
import contextlib
import os
import platform
import psutil
import shlex
import subprocess
import sys
import time
import yaml
from datetime import datetime
from glob import glob
from slugify import slugify


def term():
    """Get the Terminal reference to make output pretty

    Returns:
        (blessings.Terminal): Returns
            a `blessings <https://blessings.readthedocs.io/en/latest>`_ terminal
            instance. If running on Windows and not Cygwin it will return an
            `intercessions <https://pypi.org/project/intercessions>`_ terminal
            instance instead
    """
    if not hasattr(term, '_handle'):
        if sys.platform != "cygwin" and platform.system() == 'Windows':
            from intercessions import Terminal
        else:
            from blessings import Terminal
        term._handle = Terminal()
    return term._handle


@contextlib.contextmanager
def pushd(newDir):
    previousDir = os.getcwd()
    os.chdir(newDir)
    try:
        yield
    finally:
        os.chdir(previousDir)
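
# Illustrative usage of pushd(): temporarily switch the working directory and
# always restore the previous one, even if the body raises.
#
#     with pushd('/etc/backup.d'):
#         with open('backup.yml') as f:
#             data = f.read()
#     # back in the original working directory here
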
class StateException(Exception):
    pass


class BackupException(Exception):
    pass


class Job(object):
    pool = []
    maxthreads = 4
    verbosity = 3
    logTransitions = False
    READY = 0
    QUEUED = 1
    STARTING = 2
    RUNNING = 3
    ENDING = 4
    FAILED = 5
    SUCCESSFUL = 6
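
    # Lifecycle sketch, as driven by queue(), start() and the state property:
    #     READY -> QUEUED           queue()
    #     QUEUED -> STARTING        start(); launches the 'pre' command
    #                               (skips straight to RUNNING when no 'pre')
    #     STARTING -> RUNNING       start(); launches the backup engine itself
    #     RUNNING -> ENDING         start(); launches the 'post' command
    #                               (jumps to SUCCESSFUL when no 'post')
    #     ENDING -> SUCCESSFUL      the 'post' process exited with 0
    #     any active phase -> FAILED  the current process exited non-zero
    #                                 (or its PID disappeared)
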
    @staticmethod
    def initPool():
        # Start queued jobs from the shared pool when worker capacity
        # (Job.maxthreads) allows.
        if Job.pool:
            maxthreads = Job.maxthreads
            running = len(Job.running())
            if maxthreads > running:
                queued = Job.queued()
                if len(queued) < maxthreads - running:
                    maxthreads = len(queued)
                if maxthreads:
                    for i in range(0, maxthreads):
                        if len(queued) > i:
                            queued[i].start()

    @staticmethod
    def finished():
        return [x for x in Job.pool if x.state in (Job.FAILED, Job.SUCCESSFUL)]

    @staticmethod
    def pending():
        return [x for x in Job.pool
                if x.state not in (Job.FAILED, Job.SUCCESSFUL)]

    @staticmethod
    def running():
        return [x for x in Job.pool
                if x.state in (Job.STARTING, Job.RUNNING, Job.ENDING)]

    @staticmethod
    def queued():
        return [x for x in Job.pending() if x.state is Job.QUEUED]

    def __init__(self, backup, config):
        self._config = config
        self._backup = backup
        self._state = Job.READY

    def __str__(self):
        return 'Backup {0} Job #{1} ({2})'.format(
            self._backup.name, self.index, self.getState())

    @property
    def config(self):
        return self._config

    @property
    def state(self):
        # Poll the underlying process while a phase is active and advance the
        # state machine when it exits.
        if self._state is Job.READY and self in Job.pool:
            self.setState(Job.QUEUED)
        elif self._state in (Job.STARTING, Job.RUNNING, Job.ENDING):
            code = self._process.poll() or self._process.returncode
            if code is None and not psutil.pid_exists(self._process.pid):
                code = -1
            if code is not None:
                if code:
                    self.setState(Job.FAILED)
                    Job.initPool()
                elif self._state is Job.ENDING:
                    self.setState(Job.SUCCESSFUL)
                    Job.initPool()
                else:
                    self.start()
        return self._state

    def queue(self):
        if self.state is not Job.READY:
            raise StateException('{} not in state to queue'.format(self))
        if self in Job.pool:
            raise StateException('{} already in queued pool'.format(self))
        self.setState(Job.QUEUED)
        Job.pool.append(self)

    @property
    def args(self):
        # The command line depends on the current phase: the 'pre' shell
        # command, the backup engine invocation itself, or the 'post' command.
        if self._state is Job.STARTING:
            return shlex.split(self.pre)
        elif self._state is Job.RUNNING:
            if Backup.engine == "rdiff-backup":
                args = ['rdiff-backup', '-v{}'.format(Backup.verbosity)]
                if 'filters' in self.config:
                    for item in self.config['filters']:
                        if 'include' in item:
                            args += ['--include', item['include']]
                        elif 'exclude' in item:
                            args += ['--exclude', item['exclude']]
                        else:
                            raise BackupException(
                                '{0} has an invalid filter {1}'.format(self, item))
                return args + [self.fromPath, self.toPath]
            else:
                raise StateException(
                    'Invalid backup engine {}'.format(Backup.engine))
        elif self._state is Job.ENDING:
            return shlex.split(self.post)
        else:
            raise StateException('Invalid state {}'.format(self.getState()))
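
    # Illustrative example: for a job configured as
    #     from: home
    #     to: home
    #     filters:
    #       - include: /home/user/projects
    #       - exclude: /home/user/.cache
    # the RUNNING-state args would be roughly
    #     ['rdiff-backup', '-v5',
    #      '--include', '/home/user/projects',
    #      '--exclude', '/home/user/.cache',
    #      <fromPath>, <toPath>]
    # with the verbosity digit taken from Backup.verbosity.
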
    @property
    def logfile(self):
        if not hasattr(self, '_logfile'):
            path = os.path.dirname(self.logpath)
            if not os.path.exists(path):
                os.makedirs(path, exist_ok=True)
            self._logfile = open(self.logpath, 'w' if Backup.truncateLogs else 'a')
        return self._logfile

    @property
    def logpath(self):
        if not hasattr(self, '_logpath'):
            self._logpath = os.path.join(os.path.dirname(
                self._backup.logpath), 'job{}.log'.format(self.index))
        return self._logpath

    @property
    def fromPath(self):
        if not hasattr(self, '_fromPath'):
            fromPath = self.config['from']
            if 'roots' in self._backup.config:
                roots = self._backup.config['roots']
                if 'from' in roots:
                    if '::' in roots['from']:
                        if roots['from'].endswith('::'):
                            fromPath = roots['from'] + fromPath
                        else:
                            fromPath = roots['from'] + os.sep + fromPath
                    else:
                        fromPath = os.path.join(roots['from'], fromPath)
            self._fromPath = fromPath
        return self._fromPath

    @property
    def toPath(self):
        if not hasattr(self, '_toPath'):
            toPath = self.config['to']
            if 'roots' in self._backup.config:
                roots = self._backup.config['roots']
                if 'to' in roots:
                    if '::' in roots['to']:
                        if roots['to'].endswith('::'):
                            toPath = roots['to'] + toPath
                        else:
                            toPath = roots['to'] + os.sep + toPath
                    else:
                        toPath = os.path.join(roots['to'], toPath)
            self._toPath = toPath
        return self._toPath
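
    # Illustrative example of root handling: with a backup-level config of
    #     roots: {from: 'user@host::', to: /mnt/backups}
    # a job with from: home and to: home resolves to
    #     fromPath == 'user@host::home'    (root contains '::' and ends with it,
    #                                       so the job path is appended directly)
    #     toPath   == '/mnt/backups/home'  (no '::', joined with os.path.join)
    # A remote root such as 'user@host::/srv' (not ending in '::') would give
    # 'user@host::/srv/home' instead.
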
    @property
    def process(self):
        return self._process

    @property
    def index(self):
        if not hasattr(self, '_index'):
            self._index = self._backup.jobs.index(self)
        return self._index

    @property
    def pre(self):
        if not hasattr(self, '_pre'):
            self._pre = self.config['pre'] if 'pre' in self.config else None
        return self._pre

    @property
    def post(self):
        if not hasattr(self, '_post'):
            self._post = self.config['post'] if 'post' in self.config else None
        return self._post

    def log(self, text):
        text = '[{0}] (Backup {1} Job #{2}) {3}\n'.format(
            datetime.now().isoformat(), self._backup.name, self.index, text)
        print(text, end='')
        self._backup.logfile.write(text)
        self._backup.logfile.flush()

    def start(self):
        # Advance the job one phase and launch the corresponding command.
        # Phases without a configured command ('pre'/'post') are skipped.
        if self._state is self.QUEUED:
            self._backup.setStatus(Backup.RUNNING)
            self.setState(Job.STARTING)
            if self.pre is None:
                self.setState(Job.RUNNING)
        elif self._state is self.STARTING:
            self.setState(Job.RUNNING)
        elif self._state is self.RUNNING:
            self.setState(Job.ENDING)
            if self.post is None:
                self.setState(Job.SUCCESSFUL)
                Job.initPool()
                return
        else:
            raise StateException('Invalid state to start {}'.format(self))
        args = self.args
        self.logfile.write(
            "[{0}] {1} &\n".format(
                datetime.now().isoformat(),
                ' '.join([shlex.quote(x) for x in args])))
        self.logfile.flush()
        self._process = subprocess.Popen(
            args, stdout=self.logfile, stderr=subprocess.STDOUT,
            stdin=subprocess.DEVNULL, universal_newlines=True, bufsize=1)

    def setState(self, state):
        if self._state != state:
            self.log('{0} -> {1}'.format(
                self.getState(self._state), self.getState(state)))
            self._state = state
            if state in (Job.SUCCESSFUL, Job.FAILED):
                self.logfile.close()

    def getState(self, state=None):
        return {
            Job.READY: 'ready',
            Job.QUEUED: 'queued',
            Job.STARTING: 'starting',
            Job.RUNNING: 'running',
            Job.ENDING: 'ending',
            Job.FAILED: 'failed',
            Job.SUCCESSFUL: 'successful',
        }[self.state if state is None else state]


class Backup(object):
    instances = {}
    _queue = []
    logTransitions = False
    truncateLogs = True
    engine = 'rdiff-backup'
    logdir = '/var/log/backup.d'
    # Default rdiff-backup verbosity; overridden from backup.yml in main().
    verbosity = 3
    BLOCKED = 0
    READY = 1
    QUEUED = 2
    RUNNING = 3
    FAILED = 4
    SUCCESSFUL = 5

    @staticmethod
    def _log():
        print('Backup status:')
        for backup in Backup.instances.values():
            print(' {}'.format(backup))
            for job in backup.pending:
                print(' {}'.format(job))

    @staticmethod
    def load(paths):
        for path in paths:
            Backup.get(path)

    @staticmethod
    def blocked():
        return [x for x in Backup.instances.values()
                if x.status is Backup.BLOCKED]

    @staticmethod
    def get(path):
        if path not in Backup.instances:
            Backup(sources()[path])
        return Backup.instances[path]

    @staticmethod
    def start():
        for backup in Backup.instances.values():
            backup.queue()
        Job.initPool()

    @staticmethod
    def wait(log=False):
        # Block until every queued backup has finished, queueing dependencies
        # of blocked backups as they become ready.
        while Backup._queue:
            for backup in Backup._queue:
                if backup.status is Backup.BLOCKED:
                    for dependency in backup.blocking:
                        if dependency.status is Backup.READY:
                            dependency.queue()
            if log:
                Backup._log()
            time.sleep(1)

    def __init__(self, config):
        self._config = config
        self._path = self._config['path']
        self._name = slugify(self._path, max_length=255)
        self._logpath = os.path.realpath(os.path.join(
            Backup.logdir, self.name, 'backup.log'))
        self._status = Backup.READY
        if self.blocking:
            self.setStatus(Backup.BLOCKED)
        Backup.instances[self._path] = self

    def __str__(self):
        return 'Backup {0} ({1}, {2} jobs)'.format(
            self.name, self.getStatus(), len([x for x in self.jobs
                if x.state not in (Job.FAILED, Job.SUCCESSFUL)]))

    @property
    def config(self):
        return self._config

    @property
    def name(self):
        return self._name

    @property
    def path(self):
        return self._path

    @property
    def logpath(self):
        return self._logpath

    @property
    def logfile(self):
        if not hasattr(self, '_logfile'):
            path = os.path.dirname(self.logpath)
            if not os.path.exists(path):
                os.makedirs(path, exist_ok=True)
            self._logfile = open(self.logpath, 'w' if Backup.truncateLogs else 'a')
        return self._logfile

    def log(self, text):
        text = '[{0}] (Backup {1}) {2}\n'.format(
            datetime.now().isoformat(), self.name, text)
        print(text, end='')
        self.logfile.write(text)
        self.logfile.flush()

    @property
    def status(self):
        # Re-derive the status from dependency and job state each time it is read.
        if self.blocking:
            if [x for x in self.blocking if x.status is Backup.FAILED]:
                self.setStatus(Backup.FAILED)
            elif self._status is not Backup.BLOCKED:
                self.setStatus(Backup.BLOCKED)
        elif self._status is Backup.BLOCKED:
            if self not in Backup._queue:
                self.setStatus(Backup.READY)
            else:
                self.setStatus(Backup.QUEUED)
                for job in self.ready:
                    job.queue()
                Job.initPool()
        elif self._status is Backup.QUEUED and self not in Backup._queue:
            self.setStatus(Backup.READY)
        if self._status in (Backup.RUNNING, Backup.QUEUED) and not self.pending:
            self.setStatus(Backup.FAILED if self.failed else Backup.SUCCESSFUL)
        if self._status in (Backup.FAILED, Backup.SUCCESSFUL) \
                and self in Backup._queue:
            Backup._queue.remove(self)
        return self._status

    @property
    def blocking(self):
        return [x for x in self.depends
                if x.status is not Backup.SUCCESSFUL]

    @property
    def depends(self):
        if not hasattr(self, '_depends'):
            self._depends = []
            for path in self.config["depends"]:
                if path not in Backup.instances:
                    Backup(sources()[path])
                self._depends.append(Backup.instances[path])
        return self._depends

    @property
    def jobs(self):
        if not hasattr(self, '_jobs'):
            self._jobs = []
            if 'jobs' in self.config:
                for job in self.config['jobs']:
                    self._jobs.append(Job(self, job))
        return self._jobs

    @property
    def pending(self):
        return [x for x in self.jobs if x.state not in (Job.FAILED, Job.SUCCESSFUL)]

    @property
    def ready(self):
        return [x for x in self.jobs if x.state is Job.READY]

    @property
    def failed(self):
        return [x for x in self.jobs if x.state is Job.FAILED]

    def setStatus(self, status):
        if self._status != status:
            self.log('{0} -> {1}'.format(
                self.getStatus(self._status), self.getStatus(status)))
            self._status = status
            if status in (Backup.SUCCESSFUL, Backup.FAILED):
                self.logfile.close()

    def getStatus(self, status=None):
        return {
            Backup.BLOCKED: 'blocked',
            Backup.READY: 'ready',
            Backup.QUEUED: 'queued',
            Backup.RUNNING: 'running',
            Backup.FAILED: 'failed',
            Backup.SUCCESSFUL: 'successful'
        }[self.status if status is None else status]

    def queue(self):
        if self in Backup._queue:
            raise StateException('Backup already queued')
        Backup._queue.append(self)
        self.setStatus(Backup.QUEUED)
        if self.status is not Backup.BLOCKED:
            for job in self.ready:
                job.queue()
            Job.initPool()
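
    # Status sketch: a Backup starts READY (or BLOCKED while dependencies are
    # unfinished), moves to QUEUED via queue(), is marked RUNNING when its
    # first Job starts, and settles on SUCCESSFUL or FAILED once no jobs are
    # pending. A FAILED dependency fails the backups that depend on it.
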

def config():
    if hasattr(config, '_handle'):
        return config._handle
    with pushd(config._root):
        with open("backup.yml") as f:
            config._handle = yaml.load(f, Loader=yaml.SafeLoader)
    return config._handle
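
# Illustrative backup.yml, using only keys read by main() and sources() below.
# The directory name 'sources-enabled' is a hypothetical example; each entry
# under 'sources' is a directory scanned for *.yml / *.yaml definitions.
#
#     engine: rdiff-backup
#     logdir: /var/log/backup.d
#     maxthreads: 4
#     verbosity: 5
#     truncateLogs: true
#     sources:
#       - sources-enabled
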
def sources():
    if hasattr(sources, '_handle'):
        return sources._handle
    sources._handle = {}
    with pushd(config._root):
        for source in config()['sources']:
            source = os.path.realpath(source)
            for path in glob('{}/*.yml'.format(source)) + \
                    glob('{}/*.yaml'.format(source)):
                path = os.path.realpath(path)
                with pushd(os.path.dirname(path)), open(path) as f:
                    data = yaml.load(f, Loader=yaml.SafeLoader)
                    # Only definitions marked 'active: true' are registered;
                    # relative 'depends' entries resolve to sibling .yml paths.
                    if "active" in data and data["active"]:
                        data['path'] = path
                        if "depends" not in data:
                            data["depends"] = []
                        for i in range(0, len(data["depends"])):
                            data["depends"][i] = os.path.realpath(
                                '{}.yml'.format(data["depends"][i]))
                        sources._handle[path] = data
    return sources._handle
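
# Illustrative source definition (one *.yml file inside a configured sources
# directory). Field names are those read by Backup and Job above; the values
# are hypothetical.
#
#     active: true
#     depends:
#       - databases            # resolved to <dir>/databases.yml
#     roots:
#       from: /home
#       to: /mnt/backups/home
#     jobs:
#       - from: user
#         to: user
#         pre: systemctl stop example.service
#         post: systemctl start example.service
#         filters:
#           - exclude: /home/user/.cache
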
def main(args):
    try:
        config._root = args[0] if len(args) else '/etc/backup.d'
        if not os.path.exists(config._root):
            raise BackupException(
                'Configuration files missing from {}'.format(config._root))
        if 'engine' in config():
            engine = config()["engine"]
            if engine not in ("rdiff-backup",):
                raise BackupException('Unknown backup engine: {}'.format(engine))
            Backup.engine = engine
        if 'logdir' in config():
            logdir = config()['logdir']
            os.makedirs(logdir, exist_ok=True)
            if not os.path.exists(logdir):
                raise BackupException(
                    'Unable to create logging directory: {}'.format(logdir))
            Backup.logdir = logdir
        if 'maxthreads' in config():
            Job.maxthreads = config()['maxthreads']
        if 'verbosity' in config():
            Backup.verbosity = config()['verbosity']
        Backup.logTransitions = Job.logTransitions = True
        Backup.truncateLogs = "truncateLogs" in config() and config()["truncateLogs"]
        Backup.load(sources().keys())
        Backup.start()
        Backup.wait()
    except BackupException as ex:
        print(ex)
        sys.exit(1)
    except Exception:
        from traceback import format_exc
        msg = "Error encountered:\n" + format_exc().strip()
        print(msg)
        sys.exit(1)


if __name__ == '__main__':
    main(sys.argv[1:])
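
# Illustrative invocation: main() takes an argument list whose first element,
# if present, is the configuration root.
#
#     main(['/srv/backup.d'])   # reads /srv/backup.d/backup.yml
#     main([])                  # falls back to /etc/backup.d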