Merge pull request #151 from untitaker/discovery

Discovery
This commit is contained in:
Markus Unterwaditzer 2014-12-16 17:50:59 +01:00
commit d19ee9fa27
10 changed files with 509 additions and 175 deletions

View file

@ -91,26 +91,8 @@ further reference, it uses the storages
More Configuration More Configuration
================== ==================
But what if we want to synchronize multiple addressbooks from the same server? Conflict resolution
Of course we could create new pairs and storages for each addressbook, but that -------------------
is very tedious to do. Instead we will use a shortcut:
- Remove the last segment from the URL, so that it ends with ``.../bob/``
instead of ``.../bob/default/``.
- Add the following line to the *pair* section::
[pair my_contacts]
...
collections = ["default", "work"]
This will synchronize
``https://owncloud.example.com/remote.php/carddav/addressbooks/bob/default/``
with ``~/.contacts/default/`` and
``https://owncloud.example.com/remote.php/carddav/addressbooks/bob/work/`` with
``~/.contacts/work/``. Under the hood, vdirsyncer also just copies the pairs
and storages for each collection and appends the collection name to the path or
URL.
It almost seems like it could work. But what if the same item is changed on It almost seems like it could work. But what if the same item is changed on
both sides? What should vdirsyncer do? By default, it will show an ugly error both sides? What should vdirsyncer do? By default, it will show an ugly error
@ -125,3 +107,39 @@ Earlier we wrote that ``b = my_contacts_remote``, so when vdirsyncer encounters
the situation where an item changed on both sides, it will simply overwrite the the situation where an item changed on both sides, it will simply overwrite the
local item with the one from the server. Of course ``a wins`` is also a valid local item with the one from the server. Of course ``a wins`` is also a valid
value. value.
Collection discovery
--------------------
Configuring each collection (=addressbook/calendar) becomes extremely
repetitive if they are all on the same server. Vdirsyncer can do this for you
by automatically downloading a list of the configured user's collections::
[pair my_contacts]
a = my_contacts_local
b = my_contacts_remote
collections = from b
[storage my_contacts_local]
type = filesystem
path = ~/.contacts/
fileext = .vcf
[storage my_contacts_remote]
type = carddav
url = https://owncloud.example.com/remote.php/carddav/addressbooks/bob/
username = bob
password = asdf
With the above configuration, vdirsyncer will fetch all available collections
from the server, and create subdirectories for each of them in
``~/.contacts/``. For example, ownCloud's default addressbook ``"default"``
would be synchronized to the location ``~/.contacts/default/``.
Vdirsyncer fetches this list on first sync, and will re-fetch it if you change
your configuration file. However, if new collections are created on the server,
it will not automatically start synchronizing those [1]_. You should run
``vdirsyncer discover`` to re-fetch this list instead.
.. [1] Because collections are added rarely, and checking for this case before
every synchronization isn't worth the overhead.

View file

@ -148,22 +148,30 @@ class SupportsCollections(object):
def test_discover(self, get_storage_args, get_item): def test_discover(self, get_storage_args, get_item):
expected = set() expected = set()
items = {}
for i in range(1, 5): for i in range(1, 5):
# Create collections, but use the "collection" attribute because # Create collections, but use the "collection" attribute because
# Radicale requires file extensions in their names. # Radicale requires file extensions in their names.
expected.add( collection = 'test{}'.format(i)
self.storage_class( s = self.storage_class(**get_storage_args(collection=collection))
**get_storage_args(collection='test{}'.format(i)) items[s.collection] = [s.upload(get_item())]
).collection expected.add(s.collection)
)
d = self.storage_class.discover( d = self.storage_class.discover(
**get_storage_args(collection=None)) **get_storage_args(collection=None))
actual = set(s.collection for s in d) actual = set(args['collection'] for args in d)
assert actual >= expected assert actual >= expected
for storage_args in d:
collection = storage_args['collection']
if collection not in expected:
continue
s = self.storage_class(**storage_args)
rv = list(s.list())
assert rv == items[collection]
def test_discover_collection_arg(self, get_storage_args): def test_discover_collection_arg(self, get_storage_args):
args = get_storage_args(collection='test2') args = get_storage_args(collection='test2')
with pytest.raises(TypeError) as excinfo: with pytest.raises(TypeError) as excinfo:

View file

