vdirsyncer/vdirsyncer/cli/config.py
Hugo Osvaldo Barrera aab70e9fb0 Use cached_property from the stdlib
Our local implementation preceded the one in the stdlib, but we no
longer support versions of Python which do not ship it.
2025-09-20 12:45:56 +02:00

365 lines
12 KiB
Python

from __future__ import annotations
import json
import os
import string
from configparser import RawConfigParser
from functools import cached_property
from itertools import chain
from typing import IO
from typing import Any
from typing import Generator
from vdirsyncer import PROJECT_HOME
from vdirsyncer import exceptions
from vdirsyncer.utils import expand_path
from vdirsyncer.vobject import Item
from .fetchparams import expand_fetch_params
from .utils import storage_class_from_config
GENERAL_ALL = frozenset(["status_path"])
GENERAL_REQUIRED = frozenset(["status_path"])
SECTION_NAME_CHARS = frozenset(chain(string.ascii_letters, string.digits, "_"))
def validate_section_name(name, section_type):
invalid = set(name) - SECTION_NAME_CHARS
if invalid:
chars_display = "".join(sorted(SECTION_NAME_CHARS))
raise exceptions.UserError(
f'The {section_type}-section "{name}" contains invalid characters. Only '
"the following characters are allowed for storage and "
f"pair names:\n{chars_display}"
)
def _validate_general_section(general_config: dict[str, str]):
invalid = set(general_config) - GENERAL_ALL
missing = GENERAL_REQUIRED - set(general_config)
problems: list[str] = []
if invalid:
problems.append(
"general section doesn't take the parameters: {}".format(", ".join(invalid))
)
if missing:
problems.append(
"general section is missing the parameters: {}".format(", ".join(missing))
)
if problems:
raise exceptions.UserError(
"Invalid general section. Copy the example "
f"config from the repository and edit it: {PROJECT_HOME}",
problems=problems,
)
def _validate_collections_param(collections):
if collections is None:
return
if not isinstance(collections, list):
raise ValueError("`collections` parameter must be a list or `null`.")
collection_names = set()
for i, collection in enumerate(collections):
try:
if isinstance(collection, (str, bytes)):
collection_name = collection
elif isinstance(collection, list):
e = ValueError(
"Expected list of format "
'["config_name", "storage_a_name", "storage_b_name"]'
)
if len(collection) != 3:
raise e
if not isinstance(collection[0], (str, bytes)):
raise e
for x in collection[1:]:
if x is not None and not isinstance(x, (str, bytes)):
raise e
collection_name = collection[0]
else:
raise ValueError("Expected string or list of three strings.")
if collection_name in collection_names:
raise ValueError("Duplicate value.")
collection_names.add(collection_name)
except ValueError as e:
raise ValueError(f"`collections` parameter, position {i}: {e!s}")
class _ConfigReader:
def __init__(self, f: IO[Any]):
self._file: IO[Any] = f
self._parser = c = RawConfigParser()
c.read_file(f)
self._seen_names: set = set()
self._general: dict[str, str] = {}
self._pairs: dict[str, dict[str, str]] = {}
self._storages: dict[str, dict[str, str]] = {}
def _parse_section(
self, section_type: str, name: str, options: dict[str, Any]
) -> None:
validate_section_name(name, section_type)
if name in self._seen_names:
raise ValueError(f'Name "{name}" already used.')
self._seen_names.add(name)
if section_type == "general":
if self._general:
raise ValueError("More than one general section.")
self._general = options
elif section_type == "storage":
self._storages[name] = options
elif section_type == "pair":
self._pairs[name] = options
else:
raise ValueError("Unknown section type.")
def parse(
self,
) -> tuple[dict[str, str], dict[str, dict[str, str]], dict[str, dict[str, str]]]:
for section in self._parser.sections():
if " " in section:
section_type, name = section.split(" ", 1)
else:
section_type = name = section
try:
self._parse_section(
section_type,
name,
dict(_parse_options(self._parser.items(section), section=section)),
)
except ValueError as e:
raise exceptions.UserError(f'Section "{section}": {e!s}')
_validate_general_section(self._general)
if getattr(self._file, "name", None):
self._general["status_path"] = os.path.join(
os.path.dirname(self._file.name),
expand_path(self._general["status_path"]),
)
return self._general, self._pairs, self._storages
def _parse_options(
items: list[tuple[str, str]], section: str | None = None
) -> Generator[tuple[str, dict[str, str]], None, None]:
for key, value in items:
try:
yield key, json.loads(value)
except ValueError as e:
raise ValueError(f'Section "{section}", option "{key}": {e}')
class Config:
def __init__(
self,
general: dict[str, str],
pairs: dict[str, dict[str, str]],
storages: dict[str, dict[str, str]],
) -> None:
self.general = general
self.storages = storages
for name, options in storages.items():
options["instance_name"] = name
self.pairs: dict[str, PairConfig] = {}
for name, options in pairs.items():
try:
self.pairs[name] = PairConfig(self, name, options)
except ValueError as e:
raise exceptions.UserError(f"Pair {name}: {e}")
@classmethod
def from_fileobject(cls, f: IO[Any]):
reader = _ConfigReader(f)
return cls(*reader.parse())
@classmethod
def from_filename_or_environment(cls, fname: str | None = None):
if fname is None:
fname = os.environ.get("VDIRSYNCER_CONFIG", None)
if fname is None:
fname = expand_path("~/.vdirsyncer/config")
if not os.path.exists(fname):
xdg_config_dir = os.environ.get(
"XDG_CONFIG_HOME", expand_path("~/.config/")
)
fname = os.path.join(xdg_config_dir, "vdirsyncer/config")
try:
with open(fname) as f:
return cls.from_fileobject(f)
except Exception as e:
raise exceptions.UserError(f"Error during reading config {fname}: {e}")
def get_storage_args(self, storage_name: str):
try:
args = self.storages[storage_name]
except KeyError:
raise exceptions.UserError(
f"Storage {storage_name!r} not found. "
f"These are the configured storages: {list(self.storages)}"
)
else:
return expand_fetch_params(args)
def get_pair(self, pair_name: str) -> PairConfig:
try:
return self.pairs[pair_name]
except KeyError as e:
raise exceptions.PairNotFound(e, pair_name=pair_name)
class PairConfig:
def __init__(self, full_config: Config, name: str, options: dict[str, str]):
self._config: Config = full_config
self.name: str = name
self.name_a: str = options.pop("a")
self.name_b: str = options.pop("b")
self._partial_sync: str | None = options.pop("partial_sync", None)
self.metadata: str | tuple[()] = options.pop("metadata", ())
self.conflict_resolution = self._process_conflict_resolution_param(
options.pop("conflict_resolution", None)
)
try:
self.collections = options.pop("collections")
except KeyError:
raise ValueError(
"collections parameter missing.\n\n"
"As of 0.9.0 this parameter has no default anymore. "
"Set `collections = null` explicitly in your pair config."
)
else:
_validate_collections_param(self.collections)
if options:
raise ValueError("Unknown options: {}".format(", ".join(options)))
def _process_conflict_resolution_param(
self, conflict_resolution: str | list[str] | None
):
if conflict_resolution in (None, "a wins", "b wins"):
return conflict_resolution
if (
isinstance(conflict_resolution, list)
and len(conflict_resolution) > 1
and conflict_resolution[0] == "command"
):
def resolve(a, b):
a_name = self.config_a["instance_name"]
b_name = self.config_b["instance_name"]
command = conflict_resolution[1:]
return _resolve_conflict_via_command(a, b, command, a_name, b_name)
return resolve
raise ValueError("Invalid value for `conflict_resolution`.")
# The following parameters are lazily evaluated because evaluating
# self.config_a would expand all `x.fetch` parameters. This is costly and
# unnecessary if the pair is not actually synced.
@cached_property
def config_a(self):
return self._config.get_storage_args(self.name_a)
@cached_property
def config_b(self):
return self._config.get_storage_args(self.name_b)
@cached_property
def partial_sync(self):
partial_sync = self._partial_sync
# We need to use UserError here because ValueError is not
# caught at the time this is expanded.
if partial_sync is not None:
cls_a, _ = storage_class_from_config(self.config_a)
cls_b, _ = storage_class_from_config(self.config_b)
if (
not cls_a.read_only
and not self.config_a.get("read_only", False)
and not cls_b.read_only
and not self.config_b.get("read_only", False)
):
raise exceptions.UserError(
"`partial_sync` is only effective if one storage is "
"read-only. Use `read_only = true` in exactly one storage "
"section."
)
if partial_sync is None:
partial_sync = "revert"
if partial_sync not in ("ignore", "revert", "error"):
raise exceptions.UserError("Invalid value for `partial_sync`.")
return partial_sync
class CollectionConfig:
def __init__(self, pair, name: str, config_a, config_b):
self.pair = pair
self._config = pair._config
self.name: str = name
self.config_a = config_a
self.config_b = config_b
#: Public API. Khal's config wizard depends on this function.
load_config = Config.from_filename_or_environment
def _resolve_conflict_via_command(
a, b, command, a_name, b_name, _check_call=None
) -> Item:
import shutil
import tempfile
if _check_call is None:
from subprocess import check_call as _check_call
from vdirsyncer.vobject import Item
dir = tempfile.mkdtemp(prefix="vdirsyncer-conflict.")
try:
a_tmp = os.path.join(dir, a_name)
b_tmp = os.path.join(dir, b_name)
with open(a_tmp, "w") as f:
f.write(a.raw)
with open(b_tmp, "w") as f:
f.write(b.raw)
command[0] = expand_path(command[0])
_check_call([*command, a_tmp, b_tmp])
with open(a_tmp) as f:
new_a = f.read()
with open(b_tmp) as f:
new_b = f.read()
if new_a != new_b:
raise exceptions.UserError("The two files are not completely equal.")
return Item(new_a)
finally:
shutil.rmtree(dir)