358 lines
12 KiB
Python

#!/usr/bin/env python3
import abc
import argparse
import dataclasses
import os
import subprocess
from typing import Dict, List
import yaml
class Volume(abc.ABC):
@dataclasses.dataclass
class RepoConfig:
aws_bucket_prefix: str
aws_bucket_endpoint: str
aws_keys: Dict[str, str]
restic_cmd_base: List[str]
restic_keep_daily: int
restic_keep_monthly: int
def __init__(self, name, config: 'Volume.RepoConfig'):
self.__name = name
self._bucket_name = config.aws_bucket_prefix
if self.name != ".":
self._bucket_name = f"{self._bucket_name}---{self.name.replace('/', '---')}"
self._bucket_repo = f"s3:https://{config.aws_bucket_endpoint}/{self._bucket_name}"
self._restic_cmd_base = config.restic_cmd_base + ["--repo", self._bucket_repo]
self._restic_keep_daily = config.restic_keep_daily
self._restic_keep_monthly = config.restic_keep_monthly
self._environ = {**os.environ, **config.aws_keys}
@property
def name(self):
return self.__name
@abc.abstractproperty
def _backup_path(self):
raise NotImplementedError
@abc.abstractproperty
def _restore_path(self):
raise NotImplementedError
def backup(self):
print(f"Backing up {self._bucket_name}", flush=True)
try:
# --------------------------------------------------------------------------------------
# Check if bucket exists. If not, create and initialise the bucket.
# --------------------------------------------------------------------------------------
try:
subprocess.run(self._restic_cmd_base + ["snapshots"],
env=self._environ, check=True,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as snapshots_err:
try:
ps = subprocess.run(self._restic_cmd_base + ["init"],
env=self._environ, check=True,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
print(ps.stdout.decode("ascii"), flush=True)
except subprocess.CalledProcessError:
print(snapshots_err.stdout.decode("ascii"), flush=True)
raise
# --------------------------------------------------------------------------------------
# Perform the backup.
# --------------------------------------------------------------------------------------
subprocess.run(self._restic_cmd_base + ["backup", "."], cwd=self._backup_path,
env=self._environ, check=True,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
# --------------------------------------------------------------------------------------
# Forget and prune old snapshots.
# --------------------------------------------------------------------------------------
subprocess.run(
self._restic_cmd_base + ["forget", "--prune",
"--keep-daily", str(self._restic_keep_daily),
"--keep-monthly", str(self._restic_keep_monthly)],
env=self._environ, check=True,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT
)
# --------------------------------------------------------------------------------------
# Check for errors.
# --------------------------------------------------------------------------------------
subprocess.run(self._restic_cmd_base + ["check"],
env=self._environ, check=True,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as err:
print(err.stdout.decode("ascii"), flush=True)
raise
def restore(self):
print(f"Restoring {self._bucket_name}", flush=True)
try:
subprocess.run(self._restic_cmd_base + ["restore", "latest", "--target", "."],
cwd=self._restore_path,
env=self._environ, check=True,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as err:
print(err.stdout.decode("ascii"), flush=True)
raise
class DirectoryVolume(Volume):
def __init__(self, name, repo_config, directory):
super().__init__(name, repo_config)
self.__directory = directory
@property
def _backup_path(self):
return self.__directory
@property
def _restore_path(self):
return self.__directory
class DatasetMount:
def __init__(self, dataset, mountpoint):
self.__dataset = dataset
self.__mountpoint = mountpoint
def __enter__(self):
print(f"Mounting {self.__dataset} to {self.__mountpoint}", flush=True)
try:
# Bind mounting will preserve the device ID of the snapshot mount. This device ID is
# different for every snapshot causing trouble for restic which keeps track of device
# IDs. See https://github.com/restic/restic/issues/3041. Cloning does not have the same
# issue as it is a fresh mount.
subprocess.run(
["/usr/sbin/zfs",
"clone", "-o", f"mountpoint={self.__mountpoint}", self.__dataset, "rpool/restic"],
check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
)
except subprocess.CalledProcessError as err:
print(err.stdout.decode("ascii"), flush=True)
raise
def __exit__(self, exc_type, exc_value, exc_traceback):
print(f"Unmounting {self.__dataset} from {self.__mountpoint}", flush=True)
try:
subprocess.run(["/usr/sbin/zfs", "destroy", "rpool/restic"],
check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
except subprocess.CalledProcessError as err:
print(err.stdout.decode("ascii"), flush=True)
raise
class DatasetVolume(Volume):
def __init__(self, name, repo_config, dataset, mountpoint):
super().__init__(name, repo_config)
self.__dataset = dataset
self.__snapshot = self.__get_last_daily_snapshot()
self.__mountpoint = mountpoint
self.__backup_path = os.path.normpath(
os.path.join("/", "mnt", os.path.relpath(self.__mountpoint, "/"))
)
def __get_last_daily_snapshot(self):
snapshots = subprocess.getoutput(
f"zfs list -t snapshot -H -r {self.__dataset} -o name -s creation"
)
daily_snapshots = filter(lambda s: s.endswith("_daily"), snapshots.split('\n'))
last_daily_snapshot = list(daily_snapshots)[-1]
assert '@' in last_daily_snapshot
assert last_daily_snapshot.split('@')[0] == self.__dataset
return last_daily_snapshot
@property
def _backup_path(self):
return self.__backup_path
@property
def _restore_path(self):
return self.__mountpoint
def backup(self):
with DatasetMount(self.__snapshot, self._backup_path):
super().backup()
class RepoManager(abc.ABC):
def __init__(self, config):
self._config = config
with open(config["aws_bucket_keys_file"], encoding="utf-8") as keys_file:
aws_keys = yaml.safe_load(keys_file)
self._repo_config = Volume.RepoConfig(
aws_bucket_prefix=self._config["aws_bucket_prefix"],
aws_bucket_endpoint=self._config["aws_bucket_endpoint"],
aws_keys=aws_keys,
restic_cmd_base=[
"/usr/local/bin/restic",
"--password-file", self._config["restic_password_file"],
"--option", "s3.storage-class=ONEZONE_IA",
],
restic_keep_daily=self._config["restic_keep_daily"],
restic_keep_monthly=self._config["restic_keep_monthly"],
)
self._volumes = self._get_volumes()
@abc.abstractmethod
def _get_volumes(self):
raise NotImplementedError
def backup(self):
for volume in self._volumes:
volume.backup()
def restore(self):
for volume in self._volumes:
volume.restore()
class DirectoryRepoManager(RepoManager):
def __init__(self, config):
super().__init__(config)
def _get_volumes(self):
return self.__get_volume_directories(self._config["directory"])
def __get_volume_directories(self, root_directory):
return [
DirectoryVolume(
os.path.relpath(entry.path, root_directory),
self._repo_config,
os.path.abspath(entry.path),
)
for entry in os.scandir(root_directory) if entry.is_dir()
]
class DatasetRepoManager(RepoManager):
def __init__(self, config):
super().__init__(config)
def _get_volumes(self):
return self.__get_volume_datasets(self._config["dataset"])
def __get_volume_datasets(self, root_dataset):
zfs_list = subprocess.getoutput(
f"zfs list -H -r {root_dataset} "
"-o name,mountpoint,eu.wojciechkozlowski:offsite-snapshot"
)
zfs_list_lines = zfs_list.split('\n')
zfs_list_lines_items = map(lambda l: l.split(), zfs_list_lines)
return [
DatasetVolume(
os.path.relpath(dataset, root_dataset),
self._repo_config,
dataset,
mountpoint,
)
for dataset, mountpoint, offsite_snapshot in zfs_list_lines_items
if os.path.ismount(mountpoint) and (offsite_snapshot.lower() != "false")
]
def restore(self):
raise NotImplementedError
class BatchManager:
def __init__(self, config_dir):
self.__repos = BatchManager.__load_and_validate_config_dir(config_dir)
@staticmethod
def __load_and_validate_config_dir(config_dir):
if not os.path.isdir(config_dir):
raise ValueError(f"{config_dir} is not a directory")
return [
BatchManager.__load_and_validate_config_file(os.path.join(config_dir, file))
for file in os.listdir(config_dir)
]
@staticmethod
def __load_and_validate_config_file(config_file_path):
if not os.path.isfile(config_file_path):
raise ValueError(f"{config_file_path} is not a file")
with open(config_file_path, encoding="utf-8") as config_file:
config = yaml.safe_load(config_file)
if ("dataset" not in config) and ("directory" not in config):
raise KeyError(
f"either \"dataset\" or \"directory\" must be present in {config_file_path}")
if ("dataset" in config) and ("directory" in config):
raise KeyError(
f"\"dataset\" and \"directory\" cannot be both present in {config_file_path}")
for key in [
"aws_bucket_keys_file",
"aws_bucket_endpoint",
"aws_bucket_prefix",
"restic_password_file",
"restic_keep_daily",
"restic_keep_monthly",
]:
if key not in config:
raise KeyError(f"{key} must be present in {config_file_path}")
for file in [config["restic_password_file"], config["aws_bucket_keys_file"]]:
if not os.path.isfile(file):
raise ValueError(f"{file} is not a file")
if "dataset" in config:
return DatasetRepoManager(config)
return DirectoryRepoManager(config)
def backup(self):
for repo in self.__repos:
repo.backup()
def restore(self):
for repo in self.__repos:
repo.restore()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Backup service data using restic")
parser.add_argument("--config-dir", type=str, default="/etc/restic-batch.d",
help="Path to directory with YAML config files")
subparsers = parser.add_subparsers()
backup_parser = subparsers.add_parser("backup")
backup_parser.set_defaults(func=BatchManager.backup)
restore_parser = subparsers.add_parser("restore")
restore_parser.set_defaults(func=BatchManager.restore)
args = parser.parse_args()
args.func(BatchManager(args.config_dir))