@ -110,6 +110,7 @@ class ServerMixin(object):
url = 'http://127.0.0.1/bob/' url = 'http://127.0.0.1/bob/'
if collection is not None: if collection is not None:
collection += self.storage_class.fileext collection += self.storage_class.fileext
url = url.rstrip('/') + '/' + collection
rv = {'url': url, 'username': 'bob', 'password': 'bob', rv = {'url': url, 'username': 'bob', 'password': 'bob',
'collection': collection, 'unsafe_href_chars': ''} 'collection': collection, 'unsafe_href_chars': ''}

View file

@ -31,22 +31,22 @@ class TestFilesystemStorage(StorageTests):
path = self.tmpdir path = self.tmpdir
if collection is not None: if collection is not None:
os.makedirs(os.path.join(path, collection)) os.makedirs(os.path.join(path, collection))
path = os.path.join(path, collection)
return {'path': path, 'fileext': '.txt', 'collection': collection} return {'path': path, 'fileext': '.txt', 'collection': collection}
return inner return inner
def test_create_is_false(self, tmpdir): def test_create_is_false(self, tmpdir):
with pytest.raises(IOError): with pytest.raises(IOError):
self.storage_class(str(tmpdir), '.txt', collection='lol', self.storage_class(str(tmpdir) + '/lol', '.txt', create=False)
create=False)
def test_is_not_directory(self, tmpdir): def test_is_not_directory(self, tmpdir):
with pytest.raises(IOError): with pytest.raises(IOError):
f = tmpdir.join('hue') f = tmpdir.join('hue')
f.write('stub') f.write('stub')
self.storage_class(str(tmpdir), '.txt', collection='hue') self.storage_class(str(tmpdir) + '/hue', '.txt')
def test_create_is_true(self, tmpdir): def test_create_is_true(self, tmpdir):
self.storage_class(str(tmpdir), '.txt', collection='asd') self.storage_class(str(tmpdir) + '/asd', '.txt')
assert tmpdir.listdir() == [tmpdir.join('asd')] assert tmpdir.listdir() == [tmpdir.join('asd')]
def test_broken_data(self, tmpdir): def test_broken_data(self, tmpdir):

View file

