Use more fixtures

This commit is contained in:
Markus Unterwaditzer 2014-07-06 23:22:02 +02:00
parent 29ba12cf19
commit c750561fb8
6 changed files with 65 additions and 76 deletions

View file

@ -20,22 +20,31 @@ from .. import SIMPLE_TEMPLATE, assert_item_equals
class BaseStorageTests(object): class BaseStorageTests(object):
item_template = SIMPLE_TEMPLATE item_template = SIMPLE_TEMPLATE
@pytest.fixture
def storage_args(self):
return self.get_storage_args
def get_storage_args(self, collection=None):
raise NotImplementedError()
@pytest.fixture
def storage(self, storage_args):
def inner(**kw):
return self.storage_class(**storage_args(**kw))
return inner
@pytest.fixture
def s(self, storage):
return storage()
def _create_bogus_item(self, item_template=None): def _create_bogus_item(self, item_template=None):
r = random.random() r = random.random()
item_template = item_template or self.item_template item_template = item_template or self.item_template
return Item(item_template.format(r=r)) return Item(item_template.format(r=r))
def get_storage_args(self, collection=None): def test_generic(self, s):
raise NotImplementedError()
def _get_storage(self):
s = self.storage_class(**self.get_storage_args())
assert not list(s.list())
return s
def test_generic(self):
items = [self._create_bogus_item() for i in range(1, 10)] items = [self._create_bogus_item() for i in range(1, 10)]
s = self._get_storage()
hrefs = [] hrefs = []
for item in items: for item in items:
hrefs.append(s.upload(item)) hrefs.append(s.upload(item))
@ -48,25 +57,21 @@ class BaseStorageTests(object):
item, etag2 = s.get(href) item, etag2 = s.get(href)
assert etag == etag2 assert etag == etag2
def test_empty_get_multi(self): def test_empty_get_multi(self, s):
s = self._get_storage()
assert list(s.get_multi([])) == [] assert list(s.get_multi([])) == []
def test_upload_already_existing(self): def test_upload_already_existing(self, s):
s = self._get_storage()
item = self._create_bogus_item() item = self._create_bogus_item()
s.upload(item) s.upload(item)
with pytest.raises(exceptions.PreconditionFailed): with pytest.raises(exceptions.PreconditionFailed):
s.upload(item) s.upload(item)
def test_upload(self): def test_upload(self, s):
s = self._get_storage()
item = self._create_bogus_item() item = self._create_bogus_item()
href, etag = s.upload(item) href, etag = s.upload(item)
assert_item_equals(s.get(href)[0], item) assert_item_equals(s.get(href)[0], item)
def test_update(self): def test_update(self, s):
s = self._get_storage()
item = self._create_bogus_item() item = self._create_bogus_item()
href, etag = s.upload(item) href, etag = s.upload(item)
assert_item_equals(s.get(href)[0], item) assert_item_equals(s.get(href)[0], item)
@ -77,16 +82,14 @@ class BaseStorageTests(object):
assert isinstance(new_etag, (bytes, text_type)) assert isinstance(new_etag, (bytes, text_type))
assert_item_equals(s.get(href)[0], new_item) assert_item_equals(s.get(href)[0], new_item)
def test_update_nonexisting(self): def test_update_nonexisting(self, s):
s = self._get_storage()
item = self._create_bogus_item() item = self._create_bogus_item()
with pytest.raises(exceptions.PreconditionFailed): with pytest.raises(exceptions.PreconditionFailed):
s.update(s._get_href(item), item, '"123"') s.update(s._get_href(item), item, '"123"')
with pytest.raises(exceptions.PreconditionFailed): with pytest.raises(exceptions.PreconditionFailed):
s.update('huehue', item, '"123"') s.update('huehue', item, '"123"')
def test_wrong_etag(self): def test_wrong_etag(self, s):
s = self._get_storage()
item = self._create_bogus_item() item = self._create_bogus_item()
href, etag = s.upload(item) href, etag = s.upload(item)
with pytest.raises(exceptions.PreconditionFailed): with pytest.raises(exceptions.PreconditionFailed):
@ -94,32 +97,27 @@ class BaseStorageTests(object):
with pytest.raises(exceptions.PreconditionFailed): with pytest.raises(exceptions.PreconditionFailed):
s.delete(href, '"lolnope"') s.delete(href, '"lolnope"')
def test_delete(self): def test_delete(self, s):
s = self._get_storage()
href, etag = s.upload(self._create_bogus_item()) href, etag = s.upload(self._create_bogus_item())
s.delete(href, etag) s.delete(href, etag)
assert not list(s.list()) assert not list(s.list())
def test_delete_nonexisting(self): def test_delete_nonexisting(self, s):
s = self._get_storage()
with pytest.raises(exceptions.PreconditionFailed): with pytest.raises(exceptions.PreconditionFailed):
s.delete('1', '"123"') s.delete('1', '"123"')
def test_list(self): def test_list(self, s):
s = self._get_storage()
assert not list(s.list()) assert not list(s.list())
s.upload(self._create_bogus_item()) s.upload(self._create_bogus_item())
assert list(s.list()) assert list(s.list())
def test_has(self): def test_has(self, s):
s = self._get_storage()
assert not s.has('asd') assert not s.has('asd')
href, etag = s.upload(self._create_bogus_item()) href, etag = s.upload(self._create_bogus_item())
assert s.has(href) assert s.has(href)
assert not s.has('asd') assert not s.has('asd')
def test_update_others_stay_the_same(self): def test_update_others_stay_the_same(self, s):
s = self._get_storage()
info = dict([ info = dict([
s.upload(self._create_bogus_item()), s.upload(self._create_bogus_item()),
s.upload(self._create_bogus_item()), s.upload(self._create_bogus_item()),
@ -132,14 +130,13 @@ class BaseStorageTests(object):
in s.get_multi(href for href, etag in iteritems(info)) in s.get_multi(href for href, etag in iteritems(info))
) == info ) == info
def test_repr(self): def test_repr(self, s):
s = self._get_storage()
assert self.storage_class.__name__ in repr(s) assert self.storage_class.__name__ in repr(s)
class SupportsCollections(object): class SupportsCollections(object):
def test_discover(self): def test_discover(self, storage_args):
collections = set() collections = set()
def main(): def main():
@ -148,8 +145,7 @@ class SupportsCollections(object):
# Create collections on-the-fly for most storages # Create collections on-the-fly for most storages
# Except ownCloud, which already has all of them, and more # Except ownCloud, which already has all of them, and more
i += 1 i += 1
s = self.storage_class( s = self.storage_class(**storage_args(collection=collection))
**self.get_storage_args(collection=collection))
# radicale ignores empty collections during discovery # radicale ignores empty collections during discovery
item = self._create_bogus_item() item = self._create_bogus_item()
@ -159,7 +155,7 @@ class SupportsCollections(object):
main() # remove leftover variables from loop for safety main() # remove leftover variables from loop for safety
d = self.storage_class.discover( d = self.storage_class.discover(
**self.get_storage_args(collection=None)) **storage_args(collection=None))
def main(): def main():
for s in d: for s in d:
@ -174,15 +170,15 @@ class SupportsCollections(object):
assert not collections assert not collections
def test_discover_collection_arg(self): def test_discover_collection_arg(self, storage_args):
args = self.get_storage_args(collection='test2') args = storage_args(collection='test2')
with pytest.raises(TypeError) as excinfo: with pytest.raises(TypeError) as excinfo:
list(self.storage_class.discover(**args)) list(self.storage_class.discover(**args))
assert 'collection argument must not be given' in str(excinfo.value) assert 'collection argument must not be given' in str(excinfo.value)
def test_collection_arg(self): def test_collection_arg(self, storage):
s = self.storage_class(**self.get_storage_args(collection='test2')) s = storage(collection='test2')
# Can't do stronger assertion because of radicale, which needs a # Can't do stronger assertion because of radicale, which needs a
# fileextension to guess the collection type. # fileextension to guess the collection type.
assert 'test2' in s.collection assert 'test2' in s.collection

View file

@ -44,28 +44,23 @@ templates = {
class DavStorageTests(ServerMixin, StorageTests): class DavStorageTests(ServerMixin, StorageTests):
def test_dav_broken_item(self): def test_dav_broken_item(self, s):
item = Item(u'HAHA:YES') item = Item(u'HAHA:YES')
s = self._get_storage()
try: try:
s.upload(item) s.upload(item)
except (exceptions.Error, requests.exceptions.HTTPError): except (exceptions.Error, requests.exceptions.HTTPError):
pass pass
assert not list(s.list()) assert not list(s.list())
@pytest.mark.xfail(dav_server == 'owncloud', @pytest.mark.xfail(dav_server == 'owncloud', reason='See issue #16')
reason='See issue #16') def test_wrong_etag(self, s):
def test_wrong_etag(self): super(DavStorageTests, self).test_wrong_etag(s)
super(DavStorageTests, self).test_wrong_etag()
@pytest.mark.xfail(dav_server == 'owncloud', @pytest.mark.xfail(dav_server == 'owncloud', reason='See issue #16')
reason='See issue #16') def test_update_nonexisting(self, s):
def test_update_nonexisting(self): super(DavStorageTests, self).test_update_nonexisting(s)
super(DavStorageTests, self).test_update_nonexisting()
def test_dav_empty_get_multi_performance(self, monkeypatch):
s = self._get_storage()
def test_dav_empty_get_multi_performance(self, s, monkeypatch):
def breakdown(*a, **kw): def breakdown(*a, **kw):
raise AssertionError('Expected not to be called.') raise AssertionError('Expected not to be called.')
@ -79,10 +74,9 @@ class TestCaldavStorage(DavStorageTests):
item_template = TASK_TEMPLATE item_template = TASK_TEMPLATE
def test_both_vtodo_and_vevent(self): def test_both_vtodo_and_vevent(self, s):
task = self._create_bogus_item(item_template=TASK_TEMPLATE) task = self._create_bogus_item(item_template=TASK_TEMPLATE)
event = self._create_bogus_item(item_template=EVENT_TEMPLATE) event = self._create_bogus_item(item_template=EVENT_TEMPLATE)
s = self._get_storage()
href_etag_task = s.upload(task) href_etag_task = s.upload(task)
href_etag_event = s.upload(event) href_etag_event = s.upload(event)
assert set(s.list()) == set([ assert set(s.list()) == set([
@ -91,10 +85,9 @@ class TestCaldavStorage(DavStorageTests):
]) ])
@pytest.mark.parametrize('item_type', ['VTODO', 'VEVENT']) @pytest.mark.parametrize('item_type', ['VTODO', 'VEVENT'])
def test_item_types_correctness(self, item_type): def test_item_types_correctness(self, item_type, storage_args):
other_item_type = 'VTODO' if item_type == 'VEVENT' else 'VEVENT' other_item_type = 'VTODO' if item_type == 'VEVENT' else 'VEVENT'
kw = self.get_storage_args() s = self.storage_class(item_types=(item_type,), **storage_args())
s = self.storage_class(item_types=(item_type,), **kw)
try: try:
s.upload(self._create_bogus_item( s.upload(self._create_bogus_item(
item_template=templates[other_item_type])) item_template=templates[other_item_type]))
@ -116,9 +109,9 @@ class TestCaldavStorage(DavStorageTests):
('VTODO', 'VEVENT', 'VJOURNAL'), ('VTODO', 'VEVENT', 'VJOURNAL'),
() ()
]) ])
def test_item_types_performance(self, item_types, monkeypatch): def test_item_types_performance(self, storage_args, item_types,
kw = self.get_storage_args() monkeypatch):
s = self.storage_class(item_types=item_types, **kw) s = self.storage_class(item_types=item_types, **storage_args())
item = self._create_bogus_item() item = self._create_bogus_item()
href, etag = s.upload(item) href, etag = s.upload(item)
@ -139,11 +132,11 @@ class TestCaldavStorage(DavStorageTests):
@pytest.mark.xfail(dav_server == 'radicale', @pytest.mark.xfail(dav_server == 'radicale',
reason='Radicale doesn\'t support timeranges.') reason='Radicale doesn\'t support timeranges.')
def test_timerange_correctness(self): def test_timerange_correctness(self, storage_args):
kw = self.get_storage_args()
start_date = datetime.datetime(2013, 9, 10) start_date = datetime.datetime(2013, 9, 10)
end_date = datetime.datetime(2013, 9, 13) end_date = datetime.datetime(2013, 9, 13)
s = self.storage_class(start_date=start_date, end_date=end_date, **kw) s = self.storage_class(start_date=start_date, end_date=end_date,
**storage_args())
too_old_item = self._create_bogus_item(item_template=dedent(u''' too_old_item = self._create_bogus_item(item_template=dedent(u'''
BEGIN:VCALENDAR BEGIN:VCALENDAR
@ -193,15 +186,15 @@ class TestCaldavStorage(DavStorageTests):
assert list(s.list()) == [(href, etag)] assert list(s.list()) == [(href, etag)]
def test_item_types_passed_as_string(self): def test_item_types_passed_as_string(self, storage_args):
kw = self.get_storage_args() kw = storage_args()
a = self.storage_class(item_types='VTODO,VEVENT', **kw) a = self.storage_class(item_types='VTODO,VEVENT', **kw)
b = self.storage_class(item_types=('VTODO', 'VEVENT'), **kw) b = self.storage_class(item_types=('VTODO', 'VEVENT'), **kw)
assert a.item_types == b.item_types == ('VTODO', 'VEVENT') assert a.item_types == b.item_types == ('VTODO', 'VEVENT')
def test_invalid_resource(self, monkeypatch): def test_invalid_resource(self, monkeypatch, storage_args):
calls = [] calls = []
args = self.get_storage_args(collection=None) args = storage_args(collection=None)
def request(session, method, url, data=None, headers=None, auth=None, def request(session, method, url, data=None, headers=None, auth=None,
verify=None): verify=None):

View file

@ -78,12 +78,11 @@ class TestHttpStorage(BaseStorageTests):
return {'url': 'http://localhost:123/collection.txt', return {'url': 'http://localhost:123/collection.txt',
'path': self.tmpfile} 'path': self.tmpfile}
def test_update(self): def test_update(self, s):
'''The original testcase tries to fetch with the old href. But this '''The original testcase tries to fetch with the old href. But this
storage doesn't have real hrefs, so the href might change if the storage doesn't have real hrefs, so the href might change if the
underlying UID changes. ''' underlying UID changes. '''
s = self._get_storage()
item = self._create_bogus_item() item = self._create_bogus_item()
href, etag = s.upload(item) href, etag = s.upload(item)
assert_item_equals(s.get(href)[0], item) assert_item_equals(s.get(href)[0], item)

View file

@ -42,12 +42,11 @@ class TestSingleFileStorage(BaseStorageTests):
with pytest.raises(IOError): with pytest.raises(IOError):
s = self.storage_class(str(tmpdir) + '/foo.ics', create=False) s = self.storage_class(str(tmpdir) + '/foo.ics', create=False)
def test_update(self): def test_update(self, s):
'''The original testcase tries to fetch with the old href. But this '''The original testcase tries to fetch with the old href. But this
storage doesn't have real hrefs, so the href might change if the storage doesn't have real hrefs, so the href might change if the
underlying UID changes. ''' underlying UID changes. '''
s = self._get_storage()
item = self._create_bogus_item() item = self._create_bogus_item()
href, etag = s.upload(item) href, etag = s.upload(item)
assert_item_equals(s.get(href)[0], item) assert_item_equals(s.get(href)[0], item)

View file

@ -157,5 +157,5 @@ def test_get_class_init_args_on_storage():
from vdirsyncer.storage.memory import MemoryStorage from vdirsyncer.storage.memory import MemoryStorage
all, required = utils.get_class_init_args(MemoryStorage) all, required = utils.get_class_init_args(MemoryStorage)
assert all == set(['read_only']) assert all == set(['collection', 'read_only'])
assert not required assert not required

View file

@ -23,7 +23,9 @@ class MemoryStorage(Storage):
Saves data in RAM, only useful for testing. Saves data in RAM, only useful for testing.
''' '''
def __init__(self, **kwargs): def __init__(self, collection=None, **kwargs):
if collection is not None:
raise ValueError('MemoryStorage does not support collections.')
self.items = {} # href => (etag, item) self.items = {} # href => (etag, item)
super(MemoryStorage, self).__init__(**kwargs) super(MemoryStorage, self).__init__(**kwargs)