vdirsyncer/tests/cli/test_main.py
Markus Unterwaditzer 1f8593ebbf Fix style
2015-04-11 16:42:03 +02:00

323 lines
7.9 KiB
Python

# -*- coding: utf-8 -*-
from textwrap import dedent
from click.testing import CliRunner
import pytest
import vdirsyncer.cli as cli
def test_simple_run(tmpdir, runner):
runner.write_with_general(dedent('''
[pair my_pair]
a = my_a
b = my_b
[storage my_a]
type = filesystem
path = {0}/path_a/
fileext = .txt
[storage my_b]
type = filesystem
path = {0}/path_b/
fileext = .txt
''').format(str(tmpdir)))
tmpdir.mkdir('path_a')
tmpdir.mkdir('path_b')
result = runner.invoke(['sync'])
assert not result.exception
tmpdir.join('path_a/haha.txt').write('UID:haha')
result = runner.invoke(['sync'])
assert 'Copying (uploading) item haha to my_b' in result.output
assert tmpdir.join('path_b/haha.txt').read() == 'UID:haha'
def test_empty_storage(tmpdir, runner):
runner.write_with_general(dedent('''
[pair my_pair]
a = my_a
b = my_b
[storage my_a]
type = filesystem
path = {0}/path_a/
fileext = .txt
[storage my_b]
type = filesystem
path = {0}/path_b/
fileext = .txt
''').format(str(tmpdir)))
tmpdir.mkdir('path_a')
tmpdir.mkdir('path_b')
result = runner.invoke(['sync'])
assert not result.exception
tmpdir.join('path_a/haha.txt').write('UID:haha')
result = runner.invoke(['sync'])
tmpdir.join('path_b/haha.txt').remove()
result = runner.invoke(['sync'])
lines = result.output.splitlines()
assert len(lines) == 2
assert lines[0] == 'Syncing my_pair'
assert lines[1].startswith('error: my_pair: '
'Storage "my_b" was completely emptied.')
assert result.exception
def test_verbosity(tmpdir):
runner = CliRunner()
config_file = tmpdir.join('config')
config_file.write('')
result = runner.invoke(
cli.app, ['--verbosity=HAHA', 'sync'],
env={'VDIRSYNCER_CONFIG': str(config_file)}
)
assert result.exception
assert 'invalid verbosity value' in result.output.lower()
def test_deprecated_item_status(tmpdir):
f = tmpdir.join('mypair.items')
f.write(dedent('''
["ident", ["href_a", "etag_a", "href_b", "etag_b"]]
["ident_two", ["href_a", "etag_a", "href_b", "etag_b"]]
''').strip())
data = {
'ident': ['href_a', 'etag_a', 'href_b', 'etag_b'],
'ident_two': ['href_a', 'etag_a', 'href_b', 'etag_b']
}
assert cli.utils.load_status(
str(tmpdir), 'mypair', data_type='items') == data
cli.utils.save_status(
str(tmpdir), 'mypair', data_type='items', data=data)
assert cli.utils.load_status(
str(tmpdir), 'mypair', data_type='items') == data
def test_collections_cache_invalidation(tmpdir, runner):
runner.write_with_general(dedent('''
[storage foo]
type = filesystem
path = {0}/foo/
fileext = .txt
[storage bar]
type = filesystem
path = {0}/bar/
fileext = .txt
[pair foobar]
a = foo
b = bar
collections = ["a", "b", "c"]
''').format(str(tmpdir)))
foo = tmpdir.mkdir('foo')
bar = tmpdir.mkdir('bar')
for x in 'abc':
foo.mkdir(x)
bar.mkdir(x)
foo.join('a/itemone.txt').write('UID:itemone')
result = runner.invoke(['sync'])
assert not result.exception
assert 'detected change in config file' not in result.output.lower()
rv = bar.join('a').listdir()
assert len(rv) == 1
assert rv[0].basename == 'itemone.txt'
runner.write_with_general(dedent('''
[storage foo]
type = filesystem
path = {0}/foo/
fileext = .txt
[storage bar]
type = filesystem
path = {0}/bar2/
fileext = .txt
[pair foobar]
a = foo
b = bar
collections = ["a", "b", "c"]
''').format(str(tmpdir)))
for entry in tmpdir.join('status').listdir():
if not str(entry).endswith('.collections'):
entry.remove()
bar2 = tmpdir.mkdir('bar2')
for x in 'abc':
bar2.mkdir(x)
result = runner.invoke(['sync'])
assert 'detected change in config file' in result.output.lower()
assert not result.exception
rv = bar.join('a').listdir()
rv2 = bar2.join('a').listdir()
assert len(rv) == len(rv2) == 1
assert rv[0].basename == rv2[0].basename == 'itemone.txt'
def test_invalid_pairs_as_cli_arg(tmpdir, runner):
runner.write_with_general(dedent('''
[storage foo]
type = filesystem
path = {0}/foo/
fileext = .txt
[storage bar]
type = filesystem
path = {0}/bar/
fileext = .txt
[pair foobar]
a = foo
b = bar
collections = ["a", "b", "c"]
''').format(str(tmpdir)))
for base in ('foo', 'bar'):
base = tmpdir.mkdir(base)
for c in 'abc':
base.mkdir(c)
result = runner.invoke(['sync', 'foobar/d'])
assert result.exception
assert 'pair foobar: collection d not found' in result.output.lower()
def test_multiple_pairs(tmpdir, runner):
def get_cfg():
for name_a, name_b in ('foo', 'bar'), ('bam', 'baz'):
yield dedent('''
[pair {a}{b}]
a = {a}
b = {b}
''').format(a=name_a, b=name_b)
for name in name_a, name_b:
yield dedent('''
[storage {name}]
type = filesystem
path = {base}/{name}/
fileext = .txt
''').format(name=name, base=str(tmpdir))
runner.write_with_general(''.join(get_cfg()))
result = runner.invoke(['sync'])
assert set(result.output.splitlines()) > set([
'Discovering collections for pair bambaz',
'Discovering collections for pair foobar',
'Syncing bambaz',
'Syncing foobar',
])
def test_create_collections(tmpdir, runner):
runner.write_with_general(dedent('''
[pair foobar]
a = foo
b = bar
collections = ["a", "b", "c"]
[storage foo]
type = filesystem
path = {base}/foo/
fileext = .txt
[storage bar]
type = filesystem
path = {base}/bar/
fileext = .txt
'''.format(base=str(tmpdir))))
result = runner.invoke(['sync'])
assert result.exception
entries = set(x.basename for x in tmpdir.listdir())
assert 'foo' not in entries and 'bar' not in entries
result = runner.invoke(['sync'], input='y\n' * 6)
assert not result.exception
assert \
set(x.basename for x in tmpdir.join('foo').listdir()) == \
set(x.basename for x in tmpdir.join('bar').listdir()) == \
set('abc')
def test_ident_conflict(tmpdir, runner):
runner.write_with_general(dedent('''
[pair foobar]
a = foo
b = bar
[storage foo]
type = filesystem
path = {base}/foo/
fileext = .txt
[storage bar]
type = filesystem
path = {base}/bar/
fileext = .txt
'''.format(base=str(tmpdir))))
foo = tmpdir.mkdir('foo')
tmpdir.mkdir('bar')
foo.join('one.txt').write('UID:1')
foo.join('two.txt').write('UID:1')
foo.join('three.txt').write('UID:1')
result = runner.invoke(['sync'])
assert result.exception
assert ('error: foobar: Storage "foo" contains multiple items with the '
'same UID or even content') in result.output
assert sorted([
'one.txt' in result.output,
'two.txt' in result.output,
'three.txt' in result.output,
]) == [False, True, True]
@pytest.mark.parametrize('existing,missing', [
('foo', 'bar'),
('bar', 'foo'),
])
def test_unknown_storage(tmpdir, runner, existing, missing):
runner.write_with_general(dedent('''
[pair foobar]
a = foo
b = bar
[storage {existing}]
type = filesystem
path = {base}/{existing}/
fileext = .txt
'''.format(base=str(tmpdir), existing=existing)))
tmpdir.mkdir(existing)
result = runner.invoke(['sync'])
assert result.exception
assert (
"Storage '{missing}' not found. "
"These are the configured storages: ['{existing}']"
.format(missing=missing, existing=existing)
) in result.output