mirror of
https://github.com/samsonjs/vdirsyncer.git
synced 2026-03-25 08:55:50 +00:00
* Initial etesync support * Optimized get() * Bugfixes * bugfixes * Add writing operations * collection creation WIP * Fix collection creation * tests wip * wip * Final touch for test setup * actually skip tests * Disable metadata tests * Avoid polluting working tree * Avoid importing wsgi-intercept if not installed * Fix collection tests * Proper teardown * Skip bad test * Remove vtodo * wip * wip * wip * wip * wip * wip * wip * wip * wip * Add docs * Remove useless pkg * Don't crash if etesync isn't imported * Stylefix * Fix more import errors * More import errors * Only run etesync on latest python * Fix etesync check * journal-manager migration
360 lines
12 KiB
Python
360 lines
12 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import random
|
|
import uuid
|
|
|
|
import textwrap
|
|
from urllib.parse import quote as urlquote, unquote as urlunquote
|
|
|
|
import hypothesis.strategies as st
|
|
from hypothesis import given
|
|
|
|
import pytest
|
|
|
|
from vdirsyncer import exceptions
|
|
from vdirsyncer.storage.base import normalize_meta_value
|
|
from vdirsyncer.vobject import Item
|
|
|
|
from .. import EVENT_TEMPLATE, TASK_TEMPLATE, VCARD_TEMPLATE, \
|
|
assert_item_equals, normalize_item, printable_characters_strategy
|
|
|
|
|
|
def get_server_mixin(server_name):
|
|
from . import __name__ as base
|
|
x = __import__('{}.servers.{}'.format(base, server_name), fromlist=[''])
|
|
return x.ServerMixin
|
|
|
|
|
|
def format_item(item_template, uid=None):
|
|
# assert that special chars are handled correctly.
|
|
r = random.random()
|
|
return Item(item_template.format(r=r, uid=uid or r))
|
|
|
|
|
|
class StorageTests(object):
|
|
storage_class = None
|
|
supports_collections = True
|
|
supports_metadata = True
|
|
|
|
@pytest.fixture(params=['VEVENT', 'VTODO', 'VCARD'])
|
|
def item_type(self, request):
|
|
'''Parametrize with all supported item types.'''
|
|
return request.param
|
|
|
|
@pytest.fixture
|
|
def get_storage_args(self):
|
|
'''
|
|
Return a function with the following properties:
|
|
|
|
:param collection: The name of the collection to create and use.
|
|
'''
|
|
raise NotImplementedError()
|
|
|
|
@pytest.fixture
|
|
def s(self, get_storage_args):
|
|
return self.storage_class(**get_storage_args())
|
|
|
|
@pytest.fixture
|
|
def get_item(self, item_type):
|
|
template = {
|
|
'VEVENT': EVENT_TEMPLATE,
|
|
'VTODO': TASK_TEMPLATE,
|
|
'VCARD': VCARD_TEMPLATE,
|
|
}[item_type]
|
|
|
|
return lambda **kw: format_item(template, **kw)
|
|
|
|
@pytest.fixture
|
|
def requires_collections(self):
|
|
if not self.supports_collections:
|
|
pytest.skip('This storage does not support collections.')
|
|
|
|
@pytest.fixture
|
|
def requires_metadata(self):
|
|
if not self.supports_metadata:
|
|
pytest.skip('This storage does not support metadata.')
|
|
|
|
def test_generic(self, s, get_item):
|
|
items = [get_item() for i in range(1, 10)]
|
|
hrefs = []
|
|
for item in items:
|
|
href, etag = s.upload(item)
|
|
if etag is None:
|
|
_, etag = s.get(href)
|
|
hrefs.append((href, etag))
|
|
hrefs.sort()
|
|
assert hrefs == sorted(s.list())
|
|
for href, etag in hrefs:
|
|
assert isinstance(href, (str, bytes))
|
|
assert isinstance(etag, (str, bytes))
|
|
assert s.has(href)
|
|
item, etag2 = s.get(href)
|
|
assert etag == etag2
|
|
|
|
def test_empty_get_multi(self, s):
|
|
assert list(s.get_multi([])) == []
|
|
|
|
def test_get_multi_duplicates(self, s, get_item):
|
|
href, etag = s.upload(get_item())
|
|
if etag is None:
|
|
_, etag = s.get(href)
|
|
(href2, item, etag2), = s.get_multi([href] * 2)
|
|
assert href2 == href
|
|
assert etag2 == etag
|
|
|
|
def test_upload_already_existing(self, s, get_item):
|
|
item = get_item()
|
|
s.upload(item)
|
|
with pytest.raises(exceptions.PreconditionFailed):
|
|
s.upload(item)
|
|
|
|
def test_upload(self, s, get_item):
|
|
item = get_item()
|
|
href, etag = s.upload(item)
|
|
assert_item_equals(s.get(href)[0], item)
|
|
|
|
def test_update(self, s, get_item):
|
|
item = get_item()
|
|
href, etag = s.upload(item)
|
|
if etag is None:
|
|
_, etag = s.get(href)
|
|
assert_item_equals(s.get(href)[0], item)
|
|
|
|
new_item = get_item(uid=item.uid)
|
|
new_etag = s.update(href, new_item, etag)
|
|
if new_etag is None:
|
|
_, new_etag = s.get(href)
|
|
# See https://github.com/pimutils/vdirsyncer/issues/48
|
|
assert isinstance(new_etag, (bytes, str))
|
|
assert_item_equals(s.get(href)[0], new_item)
|
|
|
|
def test_update_nonexisting(self, s, get_item):
|
|
item = get_item()
|
|
with pytest.raises(exceptions.PreconditionFailed):
|
|
s.update('huehue', item, '"123"')
|
|
|
|
def test_wrong_etag(self, s, get_item):
|
|
item = get_item()
|
|
href, etag = s.upload(item)
|
|
with pytest.raises(exceptions.PreconditionFailed):
|
|
s.update(href, item, '"lolnope"')
|
|
with pytest.raises(exceptions.PreconditionFailed):
|
|
s.delete(href, '"lolnope"')
|
|
|
|
def test_delete(self, s, get_item):
|
|
href, etag = s.upload(get_item())
|
|
s.delete(href, etag)
|
|
assert not list(s.list())
|
|
|
|
def test_delete_nonexisting(self, s, get_item):
|
|
with pytest.raises(exceptions.PreconditionFailed):
|
|
s.delete('1', '"123"')
|
|
|
|
def test_list(self, s, get_item):
|
|
assert not list(s.list())
|
|
href, etag = s.upload(get_item())
|
|
if etag is None:
|
|
_, etag = s.get(href)
|
|
assert list(s.list()) == [(href, etag)]
|
|
|
|
def test_has(self, s, get_item):
|
|
assert not s.has('asd')
|
|
href, etag = s.upload(get_item())
|
|
assert s.has(href)
|
|
assert not s.has('asd')
|
|
s.delete(href, etag)
|
|
assert not s.has(href)
|
|
|
|
def test_update_others_stay_the_same(self, s, get_item):
|
|
info = {}
|
|
for _ in range(4):
|
|
href, etag = s.upload(get_item())
|
|
if etag is None:
|
|
_, etag = s.get(href)
|
|
info[href] = etag
|
|
|
|
assert dict(
|
|
(href, etag) for href, item, etag
|
|
in s.get_multi(href for href, etag in info.items())
|
|
) == info
|
|
|
|
def test_repr(self, s, get_storage_args):
|
|
assert self.storage_class.__name__ in repr(s)
|
|
assert s.instance_name is None
|
|
|
|
def test_discover(self, requires_collections, get_storage_args, get_item):
|
|
collections = set()
|
|
for i in range(1, 5):
|
|
collection = 'test{}'.format(i)
|
|
s = self.storage_class(**get_storage_args(collection=collection))
|
|
assert not list(s.list())
|
|
s.upload(get_item())
|
|
collections.add(s.collection)
|
|
|
|
actual = set(
|
|
c['collection'] for c in
|
|
self.storage_class.discover(**get_storage_args(collection=None))
|
|
)
|
|
|
|
assert actual >= collections
|
|
|
|
def test_create_collection(self, requires_collections, get_storage_args,
|
|
get_item):
|
|
if getattr(self, 'dav_server', '') == 'radicale':
|
|
pytest.skip('MKCOL is broken under Radicale 1.x')
|
|
|
|
if getattr(self, 'dav_server', '') in \
|
|
('icloud', 'fastmail', 'davical'):
|
|
pytest.skip('Manual cleanup would be necessary.')
|
|
|
|
args = get_storage_args(collection=None)
|
|
args['collection'] = 'test'
|
|
|
|
s = self.storage_class(
|
|
**self.storage_class.create_collection(**args)
|
|
)
|
|
|
|
href = s.upload(get_item())[0]
|
|
assert href in set(href for href, etag in s.list())
|
|
|
|
def test_discover_collection_arg(self, requires_collections,
|
|
get_storage_args):
|
|
args = get_storage_args(collection='test2')
|
|
with pytest.raises(TypeError) as excinfo:
|
|
list(self.storage_class.discover(**args))
|
|
|
|
assert 'collection argument must not be given' in str(excinfo.value)
|
|
|
|
def test_collection_arg(self, get_storage_args):
|
|
if self.storage_class.storage_name.startswith('etesync'):
|
|
pytest.skip('etesync uses UUIDs.')
|
|
|
|
if self.supports_collections:
|
|
s = self.storage_class(**get_storage_args(collection='test2'))
|
|
# Can't do stronger assertion because of radicale, which needs a
|
|
# fileextension to guess the collection type.
|
|
assert 'test2' in s.collection
|
|
else:
|
|
with pytest.raises(ValueError):
|
|
self.storage_class(collection='ayy', **get_storage_args())
|
|
|
|
def test_case_sensitive_uids(self, s, get_item):
|
|
if s.storage_name == 'filesystem':
|
|
pytest.skip('Behavior depends on the filesystem.')
|
|
|
|
uid = str(uuid.uuid4())
|
|
s.upload(get_item(uid=uid.upper()))
|
|
s.upload(get_item(uid=uid.lower()))
|
|
items = list(href for href, etag in s.list())
|
|
assert len(items) == 2
|
|
assert len(set(items)) == 2
|
|
|
|
def test_specialchars(self, monkeypatch, requires_collections,
|
|
get_storage_args, get_item):
|
|
if getattr(self, 'dav_server', '') == 'radicale':
|
|
pytest.skip('Radicale is fundamentally broken.')
|
|
if getattr(self, 'dav_server', '') in ('icloud', 'fastmail'):
|
|
pytest.skip('iCloud and FastMail reject this name.')
|
|
|
|
monkeypatch.setattr('vdirsyncer.utils.generate_href', lambda x: x)
|
|
|
|
uid = u'test @ foo ät bar град сатану'
|
|
collection = 'test @ foo ät bar'
|
|
|
|
s = self.storage_class(**get_storage_args(collection=collection))
|
|
item = get_item(uid=uid)
|
|
|
|
href, etag = s.upload(item)
|
|
item2, etag2 = s.get(href)
|
|
if etag is not None:
|
|
assert etag2 == etag
|
|
assert_item_equals(item2, item)
|
|
|
|
(_, etag3), = s.list()
|
|
assert etag2 == etag3
|
|
|
|
# etesync uses UUIDs for collection names
|
|
if self.storage_class.storage_name.startswith('etesync'):
|
|
return
|
|
|
|
assert collection in urlunquote(s.collection)
|
|
if self.storage_class.storage_name.endswith('dav'):
|
|
assert urlquote(uid, '/@:') in href
|
|
|
|
def test_metadata(self, requires_metadata, s):
|
|
if not getattr(self, 'dav_server', ''):
|
|
assert not s.get_meta('color')
|
|
assert not s.get_meta('displayname')
|
|
|
|
try:
|
|
s.set_meta('color', None)
|
|
assert not s.get_meta('color')
|
|
s.set_meta('color', u'#ff0000')
|
|
assert s.get_meta('color') == u'#ff0000'
|
|
except exceptions.UnsupportedMetadataError:
|
|
pass
|
|
|
|
for x in (u'hello world', u'hello wörld'):
|
|
s.set_meta('displayname', x)
|
|
rv = s.get_meta('displayname')
|
|
assert rv == x
|
|
assert isinstance(rv, str)
|
|
|
|
@given(value=st.one_of(
|
|
st.none(),
|
|
printable_characters_strategy.filter(lambda x: x.strip() != x)
|
|
))
|
|
def test_metadata_normalization(self, requires_metadata, s, value):
|
|
x = s.get_meta('displayname')
|
|
assert x == normalize_meta_value(x)
|
|
|
|
if not getattr(self, 'dav_server', None):
|
|
# ownCloud replaces "" with "unnamed"
|
|
s.set_meta('displayname', value)
|
|
assert s.get_meta('displayname') == normalize_meta_value(value)
|
|
|
|
def test_recurring_events(self, s, item_type):
|
|
if item_type != 'VEVENT':
|
|
pytest.skip('This storage instance doesn\'t support iCalendar.')
|
|
|
|
uid = str(uuid.uuid4())
|
|
item = Item(textwrap.dedent(u'''
|
|
BEGIN:VCALENDAR
|
|
VERSION:2.0
|
|
BEGIN:VEVENT
|
|
DTSTART;TZID=Australia/Sydney:20140325T084000
|
|
DTEND;TZID=Australia/Sydney:20140325T101000
|
|
DTSTAMP:20140327T060506Z
|
|
UID:{uid}
|
|
RECURRENCE-ID;TZID=Australia/Sydney:20140325T083000
|
|
CREATED:20131216T033331Z
|
|
DESCRIPTION:
|
|
LAST-MODIFIED:20140327T060215Z
|
|
LOCATION:
|
|
SEQUENCE:1
|
|
STATUS:CONFIRMED
|
|
SUMMARY:test Event
|
|
TRANSP:OPAQUE
|
|
END:VEVENT
|
|
BEGIN:VEVENT
|
|
DTSTART;TZID=Australia/Sydney:20140128T083000
|
|
DTEND;TZID=Australia/Sydney:20140128T100000
|
|
RRULE:FREQ=WEEKLY;UNTIL=20141208T213000Z;BYDAY=TU
|
|
DTSTAMP:20140327T060506Z
|
|
UID:{uid}
|
|
CREATED:20131216T033331Z
|
|
DESCRIPTION:
|
|
LAST-MODIFIED:20140222T101012Z
|
|
LOCATION:
|
|
SEQUENCE:0
|
|
STATUS:CONFIRMED
|
|
SUMMARY:Test event
|
|
TRANSP:OPAQUE
|
|
END:VEVENT
|
|
END:VCALENDAR
|
|
'''.format(uid=uid)).strip())
|
|
|
|
href, etag = s.upload(item)
|
|
|
|
item2, etag2 = s.get(href)
|
|
assert normalize_item(item) == normalize_item(item2)
|