Another sync refactor

This commit is contained in:
Markus Unterwaditzer 2016-09-23 10:18:44 +02:00
parent 8cbfb69691
commit 5f76c9e720
2 changed files with 80 additions and 53 deletions

View file

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
from random import random
from copy import deepcopy
import hypothesis.strategies as st
@ -450,7 +451,7 @@ class SyncMachine(RuleBasedStateMachine):
return s
@rule(target=Storage, s=Storage)
@rule(s=Storage)
def actions_fail(self, s):
def blowup(*a, **kw):
raise ActionIntentionallyFailed()
@ -458,36 +459,33 @@ class SyncMachine(RuleBasedStateMachine):
s.upload = blowup
s.update = blowup
s.delete = blowup
return s
@rule(target=Status)
def newstatus(self):
return {}
@rule(target=Storage, storage=Storage,
@rule(storage=Storage,
uid=uid_strategy,
etag=st.text())
def upload(self, storage, uid, etag):
item = Item(u'UID:{}'.format(uid))
storage.items[uid] = (etag, item)
return storage
@rule(target=Storage, storage=Storage, href=st.text())
@rule(storage=Storage, href=st.text())
def delete(self, storage, href):
storage.items.pop(href, None)
return storage
assume(storage.items.pop(href, None))
@rule(target=Status, status=Status, delete_from_b=st.booleans())
@rule(status=Status, delete_from_b=st.booleans())
def remove_hash_from_status(self, status, delete_from_b):
assume(status)
for a, b in status.values():
if delete_from_b:
a = b
assume('hash' in a)
del a['hash']
return status
@rule(
target=Status, status=Status,
status=Status,
a=Storage, b=Storage,
force_delete=st.booleans(),
conflict_resolution=st.one_of((st.just('a wins'), st.just('b wins')))
@ -496,17 +494,22 @@ class SyncMachine(RuleBasedStateMachine):
assume(a is not b)
old_items_a = self._get_items(a)
old_items_b = self._get_items(b)
old_status = deepcopy(status)
failed_sync = False
a.instance_name = 'a'
b.instance_name = 'b'
try:
# If one storage is read-only, double-sync because changes don't
# get reverted immediately.
for _ in range(2 if a.read_only or b.read_only else 1):
old_status = deepcopy(status)
sync(a, b, status,
force_delete=force_delete,
conflict_resolution=conflict_resolution)
except ActionIntentionallyFailed:
except ActionIntentionallyFailed as e:
assert status == old_status
failed_sync = True
except BothReadOnly:
assert a.read_only and b.read_only
assume(False)
@ -520,13 +523,11 @@ class SyncMachine(RuleBasedStateMachine):
items_a = self._get_items(a)
items_b = self._get_items(b)
assert items_a == items_b
assert items_a == items_b or failed_sync
assert items_a == old_items_a or not a.read_only
assert items_b == old_items_b or not b.read_only
assert set(a.items) | set(b.items) == set(status)
return status
assert set(a.items) | set(b.items) == set(status) or failed_sync
TestSyncMachine = SyncMachine.TestCase

View file

@ -184,12 +184,13 @@ class _StorageInfo(object):
}
def delete_full(self, ident):
meta = self.new_status.pop(ident)
meta = self.new_status[ident]
if self.storage.read_only:
sync_logger.warning('{} is read-only, skipping deletion...'
.format(self.storage))
else:
self.storage.delete(meta['href'], meta['etag'])
del self.new_status[ident]
def _status_migrate(status):
@ -279,7 +280,7 @@ def sync(storage_a, storage_b, status, conflict_resolution=None,
with storage_a.at_once():
with storage_b.at_once():
for action in actions:
action(a_info, b_info, conflict_resolution)
action.run(a_info, b_info, conflict_resolution)
status.clear()
for ident in uniq(itertools.chain(a_info.new_status, b_info.new_status)):
@ -289,47 +290,74 @@ def sync(storage_a, storage_b, status, conflict_resolution=None,
)
def _action_upload(ident, source, dest):
class Action:
def __init__(self, ident, source, dest):
self.ident = ident
self.source = source
self.dest = dest
self.a = None
self.b = None
self.conflict_resolution = None
def inner(a, b, conflict_resolution):
def _run_impl(self):
raise NotImplementedError()
def run(self, a, b, conflict_resolution):
try:
return self._run_impl(a, b, conflict_resolution)
except BaseException as e:
self.rollback(a, b)
raise e
def rollback(self, a, b):
for info in (a, b):
if self.ident in info.status:
info.new_status[self.ident] = info.status[self.ident]
else:
info.new_status.pop(self.ident, None)
class Upload(Action):
def _run_impl(self, a, b, conflict_resolution):
sync_logger.info(u'Copying (uploading) item {} to {}'
.format(ident, dest.storage))
item = source.new_status[ident]['item']
dest.upload_full(item)
return inner
.format(self.ident, self.dest.storage))
item = self.source.new_status[self.ident]['item']
self.dest.upload_full(item)
def _action_update(ident, source, dest):
def inner(a, b, conflict_resolution):
class Update(Action):
def _run_impl(self, a, b, conflict_resolution):
sync_logger.info(u'Copying (updating) item {} to {}'
.format(ident, dest.storage))
source_meta = source.new_status[ident]
dest.update_full(source_meta['item'])
return inner
.format(self.ident, self.dest.storage))
source_meta = self.source.new_status[self.ident]
self.dest.update_full(source_meta['item'])
def _action_delete(ident, info):
def inner(a, b, conflict_resolution):
class Delete(Action):
def __init__(self, ident, dest):
self.ident = ident
self.dest = dest
def _run_impl(self, a, b, conflict_resolution):
sync_logger.info(u'Deleting item {} from {}'
.format(ident, info.storage))
info.delete_full(ident)
return inner
.format(self.ident, self.dest.storage))
self.dest.delete_full(self.ident)
def _action_conflict_resolve(ident):
def inner(a, b, conflict_resolution):
class ResolveConflict(Action):
def __init__(self, ident):
self.ident = ident
def _run_impl(self, a, b, conflict_resolution):
sync_logger.info(u'Doing conflict resolution for item {}...'
.format(ident))
meta_a = a.new_status[ident]
meta_b = b.new_status[ident]
.format(self.ident))
meta_a = a.new_status[self.ident]
meta_b = b.new_status[self.ident]
if meta_a['item'].hash == meta_b['item'].hash:
sync_logger.info(u'...same content on both sides.')
elif conflict_resolution is None:
raise SyncConflict(ident=ident, href_a=meta_a['href'],
raise SyncConflict(ident=self.ident, href_a=meta_a['href'],
href_b=meta_b['href'])
elif callable(conflict_resolution):
new_item = conflict_resolution(meta_a['item'], meta_b['item'])
@ -341,8 +369,6 @@ def _action_conflict_resolve(ident):
raise exceptions.UserError('Invalid conflict resolution mode: {!r}'
.format(conflict_resolution))
return inner
def _get_actions(a_info, b_info):
for ident in uniq(itertools.chain(a_info.new_status, b_info.new_status,
@ -356,26 +382,26 @@ def _get_actions(a_info, b_info):
if a_changed and b_changed:
# item was modified on both sides
# OR: missing status
yield _action_conflict_resolve(ident)
yield ResolveConflict(ident)
elif a_changed and not b_changed:
# item was only modified in a
yield _action_update(ident, a_info, b_info)
yield Update(ident, a_info, b_info)
elif not a_changed and b_changed:
# item was only modified in b
yield _action_update(ident, b_info, a_info)
yield Update(ident, b_info, a_info)
elif a and not b:
if a_info.is_changed(ident):
# was deleted from b but modified on a
# OR: new item was created in a
yield _action_upload(ident, a_info, b_info)
yield Upload(ident, a_info, b_info)
else:
# was deleted from b and not modified on a
yield _action_delete(ident, a_info)
yield Delete(ident, a_info)
elif not a and b:
if b_info.is_changed(ident):
# was deleted from a but modified on b
# OR: new item was created in b
yield _action_upload(ident, b_info, a_info)
yield Upload(ident, b_info, a_info)
else:
# was deleted from a and not changed on b
yield _action_delete(ident, b_info)
yield Delete(ident, b_info)