diff --git a/tests/test_sync.py b/tests/test_sync.py index 315821d..2e5c300 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -125,6 +125,23 @@ def test_deletion(): assert not a.has('1.a') and not b.has('1.b') +def test_insert_hash(): + a = MemoryStorage() + b = MemoryStorage() + status = {} + + item = Item('UID:1') + href, etag = a.upload(item) + sync(a, b, status) + + for d in status['1']: + del d['hash'] + + a.update(href, Item('UID:1\nHAHA:YES'), etag) + sync(a, b, status) + assert 'hash' in status['1'][0] and 'hash' in status['1'][1] + + def test_already_synced(): a = MemoryStorage(fileext='.a') b = MemoryStorage(fileext='.b') @@ -134,9 +151,11 @@ def test_already_synced(): status = { '1': ({ 'href': '1.a', + 'hash': item.hash, 'etag': a.get('1.a')[1] }, { 'href': '1.b', + 'hash': item.hash, 'etag': b.get('1.b')[1] }) } @@ -413,7 +432,7 @@ class SyncMachine(RuleBasedStateMachine): s.read_only = read_only if flaky_etags: def get(href): - _, item = s.items[href] + old_etag, item = s.items[href] etag = _random_string() s.items[href] = etag, item return item, etag @@ -444,6 +463,15 @@ class SyncMachine(RuleBasedStateMachine): storage.items.pop(href, None) return storage + @rule(target=Status, status=Status, delete_from_b=st.booleans()) + def remove_hash_from_status(self, status, delete_from_b): + for a, b in status.values(): + if delete_from_b: + a = b + assume('hash' in a) + del a['hash'] + return status + @rule( target=Status, status=Status, a=Storage, b=Storage, @@ -451,6 +479,7 @@ class SyncMachine(RuleBasedStateMachine): conflict_resolution=st.one_of((st.just('a wins'), st.just('b wins'))) ) def sync(self, status, a, b, force_delete, conflict_resolution): + assume(a is not b) old_items_a = self._get_items(a) old_items_b = self._get_items(b) @@ -478,6 +507,8 @@ class SyncMachine(RuleBasedStateMachine): assert items_a == old_items_a or not a.read_only assert items_b == old_items_b or not b.read_only + assert set(a.items) | set(b.items) == set(status) + return status diff --git a/vdirsyncer/storage/base.py b/vdirsyncer/storage/base.py index ba5621d..860171a 100644 --- a/vdirsyncer/storage/base.py +++ b/vdirsyncer/storage/base.py @@ -3,7 +3,7 @@ import contextlib import functools -from .. import exceptions, sync +from .. import exceptions from ..utils import uniq from ..utils.vobject import Item # noqa @@ -47,8 +47,6 @@ class Storage(metaclass=StorageMeta): fileext = '.txt' - syncer_class = sync.StorageSyncer - # The string used in the config to denote the type of storage. Should be # overridden by subclasses. storage_name = None diff --git a/vdirsyncer/sync.py b/vdirsyncer/sync.py index 75b0570..6bcd9c5 100644 --- a/vdirsyncer/sync.py +++ b/vdirsyncer/sync.py @@ -77,7 +77,7 @@ class BothReadOnly(SyncError): ''' -class StorageSyncer(object): +class _StorageInfo(object): '''A wrapper class that holds prefetched items, the status and other things.''' def __init__(self, storage, status): @@ -85,59 +85,56 @@ class StorageSyncer(object): :param status: {ident: {'href': href, 'etag': etag}} ''' self.storage = storage - self.status = status - self.idents = None - def prepare_idents(self): + #: Represents the status as given. Must not be modified. + self.status = status + + #: Represents the current state of the storage and is modified as items + #: are uploaded and downloaded. Will be dumped into status. + self.new_status = None + + def prepare_new_status(self): href_to_status = dict((meta['href'], (ident, meta)) for ident, meta in self.status.items()) - prefetch = {} - self.idents = {} + prefetch = [] + self.new_status = {} def _store_props(ident, props): - new_props = self.idents.setdefault(ident, props) + new_props = self.new_status.setdefault(ident, props) if new_props is not props: raise IdentConflict(storage=self.storage, hrefs=[new_props['href'], props['href']]) - if ident in self.status: - # Necessary if item's href changes. - # Otherwise it's a no-op, like `a = a`. - self.status[ident]['href'] = props['href'] - for href, etag in self.storage.list(): - props = {'href': href, 'etag': etag} ident, old_meta = href_to_status.get(href, (None, None)) + meta = dict(old_meta) if old_meta is not None else {} + meta['href'] = href + meta['etag'] = etag assert etag is not None - if old_meta is None or etag != old_meta['etag']: + if meta != old_meta: # Either the item is completely new, or updated # In both cases we should prefetch - prefetch[href] = props + prefetch.append(href) else: - _store_props(ident, props) + _store_props(ident, meta) # Prefetch items for href, item, etag in (self.storage.get_multi(prefetch) if prefetch else ()): - props = prefetch[href] - - assert props['href'] == href - if props['etag'] != etag: - sync_logger.warning( - 'Etag of {!r} changed during sync from {!r} to {!r}' - .format(href, props['etag'], etag) - ) - props['etag'] = etag - props['item'] = item - props['ident'] = ident = item.ident - _store_props(ident, props) + meta = { + 'href': href, + 'etag': etag, + 'item': item, + 'hash': item.hash, + } + _store_props(item.ident, meta) def is_changed(self, ident): status = self.status.get(ident, None) - meta = self.idents[ident] + meta = self.new_status[ident] if status is None: # new item return True @@ -149,9 +146,51 @@ class StorageSyncer(object): return True else: # only etag changed - status['etag'] = meta['etag'] return False + def upload_full(self, item): + if self.storage.read_only: + sync_logger.warning('{} is read-only. Skipping update...' + .format(self.storage)) + href = etag = None + else: + href, etag = self.storage.upload(item) + + assert item.ident not in self.new_status + self.new_status[item.ident] = { + 'href': href, + 'hash': item.hash, + 'etag': etag + } + + def update_full(self, item): + '''Similar to Storage.update, but automatically takes care of ETags and + updating the status.''' + + if self.storage.read_only: + sync_logger.warning('{} is read-only. Skipping update...' + .format(self.storage)) + href = etag = None + else: + meta = self.new_status[item.ident] + href = meta['href'] + etag = self.storage.update(href, item, meta['etag']) + assert isinstance(etag, (bytes, str)) + + self.new_status[item.ident] = { + 'href': href, + 'hash': item.hash, + 'etag': etag + } + + def delete_full(self, ident): + meta = self.new_status.pop(ident) + if self.storage.read_only: + sync_logger.warning('{} is read-only, skipping deletion...' + .format(self.storage)) + else: + self.storage.delete(meta['href'], meta['etag']) + def _status_migrate(status): for ident in list(status): @@ -166,15 +205,22 @@ def _status_migrate(status): 'href': href_b, 'etag': etag_b, }) + elif len(value) == 2: + a, b = value + a.setdefault('hash', '') + b.setdefault('hash', '') def _compress_meta(meta): '''Make in-memory metadata suitable for disk storage by removing fetched item content''' + if set(meta) == {'href', 'etag', 'hash'}: + return meta + return { 'href': meta['href'], 'etag': meta['etag'], - 'hash': meta['item'].hash + 'hash': meta['hash'] } @@ -190,9 +236,11 @@ def sync(storage_a, storage_b, status, conflict_resolution=None, metadata about the two storages for detection of changes. Will be modified by the function and should be passed to it at the next sync. If this is the first sync, an empty dictionary should be provided. - :param conflict_resolution: Either 'a wins' or 'b wins'. If none is - provided, the sync function will raise - :py:exc:`SyncConflict`. + :param conflict_resolution: A function that, given two conflicting item + versions A and B, returns a new item with conflicts resolved. The UID + must be the same. The strings `"a wins"` and `"b wins"` are also + accepted to mean that that side's version will always be taken. If none + is provided, the sync function will raise :py:exc:`SyncConflict`. :param force_delete: When one storage got completely emptied between two syncs, :py:exc:`StorageEmpty` is raised for safety. Setting this parameter to ``True`` disables this safety @@ -201,24 +249,29 @@ def sync(storage_a, storage_b, status, conflict_resolution=None, if storage_a.read_only and storage_b.read_only: raise BothReadOnly() + if conflict_resolution == 'a wins': + conflict_resolution = lambda a, b: a + elif conflict_resolution == 'b wins': + conflict_resolution = lambda a, b: b + _status_migrate(status) - a_info = storage_a.syncer_class(storage_a, dict( + a_info = _StorageInfo(storage_a, dict( (ident, meta_a) for ident, (meta_a, meta_b) in status.items() )) - b_info = storage_b.syncer_class(storage_b, dict( + b_info = _StorageInfo(storage_b, dict( (ident, meta_b) for ident, (meta_a, meta_b) in status.items() )) - a_info.prepare_idents() - b_info.prepare_idents() + a_info.prepare_new_status() + b_info.prepare_new_status() if status and not force_delete: - if a_info.idents and not b_info.idents: + if a_info.new_status and not b_info.new_status: raise StorageEmpty(empty_storage=storage_b) - elif b_info.idents and not a_info.idents: + elif b_info.new_status and not a_info.new_status: raise StorageEmpty(empty_storage=storage_a) actions = list(_get_actions(a_info, b_info)) @@ -229,8 +282,11 @@ def sync(storage_a, storage_b, status, conflict_resolution=None, action(a_info, b_info, conflict_resolution) status.clear() - for ident in uniq(itertools.chain(a_info.status, b_info.status)): - status[ident] = a_info.status[ident], b_info.status[ident] + for ident in uniq(itertools.chain(a_info.new_status, b_info.new_status)): + status[ident] = ( + _compress_meta(a_info.new_status[ident]), + _compress_meta(b_info.new_status[ident]) + ) def _action_upload(ident, source, dest): @@ -238,81 +294,27 @@ def _action_upload(ident, source, dest): def inner(a, b, conflict_resolution): sync_logger.info(u'Copying (uploading) item {} to {}' .format(ident, dest.storage)) - source_meta = source.idents[ident] - - if dest.storage.read_only: - sync_logger.warning('{dest} is read-only. Skipping update...' - .format(dest=dest.storage)) - dest_href = dest_etag = None - else: - item = source_meta['item'] - dest_href, dest_etag = dest.storage.upload(item) - - source.status[ident] = _compress_meta(source_meta) - dest.status[ident] = { - 'href': dest_href, - 'hash': source_meta['item'].hash, - 'etag': dest_etag - } + item = source.new_status[ident]['item'] + dest.upload_full(item) return inner def _action_update(ident, source, dest): - def inner(a, b, conflict_resolution): sync_logger.info(u'Copying (updating) item {} to {}' .format(ident, dest.storage)) - source_meta = source.idents[ident] - - if dest.storage.read_only: - sync_logger.warning(u'{dest} is read-only. Skipping update...' - .format(dest=dest.storage)) - dest_href = dest_etag = None - else: - dest_meta = dest.idents[ident] - dest_href = dest_meta['href'] - dest_etag = dest.storage.update(dest_href, source_meta['item'], - dest_meta['etag']) - assert isinstance(dest_etag, (bytes, str)) - - source.status[ident] = _compress_meta(source_meta) - dest.status[ident] = { - 'href': dest_href, - 'hash': source_meta['item'].hash, - 'etag': dest_etag - } + source_meta = source.new_status[ident] + dest.update_full(source_meta['item']) return inner def _action_delete(ident, info): - storage = info.storage - idents = info.idents - def inner(a, b, conflict_resolution): - sync_logger.info(u'Deleting item {} from {}'.format(ident, storage)) - if storage.read_only: - sync_logger.warning('{} is read-only, skipping deletion...' - .format(storage)) - else: - meta = idents[ident] - etag = meta['etag'] - href = meta['href'] - storage.delete(href, etag) - - del a.status[ident] - del b.status[ident] - - return inner - - -def _action_delete_status(ident): - def inner(a, b, conflict_resolution): - sync_logger.info(u'Deleting status info for nonexisting item {}' - .format(ident)) - del a.status[ident] - del b.status[ident] + sync_logger.info(u'Deleting item {} from {}' + .format(ident, info.storage)) + info.delete_full(ident) return inner @@ -321,34 +323,32 @@ def _action_conflict_resolve(ident): def inner(a, b, conflict_resolution): sync_logger.info(u'Doing conflict resolution for item {}...' .format(ident)) - meta_a = a.idents[ident] - meta_b = b.idents[ident] + meta_a = a.new_status[ident] + meta_b = b.new_status[ident] if meta_a['item'].hash == meta_b['item'].hash: sync_logger.info(u'...same content on both sides.') - a.status[ident] = _compress_meta(meta_a) - b.status[ident] = _compress_meta(meta_b) elif conflict_resolution is None: raise SyncConflict(ident=ident, href_a=meta_a['href'], href_b=meta_b['href']) - elif conflict_resolution == 'a wins': - sync_logger.info(u'...{} wins.'.format(a.storage)) - _action_update(ident, a, b)(a, b, conflict_resolution) - elif conflict_resolution == 'b wins': - sync_logger.info(u'...{} wins.'.format(b.storage)) - _action_update(ident, b, a)(a, b, conflict_resolution) + elif callable(conflict_resolution): + new_item = conflict_resolution(meta_a['item'], meta_b['item']) + if new_item.hash != meta_a['item'].hash: + a.update_full(new_item) + if new_item.hash != meta_b['item'].hash: + b.update_full(new_item) else: - raise exceptions.UserError('Invalid conflict resolution mode: {}' + raise exceptions.UserError('Invalid conflict resolution mode: {!r}' .format(conflict_resolution)) return inner def _get_actions(a_info, b_info): - for ident in uniq(itertools.chain(a_info.idents, b_info.idents, + for ident in uniq(itertools.chain(a_info.new_status, b_info.new_status, a_info.status)): - a = ident in a_info.idents # item exists in a - b = ident in b_info.idents # item exists in b + a = ident in a_info.new_status # item exists in a + b = ident in b_info.new_status # item exists in b if a and b: a_changed = a_info.is_changed(ident) @@ -379,6 +379,3 @@ def _get_actions(a_info, b_info): else: # was deleted from a and not changed on b yield _action_delete(ident, b_info) - elif not a and not b: - # was deleted from a and b, clean up status - yield _action_delete_status(ident)