Complete rewrite of storage API, again

This commit is contained in:
Markus Unterwaditzer 2014-02-25 00:41:15 +01:00
parent 8909460380
commit b878c1dba2
6 changed files with 173 additions and 125 deletions

View file

@ -24,36 +24,45 @@ class Item(object):
class Storage(object):
'''Superclass of all storages, mainly useful to summarize the interface to
implement.'''
def __init__(self, fileext='', item_class=Item):
implement.
Terminology:
- UID: Global identifier of the item, across storages.
- HREF: Per-storage identifier of item, might be UID.
- ETAG: Checksum of item, or something similar that changes when the object does
'''
def __init__(self, fileext='.txt', item_class=Item):
self.fileext = fileext
self.item_class = item_class
def _get_href(self, uid):
return uid + self.fileext
def list(self):
'''
:returns: list of (uid, etag)
:returns: list of (href, etag)
'''
raise NotImplementedError()
def get(self, uid):
def get(self, href):
'''
:param uid: uid to fetch
:param href: href to fetch
:returns: (object, etag)
'''
raise NotImplementedError()
def get_multi(self, uids):
def get_multi(self, hrefs):
'''
:param uids: list of uids to fetch
:returns: iterable of (uid, obj, etag)
:param hrefs: list of hrefs to fetch
:returns: iterable of (href, obj, etag)
'''
for uid in uids:
obj, etag = self.get(uid)
yield uid, obj, etag
for href in hrefs:
obj, etag = self.get(href)
yield href, obj, etag
def has(self, uid):
def has(self, href):
'''
check if item exists
check if item exists by href
:returns: True or False
'''
raise NotImplementedError()
@ -62,23 +71,23 @@ class Storage(object):
'''
Upload a new object, raise
:exc:`vdirsyncer.exceptions.AlreadyExistingError` if it already exists.
:returns: etag on the server
:returns: (href, etag)
'''
raise NotImplementedError()
def update(self, obj, etag):
def update(self, href, obj, etag):
'''
Update the object, raise :exc:`vdirsyncer.exceptions.WrongEtagError` if
the etag on the server doesn't match the given etag, raise
:exc:`vdirsyncer.exceptions.NotFoundError` if the item doesn't exist.
:returns: etag on the server
:returns: etag
'''
raise NotImplementedError()
def delete(self, uid, etag):
def delete(self, href, etag):
'''
Delete the object, raise exceptions when etag doesn't match, no return
value
Delete the object by href, raise exceptions when etag doesn't match, no
return value
'''
raise NotImplementedError()

View file

@ -12,7 +12,9 @@ from vdirsyncer.storage.base import Storage, Item
import vdirsyncer.exceptions as exceptions
class FilesystemStorage(Storage):
'''Saves data in vdir collection, mtime is etag.'''
'''Saves data in vdir collection
mtime is etag
filename without path is href'''
def __init__(self, path, **kwargs):
'''
:param path: Absolute path to a *collection* inside a vdir.
@ -20,34 +22,39 @@ class FilesystemStorage(Storage):
self.path = path
super(FilesystemStorage, self).__init__(**kwargs)
def _get_filepath(self, uid):
return os.path.join(self.path, uid + self.fileext)
def _get_filepath(self, href):
return os.path.join(self.path, href)
def _get_href(self, uid):
return uid + self.fileext
def list(self):
for fname in os.listdir(self.path):
fpath = os.path.join(self.path, fname)
if os.path.isfile(fpath) and fname.endswith(self.fileext):
uid = fname[:-len(self.fileext)]
yield uid, os.path.getmtime(fpath)
yield fname, os.path.getmtime(fpath)
def get(self, uid):
fpath = self._get_filepath(uid)
def get(self, href):
fpath = self._get_filepath(href)
with open(fpath, 'rb') as f:
return Item(f.read()), os.path.getmtime(fpath)
def has(self, uid):
return os.path.isfile(self._get_filepath(uid))
def has(self, href):
return os.path.isfile(self._get_filepath(href))
def upload(self, obj):
fpath = self._get_filepath(obj.uid)
href = self._get_href(obj.uid)
fpath = self._get_filepath(href)
if os.path.exists(fpath):
raise exceptions.AlreadyExistingError(obj.uid)
with open(fpath, 'wb+') as f:
f.write(obj.raw)
return os.path.getmtime(fpath)
return href, os.path.getmtime(fpath)
def update(self, obj, etag):
fpath = self._get_filepath(obj.uid)
def update(self, href, obj, etag):
fpath = self._get_filepath(href)
if href != self._get_href(obj.uid):
raise exceptions.NotFoundError(obj.uid)
if not os.path.exists(fpath):
raise exceptions.NotFoundError(obj.uid)
actual_etag = os.path.getmtime(fpath)
@ -58,10 +65,10 @@ class FilesystemStorage(Storage):
f.write(obj.raw)
return os.path.getmtime(fpath)
def delete(self, uid, etag):
fpath = self._get_filepath(uid)
def delete(self, href, etag):
fpath = self._get_filepath(href)
if not os.path.isfile(fpath):
raise exceptions.NotFoundError(uid)
raise exceptions.NotFoundError(href)
actual_etag = os.path.getmtime(fpath)
if etag != actual_etag:
raise exceptions.WrongEtagError(etag, actual_etag)

