Add error_callback to sync()

This commit is contained in:
Markus Unterwaditzer 2016-10-01 18:19:53 +02:00
parent 95c6be6aee
commit 34ac29fc2a
2 changed files with 72 additions and 25 deletions

View file

@ -419,6 +419,10 @@ class ActionIntentionallyFailed(Exception):
pass pass
def action_failure(*a, **kw):
raise ActionIntentionallyFailed()
class SyncMachine(RuleBasedStateMachine): class SyncMachine(RuleBasedStateMachine):
Status = Bundle('status') Status = Bundle('status')
Storage = Bundle('storage') Storage = Bundle('storage')
@ -452,12 +456,9 @@ class SyncMachine(RuleBasedStateMachine):
@rule(s=Storage) @rule(s=Storage)
def actions_fail(self, s): def actions_fail(self, s):
def blowup(*a, **kw): s.upload = action_failure
raise ActionIntentionallyFailed() s.update = action_failure
s.delete = action_failure
s.upload = blowup
s.update = blowup
s.delete = blowup
@rule(target=Status) @rule(target=Status)
def newstatus(self): def newstatus(self):
@ -478,17 +479,25 @@ class SyncMachine(RuleBasedStateMachine):
status=Status, status=Status,
a=Storage, b=Storage, a=Storage, b=Storage,
force_delete=st.booleans(), force_delete=st.booleans(),
conflict_resolution=st.one_of((st.just('a wins'), st.just('b wins'))) conflict_resolution=st.one_of((st.just('a wins'), st.just('b wins'))),
with_error_callback=st.booleans()
) )
def sync(self, status, a, b, force_delete, conflict_resolution): def sync(self, status, a, b, force_delete, conflict_resolution,
with_error_callback):
assume(a is not b) assume(a is not b)
old_items_a = self._get_items(a) old_items_a = self._get_items(a)
old_items_b = self._get_items(b) old_items_b = self._get_items(b)
failed_sync = False
a.instance_name = 'a' a.instance_name = 'a'
b.instance_name = 'b' b.instance_name = 'b'
errors = []
if with_error_callback:
error_callback = errors.append
else:
error_callback = None
try: try:
# If one storage is read-only, double-sync because changes don't # If one storage is read-only, double-sync because changes don't
# get reverted immediately. # get reverted immediately.
@ -496,10 +505,13 @@ class SyncMachine(RuleBasedStateMachine):
old_status = deepcopy(status) old_status = deepcopy(status)
sync(a, b, status, sync(a, b, status,
force_delete=force_delete, force_delete=force_delete,
conflict_resolution=conflict_resolution) conflict_resolution=conflict_resolution,
error_callback=error_callback)
for e in errors:
raise e
except ActionIntentionallyFailed: except ActionIntentionallyFailed:
assert status == old_status pass
failed_sync = True
except BothReadOnly: except BothReadOnly:
assert a.read_only and b.read_only assert a.read_only and b.read_only
assume(False) assume(False)
@ -508,16 +520,42 @@ class SyncMachine(RuleBasedStateMachine):
raise raise
else: else:
assert not list(a.list()) or not list(b.list()) assert not list(a.list()) or not list(b.list())
return status else:
items_a = self._get_items(a) items_a = self._get_items(a)
items_b = self._get_items(b) items_b = self._get_items(b)
assert items_a == items_b or failed_sync assert items_a == items_b
assert items_a == old_items_a or not a.read_only assert items_a == old_items_a or not a.read_only
assert items_b == old_items_b or not b.read_only assert items_b == old_items_b or not b.read_only
assert set(a.items) | set(b.items) == set(status) or failed_sync assert set(a.items) | set(b.items) == set(status)
TestSyncMachine = SyncMachine.TestCase TestSyncMachine = SyncMachine.TestCase
@pytest.mark.parametrize('error_callback', [True, False])
def test_rollback(error_callback):
a = MemoryStorage()
b = MemoryStorage()
status = {}
a.items['0'] = ('', Item('UID:0'))
b.items['1'] = ('', Item('UID:1'))
b.upload = b.update = b.delete = action_failure
if error_callback:
errors = []
sync(a, b, status=status, conflict_resolution='a wins',
error_callback=errors.append)
assert len(errors) == 1
assert isinstance(errors[0], ActionIntentionallyFailed)
assert len(status) == 1
assert status['1']
else:
with pytest.raises(ActionIntentionallyFailed):
sync(a, b, status=status, conflict_resolution='a wins')

View file

@ -11,6 +11,7 @@ Yang. http://blog.ezyang.com/2012/08/how-offlineimap-works/
''' '''
import itertools import itertools
import logging import logging
import sys
from . import exceptions from . import exceptions
from .utils import uniq from .utils import uniq
@ -226,7 +227,7 @@ def _compress_meta(meta):
def sync(storage_a, storage_b, status, conflict_resolution=None, def sync(storage_a, storage_b, status, conflict_resolution=None,
force_delete=False): force_delete=False, error_callback=None):
'''Synchronizes two storages. '''Synchronizes two storages.
:param storage_a: The first storage :param storage_a: The first storage
@ -246,6 +247,8 @@ def sync(storage_a, storage_b, status, conflict_resolution=None,
syncs, :py:exc:`StorageEmpty` is raised for syncs, :py:exc:`StorageEmpty` is raised for
safety. Setting this parameter to ``True`` disables this safety safety. Setting this parameter to ``True`` disables this safety
measure. measure.
:param error_callback: Instead of raising errors when executing actions,
call the given function with an `Exception` as the only argument.
''' '''
if storage_a.read_only and storage_b.read_only: if storage_a.read_only and storage_b.read_only:
raise BothReadOnly() raise BothReadOnly()
@ -277,13 +280,19 @@ def sync(storage_a, storage_b, status, conflict_resolution=None,
actions = list(_get_actions(a_info, b_info)) actions = list(_get_actions(a_info, b_info))
with storage_a.at_once(): with storage_a.at_once(), storage_b.at_once():
with storage_b.at_once():
for action in actions: for action in actions:
try:
action.run(a_info, b_info, conflict_resolution) action.run(a_info, b_info, conflict_resolution)
except Exception as e:
if error_callback:
error_callback(e)
else:
raise
status.clear() status.clear()
for ident in uniq(itertools.chain(a_info.new_status, b_info.new_status)): for ident in uniq(itertools.chain(a_info.new_status,
b_info.new_status)):
status[ident] = ( status[ident] = (
_compress_meta(a_info.new_status[ident]), _compress_meta(a_info.new_status[ident]),
_compress_meta(b_info.new_status[ident]) _compress_meta(b_info.new_status[ident])