Refactor sync() to use more classes

This commit is contained in:
Markus Unterwaditzer 2017-01-02 18:19:12 +01:00
parent 31963ca920
commit 8f00a6ae39

View file

@ -85,6 +85,34 @@ class PartialSync(SyncError):
storage = None storage = None
class _ItemMetadata:
href = None
_item = None
hash = None
etag = None
def __init__(self, **kwargs):
for k, v in kwargs.items():
assert hasattr(self, k)
setattr(self, k, v)
@property
def item(self):
return self._item
@item.setter
def item(self, item):
self._item = item
self.hash = item.hash
def to_status(self):
return {
'href': self.href,
'etag': self.etag,
'hash': self.hash
}
class _StorageInfo(object): class _StorageInfo(object):
'''A wrapper class that holds prefetched items, the status and other '''A wrapper class that holds prefetched items, the status and other
things.''' things.'''
@ -102,7 +130,7 @@ class _StorageInfo(object):
self.new_status = None self.new_status = None
def prepare_new_status(self): def prepare_new_status(self):
href_to_status = dict((meta['href'], (ident, meta)) href_to_status = dict((meta.href, (ident, meta))
for ident, meta for ident, meta
in self.status.items()) in self.status.items())
@ -113,32 +141,31 @@ class _StorageInfo(object):
new_props = self.new_status.setdefault(ident, props) new_props = self.new_status.setdefault(ident, props)
if new_props is not props: if new_props is not props:
raise IdentConflict(storage=self.storage, raise IdentConflict(storage=self.storage,
hrefs=[new_props['href'], hrefs=[new_props.href,
props['href']]) props.href])
for href, etag in self.storage.list(): for href, etag in self.storage.list():
ident, old_meta = href_to_status.get(href, (None, None)) ident, meta = href_to_status.get(href, (None, None))
meta = dict(old_meta) if old_meta is not None else {} if meta is None:
meta['href'] = href meta = _ItemMetadata()
meta['etag'] = etag
assert etag is not None if meta.href != href or meta.etag != etag:
if meta != old_meta:
# Either the item is completely new, or updated # Either the item is completely new, or updated
# In both cases we should prefetch # In both cases we should prefetch
prefetch.append(href) prefetch.append(href)
else: else:
meta.href = href
meta.etag = etag
_store_props(ident, meta) _store_props(ident, meta)
# Prefetch items # Prefetch items
for href, item, etag in (self.storage.get_multi(prefetch) for href, item, etag in (self.storage.get_multi(prefetch)
if prefetch else ()): if prefetch else ()):
meta = { _store_props(item.ident, _ItemMetadata(
'href': href, href=href,
'etag': etag, etag=etag,
'item': item, item=item
'hash': item.hash, ))
}
_store_props(item.ident, meta)
def is_changed(self, ident): def is_changed(self, ident):
status = self.status.get(ident, None) status = self.status.get(ident, None)
@ -147,9 +174,9 @@ class _StorageInfo(object):
if status is None: # new item if status is None: # new item
return True return True
if meta['etag'] != status['etag']: # etag changed if meta.etag != status.etag: # etag changed
old_hash = status.get('hash') old_hash = status.hash
if old_hash is None or meta['item'].hash != old_hash: if old_hash is None or meta.hash != old_hash:
# item actually changed # item actually changed
return True return True
else: else:
@ -157,7 +184,7 @@ class _StorageInfo(object):
return False return False
def _status_migrate(status): def _migrate_status(status):
for ident in list(status): for ident in list(status):
value = status[ident] value = status[ident]
if len(value) == 4: if len(value) == 4:
@ -174,21 +201,6 @@ def _status_migrate(status):
a, b = value a, b = value
a.setdefault('hash', '') a.setdefault('hash', '')
b.setdefault('hash', '') b.setdefault('hash', '')
def _compress_meta(meta):
'''Make in-memory metadata suitable for disk storage by removing fetched
item content'''
if set(meta) == {'href', 'etag', 'hash'}:
return meta
return {
'href': meta['href'],
'etag': meta['etag'],
'hash': meta['hash']
}
def sync(storage_a, storage_b, status, conflict_resolution=None, def sync(storage_a, storage_b, status, conflict_resolution=None,
force_delete=False, error_callback=None, partial_sync='revert'): force_delete=False, error_callback=None, partial_sync='revert'):
'''Synchronizes two storages. '''Synchronizes two storages.
@ -227,13 +239,13 @@ def sync(storage_a, storage_b, status, conflict_resolution=None,
elif conflict_resolution == 'b wins': elif conflict_resolution == 'b wins':
conflict_resolution = lambda a, b: b conflict_resolution = lambda a, b: b
_status_migrate(status) _migrate_status(status)
a_status = {} a_status = {}
b_status = {} b_status = {}
for ident, (meta_a, meta_b) in status.items(): for ident, (meta_a, meta_b) in status.items():
a_status[ident] = meta_a a_status[ident] = _ItemMetadata(**meta_a)
b_status[ident] = meta_b b_status[ident] = _ItemMetadata(**meta_b)
a_info = _StorageInfo(storage_a, a_status) a_info = _StorageInfo(storage_a, a_status)
b_info = _StorageInfo(storage_b, b_status) b_info = _StorageInfo(storage_b, b_status)
@ -263,8 +275,8 @@ def sync(storage_a, storage_b, status, conflict_resolution=None,
for ident in uniq(itertools.chain(a_info.new_status, for ident in uniq(itertools.chain(a_info.new_status,
b_info.new_status)): b_info.new_status)):
status[ident] = ( status[ident] = (
_compress_meta(a_info.new_status[ident]), a_info.new_status[ident].to_status(),
_compress_meta(b_info.new_status[ident]) b_info.new_status[ident].to_status()
) )
@ -317,11 +329,11 @@ class Upload(Action):
href, etag = self.dest.storage.upload(self.item) href, etag = self.dest.storage.upload(self.item)
assert self.ident not in self.dest.new_status assert self.ident not in self.dest.new_status
self.dest.new_status[self.ident] = { self.dest.new_status[self.ident] = _ItemMetadata(
'href': href, href=href,
'hash': self.item.hash, hash=self.item.hash,
'etag': etag etag=etag
} )
class Update(Action): class Update(Action):
@ -333,20 +345,16 @@ class Update(Action):
def _run_impl(self, a, b): def _run_impl(self, a, b):
if self.dest.storage.read_only: if self.dest.storage.read_only:
href = etag = None meta = _ItemMetadata(item=self.item)
else: else:
sync_logger.info(u'Copying (updating) item {} to {}' sync_logger.info(u'Copying (updating) item {} to {}'
.format(self.ident, self.dest.storage)) .format(self.ident, self.dest.storage))
meta = self.dest.new_status[self.ident] meta = self.dest.new_status[self.ident]
href = meta['href'] meta.etag = \
etag = self.dest.storage.update(href, self.item, meta['etag']) self.dest.storage.update(meta.href, self.item, meta.etag)
assert isinstance(etag, (bytes, str)) assert isinstance(meta.etag, (bytes, str))
self.dest.new_status[self.ident] = { self.dest.new_status[self.ident] = meta
'href': href,
'hash': self.item.hash,
'etag': etag
}
class Delete(Action): class Delete(Action):
@ -359,7 +367,7 @@ class Delete(Action):
if not self.dest.storage.read_only: if not self.dest.storage.read_only:
sync_logger.info(u'Deleting item {} from {}' sync_logger.info(u'Deleting item {} from {}'
.format(self.ident, self.dest.storage)) .format(self.ident, self.dest.storage))
self.dest.storage.delete(meta['href'], meta['etag']) self.dest.storage.delete(meta.href, meta.etag)
del self.dest.new_status[self.ident] del self.dest.new_status[self.ident]
@ -374,17 +382,17 @@ class ResolveConflict(Action):
meta_a = a.new_status[self.ident] meta_a = a.new_status[self.ident]
meta_b = b.new_status[self.ident] meta_b = b.new_status[self.ident]
if meta_a['item'].hash == meta_b['item'].hash: if meta_a.hash == meta_b.hash:
sync_logger.info(u'...same content on both sides.') sync_logger.info(u'...same content on both sides.')
elif conflict_resolution is None: elif conflict_resolution is None:
raise SyncConflict(ident=self.ident, href_a=meta_a['href'], raise SyncConflict(ident=self.ident, href_a=meta_a.href,
href_b=meta_b['href']) href_b=meta_b.href)
elif callable(conflict_resolution): elif callable(conflict_resolution):
new_item = conflict_resolution(meta_a['item'], meta_b['item']) new_item = conflict_resolution(meta_a.item, meta_b.item)
if new_item.hash != meta_a['item'].hash: if new_item.hash != meta_a.hash:
Update(new_item, a).run(a, b, conflict_resolution, Update(new_item, a).run(a, b, conflict_resolution,
partial_sync) partial_sync)
if new_item.hash != meta_b['item'].hash: if new_item.hash != meta_b.hash:
Update(new_item, b).run(a, b, conflict_resolution, Update(new_item, b).run(a, b, conflict_resolution,
partial_sync) partial_sync)
else: else:
@ -408,15 +416,15 @@ def _get_actions(a_info, b_info):
yield ResolveConflict(ident) yield ResolveConflict(ident)
elif a_changed and not b_changed: elif a_changed and not b_changed:
# item was only modified in a # item was only modified in a
yield Update(a['item'], b_info) yield Update(a.item, b_info)
elif not a_changed and b_changed: elif not a_changed and b_changed:
# item was only modified in b # item was only modified in b
yield Update(b['item'], a_info) yield Update(b.item, a_info)
elif a and not b: elif a and not b:
if a_info.is_changed(ident): if a_info.is_changed(ident):
# was deleted from b but modified on a # was deleted from b but modified on a
# OR: new item was created in a # OR: new item was created in a
yield Upload(a['item'], b_info) yield Upload(a.item, b_info)
else: else:
# was deleted from b and not modified on a # was deleted from b and not modified on a
yield Delete(ident, a_info) yield Delete(ident, a_info)
@ -424,7 +432,7 @@ def _get_actions(a_info, b_info):
if b_info.is_changed(ident): if b_info.is_changed(ident):
# was deleted from a but modified on b # was deleted from a but modified on b
# OR: new item was created in b # OR: new item was created in b
yield Upload(b['item'], a_info) yield Upload(b.item, a_info)
else: else:
# was deleted from a and not changed on b # was deleted from a and not changed on b
yield Delete(ident, b_info) yield Delete(ident, b_info)