#!/usr/bin/env python3
"""Back up service data to S3 buckets using restic, driven by YAML config files."""
import abc
import argparse
import dataclasses
import os
import subprocess
from typing import Dict, List

import yaml


class Volume(abc.ABC):
    @dataclasses.dataclass
    class RepoConfig:
        aws_bucket_prefix: str
        aws_bucket_endpoint: str
        aws_keys: Dict[str, str]
        restic_cmd_base: List[str]
        restic_keep_daily: int
        restic_keep_monthly: int

    def __init__(self, name, config: 'Volume.RepoConfig'):
        self.__name = name
        self._bucket_name = config.aws_bucket_prefix
        if self.name != ".":
            self._bucket_name = f"{self._bucket_name}---{self.name.replace('/', '---')}"
        self._bucket_repo = f"s3:https://{config.aws_bucket_endpoint}/{self._bucket_name}"
        self._restic_cmd_base = config.restic_cmd_base + ["--repo", self._bucket_repo]
        self._restic_keep_daily = config.restic_keep_daily
        self._restic_keep_monthly = config.restic_keep_monthly
        self._environ = {**os.environ, **config.aws_keys}

    @property
    def name(self):
        return self.__name

    @property
    @abc.abstractmethod
    def _backup_path(self):
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def _restore_path(self):
        raise NotImplementedError

    def backup(self):
        print(f"Backing up {self._bucket_name}", flush=True)
        try:
            # ----------------------------------------------------------------------------
            # Check that the bucket and repository exist.
            # ----------------------------------------------------------------------------
            subprocess.run(self._restic_cmd_base + ["snapshots"],
                           env=self._environ, check=True,
                           stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
            # ----------------------------------------------------------------------------
            # Perform the backup.
            # ----------------------------------------------------------------------------
            subprocess.run(self._restic_cmd_base + ["backup", "."],
                           cwd=self._backup_path, env=self._environ, check=True,
                           stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
            # ----------------------------------------------------------------------------
            # Forget and prune old snapshots.
            # ----------------------------------------------------------------------------
            subprocess.run(
                self._restic_cmd_base + [
                    "forget", "--prune", "--group-by", "",
                    "--keep-daily", str(self._restic_keep_daily),
                    "--keep-monthly", str(self._restic_keep_monthly),
                ],
                env=self._environ, check=True,
                stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
            )
            # ----------------------------------------------------------------------------
            # Check the repository for errors.
            # ----------------------------------------------------------------------------
            subprocess.run(self._restic_cmd_base + ["check"],
                           env=self._environ, check=True,
                           stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as err:
            # restic output is not guaranteed to be ASCII (file names, for instance), so
            # decode as UTF-8 and never raise from inside the error handler itself.
            print(err.stdout.decode("utf-8", errors="replace"), flush=True)
            raise

    def restore(self):
        print(f"Restoring {self._bucket_name}", flush=True)
        try:
            subprocess.run(self._restic_cmd_base + ["restore", "latest", "--target", "."],
                           cwd=self._restore_path, env=self._environ, check=True,
                           stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as err:
            print(err.stdout.decode("utf-8", errors="replace"), flush=True)
            raise
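# A sketch of the repository naming scheme implemented in Volume.__init__ above, using a
# hypothetical prefix "backup-host1" and endpoint "s3.example.com" (illustrative values,
# not defaults of this script):
#
#   name "."        ->  s3:https://s3.example.com/backup-host1
#   name "data"     ->  s3:https://s3.example.com/backup-host1---data
#   name "data/www" ->  s3:https://s3.example.com/backup-host1---data---www
#
# Each volume thus gets its own bucket and restic repository, with "/" in nested volume
# names flattened to "---".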
class DirectoryVolume(Volume):
    def __init__(self, name, repo_config, directory):
        super().__init__(name, repo_config)
        self.__directory = directory

    @property
    def _backup_path(self):
        return self.__directory

    @property
    def _restore_path(self):
        return self.__directory


class DatasetMount:
    def __init__(self, dataset, mountpoint):
        self.__dataset = dataset
        self.__mountpoint = mountpoint

    def __enter__(self):
        print(f"Mounting {self.__dataset} to {self.__mountpoint}", flush=True)
        try:
            # Bind mounting would preserve the device ID of the snapshot mount. This
            # device ID is different for every snapshot, causing trouble for restic,
            # which keeps track of device IDs. See
            # https://github.com/restic/restic/issues/3041. Cloning does not have the
            # same issue as it is a fresh mount.
            subprocess.run(
                ["/usr/sbin/zfs", "clone",
                 "-o", f"mountpoint={self.__mountpoint}",
                 self.__dataset, "rpool/restic"],
                check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
            )
        except subprocess.CalledProcessError as err:
            print(err.stdout.decode("utf-8", errors="replace"), flush=True)
            raise

    def __exit__(self, exc_type, exc_value, exc_traceback):
        print(f"Unmounting {self.__dataset} from {self.__mountpoint}", flush=True)
        try:
            subprocess.run(["/usr/sbin/zfs", "destroy", "rpool/restic"],
                           check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        except subprocess.CalledProcessError as err:
            print(err.stdout.decode("utf-8", errors="replace"), flush=True)
            raise


class DatasetVolume(Volume):
    def __init__(self, name, repo_config, dataset, mountpoint):
        super().__init__(name, repo_config)
        self.__dataset = dataset
        self.__snapshot = self.__get_last_daily_snapshot()
        self.__mountpoint = mountpoint
        # Snapshots are backed up from a clone mounted under /mnt, mirroring the
        # dataset's real mountpoint.
        self.__backup_path = os.path.normpath(
            os.path.join("/", "mnt", os.path.relpath(self.__mountpoint, "/"))
        )

    def __get_last_daily_snapshot(self):
        snapshots = subprocess.getoutput(
            f"zfs list -t snapshot -H -r {self.__dataset} -o name -s creation"
        )
        daily_snapshots = filter(lambda s: s.endswith("_daily"), snapshots.split('\n'))
        last_daily_snapshot = list(daily_snapshots)[-1]
        assert '@' in last_daily_snapshot
        assert last_daily_snapshot.split('@')[0] == self.__dataset
        return last_daily_snapshot

    @property
    def _backup_path(self):
        return self.__backup_path

    @property
    def _restore_path(self):
        return self.__mountpoint

    def backup(self):
        with DatasetMount(self.__snapshot, self._backup_path):
            super().backup()
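# A sketch of the snapshot selection done by DatasetVolume.__get_last_daily_snapshot,
# assuming zfs-auto-snapshot-style names (the "_daily" suffix convention is whatever the
# snapshotting tool on the host produces; these names are purely illustrative):
#
#   $ zfs list -t snapshot -H -r rpool/data -o name -s creation
#   rpool/data@2024-01-01_hourly
#   rpool/data@2024-01-01_daily
#   rpool/data@2024-01-02_daily    <- newest name ending in "_daily" wins
#
# That snapshot is then cloned to rpool/restic by DatasetMount for the duration of the
# backup and the clone is destroyed afterwards.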
class RepoManager(abc.ABC):
    def __init__(self, config):
        self._config = config
        with open(config["aws_bucket_keys_file"], encoding="utf-8") as keys_file:
            aws_keys = yaml.safe_load(keys_file)
        self._repo_config = Volume.RepoConfig(
            aws_bucket_prefix=self._config["aws_bucket_prefix"],
            aws_bucket_endpoint=self._config["aws_bucket_endpoint"],
            aws_keys=aws_keys,
            restic_cmd_base=[
                "/usr/bin/restic",
                "--password-file", self._config["restic_password_file"],
                "--option", "s3.storage-class=ONEZONE_IA",
            ],
            restic_keep_daily=self._config["restic_keep_daily"],
            restic_keep_monthly=self._config["restic_keep_monthly"],
        )
        self._volumes = self._get_volumes()

    @abc.abstractmethod
    def _get_volumes(self):
        raise NotImplementedError

    def backup(self):
        for volume in self._volumes:
            volume.backup()

    def restore(self):
        for volume in self._volumes:
            volume.restore()


class DirectoryRepoManager(RepoManager):
    def __init__(self, config):
        super().__init__(config)

    def _get_volumes(self):
        return self.__get_volume_directories(self._config["directory"], self._config["exclude"])

    def __get_volume_directories(self, root_directory, exclude_list):
        volumes = []
        for entry in os.scandir(root_directory):
            name = os.path.relpath(entry.path, root_directory)
            if entry.is_dir() and (name not in exclude_list):
                volumes.append(
                    DirectoryVolume(name, self._repo_config, os.path.abspath(entry.path))
                )
        return volumes


class DatasetRepoManager(RepoManager):
    def __init__(self, config):
        super().__init__(config)

    def _get_volumes(self):
        return self.__get_volume_datasets(self._config["dataset"], self._config["exclude"])

    def __get_volume_datasets(self, root_dataset, exclude_list):
        zfs_list = subprocess.getoutput(f"zfs list -H -r {root_dataset} -o name,mountpoint")
        zfs_list_lines = zfs_list.split('\n')
        zfs_list_lines_items = map(lambda line: line.split(), zfs_list_lines)
        volumes = []
        for dataset, mountpoint in zfs_list_lines_items:
            name = os.path.relpath(dataset, root_dataset)
            if os.path.ismount(mountpoint) and (name not in exclude_list):
                volumes.append(DatasetVolume(name, self._repo_config, dataset, mountpoint))
        return volumes

    def restore(self):
        raise NotImplementedError


class BatchManager:
    def __init__(self, config_dir):
        self.__repos = BatchManager.__load_and_validate_config_dir(config_dir)

    @staticmethod
    def __load_and_validate_config_dir(config_dir):
        if not os.path.isdir(config_dir):
            raise ValueError(f"{config_dir} is not a directory")
        return [
            BatchManager.__load_and_validate_config_file(os.path.join(config_dir, file))
            for file in os.listdir(config_dir)
        ]

    @staticmethod
    def __load_and_validate_config_file(config_file_path):
        if not os.path.isfile(config_file_path):
            raise ValueError(f"{config_file_path} is not a file")
        with open(config_file_path, encoding="utf-8") as config_file:
            config = yaml.safe_load(config_file)
        if ("dataset" not in config) and ("directory" not in config):
            raise KeyError(
                f"either \"dataset\" or \"directory\" must be present in {config_file_path}")
        if ("dataset" in config) and ("directory" in config):
            raise KeyError(
                f"\"dataset\" and \"directory\" cannot be both present in {config_file_path}")
        for key in [
                "exclude",
                "aws_bucket_keys_file",
                "aws_bucket_endpoint",
                "aws_bucket_prefix",
                "restic_password_file",
                "restic_keep_daily",
                "restic_keep_monthly",
        ]:
            if key not in config:
                raise KeyError(f"{key} must be present in {config_file_path}")
        for file in [config["restic_password_file"], config["aws_bucket_keys_file"]]:
            if not os.path.isfile(file):
                raise ValueError(f"{file} is not a file")
        if "dataset" in config:
            return DatasetRepoManager(config)
        return DirectoryRepoManager(config)

    def backup(self):
        for repo in self.__repos:
            repo.backup()

    def restore(self):
        for repo in self.__repos:
            repo.restore()
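# A minimal sketch of a config file this validation accepts, e.g. a hypothetical
# /etc/restic-batch.d/data.yaml (all values illustrative). Exactly one of "dataset" or
# "directory" must be given; the remaining keys are all required:
#
#   dataset: rpool/data                  # or: directory: /srv/data
#   exclude:
#     - scratch
#   aws_bucket_prefix: backup-host1
#   aws_bucket_endpoint: s3.example.com
#   aws_bucket_keys_file: /etc/restic/aws-keys.yaml
#   restic_password_file: /etc/restic/password
#   restic_keep_daily: 7
#   restic_keep_monthly: 12
#
# The keys file is itself YAML and is merged into the environment of every restic call,
# so it would typically hold AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, the variables
# restic reads for S3 credentials.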
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Backup service data using restic")
    parser.add_argument("--config-dir", type=str, default="/etc/restic-batch.d",
                        help="Path to directory with YAML config files")
    # Without required=True, running with no subcommand would fail later with an
    # AttributeError on args.func instead of a usage error.
    subparsers = parser.add_subparsers(dest="command", required=True)
    backup_parser = subparsers.add_parser("backup")
    backup_parser.set_defaults(func=BatchManager.backup)
    restore_parser = subparsers.add_parser("restore")
    restore_parser.set_defaults(func=BatchManager.restore)
    args = parser.parse_args()
    args.func(BatchManager(args.config_dir))
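# Example invocations, assuming this script is installed as "restic-batch" (a name only
# implied by the default config directory /etc/restic-batch.d). Note that --config-dir
# belongs to the top-level parser, so it must precede the subcommand:
#
#   restic-batch backup                            # back up every configured repo
#   restic-batch --config-dir ./conf.d restore     # restore using an alternate config dir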