View file

@ -16,41 +16,42 @@ class MemoryStorage(Storage):
Saves data in RAM, only useful for testing.
'''
def __init__(self, **kwargs):
self.items = {} # uid => (etag, object)
self.items = {} # href => (etag, object)
super(MemoryStorage, self).__init__(**kwargs)
def list(self):
for uid, (etag, obj) in self.items.items():
yield uid, etag
for href, (etag, obj) in self.items.items():
yield href, etag
def get(self, uid):
etag, obj = self.items[uid]
def get(self, href):
etag, obj = self.items[href]
return obj, etag
def has(self, uid):
return uid in self.items
def has(self, href):
return href in self.items
def upload(self, obj):
if obj.uid in self.items:
raise exceptions.AlreadyExistingError(obj)
href = self._get_href(obj.uid)
if href in self.items:
raise exceptions.AlreadyExistingError(obj.uid)
etag = datetime.datetime.now()
self.items[obj.uid] = (etag, obj)
return etag
self.items[href] = (etag, obj)
return href, etag
def update(self, obj, etag):
if obj.uid not in self.items:
raise exceptions.NotFoundError(obj)
actual_etag, _ = self.items[obj.uid]
def update(self, href, obj, etag):
if href != self._get_href(obj.uid) or href not in self.items:
raise exceptions.NotFoundError(href)
actual_etag, _ = self.items[href]
if etag != actual_etag:
raise exceptions.WrongEtagError(etag, actual_etag)
new_etag = datetime.datetime.now()
self.items[obj.uid] = (new_etag, obj)
return etag
self.items[href] = (new_etag, obj)
return new_etag
def delete(self, uid, etag):
if not self.has(uid):
raise exceptions.NotFoundError(uid)
if etag != self.items[uid][0]:
def delete(self, href, etag):
if not self.has(href):
raise exceptions.NotFoundError(href)
if etag != self.items[href][0]:
raise exceptions.WrongEtagError(etag)
del self.items[uid]
del self.items[href]

View file

@ -13,6 +13,30 @@
:license: MIT, see LICENSE for more details.
'''
def prepare_list(storage, href_to_uid):
for href, etag in storage.list():
props = {'etag': etag}
if href in href_to_uid:
props['uid'] = href_to_uid[href]
else:
obj, new_etag = storage.get(href)
assert etag == new_etag
props['uid'] = obj.uid
props['obj'] = obj
yield href, props
def prefetch(storage, item_list, hrefs):
hrefs_to_prefetch = []
for href in hrefs:
if 'obj' not in item_list[href]:
hrefs_to_prefetch.append(href)
for href, obj, etag in storage.get_multi(hrefs_to_prefetch):
assert item_list[href]['etag'] == etag
item_list[href]['obj'] = obj
def sync(storage_a, storage_b, status):
'''Syncronizes two storages.
@ -20,48 +44,56 @@ def sync(storage_a, storage_b, status):
:type storage_a: :class:`vdirsyncer.storage.base.Storage`
:param storage_b: The second storage
:type storage_b: :class:`vdirsyncer.storage.base.Storage`
:param status:
{uid: (etag_a, etag_b)}, metadata about the two storages for detection
of changes. Will be modified by the function and should be passed to it
at the next sync. If this is the first sync, an empty dictionary should
be provided.
:param status: {uid: (href_a, etag_a, href_b, etag_b)}
metadata about the two storages for detection of changes. Will be
modified by the function and should be passed to it at the next sync.
If this is the first sync, an empty dictionary should be provided.
'''
list_a = dict(storage_a.list()) # {uid: etag}
list_b = dict(storage_b.list())
items_a = {} # items prefetched for copy
items_b = {} # {uid: (item, etag)}
a_href_to_uid = dict((href_a, uid) for uid, (href_a, etag_a, href_b, etag_b) in status.iteritems())
b_href_to_uid = dict((href_b, uid) for uid, (href_a, etag_a, href_b, etag_b) in status.iteritems())
list_a = dict(prepare_list(storage_a, a_href_to_uid)) # href => {'etag': etag, 'obj': optional object, 'uid': uid}
list_b = dict(prepare_list(storage_b, b_href_to_uid))
a_uid_to_href = dict((x['uid'], href) for href, x in list_a.iteritems())
b_uid_to_href = dict((x['uid'], href) for href, x in list_b.iteritems())
etags_a = dict((x['uid'], x['etag']) for href, x in list_a.iteritems())
etags_b = dict((x['uid'], x['etag']) for href, x in list_b.iteritems())
del a_href_to_uid, b_href_to_uid
actions, prefetch_from_a, prefetch_from_b = \
get_actions(list_a, list_b, status)
def prefetch():
for uid, item, etag in storage_a.get_multi(prefetch_from_a):
items_a[uid] = (item, etag)
for uid, item, etag in storage_b.get_multi(prefetch_from_b):
items_b[uid] = (item, etag)
prefetch()
get_actions(etags_a, etags_b, status)
prefetch(storage_a, list_a, (a_uid_to_href[x] for x in prefetch_from_a))
prefetch(storage_b, list_b, (b_uid_to_href[x] for x in prefetch_from_b))
storages = {
'a': (storage_a, items_a, list_a),
'b': (storage_b, items_b, list_b),
'a': (storage_a, list_a, a_uid_to_href),
'b': (storage_b, list_b, b_uid_to_href),
None: (None, None, None)
}
for action, uid, source, dest in actions:
source_storage, source_items, source_list = storages[source]
dest_storage, dest_items, dest_list = storages[dest]
source_storage, source_list, source_uid_to_href = storages[source]
dest_storage, dest_list, dest_uid_to_href = storages[dest]
if action in ('upload', 'update'):
item, source_etag = source_items[uid]
source_href = source_uid_to_href[uid]
source_etag = source_list[source_href]['etag']
obj = source_list[source_href]['obj']
if action == 'upload':
dest_etag = dest_storage.upload(item)
dest_href, dest_etag = dest_storage.upload(obj)
else:
dest_etag = dest_storage.update(item, dest_list[uid])
status[uid] = (source_etag, dest_etag) if source == 'a' else (dest_etag, source_etag)
dest_href = dest_uid_to_href[uid]
old_etag = dest_list[dest_href]['etag']
dest_etag = dest_storage.update(dest_href, obj, old_etag)
source_status = (source_href, source_etag)
dest_status = (dest_href, dest_etag)
status[uid] = source_status + dest_status if source == 'a' else dest_status + source_status
elif action == 'delete':
if dest is not None:
dest_storage.delete(uid, dest_list[uid])
dest_href = dest_uid_to_href[uid]
dest_etag = dest_list[dest_href]['etag']
dest_storage.delete(dest_href, dest_etag)
del status[uid]
def get_actions(list_a, list_b, status):
@ -80,13 +112,14 @@ def get_actions(list_a, list_b, status):
prefetch_from_b.append(uid)
actions.append(('upload', uid, 'b', 'a'))
else:
href_a, etag_a, href_b, etag_b = status[uid]
if uid in list_a and uid in list_b:
if list_a[uid] != status[uid][0] and list_b[uid] != status[uid][1]:
if list_a[uid] != etag_a and list_b[uid] != etag_b:
1/0 # conflict resolution TODO
elif list_a[uid] != status[uid][0]: # item was updated in a
elif list_a[uid] != etag_a: # item was updated in a
prefetch_from_a.append(uid)
actions.append(('update', uid, 'a', 'b'))
elif list_b[uid] != status[uid][1]: # item was updated in b
elif list_b[uid] != etag_b: # item was updated in b
prefetch_from_b.append(uid)
actions.append(('update', uid, 'b', 'a'))
else: # completely in sync!

