Refactor of status handling in sync (#505)

- Use `info.idents` as the new status. This saves a few operations in
  cases where no storage action has to be taken but the status still has
  to be updated.

- Rename StorageSyncer to _StorageInfo and make it a private API again.

- Add the ability to pass custom functions for conflict resolution. This
  is preparation for #127.
This commit is contained in:
Markus Unterwaditzer 2016-09-18 15:46:56 +02:00 committed by GitHub
parent 99e7ff6d37
commit cfbc7ec71b
3 changed files with 150 additions and 124 deletions

View file

@ -125,6 +125,23 @@ def test_deletion():
assert not a.has('1.a') and not b.has('1.b') assert not a.has('1.a') and not b.has('1.b')
def test_insert_hash():
a = MemoryStorage()
b = MemoryStorage()
status = {}
item = Item('UID:1')
href, etag = a.upload(item)
sync(a, b, status)
for d in status['1']:
del d['hash']
a.update(href, Item('UID:1\nHAHA:YES'), etag)
sync(a, b, status)
assert 'hash' in status['1'][0] and 'hash' in status['1'][1]
def test_already_synced(): def test_already_synced():
a = MemoryStorage(fileext='.a') a = MemoryStorage(fileext='.a')
b = MemoryStorage(fileext='.b') b = MemoryStorage(fileext='.b')
@ -134,9 +151,11 @@ def test_already_synced():
status = { status = {
'1': ({ '1': ({
'href': '1.a', 'href': '1.a',
'hash': item.hash,
'etag': a.get('1.a')[1] 'etag': a.get('1.a')[1]
}, { }, {
'href': '1.b', 'href': '1.b',
'hash': item.hash,
'etag': b.get('1.b')[1] 'etag': b.get('1.b')[1]
}) })
} }
@ -413,7 +432,7 @@ class SyncMachine(RuleBasedStateMachine):
s.read_only = read_only s.read_only = read_only
if flaky_etags: if flaky_etags:
def get(href): def get(href):
_, item = s.items[href] old_etag, item = s.items[href]
etag = _random_string() etag = _random_string()
s.items[href] = etag, item s.items[href] = etag, item
return item, etag return item, etag
@ -444,6 +463,15 @@ class SyncMachine(RuleBasedStateMachine):
storage.items.pop(href, None) storage.items.pop(href, None)
return storage return storage
@rule(target=Status, status=Status, delete_from_b=st.booleans())
def remove_hash_from_status(self, status, delete_from_b):
for a, b in status.values():
if delete_from_b:
a = b
assume('hash' in a)
del a['hash']
return status
@rule( @rule(
target=Status, status=Status, target=Status, status=Status,
a=Storage, b=Storage, a=Storage, b=Storage,
@ -451,6 +479,7 @@ class SyncMachine(RuleBasedStateMachine):
conflict_resolution=st.one_of((st.just('a wins'), st.just('b wins'))) conflict_resolution=st.one_of((st.just('a wins'), st.just('b wins')))
) )
def sync(self, status, a, b, force_delete, conflict_resolution): def sync(self, status, a, b, force_delete, conflict_resolution):
assume(a is not b)
old_items_a = self._get_items(a) old_items_a = self._get_items(a)
old_items_b = self._get_items(b) old_items_b = self._get_items(b)
@ -478,6 +507,8 @@ class SyncMachine(RuleBasedStateMachine):
assert items_a == old_items_a or not a.read_only assert items_a == old_items_a or not a.read_only
assert items_b == old_items_b or not b.read_only assert items_b == old_items_b or not b.read_only
assert set(a.items) | set(b.items) == set(status)
return status return status

View file

@ -3,7 +3,7 @@
import contextlib import contextlib
import functools import functools
from .. import exceptions, sync from .. import exceptions
from ..utils import uniq from ..utils import uniq
from ..utils.vobject import Item # noqa from ..utils.vobject import Item # noqa
@ -47,8 +47,6 @@ class Storage(metaclass=StorageMeta):
fileext = '.txt' fileext = '.txt'
syncer_class = sync.StorageSyncer
# The string used in the config to denote the type of storage. Should be # The string used in the config to denote the type of storage. Should be
# overridden by subclasses. # overridden by subclasses.
storage_name = None storage_name = None

View file

@ -77,7 +77,7 @@ class BothReadOnly(SyncError):
''' '''
class StorageSyncer(object): class _StorageInfo(object):
'''A wrapper class that holds prefetched items, the status and other '''A wrapper class that holds prefetched items, the status and other
things.''' things.'''
def __init__(self, storage, status): def __init__(self, storage, status):
@ -85,59 +85,56 @@ class StorageSyncer(object):
:param status: {ident: {'href': href, 'etag': etag}} :param status: {ident: {'href': href, 'etag': etag}}
''' '''
self.storage = storage self.storage = storage
self.status = status
self.idents = None
def prepare_idents(self): #: Represents the status as given. Must not be modified.
self.status = status
#: Represents the current state of the storage and is modified as items
#: are uploaded and downloaded. Will be dumped into status.
self.new_status = None
def prepare_new_status(self):
href_to_status = dict((meta['href'], (ident, meta)) href_to_status = dict((meta['href'], (ident, meta))
for ident, meta for ident, meta
in self.status.items()) in self.status.items())
prefetch = {} prefetch = []
self.idents = {} self.new_status = {}
def _store_props(ident, props): def _store_props(ident, props):
new_props = self.idents.setdefault(ident, props) new_props = self.new_status.setdefault(ident, props)
if new_props is not props: if new_props is not props:
raise IdentConflict(storage=self.storage, raise IdentConflict(storage=self.storage,
hrefs=[new_props['href'], hrefs=[new_props['href'],
props['href']]) props['href']])
if ident in self.status:
# Necessary if item's href changes.
# Otherwise it's a no-op, like `a = a`.
self.status[ident]['href'] = props['href']
for href, etag in self.storage.list(): for href, etag in self.storage.list():
props = {'href': href, 'etag': etag}
ident, old_meta = href_to_status.get(href, (None, None)) ident, old_meta = href_to_status.get(href, (None, None))
meta = dict(old_meta) if old_meta is not None else {}
meta['href'] = href
meta['etag'] = etag
assert etag is not None assert etag is not None
if old_meta is None or etag != old_meta['etag']: if meta != old_meta:
# Either the item is completely new, or updated # Either the item is completely new, or updated
# In both cases we should prefetch # In both cases we should prefetch
prefetch[href] = props prefetch.append(href)
else: else:
_store_props(ident, props) _store_props(ident, meta)
# Prefetch items # Prefetch items
for href, item, etag in (self.storage.get_multi(prefetch) for href, item, etag in (self.storage.get_multi(prefetch)
if prefetch else ()): if prefetch else ()):
props = prefetch[href] meta = {
'href': href,
assert props['href'] == href 'etag': etag,
if props['etag'] != etag: 'item': item,
sync_logger.warning( 'hash': item.hash,
'Etag of {!r} changed during sync from {!r} to {!r}' }
.format(href, props['etag'], etag) _store_props(item.ident, meta)
)
props['etag'] = etag
props['item'] = item
props['ident'] = ident = item.ident
_store_props(ident, props)
def is_changed(self, ident): def is_changed(self, ident):
status = self.status.get(ident, None) status = self.status.get(ident, None)
meta = self.idents[ident] meta = self.new_status[ident]
if status is None: # new item if status is None: # new item
return True return True
@ -149,9 +146,51 @@ class StorageSyncer(object):
return True return True
else: else:
# only etag changed # only etag changed
status['etag'] = meta['etag']
return False return False
def upload_full(self, item):
if self.storage.read_only:
sync_logger.warning('{} is read-only. Skipping update...'
.format(self.storage))
href = etag = None
else:
href, etag = self.storage.upload(item)
assert item.ident not in self.new_status
self.new_status[item.ident] = {
'href': href,
'hash': item.hash,
'etag': etag
}
def update_full(self, item):
'''Similar to Storage.update, but automatically takes care of ETags and
updating the status.'''
if self.storage.read_only:
sync_logger.warning('{} is read-only. Skipping update...'
.format(self.storage))
href = etag = None
else:
meta = self.new_status[item.ident]
href = meta['href']
etag = self.storage.update(href, item, meta['etag'])
assert isinstance(etag, (bytes, str))
self.new_status[item.ident] = {
'href': href,
'hash': item.hash,
'etag': etag
}
def delete_full(self, ident):
meta = self.new_status.pop(ident)
if self.storage.read_only:
sync_logger.warning('{} is read-only, skipping deletion...'
.format(self.storage))
else:
self.storage.delete(meta['href'], meta['etag'])
def _status_migrate(status): def _status_migrate(status):
for ident in list(status): for ident in list(status):
@ -166,15 +205,22 @@ def _status_migrate(status):
'href': href_b, 'href': href_b,
'etag': etag_b, 'etag': etag_b,
}) })
elif len(value) == 2:
a, b = value
a.setdefault('hash', '')
b.setdefault('hash', '')
def _compress_meta(meta): def _compress_meta(meta):
'''Make in-memory metadata suitable for disk storage by removing fetched '''Make in-memory metadata suitable for disk storage by removing fetched
item content''' item content'''
if set(meta) == {'href', 'etag', 'hash'}:
return meta
return { return {
'href': meta['href'], 'href': meta['href'],
'etag': meta['etag'], 'etag': meta['etag'],
'hash': meta['item'].hash 'hash': meta['hash']
} }
@ -190,9 +236,11 @@ def sync(storage_a, storage_b, status, conflict_resolution=None,
metadata about the two storages for detection of changes. Will be metadata about the two storages for detection of changes. Will be
modified by the function and should be passed to it at the next sync. modified by the function and should be passed to it at the next sync.
If this is the first sync, an empty dictionary should be provided. If this is the first sync, an empty dictionary should be provided.
:param conflict_resolution: Either 'a wins' or 'b wins'. If none is :param conflict_resolution: A function that, given two conflicting item
provided, the sync function will raise versions A and B, returns a new item with conflicts resolved. The UID
:py:exc:`SyncConflict`. must be the same. The strings `"a wins"` and `"b wins"` are also
accepted to mean that that side's version will always be taken. If none
is provided, the sync function will raise :py:exc:`SyncConflict`.
:param force_delete: When one storage got completely emptied between two :param force_delete: When one storage got completely emptied between two
syncs, :py:exc:`StorageEmpty` is raised for syncs, :py:exc:`StorageEmpty` is raised for
safety. Setting this parameter to ``True`` disables this safety safety. Setting this parameter to ``True`` disables this safety
@ -201,24 +249,29 @@ def sync(storage_a, storage_b, status, conflict_resolution=None,
if storage_a.read_only and storage_b.read_only: if storage_a.read_only and storage_b.read_only:
raise BothReadOnly() raise BothReadOnly()
if conflict_resolution == 'a wins':
conflict_resolution = lambda a, b: a
elif conflict_resolution == 'b wins':
conflict_resolution = lambda a, b: b
_status_migrate(status) _status_migrate(status)
a_info = storage_a.syncer_class(storage_a, dict( a_info = _StorageInfo(storage_a, dict(
(ident, meta_a) (ident, meta_a)
for ident, (meta_a, meta_b) in status.items() for ident, (meta_a, meta_b) in status.items()
)) ))
b_info = storage_b.syncer_class(storage_b, dict( b_info = _StorageInfo(storage_b, dict(
(ident, meta_b) (ident, meta_b)
for ident, (meta_a, meta_b) in status.items() for ident, (meta_a, meta_b) in status.items()
)) ))
a_info.prepare_idents() a_info.prepare_new_status()
b_info.prepare_idents() b_info.prepare_new_status()
if status and not force_delete: if status and not force_delete:
if a_info.idents and not b_info.idents: if a_info.new_status and not b_info.new_status:
raise StorageEmpty(empty_storage=storage_b) raise StorageEmpty(empty_storage=storage_b)
elif b_info.idents and not a_info.idents: elif b_info.new_status and not a_info.new_status:
raise StorageEmpty(empty_storage=storage_a) raise StorageEmpty(empty_storage=storage_a)
actions = list(_get_actions(a_info, b_info)) actions = list(_get_actions(a_info, b_info))
@ -229,8 +282,11 @@ def sync(storage_a, storage_b, status, conflict_resolution=None,
action(a_info, b_info, conflict_resolution) action(a_info, b_info, conflict_resolution)
status.clear() status.clear()
for ident in uniq(itertools.chain(a_info.status, b_info.status)): for ident in uniq(itertools.chain(a_info.new_status, b_info.new_status)):
status[ident] = a_info.status[ident], b_info.status[ident] status[ident] = (
_compress_meta(a_info.new_status[ident]),
_compress_meta(b_info.new_status[ident])
)
def _action_upload(ident, source, dest): def _action_upload(ident, source, dest):
@ -238,81 +294,27 @@ def _action_upload(ident, source, dest):
def inner(a, b, conflict_resolution): def inner(a, b, conflict_resolution):
sync_logger.info(u'Copying (uploading) item {} to {}' sync_logger.info(u'Copying (uploading) item {} to {}'
.format(ident, dest.storage)) .format(ident, dest.storage))
source_meta = source.idents[ident] item = source.new_status[ident]['item']
dest.upload_full(item)
if dest.storage.read_only:
sync_logger.warning('{dest} is read-only. Skipping update...'
.format(dest=dest.storage))
dest_href = dest_etag = None
else:
item = source_meta['item']
dest_href, dest_etag = dest.storage.upload(item)
source.status[ident] = _compress_meta(source_meta)
dest.status[ident] = {
'href': dest_href,
'hash': source_meta['item'].hash,
'etag': dest_etag
}
return inner return inner
def _action_update(ident, source, dest): def _action_update(ident, source, dest):
def inner(a, b, conflict_resolution): def inner(a, b, conflict_resolution):
sync_logger.info(u'Copying (updating) item {} to {}' sync_logger.info(u'Copying (updating) item {} to {}'
.format(ident, dest.storage)) .format(ident, dest.storage))
source_meta = source.idents[ident] source_meta = source.new_status[ident]
dest.update_full(source_meta['item'])
if dest.storage.read_only:
sync_logger.warning(u'{dest} is read-only. Skipping update...'
.format(dest=dest.storage))
dest_href = dest_etag = None
else:
dest_meta = dest.idents[ident]
dest_href = dest_meta['href']
dest_etag = dest.storage.update(dest_href, source_meta['item'],
dest_meta['etag'])
assert isinstance(dest_etag, (bytes, str))
source.status[ident] = _compress_meta(source_meta)
dest.status[ident] = {
'href': dest_href,
'hash': source_meta['item'].hash,
'etag': dest_etag
}
return inner return inner
def _action_delete(ident, info): def _action_delete(ident, info):
storage = info.storage
idents = info.idents
def inner(a, b, conflict_resolution): def inner(a, b, conflict_resolution):
sync_logger.info(u'Deleting item {} from {}'.format(ident, storage)) sync_logger.info(u'Deleting item {} from {}'
if storage.read_only: .format(ident, info.storage))
sync_logger.warning('{} is read-only, skipping deletion...' info.delete_full(ident)
.format(storage))
else:
meta = idents[ident]
etag = meta['etag']
href = meta['href']
storage.delete(href, etag)
del a.status[ident]
del b.status[ident]
return inner
def _action_delete_status(ident):
def inner(a, b, conflict_resolution):
sync_logger.info(u'Deleting status info for nonexisting item {}'
.format(ident))
del a.status[ident]
del b.status[ident]
return inner return inner
@ -321,34 +323,32 @@ def _action_conflict_resolve(ident):
def inner(a, b, conflict_resolution): def inner(a, b, conflict_resolution):
sync_logger.info(u'Doing conflict resolution for item {}...' sync_logger.info(u'Doing conflict resolution for item {}...'
.format(ident)) .format(ident))
meta_a = a.idents[ident] meta_a = a.new_status[ident]
meta_b = b.idents[ident] meta_b = b.new_status[ident]
if meta_a['item'].hash == meta_b['item'].hash: if meta_a['item'].hash == meta_b['item'].hash:
sync_logger.info(u'...same content on both sides.') sync_logger.info(u'...same content on both sides.')
a.status[ident] = _compress_meta(meta_a)
b.status[ident] = _compress_meta(meta_b)
elif conflict_resolution is None: elif conflict_resolution is None:
raise SyncConflict(ident=ident, href_a=meta_a['href'], raise SyncConflict(ident=ident, href_a=meta_a['href'],
href_b=meta_b['href']) href_b=meta_b['href'])
elif conflict_resolution == 'a wins': elif callable(conflict_resolution):
sync_logger.info(u'...{} wins.'.format(a.storage)) new_item = conflict_resolution(meta_a['item'], meta_b['item'])
_action_update(ident, a, b)(a, b, conflict_resolution) if new_item.hash != meta_a['item'].hash:
elif conflict_resolution == 'b wins': a.update_full(new_item)
sync_logger.info(u'...{} wins.'.format(b.storage)) if new_item.hash != meta_b['item'].hash:
_action_update(ident, b, a)(a, b, conflict_resolution) b.update_full(new_item)
else: else:
raise exceptions.UserError('Invalid conflict resolution mode: {}' raise exceptions.UserError('Invalid conflict resolution mode: {!r}'
.format(conflict_resolution)) .format(conflict_resolution))
return inner return inner
def _get_actions(a_info, b_info): def _get_actions(a_info, b_info):
for ident in uniq(itertools.chain(a_info.idents, b_info.idents, for ident in uniq(itertools.chain(a_info.new_status, b_info.new_status,
a_info.status)): a_info.status)):
a = ident in a_info.idents # item exists in a a = ident in a_info.new_status # item exists in a
b = ident in b_info.idents # item exists in b b = ident in b_info.new_status # item exists in b
if a and b: if a and b:
a_changed = a_info.is_changed(ident) a_changed = a_info.is_changed(ident)
@ -379,6 +379,3 @@ def _get_actions(a_info, b_info):
else: else:
# was deleted from a and not changed on b # was deleted from a and not changed on b
yield _action_delete(ident, b_info) yield _action_delete(ident, b_info)
elif not a and not b:
# was deleted from a and b, clean up status
yield _action_delete_status(ident)