mirror of
https://github.com/samsonjs/vdirsyncer.git
synced 2026-04-27 14:57:41 +00:00
Make _create_bogus_item a fixture.
Also rename the "storage" fixture to "get_storage".
This commit is contained in:
parent
6841b25264
commit
2e2082fb55
4 changed files with 60 additions and 64 deletions
|
|
@ -28,24 +28,28 @@ class BaseStorageTests(object):
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def storage(self, storage_args):
|
def get_storage(self, storage_args):
|
||||||
def inner(**kw):
|
def inner(**kw):
|
||||||
return self.storage_class(**storage_args(**kw))
|
return self.storage_class(**storage_args(**kw))
|
||||||
|
|
||||||
return inner
|
return inner
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def s(self, storage):
|
def s(self, get_storage):
|
||||||
return storage()
|
return get_storage()
|
||||||
|
|
||||||
def _create_bogus_item(self, item_template=None):
|
@pytest.fixture
|
||||||
|
def get_item(self):
|
||||||
|
def inner(item_template=None):
|
||||||
# assert that special chars are handled correctly.
|
# assert that special chars are handled correctly.
|
||||||
r = '{}@vdirsyncer'.format(random.random())
|
r = '{}@vdirsyncer'.format(random.random())
|
||||||
item_template = item_template or self.item_template
|
item_template = item_template or self.item_template
|
||||||
return Item(item_template.format(r=r))
|
return Item(item_template.format(r=r))
|
||||||
|
|
||||||
def test_generic(self, s):
|
return inner
|
||||||
items = [self._create_bogus_item() for i in range(1, 10)]
|
|
||||||
|
def test_generic(self, s, get_item):
|
||||||
|
items = [get_item() for i in range(1, 10)]
|
||||||
hrefs = []
|
hrefs = []
|
||||||
for item in items:
|
for item in items:
|
||||||
hrefs.append(s.upload(item))
|
hrefs.append(s.upload(item))
|
||||||
|
|
@ -61,67 +65,67 @@ class BaseStorageTests(object):
|
||||||
def test_empty_get_multi(self, s):
|
def test_empty_get_multi(self, s):
|
||||||
assert list(s.get_multi([])) == []
|
assert list(s.get_multi([])) == []
|
||||||
|
|
||||||
def test_upload_already_existing(self, s):
|
def test_upload_already_existing(self, s, get_item):
|
||||||
item = self._create_bogus_item()
|
item = get_item()
|
||||||
s.upload(item)
|
s.upload(item)
|
||||||
with pytest.raises(exceptions.PreconditionFailed):
|
with pytest.raises(exceptions.PreconditionFailed):
|
||||||
s.upload(item)
|
s.upload(item)
|
||||||
|
|
||||||
def test_upload(self, s):
|
def test_upload(self, s, get_item):
|
||||||
item = self._create_bogus_item()
|
item = get_item()
|
||||||
href, etag = s.upload(item)
|
href, etag = s.upload(item)
|
||||||
assert_item_equals(s.get(href)[0], item)
|
assert_item_equals(s.get(href)[0], item)
|
||||||
|
|
||||||
def test_update(self, s):
|
def test_update(self, s, get_item):
|
||||||
item = self._create_bogus_item()
|
item = get_item()
|
||||||
href, etag = s.upload(item)
|
href, etag = s.upload(item)
|
||||||
assert_item_equals(s.get(href)[0], item)
|
assert_item_equals(s.get(href)[0], item)
|
||||||
|
|
||||||
new_item = self._create_bogus_item()
|
new_item = get_item()
|
||||||
new_etag = s.update(href, new_item, etag)
|
new_etag = s.update(href, new_item, etag)
|
||||||
# See https://github.com/untitaker/vdirsyncer/issues/48
|
# See https://github.com/untitaker/vdirsyncer/issues/48
|
||||||
assert isinstance(new_etag, (bytes, text_type))
|
assert isinstance(new_etag, (bytes, text_type))
|
||||||
assert_item_equals(s.get(href)[0], new_item)
|
assert_item_equals(s.get(href)[0], new_item)
|
||||||
|
|
||||||
def test_update_nonexisting(self, s):
|
def test_update_nonexisting(self, s, get_item):
|
||||||
item = self._create_bogus_item()
|
item = get_item()
|
||||||
with pytest.raises(exceptions.PreconditionFailed):
|
with pytest.raises(exceptions.PreconditionFailed):
|
||||||
s.update('huehue', item, '"123"')
|
s.update('huehue', item, '"123"')
|
||||||
|
|
||||||
def test_wrong_etag(self, s):
|
def test_wrong_etag(self, s, get_item):
|
||||||
item = self._create_bogus_item()
|
item = get_item()
|
||||||
href, etag = s.upload(item)
|
href, etag = s.upload(item)
|
||||||
with pytest.raises(exceptions.PreconditionFailed):
|
with pytest.raises(exceptions.PreconditionFailed):
|
||||||
s.update(href, item, '"lolnope"')
|
s.update(href, item, '"lolnope"')
|
||||||
with pytest.raises(exceptions.PreconditionFailed):
|
with pytest.raises(exceptions.PreconditionFailed):
|
||||||
s.delete(href, '"lolnope"')
|
s.delete(href, '"lolnope"')
|
||||||
|
|
||||||
def test_delete(self, s):
|
def test_delete(self, s, get_item):
|
||||||
href, etag = s.upload(self._create_bogus_item())
|
href, etag = s.upload(get_item())
|
||||||
s.delete(href, etag)
|
s.delete(href, etag)
|
||||||
assert not list(s.list())
|
assert not list(s.list())
|
||||||
|
|
||||||
def test_delete_nonexisting(self, s):
|
def test_delete_nonexisting(self, s, get_item):
|
||||||
with pytest.raises(exceptions.PreconditionFailed):
|
with pytest.raises(exceptions.PreconditionFailed):
|
||||||
s.delete('1', '"123"')
|
s.delete('1', '"123"')
|
||||||
|
|
||||||
def test_list(self, s):
|
def test_list(self, s, get_item):
|
||||||
assert not list(s.list())
|
assert not list(s.list())
|
||||||
href, etag = s.upload(self._create_bogus_item())
|
href, etag = s.upload(get_item())
|
||||||
assert list(s.list()) == [(href, etag)]
|
assert list(s.list()) == [(href, etag)]
|
||||||
|
|
||||||
def test_has(self, s):
|
def test_has(self, s, get_item):
|
||||||
assert not s.has('asd')
|
assert not s.has('asd')
|
||||||
href, etag = s.upload(self._create_bogus_item())
|
href, etag = s.upload(get_item())
|
||||||
assert s.has(href)
|
assert s.has(href)
|
||||||
assert not s.has('asd')
|
assert not s.has('asd')
|
||||||
|
|
||||||
def test_update_others_stay_the_same(self, s):
|
def test_update_others_stay_the_same(self, s, get_item):
|
||||||
info = dict([
|
info = dict([
|
||||||
s.upload(self._create_bogus_item()),
|
s.upload(get_item()),
|
||||||
s.upload(self._create_bogus_item()),
|
s.upload(get_item()),
|
||||||
s.upload(self._create_bogus_item()),
|
s.upload(get_item()),
|
||||||
s.upload(self._create_bogus_item())
|
s.upload(get_item())
|
||||||
])
|
])
|
||||||
|
|
||||||
assert dict(
|
assert dict(
|
||||||
|
|
@ -135,7 +139,7 @@ class BaseStorageTests(object):
|
||||||
|
|
||||||
class SupportsCollections(object):
|
class SupportsCollections(object):
|
||||||
|
|
||||||
def test_discover(self, storage_args):
|
def test_discover(self, storage_args, get_item):
|
||||||
collections = set()
|
collections = set()
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
|
|
@ -147,7 +151,7 @@ class SupportsCollections(object):
|
||||||
s = self.storage_class(**storage_args(collection=collection))
|
s = self.storage_class(**storage_args(collection=collection))
|
||||||
|
|
||||||
# radicale ignores empty collections during discovery
|
# radicale ignores empty collections during discovery
|
||||||
item = self._create_bogus_item()
|
item = get_item()
|
||||||
s.upload(item)
|
s.upload(item)
|
||||||
|
|
||||||
collections.add(s.collection)
|
collections.add(s.collection)
|
||||||
|
|
@ -176,8 +180,8 @@ class SupportsCollections(object):
|
||||||
|
|
||||||
assert 'collection argument must not be given' in str(excinfo.value)
|
assert 'collection argument must not be given' in str(excinfo.value)
|
||||||
|
|
||||||
def test_collection_arg(self, storage):
|
def test_collection_arg(self, get_storage):
|
||||||
s = storage(collection='test2')
|
s = get_storage(collection='test2')
|
||||||
# Can't do stronger assertion because of radicale, which needs a
|
# Can't do stronger assertion because of radicale, which needs a
|
||||||
# fileextension to guess the collection type.
|
# fileextension to guess the collection type.
|
||||||
assert 'test2' in s.collection
|
assert 'test2' in s.collection
|
||||||
|
|
|
||||||
|
|
@ -53,12 +53,6 @@ class DavStorageTests(ServerMixin, StorageTests):
|
||||||
pass
|
pass
|
||||||
assert not list(s.list())
|
assert not list(s.list())
|
||||||
|
|
||||||
def test_wrong_etag(self, s):
|
|
||||||
super(DavStorageTests, self).test_wrong_etag(s)
|
|
||||||
|
|
||||||
def test_update_nonexisting(self, s):
|
|
||||||
super(DavStorageTests, self).test_update_nonexisting(s)
|
|
||||||
|
|
||||||
def test_dav_empty_get_multi_performance(self, s, monkeypatch):
|
def test_dav_empty_get_multi_performance(self, s, monkeypatch):
|
||||||
def breakdown(*a, **kw):
|
def breakdown(*a, **kw):
|
||||||
raise AssertionError('Expected not to be called.')
|
raise AssertionError('Expected not to be called.')
|
||||||
|
|
@ -73,9 +67,9 @@ class TestCaldavStorage(DavStorageTests):
|
||||||
|
|
||||||
item_template = TASK_TEMPLATE
|
item_template = TASK_TEMPLATE
|
||||||
|
|
||||||
def test_both_vtodo_and_vevent(self, s):
|
def test_both_vtodo_and_vevent(self, s, get_item):
|
||||||
task = self._create_bogus_item(item_template=TASK_TEMPLATE)
|
task = get_item(item_template=TASK_TEMPLATE)
|
||||||
event = self._create_bogus_item(item_template=EVENT_TEMPLATE)
|
event = get_item(item_template=EVENT_TEMPLATE)
|
||||||
href_etag_task = s.upload(task)
|
href_etag_task = s.upload(task)
|
||||||
href_etag_event = s.upload(event)
|
href_etag_event = s.upload(event)
|
||||||
assert set(s.list()) == set([
|
assert set(s.list()) == set([
|
||||||
|
|
@ -84,18 +78,16 @@ class TestCaldavStorage(DavStorageTests):
|
||||||
])
|
])
|
||||||
|
|
||||||
@pytest.mark.parametrize('item_type', ['VTODO', 'VEVENT'])
|
@pytest.mark.parametrize('item_type', ['VTODO', 'VEVENT'])
|
||||||
def test_item_types_correctness(self, item_type, storage_args):
|
def test_item_types_correctness(self, item_type, storage_args, get_item):
|
||||||
other_item_type = 'VTODO' if item_type == 'VEVENT' else 'VEVENT'
|
other_item_type = 'VTODO' if item_type == 'VEVENT' else 'VEVENT'
|
||||||
s = self.storage_class(item_types=(item_type,), **storage_args())
|
s = self.storage_class(item_types=(item_type,), **storage_args())
|
||||||
try:
|
try:
|
||||||
s.upload(self._create_bogus_item(
|
s.upload(get_item(item_template=templates[other_item_type]))
|
||||||
item_template=templates[other_item_type]))
|
s.upload(get_item(item_template=templates[other_item_type]))
|
||||||
s.upload(self._create_bogus_item(
|
|
||||||
item_template=templates[other_item_type]))
|
|
||||||
except (exceptions.Error, requests.exceptions.HTTPError):
|
except (exceptions.Error, requests.exceptions.HTTPError):
|
||||||
pass
|
pass
|
||||||
href, etag = \
|
href, etag = \
|
||||||
s.upload(self._create_bogus_item(
|
s.upload(get_item(
|
||||||
item_template=templates[item_type]))
|
item_template=templates[item_type]))
|
||||||
((href2, etag2),) = s.list()
|
((href2, etag2),) = s.list()
|
||||||
assert href2 == href
|
assert href2 == href
|
||||||
|
|
@ -109,9 +101,9 @@ class TestCaldavStorage(DavStorageTests):
|
||||||
()
|
()
|
||||||
])
|
])
|
||||||
def test_item_types_performance(self, storage_args, item_types,
|
def test_item_types_performance(self, storage_args, item_types,
|
||||||
monkeypatch):
|
monkeypatch, get_item):
|
||||||
s = self.storage_class(item_types=item_types, **storage_args())
|
s = self.storage_class(item_types=item_types, **storage_args())
|
||||||
item = self._create_bogus_item()
|
item = get_item()
|
||||||
href, etag = s.upload(item)
|
href, etag = s.upload(item)
|
||||||
|
|
||||||
old_dav_query = s._dav_query
|
old_dav_query = s._dav_query
|
||||||
|
|
@ -131,13 +123,13 @@ class TestCaldavStorage(DavStorageTests):
|
||||||
|
|
||||||
@pytest.mark.xfail(dav_server == 'radicale',
|
@pytest.mark.xfail(dav_server == 'radicale',
|
||||||
reason='Radicale doesn\'t support timeranges.')
|
reason='Radicale doesn\'t support timeranges.')
|
||||||
def test_timerange_correctness(self, storage_args):
|
def test_timerange_correctness(self, storage_args, get_item):
|
||||||
start_date = datetime.datetime(2013, 9, 10)
|
start_date = datetime.datetime(2013, 9, 10)
|
||||||
end_date = datetime.datetime(2013, 9, 13)
|
end_date = datetime.datetime(2013, 9, 13)
|
||||||
s = self.storage_class(start_date=start_date, end_date=end_date,
|
s = self.storage_class(start_date=start_date, end_date=end_date,
|
||||||
**storage_args())
|
**storage_args())
|
||||||
|
|
||||||
too_old_item = self._create_bogus_item(item_template=dedent(u'''
|
too_old_item = get_item(item_template=dedent(u'''
|
||||||
BEGIN:VCALENDAR
|
BEGIN:VCALENDAR
|
||||||
VERSION:2.0
|
VERSION:2.0
|
||||||
PRODID:-//hacksw/handcal//NONSGML v1.0//EN
|
PRODID:-//hacksw/handcal//NONSGML v1.0//EN
|
||||||
|
|
@ -151,7 +143,7 @@ class TestCaldavStorage(DavStorageTests):
|
||||||
END:VCALENDAR
|
END:VCALENDAR
|
||||||
''').strip())
|
''').strip())
|
||||||
|
|
||||||
too_new_item = self._create_bogus_item(item_template=dedent(u'''
|
too_new_item = get_item(item_template=dedent(u'''
|
||||||
BEGIN:VCALENDAR
|
BEGIN:VCALENDAR
|
||||||
VERSION:2.0
|
VERSION:2.0
|
||||||
PRODID:-//hacksw/handcal//NONSGML v1.0//EN
|
PRODID:-//hacksw/handcal//NONSGML v1.0//EN
|
||||||
|
|
@ -165,7 +157,7 @@ class TestCaldavStorage(DavStorageTests):
|
||||||
END:VCALENDAR
|
END:VCALENDAR
|
||||||
''').strip())
|
''').strip())
|
||||||
|
|
||||||
good_item = self._create_bogus_item(item_template=dedent(u'''
|
good_item = get_item(item_template=dedent(u'''
|
||||||
BEGIN:VCALENDAR
|
BEGIN:VCALENDAR
|
||||||
VERSION:2.0
|
VERSION:2.0
|
||||||
PRODID:-//hacksw/handcal//NONSGML v1.0//EN
|
PRODID:-//hacksw/handcal//NONSGML v1.0//EN
|
||||||
|
|
|
||||||
|
|
@ -78,16 +78,16 @@ class TestHttpStorage(BaseStorageTests):
|
||||||
return {'url': 'http://localhost:123/collection.txt',
|
return {'url': 'http://localhost:123/collection.txt',
|
||||||
'path': self.tmpfile}
|
'path': self.tmpfile}
|
||||||
|
|
||||||
def test_update(self, s):
|
def test_update(self, s, get_item):
|
||||||
'''The original testcase tries to fetch with the old href. But this
|
'''The original testcase tries to fetch with the old href. But this
|
||||||
storage doesn't have real hrefs, so the href might change if the
|
storage doesn't have real hrefs, so the href might change if the
|
||||||
underlying UID changes. '''
|
underlying UID changes. '''
|
||||||
|
|
||||||
item = self._create_bogus_item()
|
item = get_item()
|
||||||
href, etag = s.upload(item)
|
href, etag = s.upload(item)
|
||||||
assert_item_equals(s.get(href)[0], item)
|
assert_item_equals(s.get(href)[0], item)
|
||||||
|
|
||||||
new_item = self._create_bogus_item()
|
new_item = get_item()
|
||||||
s.update(href, new_item, etag)
|
s.update(href, new_item, etag)
|
||||||
((new_href, new_etag),) = s.list()
|
((new_href, new_etag),) = s.list()
|
||||||
assert_item_equals(s.get(new_href)[0], new_item)
|
assert_item_equals(s.get(new_href)[0], new_item)
|
||||||
|
|
|
||||||
|
|
@ -42,16 +42,16 @@ class TestSingleFileStorage(BaseStorageTests):
|
||||||
with pytest.raises(IOError):
|
with pytest.raises(IOError):
|
||||||
s = self.storage_class(str(tmpdir) + '/foo.ics', create=False)
|
s = self.storage_class(str(tmpdir) + '/foo.ics', create=False)
|
||||||
|
|
||||||
def test_update(self, s):
|
def test_update(self, s, get_item):
|
||||||
'''The original testcase tries to fetch with the old href. But this
|
'''The original testcase tries to fetch with the old href. But this
|
||||||
storage doesn't have real hrefs, so the href might change if the
|
storage doesn't have real hrefs, so the href might change if the
|
||||||
underlying UID changes. '''
|
underlying UID changes. '''
|
||||||
|
|
||||||
item = self._create_bogus_item()
|
item = get_item()
|
||||||
href, etag = s.upload(item)
|
href, etag = s.upload(item)
|
||||||
assert_item_equals(s.get(href)[0], item)
|
assert_item_equals(s.get(href)[0], item)
|
||||||
|
|
||||||
new_item = self._create_bogus_item()
|
new_item = get_item()
|
||||||
s.update(href, new_item, etag)
|
s.update(href, new_item, etag)
|
||||||
((new_href, new_etag),) = s.list()
|
((new_href, new_etag),) = s.list()
|
||||||
assert_item_equals(s.get(new_href)[0], new_item)
|
assert_item_equals(s.get(new_href)[0], new_item)
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue