Merge pull request #515 from pimutils/partial-sync

Partial sync
This commit is contained in:
Markus Unterwaditzer 2016-10-03 20:52:34 +02:00 committed by GitHub
commit be297b52df
8 changed files with 239 additions and 117 deletions

View file

@ -14,6 +14,7 @@ Version 0.14.0
- ``vdirsyncer sync`` now continues other uploads if one upload failed. The - ``vdirsyncer sync`` now continues other uploads if one upload failed. The
exit code in such situations is still non-zero. exit code in such situations is still non-zero.
- Add ``partial_sync`` option to pair section.
Version 0.13.1 Version 0.13.1
============== ==============

View file

@ -101,6 +101,13 @@ Pair Section
Vdirsyncer never attempts to "automatically merge" the two items. Vdirsyncer never attempts to "automatically merge" the two items.
- ``partial_sync``: Assume A is read-only, B not. If you change items on B,
vdirsyncer can't sync the changes to A. What should happen instead?
- ``error``: An error is shown.
- ``ignore``: The change is ignored.
- ``revert`` (default): The change is reverted on next sync.
- ``metadata``: Metadata keys that should be synchronized when ``vdirsyncer - ``metadata``: Metadata keys that should be synchronized when ``vdirsyncer
metasync`` is executed. Example:: metasync`` is executed. Example::

View file

@ -449,3 +449,64 @@ def test_conflict_resolution(tmpdir, runner, resolution, expect_foo,
assert fooitem.read() == expect_foo assert fooitem.read() == expect_foo
assert baritem.read() == expect_bar assert baritem.read() == expect_bar
@pytest.mark.parametrize('partial_sync', ['error', 'ignore', 'revert', None])
def test_partial_sync(tmpdir, runner, partial_sync):
runner.write_with_general(dedent('''
[pair foobar]
a = foo
b = bar
collections = null
{partial_sync}
[storage foo]
type = filesystem
fileext = .txt
path = {base}/foo
[storage bar]
type = filesystem
read_only = true
fileext = .txt
path = {base}/bar
'''.format(
partial_sync=('partial_sync = {}\n'.format(partial_sync)
if partial_sync else ''),
base=str(tmpdir)
)))
foo = tmpdir.mkdir('foo')
bar = tmpdir.mkdir('bar')
foo.join('other.txt').write('UID:other')
bar.join('other.txt').write('UID:other')
baritem = bar.join('lol.txt')
baritem.write('UID:lol')
r = runner.invoke(['discover'])
assert not r.exception
r = runner.invoke(['sync'])
assert not r.exception
fooitem = foo.join('lol.txt')
fooitem.remove()
r = runner.invoke(['sync'])
if partial_sync == 'error':
assert r.exception
assert 'Attempted change' in r.output
elif partial_sync == 'ignore':
assert baritem.exists()
r = runner.invoke(['sync'])
assert not r.exception
assert baritem.exists()
else:
assert baritem.exists()
r = runner.invoke(['sync'])
assert not r.exception
assert baritem.exists()
assert fooitem.exists()

View file

@ -11,8 +11,8 @@ import pytest
import vdirsyncer.exceptions as exceptions import vdirsyncer.exceptions as exceptions
from vdirsyncer.storage.base import Item from vdirsyncer.storage.base import Item
from vdirsyncer.storage.memory import MemoryStorage, _random_string from vdirsyncer.storage.memory import MemoryStorage, _random_string
from vdirsyncer.sync import BothReadOnly, IdentConflict, StorageEmpty, \ from vdirsyncer.sync import BothReadOnly, IdentConflict, PartialSync, \
SyncConflict, sync StorageEmpty, SyncConflict, sync
from . import assert_item_equals, blow_up, uid_strategy from . import assert_item_equals, blow_up, uid_strategy
@ -78,6 +78,24 @@ def test_read_only_and_prefetch():
assert list(a.list()) == list(b.list()) == [] assert list(a.list()) == list(b.list()) == []
def test_partial_sync_ignore():
a = MemoryStorage()
b = MemoryStorage()
b.read_only = True
status = {}
item1 = Item(u'UID:1\nhaha')
item2 = Item(u'UID:2\nhoho')
meta1 = a.upload(item1)
meta2 = a.upload(item2)
for _ in (1, 2):
sync(a, b, status, force_delete=True, partial_sync='ignore')
assert set(a.list()) == {meta1, meta2}
assert not list(b.list())
def test_upload_and_update(): def test_upload_and_update():
a = MemoryStorage(fileext='.a') a = MemoryStorage(fileext='.a')
b = MemoryStorage(fileext='.b') b = MemoryStorage(fileext='.b')
@ -323,9 +341,9 @@ def test_readonly():
with pytest.raises(exceptions.ReadOnlyError): with pytest.raises(exceptions.ReadOnlyError):
b.upload(Item(u'UID:3')) b.upload(Item(u'UID:3'))
sync(a, b, status) sync(a, b, status, partial_sync='revert')
assert len(status) == 2 and a.has(href_a) and not b.has(href_a) assert len(status) == 2 and a.has(href_a) and not b.has(href_a)
sync(a, b, status) sync(a, b, status, partial_sync='revert')
assert len(status) == 1 and not a.has(href_a) and not b.has(href_a) assert len(status) == 1 and not a.has(href_a) and not b.has(href_a)
@ -429,7 +447,7 @@ class SyncMachine(RuleBasedStateMachine):
@staticmethod @staticmethod
def _get_items(storage): def _get_items(storage):
return sorted(item.raw for etag, item in storage.items.values()) return set(item.raw for etag, item in storage.items.values())
@rule(target=Storage, @rule(target=Storage,
read_only=st.booleans(), read_only=st.booleans(),
@ -480,10 +498,13 @@ class SyncMachine(RuleBasedStateMachine):
a=Storage, b=Storage, a=Storage, b=Storage,
force_delete=st.booleans(), force_delete=st.booleans(),
conflict_resolution=st.one_of((st.just('a wins'), st.just('b wins'))), conflict_resolution=st.one_of((st.just('a wins'), st.just('b wins'))),
with_error_callback=st.booleans() with_error_callback=st.booleans(),
partial_sync=st.one_of((
st.just('ignore'), st.just('revert'), st.just('error')
))
) )
def sync(self, status, a, b, force_delete, conflict_resolution, def sync(self, status, a, b, force_delete, conflict_resolution,
with_error_callback): with_error_callback, partial_sync):
assume(a is not b) assume(a is not b)
old_items_a = self._get_items(a) old_items_a = self._get_items(a)
old_items_b = self._get_items(b) old_items_b = self._get_items(b)
@ -505,10 +526,13 @@ class SyncMachine(RuleBasedStateMachine):
sync(a, b, status, sync(a, b, status,
force_delete=force_delete, force_delete=force_delete,
conflict_resolution=conflict_resolution, conflict_resolution=conflict_resolution,
error_callback=error_callback) error_callback=error_callback,
partial_sync=partial_sync)
for e in errors: for e in errors:
raise e raise e
except PartialSync:
assert partial_sync == 'error'
except ActionIntentionallyFailed: except ActionIntentionallyFailed:
pass pass
except BothReadOnly: except BothReadOnly:
@ -523,11 +547,12 @@ class SyncMachine(RuleBasedStateMachine):
items_a = self._get_items(a) items_a = self._get_items(a)
items_b = self._get_items(b) items_b = self._get_items(b)
assert items_a == items_b assert items_a == items_b or partial_sync == 'ignore'
assert items_a == old_items_a or not a.read_only assert items_a == old_items_a or not a.read_only
assert items_b == old_items_b or not b.read_only assert items_b == old_items_b or not b.read_only
assert set(a.items) | set(b.items) == set(status) assert set(a.items) | set(b.items) == set(status) or \
partial_sync == 'ignore'
TestSyncMachine = SyncMachine.TestCase TestSyncMachine = SyncMachine.TestCase

View file

@ -254,6 +254,7 @@ class PairConfig(object):
self.options = options self.options = options
self._set_conflict_resolution() self._set_conflict_resolution()
self._set_partial_sync()
self._set_collections() self._set_collections()
def _set_conflict_resolution(self): def _set_conflict_resolution(self):
@ -278,6 +279,11 @@ class PairConfig(object):
else: else:
raise ValueError('Invalid value for `conflict_resolution`.') raise ValueError('Invalid value for `conflict_resolution`.')
def _set_partial_sync(self):
self.partial_sync = self.options.pop('partial_sync', 'revert')
if self.partial_sync not in ('ignore', 'revert', 'error'):
raise ValueError('Invalid value for `partial_sync`.')
def _set_collections(self): def _set_collections(self):
try: try:
collections = self.options['collections'] collections = self.options['collections']

View file

@ -68,8 +68,8 @@ def sync_collection(wq, collection, general, force_delete):
a, b, status, a, b, status,
conflict_resolution=pair.conflict_resolution, conflict_resolution=pair.conflict_resolution,
force_delete=force_delete, force_delete=force_delete,
error_callback=error_callback error_callback=error_callback,
partial_sync=pair.partial_sync
) )
save_status(general['status_path'], pair.name, collection.name, save_status(general['status_path'], pair.name, collection.name,

View file

@ -17,7 +17,7 @@ import click_threading
from . import cli_logger from . import cli_logger
from .. import BUGTRACKER_HOME, DOCS_HOME, exceptions from .. import BUGTRACKER_HOME, DOCS_HOME, exceptions
from ..sync import IdentConflict, StorageEmpty, SyncConflict from ..sync import IdentConflict, PartialSync, StorageEmpty, SyncConflict
from ..utils import expand_path, get_storage_init_args from ..utils import expand_path, get_storage_init_args
try: try:
@ -97,6 +97,13 @@ def handle_cli_error(status_name=None, e=None):
status_name=status_name status_name=status_name
) )
) )
except PartialSync as e:
cli_logger.error(
'{status_name}: Attempted change on {storage}, which is read-only'
'. Set `partial_sync` in your pair section to `ignore` to ignore '
'those changes, or `revert` to revert them on the other side.'
.format(status_name=status_name, storage=e.storage)
)
except SyncConflict as e: except SyncConflict as e:
cli_logger.error( cli_logger.error(
'{status_name}: One item changed on both sides. Resolve this ' '{status_name}: One item changed on both sides. Resolve this '

View file

@ -9,6 +9,7 @@ two CalDAV servers or two local vdirs.
The algorithm is based on the blogpost "How OfflineIMAP works" by Edward Z. The algorithm is based on the blogpost "How OfflineIMAP works" by Edward Z.
Yang. http://blog.ezyang.com/2012/08/how-offlineimap-works/ Yang. http://blog.ezyang.com/2012/08/how-offlineimap-works/
''' '''
import contextlib
import itertools import itertools
import logging import logging
@ -77,6 +78,13 @@ class BothReadOnly(SyncError):
''' '''
class PartialSync(SyncError):
'''
Attempted change on read-only storage.
'''
storage = None
class _StorageInfo(object): class _StorageInfo(object):
'''A wrapper class that holds prefetched items, the status and other '''A wrapper class that holds prefetched items, the status and other
things.''' things.'''
@ -148,50 +156,6 @@ class _StorageInfo(object):
# only etag changed # only etag changed
return False return False
def upload_full(self, item):
if self.storage.read_only:
sync_logger.warning('{} is read-only. Skipping update...'
.format(self.storage))
href = etag = None
else:
href, etag = self.storage.upload(item)
assert item.ident not in self.new_status
self.new_status[item.ident] = {
'href': href,
'hash': item.hash,
'etag': etag
}
def update_full(self, item):
'''Similar to Storage.update, but automatically takes care of ETags and
updating the status.'''
if self.storage.read_only:
sync_logger.warning('{} is read-only. Skipping update...'
.format(self.storage))
href = etag = None
else:
meta = self.new_status[item.ident]
href = meta['href']
etag = self.storage.update(href, item, meta['etag'])
assert isinstance(etag, (bytes, str))
self.new_status[item.ident] = {
'href': href,
'hash': item.hash,
'etag': etag
}
def delete_full(self, ident):
meta = self.new_status[ident]
if self.storage.read_only:
sync_logger.warning('{} is read-only, skipping deletion...'
.format(self.storage))
else:
self.storage.delete(meta['href'], meta['etag'])
del self.new_status[ident]
def _status_migrate(status): def _status_migrate(status):
for ident in list(status): for ident in list(status):
@ -226,7 +190,7 @@ def _compress_meta(meta):
def sync(storage_a, storage_b, status, conflict_resolution=None, def sync(storage_a, storage_b, status, conflict_resolution=None,
force_delete=False, error_callback=None): force_delete=False, error_callback=None, partial_sync='revert'):
'''Synchronizes two storages. '''Synchronizes two storages.
:param storage_a: The first storage :param storage_a: The first storage
@ -248,6 +212,12 @@ def sync(storage_a, storage_b, status, conflict_resolution=None,
measure. measure.
:param error_callback: Instead of raising errors when executing actions, :param error_callback: Instead of raising errors when executing actions,
call the given function with an `Exception` as the only argument. call the given function with an `Exception` as the only argument.
:param partial_sync: What to do when doing sync actions on read-only
storages.
- ``error``: Raise an error.
- ``ignore``: Those actions are simply skipped.
- ``revert`` (default): Revert changes on other side.
''' '''
if storage_a.read_only and storage_b.read_only: if storage_a.read_only and storage_b.read_only:
raise BothReadOnly() raise BothReadOnly()
@ -259,14 +229,14 @@ def sync(storage_a, storage_b, status, conflict_resolution=None,
_status_migrate(status) _status_migrate(status)
a_info = _StorageInfo(storage_a, dict( a_status = {}
(ident, meta_a) b_status = {}
for ident, (meta_a, meta_b) in status.items() for ident, (meta_a, meta_b) in status.items():
)) a_status[ident] = meta_a
b_info = _StorageInfo(storage_b, dict( b_status[ident] = meta_b
(ident, meta_b)
for ident, (meta_a, meta_b) in status.items() a_info = _StorageInfo(storage_a, a_status)
)) b_info = _StorageInfo(storage_b, b_status)
a_info.prepare_new_status() a_info.prepare_new_status()
b_info.prepare_new_status() b_info.prepare_new_status()
@ -282,7 +252,7 @@ def sync(storage_a, storage_b, status, conflict_resolution=None,
with storage_a.at_once(), storage_b.at_once(): with storage_a.at_once(), storage_b.at_once():
for action in actions: for action in actions:
try: try:
action.run(a_info, b_info, conflict_resolution) action.run(a_info, b_info, conflict_resolution, partial_sync)
except Exception as e: except Exception as e:
if error_callback: if error_callback:
error_callback(e) error_callback(e)
@ -299,20 +269,26 @@ def sync(storage_a, storage_b, status, conflict_resolution=None,
class Action: class Action:
def __init__(self, ident, source, dest): def _run_impl(self, a, b):
self.ident = ident
self.source = source
self.dest = dest
self.a = None
self.b = None
self.conflict_resolution = None
def _run_impl(self):
raise NotImplementedError() raise NotImplementedError()
def run(self, a, b, conflict_resolution): def run(self, a, b, conflict_resolution, partial_sync):
with self.auto_rollback(a, b):
if self.dest.storage.read_only:
if partial_sync == 'error':
raise PartialSync(self.dest.storage)
elif partial_sync == 'ignore':
self.rollback(a, b)
return
else:
assert partial_sync == 'revert'
self._run_impl(a, b)
@contextlib.contextmanager
def auto_rollback(self, a, b):
try: try:
return self._run_impl(a, b, conflict_resolution) yield
except BaseException as e: except BaseException as e:
self.rollback(a, b) self.rollback(a, b)
raise e raise e
@ -326,19 +302,51 @@ class Action:
class Upload(Action): class Upload(Action):
def _run_impl(self, a, b, conflict_resolution): def __init__(self, item, dest):
sync_logger.info(u'Copying (uploading) item {} to {}' self.item = item
.format(self.ident, self.dest.storage)) self.ident = item.ident
item = self.source.new_status[self.ident]['item'] self.dest = dest
self.dest.upload_full(item)
def _run_impl(self, a, b):
if self.dest.storage.read_only:
href = etag = None
else:
sync_logger.info(u'Copying (uploading) item {} to {}'
.format(self.ident, self.dest.storage))
href, etag = self.dest.storage.upload(self.item)
assert self.ident not in self.dest.new_status
self.dest.new_status[self.ident] = {
'href': href,
'hash': self.item.hash,
'etag': etag
}
class Update(Action): class Update(Action):
def _run_impl(self, a, b, conflict_resolution): def __init__(self, item, dest):
sync_logger.info(u'Copying (updating) item {} to {}' self.item = item
.format(self.ident, self.dest.storage)) self.ident = item.ident
source_meta = self.source.new_status[self.ident] self.dest = dest
self.dest.update_full(source_meta['item'])
def _run_impl(self, a, b):
if self.dest.storage.read_only:
href = etag = None
else:
sync_logger.info(u'Copying (updating) item {} to {}'
.format(self.ident, self.dest.storage))
meta = self.dest.new_status[self.ident]
href = meta['href']
etag = self.dest.storage.update(href, self.item, meta['etag'])
assert isinstance(etag, (bytes, str))
self.dest.new_status[self.ident] = {
'href': href,
'hash': self.item.hash,
'etag': etag
}
class Delete(Action): class Delete(Action):
@ -346,43 +354,50 @@ class Delete(Action):
self.ident = ident self.ident = ident
self.dest = dest self.dest = dest
def _run_impl(self, a, b, conflict_resolution): def _run_impl(self, a, b):
sync_logger.info(u'Deleting item {} from {}' meta = self.dest.new_status[self.ident]
.format(self.ident, self.dest.storage)) if not self.dest.storage.read_only:
self.dest.delete_full(self.ident) sync_logger.info(u'Deleting item {} from {}'
.format(self.ident, self.dest.storage))
self.dest.storage.delete(meta['href'], meta['etag'])
del self.dest.new_status[self.ident]
class ResolveConflict(Action): class ResolveConflict(Action):
def __init__(self, ident): def __init__(self, ident):
self.ident = ident self.ident = ident
def _run_impl(self, a, b, conflict_resolution): def run(self, a, b, conflict_resolution, partial_sync):
sync_logger.info(u'Doing conflict resolution for item {}...' with self.auto_rollback(a, b):
.format(self.ident)) sync_logger.info(u'Doing conflict resolution for item {}...'
meta_a = a.new_status[self.ident] .format(self.ident))
meta_b = b.new_status[self.ident] meta_a = a.new_status[self.ident]
meta_b = b.new_status[self.ident]
if meta_a['item'].hash == meta_b['item'].hash: if meta_a['item'].hash == meta_b['item'].hash:
sync_logger.info(u'...same content on both sides.') sync_logger.info(u'...same content on both sides.')
elif conflict_resolution is None: elif conflict_resolution is None:
raise SyncConflict(ident=self.ident, href_a=meta_a['href'], raise SyncConflict(ident=self.ident, href_a=meta_a['href'],
href_b=meta_b['href']) href_b=meta_b['href'])
elif callable(conflict_resolution): elif callable(conflict_resolution):
new_item = conflict_resolution(meta_a['item'], meta_b['item']) new_item = conflict_resolution(meta_a['item'], meta_b['item'])
if new_item.hash != meta_a['item'].hash: if new_item.hash != meta_a['item'].hash:
a.update_full(new_item) Update(new_item, a).run(a, b, conflict_resolution,
if new_item.hash != meta_b['item'].hash: partial_sync)
b.update_full(new_item) if new_item.hash != meta_b['item'].hash:
else: Update(new_item, b).run(a, b, conflict_resolution,
raise exceptions.UserError('Invalid conflict resolution mode: {!r}' partial_sync)
.format(conflict_resolution)) else:
raise exceptions.UserError(
'Invalid conflict resolution mode: {!r}'
.format(conflict_resolution))
def _get_actions(a_info, b_info): def _get_actions(a_info, b_info):
for ident in uniq(itertools.chain(a_info.new_status, b_info.new_status, for ident in uniq(itertools.chain(a_info.new_status, b_info.new_status,
a_info.status)): a_info.status)):
a = ident in a_info.new_status # item exists in a a = a_info.new_status.get(ident, None) # item exists in a
b = ident in b_info.new_status # item exists in b b = b_info.new_status.get(ident, None) # item exists in b
if a and b: if a and b:
a_changed = a_info.is_changed(ident) a_changed = a_info.is_changed(ident)
@ -393,15 +408,15 @@ def _get_actions(a_info, b_info):
yield ResolveConflict(ident) yield ResolveConflict(ident)
elif a_changed and not b_changed: elif a_changed and not b_changed:
# item was only modified in a # item was only modified in a
yield Update(ident, a_info, b_info) yield Update(a['item'], b_info)
elif not a_changed and b_changed: elif not a_changed and b_changed:
# item was only modified in b # item was only modified in b
yield Update(ident, b_info, a_info) yield Update(b['item'], a_info)
elif a and not b: elif a and not b:
if a_info.is_changed(ident): if a_info.is_changed(ident):
# was deleted from b but modified on a # was deleted from b but modified on a
# OR: new item was created in a # OR: new item was created in a
yield Upload(ident, a_info, b_info) yield Upload(a['item'], b_info)
else: else:
# was deleted from b and not modified on a # was deleted from b and not modified on a
yield Delete(ident, a_info) yield Delete(ident, a_info)
@ -409,7 +424,7 @@ def _get_actions(a_info, b_info):
if b_info.is_changed(ident): if b_info.is_changed(ident):
# was deleted from a but modified on b # was deleted from a but modified on b
# OR: new item was created in b # OR: new item was created in b
yield Upload(ident, b_info, a_info) yield Upload(b['item'], a_info)
else: else:
# was deleted from a and not changed on b # was deleted from a and not changed on b
yield Delete(ident, b_info) yield Delete(ident, b_info)