mirror of
https://github.com/samsonjs/vdirsyncer.git
synced 2026-04-27 14:57:41 +00:00
Merge pull request #317 from untitaker/issue315
Introduce hypothesis for testing unicode
This commit is contained in:
commit
3b9cce2128
5 changed files with 77 additions and 37 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
|
@ -16,3 +16,4 @@ env
|
||||||
dist
|
dist
|
||||||
docs/_build/
|
docs/_build/
|
||||||
vdirsyncer/version.py
|
vdirsyncer/version.py
|
||||||
|
.hypothesis
|
||||||
|
|
|
||||||
2
Makefile
2
Makefile
|
|
@ -31,7 +31,7 @@ install-servers:
|
||||||
done
|
done
|
||||||
|
|
||||||
install-test: install-servers
|
install-test: install-servers
|
||||||
pip install pytest pytest-xprocess pytest-localserver
|
pip install pytest pytest-xprocess pytest-localserver hypothesis pytest-subtesthack
|
||||||
[ $(TRAVIS) != "true" ] || pip install coverage codecov
|
[ $(TRAVIS) != "true" ] || pip install coverage codecov
|
||||||
|
|
||||||
test:
|
test:
|
||||||
|
|
|
||||||
|
|
@ -1,12 +1,17 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import json
|
||||||
from textwrap import dedent
|
from textwrap import dedent
|
||||||
|
|
||||||
from click.testing import CliRunner
|
from click.testing import CliRunner
|
||||||
|
|
||||||
|
from hypothesis import example, given
|
||||||
|
import hypothesis.strategies as st
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
import vdirsyncer.cli as cli
|
import vdirsyncer.cli as cli
|
||||||
|
from vdirsyncer.utils.compat import PY2, to_native
|
||||||
|
|
||||||
|
|
||||||
def test_simple_run(tmpdir, runner):
|
def test_simple_run(tmpdir, runner):
|
||||||
|
|
@ -257,35 +262,58 @@ def test_multiple_pairs(tmpdir, runner):
|
||||||
])
|
])
|
||||||
|
|
||||||
|
|
||||||
def test_create_collections(tmpdir, runner):
|
@given(collections=st.sets(
|
||||||
runner.write_with_general(dedent('''
|
st.text(
|
||||||
[pair foobar]
|
st.characters(
|
||||||
a = foo
|
blacklist_characters=set(
|
||||||
b = bar
|
u'./\x00' # Invalid chars on POSIX filesystems
|
||||||
collections = ["a", "b", "c"]
|
+ (u';' if PY2 else u'') # https://bugs.python.org/issue16374
|
||||||
|
),
|
||||||
|
# Surrogates can't be encoded to utf-8 in Python
|
||||||
|
blacklist_categories=set(['Cs'])
|
||||||
|
),
|
||||||
|
min_size=1,
|
||||||
|
max_size=50
|
||||||
|
),
|
||||||
|
min_size=1
|
||||||
|
))
|
||||||
|
@example(collections=[u'persönlich'])
|
||||||
|
def test_create_collections(subtest, collections):
|
||||||
|
collections = set(to_native(x, 'utf-8') for x in collections)
|
||||||
|
|
||||||
[storage foo]
|
@subtest
|
||||||
type = filesystem
|
def test_inner(tmpdir, runner):
|
||||||
path = {base}/foo/
|
runner.write_with_general(dedent('''
|
||||||
fileext = .txt
|
[pair foobar]
|
||||||
|
a = foo
|
||||||
|
b = bar
|
||||||
|
collections = {colls}
|
||||||
|
|
||||||
[storage bar]
|
[storage foo]
|
||||||
type = filesystem
|
type = filesystem
|
||||||
path = {base}/bar/
|
path = {base}/foo/
|
||||||
fileext = .txt
|
fileext = .txt
|
||||||
'''.format(base=str(tmpdir))))
|
|
||||||
|
|
||||||
result = runner.invoke(['sync'])
|
[storage bar]
|
||||||
assert result.exception
|
type = filesystem
|
||||||
entries = set(x.basename for x in tmpdir.listdir())
|
path = {base}/bar/
|
||||||
assert 'foo' not in entries and 'bar' not in entries
|
fileext = .txt
|
||||||
|
'''.format(base=str(tmpdir), colls=json.dumps(list(collections)))))
|
||||||
|
|
||||||
result = runner.invoke(['sync'], input='y\n' * 6)
|
result = runner.invoke(['sync'])
|
||||||
assert not result.exception
|
assert result.exception
|
||||||
assert \
|
entries = set(x.basename for x in tmpdir.listdir())
|
||||||
set(x.basename for x in tmpdir.join('foo').listdir()) == \
|
assert 'foo' not in entries and 'bar' not in entries
|
||||||
set(x.basename for x in tmpdir.join('bar').listdir()) == \
|
|
||||||
set('abc')
|
result = runner.invoke(
|
||||||
|
['sync'],
|
||||||
|
input='y\n' * 2 * (len(collections) + 1)
|
||||||
|
)
|
||||||
|
assert not result.exception
|
||||||
|
assert \
|
||||||
|
set(x.basename for x in tmpdir.join('foo').listdir()) == \
|
||||||
|
set(x.basename for x in tmpdir.join('bar').listdir()) == \
|
||||||
|
set(collections)
|
||||||
|
|
||||||
|
|
||||||
def test_ident_conflict(tmpdir, runner):
|
def test_ident_conflict(tmpdir, runner):
|
||||||
|
|
|
||||||
|
|
@ -4,9 +4,9 @@ import functools
|
||||||
import json
|
import json
|
||||||
|
|
||||||
from .config import CollectionConfig
|
from .config import CollectionConfig
|
||||||
from .utils import CliError, JobFailed, cli_logger, collections_for_pair, \
|
from .utils import CliError, JobFailed, cli_logger, coerce_native, \
|
||||||
get_status_name, handle_cli_error, load_status, save_status, \
|
collections_for_pair, get_status_name, handle_cli_error, load_status, \
|
||||||
storage_class_from_config, storage_instance_from_config
|
save_status, storage_class_from_config, storage_instance_from_config
|
||||||
|
|
||||||
from ..sync import sync
|
from ..sync import sync
|
||||||
|
|
||||||
|
|
@ -24,10 +24,12 @@ def prepare_pair(wq, pair_name, collections, config, callback, **kwargs):
|
||||||
try:
|
try:
|
||||||
config_a, config_b = all_collections[collection_name]
|
config_a, config_b = all_collections[collection_name]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
raise CliError('Pair {}: Collection {} not found. These are the '
|
raise CliError(
|
||||||
'configured collections:\n{}'
|
'Pair {}: Collection {} not found. These are the '
|
||||||
.format(pair_name, collection_name,
|
'configured collections:\n{}'
|
||||||
list(all_collections)))
|
.format(pair_name,
|
||||||
|
coerce_native(collection_name),
|
||||||
|
list(all_collections)))
|
||||||
new_workers += 1
|
new_workers += 1
|
||||||
|
|
||||||
collection = CollectionConfig(pair, collection_name, config_a,
|
collection = CollectionConfig(pair, collection_name, config_a,
|
||||||
|
|
@ -44,11 +46,12 @@ def sync_collection(wq, collection, general, force_delete):
|
||||||
status_name = get_status_name(pair.name, collection.name)
|
status_name = get_status_name(pair.name, collection.name)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
cli_logger.info('Syncing {}'.format(status_name))
|
cli_logger.info('Syncing {}'.format(coerce_native(status_name)))
|
||||||
|
|
||||||
status = load_status(general['status_path'], pair.name,
|
status = load_status(general['status_path'], pair.name,
|
||||||
collection.name, data_type='items') or {}
|
collection.name, data_type='items') or {}
|
||||||
cli_logger.debug('Loaded status for {}'.format(status_name))
|
cli_logger.debug('Loaded status for {}'
|
||||||
|
.format(coerce_native(status_name)))
|
||||||
|
|
||||||
a = storage_instance_from_config(collection.config_a)
|
a = storage_instance_from_config(collection.config_a)
|
||||||
b = storage_instance_from_config(collection.config_b)
|
b = storage_instance_from_config(collection.config_b)
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,7 @@ from . import CliError, cli_logger
|
||||||
from .. import DOCS_HOME, exceptions
|
from .. import DOCS_HOME, exceptions
|
||||||
from ..sync import IdentConflict, StorageEmpty, SyncConflict
|
from ..sync import IdentConflict, StorageEmpty, SyncConflict
|
||||||
from ..utils import expand_path, get_class_init_args
|
from ..utils import expand_path, get_class_init_args
|
||||||
|
from ..utils.compat import to_native
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import Queue as queue
|
import Queue as queue
|
||||||
|
|
@ -119,7 +120,7 @@ def handle_cli_error(status_name=None):
|
||||||
pass
|
pass
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if status_name:
|
if status_name:
|
||||||
msg = 'Unhandled exception occured for {}.'.format(status_name)
|
msg = 'Unhandled exception occured for {!r}.'.format(status_name)
|
||||||
else:
|
else:
|
||||||
msg = 'Unhandled exception occured.'
|
msg = 'Unhandled exception occured.'
|
||||||
|
|
||||||
|
|
@ -226,7 +227,7 @@ def _discover_from_config(config):
|
||||||
def _handle_collection_not_found(config, collection, e=None):
|
def _handle_collection_not_found(config, collection, e=None):
|
||||||
storage_name = config.get('instance_name', None)
|
storage_name = config.get('instance_name', None)
|
||||||
|
|
||||||
cli_logger.error('{}No collection {} found for storage {}.'
|
cli_logger.error('{}No collection {!r} found for storage {}.'
|
||||||
.format('{}\n'.format(e) if e else '',
|
.format('{}\n'.format(e) if e else '',
|
||||||
collection, storage_name))
|
collection, storage_name))
|
||||||
|
|
||||||
|
|
@ -487,3 +488,10 @@ def assert_permissions(path, wanted):
|
||||||
cli_logger.warning('Correcting permissions of {} from {:o} to {:o}'
|
cli_logger.warning('Correcting permissions of {} from {:o} to {:o}'
|
||||||
.format(path, permissions, wanted))
|
.format(path, permissions, wanted))
|
||||||
os.chmod(path, wanted)
|
os.chmod(path, wanted)
|
||||||
|
|
||||||
|
|
||||||
|
def coerce_native(x, encoding='utf-8'):
|
||||||
|
try:
|
||||||
|
return to_native(x, encoding)
|
||||||
|
except UnicodeError:
|
||||||
|
return repr(x)
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue