internal implementation of partial_sync

This commit is contained in:
Markus Unterwaditzer 2016-10-03 15:42:48 +02:00
parent 497e4a958c
commit ffeaf25471
3 changed files with 111 additions and 53 deletions

View file

@ -11,8 +11,8 @@ import pytest
import vdirsyncer.exceptions as exceptions import vdirsyncer.exceptions as exceptions
from vdirsyncer.storage.base import Item from vdirsyncer.storage.base import Item
from vdirsyncer.storage.memory import MemoryStorage, _random_string from vdirsyncer.storage.memory import MemoryStorage, _random_string
from vdirsyncer.sync import BothReadOnly, IdentConflict, StorageEmpty, \ from vdirsyncer.sync import BothReadOnly, IdentConflict, PartialSync, \
SyncConflict, sync StorageEmpty, SyncConflict, sync
from . import assert_item_equals, blow_up, uid_strategy from . import assert_item_equals, blow_up, uid_strategy
@ -78,6 +78,24 @@ def test_read_only_and_prefetch():
assert list(a.list()) == list(b.list()) == [] assert list(a.list()) == list(b.list()) == []
def test_partial_sync_ignore():
    '''``partial_sync='ignore'`` must leave both storages untouched when the
    only pending changes target the read-only side.'''
    writable = MemoryStorage()
    frozen = MemoryStorage()
    frozen.read_only = True
    sync_state = {}

    first = Item(u'UID:1\nhaha')
    second = Item(u'UID:2\nhoho')
    expected = {writable.upload(first), writable.upload(second)}

    # Sync twice so that the second run also exercises the status written by
    # the first one.
    for _ in range(2):
        sync(writable, frozen, sync_state,
             force_delete=True, partial_sync='ignore')

        assert set(writable.list()) == expected
        assert list(frozen.list()) == []
def test_upload_and_update(): def test_upload_and_update():
a = MemoryStorage(fileext='.a') a = MemoryStorage(fileext='.a')
b = MemoryStorage(fileext='.b') b = MemoryStorage(fileext='.b')
@ -323,9 +341,9 @@ def test_readonly():
with pytest.raises(exceptions.ReadOnlyError): with pytest.raises(exceptions.ReadOnlyError):
b.upload(Item(u'UID:3')) b.upload(Item(u'UID:3'))
sync(a, b, status) sync(a, b, status, partial_sync='revert')
assert len(status) == 2 and a.has(href_a) and not b.has(href_a) assert len(status) == 2 and a.has(href_a) and not b.has(href_a)
sync(a, b, status) sync(a, b, status, partial_sync='revert')
assert len(status) == 1 and not a.has(href_a) and not b.has(href_a) assert len(status) == 1 and not a.has(href_a) and not b.has(href_a)
@ -429,7 +447,7 @@ class SyncMachine(RuleBasedStateMachine):
@staticmethod
def _get_items(storage):
    '''Return the set of raw contents of every item held by ``storage``.

    ``storage.items`` is iterated as a mapping whose values are
    ``(etag, item)`` pairs; only ``item.raw`` is kept.
    '''
    return {entry.raw for _etag, entry in storage.items.values()}
@rule(target=Storage, @rule(target=Storage,
read_only=st.booleans(), read_only=st.booleans(),
@ -480,10 +498,13 @@ class SyncMachine(RuleBasedStateMachine):
a=Storage, b=Storage, a=Storage, b=Storage,
force_delete=st.booleans(), force_delete=st.booleans(),
conflict_resolution=st.one_of((st.just('a wins'), st.just('b wins'))), conflict_resolution=st.one_of((st.just('a wins'), st.just('b wins'))),
with_error_callback=st.booleans() with_error_callback=st.booleans(),
partial_sync=st.one_of((
st.just('ignore'), st.just('revert'), st.just('error')
))
) )
def sync(self, status, a, b, force_delete, conflict_resolution, def sync(self, status, a, b, force_delete, conflict_resolution,
with_error_callback): with_error_callback, partial_sync):
assume(a is not b) assume(a is not b)
old_items_a = self._get_items(a) old_items_a = self._get_items(a)
old_items_b = self._get_items(b) old_items_b = self._get_items(b)
@ -505,10 +526,13 @@ class SyncMachine(RuleBasedStateMachine):
sync(a, b, status, sync(a, b, status,
force_delete=force_delete, force_delete=force_delete,
conflict_resolution=conflict_resolution, conflict_resolution=conflict_resolution,
error_callback=error_callback) error_callback=error_callback,
partial_sync=partial_sync)
for e in errors: for e in errors:
raise e raise e
except PartialSync:
assert partial_sync == 'error'
except ActionIntentionallyFailed: except ActionIntentionallyFailed:
pass pass
except BothReadOnly: except BothReadOnly:
@ -523,7 +547,7 @@ class SyncMachine(RuleBasedStateMachine):
items_a = self._get_items(a) items_a = self._get_items(a)
items_b = self._get_items(b) items_b = self._get_items(b)
assert items_a == items_b assert items_a == items_b or partial_sync == 'ignore'
assert items_a == old_items_a or not a.read_only assert items_a == old_items_a or not a.read_only
assert items_b == old_items_b or not b.read_only assert items_b == old_items_b or not b.read_only

View file

@ -17,7 +17,7 @@ import click_threading
from . import cli_logger from . import cli_logger
from .. import BUGTRACKER_HOME, DOCS_HOME, exceptions from .. import BUGTRACKER_HOME, DOCS_HOME, exceptions
from ..sync import IdentConflict, StorageEmpty, SyncConflict from ..sync import IdentConflict, PartialSync, StorageEmpty, SyncConflict
from ..utils import expand_path, get_storage_init_args from ..utils import expand_path, get_storage_init_args
try: try:
@ -97,6 +97,13 @@ def handle_cli_error(status_name=None, e=None):
status_name=status_name status_name=status_name
) )
) )
except PartialSync as e:
cli_logger.error(
'{status_name}: Attempted change on {storage}, which is read-only'
'. Set `partial_sync` in your pair section to `ignore` to ignore '
'those changes, or `revert` to revert them on the other side.'
.format(status_name=status_name, storage=e.storage)
)
except SyncConflict as e: except SyncConflict as e:
cli_logger.error( cli_logger.error(
'{status_name}: One item changed on both sides. Resolve this ' '{status_name}: One item changed on both sides. Resolve this '

View file

@ -9,6 +9,7 @@ two CalDAV servers or two local vdirs.
The algorithm is based on the blogpost "How OfflineIMAP works" by Edward Z. The algorithm is based on the blogpost "How OfflineIMAP works" by Edward Z.
Yang. http://blog.ezyang.com/2012/08/how-offlineimap-works/ Yang. http://blog.ezyang.com/2012/08/how-offlineimap-works/
''' '''
import contextlib
import itertools import itertools
import logging import logging
@ -77,6 +78,13 @@ class BothReadOnly(SyncError):
''' '''
class PartialSync(SyncError):
    '''
    Attempted change on read-only storage.

    Raised by a sync action when its destination storage is read-only and
    ``partial_sync`` is ``error``.
    '''
    # The read-only storage the change was attempted on. Error reporting
    # reads this as ``e.storage``; it stays ``None`` unless set on the
    # instance.
    storage = None
class _StorageInfo(object): class _StorageInfo(object):
'''A wrapper class that holds prefetched items, the status and other '''A wrapper class that holds prefetched items, the status and other
things.''' things.'''
@ -182,7 +190,7 @@ def _compress_meta(meta):
def sync(storage_a, storage_b, status, conflict_resolution=None, def sync(storage_a, storage_b, status, conflict_resolution=None,
force_delete=False, error_callback=None): force_delete=False, error_callback=None, partial_sync='revert'):
'''Synchronizes two storages. '''Synchronizes two storages.
:param storage_a: The first storage :param storage_a: The first storage
@ -204,6 +212,12 @@ def sync(storage_a, storage_b, status, conflict_resolution=None,
measure. measure.
:param error_callback: Instead of raising errors when executing actions, :param error_callback: Instead of raising errors when executing actions,
call the given function with an `Exception` as the only argument. call the given function with an `Exception` as the only argument.
:param partial_sync: What to do when doing sync actions on read-only
storages.
- ``error``: Raise an error.
- ``ignore``: Those actions are simply skipped.
- ``revert`` (default): Revert changes on other side.
''' '''
if storage_a.read_only and storage_b.read_only: if storage_a.read_only and storage_b.read_only:
raise BothReadOnly() raise BothReadOnly()
@ -215,14 +229,14 @@ def sync(storage_a, storage_b, status, conflict_resolution=None,
_status_migrate(status) _status_migrate(status)
a_info = _StorageInfo(storage_a, dict( a_status = {}
(ident, meta_a) b_status = {}
for ident, (meta_a, meta_b) in status.items() for ident, (meta_a, meta_b) in status.items():
)) a_status[ident] = meta_a
b_info = _StorageInfo(storage_b, dict( b_status[ident] = meta_b
(ident, meta_b)
for ident, (meta_a, meta_b) in status.items() a_info = _StorageInfo(storage_a, a_status)
)) b_info = _StorageInfo(storage_b, b_status)
a_info.prepare_new_status() a_info.prepare_new_status()
b_info.prepare_new_status() b_info.prepare_new_status()
@ -238,7 +252,7 @@ def sync(storage_a, storage_b, status, conflict_resolution=None,
with storage_a.at_once(), storage_b.at_once(): with storage_a.at_once(), storage_b.at_once():
for action in actions: for action in actions:
try: try:
action.run(a_info, b_info, conflict_resolution) action.run(a_info, b_info, conflict_resolution, partial_sync)
except Exception as e: except Exception as e:
if error_callback: if error_callback:
error_callback(e) error_callback(e)
@ -255,12 +269,28 @@ def sync(storage_a, storage_b, status, conflict_resolution=None,
class Action: class Action:
def _run_impl(self, a, b):
    '''Perform the actual change for this action.

    Called by :meth:`run` with the two storage-info wrappers; this base
    implementation only raises ``NotImplementedError``.
    '''
    raise NotImplementedError()
def run(self, a, b, conflict_resolution, partial_sync):
    '''Execute this action, honouring ``partial_sync`` for read-only targets.

    :param a: Storage info wrapper for the first storage.
    :param b: Storage info wrapper for the second storage.
    :param conflict_resolution: Accepted for a uniform signature across
        actions; not used here.
    :param partial_sync: What to do if ``self.dest.storage`` is read-only:

        - ``error``: raise :class:`PartialSync`.
        - ``ignore``: roll this action back and skip it.
        - ``revert``: log a warning and still call ``_run_impl``.

    :raises PartialSync: destination is read-only and mode is ``error``.
    :raises exceptions.UserError: destination is read-only and
        ``partial_sync`` is none of the values above.

    Any exception raised inside triggers ``self.rollback(a, b)`` via
    :meth:`auto_rollback` before it propagates.
    '''
    with self.auto_rollback(a, b):
        if self.dest.storage.read_only:
            if partial_sync == 'error':
                # Pass the storage by keyword so that ``e.storage`` is
                # populated for error reporting, the same way SyncConflict
                # is raised with keyword attributes.
                # NOTE(review): assumes SyncError accepts keyword
                # attributes like SyncConflict does -- confirm.
                raise PartialSync(storage=self.dest.storage)
            elif partial_sync == 'ignore':
                self.rollback(a, b)
                return
            elif partial_sync == 'revert':
                sync_logger.warning('{} is read-only. Reverting change on '
                                    'next sync.'.format(self.dest.storage))
            else:
                # An explicit error instead of ``assert``: assertions are
                # stripped under ``python -O`` and this is user input.
                raise exceptions.UserError(
                    'Invalid partial_sync mode: {!r}'
                    .format(partial_sync))

        self._run_impl(a, b)
@contextlib.contextmanager
def auto_rollback(self, a, b):
    '''Context manager that rolls this action back if the body fails.

    On any exception (including ``KeyboardInterrupt`` and friends, hence
    ``BaseException``) ``self.rollback(a, b)`` is called and the original
    exception is re-raised. On success nothing is done.
    '''
    try:
        yield
    except BaseException:
        self.rollback(a, b)
        # Bare ``raise`` keeps the original traceback intact; ``raise e``
        # would reset it on Python 2 and add a frame on Python 3.
        raise
@ -279,13 +309,11 @@ class Upload(Action):
self.ident = item.ident self.ident = item.ident
self.dest = dest self.dest = dest
def _run_impl(self, a, b, conflict_resolution): def _run_impl(self, a, b):
sync_logger.info(u'Copying (uploading) item {} to {}' sync_logger.info(u'Copying (uploading) item {} to {}'
.format(self.ident, self.dest.storage)) .format(self.ident, self.dest.storage))
if self.dest.storage.read_only: if self.dest.storage.read_only:
sync_logger.warning('{} is read-only. Skipping update...'
.format(self.dest.storage))
href = etag = None href = etag = None
else: else:
href, etag = self.dest.storage.upload(self.item) href, etag = self.dest.storage.upload(self.item)
@ -304,13 +332,11 @@ class Update(Action):
self.ident = item.ident self.ident = item.ident
self.dest = dest self.dest = dest
def _run_impl(self, a, b, conflict_resolution): def _run_impl(self, a, b):
sync_logger.info(u'Copying (updating) item {} to {}' sync_logger.info(u'Copying (updating) item {} to {}'
.format(self.ident, self.dest.storage)) .format(self.ident, self.dest.storage))
if self.dest.storage.read_only: if self.dest.storage.read_only:
sync_logger.warning('{} is read-only. Skipping update...'
.format(self.dest.storage))
href = etag = None href = etag = None
else: else:
meta = self.dest.new_status[self.ident] meta = self.dest.new_status[self.ident]
@ -330,15 +356,12 @@ class Delete(Action):
self.ident = ident self.ident = ident
self.dest = dest self.dest = dest
def _run_impl(self, a, b):
    '''Delete the item from the destination and forget its status entry.

    The storage-level delete is skipped when the destination is read-only;
    the status entry is dropped either way.
    '''
    target = self.dest
    sync_logger.info(u'Deleting item {} from {}'
                     .format(self.ident, target.storage))

    meta = target.new_status[self.ident]
    writable = not target.storage.read_only
    if writable:
        target.storage.delete(meta['href'], meta['etag'])

    del target.new_status[self.ident]
@ -347,26 +370,30 @@ class ResolveConflict(Action):
def __init__(self, ident):
    '''Remember the ident of the item whose conflict is to be resolved.'''
    self.ident = ident
def run(self, a, b, conflict_resolution, partial_sync):
    '''Resolve a conflict for ``self.ident`` between both sides.

    :param a: Storage info wrapper for the first storage.
    :param b: Storage info wrapper for the second storage.
    :param conflict_resolution: ``None`` to raise on real conflicts, or a
        callable taking both items and returning the item to keep.
    :param partial_sync: Passed through to the ``Update`` actions that
        write the resolved item.

    :raises SyncConflict: contents differ and no resolution was given.
    :raises exceptions.UserError: ``conflict_resolution`` is neither
        ``None`` nor callable.
    '''
    with self.auto_rollback(a, b):
        sync_logger.info(u'Doing conflict resolution for item {}...'
                         .format(self.ident))
        meta_a = a.new_status[self.ident]
        meta_b = b.new_status[self.ident]

        # Identical content is not a real conflict.
        if meta_a['item'].hash == meta_b['item'].hash:
            sync_logger.info(u'...same content on both sides.')
            return

        if conflict_resolution is None:
            raise SyncConflict(ident=self.ident, href_a=meta_a['href'],
                               href_b=meta_b['href'])

        if not callable(conflict_resolution):
            raise exceptions.UserError(
                'Invalid conflict resolution mode: {!r}'
                .format(conflict_resolution))

        new_item = conflict_resolution(meta_a['item'], meta_b['item'])

        # Push the winner to every side that does not already hold it,
        # first a, then b.
        for meta, side in ((meta_a, a), (meta_b, b)):
            if new_item.hash != meta['item'].hash:
                Update(new_item, side).run(a, b, conflict_resolution,
                                           partial_sync)
def _get_actions(a_info, b_info): def _get_actions(a_info, b_info):