mirror of
https://github.com/samsonjs/vdirsyncer.git
synced 2026-03-25 08:55:50 +00:00
Introduce hypothesis into codebase
This commit is contained in:
parent
12c87df092
commit
d8964660f8
5 changed files with 77 additions and 37 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
|
@ -16,3 +16,4 @@ env
|
|||
dist
|
||||
docs/_build/
|
||||
vdirsyncer/version.py
|
||||
.hypothesis
|
||||
|
|
|
|||
2
Makefile
2
Makefile
|
|
@ -31,7 +31,7 @@ install-servers:
|
|||
done
|
||||
|
||||
install-test: install-servers
|
||||
pip install pytest pytest-xprocess pytest-localserver
|
||||
pip install pytest pytest-xprocess pytest-localserver hypothesis pytest-subtesthack
|
||||
[ $(TRAVIS) != "true" ] || pip install coverage codecov
|
||||
|
||||
test:
|
||||
|
|
|
|||
|
|
@ -1,12 +1,17 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
import json
|
||||
from textwrap import dedent
|
||||
|
||||
from click.testing import CliRunner
|
||||
|
||||
from hypothesis import example, given
|
||||
import hypothesis.strategies as st
|
||||
|
||||
import pytest
|
||||
|
||||
import vdirsyncer.cli as cli
|
||||
from vdirsyncer.utils.compat import PY2, to_native
|
||||
|
||||
|
||||
def test_simple_run(tmpdir, runner):
|
||||
|
|
@ -257,35 +262,58 @@ def test_multiple_pairs(tmpdir, runner):
|
|||
])
|
||||
|
||||
|
||||
def test_create_collections(tmpdir, runner):
|
||||
runner.write_with_general(dedent('''
|
||||
[pair foobar]
|
||||
a = foo
|
||||
b = bar
|
||||
collections = ["a", "b", "c"]
|
||||
@given(collections=st.sets(
|
||||
st.text(
|
||||
st.characters(
|
||||
blacklist_characters=set(
|
||||
u'./\x00' # Invalid chars on POSIX filesystems
|
||||
+ (u';' if PY2 else u'') # https://bugs.python.org/issue16374
|
||||
),
|
||||
# Surrogates can't be encoded to utf-8 in Python
|
||||
blacklist_categories=set(['Cs'])
|
||||
),
|
||||
min_size=1,
|
||||
max_size=50
|
||||
),
|
||||
min_size=1
|
||||
))
|
||||
@example(collections=[u'persönlich'])
|
||||
def test_create_collections(subtest, collections):
|
||||
collections = set(to_native(x, 'utf-8') for x in collections)
|
||||
|
||||
[storage foo]
|
||||
type = filesystem
|
||||
path = {base}/foo/
|
||||
fileext = .txt
|
||||
@subtest
|
||||
def test_inner(tmpdir, runner):
|
||||
runner.write_with_general(dedent('''
|
||||
[pair foobar]
|
||||
a = foo
|
||||
b = bar
|
||||
collections = {colls}
|
||||
|
||||
[storage bar]
|
||||
type = filesystem
|
||||
path = {base}/bar/
|
||||
fileext = .txt
|
||||
'''.format(base=str(tmpdir))))
|
||||
[storage foo]
|
||||
type = filesystem
|
||||
path = {base}/foo/
|
||||
fileext = .txt
|
||||
|
||||
result = runner.invoke(['sync'])
|
||||
assert result.exception
|
||||
entries = set(x.basename for x in tmpdir.listdir())
|
||||
assert 'foo' not in entries and 'bar' not in entries
|
||||
[storage bar]
|
||||
type = filesystem
|
||||
path = {base}/bar/
|
||||
fileext = .txt
|
||||
'''.format(base=str(tmpdir), colls=json.dumps(list(collections)))))
|
||||
|
||||
result = runner.invoke(['sync'], input='y\n' * 6)
|
||||
assert not result.exception
|
||||
assert \
|
||||
set(x.basename for x in tmpdir.join('foo').listdir()) == \
|
||||
set(x.basename for x in tmpdir.join('bar').listdir()) == \
|
||||
set('abc')
|
||||
result = runner.invoke(['sync'])
|
||||
assert result.exception
|
||||
entries = set(x.basename for x in tmpdir.listdir())
|
||||
assert 'foo' not in entries and 'bar' not in entries
|
||||
|
||||
result = runner.invoke(
|
||||
['sync'],
|
||||
input='y\n' * 2 * (len(collections) + 1)
|
||||
)
|
||||
assert not result.exception
|
||||
assert \
|
||||
set(x.basename for x in tmpdir.join('foo').listdir()) == \
|
||||
set(x.basename for x in tmpdir.join('bar').listdir()) == \
|
||||
set(collections)
|
||||
|
||||
|
||||
def test_ident_conflict(tmpdir, runner):
|
||||
|
|
|
|||
|
|
@ -4,9 +4,9 @@ import functools
|
|||
import json
|
||||
|
||||
from .config import CollectionConfig
|
||||
from .utils import CliError, JobFailed, cli_logger, collections_for_pair, \
|
||||
get_status_name, handle_cli_error, load_status, save_status, \
|
||||
storage_class_from_config, storage_instance_from_config
|
||||
from .utils import CliError, JobFailed, cli_logger, coerce_native, \
|
||||
collections_for_pair, get_status_name, handle_cli_error, load_status, \
|
||||
save_status, storage_class_from_config, storage_instance_from_config
|
||||
|
||||
from ..sync import sync
|
||||
|
||||
|
|
@ -24,10 +24,12 @@ def prepare_pair(wq, pair_name, collections, config, callback, **kwargs):
|
|||
try:
|
||||
config_a, config_b = all_collections[collection_name]
|
||||
except KeyError:
|
||||
raise CliError('Pair {}: Collection {} not found. These are the '
|
||||
'configured collections:\n{}'
|
||||
.format(pair_name, collection_name,
|
||||
list(all_collections)))
|
||||
raise CliError(
|
||||
'Pair {}: Collection {} not found. These are the '
|
||||
'configured collections:\n{}'
|
||||
.format(pair_name,
|
||||
coerce_native(collection_name),
|
||||
list(all_collections)))
|
||||
new_workers += 1
|
||||
|
||||
collection = CollectionConfig(pair, collection_name, config_a,
|
||||
|
|
@ -44,11 +46,12 @@ def sync_collection(wq, collection, general, force_delete):
|
|||
status_name = get_status_name(pair.name, collection.name)
|
||||
|
||||
try:
|
||||
cli_logger.info('Syncing {}'.format(status_name))
|
||||
cli_logger.info('Syncing {}'.format(coerce_native(status_name)))
|
||||
|
||||
status = load_status(general['status_path'], pair.name,
|
||||
collection.name, data_type='items') or {}
|
||||
cli_logger.debug('Loaded status for {}'.format(status_name))
|
||||
cli_logger.debug('Loaded status for {}'
|
||||
.format(coerce_native(status_name)))
|
||||
|
||||
a = storage_instance_from_config(collection.config_a)
|
||||
b = storage_instance_from_config(collection.config_b)
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ from . import CliError, cli_logger
|
|||
from .. import DOCS_HOME, exceptions
|
||||
from ..sync import IdentConflict, StorageEmpty, SyncConflict
|
||||
from ..utils import expand_path, get_class_init_args
|
||||
from ..utils.compat import to_native
|
||||
|
||||
try:
|
||||
import Queue as queue
|
||||
|
|
@ -119,7 +120,7 @@ def handle_cli_error(status_name=None):
|
|||
pass
|
||||
except Exception as e:
|
||||
if status_name:
|
||||
msg = 'Unhandled exception occured for {}.'.format(status_name)
|
||||
msg = 'Unhandled exception occured for {!r}.'.format(status_name)
|
||||
else:
|
||||
msg = 'Unhandled exception occured.'
|
||||
|
||||
|
|
@ -226,7 +227,7 @@ def _discover_from_config(config):
|
|||
def _handle_collection_not_found(config, collection, e=None):
|
||||
storage_name = config.get('instance_name', None)
|
||||
|
||||
cli_logger.error('{}No collection {} found for storage {}.'
|
||||
cli_logger.error('{}No collection {!r} found for storage {}.'
|
||||
.format('{}\n'.format(e) if e else '',
|
||||
collection, storage_name))
|
||||
|
||||
|
|
@ -487,3 +488,10 @@ def assert_permissions(path, wanted):
|
|||
cli_logger.warning('Correcting permissions of {} from {:o} to {:o}'
|
||||
.format(path, permissions, wanted))
|
||||
os.chmod(path, wanted)
|
||||
|
||||
|
||||
def coerce_native(x, encoding='utf-8'):
|
||||
try:
|
||||
return to_native(x, encoding)
|
||||
except UnicodeError:
|
||||
return repr(x)
|
||||
|
|
|
|||
Loading…
Reference in a new issue