View file

@ -37,13 +37,11 @@ class StorageTests(object):
s = self._get_storage(fileext=fileext)
for item in items:
s.upload(Item(item))
a = set(uid for uid, etag in s.list())
b = set(str(y) for y in range(1, 10))
assert a == b
for i in b:
assert s.has(i)
item, etag = s.get(i)
assert item.raw == 'UID:{}'.format(i)
hrefs = (href for href, etag in s.list())
for href in hrefs:
assert s.has(href)
obj, etag = s.get(href)
assert obj.raw == 'UID:{}'.format(obj.uid)
def test_upload_already_existing(self):
s = self._get_storage()
@ -54,14 +52,14 @@ class StorageTests(object):
def test_update_nonexisting(self):
s = self._get_storage()
item = Item('UID:1')
self.assertRaises(exceptions.NotFoundError, s.update, item, 123)
self.assertRaises(exceptions.NotFoundError, s.update, 'huehue', item, 123)
def test_wrong_etag(self):
s = self._get_storage()
item = Item('UID:1')
etag = s.upload(item)
self.assertRaises(exceptions.WrongEtagError, s.update, item, 'lolnope')
self.assertRaises(exceptions.WrongEtagError, s.delete, '1', 'lolnope')
obj = Item('UID:1')
href, etag = s.upload(obj)
self.assertRaises(exceptions.WrongEtagError, s.update, href, obj, 'lolnope')
self.assertRaises(exceptions.WrongEtagError, s.delete, href, 'lolnope')
def test_delete_nonexisting(self):
s = self._get_storage()

