mirror of
https://github.com/samsonjs/vdirsyncer.git
synced 2026-03-25 08:55:50 +00:00
548 lines
14 KiB
Python
548 lines
14 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import io
|
|
from textwrap import dedent
|
|
|
|
from click.testing import CliRunner
|
|
|
|
import pytest
|
|
|
|
import vdirsyncer.cli as cli
|
|
|
|
|
|
class _CustomRunner(object):
|
|
def __init__(self, tmpdir):
|
|
self.tmpdir = tmpdir
|
|
self.cfg = tmpdir.join('config')
|
|
self.runner = CliRunner()
|
|
|
|
def invoke(self, args, env=None, **kwargs):
|
|
env = env or {}
|
|
env.setdefault('VDIRSYNCER_CONFIG', str(self.cfg))
|
|
return self.runner.invoke(cli.app, args, env=env, **kwargs)
|
|
|
|
def write_with_general(self, data):
|
|
self.cfg.write(dedent('''
|
|
[general]
|
|
status_path = {}/status/
|
|
''').format(str(self.tmpdir)))
|
|
self.cfg.write(data, mode='a')
|
|
|
|
|
|
@pytest.fixture
|
|
def runner(tmpdir, monkeypatch):
|
|
return _CustomRunner(tmpdir)
|
|
|
|
|
|
def test_read_config(monkeypatch):
|
|
f = io.StringIO(dedent(u'''
|
|
[general]
|
|
status_path = /tmp/status/
|
|
|
|
[pair bob]
|
|
a = bob_a
|
|
b = bob_b
|
|
foo = bar
|
|
bam = true
|
|
|
|
[storage bob_a]
|
|
type = filesystem
|
|
path = /tmp/contacts/
|
|
fileext = .vcf
|
|
yesno = false
|
|
number = 42
|
|
|
|
[storage bob_b]
|
|
type = carddav
|
|
|
|
[bogus]
|
|
lol = true
|
|
'''))
|
|
|
|
errors = []
|
|
monkeypatch.setattr('vdirsyncer.cli.cli_logger.error', errors.append)
|
|
general, pairs, storages = cli.utils.read_config(f)
|
|
assert general == {'status_path': '/tmp/status/'}
|
|
assert pairs == {'bob': ('bob_a', 'bob_b', {'bam': True, 'foo': 'bar'})}
|
|
assert storages == {
|
|
'bob_a': {'type': 'filesystem', 'path': '/tmp/contacts/', 'fileext':
|
|
'.vcf', 'yesno': False, 'number': 42,
|
|
'instance_name': 'bob_a'},
|
|
'bob_b': {'type': 'carddav', 'instance_name': 'bob_b'}
|
|
}
|
|
|
|
assert len(errors) == 1
|
|
assert errors[0].startswith('Unknown section')
|
|
assert 'bogus' in errors[0]
|
|
|
|
|
|
def test_storage_instance_from_config(monkeypatch):
|
|
def lol(**kw):
|
|
assert kw == {'foo': 'bar', 'baz': 1}
|
|
return 'OK'
|
|
|
|
import vdirsyncer.storage
|
|
monkeypatch.setitem(vdirsyncer.storage.storage_names, 'lol', lol)
|
|
config = {'type': 'lol', 'foo': 'bar', 'baz': 1}
|
|
assert cli.utils.storage_instance_from_config(config) == 'OK'
|
|
|
|
|
|
def test_parse_pairs_args():
|
|
pairs = {
|
|
'foo': ('bar', 'baz', {'conflict_resolution': 'a wins'},
|
|
{'storage_option': True}),
|
|
'one': ('two', 'three', {'collections': 'a,b,c'}, {}),
|
|
'eins': ('zwei', 'drei', {'ha': True}, {})
|
|
}
|
|
assert sorted(
|
|
cli.parse_pairs_args(['foo/foocoll', 'one', 'eins'], pairs)
|
|
) == [
|
|
('eins', set()),
|
|
('foo', {'foocoll'}),
|
|
('one', set()),
|
|
]
|
|
|
|
|
|
def test_simple_run(tmpdir, runner):
|
|
runner.write_with_general(dedent('''
|
|
[pair my_pair]
|
|
a = my_a
|
|
b = my_b
|
|
|
|
[storage my_a]
|
|
type = filesystem
|
|
path = {0}/path_a/
|
|
fileext = .txt
|
|
|
|
[storage my_b]
|
|
type = filesystem
|
|
path = {0}/path_b/
|
|
fileext = .txt
|
|
''').format(str(tmpdir)))
|
|
|
|
tmpdir.mkdir('path_a')
|
|
tmpdir.mkdir('path_b')
|
|
|
|
result = runner.invoke(['sync'])
|
|
assert not result.exception
|
|
|
|
tmpdir.join('path_a/haha.txt').write('UID:haha')
|
|
result = runner.invoke(['sync'])
|
|
assert 'Copying (uploading) item haha to my_b' in result.output
|
|
assert tmpdir.join('path_b/haha.txt').read() == 'UID:haha'
|
|
|
|
|
|
def test_empty_storage(tmpdir, runner):
|
|
runner.write_with_general(dedent('''
|
|
[pair my_pair]
|
|
a = my_a
|
|
b = my_b
|
|
|
|
[storage my_a]
|
|
type = filesystem
|
|
path = {0}/path_a/
|
|
fileext = .txt
|
|
|
|
[storage my_b]
|
|
type = filesystem
|
|
path = {0}/path_b/
|
|
fileext = .txt
|
|
''').format(str(tmpdir)))
|
|
|
|
tmpdir.mkdir('path_a')
|
|
tmpdir.mkdir('path_b')
|
|
|
|
result = runner.invoke(['sync'])
|
|
assert not result.exception
|
|
|
|
tmpdir.join('path_a/haha.txt').write('UID:haha')
|
|
result = runner.invoke(['sync'])
|
|
tmpdir.join('path_b/haha.txt').remove()
|
|
result = runner.invoke(['sync'])
|
|
lines = result.output.splitlines()
|
|
assert len(lines) == 2
|
|
assert lines[0] == 'Syncing my_pair'
|
|
assert lines[1].startswith('error: my_pair: '
|
|
'Storage "my_b" was completely emptied.')
|
|
assert result.exception
|
|
|
|
|
|
def test_missing_general_section(tmpdir, runner):
|
|
runner.cfg.write(dedent('''
|
|
[pair my_pair]
|
|
a = my_a
|
|
b = my_b
|
|
|
|
[storage my_a]
|
|
type = filesystem
|
|
path = {0}/path_a/
|
|
fileext = .txt
|
|
|
|
[storage my_b]
|
|
type = filesystem
|
|
path = {0}/path_b/
|
|
fileext = .txt
|
|
''').format(str(tmpdir)))
|
|
|
|
result = runner.invoke(['sync'])
|
|
assert result.exception
|
|
assert result.output.startswith('critical:')
|
|
assert 'invalid general section' in result.output.lower()
|
|
|
|
|
|
def test_wrong_general_section(tmpdir, runner):
|
|
runner.cfg.write(dedent('''
|
|
[general]
|
|
wrong = true
|
|
'''))
|
|
result = runner.invoke(['sync'])
|
|
|
|
assert result.exception
|
|
lines = result.output.splitlines()
|
|
assert lines[:-2] == [
|
|
'critical: general section doesn\'t take the parameters: wrong',
|
|
'critical: general section is missing the parameters: status_path'
|
|
]
|
|
assert 'Invalid general section.' in lines[-2]
|
|
|
|
|
|
def test_verbosity(tmpdir):
|
|
runner = CliRunner()
|
|
config_file = tmpdir.join('config')
|
|
config_file.write('')
|
|
|
|
result = runner.invoke(
|
|
cli.app, ['--verbosity=HAHA', 'sync'],
|
|
env={'VDIRSYNCER_CONFIG': str(config_file)}
|
|
)
|
|
assert result.exception
|
|
assert 'invalid verbosity value' in result.output.lower()
|
|
|
|
|
|
def test_invalid_storage_name():
|
|
f = io.StringIO(dedent(u'''
|
|
[general]
|
|
status_path = /tmp/status/
|
|
|
|
[storage foo.bar]
|
|
'''))
|
|
|
|
with pytest.raises(cli.CliError) as excinfo:
|
|
cli.utils.read_config(f)
|
|
|
|
assert 'invalid characters' in str(excinfo.value).lower()
|
|
|
|
|
|
def test_deprecated_item_status(tmpdir):
|
|
f = tmpdir.join('mypair.items')
|
|
f.write(dedent('''
|
|
["ident", ["href_a", "etag_a", "href_b", "etag_b"]]
|
|
["ident_two", ["href_a", "etag_a", "href_b", "etag_b"]]
|
|
''').strip())
|
|
|
|
data = {
|
|
'ident': ['href_a', 'etag_a', 'href_b', 'etag_b'],
|
|
'ident_two': ['href_a', 'etag_a', 'href_b', 'etag_b']
|
|
}
|
|
|
|
assert cli.utils.load_status(
|
|
str(tmpdir), 'mypair', data_type='items') == data
|
|
|
|
cli.utils.save_status(
|
|
str(tmpdir), 'mypair', data_type='items', data=data)
|
|
assert cli.utils.load_status(
|
|
str(tmpdir), 'mypair', data_type='items') == data
|
|
|
|
|
|
def test_collections_cache_invalidation(tmpdir, runner):
|
|
runner.write_with_general(dedent('''
|
|
[storage foo]
|
|
type = filesystem
|
|
path = {0}/foo/
|
|
fileext = .txt
|
|
|
|
[storage bar]
|
|
type = filesystem
|
|
path = {0}/bar/
|
|
fileext = .txt
|
|
|
|
[pair foobar]
|
|
a = foo
|
|
b = bar
|
|
collections = ["a", "b", "c"]
|
|
''').format(str(tmpdir)))
|
|
|
|
foo = tmpdir.mkdir('foo')
|
|
bar = tmpdir.mkdir('bar')
|
|
for x in 'abc':
|
|
foo.mkdir(x)
|
|
bar.mkdir(x)
|
|
foo.join('a/itemone.txt').write('UID:itemone')
|
|
|
|
result = runner.invoke(['sync'])
|
|
assert not result.exception
|
|
|
|
rv = bar.join('a').listdir()
|
|
assert len(rv) == 1
|
|
assert rv[0].basename == 'itemone.txt'
|
|
|
|
runner.write_with_general(dedent('''
|
|
[storage foo]
|
|
type = filesystem
|
|
path = {0}/foo/
|
|
fileext = .txt
|
|
|
|
[storage bar]
|
|
type = filesystem
|
|
path = {0}/bar2/
|
|
fileext = .txt
|
|
|
|
[pair foobar]
|
|
a = foo
|
|
b = bar
|
|
collections = ["a", "b", "c"]
|
|
''').format(str(tmpdir)))
|
|
|
|
tmpdir.join('status').remove()
|
|
bar2 = tmpdir.mkdir('bar2')
|
|
for x in 'abc':
|
|
bar2.mkdir(x)
|
|
result = runner.invoke(['sync'])
|
|
assert not result.exception
|
|
|
|
rv = bar.join('a').listdir()
|
|
rv2 = bar2.join('a').listdir()
|
|
assert len(rv) == len(rv2) == 1
|
|
assert rv[0].basename == rv2[0].basename == 'itemone.txt'
|
|
|
|
|
|
def test_invalid_pairs_as_cli_arg(tmpdir, runner):
|
|
runner.write_with_general(dedent('''
|
|
[storage foo]
|
|
type = filesystem
|
|
path = {0}/foo/
|
|
fileext = .txt
|
|
|
|
[storage bar]
|
|
type = filesystem
|
|
path = {0}/bar/
|
|
fileext = .txt
|
|
|
|
[pair foobar]
|
|
a = foo
|
|
b = bar
|
|
collections = ["a", "b", "c"]
|
|
''').format(str(tmpdir)))
|
|
|
|
for base in ('foo', 'bar'):
|
|
base = tmpdir.mkdir(base)
|
|
for c in 'abc':
|
|
base.mkdir(c)
|
|
|
|
result = runner.invoke(['sync', 'foobar/d'])
|
|
assert result.exception
|
|
assert 'pair foobar: collection d not found' in result.output.lower()
|
|
|
|
|
|
def test_discover_command(tmpdir, runner):
|
|
runner.write_with_general(dedent('''
|
|
[storage foo]
|
|
type = filesystem
|
|
path = {0}/foo/
|
|
fileext = .txt
|
|
|
|
[storage bar]
|
|
type = filesystem
|
|
path = {0}/bar/
|
|
fileext = .txt
|
|
|
|
[pair foobar]
|
|
a = foo
|
|
b = bar
|
|
collections = ["from a"]
|
|
''').format(str(tmpdir)))
|
|
|
|
foo = tmpdir.mkdir('foo')
|
|
bar = tmpdir.mkdir('bar')
|
|
|
|
for x in 'abc':
|
|
foo.mkdir(x)
|
|
bar.mkdir(x)
|
|
bar.mkdir('d')
|
|
|
|
result = runner.invoke(['sync'])
|
|
assert not result.exception
|
|
lines = result.output.splitlines()
|
|
assert lines[0].startswith('Discovering')
|
|
assert 'Syncing foobar/a' in lines
|
|
assert 'Syncing foobar/b' in lines
|
|
assert 'Syncing foobar/c' in lines
|
|
assert 'Syncing foobar/d' not in lines
|
|
|
|
foo.mkdir('d')
|
|
result = runner.invoke(['sync'])
|
|
assert not result.exception
|
|
assert 'Syncing foobar/a' in lines
|
|
assert 'Syncing foobar/b' in lines
|
|
assert 'Syncing foobar/c' in lines
|
|
assert 'Syncing foobar/d' not in result.output
|
|
|
|
result = runner.invoke(['discover'])
|
|
assert not result.exception
|
|
|
|
result = runner.invoke(['sync'])
|
|
assert not result.exception
|
|
assert 'Syncing foobar/a' in lines
|
|
assert 'Syncing foobar/b' in lines
|
|
assert 'Syncing foobar/c' in lines
|
|
assert 'Syncing foobar/d' in result.output
|
|
|
|
|
|
def test_multiple_pairs(tmpdir, runner):
|
|
def get_cfg():
|
|
for name_a, name_b in ('foo', 'bar'), ('bam', 'baz'):
|
|
yield dedent('''
|
|
[pair {a}{b}]
|
|
a = {a}
|
|
b = {b}
|
|
''').format(a=name_a, b=name_b)
|
|
|
|
for name in name_a, name_b:
|
|
yield dedent('''
|
|
[storage {name}]
|
|
type = filesystem
|
|
path = {base}/{name}/
|
|
fileext = .txt
|
|
''').format(name=name, base=str(tmpdir))
|
|
|
|
runner.write_with_general(''.join(get_cfg()))
|
|
|
|
result = runner.invoke(['sync'])
|
|
assert set(result.output.splitlines()) > set([
|
|
'Discovering collections for pair bambaz',
|
|
'Discovering collections for pair foobar',
|
|
'Syncing bambaz',
|
|
'Syncing foobar',
|
|
])
|
|
|
|
|
|
def test_invalid_collections_arg(tmpdir, runner):
|
|
runner.write_with_general(dedent('''
|
|
[pair foobar]
|
|
a = foo
|
|
b = bar
|
|
collections = [null]
|
|
|
|
[storage foo]
|
|
type = filesystem
|
|
path = {base}/foo/
|
|
fileext = .txt
|
|
|
|
[storage bar]
|
|
type = filesystem
|
|
path = {base}/bar/
|
|
fileext = .txt
|
|
'''.format(base=str(tmpdir))))
|
|
|
|
result = runner.invoke(['sync'])
|
|
assert result.exception
|
|
assert result.output.strip().endswith(
|
|
'Section `pair foobar`: `collections` parameter must be a list of '
|
|
'collection names (strings!) or `null`.'
|
|
)
|
|
|
|
|
|
def test_create_collections(tmpdir, runner):
|
|
runner.write_with_general(dedent('''
|
|
[pair foobar]
|
|
a = foo
|
|
b = bar
|
|
collections = ["a", "b", "c"]
|
|
|
|
[storage foo]
|
|
type = filesystem
|
|
path = {base}/foo/
|
|
fileext = .txt
|
|
|
|
[storage bar]
|
|
type = filesystem
|
|
path = {base}/bar/
|
|
fileext = .txt
|
|
'''.format(base=str(tmpdir))))
|
|
|
|
result = runner.invoke(['sync'])
|
|
assert result.exception
|
|
entries = set(x.basename for x in tmpdir.listdir())
|
|
assert 'foo' not in entries and 'bar' not in entries
|
|
|
|
result = runner.invoke(['sync'], input='y\n' * 6)
|
|
assert not result.exception
|
|
assert \
|
|
set(x.basename for x in tmpdir.join('foo').listdir()) == \
|
|
set(x.basename for x in tmpdir.join('bar').listdir()) == \
|
|
set('abc')
|
|
|
|
|
|
def test_ident_conflict(tmpdir, runner):
|
|
runner.write_with_general(dedent('''
|
|
[pair foobar]
|
|
a = foo
|
|
b = bar
|
|
|
|
[storage foo]
|
|
type = filesystem
|
|
path = {base}/foo/
|
|
fileext = .txt
|
|
|
|
[storage bar]
|
|
type = filesystem
|
|
path = {base}/bar/
|
|
fileext = .txt
|
|
'''.format(base=str(tmpdir))))
|
|
|
|
foo = tmpdir.mkdir('foo')
|
|
bar = tmpdir.mkdir('bar')
|
|
|
|
foo.join('one.txt').write('UID:1')
|
|
foo.join('two.txt').write('UID:1')
|
|
foo.join('three.txt').write('UID:1')
|
|
|
|
result = runner.invoke(['sync'])
|
|
assert result.exception
|
|
assert ('error: foobar: Storage "foo" contains multiple items with the '
|
|
'same UID or even content') in result.output
|
|
assert sorted([
|
|
'one.txt' in result.output,
|
|
'two.txt' in result.output,
|
|
'three.txt' in result.output,
|
|
]) == [False, True, True]
|
|
|
|
|
|
def test_parse_config_value(capsys):
|
|
invalid = object()
|
|
|
|
def x(s):
|
|
try:
|
|
rv = cli.utils.parse_config_value(s)
|
|
except ValueError:
|
|
return invalid
|
|
else:
|
|
warnings = capsys.readouterr()[1]
|
|
return rv, len(warnings.splitlines())
|
|
|
|
assert x('123 # comment!') is invalid
|
|
|
|
assert x('True') == ('True', 1)
|
|
assert x('False') == ('False', 1)
|
|
assert x('Yes') == ('Yes', 1)
|
|
assert x('None') == ('None', 1)
|
|
assert x('"True"') == ('True', 0)
|
|
assert x('"False"') == ('False', 0)
|
|
|
|
assert x('"123 # comment!"') == ('123 # comment!', 0)
|
|
assert x('true') == (True, 0)
|
|
assert x('false') == (False, 0)
|
|
assert x('null') == (None, 0)
|
|
assert x('3.14') == (3.14, 0)
|
|
assert x('') == ('', 0)
|
|
assert x('""') == ('', 0)
|