__init__.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614
  1. import contextlib
  2. import os
  3. import platform
  4. import psutil
  5. import shlex
  6. import subprocess
  7. import sys
  8. import time
  9. import yaml
  10. from glob import glob
  11. from slugify import slugify
  12. def term():
  13. """Get the Terminal reference to make output pretty
  14. Returns:
  15. (blessings.Terminal): Returns
  16. a `blessings <https://blessings.readthedocs.io/en/latest>`_ terminal
  17. instance. If running in windows and not cygwin it will return an
  18. `intercessions <https://pypi.org/project/intercessions>`_ terminal
  19. instance instead
  20. """
  21. if not hasattr(term, '_handle'):
  22. if sys.platform != "cygwin" and platform.system() == 'Windows':
  23. from intercessions import Terminal
  24. else:
  25. from blessings import Terminal
  26. term._handle = Terminal()
  27. return term._handle
  28. @contextlib.contextmanager
  29. def pushd(newDir):
  30. previousDir = os.getcwd()
  31. os.chdir(newDir)
  32. try:
  33. yield
  34. finally:
  35. os.chdir(previousDir)
  36. class StateException(Exception):
  37. pass
  38. class BackupException(Exception):
  39. pass
  40. class Job(object):
  41. pool = []
  42. maxthreads = 4
  43. verbosity = 3
  44. logTransitions = False
  45. READY = 0
  46. QUEUED = 1
  47. STARTING = 2
  48. RUNNING = 3
  49. ENDING = 4
  50. FAILED = 5
  51. SUCCESSFUL = 6
  52. @staticmethod
  53. def initPool():
  54. if Job.pool:
  55. maxthreads = Job.maxthreads
  56. running = len(Job.running())
  57. if maxthreads > running:
  58. queued = Job.queued()
  59. if len(queued) < maxthreads - running:
  60. maxthreads = len(queued)
  61. if maxthreads:
  62. for i in range(0, maxthreads):
  63. if len(queued) > i:
  64. queued[i].start()
  65. @staticmethod
  66. def finished():
  67. return [x for x in Job.pool if x.state in (Job.FAILED, Job.SUCCESSFUL)]
  68. @staticmethod
  69. def pending():
  70. return [x for x in Job.pool
  71. if x.state not in (Job.FAILED, Job.SUCCESSFUL)]
  72. @staticmethod
  73. def running():
  74. return [x for x in Job.pool
  75. if x.state in (Job.STARTING, Job.RUNNING, Job.ENDING)]
  76. @staticmethod
  77. def queued():
  78. return [x for x in Job.pending() if x.state is Job.QUEUED]
  79. def __init__(self, backup, config):
  80. self._config = config
  81. self._backup = backup
  82. self._state = Job.READY
  83. def __str__(self):
  84. return 'Backup {0} Job #{1} ({2})'.format(
  85. self._backup.name, self.index, self.getState())
  86. @property
  87. def config(self):
  88. return self._config
  89. @property
  90. def state(self):
  91. if self._state is Job.READY and self in Job.pool:
  92. self.setState(Job.QUEUED)
  93. elif self._state in (Job.STARTING, Job.RUNNING, Job.ENDING):
  94. code = self._process.poll() or self._process.returncode
  95. if code is None and not psutil.pid_exists(self._process.pid):
  96. code = -1
  97. if code is not None:
  98. if code:
  99. self.setState(Job.FAILED)
  100. Job.initPool()
  101. elif self._state is Job.ENDING:
  102. self.setState(Job.SUCCESSFUL)
  103. Job.initPool()
  104. else:
  105. self.start()
  106. return self._state
  107. def queue(self):
  108. if self.state is not Job.READY:
  109. raise StateException('{} not in state to queue'.format(self))
  110. if self in Job.pool:
  111. raise StateException('{} already in queued pool'.format(self))
  112. self.setState(Job.QUEUED)
  113. Job.pool.append(self)
  114. @property
  115. def args(self):
  116. if self._state is Job.STARTING:
  117. return shlex.split(self.pre)
  118. elif self._state is Job.RUNNING:
  119. if Backup.engine == "rdiff-backup":
  120. args = ['rdiff-backup', '-v{}'.format(Backup.verbosity)]
  121. if 'filters' in self.config:
  122. for item in self.config['filters']:
  123. if 'include' in item:
  124. args += ['--include', item['include']]
  125. elif 'exclude' in item:
  126. args += ['--exclude', item['exclude']]
  127. else:
  128. raise BackupException(
  129. '{0} has an invalid filter {1}'.format(self, item))
  130. return args + [self.fromPath, self.toPath]
  131. else:
  132. raise StateException(
  133. 'Invalid backup engine {}'.format(Backup.engine))
  134. elif self._state is Job.ENDING:
  135. return shlex.split(self.post)
  136. else:
  137. raise StateException('Invalid state {}'.format(self.getState()))
  138. @property
  139. def logfile(self):
  140. if not hasattr(self, '_logfile'):
  141. path = os.path.dirname(self.logpath)
  142. if not os.path.exists(path):
  143. os.makedirs(path, exist_ok=True)
  144. self._logfile = open(self.logpath, 'w')
  145. return self._logfile
  146. @property
  147. def logpath(self):
  148. if not hasattr(self, '_logpath'):
  149. self._logpath = os.path.join(os.path.dirname(
  150. self._backup.logpath), 'job{}.log'.format(self.index))
  151. return self._logpath
  152. @property
  153. def fromPath(self):
  154. if not hasattr(self, '_fromPath'):
  155. fromPath = self.config['from']
  156. if 'roots' in self._backup.config:
  157. roots = self._backup.config['roots']
  158. if 'from' in roots:
  159. if '::' in roots['from']:
  160. if roots['from'].endswith('::'):
  161. fromPath = roots['from'] + fromPath
  162. else:
  163. fromPath = roots['from'] + os.sep + fromPath
  164. else:
  165. fromPath = os.path.join(roots['from'], fromPath)
  166. self._fromPath = fromPath
  167. return self._fromPath
  168. @property
  169. def toPath(self):
  170. if not hasattr(self, '_toPath'):
  171. toPath = self.config['to']
  172. if 'roots' in self._backup.config:
  173. roots = self._backup.config['roots']
  174. if 'to' in roots:
  175. if '::' in roots['to']:
  176. if roots['to'].endswith('::'):
  177. toPath = roots['to'] + toPath
  178. else:
  179. toPath = roots['to'] + os.sep + toPath
  180. else:
  181. toPath = os.path.join(roots['to'], toPath)
  182. self._toPath = toPath
  183. return self._toPath
  184. @property
  185. def process(self):
  186. return self._process
  187. @property
  188. def index(self):
  189. if not hasattr(self, '_index'):
  190. self._index = self._backup.jobs.index(self)
  191. return self._index
  192. @property
  193. def pre(self):
  194. if not hasattr(self, '_pre'):
  195. self._pre = self.config['pre'] if 'pre' in self.config else None
  196. return self._pre
  197. @property
  198. def post(self):
  199. if not hasattr(self, '_post'):
  200. self._post = self.config['post'] if 'post' in self.config else None
  201. return self._post
  202. def log(self, text):
  203. text = '[Backup {0} Job #{1}] {2}\n'.format(
  204. self._backup.name, self.index, text)
  205. print(text, end='')
  206. self._backup.logfile.write(text)
  207. self._backup.logfile.flush()
  208. def start(self):
  209. if self._state is self.QUEUED:
  210. self._backup.setStatus(Backup.RUNNING)
  211. self.setState(Job.STARTING)
  212. if self.pre is None:
  213. self.setState(Job.RUNNING)
  214. elif self._state is self.STARTING:
  215. self.setState(Job.RUNNING)
  216. elif self._state is self.RUNNING:
  217. self.setState(Job.ENDING)
  218. if self.post is None:
  219. self.setState(Job.SUCCESSFUL)
  220. return
  221. else:
  222. raise StateException('Invalid state to start {}'.format(self))
  223. args = self.args
  224. self.logfile.write(' '.join([shlex.quote(x) for x in args]) + '\n')
  225. self.logfile.flush()
  226. self._process = subprocess.Popen(
  227. args, stdout=self.logfile, stderr=subprocess.STDOUT,
  228. stdin=subprocess.DEVNULL, universal_newlines=True, bufsize=1)
  229. def setState(self, state):
  230. if self._state != state:
  231. self.log('{0} -> {1}'.format(
  232. self.getState(self._state), self.getState(state)))
  233. self._state = state
  234. if state in (Job.SUCCESSFUL, Job.FAILED):
  235. self.logfile.close()
  236. def getState(self, state=None):
  237. return {
  238. Job.READY: 'ready',
  239. Job.QUEUED: 'queued',
  240. Job.STARTING: 'starting',
  241. Job.RUNNING: 'running',
  242. Job.ENDING: 'ending',
  243. Job.FAILED: 'failed',
  244. Job.SUCCESSFUL: 'successful',
  245. }[self.state if state is None else state]
  246. class Backup(object):
  247. instances = {}
  248. _queue = []
  249. logTransitions = False
  250. engine = 'rdiff-backup'
  251. logdir = '/var/log/backup.d'
  252. BLOCKED = 0
  253. READY = 1
  254. QUEUED = 2
  255. RUNNING = 3
  256. FAILED = 4
  257. SUCCESSFUL = 5
  258. @staticmethod
  259. def _log():
  260. print('Backup status:')
  261. for backup in Backup.instances.values():
  262. print(' {}'.format(backup))
  263. for job in backup.pending:
  264. print(' {}'.format(job))
  265. @staticmethod
  266. def load(paths):
  267. for path in paths:
  268. Backup.get(path)
  269. @staticmethod
  270. def blocked():
  271. return [x for x in Backup.instances.values()
  272. if x.status is Backup.BLOCKED]
  273. @staticmethod
  274. def get(path):
  275. if path not in Backup.instances:
  276. Backup(sources()[path])
  277. return Backup.instances[path]
  278. @staticmethod
  279. def start():
  280. for backup in Backup.instances.values():
  281. backup.queue()
  282. Job.initPool()
  283. @staticmethod
  284. def wait(log=False):
  285. while Backup._queue:
  286. for backup in Backup._queue:
  287. if backup.status is Backup.BLOCKED:
  288. for dependency in backup.blocking:
  289. if dependency.status is Backup.READY:
  290. dependency.queue()
  291. if log:
  292. Backup._log()
  293. time.sleep(1)
  294. def __init__(self, config):
  295. self._config = config
  296. self._path = self._config['path']
  297. self._name = slugify(self._path, max_length=255)
  298. self._logpath = os.path.realpath(os.path.join(
  299. Backup.logdir, self.name, 'backup.log'))
  300. self._status = Backup.READY
  301. if self.blocking:
  302. self.setStatus(Backup.BLOCKED)
  303. Backup.instances[self._path] = self
  304. def __str__(self):
  305. return 'Backup {0} ({1}, {2} jobs)'.format(
  306. self.name, self.getStatus(), len([x for x in self.jobs
  307. if x.state not in (Job.FAILED, Job.SUCCESSFUL)]))
  308. @property
  309. def config(self):
  310. return self._config
  311. @property
  312. def name(self):
  313. return self._name
  314. @property
  315. def path(self):
  316. return self._path
  317. @property
  318. def logpath(self):
  319. return self._logpath
  320. @property
  321. def logfile(self):
  322. if not hasattr(self, '_logfile'):
  323. path = os.path.dirname(self.logpath)
  324. if not os.path.exists(path):
  325. os.makedirs(path, exist_ok=True)
  326. self._logfile = open(self.logpath, 'w+')
  327. return self._logfile
  328. def log(self, text):
  329. text = '[Backup {0}] {1}\n'.format(self.name, text)
  330. print(text, end='')
  331. self.logfile.write(text)
  332. self.logfile.flush()
  333. @property
  334. def status(self):
  335. if self.blocking:
  336. if [x for x in self.blocking if x.status is Backup.FAILED]:
  337. self.setStatus(Backup.FAILED)
  338. elif self._status is not Backup.BLOCKED:
  339. self.setStatus(Backup.BLOCKED)
  340. elif self._status is Backup.BLOCKED:
  341. if self not in Backup._queue:
  342. self.setStatus(Backup.READY)
  343. else:
  344. self.setStatus(Backup.QUEUED)
  345. for job in self.ready:
  346. job.queue()
  347. Job.initPool()
  348. elif self._status is Backup.QUEUED and self not in Backup._queue:
  349. self.setStatus(Backup.READY)
  350. if self._status in (Backup.RUNNING, Backup.QUEUED) and not self.pending:
  351. self.setStatus(Backup.FAILED if self.failed else Backup.SUCCESSFUL)
  352. if self._status in (Backup.FAILED, Backup.SUCCESSFUL) \
  353. and self in Backup._queue:
  354. Backup._queue.remove(self)
  355. return self._status
  356. @property
  357. def blocking(self):
  358. return [x for x in self.depends
  359. if x.status is not Backup.SUCCESSFUL]
  360. @property
  361. def depends(self):
  362. if not hasattr(self, '_depends'):
  363. self._depends = []
  364. for path in self.config["depends"]:
  365. if path not in Backup.instances:
  366. Backup(sources()[path])
  367. self._depends.append(Backup.instances[path])
  368. return self._depends
  369. @property
  370. def jobs(self):
  371. if not hasattr(self, '_jobs'):
  372. self._jobs = []
  373. if 'jobs' in self.config:
  374. for job in self.config['jobs']:
  375. self._jobs.append(Job(self, job))
  376. return self._jobs
  377. @property
  378. def pending(self):
  379. return [x for x in self.jobs if x.state not in (Job.FAILED, Job.SUCCESSFUL)]
  380. @property
  381. def ready(self):
  382. return [x for x in self.jobs if x.state is Job.READY]
  383. @property
  384. def failed(self):
  385. return [x for x in self.jobs if x.state is Job.FAILED]
  386. def setStatus(self, status):
  387. if self._status != status:
  388. self.log('{0} -> {1}'.format(
  389. self.getStatus(self._status), self.getStatus(status)))
  390. self._status = status
  391. if status in (Backup.SUCCESSFUL, Backup.FAILED):
  392. self.logfile.close()
  393. def getStatus(self, status=None):
  394. return {
  395. Backup.BLOCKED: 'blocked',
  396. Backup.READY: 'ready',
  397. Backup.QUEUED: 'queued',
  398. Backup.RUNNING: 'running',
  399. Backup.FAILED: 'failed',
  400. Backup.SUCCESSFUL: 'successful'
  401. }[self.status if status is None else status]
  402. def queue(self):
  403. if self in Backup._queue:
  404. raise StateException('Backup already queued')
  405. Backup._queue.append(self)
  406. self.setStatus(Backup.QUEUED)
  407. if self.status is not Backup.BLOCKED:
  408. for job in self.ready:
  409. job.queue()
  410. Job.initPool()
  411. def config():
  412. if hasattr(config, '_handle'):
  413. return config._handle
  414. with pushd(config._root):
  415. with open("backup.yml") as f:
  416. config._handle = yaml.load(f, Loader=yaml.SafeLoader)
  417. return config._handle
  418. def sources():
  419. if hasattr(sources, '_handle'):
  420. return sources._handle
  421. sources._handle = {}
  422. with pushd(config._root):
  423. for source in config()['sources']:
  424. source = os.path.realpath(source)
  425. for path in glob('{}/*.yml'.format(source)) + \
  426. glob('{}/*.yaml'.format(source)):
  427. path = os.path.realpath(path)
  428. with pushd(os.path.dirname(path)), open(path) as f:
  429. data = yaml.load(f, Loader=yaml.SafeLoader)
  430. if "active" in data and data["active"]:
  431. data['path'] = path
  432. if "depends" not in data:
  433. data["depends"] = []
  434. for i in range(0, len(data["depends"])):
  435. data["depends"][i] = os.path.realpath(
  436. '{}.yml'.format(data["depends"][i]))
  437. sources._handle[path] = data
  438. return sources._handle
  439. def main(args):
  440. try:
  441. config._root = args[0] if len(args) else '/etc/backup.d'
  442. if not os.path.exists(config._root):
  443. raise BackupException(
  444. 'Configuration files missing from {}'.format(config._root))
  445. if 'engine' in config():
  446. engine = config()["engine"]
  447. if engine not in ("rdiff-backup"):
  448. raise BackupException('Unknown backup engine: {}'.format(engine))
  449. Backup.engine = engine
  450. if 'logdir' in config():
  451. logdir = config()['logdir']
  452. os.makedirs(logdir, exist_ok=True)
  453. if not os.path.exists(logdir):
  454. raise BackupException(
  455. 'Unable to create logging directory: {}'.format(logdir))
  456. Backup.logdir = logdir
  457. if 'maxthreads' in config():
  458. Job.maxthreads = config()['maxthreads']
  459. if 'verbosity' in config():
  460. Backup.verbosity = config()['verbosity']
  461. Backup.logTransitions = Job.logTransitions = True
  462. Backup.load(sources().keys())
  463. Backup.start()
  464. Backup.wait()
  465. except BackupException as ex:
  466. print(ex)
  467. sys.exit(1)
  468. except Exception:
  469. from traceback import format_exc
  470. msg = "Error encountered:\n" + format_exc().strip()
  471. print(msg)
  472. sys.exit(1)
  473. if __name__ == '__main__':
  474. main(sys.argv[1:])