@ -79,11 +79,9 @@ def test_parse_pairs_args():
assert sorted( assert sorted(
cli.parse_pairs_args(['foo/foocoll', 'one', 'eins'], pairs) cli.parse_pairs_args(['foo/foocoll', 'one', 'eins'], pairs)
) == [ ) == [
('eins', None), ('eins', set()),
('foo', 'foocoll'), ('foo', {'foocoll'}),
('one', 'a'), ('one', set()),
('one', 'b'),
('one', 'c')
] ]
@ -111,21 +109,12 @@ def test_simple_run(tmpdir):
runner = CliRunner(env={'VDIRSYNCER_CONFIG': str(config_file)}) runner = CliRunner(env={'VDIRSYNCER_CONFIG': str(config_file)})
result = runner.invoke(cli.app, ['sync']) result = runner.invoke(cli.app, ['sync'])
assert not result.exception assert not result.exception
assert result.output.lower().strip() == 'syncing my_pair'
tmpdir.join('path_a/haha.txt').write('UID:haha') tmpdir.join('path_a/haha.txt').write('UID:haha')
result = runner.invoke(cli.app, ['sync']) result = runner.invoke(cli.app, ['sync'])
assert result.output == ('Syncing my_pair\n' assert 'Copying (uploading) item haha to my_b' in result.output
'Copying (uploading) item haha to my_b\n')
assert tmpdir.join('path_b/haha.txt').read() == 'UID:haha' assert tmpdir.join('path_b/haha.txt').read() == 'UID:haha'
result = runner.invoke(cli.app, ['sync', 'my_pair', 'my_pair'])
assert set(result.output.splitlines()) == set([
'Syncing my_pair',
'warning: Already prepared my_pair, skipping'
])
assert not result.exception
def test_empty_storage(tmpdir): def test_empty_storage(tmpdir):
config_file = tmpdir.join('config') config_file = tmpdir.join('config')
@ -151,7 +140,6 @@ def test_empty_storage(tmpdir):
runner = CliRunner(env={'VDIRSYNCER_CONFIG': str(config_file)}) runner = CliRunner(env={'VDIRSYNCER_CONFIG': str(config_file)})
result = runner.invoke(cli.app, ['sync']) result = runner.invoke(cli.app, ['sync'])
assert not result.exception assert not result.exception
assert result.output.lower().strip() == 'syncing my_pair'
tmpdir.join('path_a/haha.txt').write('UID:haha') tmpdir.join('path_a/haha.txt').write('UID:haha')
result = runner.invoke(cli.app, ['sync']) result = runner.invoke(cli.app, ['sync'])
@ -244,3 +232,175 @@ def test_invalid_storage_name():
cli.load_config(f) cli.load_config(f)
assert 'invalid characters' in str(excinfo.value).lower() assert 'invalid characters' in str(excinfo.value).lower()
def test_deprecated_item_status(tmpdir):
f = tmpdir.join('mypair.items')
f.write(dedent('''
["ident", ["href_a", "etag_a", "href_b", "etag_b"]]
["ident_two", ["href_a", "etag_a", "href_b", "etag_b"]]
''').strip())
data = {
'ident': ['href_a', 'etag_a', 'href_b', 'etag_b'],
'ident_two': ['href_a', 'etag_a', 'href_b', 'etag_b']
}
assert cli.load_status(str(tmpdir), 'mypair', data_type='items') == data
cli.save_status(str(tmpdir), 'mypair', data_type='items', data=data)
assert cli.load_status(str(tmpdir), 'mypair', data_type='items') == data
def test_collections_cache_invalidation(tmpdir):
cfg = tmpdir.join('config')
cfg.write(dedent('''
[general]
status_path = {0}/status/
[storage foo]
type = filesystem
path = {0}/foo/
fileext = .txt
[storage bar]
type = filesystem
path = {0}/bar/
fileext = .txt
[pair foobar]
a = foo
b = bar
collections = a, b, c
''').format(str(tmpdir)))
foo = tmpdir.mkdir('foo')
bar = tmpdir.mkdir('bar')
foo.mkdir('a').join('itemone.txt').write('UID:itemone')
runner = CliRunner()
result = runner.invoke(cli.app, ['sync'],
env={'VDIRSYNCER_CONFIG': str(cfg)})
assert not result.exception
rv = bar.join('a').listdir()
assert len(rv) == 1
assert rv[0].basename == 'itemone.txt'
cfg.write(dedent('''
[general]
status_path = {0}/status/
[storage foo]
type = filesystem
path = {0}/foo/
fileext = .txt
[storage bar]
type = filesystem
path = {0}/bar2/
fileext = .txt
[pair foobar]
a = foo
b = bar
collections = a, b, c
''').format(str(tmpdir)))
tmpdir.join('status').remove()
bar2 = tmpdir.mkdir('bar2')
result = runner.invoke(cli.app, ['sync'],
env={'VDIRSYNCER_CONFIG': str(cfg)})
assert not result.exception
rv = bar.join('a').listdir()
rv2 = bar2.join('a').listdir()
assert len(rv) == len(rv2) == 1
assert rv[0].basename == rv2[0].basename == 'itemone.txt'
def test_invalid_pairs_as_cli_arg(tmpdir):
cfg = tmpdir.join('config')
cfg.write(dedent('''
[general]
status_path = {0}/status/
[storage foo]
type = filesystem
path = {0}/foo/
fileext = .txt
[storage bar]
type = filesystem
path = {0}/bar/
fileext = .txt
[pair foobar]
a = foo
b = bar
collections = a, b, c
''').format(str(tmpdir)))
tmpdir.mkdir('foo')
tmpdir.mkdir('bar')
runner = CliRunner()
result = runner.invoke(cli.app, ['sync', 'foobar/d'],
env={'VDIRSYNCER_CONFIG': str(cfg)})
assert result.exception
assert 'pair foobar: collection d not found' in result.output.lower()
def test_discover_command(tmpdir):
cfg = tmpdir.join('config')
cfg.write(dedent('''
[general]
status_path = {0}/status/
[storage foo]
type = filesystem
path = {0}/foo/
fileext = .txt
[storage bar]
type = filesystem
path = {0}/bar/
fileext = .txt
[pair foobar]
a = foo
b = bar
collections = from a
''').format(str(tmpdir)))
foo = tmpdir.mkdir('foo')
tmpdir.mkdir('bar')
foo.mkdir('a')
foo.mkdir('b')
foo.mkdir('c')
runner = CliRunner()
result = runner.invoke(cli.app, ['sync'],
env={'VDIRSYNCER_CONFIG': str(cfg)})
assert not result.exception
lines = result.output.splitlines()
assert lines[0].startswith('Discovering')
assert 'Syncing foobar/a' in lines
assert 'Syncing foobar/b' in lines
assert 'Syncing foobar/c' in lines
foo.mkdir('d')
result = runner.invoke(cli.app, ['sync'],
env={'VDIRSYNCER_CONFIG': str(cfg)})
assert not result.exception
assert 'Syncing foobar/d' not in result.output
result = runner.invoke(cli.app, ['discover'],
env={'VDIRSYNCER_CONFIG': str(cfg)})
assert not result.exception
result = runner.invoke(cli.app, ['sync'],
env={'VDIRSYNCER_CONFIG': str(cfg)})
assert not result.exception
assert 'Syncing foobar/d' in result.output

View file

@ -246,7 +246,8 @@ def test_get_class_init_args_on_storage():
from vdirsyncer.storage.memory import MemoryStorage from vdirsyncer.storage.memory import MemoryStorage
all, required = utils.get_class_init_args(MemoryStorage) all, required = utils.get_class_init_args(MemoryStorage)
assert all == set(['fileext', 'collection', 'read_only', 'instance_name']) assert all == set(['fileext', 'collection', 'read_only', 'instance_name',
'collection_human'])
assert not required assert not required

View file

@ -9,6 +9,7 @@
import errno import errno
import functools import functools
import hashlib
import json import json
import os import os
import string import string
@ -78,6 +79,115 @@ def get_status_name(pair, collection):
return pair + '/' + collection return pair + '/' + collection
def get_collections_cache_key(pair_options, config_a, config_b):
m = hashlib.sha256()
j = json.dumps([pair_options, config_a, config_b], sort_keys=True)
m.update(j.encode('utf-8'))
return m.hexdigest()
def collections_for_pair(status_path, name_a, name_b, pair_name, config_a,
config_b, pair_options, skip_cache=False):
'''Determine all configured collections for a given pair. Takes care of
shortcut expansion and result caching.
:param status_path: The path to the status directory.
:param name_a: The config name of storage A.
:param name_b: The config name of storage B.
:param pair_name: The config name of the pair.
:param config_a: The configuration for storage A, with pair-defined
defaults.
:param config_b: The configuration for storage B, with pair-defined
defaults.
:param pair_options: Pair-specific options.
:param skip_cache: Whether to skip the cached data and always do discovery.
Even with this option enabled, the new cache is written.
:returns: iterable of (collection, a_args, b_args)
'''
rv = load_status(status_path, pair_name, data_type='collections')
cache_key = get_collections_cache_key(pair_options, config_a, config_b)
if rv and not skip_cache:
if rv.get('cache_key', None) == cache_key:
return rv.get('collections', rv)
elif rv:
cli_logger.info('Detected change in config file, discovering '
'collections for {}'.format(pair_name))
cli_logger.info('Discovering collections for pair {}'
.format(pair_name))
# We have to use a list here because the special None/null value would get
# mangled to string (because JSON objects always have string keys).
rv = list(_collections_for_pair_impl(status_path, name_a, name_b,
pair_name, config_a, config_b,
pair_options))
save_status(status_path, pair_name, data_type='collections',
data={'collections': rv, 'cache_key': cache_key})
return rv
def _collections_for_pair_impl(status_path, name_a, name_b, pair_name,
config_a, config_b, pair_options):
def _discover_from_config(config):
storage_type = config['type']
cls, config = storage_class_from_config(config)
try:
discovered = list(cls.discover(**config))
except Exception:
return handle_storage_init_error(cls, config)
else:
rv = {}
for args in discovered:
args['type'] = storage_type
rv[args['collection']] = args
return rv
def _get_coll(discovered, collection, storage_name, config):
try:
return discovered[collection]
except KeyError:
storage_type = config['type']
cls, config = storage_class_from_config(config)
try:
rv = cls.join_collection(collection=collection, **config)
rv['type'] = storage_type
return rv
except NotImplementedError:
raise CliError(
'Unable to find collection {collection!r} for storage '
'{storage_name!r}.\n For pair {pair_name!r}, you wanted '
'to use the collections {shortcut!r}, which yielded '
'{collection!r} (amongst others). Vdirsyncer was unable '
'to find an equivalent collection for the other '
'storage.'.format(
collection=collection, shortcut=shortcut,
storage_name=storage_name, pair_name=pair_name
)
)
shortcuts = set(_parse_old_config_list_value(pair_options, 'collections'))
if not shortcuts:
yield None, (config_a, config_b)
else:
a_discovered = _discover_from_config(config_a)
b_discovered = _discover_from_config(config_b)
for shortcut in shortcuts:
if shortcut in ('from a', 'from b'):
collections = (a_discovered if shortcut == 'from a'
else b_discovered)
else:
collections = [shortcut]
for collection in collections:
a_args = _get_coll(a_discovered, collection, name_a, config_a)
b_args = _get_coll(b_discovered, collection, name_b, config_b)
yield collection, (a_args, b_args)
def validate_general_section(general_config): def validate_general_section(general_config):
if general_config is None: if general_config is None:
raise CliError( raise CliError(
@ -143,19 +253,43 @@ def load_config(f, pair_options=('collections', 'conflict_resolution')):
return general, pairs, storages return general, pairs, storages
def load_status(path, status_name): def load_status(base_path, pair, collection=None, data_type=None):
full_path = expand_path(os.path.join(path, status_name)) assert data_type is not None
if not os.path.exists(full_path): status_name = get_status_name(pair, collection)
return {} path = expand_path(os.path.join(base_path, status_name))
with open(full_path) as f: if os.path.isfile(path) and data_type == 'items':
return dict(json.loads(line) for line in f) new_path = path + '.items'
cli_logger.warning('Migrating statuses: Renaming {} to {}'
.format(path, new_path))
os.rename(path, new_path)
path += '.' + data_type
if not os.path.exists(path):
return None
with open(path) as f:
try:
return dict(json.load(f))
except ValueError:
pass
f.seek(0)
try:
return dict(json.loads(line) for line in f)
except ValueError:
pass
return {}
def save_status(path, status_name, status): def save_status(base_path, pair, collection=None, data_type=None, data=None):
full_path = expand_path(os.path.join(path, status_name)) assert data_type is not None
base_path = os.path.dirname(full_path) assert data is not None
status_name = get_status_name(pair, collection)
path = expand_path(os.path.join(base_path, status_name)) + '.' + data_type
base_path = os.path.dirname(path)
if os.path.isfile(base_path): if collection is not None and os.path.isfile(base_path):
raise CliError('{} is probably a legacy file and could be removed ' raise CliError('{} is probably a legacy file and could be removed '
'automatically, but this choice is left to the ' 'automatically, but this choice is left to the '
'user. If you think this is an error, please file ' 'user. If you think this is an error, please file '
@ -167,10 +301,8 @@ def save_status(path, status_name, status):
if e.errno != errno.EEXIST: if e.errno != errno.EEXIST:
raise raise
with safe_write(full_path, 'w+') as f: with safe_write(path, 'w+') as f:
for k, v in status.items(): json.dump(data, f)
json.dump((k, v), f)
f.write('\n')
def storage_class_from_config(config): def storage_class_from_config(config):
@ -226,33 +358,24 @@ def handle_storage_init_error(cls, config):
def parse_pairs_args(pairs_args, all_pairs): def parse_pairs_args(pairs_args, all_pairs):
''' '''
Expand the various CLI shortforms ("pair, pair/collection") to an iterable Expand the various CLI shortforms ("pair, pair/collection") to an iterable
of (pair, collection). of (pair, collections).
''' '''
if not pairs_args: rv = {}
pairs_args = list(all_pairs) for pair_and_collection in (pairs_args or all_pairs):
for pair_and_collection in pairs_args:
pair, collection = pair_and_collection, None pair, collection = pair_and_collection, None
if '/' in pair: if '/' in pair:
pair, collection = pair.split('/') pair, collection = pair.split('/')
try: if pair not in all_pairs:
a_name, b_name, pair_options, storage_defaults = \
all_pairs[pair]
except KeyError:
raise CliError('Pair not found: {}\n' raise CliError('Pair not found: {}\n'
'These are the pairs found: {}' 'These are the pairs found: {}'
.format(pair, list(all_pairs))) .format(pair, list(all_pairs)))
if collection is None: collections = rv.setdefault(pair, set())
collections = set( if collection:
_parse_old_config_list_value(pair_options, 'collections') collections.add(collection)
or [None]
)
else:
collections = [collection]
for c in collections: return rv.items()
yield pair, c
# We create the app inside a factory and destroy that factory after first use # We create the app inside a factory and destroy that factory after first use
# to avoid pollution of the module namespace. # to avoid pollution of the module namespace.
@ -309,21 +432,23 @@ def _create_app():
raise CliError('Error during reading config {}: {}' raise CliError('Error during reading config {}: {}'
.format(fname, e)) .format(fname, e))
max_workers_option = click.option(
'--max-workers', default=0, type=click.IntRange(min=0, max=None),
help=('Use at most this many connections, 0 means unlimited.')
)
@app.command() @app.command()
@click.argument('pairs', nargs=-1) @click.argument('pairs', nargs=-1)
@click.option('--force-delete/--no-force-delete', @click.option('--force-delete/--no-force-delete',
help=('Disable data-loss protection for the given pairs. ' help=('Disable data-loss protection for the given pairs. '
'Can be passed multiple times')) 'Can be passed multiple times'))
@click.option('--max-workers', @max_workers_option
default=0, type=click.IntRange(min=0, max=None),
help=('Use at most this many connections, 0 means '
'unlimited.'))
@click.pass_context @click.pass_context
@catch_errors @catch_errors
def sync(ctx, pairs, force_delete, max_workers): def sync(ctx, pairs, force_delete, max_workers):
''' '''
Synchronize the given pairs. If no pairs are given, all will be Synchronize the given collections or pairs. If no arguments are given,
synchronized. all will be synchronized.
Examples: Examples:
`vdirsyncer sync` will sync everything configured. `vdirsyncer sync` will sync everything configured.
@ -337,93 +462,89 @@ def _create_app():
wq = WorkerQueue(max_workers) wq = WorkerQueue(max_workers)
wq.handled_jobs = set() wq.handled_jobs = set()
for pair_name, collection in parse_pairs_args(pairs, all_pairs): for pair_name, collections in parse_pairs_args(pairs, all_pairs):
wq.spawn_worker() wq.spawn_worker()
wq.put( wq.put(
functools.partial(prepare_sync, pair_name=pair_name, functools.partial(sync_pair, pair_name=pair_name,
collection=collection, general=general, collections_to_sync=collections,
all_pairs=all_pairs, general=general, all_pairs=all_pairs,
all_storages=all_storages, all_storages=all_storages,
force_delete=force_delete)) force_delete=force_delete))
wq.join() wq.join()
@app.command()
@click.argument('pairs', nargs=-1)
@max_workers_option
@click.pass_context
@catch_errors
def discover(ctx, pairs, max_workers):
'''
Refresh collection cache for the given pairs.
'''
general, all_pairs, all_storages = ctx.obj['config']
cli_logger.debug('Using {} maximal workers.'.format(max_workers))
wq = WorkerQueue(max_workers)
for pair in (pairs or all_pairs):
try:
name_a, name_b, pair_options, storage_defaults = \
all_pairs[pair]
except KeyError:
raise CliError('Pair not found: {}\n'
'These are the pairs found: {}'
.format(pair, list(all_pairs)))
wq.spawn_worker()
wq.put(functools.partial(
(lambda wq, **kwargs: collections_for_pair(**kwargs)),
status_path=general['status_path'], name_a=name_a,
name_b=name_b, pair_name=pair, config_a=all_storages[name_a],
config_b=all_storages[name_b], pair_options=pair_options,
skip_cache=True))
wq.join()
return app return app
app = main = _create_app() app = main = _create_app()
del _create_app del _create_app
def expand_collection(pair_name, collection, general, all_pairs, all_storages): def sync_pair(wq, pair_name, collections_to_sync, general, all_pairs,
a_name, b_name, pair_options, storage_defaults = all_pairs[pair_name] all_storages, force_delete):
if collection in ('from a', 'from b'): key = ('prepare', pair_name)
# from_name: name of the storage which should be used for discovery
# other_name: the other storage's name
if collection == 'from a':
from_name, other_name = a_name, b_name
else:
from_name, other_name = b_name, a_name
cli_logger.info('Syncing {}: Discovering collections from {}'
.format(pair_name, from_name))
config = dict(storage_defaults)
config.update(all_storages[from_name])
cls, config = storage_class_from_config(config)
try:
storages = list(cls.discover(**config))
except Exception:
handle_storage_init_error(cls, config)
for storage in storages:
config = dict(storage_defaults)
config.update(all_storages[other_name])
config['collection'] = actual_collection = storage.collection
other_storage = storage_instance_from_config(config)
if collection == 'from a':
a, b = storage, other_storage
else:
b, a = storage, other_storage
yield actual_collection, a, b
else:
config = dict(storage_defaults)
config.update(all_storages[a_name])
config['collection'] = collection
a = storage_instance_from_config(config)
config = dict(storage_defaults)
config.update(all_storages[b_name])
config['collection'] = collection
b = storage_instance_from_config(config)
yield collection, a, b
def prepare_sync(wq, pair_name, collection, general, all_pairs, all_storages,
force_delete):
key = ('prepare', pair_name, collection)
if key in wq.handled_jobs: if key in wq.handled_jobs:
status_name = get_status_name(pair_name, collection) cli_logger.warning('Already prepared {}, skipping'.format(pair_name))
cli_logger.warning('Already prepared {}, skipping'.format(status_name))
return return
wq.handled_jobs.add(key) wq.handled_jobs.add(key)
a_name, b_name, pair_options, storage_defaults = all_pairs[pair_name] a_name, b_name, pair_options, storage_defaults = all_pairs[pair_name]
jobs = list(expand_collection(pair_name, collection, general, all_pairs,
all_storages))
for i in range(len(jobs) - 1): all_collections = dict(collections_for_pair(
# spawn one worker less because we can reuse the current one general['status_path'], a_name, b_name, pair_name,
all_storages[a_name], all_storages[b_name], pair_options
))
# spawn one worker less because we can reuse the current one
new_workers = -1
for collection in (collections_to_sync or all_collections):
try:
config_a, config_b = all_collections[collection]
except KeyError:
raise CliError('Pair {}: Collection {} not found. These are the '
'configured collections:\n{}'.format(
pair_name, collection, list(all_collections)))
new_workers += 1
wq.put(functools.partial(
sync_collection, pair_name=pair_name, collection=collection,
config_a=config_a, config_b=config_b, pair_options=pair_options,
general=general, force_delete=force_delete
))
for i in range(new_workers):
wq.spawn_worker() wq.spawn_worker()
for collection, a, b in jobs:
wq.put(functools.partial(sync_collection, pair_name=pair_name,
collection=collection, a=a, b=b,
pair_options=pair_options, general=general,
force_delete=force_delete))
def handle_cli_error(status_name='sync'): def handle_cli_error(status_name='sync'):
try: try:
@ -460,8 +581,8 @@ def handle_cli_error(status_name='sync'):
return True return True
def sync_collection(wq, pair_name, collection, a, b, pair_options, general, def sync_collection(wq, pair_name, collection, config_a, config_b,
force_delete): pair_options, general, force_delete):
status_name = get_status_name(pair_name, collection) status_name = get_status_name(pair_name, collection)
key = ('sync', pair_name, collection) key = ('sync', pair_name, collection)
@ -473,8 +594,12 @@ def sync_collection(wq, pair_name, collection, a, b, pair_options, general,
try: try:
cli_logger.info('Syncing {}'.format(status_name)) cli_logger.info('Syncing {}'.format(status_name))
status = load_status(general['status_path'], status_name) status = load_status(general['status_path'], pair_name,
collection, data_type='items') or {}
cli_logger.debug('Loaded status for {}'.format(status_name)) cli_logger.debug('Loaded status for {}'.format(status_name))
a = storage_instance_from_config(config_a)
b = storage_instance_from_config(config_b)
sync( sync(
a, b, status, a, b, status,
conflict_resolution=pair_options.get('conflict_resolution', None), conflict_resolution=pair_options.get('conflict_resolution', None),
@ -484,7 +609,8 @@ def sync_collection(wq, pair_name, collection, a, b, pair_options, general,
if not handle_cli_error(status_name): if not handle_cli_error(status_name):
raise JobFailed() raise JobFailed()
save_status(general['status_path'], status_name, status) save_status(general['status_path'], pair_name, collection,
data_type='items', data=status)
class WorkerQueue(object): class WorkerQueue(object):

View file

@ -30,10 +30,6 @@ class Storage(object):
Strings can be either unicode strings or bytestrings. If bytestrings, an Strings can be either unicode strings or bytestrings. If bytestrings, an
ASCII encoding is assumed. ASCII encoding is assumed.
:param collection: If None, the given URL or path is already directly
referring to a collection. Otherwise it will be treated as a basepath
to many collections (e.g. a vdir) and the given collection name will be
looked for.
:param read_only: Whether the synchronization algorithm should avoid writes :param read_only: Whether the synchronization algorithm should avoid writes
to this storage. Some storages accept no value other than ``True``. to this storage. Some storages accept no value other than ``True``.
''' '''
@ -44,10 +40,16 @@ class Storage(object):
# overridden by subclasses. # overridden by subclasses.
storage_name = None storage_name = None
# The string used in the config to denote a particular instance. Should be # The string used in the config to denote a particular instance. Will be
# overridden during instantiation. # overridden during instantiation.
instance_name = None instance_name = None
# The machine-readable name of this collection.
collection = None
# The human-readable name of this collection.
collection_human = None
# A value of True means the storage does not support write-methods such as # A value of True means the storage does not support write-methods such as
# upload, update and delete. A value of False means the storage does # upload, update and delete. A value of False means the storage does
# support those methods. # support those methods.
@ -56,7 +58,8 @@ class Storage(object):
# The attribute values to show in the representation of the storage. # The attribute values to show in the representation of the storage.
_repr_attributes = () _repr_attributes = ()
def __init__(self, instance_name=None, read_only=None, collection=None): def __init__(self, instance_name=None, read_only=None, collection=None,
collection_human=None):
if read_only is None: if read_only is None:
read_only = self.read_only read_only = self.read_only
if self.read_only and not read_only: if self.read_only and not read_only:
@ -66,6 +69,7 @@ class Storage(object):
instance_name = '{}/{}'.format(instance_name, collection) instance_name = '{}/{}'.format(instance_name, collection)
self.instance_name = instance_name self.instance_name = instance_name
self.collection = collection self.collection = collection
self.collection_human = collection_human
@classmethod @classmethod
def discover(cls, **kwargs): def discover(cls, **kwargs):
@ -74,8 +78,21 @@ class Storage(object):
:param **kwargs: Keyword arguments to additionally pass to the storage :param **kwargs: Keyword arguments to additionally pass to the storage
instances returned. You shouldn't pass `collection` here, otherwise instances returned. You shouldn't pass `collection` here, otherwise
TypeError will be raised. TypeError will be raised.
:returns: Iterable of storages which represent the discovered :returns: iterable of ``storage_args``.
collections, all of which are passed kwargs during initialization. ``storage_args`` is a dictionary of ``**kwargs`` to pass to this
class to obtain a storage instance pointing to this collection. It
also must contain a ``"collection"`` key. That key's value is used
to match two collections together for synchronization. IOW it is a
machine-readable identifier for the collection, usually obtained
from the last segment of a URL or filesystem path.
'''
raise NotImplementedError()
@classmethod
def join_collection(cls, collection, **kwargs):
'''Append the collection to the URL or path specified in ``**kwargs``
and return the new arguments.
''' '''
raise NotImplementedError() raise NotImplementedError()

View file

@ -250,9 +250,6 @@ class DavStorage(Storage):
super(DavStorage, self).__init__(**kwargs) super(DavStorage, self).__init__(**kwargs)
url = url.rstrip('/') + '/' url = url.rstrip('/') + '/'
collection = kwargs.get('collection')
if collection is not None:
url = utils.urlparse.urljoin(url, collection)
self.session = DavSession(url, username, password, verify, auth, self.session = DavSession(url, username, password, verify, auth,
useragent, verify_fingerprint) useragent, verify_fingerprint)
self.unsafe_href_chars = unsafe_href_chars self.unsafe_href_chars = unsafe_href_chars
@ -271,10 +268,12 @@ class DavStorage(Storage):
)) ))
d = cls.discovery_class(DavSession(url=url, **discover_args)) d = cls.discovery_class(DavSession(url=url, **discover_args))
for c in d.discover(): for c in d.discover():
base, collection = c['href'].rstrip('/').rsplit('/', 1) url = c['href']
s = cls(url=base, collection=collection, **kwargs) _, collection = url.rstrip('/').rsplit('/', 1)
s.displayname = c['displayname'] storage_args = {'url': url, 'collection': collection,
yield s 'collection_human': c['displayname']}
storage_args.update(kwargs)
yield storage_args
def _normalize_href(self, *args, **kwargs): def _normalize_href(self, *args, **kwargs):
return _normalize_href(self.session.url, *args, **kwargs) return _normalize_href(self.session.url, *args, **kwargs)

View file

@ -40,9 +40,6 @@ class FilesystemStorage(Storage):
def __init__(self, path, fileext, encoding='utf-8', create=True, **kwargs): def __init__(self, path, fileext, encoding='utf-8', create=True, **kwargs):
super(FilesystemStorage, self).__init__(**kwargs) super(FilesystemStorage, self).__init__(**kwargs)
path = expand_path(path) path = expand_path(path)
collection = kwargs.get('collection')
if collection is not None:
path = os.path.join(path, collection)
checkdir(path, create=create) checkdir(path, create=create)
self.path = path self.path = path
self.encoding = encoding self.encoding = encoding
@ -54,9 +51,16 @@ class FilesystemStorage(Storage):
raise TypeError('collection argument must not be given.') raise TypeError('collection argument must not be given.')
path = expand_path(path) path = expand_path(path)
for collection in os.listdir(path): for collection in os.listdir(path):
if os.path.isdir(os.path.join(path, collection)): collection_path = os.path.join(path, collection)
s = cls(path=path, collection=collection, **kwargs) if os.path.isdir(collection_path):
yield s args = dict(collection=collection, path=collection_path,
**kwargs)
yield args
@classmethod
def join_collection(cls, collection, **kwargs):
kwargs['path'] = os.path.join(kwargs['path'], collection)
return kwargs
def _get_filepath(self, href): def _get_filepath(self, href):
return os.path.join(self.path, href) return os.path.join(self.path, href)