View file

@ -20,78 +20,78 @@ class SyncTests(TestCase):
def test_irrelevant_status(self):
a = MemoryStorage()
b = MemoryStorage()
status = {'1': ('UID:1', 1234)}
status = {'1': ('1.asd', 1234, '1.ics', 2345)}
sync(a, b, status)
assert not status
assert empty_storage(a)
assert empty_storage(b)
def test_missing_status(self):
a = MemoryStorage()
b = MemoryStorage()
a = MemoryStorage(fileext='.txt')
b = MemoryStorage(fileext='.asd')
status = {}
item = Item('UID:1')
a.upload(item)
b.upload(item)
sync(a, b, status)
assert list(status) == ['1']
assert a.has('1')
assert b.has('1')
assert len(status) == 1
assert a.has('1.txt')
assert b.has('1.asd')
def test_upload_and_update(self):
a = MemoryStorage()
b = MemoryStorage()
a = MemoryStorage(fileext='.txt')
b = MemoryStorage(fileext='.asd')
status = {}
item = Item('UID:1') # new item 1 in a
a.upload(item)
sync(a, b, status)
assert b.get('1')[0].raw == item.raw
assert b.get('1.asd')[0].raw == item.raw
item = Item('UID:1\nASDF:YES') # update of item 1 in b
b.update(item, b.get('1')[1])
b.update('1.asd', item, b.get('1.asd')[1])
sync(a, b, status)
assert a.get('1')[0].raw == item.raw
assert a.get('1.txt')[0].raw == item.raw
item2 = Item('UID:2') # new item 2 in b
b.upload(item2)
sync(a, b, status)
assert a.get('2')[0].raw == item2.raw
assert a.get('2.txt')[0].raw == item2.raw
item2 = Item('UID:2\nASDF:YES') # update of item 2 in a
a.update(item2, a.get('2')[1])
a.update('2.txt', item2, a.get('2.txt')[1])
sync(a, b, status)
assert b.get('2')[0].raw == item2.raw
assert b.get('2.asd')[0].raw == item2.raw
def test_deletion(self):
a = MemoryStorage()
b = MemoryStorage()
a = MemoryStorage(fileext='.txt')
b = MemoryStorage(fileext='.asd')
status = {}
item = Item('UID:1')
a.upload(item)
sync(a, b, status)
b.delete('1', b.get('1')[1])
b.delete('1.asd', b.get('1.asd')[1])
sync(a, b, status)
assert not a.has('1') and not b.has('1')
assert not a.has('1.txt') and not b.has('1.asd')
a.upload(item)
sync(a, b, status)
assert a.has('1') and b.has('1')
a.delete('1', a.get('1')[1])
assert a.has('1.txt') and b.has('1.asd')
a.delete('1.txt', a.get('1.txt')[1])
sync(a, b, status)
assert not a.has('1') and not b.has('1')
assert not a.has('1.txt') and not b.has('1.asd')
def test_already_synced(self):
a = MemoryStorage()
b = MemoryStorage()
a = MemoryStorage(fileext='.txt')
b = MemoryStorage(fileext='.asd')
item = Item('UID:1')
a.upload(item)
b.upload(item)
status = {'1': (a.get('1')[1], b.get('1')[1])}
status = {'1': ('1.txt', a.get('1.txt')[1], '1.asd', b.get('1.asd')[1])}
old_status = dict(status)
a.update = b.update = a.upload = b.upload = \
lambda *a, **kw: self.fail('Method shouldn\'t have been called.')
sync(a, b, status)
assert status == old_status
assert a.has('1') and b.has('1')
assert a.has('1.txt') and b.has('1.asd')