mirror of
https://github.com/samsonjs/vdirsyncer.git
synced 2026-04-13 12:05:54 +00:00
parent
18d8bb9fc2
commit
b20fc996a2
3 changed files with 133 additions and 80 deletions
|
|
@ -3,9 +3,10 @@ from textwrap import dedent
|
|||
|
||||
import pytest
|
||||
|
||||
import vdirsyncer.cli.config # noqa
|
||||
import vdirsyncer.cli.utils # noqa
|
||||
from vdirsyncer import cli, exceptions
|
||||
from vdirsyncer.cli.config import Config
|
||||
|
||||
import vdirsyncer.cli.utils # noqa
|
||||
|
||||
|
||||
invalid = object()
|
||||
|
|
@ -17,7 +18,7 @@ def read_config(tmpdir, monkeypatch):
|
|||
errors = []
|
||||
monkeypatch.setattr('vdirsyncer.cli.cli_logger.error', errors.append)
|
||||
f = io.StringIO(dedent(cfg.format(base=str(tmpdir))))
|
||||
rv = vdirsyncer.cli.config.read_config(f)
|
||||
rv = Config.from_fileobject(f)
|
||||
monkeypatch.undo()
|
||||
return errors, rv
|
||||
return inner
|
||||
|
|
@ -37,7 +38,7 @@ def parse_config_value(capsys):
|
|||
|
||||
|
||||
def test_read_config(read_config):
|
||||
errors, (general, pairs, storages) = read_config(u'''
|
||||
errors, c = read_config(u'''
|
||||
[general]
|
||||
status_path = /tmp/status/
|
||||
|
||||
|
|
@ -57,25 +58,20 @@ def test_read_config(read_config):
|
|||
|
||||
[storage bob_b]
|
||||
type = carddav
|
||||
|
||||
[bogus]
|
||||
lol = true
|
||||
''')
|
||||
|
||||
assert general == {'status_path': '/tmp/status/'}
|
||||
assert pairs == {'bob': ('bob_a', 'bob_b',
|
||||
{'collections': None, 'bam': True, 'foo': 'bar'})}
|
||||
assert storages == {
|
||||
assert c.general == {'status_path': '/tmp/status/'}
|
||||
assert c.pairs == {
|
||||
'bob': ('bob_a', 'bob_b',
|
||||
{'collections': None, 'bam': True, 'foo': 'bar'})
|
||||
}
|
||||
assert c.storages == {
|
||||
'bob_a': {'type': 'filesystem', 'path': '/tmp/contacts/', 'fileext':
|
||||
'.vcf', 'yesno': False, 'number': 42,
|
||||
'instance_name': 'bob_a'},
|
||||
'bob_b': {'type': 'carddav', 'instance_name': 'bob_b'}
|
||||
}
|
||||
|
||||
assert len(errors) == 1
|
||||
assert errors[0].startswith('Unknown section')
|
||||
assert 'bogus' in errors[0]
|
||||
|
||||
|
||||
def test_missing_collections_param(read_config):
|
||||
with pytest.raises(exceptions.UserError) as excinfo:
|
||||
|
|
@ -97,6 +93,19 @@ def test_missing_collections_param(read_config):
|
|||
assert 'collections parameter missing' in str(excinfo.value)
|
||||
|
||||
|
||||
def test_invalid_section_type(read_config):
|
||||
with pytest.raises(exceptions.UserError) as excinfo:
|
||||
read_config(u'''
|
||||
[general]
|
||||
status_path = /tmp/status/
|
||||
|
||||
[bogus]
|
||||
''')
|
||||
|
||||
assert 'Unknown section' in str(excinfo.value)
|
||||
assert 'bogus' in str(excinfo.value)
|
||||
|
||||
|
||||
def test_storage_instance_from_config(monkeypatch):
|
||||
def lol(**kw):
|
||||
assert kw == {'foo': 'bar', 'baz': 1}
|
||||
|
|
@ -181,6 +190,31 @@ def test_invalid_collections_arg(read_config):
|
|||
assert 'Expected string' in str(excinfo.value)
|
||||
|
||||
|
||||
def test_duplicate_sections(read_config):
|
||||
with pytest.raises(exceptions.UserError) as excinfo:
|
||||
read_config(u'''
|
||||
[general]
|
||||
status_path = /tmp/status/
|
||||
|
||||
[pair foobar]
|
||||
a = foobar
|
||||
b = bar
|
||||
collections = null
|
||||
|
||||
[storage foobar]
|
||||
type = filesystem
|
||||
path = /tmp/foo/
|
||||
fileext = .txt
|
||||
|
||||
[storage bar]
|
||||
type = filesystem
|
||||
path = /tmp/bar/
|
||||
fileext = .txt
|
||||
''')
|
||||
|
||||
assert 'Name "foobar" already used' in str(excinfo.value)
|
||||
|
||||
|
||||
def test_parse_config_value(parse_config_value):
|
||||
x = parse_config_value
|
||||
|
||||
|
|
|
|||
|
|
@ -48,10 +48,10 @@ def app(ctx, config):
|
|||
'''
|
||||
Synchronize calendars and contacts
|
||||
'''
|
||||
from .config import load_config
|
||||
|
||||
if not ctx.config:
|
||||
ctx.config = load_config(config)
|
||||
from .config import Config
|
||||
ctx.config = Config.from_filename_or_environment(config)
|
||||
|
||||
main = app
|
||||
|
||||
|
|
|
|||
|
|
@ -96,81 +96,75 @@ def _validate_pair_section(pair_config):
|
|||
.format(i=i, e=str(e)))
|
||||
|
||||
|
||||
def load_config(fname=None):
|
||||
if fname is None:
|
||||
fname = os.environ.get('VDIRSYNCER_CONFIG', None)
|
||||
if fname is None:
|
||||
fname = expand_path('~/.vdirsyncer/config')
|
||||
if not os.path.exists(fname):
|
||||
xdg_config_dir = os.environ.get('XDG_CONFIG_HOME',
|
||||
expand_path('~/.config/'))
|
||||
fname = os.path.join(xdg_config_dir, 'vdirsyncer/config')
|
||||
class ConfigReader:
|
||||
def __init__(self, f):
|
||||
self._file = f
|
||||
self._parser = c = RawConfigParser()
|
||||
c.readfp(f)
|
||||
self._seen_names = set()
|
||||
|
||||
try:
|
||||
with open(fname) as f:
|
||||
general, pairs, storages = read_config(f)
|
||||
except Exception as e:
|
||||
raise exceptions.UserError(
|
||||
'Error during reading config {}: {}'
|
||||
.format(fname, e)
|
||||
)
|
||||
self._general = {}
|
||||
self._pairs = {}
|
||||
self._storages = {}
|
||||
|
||||
return Config(general, pairs, storages)
|
||||
self._handlers = {
|
||||
'general': self._handle_general,
|
||||
'pair': self._handle_pair,
|
||||
'storage': self._handle_storage
|
||||
}
|
||||
|
||||
def _get_options(self, s):
|
||||
return dict(parse_options(self._parser.items(s), section=s))
|
||||
|
||||
def read_config(f):
|
||||
c = RawConfigParser()
|
||||
c.readfp(f)
|
||||
def _handle_storage(self, storage_name, options):
|
||||
options['instance_name'] = storage_name
|
||||
self._storages[storage_name] = options
|
||||
|
||||
def get_options(s):
|
||||
return dict(parse_options(c.items(s), section=s))
|
||||
|
||||
general = {}
|
||||
pairs = {}
|
||||
storages = {}
|
||||
|
||||
def handle_storage(storage_name, options):
|
||||
storages.setdefault(storage_name, {}).update(options)
|
||||
storages[storage_name]['instance_name'] = storage_name
|
||||
|
||||
def handle_pair(pair_name, options):
|
||||
def _handle_pair(self, pair_name, options):
|
||||
_validate_pair_section(options)
|
||||
a, b = options.pop('a'), options.pop('b')
|
||||
pairs[pair_name] = a, b, options
|
||||
self._pairs[pair_name] = a, b, options
|
||||
|
||||
def handle_general(_, options):
|
||||
if general:
|
||||
raise exceptions.UserError(
|
||||
'More than one general section in config file.')
|
||||
general.update(options)
|
||||
def _handle_general(self, _, options):
|
||||
if self._general:
|
||||
raise ValueError('More than one general section.')
|
||||
self._general = options
|
||||
|
||||
def bad_section(name, options):
|
||||
cli_logger.error('Unknown section: {}'.format(name))
|
||||
|
||||
handlers = {'storage': handle_storage, 'pair': handle_pair, 'general':
|
||||
handle_general}
|
||||
|
||||
for section in c.sections():
|
||||
if ' ' in section:
|
||||
section_type, name = section.split(' ', 1)
|
||||
else:
|
||||
section_type = name = section
|
||||
def _parse_section(self, section_type, name, options):
|
||||
validate_section_name(name, section_type)
|
||||
if name in self._seen_names:
|
||||
raise ValueError('Name "{}" already used.'.format(name))
|
||||
self._seen_names.add(name)
|
||||
|
||||
try:
|
||||
validate_section_name(name, section_type)
|
||||
f = handlers.get(section_type, bad_section)
|
||||
f(name, get_options(section))
|
||||
except ValueError as e:
|
||||
raise exceptions.UserError(
|
||||
'Section `{}`: {}'.format(section, str(e)))
|
||||
f = self._handlers[section_type]
|
||||
except KeyError:
|
||||
raise ValueError('Unknown section type.')
|
||||
|
||||
_validate_general_section(general)
|
||||
if getattr(f, 'name', None):
|
||||
general['status_path'] = os.path.join(
|
||||
os.path.dirname(f.name),
|
||||
expand_path(general['status_path'])
|
||||
)
|
||||
return general, pairs, storages
|
||||
f(name, options)
|
||||
|
||||
def parse(self):
|
||||
for section in self._parser.sections():
|
||||
if ' ' in section:
|
||||
section_type, name = section.split(' ', 1)
|
||||
else:
|
||||
section_type = name = section
|
||||
|
||||
try:
|
||||
self._parse_section(section_type, name,
|
||||
self._get_options(section))
|
||||
except ValueError as e:
|
||||
raise exceptions.UserError(
|
||||
'Section "{}": {}'.format(section, str(e)))
|
||||
|
||||
_validate_general_section(self._general)
|
||||
if getattr(self._file, 'name', None):
|
||||
self._general['status_path'] = os.path.join(
|
||||
os.path.dirname(self._file.name),
|
||||
expand_path(self._general['status_path'])
|
||||
)
|
||||
|
||||
return self._general, self._pairs, self._storages
|
||||
|
||||
|
||||
def parse_config_value(value):
|
||||
|
|
@ -221,6 +215,31 @@ class Config(object):
|
|||
self.pairs = pairs
|
||||
self.storages = storages
|
||||
|
||||
@classmethod
|
||||
def from_fileobject(cls, f):
|
||||
reader = ConfigReader(f)
|
||||
return cls(*reader.parse())
|
||||
|
||||
@classmethod
|
||||
def from_filename_or_environment(cls, fname=None):
|
||||
if fname is None:
|
||||
fname = os.environ.get('VDIRSYNCER_CONFIG', None)
|
||||
if fname is None:
|
||||
fname = expand_path('~/.vdirsyncer/config')
|
||||
if not os.path.exists(fname):
|
||||
xdg_config_dir = os.environ.get('XDG_CONFIG_HOME',
|
||||
expand_path('~/.config/'))
|
||||
fname = os.path.join(xdg_config_dir, 'vdirsyncer/config')
|
||||
|
||||
try:
|
||||
with open(fname) as f:
|
||||
return cls.from_fileobject(f)
|
||||
except Exception as e:
|
||||
raise exceptions.UserError(
|
||||
'Error during reading config {}: {}'
|
||||
.format(fname, e)
|
||||
)
|
||||
|
||||
def get_storage_args(self, storage_name, pair_name=None):
|
||||
try:
|
||||
args = self.storages[storage_name]
|
||||
|
|
|
|||
Loading…
Reference in a new issue