Some sync refactoring

- Use only one dict for storing all temporary state of a storage
- Rename functions to be internal
This commit is contained in:
Markus Unterwaditzer 2014-08-30 16:16:32 +02:00
parent 69505f4c61
commit 2bbc92534d

View file

@ -60,11 +60,11 @@ class BothReadOnly(SyncError):
''' '''
def prepare_list(storage, href_to_status): def _prepare_idents(storage, href_to_status):
rv = {} rv = {}
download = [] download = []
for href, etag in storage.list(): for href, etag in storage.list():
props = rv[href] = {'etag': etag} props = rv[href] = {'etag': etag, 'href': href}
if href in href_to_status: if href in href_to_status:
ident, old_etag = href_to_status[href] ident, old_etag = href_to_status[href]
props['ident'] = ident props['ident'] = ident
@ -73,18 +73,18 @@ def prepare_list(storage, href_to_status):
else: else:
download.append(href) download.append(href)
prefetch(storage, rv, download) _prefetch(storage, rv, download)
return rv return dict((x['ident'], x) for href, x in iteritems(rv))
def prefetch(storage, rv, hrefs): def _prefetch(storage, rv, hrefs):
if rv is None: if rv is None:
rv = {} rv = {}
if not hrefs: if not hrefs:
return rv return rv
for href, item, etag in storage.get_multi(hrefs): for href, item, etag in storage.get_multi(hrefs):
props = rv.setdefault(href, {'etag': etag}) props = rv[href]
props['item'] = item props['item'] = item
props['ident'] = item.ident props['ident'] = item.ident
if props['etag'] != etag: if props['etag'] != etag:
@ -124,39 +124,39 @@ def sync(storage_a, storage_b, status, conflict_resolution=None,
(href_b, (ident, etag_b)) (href_b, (ident, etag_b))
for ident, (href_a, etag_a, href_b, etag_b) in iteritems(status) for ident, (href_a, etag_a, href_b, etag_b) in iteritems(status)
) )
# href => {'etag': etag, 'item': optional item, 'ident': ident} # ident => {'etag': etag, 'item': optional item, 'href': href}
list_a = prepare_list(storage_a, a_href_to_status) a_idents = _prepare_idents(storage_a, a_href_to_status)
list_b = prepare_list(storage_b, b_href_to_status) b_idents = _prepare_idents(storage_b, b_href_to_status)
if bool(list_a) != bool(list_b) and status and not force_delete: if bool(a_idents) != bool(b_idents) and status and not force_delete:
raise StorageEmpty(empty_storage=(storage_b if list_a else storage_a)) raise StorageEmpty(
empty_storage=(storage_b if a_idents else storage_a))
a_ident_to_href = dict((x['ident'], href) for href, x in iteritems(list_a))
b_ident_to_href = dict((x['ident'], href) for href, x in iteritems(list_b))
del a_href_to_status, b_href_to_status del a_href_to_status, b_href_to_status
storages = { storages = {
'a': (storage_a, list_a, a_ident_to_href), 'a': (storage_a, a_idents),
'b': (storage_b, list_b, b_ident_to_href) 'b': (storage_b, b_idents)
} }
actions = list(get_actions(storages, status)) actions = list(_get_actions(storages, status))
for action in actions: for action in actions:
action(storages, status, conflict_resolution) action(storages, status, conflict_resolution)
def action_upload(ident, dest): def _action_upload(ident, dest):
source = 'a' if dest == 'b' else 'b' source = 'a' if dest == 'b' else 'b'
def inner(storages, status, conflict_resolution): def inner(storages, status, conflict_resolution):
source_storage, source_list, source_ident_to_href = storages[source] source_storage, source_idents = storages[source]
dest_storage, dest_list, dest_ident_to_href = storages[dest] dest_storage, dest_idents = storages[dest]
sync_logger.info('Copying (uploading) item {} to {}' sync_logger.info('Copying (uploading) item {} to {}'
.format(ident, dest_storage)) .format(ident, dest_storage))
source_href = source_ident_to_href[ident] source_meta = source_idents[ident]
source_etag = source_list[source_href]['etag'] source_href = source_meta['href']
source_etag = source_meta['etag']
source_status = (source_href, source_etag) source_status = (source_href, source_etag)
dest_status = (None, None) dest_status = (None, None)
@ -165,7 +165,7 @@ def action_upload(ident, dest):
sync_logger.warning('{dest} is read-only. Skipping update...' sync_logger.warning('{dest} is read-only. Skipping update...'
.format(dest=dest_storage)) .format(dest=dest_storage))
else: else:
item = source_list[source_href]['item'] item = source_meta['item']
dest_href, dest_etag = dest_storage.upload(item) dest_href, dest_etag = dest_storage.upload(item)
dest_status = (dest_href, dest_etag) dest_status = (dest_href, dest_etag)
@ -175,28 +175,30 @@ def action_upload(ident, dest):
return inner return inner
def action_update(ident, dest): def _action_update(ident, dest):
source = 'a' if dest == 'b' else 'b' source = 'a' if dest == 'b' else 'b'
def inner(storages, status, conflict_resolution): def inner(storages, status, conflict_resolution):
source_storage, source_list, source_ident_to_href = storages[source] source_storage, source_idents = storages[source]
dest_storage, dest_list, dest_ident_to_href = storages[dest] dest_storage, dest_idents = storages[dest]
sync_logger.info('Copying (updating) item {} to {}' sync_logger.info('Copying (updating) item {} to {}'
.format(ident, dest_storage)) .format(ident, dest_storage))
source_href = source_ident_to_href[ident] source_meta = source_idents[ident]
source_etag = source_list[source_href]['etag'] source_href = source_meta['href']
source_etag = source_meta['etag']
source_status = (source_href, source_etag) source_status = (source_href, source_etag)
dest_href = dest_ident_to_href[ident] dest_meta = dest_idents[ident]
dest_etag = dest_list[dest_href]['etag'] dest_href = dest_meta['href']
dest_etag = dest_meta['etag']
dest_status = (dest_href, dest_etag) dest_status = (dest_href, dest_etag)
if dest_storage.read_only: if dest_storage.read_only:
sync_logger.info('{dest} is read-only. Skipping update...' sync_logger.info('{dest} is read-only. Skipping update...'
.format(dest=dest_storage)) .format(dest=dest_storage))
else: else:
item = source_list[source_href]['item'] item = source_meta['item']
dest_etag = dest_storage.update(dest_href, item, dest_etag) dest_etag = dest_storage.update(dest_href, item, dest_etag)
assert isinstance(dest_etag, (bytes, text_type)) assert isinstance(dest_etag, (bytes, text_type))
@ -208,18 +210,19 @@ def action_update(ident, dest):
return inner return inner
def action_delete(ident, dest): def _action_delete(ident, dest):
def inner(storages, status, conflict_resolution): def inner(storages, status, conflict_resolution):
if dest is not None: if dest is not None:
dest_storage, dest_list, dest_ident_to_href = storages[dest] dest_storage, dest_idents = storages[dest]
sync_logger.info('Deleting item {} from {}' sync_logger.info('Deleting item {} from {}'
.format(ident, dest_storage)) .format(ident, dest_storage))
if dest_storage.read_only: if dest_storage.read_only:
sync_logger.warning('{dest} is read-only, skipping deletion...' sync_logger.warning('{dest} is read-only, skipping deletion...'
.format(dest=dest_storage)) .format(dest=dest_storage))
else: else:
dest_href = dest_ident_to_href[ident] dest_meta = dest_idents[ident]
dest_etag = dest_list[dest_href]['etag'] dest_etag = dest_meta['etag']
dest_href = dest_meta['href']
dest_storage.delete(dest_href, dest_etag) dest_storage.delete(dest_href, dest_etag)
else: else:
sync_logger.info('Deleting status info for nonexisting item {}' sync_logger.info('Deleting status info for nonexisting item {}'
@ -230,16 +233,16 @@ def action_delete(ident, dest):
return inner return inner
def action_conflict_resolve(ident): def _action_conflict_resolve(ident):
def inner(storages, status, conflict_resolution): def inner(storages, status, conflict_resolution):
sync_logger.info('Doing conflict resolution for item {}...' sync_logger.info('Doing conflict resolution for item {}...'
.format(ident)) .format(ident))
a_storage, list_a, a_ident_to_href = storages['a'] a_storage, a_idents = storages['a']
b_storage, list_b, b_ident_to_href = storages['b'] b_storage, b_idents = storages['b']
href_a = a_ident_to_href[ident] meta_a = a_idents[ident]
href_b = b_ident_to_href[ident] meta_b = b_idents[ident]
meta_a = list_a[href_a] href_a = meta_a['href']
meta_b = list_b[href_b] href_b = meta_b['href']
if meta_a['item'].raw == meta_b['item'].raw: if meta_a['item'].raw == meta_b['item'].raw:
sync_logger.info('...same content on both sides.') sync_logger.info('...same content on both sides.')
status[ident] = href_a, meta_a['etag'], href_b, meta_b['etag'] status[ident] = href_a, meta_a['etag'], href_b, meta_b['etag']
@ -247,10 +250,10 @@ def action_conflict_resolve(ident):
raise SyncConflict(ident=ident, href_a=href_a, href_b=href_b) raise SyncConflict(ident=ident, href_a=href_a, href_b=href_b)
elif conflict_resolution == 'a wins': elif conflict_resolution == 'a wins':
sync_logger.info('...{} wins.'.format(a_storage)) sync_logger.info('...{} wins.'.format(a_storage))
action_update(ident, 'b')(storages, status, conflict_resolution) _action_update(ident, 'b')(storages, status, conflict_resolution)
elif conflict_resolution == 'b wins': elif conflict_resolution == 'b wins':
sync_logger.info('...{} wins.'.format(b_storage)) sync_logger.info('...{} wins.'.format(b_storage))
action_update(ident, 'a')(storages, status, conflict_resolution) _action_update(ident, 'a')(storages, status, conflict_resolution)
else: else:
raise ValueError('Invalid conflict resolution mode: {}' raise ValueError('Invalid conflict resolution mode: {}'
.format(conflict_resolution)) .format(conflict_resolution))
@ -258,39 +261,37 @@ def action_conflict_resolve(ident):
return inner return inner
def get_actions(storages, status): def _get_actions(storages, status):
storage_a, list_a, a_ident_to_href = storages['a'] storage_a, a_idents = storages['a']
storage_b, list_b, b_ident_to_href = storages['b'] storage_b, b_idents = storages['b']
handled = set() handled = set()
for ident in itertools.chain(a_ident_to_href, b_ident_to_href, status): for ident in itertools.chain(a_idents, b_idents, status):
if ident in handled: if ident in handled:
continue continue
handled.add(ident) handled.add(ident)
href_a = a_ident_to_href.get(ident, None) a = a_idents.get(ident, None)
href_b = b_ident_to_href.get(ident, None) b = b_idents.get(ident, None)
a = list_a.get(href_a, None)
b = list_b.get(href_b, None)
if ident not in status: if ident not in status:
if a and b: # missing status if a and b: # missing status
yield action_conflict_resolve(ident) yield _action_conflict_resolve(ident)
elif a and not b: # new item was created in a elif a and not b: # new item was created in a
yield action_upload(ident, 'b') yield _action_upload(ident, 'b')
elif not a and b: # new item was created in b elif not a and b: # new item was created in b
yield action_upload(ident, 'a') yield _action_upload(ident, 'a')
else: else:
_, status_etag_a, _, status_etag_b = status[ident] _, status_etag_a, _, status_etag_b = status[ident]
if a and b: if a and b:
if a['etag'] != status_etag_a and b['etag'] != status_etag_b: if a['etag'] != status_etag_a and b['etag'] != status_etag_b:
yield action_conflict_resolve(ident) yield _action_conflict_resolve(ident)
elif a['etag'] != status_etag_a: # item was updated in a elif a['etag'] != status_etag_a: # item was updated in a
yield action_update(ident, 'b') yield _action_update(ident, 'b')
elif b['etag'] != status_etag_b: # item was updated in b elif b['etag'] != status_etag_b: # item was updated in b
yield action_update(ident, 'a') yield _action_update(ident, 'a')
elif a and not b: # was deleted from b elif a and not b: # was deleted from b
yield action_delete(ident, 'a') yield _action_delete(ident, 'a')
elif not a and b: # was deleted from a elif not a and b: # was deleted from a
yield action_delete(ident, 'b') yield _action_delete(ident, 'b')
elif not a and not b: # was deleted from a and b elif not a and not b: # was deleted from a and b
yield action_delete(ident, None) yield _action_delete(ident, None)