mirror of
https://github.com/samsonjs/vdirsyncer.git
synced 2026-04-27 14:57:41 +00:00
commit
2c6ccd1dfa
2 changed files with 138 additions and 152 deletions
|
|
@ -245,8 +245,8 @@ def test_both_readonly():
|
||||||
|
|
||||||
|
|
||||||
def test_readonly():
|
def test_readonly():
|
||||||
a = MemoryStorage()
|
a = MemoryStorage(instance_name='a')
|
||||||
b = MemoryStorage()
|
b = MemoryStorage(instance_name='b')
|
||||||
status = {}
|
status = {}
|
||||||
href_a, _ = a.upload(Item(u'UID:1'))
|
href_a, _ = a.upload(Item(u'UID:1'))
|
||||||
href_b, _ = b.upload(Item(u'UID:2'))
|
href_b, _ = b.upload(Item(u'UID:2'))
|
||||||
|
|
|
||||||
|
|
@ -76,50 +76,53 @@ class BothReadOnly(SyncError):
|
||||||
'''
|
'''
|
||||||
|
|
||||||
|
|
||||||
def _prefetch(storage, rv, hrefs):
|
class StorageInfo(object):
|
||||||
if rv is None:
|
'''A wrapper class that holds prefetched items, the status and other
|
||||||
rv = {}
|
things.'''
|
||||||
if not hrefs:
|
def __init__(self, storage, status):
|
||||||
return rv
|
'''
|
||||||
|
:param status: {ident: (href, etag)}
|
||||||
|
'''
|
||||||
|
self.storage = storage
|
||||||
|
self.status = status
|
||||||
|
self.idents = None
|
||||||
|
|
||||||
for href, item, etag in storage.get_multi(hrefs):
|
def prepare_idents(self, other_read_only):
|
||||||
props = rv[href]
|
href_to_status = dict((href, (ident, etag))
|
||||||
props['item'] = item
|
for ident, (href, etag)
|
||||||
props['ident'] = item.ident
|
in iteritems(self.status))
|
||||||
if props['etag'] != etag:
|
|
||||||
raise SyncError('Etag changed during sync.')
|
|
||||||
|
|
||||||
return rv
|
hrefs_to_download = []
|
||||||
|
self.idents = {}
|
||||||
|
|
||||||
|
for href, etag in self.storage.list():
|
||||||
|
if href in href_to_status:
|
||||||
|
ident, old_etag = href_to_status[href]
|
||||||
|
self.idents[ident] = {
|
||||||
|
'etag': etag,
|
||||||
|
'href': href,
|
||||||
|
'ident': ident
|
||||||
|
}
|
||||||
|
|
||||||
def _prepare_hrefs(storage, other_storage, href_to_status):
|
if etag != old_etag and not other_read_only:
|
||||||
hrefs = {}
|
hrefs_to_download.append(href)
|
||||||
download = []
|
else:
|
||||||
for href, etag in storage.list():
|
hrefs_to_download.append(href)
|
||||||
props = hrefs[href] = {'etag': etag, 'href': href}
|
|
||||||
if href in href_to_status:
|
|
||||||
ident, old_etag = href_to_status[href]
|
|
||||||
props['ident'] = ident
|
|
||||||
if etag != old_etag and not other_storage.read_only:
|
|
||||||
download.append(href)
|
|
||||||
else:
|
|
||||||
download.append(href)
|
|
||||||
|
|
||||||
_prefetch(storage, hrefs, download)
|
# Prefetch items
|
||||||
return hrefs
|
for href, item, etag in (self.storage.get_multi(hrefs_to_download) if
|
||||||
|
hrefs_to_download else ()):
|
||||||
|
props = self.idents.setdefault(item.ident, {})
|
||||||
|
props['item'] = item
|
||||||
|
props['ident'] = item.ident
|
||||||
|
props.setdefault('etag', etag)
|
||||||
|
props.setdefault('href', href)
|
||||||
|
|
||||||
|
if props['href'] != href:
|
||||||
def _prepare_idents(storage, other_storage, href_to_status):
|
raise IdentConflict(storage=self.storage,
|
||||||
hrefs = _prepare_hrefs(storage, other_storage, href_to_status)
|
hrefs=[props['href'], href])
|
||||||
|
if props['etag'] != etag:
|
||||||
rv = {}
|
raise SyncError('Etag changed during sync.')
|
||||||
for href, props in iteritems(hrefs):
|
|
||||||
other_props = rv.setdefault(props['ident'], props)
|
|
||||||
if other_props != props:
|
|
||||||
raise IdentConflict(storage=storage,
|
|
||||||
hrefs=[props['href'], other_props['href']])
|
|
||||||
|
|
||||||
return rv
|
|
||||||
|
|
||||||
|
|
||||||
def sync(storage_a, storage_b, status, conflict_resolution=None,
|
def sync(storage_a, storage_b, status, conflict_resolution=None,
|
||||||
|
|
@ -145,146 +148,133 @@ def sync(storage_a, storage_b, status, conflict_resolution=None,
|
||||||
if storage_a.read_only and storage_b.read_only:
|
if storage_a.read_only and storage_b.read_only:
|
||||||
raise BothReadOnly()
|
raise BothReadOnly()
|
||||||
|
|
||||||
a_href_to_status = dict(
|
a_info = StorageInfo(storage_a, dict(
|
||||||
(href_a, (ident, etag_a))
|
(ident, (href_a, etag_a))
|
||||||
for ident, (href_a, etag_a, href_b, etag_b) in iteritems(status)
|
for ident, (href_a, etag_a, href_b, etag_b) in iteritems(status)
|
||||||
)
|
))
|
||||||
b_href_to_status = dict(
|
b_info = StorageInfo(storage_b, dict(
|
||||||
(href_b, (ident, etag_b))
|
(ident, (href_b, etag_b))
|
||||||
for ident, (href_a, etag_a, href_b, etag_b) in iteritems(status)
|
for ident, (href_a, etag_a, href_b, etag_b) in iteritems(status)
|
||||||
)
|
))
|
||||||
# ident => {'etag': etag, 'item': optional item, 'href': href}
|
|
||||||
a_idents = _prepare_idents(storage_a, storage_b, a_href_to_status)
|
|
||||||
b_idents = _prepare_idents(storage_b, storage_a, b_href_to_status)
|
|
||||||
|
|
||||||
if bool(a_idents) != bool(b_idents) and status and not force_delete:
|
a_info.prepare_idents(storage_b.read_only)
|
||||||
|
b_info.prepare_idents(storage_a.read_only)
|
||||||
|
|
||||||
|
if bool(a_info.idents) != bool(b_info.idents) \
|
||||||
|
and status and not force_delete:
|
||||||
raise StorageEmpty(
|
raise StorageEmpty(
|
||||||
empty_storage=(storage_b if a_idents else storage_a))
|
empty_storage=(storage_b if a_info.idents else storage_a))
|
||||||
|
|
||||||
del a_href_to_status, b_href_to_status
|
actions = list(_get_actions(a_info, b_info))
|
||||||
|
|
||||||
storages = {
|
|
||||||
'a': (storage_a, a_idents),
|
|
||||||
'b': (storage_b, b_idents)
|
|
||||||
}
|
|
||||||
|
|
||||||
actions = list(_get_actions(storages, status))
|
|
||||||
|
|
||||||
with storage_a.at_once():
|
with storage_a.at_once():
|
||||||
with storage_b.at_once():
|
with storage_b.at_once():
|
||||||
for action in actions:
|
for action in actions:
|
||||||
action(storages, status, conflict_resolution)
|
action(a_info, b_info, conflict_resolution)
|
||||||
|
|
||||||
|
status.clear()
|
||||||
|
for ident in uniq(itertools.chain(a_info.status, b_info.status)):
|
||||||
|
href_a, etag_a = a_info.status[ident]
|
||||||
|
href_b, etag_b = b_info.status[ident]
|
||||||
|
status[ident] = href_a, etag_a, href_b, etag_b
|
||||||
|
|
||||||
|
|
||||||
def _action_upload(ident, dest):
|
def _action_upload(ident, source, dest):
|
||||||
source = 'a' if dest == 'b' else 'b'
|
|
||||||
|
|
||||||
def inner(storages, status, conflict_resolution):
|
def inner(a, b, conflict_resolution):
|
||||||
source_storage, source_idents = storages[source]
|
|
||||||
dest_storage, dest_idents = storages[dest]
|
|
||||||
sync_logger.info('Copying (uploading) item {} to {}'
|
sync_logger.info('Copying (uploading) item {} to {}'
|
||||||
.format(ident, dest_storage))
|
.format(ident, dest.storage))
|
||||||
|
source_meta = source.idents[ident]
|
||||||
|
|
||||||
source_meta = source_idents[ident]
|
if dest.storage.read_only:
|
||||||
source_href = source_meta['href']
|
|
||||||
source_etag = source_meta['etag']
|
|
||||||
source_status = (source_href, source_etag)
|
|
||||||
|
|
||||||
dest_status = (None, None)
|
|
||||||
|
|
||||||
if dest_storage.read_only:
|
|
||||||
sync_logger.warning('{dest} is read-only. Skipping update...'
|
sync_logger.warning('{dest} is read-only. Skipping update...'
|
||||||
.format(dest=dest_storage))
|
.format(dest=dest.storage))
|
||||||
|
dest_href = dest_etag = None
|
||||||
else:
|
else:
|
||||||
item = source_meta['item']
|
item = source_meta['item']
|
||||||
dest_href, dest_etag = dest_storage.upload(item)
|
dest_href, dest_etag = dest.storage.upload(item)
|
||||||
dest_status = (dest_href, dest_etag)
|
|
||||||
|
|
||||||
status[ident] = source_status + dest_status if source == 'a' else \
|
source.status[ident] = source_meta['href'], source_meta['etag']
|
||||||
dest_status + source_status
|
dest.status[ident] = dest_href, dest_etag
|
||||||
|
|
||||||
return inner
|
return inner
|
||||||
|
|
||||||
|
|
||||||
def _action_update(ident, dest):
|
def _action_update(ident, source, dest):
|
||||||
source = 'a' if dest == 'b' else 'b'
|
|
||||||
|
|
||||||
def inner(storages, status, conflict_resolution):
|
def inner(a, b, conflict_resolution):
|
||||||
source_storage, source_idents = storages[source]
|
|
||||||
dest_storage, dest_idents = storages[dest]
|
|
||||||
sync_logger.info('Copying (updating) item {} to {}'
|
sync_logger.info('Copying (updating) item {} to {}'
|
||||||
.format(ident, dest_storage))
|
.format(ident, dest.storage))
|
||||||
|
source_meta = source.idents[ident]
|
||||||
|
|
||||||
source_meta = source_idents[ident]
|
if dest.storage.read_only:
|
||||||
source_href = source_meta['href']
|
|
||||||
source_etag = source_meta['etag']
|
|
||||||
source_status = (source_href, source_etag)
|
|
||||||
|
|
||||||
dest_meta = dest_idents[ident]
|
|
||||||
dest_href = dest_meta['href']
|
|
||||||
dest_etag = dest_meta['etag']
|
|
||||||
dest_status = (dest_href, dest_etag)
|
|
||||||
|
|
||||||
if dest_storage.read_only:
|
|
||||||
sync_logger.info('{dest} is read-only. Skipping update...'
|
sync_logger.info('{dest} is read-only. Skipping update...'
|
||||||
.format(dest=dest_storage))
|
.format(dest=dest.storage))
|
||||||
|
dest_href = dest_etag = None
|
||||||
else:
|
else:
|
||||||
item = source_meta['item']
|
dest_meta = dest.idents[ident]
|
||||||
dest_etag = dest_storage.update(dest_href, item, dest_etag)
|
dest_href = dest_meta['href']
|
||||||
|
dest_etag = dest.storage.update(dest_href, source_meta['item'],
|
||||||
|
dest_meta['etag'])
|
||||||
assert isinstance(dest_etag, (bytes, text_type))
|
assert isinstance(dest_etag, (bytes, text_type))
|
||||||
|
|
||||||
dest_status = (dest_href, dest_etag)
|
source.status[ident] = source_meta['href'], source_meta['etag']
|
||||||
|
dest.status[ident] = dest_href, dest_etag
|
||||||
status[ident] = source_status + dest_status if source == 'a' else \
|
|
||||||
dest_status + source_status
|
|
||||||
|
|
||||||
return inner
|
return inner
|
||||||
|
|
||||||
|
|
||||||
def _action_delete(ident, dest):
|
def _action_delete(ident, info):
|
||||||
def inner(storages, status, conflict_resolution):
|
storage = info.storage
|
||||||
if dest is not None:
|
idents = info.idents
|
||||||
dest_storage, dest_idents = storages[dest]
|
|
||||||
sync_logger.info('Deleting item {} from {}'
|
|
||||||
.format(ident, dest_storage))
|
|
||||||
if dest_storage.read_only:
|
|
||||||
sync_logger.warning('{dest} is read-only, skipping deletion...'
|
|
||||||
.format(dest=dest_storage))
|
|
||||||
else:
|
|
||||||
dest_meta = dest_idents[ident]
|
|
||||||
dest_etag = dest_meta['etag']
|
|
||||||
dest_href = dest_meta['href']
|
|
||||||
dest_storage.delete(dest_href, dest_etag)
|
|
||||||
else:
|
|
||||||
sync_logger.info('Deleting status info for nonexisting item {}'
|
|
||||||
.format(ident))
|
|
||||||
|
|
||||||
del status[ident]
|
def inner(a, b, conflict_resolution):
|
||||||
|
sync_logger.info('Deleting item {} from {}'.format(ident, storage))
|
||||||
|
if storage.read_only:
|
||||||
|
sync_logger.warning('{} is read-only, skipping deletion...'
|
||||||
|
.format(storage))
|
||||||
|
else:
|
||||||
|
meta = idents[ident]
|
||||||
|
etag = meta['etag']
|
||||||
|
href = meta['href']
|
||||||
|
storage.delete(href, etag)
|
||||||
|
|
||||||
|
del a.status[ident]
|
||||||
|
del b.status[ident]
|
||||||
|
|
||||||
|
return inner
|
||||||
|
|
||||||
|
|
||||||
|
def _action_delete_status(ident):
|
||||||
|
def inner(a, b, conflict_resolution):
|
||||||
|
sync_logger.info('Deleting status info for nonexisting item {}'
|
||||||
|
.format(ident))
|
||||||
|
del a.status[ident]
|
||||||
|
del b.status[ident]
|
||||||
|
|
||||||
return inner
|
return inner
|
||||||
|
|
||||||
|
|
||||||
def _action_conflict_resolve(ident):
|
def _action_conflict_resolve(ident):
|
||||||
def inner(storages, status, conflict_resolution):
|
def inner(a, b, conflict_resolution):
|
||||||
sync_logger.info('Doing conflict resolution for item {}...'
|
sync_logger.info('Doing conflict resolution for item {}...'
|
||||||
.format(ident))
|
.format(ident))
|
||||||
a_storage, a_idents = storages['a']
|
meta_a = a.idents[ident]
|
||||||
b_storage, b_idents = storages['b']
|
meta_b = b.idents[ident]
|
||||||
meta_a = a_idents[ident]
|
|
||||||
meta_b = b_idents[ident]
|
|
||||||
href_a = meta_a['href']
|
|
||||||
href_b = meta_b['href']
|
|
||||||
if meta_a['item'].raw == meta_b['item'].raw:
|
if meta_a['item'].raw == meta_b['item'].raw:
|
||||||
sync_logger.info('...same content on both sides.')
|
sync_logger.info('...same content on both sides.')
|
||||||
status[ident] = href_a, meta_a['etag'], href_b, meta_b['etag']
|
a.status[ident] = meta_a['href'], meta_a['etag']
|
||||||
|
b.status[ident] = meta_b['href'], meta_b['etag']
|
||||||
elif conflict_resolution is None:
|
elif conflict_resolution is None:
|
||||||
raise SyncConflict(ident=ident, href_a=href_a, href_b=href_b)
|
raise SyncConflict(ident=ident, href_a=meta_a['href'],
|
||||||
|
href_b=meta_b['href'])
|
||||||
elif conflict_resolution == 'a wins':
|
elif conflict_resolution == 'a wins':
|
||||||
sync_logger.info('...{} wins.'.format(a_storage))
|
sync_logger.info('...{} wins.'.format(a.storage))
|
||||||
_action_update(ident, 'b')(storages, status, conflict_resolution)
|
_action_update(ident, a, b)(a, b, conflict_resolution)
|
||||||
elif conflict_resolution == 'b wins':
|
elif conflict_resolution == 'b wins':
|
||||||
sync_logger.info('...{} wins.'.format(b_storage))
|
sync_logger.info('...{} wins.'.format(b.storage))
|
||||||
_action_update(ident, 'a')(storages, status, conflict_resolution)
|
_action_update(ident, b, a)(a, b, conflict_resolution)
|
||||||
else:
|
else:
|
||||||
raise ValueError('Invalid conflict resolution mode: {}'
|
raise ValueError('Invalid conflict resolution mode: {}'
|
||||||
.format(conflict_resolution))
|
.format(conflict_resolution))
|
||||||
|
|
@ -292,20 +282,16 @@ def _action_conflict_resolve(ident):
|
||||||
return inner
|
return inner
|
||||||
|
|
||||||
|
|
||||||
def _get_actions(storages, status):
|
def _get_actions(a_info, b_info):
|
||||||
storage_a, a_idents = storages['a']
|
for ident in uniq(itertools.chain(a_info.idents, b_info.idents,
|
||||||
storage_b, b_idents = storages['b']
|
a_info.status)):
|
||||||
|
a = a_info.idents.get(ident, None)
|
||||||
for ident in uniq(itertools.chain(a_idents, b_idents, status)):
|
b = b_info.idents.get(ident, None)
|
||||||
a = a_idents.get(ident, None)
|
|
||||||
b = b_idents.get(ident, None)
|
|
||||||
assert not a or a['etag'] is not None
|
assert not a or a['etag'] is not None
|
||||||
assert not b or b['etag'] is not None
|
assert not b or b['etag'] is not None
|
||||||
|
|
||||||
try:
|
_, status_etag_a = a_info.status.get(ident, (None, None))
|
||||||
_, status_etag_a, _, status_etag_b = status[ident]
|
_, status_etag_b = b_info.status.get(ident, (None, None))
|
||||||
except KeyError:
|
|
||||||
status_etag_a = status_etag_b = None
|
|
||||||
|
|
||||||
if a and b:
|
if a and b:
|
||||||
if a['etag'] != status_etag_a and b['etag'] != status_etag_b:
|
if a['etag'] != status_etag_a and b['etag'] != status_etag_b:
|
||||||
|
|
@ -314,26 +300,26 @@ def _get_actions(storages, status):
|
||||||
yield _action_conflict_resolve(ident)
|
yield _action_conflict_resolve(ident)
|
||||||
elif a['etag'] != status_etag_a:
|
elif a['etag'] != status_etag_a:
|
||||||
# item was only modified in a
|
# item was only modified in a
|
||||||
yield _action_update(ident, 'b')
|
yield _action_update(ident, a_info, b_info)
|
||||||
elif b['etag'] != status_etag_b:
|
elif b['etag'] != status_etag_b:
|
||||||
# item was only modified in b
|
# item was only modified in b
|
||||||
yield _action_update(ident, 'a')
|
yield _action_update(ident, b_info, a_info)
|
||||||
elif a and not b:
|
elif a and not b:
|
||||||
if a['etag'] != status_etag_a:
|
if a['etag'] != status_etag_a:
|
||||||
# was deleted from b but modified on a
|
# was deleted from b but modified on a
|
||||||
# OR: new item was created in a
|
# OR: new item was created in a
|
||||||
yield _action_upload(ident, 'b')
|
yield _action_upload(ident, a_info, b_info)
|
||||||
else:
|
else:
|
||||||
# was deleted from b and not modified on a
|
# was deleted from b and not modified on a
|
||||||
yield _action_delete(ident, 'a')
|
yield _action_delete(ident, a_info)
|
||||||
elif not a and b:
|
elif not a and b:
|
||||||
if b['etag'] != status_etag_b:
|
if b['etag'] != status_etag_b:
|
||||||
# was deleted from a but modified on b
|
# was deleted from a but modified on b
|
||||||
# OR: new item was created in b
|
# OR: new item was created in b
|
||||||
yield _action_upload(ident, 'a')
|
yield _action_upload(ident, b_info, a_info)
|
||||||
else:
|
else:
|
||||||
# was deleted from a and not changed on b
|
# was deleted from a and not changed on b
|
||||||
yield _action_delete(ident, 'b')
|
yield _action_delete(ident, b_info)
|
||||||
elif not a and not b:
|
elif not a and not b:
|
||||||
# was deleted from a and b, clean up status
|
# was deleted from a and b, clean up status
|
||||||
yield _action_delete(ident, None)
|
yield _action_delete_status(ident)
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue