From df55926c395b09c3ff3f94144249ca4eee96964e Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Sat, 21 Feb 2015 13:44:59 +0100 Subject: [PATCH] Sync rewrite --- tests/test_sync.py | 4 +- vdirsyncer/sync.py | 286 +++++++++++++++++++++------------------------ 2 files changed, 138 insertions(+), 152 deletions(-) diff --git a/tests/test_sync.py b/tests/test_sync.py index bc2baf5..9410c0e 100644 --- a/tests/test_sync.py +++ b/tests/test_sync.py @@ -245,8 +245,8 @@ def test_both_readonly(): def test_readonly(): - a = MemoryStorage() - b = MemoryStorage() + a = MemoryStorage(instance_name='a') + b = MemoryStorage(instance_name='b') status = {} href_a, _ = a.upload(Item(u'UID:1')) href_b, _ = b.upload(Item(u'UID:2')) diff --git a/vdirsyncer/sync.py b/vdirsyncer/sync.py index d91f6ca..06cdf37 100644 --- a/vdirsyncer/sync.py +++ b/vdirsyncer/sync.py @@ -76,50 +76,53 @@ class BothReadOnly(SyncError): ''' -def _prefetch(storage, rv, hrefs): - if rv is None: - rv = {} - if not hrefs: - return rv +class StorageInfo(object): + '''A wrapper class that holds prefetched items, the status and other + things.''' + def __init__(self, storage, status): + ''' + :param status: {ident: (href, etag)} + ''' + self.storage = storage + self.status = status + self.idents = None - for href, item, etag in storage.get_multi(hrefs): - props = rv[href] - props['item'] = item - props['ident'] = item.ident - if props['etag'] != etag: - raise SyncError('Etag changed during sync.') + def prepare_idents(self, other_read_only): + href_to_status = dict((href, (ident, etag)) + for ident, (href, etag) + in iteritems(self.status)) - return rv + hrefs_to_download = [] + self.idents = {} + for href, etag in self.storage.list(): + if href in href_to_status: + ident, old_etag = href_to_status[href] + self.idents[ident] = { + 'etag': etag, + 'href': href, + 'ident': ident + } -def _prepare_hrefs(storage, other_storage, href_to_status): - hrefs = {} - download = [] - for href, etag in storage.list(): - props = hrefs[href] = {'etag': etag, 'href': href} - if href in href_to_status: - ident, old_etag = href_to_status[href] - props['ident'] = ident - if etag != old_etag and not other_storage.read_only: - download.append(href) - else: - download.append(href) + if etag != old_etag and not other_read_only: + hrefs_to_download.append(href) + else: + hrefs_to_download.append(href) - _prefetch(storage, hrefs, download) - return hrefs + # Prefetch items + for href, item, etag in (self.storage.get_multi(hrefs_to_download) if + hrefs_to_download else ()): + props = self.idents.setdefault(item.ident, {}) + props['item'] = item + props['ident'] = item.ident + props.setdefault('etag', etag) + props.setdefault('href', href) - -def _prepare_idents(storage, other_storage, href_to_status): - hrefs = _prepare_hrefs(storage, other_storage, href_to_status) - - rv = {} - for href, props in iteritems(hrefs): - other_props = rv.setdefault(props['ident'], props) - if other_props != props: - raise IdentConflict(storage=storage, - hrefs=[props['href'], other_props['href']]) - - return rv + if props['href'] != href: + raise IdentConflict(storage=self.storage, + hrefs=[props['href'], href]) + if props['etag'] != etag: + raise SyncError('Etag changed during sync.') def sync(storage_a, storage_b, status, conflict_resolution=None, @@ -145,146 +148,133 @@ def sync(storage_a, storage_b, status, conflict_resolution=None, if storage_a.read_only and storage_b.read_only: raise BothReadOnly() - a_href_to_status = dict( - (href_a, (ident, etag_a)) + a_info = StorageInfo(storage_a, dict( + (ident, (href_a, etag_a)) for ident, (href_a, etag_a, href_b, etag_b) in iteritems(status) - ) - b_href_to_status = dict( - (href_b, (ident, etag_b)) + )) + b_info = StorageInfo(storage_b, dict( + (ident, (href_b, etag_b)) for ident, (href_a, etag_a, href_b, etag_b) in iteritems(status) - ) - # ident => {'etag': etag, 'item': optional item, 'href': href} - a_idents = _prepare_idents(storage_a, storage_b, a_href_to_status) - b_idents = _prepare_idents(storage_b, storage_a, b_href_to_status) + )) - if bool(a_idents) != bool(b_idents) and status and not force_delete: + a_info.prepare_idents(storage_b.read_only) + b_info.prepare_idents(storage_a.read_only) + + if bool(a_info.idents) != bool(b_info.idents) \ + and status and not force_delete: raise StorageEmpty( - empty_storage=(storage_b if a_idents else storage_a)) + empty_storage=(storage_b if a_info.idents else storage_a)) - del a_href_to_status, b_href_to_status - - storages = { - 'a': (storage_a, a_idents), - 'b': (storage_b, b_idents) - } - - actions = list(_get_actions(storages, status)) + actions = list(_get_actions(a_info, b_info)) with storage_a.at_once(): with storage_b.at_once(): for action in actions: - action(storages, status, conflict_resolution) + action(a_info, b_info, conflict_resolution) + + status.clear() + for ident in uniq(itertools.chain(a_info.status, b_info.status)): + href_a, etag_a = a_info.status[ident] + href_b, etag_b = b_info.status[ident] + status[ident] = href_a, etag_a, href_b, etag_b -def _action_upload(ident, dest): - source = 'a' if dest == 'b' else 'b' +def _action_upload(ident, source, dest): - def inner(storages, status, conflict_resolution): - source_storage, source_idents = storages[source] - dest_storage, dest_idents = storages[dest] + def inner(a, b, conflict_resolution): sync_logger.info('Copying (uploading) item {} to {}' - .format(ident, dest_storage)) + .format(ident, dest.storage)) + source_meta = source.idents[ident] - source_meta = source_idents[ident] - source_href = source_meta['href'] - source_etag = source_meta['etag'] - source_status = (source_href, source_etag) - - dest_status = (None, None) - - if dest_storage.read_only: + if dest.storage.read_only: sync_logger.warning('{dest} is read-only. Skipping update...' - .format(dest=dest_storage)) + .format(dest=dest.storage)) + dest_href = dest_etag = None else: item = source_meta['item'] - dest_href, dest_etag = dest_storage.upload(item) - dest_status = (dest_href, dest_etag) + dest_href, dest_etag = dest.storage.upload(item) - status[ident] = source_status + dest_status if source == 'a' else \ - dest_status + source_status + source.status[ident] = source_meta['href'], source_meta['etag'] + dest.status[ident] = dest_href, dest_etag return inner -def _action_update(ident, dest): - source = 'a' if dest == 'b' else 'b' +def _action_update(ident, source, dest): - def inner(storages, status, conflict_resolution): - source_storage, source_idents = storages[source] - dest_storage, dest_idents = storages[dest] + def inner(a, b, conflict_resolution): sync_logger.info('Copying (updating) item {} to {}' - .format(ident, dest_storage)) + .format(ident, dest.storage)) + source_meta = source.idents[ident] - source_meta = source_idents[ident] - source_href = source_meta['href'] - source_etag = source_meta['etag'] - source_status = (source_href, source_etag) - - dest_meta = dest_idents[ident] - dest_href = dest_meta['href'] - dest_etag = dest_meta['etag'] - dest_status = (dest_href, dest_etag) - - if dest_storage.read_only: + if dest.storage.read_only: sync_logger.info('{dest} is read-only. Skipping update...' - .format(dest=dest_storage)) + .format(dest=dest.storage)) + dest_href = dest_etag = None else: - item = source_meta['item'] - dest_etag = dest_storage.update(dest_href, item, dest_etag) + dest_meta = dest.idents[ident] + dest_href = dest_meta['href'] + dest_etag = dest.storage.update(dest_href, source_meta['item'], + dest_meta['etag']) assert isinstance(dest_etag, (bytes, text_type)) - dest_status = (dest_href, dest_etag) - - status[ident] = source_status + dest_status if source == 'a' else \ - dest_status + source_status + source.status[ident] = source_meta['href'], source_meta['etag'] + dest.status[ident] = dest_href, dest_etag return inner -def _action_delete(ident, dest): - def inner(storages, status, conflict_resolution): - if dest is not None: - dest_storage, dest_idents = storages[dest] - sync_logger.info('Deleting item {} from {}' - .format(ident, dest_storage)) - if dest_storage.read_only: - sync_logger.warning('{dest} is read-only, skipping deletion...' - .format(dest=dest_storage)) - else: - dest_meta = dest_idents[ident] - dest_etag = dest_meta['etag'] - dest_href = dest_meta['href'] - dest_storage.delete(dest_href, dest_etag) - else: - sync_logger.info('Deleting status info for nonexisting item {}' - .format(ident)) +def _action_delete(ident, info): + storage = info.storage + idents = info.idents - del status[ident] + def inner(a, b, conflict_resolution): + sync_logger.info('Deleting item {} from {}'.format(ident, storage)) + if storage.read_only: + sync_logger.warning('{} is read-only, skipping deletion...' + .format(storage)) + else: + meta = idents[ident] + etag = meta['etag'] + href = meta['href'] + storage.delete(href, etag) + + del a.status[ident] + del b.status[ident] + + return inner + + +def _action_delete_status(ident): + def inner(a, b, conflict_resolution): + sync_logger.info('Deleting status info for nonexisting item {}' + .format(ident)) + del a.status[ident] + del b.status[ident] return inner def _action_conflict_resolve(ident): - def inner(storages, status, conflict_resolution): + def inner(a, b, conflict_resolution): sync_logger.info('Doing conflict resolution for item {}...' .format(ident)) - a_storage, a_idents = storages['a'] - b_storage, b_idents = storages['b'] - meta_a = a_idents[ident] - meta_b = b_idents[ident] - href_a = meta_a['href'] - href_b = meta_b['href'] + meta_a = a.idents[ident] + meta_b = b.idents[ident] + if meta_a['item'].raw == meta_b['item'].raw: sync_logger.info('...same content on both sides.') - status[ident] = href_a, meta_a['etag'], href_b, meta_b['etag'] + a.status[ident] = meta_a['href'], meta_a['etag'] + b.status[ident] = meta_b['href'], meta_b['etag'] elif conflict_resolution is None: - raise SyncConflict(ident=ident, href_a=href_a, href_b=href_b) + raise SyncConflict(ident=ident, href_a=meta_a['href'], + href_b=meta_b['href']) elif conflict_resolution == 'a wins': - sync_logger.info('...{} wins.'.format(a_storage)) - _action_update(ident, 'b')(storages, status, conflict_resolution) + sync_logger.info('...{} wins.'.format(a.storage)) + _action_update(ident, a, b)(a, b, conflict_resolution) elif conflict_resolution == 'b wins': - sync_logger.info('...{} wins.'.format(b_storage)) - _action_update(ident, 'a')(storages, status, conflict_resolution) + sync_logger.info('...{} wins.'.format(b.storage)) + _action_update(ident, b, a)(a, b, conflict_resolution) else: raise ValueError('Invalid conflict resolution mode: {}' .format(conflict_resolution)) @@ -292,20 +282,16 @@ def _action_conflict_resolve(ident): return inner -def _get_actions(storages, status): - storage_a, a_idents = storages['a'] - storage_b, b_idents = storages['b'] - - for ident in uniq(itertools.chain(a_idents, b_idents, status)): - a = a_idents.get(ident, None) - b = b_idents.get(ident, None) +def _get_actions(a_info, b_info): + for ident in uniq(itertools.chain(a_info.idents, b_info.idents, + a_info.status)): + a = a_info.idents.get(ident, None) + b = b_info.idents.get(ident, None) assert not a or a['etag'] is not None assert not b or b['etag'] is not None - try: - _, status_etag_a, _, status_etag_b = status[ident] - except KeyError: - status_etag_a = status_etag_b = None + _, status_etag_a = a_info.status.get(ident, (None, None)) + _, status_etag_b = b_info.status.get(ident, (None, None)) if a and b: if a['etag'] != status_etag_a and b['etag'] != status_etag_b: @@ -314,26 +300,26 @@ def _get_actions(storages, status): yield _action_conflict_resolve(ident) elif a['etag'] != status_etag_a: # item was only modified in a - yield _action_update(ident, 'b') + yield _action_update(ident, a_info, b_info) elif b['etag'] != status_etag_b: # item was only modified in b - yield _action_update(ident, 'a') + yield _action_update(ident, b_info, a_info) elif a and not b: if a['etag'] != status_etag_a: # was deleted from b but modified on a # OR: new item was created in a - yield _action_upload(ident, 'b') + yield _action_upload(ident, a_info, b_info) else: # was deleted from b and not modified on a - yield _action_delete(ident, 'a') + yield _action_delete(ident, a_info) elif not a and b: if b['etag'] != status_etag_b: # was deleted from a but modified on b # OR: new item was created in b - yield _action_upload(ident, 'a') + yield _action_upload(ident, b_info, a_info) else: # was deleted from a and not changed on b - yield _action_delete(ident, 'b') + yield _action_delete(ident, b_info) elif not a and not b: # was deleted from a and b, clean up status - yield _action_delete(ident, None) + yield _action_delete_status(ident)