mirror of
https://github.com/samsonjs/vdirsyncer.git
synced 2026-03-25 08:55:50 +00:00
154 lines
3.5 KiB
Python
154 lines
3.5 KiB
Python
from textwrap import dedent
|
|
|
|
|
|
def test_discover_command(tmpdir, runner):
|
|
runner.write_with_general(dedent('''
|
|
[storage foo]
|
|
type = filesystem
|
|
path = {0}/foo/
|
|
fileext = .txt
|
|
|
|
[storage bar]
|
|
type = filesystem
|
|
path = {0}/bar/
|
|
fileext = .txt
|
|
|
|
[pair foobar]
|
|
a = foo
|
|
b = bar
|
|
collections = ["from a"]
|
|
''').format(str(tmpdir)))
|
|
|
|
foo = tmpdir.mkdir('foo')
|
|
bar = tmpdir.mkdir('bar')
|
|
|
|
for x in 'abc':
|
|
foo.mkdir(x)
|
|
bar.mkdir(x)
|
|
bar.mkdir('d')
|
|
|
|
result = runner.invoke(['discover'])
|
|
assert not result.exception
|
|
|
|
foo.mkdir('d')
|
|
result = runner.invoke(['sync'])
|
|
assert not result.exception
|
|
lines = result.output.splitlines()
|
|
assert 'Syncing foobar/a' in lines
|
|
assert 'Syncing foobar/b' in lines
|
|
assert 'Syncing foobar/c' in lines
|
|
assert 'Syncing foobar/d' not in result.output
|
|
|
|
result = runner.invoke(['discover'])
|
|
assert not result.exception
|
|
|
|
result = runner.invoke(['sync'])
|
|
assert not result.exception
|
|
assert 'Syncing foobar/a' in lines
|
|
assert 'Syncing foobar/b' in lines
|
|
assert 'Syncing foobar/c' in lines
|
|
assert 'Syncing foobar/d' in result.output
|
|
|
|
# Check for redundant data that is already in the config. This avoids
|
|
# copying passwords from the config too.
|
|
assert 'fileext' not in tmpdir \
|
|
.join('status') \
|
|
.join('foobar.collections') \
|
|
.read()
|
|
|
|
|
|
def test_discover_on_unsupported_storage(tmpdir, runner):
|
|
runner.write_with_general(dedent('''
|
|
[storage foo]
|
|
type = http
|
|
url = https://example.com/foo.ics
|
|
|
|
[storage bar]
|
|
type = memory
|
|
fileext = .txt
|
|
|
|
[pair foobar]
|
|
a = foo
|
|
b = bar
|
|
collections = ["from a"]
|
|
''').format(str(tmpdir)))
|
|
|
|
result = runner.invoke(['discover'])
|
|
assert result.exception
|
|
assert 'doesn\'t support collection discovery' in result.output
|
|
|
|
|
|
def test_discover_different_collection_names(tmpdir, runner):
|
|
foo = tmpdir.mkdir('foo')
|
|
bar = tmpdir.mkdir('bar')
|
|
runner.write_with_general(dedent('''
|
|
[storage foo]
|
|
type = filesystem
|
|
fileext = .txt
|
|
path = {foo}
|
|
|
|
[storage bar]
|
|
type = filesystem
|
|
fileext = .txt
|
|
path = {bar}
|
|
|
|
[pair foobar]
|
|
a = foo
|
|
b = bar
|
|
collections = [
|
|
["coll1", "coll_a1", "coll_b1"],
|
|
"coll2"
|
|
]
|
|
''').format(foo=str(foo), bar=str(bar)))
|
|
|
|
result = runner.invoke(['discover'], input='y\n' * 6)
|
|
assert not result.exception
|
|
|
|
coll_a1 = foo.join('coll_a1')
|
|
coll_b1 = bar.join('coll_b1')
|
|
|
|
assert coll_a1.exists()
|
|
assert coll_b1.exists()
|
|
|
|
result = runner.invoke(['sync'])
|
|
assert not result.exception
|
|
|
|
foo_txt = coll_a1.join('foo.txt')
|
|
foo_txt.write('BEGIN:VCALENDAR\nUID:foo\nEND:VCALENDAR')
|
|
|
|
result = runner.invoke(['sync'])
|
|
assert not result.exception
|
|
|
|
assert foo_txt.exists()
|
|
assert coll_b1.join('foo.txt').exists()
|
|
|
|
|
|
def test_discover_direct_path(tmpdir, runner):
|
|
foo = tmpdir.join('foo')
|
|
bar = tmpdir.join('bar')
|
|
|
|
runner.write_with_general(dedent('''
|
|
[storage foo]
|
|
type = filesystem
|
|
fileext = .txt
|
|
path = {foo}
|
|
|
|
[storage bar]
|
|
type = filesystem
|
|
fileext = .txt
|
|
path = {bar}
|
|
|
|
[pair foobar]
|
|
a = foo
|
|
b = bar
|
|
collections = null
|
|
''').format(foo=str(foo), bar=str(bar)))
|
|
|
|
result = runner.invoke(['discover'])
|
|
assert not result.exception
|
|
|
|
result = runner.invoke(['sync'], input='y\n' * 2)
|
|
assert not result.exception
|
|
|
|
assert foo.exists()
|
|
assert bar.exists()
|