mirror of
https://github.com/samsonjs/vdirsyncer.git
synced 2026-04-27 14:57:41 +00:00
Handle collections correctly
Fix #132 Passing the collections parameter used to mean that the storage should append its value to the URL or path. This was a leaky abstraction for the reasons explained in #132. The new behavior removes this meaning from this parameter. Vdirsyncer now maintains a cache of discovered collections.
This commit is contained in:
parent
1e8e931464
commit
06a701bc10
9 changed files with 376 additions and 149 deletions
|
|
@ -148,22 +148,30 @@ class SupportsCollections(object):
|
||||||
|
|
||||||
def test_discover(self, get_storage_args, get_item):
|
def test_discover(self, get_storage_args, get_item):
|
||||||
expected = set()
|
expected = set()
|
||||||
|
items = {}
|
||||||
|
|
||||||
for i in range(1, 5):
|
for i in range(1, 5):
|
||||||
# Create collections, but use the "collection" attribute because
|
# Create collections, but use the "collection" attribute because
|
||||||
# Radicale requires file extensions in their names.
|
# Radicale requires file extensions in their names.
|
||||||
expected.add(
|
collection = 'test{}'.format(i)
|
||||||
self.storage_class(
|
s = self.storage_class(**get_storage_args(collection=collection))
|
||||||
**get_storage_args(collection='test{}'.format(i))
|
items[s.collection] = [s.upload(get_item())]
|
||||||
).collection
|
expected.add(s.collection)
|
||||||
)
|
|
||||||
|
|
||||||
d = self.storage_class.discover(
|
d = self.storage_class.discover(
|
||||||
**get_storage_args(collection=None))
|
**get_storage_args(collection=None))
|
||||||
|
|
||||||
actual = set(s.collection for s in d)
|
actual = set(args['collection'] for args in d)
|
||||||
assert actual >= expected
|
assert actual >= expected
|
||||||
|
|
||||||
|
for storage_args in d:
|
||||||
|
collection = storage_args['collection']
|
||||||
|
if collection not in expected:
|
||||||
|
continue
|
||||||
|
s = self.storage_class(**storage_args)
|
||||||
|
rv = list(s.list())
|
||||||
|
assert rv == items[collection]
|
||||||
|
|
||||||
def test_discover_collection_arg(self, get_storage_args):
|
def test_discover_collection_arg(self, get_storage_args):
|
||||||
args = get_storage_args(collection='test2')
|
args = get_storage_args(collection='test2')
|
||||||
with pytest.raises(TypeError) as excinfo:
|
with pytest.raises(TypeError) as excinfo:
|
||||||
|
|
|
||||||
|
|
@ -110,6 +110,7 @@ class ServerMixin(object):
|
||||||
url = 'http://127.0.0.1/bob/'
|
url = 'http://127.0.0.1/bob/'
|
||||||
if collection is not None:
|
if collection is not None:
|
||||||
collection += self.storage_class.fileext
|
collection += self.storage_class.fileext
|
||||||
|
url = url.rstrip('/') + '/' + collection
|
||||||
|
|
||||||
rv = {'url': url, 'username': 'bob', 'password': 'bob',
|
rv = {'url': url, 'username': 'bob', 'password': 'bob',
|
||||||
'collection': collection, 'unsafe_href_chars': ''}
|
'collection': collection, 'unsafe_href_chars': ''}
|
||||||
|
|
|
||||||
|
|
@ -31,22 +31,22 @@ class TestFilesystemStorage(StorageTests):
|
||||||
path = self.tmpdir
|
path = self.tmpdir
|
||||||
if collection is not None:
|
if collection is not None:
|
||||||
os.makedirs(os.path.join(path, collection))
|
os.makedirs(os.path.join(path, collection))
|
||||||
|
path = os.path.join(path, collection)
|
||||||
return {'path': path, 'fileext': '.txt', 'collection': collection}
|
return {'path': path, 'fileext': '.txt', 'collection': collection}
|
||||||
return inner
|
return inner
|
||||||
|
|
||||||
def test_create_is_false(self, tmpdir):
|
def test_create_is_false(self, tmpdir):
|
||||||
with pytest.raises(IOError):
|
with pytest.raises(IOError):
|
||||||
self.storage_class(str(tmpdir), '.txt', collection='lol',
|
self.storage_class(str(tmpdir) + '/lol', '.txt', create=False)
|
||||||
create=False)
|
|
||||||
|
|
||||||
def test_is_not_directory(self, tmpdir):
|
def test_is_not_directory(self, tmpdir):
|
||||||
with pytest.raises(IOError):
|
with pytest.raises(IOError):
|
||||||
f = tmpdir.join('hue')
|
f = tmpdir.join('hue')
|
||||||
f.write('stub')
|
f.write('stub')
|
||||||
self.storage_class(str(tmpdir), '.txt', collection='hue')
|
self.storage_class(str(tmpdir) + '/hue', '.txt')
|
||||||
|
|
||||||
def test_create_is_true(self, tmpdir):
|
def test_create_is_true(self, tmpdir):
|
||||||
self.storage_class(str(tmpdir), '.txt', collection='asd')
|
self.storage_class(str(tmpdir) + '/asd', '.txt')
|
||||||
assert tmpdir.listdir() == [tmpdir.join('asd')]
|
assert tmpdir.listdir() == [tmpdir.join('asd')]
|
||||||
|
|
||||||
def test_broken_data(self, tmpdir):
|
def test_broken_data(self, tmpdir):
|
||||||
|
|
|
||||||
|
|
@ -79,11 +79,9 @@ def test_parse_pairs_args():
|
||||||
assert sorted(
|
assert sorted(
|
||||||
cli.parse_pairs_args(['foo/foocoll', 'one', 'eins'], pairs)
|
cli.parse_pairs_args(['foo/foocoll', 'one', 'eins'], pairs)
|
||||||
) == [
|
) == [
|
||||||
('eins', None),
|
('eins', set()),
|
||||||
('foo', 'foocoll'),
|
('foo', {'foocoll'}),
|
||||||
('one', 'a'),
|
('one', set()),
|
||||||
('one', 'b'),
|
|
||||||
('one', 'c')
|
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -111,21 +109,12 @@ def test_simple_run(tmpdir):
|
||||||
runner = CliRunner(env={'VDIRSYNCER_CONFIG': str(config_file)})
|
runner = CliRunner(env={'VDIRSYNCER_CONFIG': str(config_file)})
|
||||||
result = runner.invoke(cli.app, ['sync'])
|
result = runner.invoke(cli.app, ['sync'])
|
||||||
assert not result.exception
|
assert not result.exception
|
||||||
assert result.output.lower().strip() == 'syncing my_pair'
|
|
||||||
|
|
||||||
tmpdir.join('path_a/haha.txt').write('UID:haha')
|
tmpdir.join('path_a/haha.txt').write('UID:haha')
|
||||||
result = runner.invoke(cli.app, ['sync'])
|
result = runner.invoke(cli.app, ['sync'])
|
||||||
assert result.output == ('Syncing my_pair\n'
|
assert 'Copying (uploading) item haha to my_b' in result.output
|
||||||
'Copying (uploading) item haha to my_b\n')
|
|
||||||
assert tmpdir.join('path_b/haha.txt').read() == 'UID:haha'
|
assert tmpdir.join('path_b/haha.txt').read() == 'UID:haha'
|
||||||
|
|
||||||
result = runner.invoke(cli.app, ['sync', 'my_pair', 'my_pair'])
|
|
||||||
assert set(result.output.splitlines()) == set([
|
|
||||||
'Syncing my_pair',
|
|
||||||
'warning: Already prepared my_pair, skipping'
|
|
||||||
])
|
|
||||||
assert not result.exception
|
|
||||||
|
|
||||||
|
|
||||||
def test_empty_storage(tmpdir):
|
def test_empty_storage(tmpdir):
|
||||||
config_file = tmpdir.join('config')
|
config_file = tmpdir.join('config')
|
||||||
|
|
@ -151,7 +140,6 @@ def test_empty_storage(tmpdir):
|
||||||
runner = CliRunner(env={'VDIRSYNCER_CONFIG': str(config_file)})
|
runner = CliRunner(env={'VDIRSYNCER_CONFIG': str(config_file)})
|
||||||
result = runner.invoke(cli.app, ['sync'])
|
result = runner.invoke(cli.app, ['sync'])
|
||||||
assert not result.exception
|
assert not result.exception
|
||||||
assert result.output.lower().strip() == 'syncing my_pair'
|
|
||||||
|
|
||||||
tmpdir.join('path_a/haha.txt').write('UID:haha')
|
tmpdir.join('path_a/haha.txt').write('UID:haha')
|
||||||
result = runner.invoke(cli.app, ['sync'])
|
result = runner.invoke(cli.app, ['sync'])
|
||||||
|
|
@ -244,3 +232,120 @@ def test_invalid_storage_name():
|
||||||
cli.load_config(f)
|
cli.load_config(f)
|
||||||
|
|
||||||
assert 'invalid characters' in str(excinfo.value).lower()
|
assert 'invalid characters' in str(excinfo.value).lower()
|
||||||
|
|
||||||
|
|
||||||
|
def test_deprecated_item_status(tmpdir):
|
||||||
|
f = tmpdir.join('mypair.items')
|
||||||
|
f.write(dedent('''
|
||||||
|
["ident", ["href_a", "etag_a", "href_b", "etag_b"]]
|
||||||
|
["ident_two", ["href_a", "etag_a", "href_b", "etag_b"]]
|
||||||
|
''').strip())
|
||||||
|
|
||||||
|
data = {
|
||||||
|
'ident': ['href_a', 'etag_a', 'href_b', 'etag_b'],
|
||||||
|
'ident_two': ['href_a', 'etag_a', 'href_b', 'etag_b']
|
||||||
|
}
|
||||||
|
|
||||||
|
assert cli.load_status(str(tmpdir), 'mypair', data_type='items') == data
|
||||||
|
|
||||||
|
cli.save_status(str(tmpdir), 'mypair', data_type='items', data=data)
|
||||||
|
assert cli.load_status(str(tmpdir), 'mypair', data_type='items') == data
|
||||||
|
|
||||||
|
|
||||||
|
def test_collections_cache_invalidation(tmpdir):
|
||||||
|
cfg = tmpdir.join('config')
|
||||||
|
cfg.write(dedent('''
|
||||||
|
[general]
|
||||||
|
status_path = {0}/status/
|
||||||
|
|
||||||
|
[storage foo]
|
||||||
|
type = filesystem
|
||||||
|
path = {0}/foo/
|
||||||
|
fileext = .txt
|
||||||
|
|
||||||
|
[storage bar]
|
||||||
|
type = filesystem
|
||||||
|
path = {0}/bar/
|
||||||
|
fileext = .txt
|
||||||
|
|
||||||
|
[pair foobar]
|
||||||
|
a = foo
|
||||||
|
b = bar
|
||||||
|
collections = a, b, c
|
||||||
|
''').format(str(tmpdir)))
|
||||||
|
|
||||||
|
foo = tmpdir.mkdir('foo')
|
||||||
|
bar = tmpdir.mkdir('bar')
|
||||||
|
foo.mkdir('a').join('itemone.txt').write('UID:itemone')
|
||||||
|
|
||||||
|
runner = CliRunner()
|
||||||
|
result = runner.invoke(cli.app, ['sync'],
|
||||||
|
env={'VDIRSYNCER_CONFIG': str(cfg)})
|
||||||
|
assert not result.exception
|
||||||
|
|
||||||
|
rv = bar.join('a').listdir()
|
||||||
|
assert len(rv) == 1
|
||||||
|
assert rv[0].basename == 'itemone.txt'
|
||||||
|
|
||||||
|
cfg.write(dedent('''
|
||||||
|
[general]
|
||||||
|
status_path = {0}/status/
|
||||||
|
|
||||||
|
[storage foo]
|
||||||
|
type = filesystem
|
||||||
|
path = {0}/foo/
|
||||||
|
fileext = .txt
|
||||||
|
|
||||||
|
[storage bar]
|
||||||
|
type = filesystem
|
||||||
|
path = {0}/bar2/
|
||||||
|
fileext = .txt
|
||||||
|
|
||||||
|
[pair foobar]
|
||||||
|
a = foo
|
||||||
|
b = bar
|
||||||
|
collections = a, b, c
|
||||||
|
''').format(str(tmpdir)))
|
||||||
|
|
||||||
|
tmpdir.join('status').remove()
|
||||||
|
bar2 = tmpdir.mkdir('bar2')
|
||||||
|
result = runner.invoke(cli.app, ['sync'],
|
||||||
|
env={'VDIRSYNCER_CONFIG': str(cfg)})
|
||||||
|
assert not result.exception
|
||||||
|
|
||||||
|
rv = bar.join('a').listdir()
|
||||||
|
rv2 = bar2.join('a').listdir()
|
||||||
|
assert len(rv) == len(rv2) == 1
|
||||||
|
assert rv[0].basename == rv2[0].basename == 'itemone.txt'
|
||||||
|
|
||||||
|
|
||||||
|
def test_invalid_pairs_as_cli_arg(tmpdir):
|
||||||
|
cfg = tmpdir.join('config')
|
||||||
|
cfg.write(dedent('''
|
||||||
|
[general]
|
||||||
|
status_path = {0}/status/
|
||||||
|
|
||||||
|
[storage foo]
|
||||||
|
type = filesystem
|
||||||
|
path = {0}/foo/
|
||||||
|
fileext = .txt
|
||||||
|
|
||||||
|
[storage bar]
|
||||||
|
type = filesystem
|
||||||
|
path = {0}/bar/
|
||||||
|
fileext = .txt
|
||||||
|
|
||||||
|
[pair foobar]
|
||||||
|
a = foo
|
||||||
|
b = bar
|
||||||
|
collections = a, b, c
|
||||||
|
''').format(str(tmpdir)))
|
||||||
|
|
||||||
|
tmpdir.mkdir('foo')
|
||||||
|
tmpdir.mkdir('bar')
|
||||||
|
|
||||||
|
runner = CliRunner()
|
||||||
|
result = runner.invoke(cli.app, ['sync', 'foobar/d'],
|
||||||
|
env={'VDIRSYNCER_CONFIG': str(cfg)})
|
||||||
|
assert result.exception
|
||||||
|
assert 'pair foobar: collection d not found' in result.output.lower()
|
||||||
|
|
|
||||||
|
|
@ -246,7 +246,8 @@ def test_get_class_init_args_on_storage():
|
||||||
from vdirsyncer.storage.memory import MemoryStorage
|
from vdirsyncer.storage.memory import MemoryStorage
|
||||||
|
|
||||||
all, required = utils.get_class_init_args(MemoryStorage)
|
all, required = utils.get_class_init_args(MemoryStorage)
|
||||||
assert all == set(['fileext', 'collection', 'read_only', 'instance_name'])
|
assert all == set(['fileext', 'collection', 'read_only', 'instance_name',
|
||||||
|
'collection_human'])
|
||||||
assert not required
|
assert not required
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@
|
||||||
|
|
||||||
import errno
|
import errno
|
||||||
import functools
|
import functools
|
||||||
|
import hashlib
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
import string
|
import string
|
||||||
|
|
@ -78,6 +79,115 @@ def get_status_name(pair, collection):
|
||||||
return pair + '/' + collection
|
return pair + '/' + collection
|
||||||
|
|
||||||
|
|
||||||
|
def get_collections_cache_key(pair_options, config_a, config_b):
|
||||||
|
m = hashlib.sha256()
|
||||||
|
j = json.dumps([pair_options, config_a, config_b], sort_keys=True)
|
||||||
|
m.update(j.encode('utf-8'))
|
||||||
|
return m.hexdigest()
|
||||||
|
|
||||||
|
|
||||||
|
def collections_for_pair(status_path, name_a, name_b, pair_name, config_a,
|
||||||
|
config_b, pair_options, skip_cache=False):
|
||||||
|
'''Determine all configured collections for a given pair. Takes care of
|
||||||
|
shortcut expansion and result caching.
|
||||||
|
|
||||||
|
:param status_path: The path to the status directory.
|
||||||
|
:param name_a: The config name of storage A.
|
||||||
|
:param name_b: The config name of storage B.
|
||||||
|
:param pair_name: The config name of the pair.
|
||||||
|
:param config_a: The configuration for storage A, with pair-defined
|
||||||
|
defaults.
|
||||||
|
:param config_b: The configuration for storage B, with pair-defined
|
||||||
|
defaults.
|
||||||
|
:param pair_options: Pair-specific options.
|
||||||
|
:param skip_cache: Whether to skip the cached data and always do discovery.
|
||||||
|
Even with this option enabled, the new cache is written.
|
||||||
|
|
||||||
|
:returns: iterable of (collection, a_args, b_args)
|
||||||
|
'''
|
||||||
|
rv = load_status(status_path, pair_name, data_type='collections')
|
||||||
|
cache_key = get_collections_cache_key(pair_options, config_a, config_b)
|
||||||
|
if rv and not skip_cache:
|
||||||
|
if rv.get('cache_key', None) == cache_key:
|
||||||
|
return rv.get('collections', rv)
|
||||||
|
elif rv:
|
||||||
|
cli_logger.info('Detected change in config file, discovering '
|
||||||
|
'collections for {}'.format(pair_name))
|
||||||
|
|
||||||
|
cli_logger.info('Discovering collections for pair {}'
|
||||||
|
.format(pair_name))
|
||||||
|
|
||||||
|
# We have to use a list here because the special None/null value would get
|
||||||
|
# mangled to string (because JSON objects always have string keys).
|
||||||
|
rv = list(_collections_for_pair_impl(status_path, name_a, name_b,
|
||||||
|
pair_name, config_a, config_b,
|
||||||
|
pair_options))
|
||||||
|
save_status(status_path, pair_name, data_type='collections',
|
||||||
|
data={'collections': rv, 'cache_key': cache_key})
|
||||||
|
return rv
|
||||||
|
|
||||||
|
|
||||||
|
def _collections_for_pair_impl(status_path, name_a, name_b, pair_name,
|
||||||
|
config_a, config_b, pair_options):
|
||||||
|
|
||||||
|
def _discover_from_config(config):
|
||||||
|
storage_type = config['type']
|
||||||
|
cls, config = storage_class_from_config(config)
|
||||||
|
|
||||||
|
try:
|
||||||
|
discovered = list(cls.discover(**config))
|
||||||
|
except Exception:
|
||||||
|
return handle_storage_init_error(cls, config)
|
||||||
|
else:
|
||||||
|
rv = {}
|
||||||
|
for args in discovered:
|
||||||
|
args['type'] = storage_type
|
||||||
|
rv[args['collection']] = args
|
||||||
|
return rv
|
||||||
|
|
||||||
|
def _get_coll(discovered, collection, storage_name, config):
|
||||||
|
try:
|
||||||
|
return discovered[collection]
|
||||||
|
except KeyError:
|
||||||
|
storage_type = config['type']
|
||||||
|
cls, config = storage_class_from_config(config)
|
||||||
|
try:
|
||||||
|
rv = cls.join_collection(collection=collection, **config)
|
||||||
|
rv['type'] = storage_type
|
||||||
|
return rv
|
||||||
|
except NotImplementedError:
|
||||||
|
raise CliError(
|
||||||
|
'Unable to find collection {collection!r} for storage '
|
||||||
|
'{storage_name!r}.\n For pair {pair_name!r}, you wanted '
|
||||||
|
'to use the collections {shortcut!r}, which yielded '
|
||||||
|
'{collection!r} (amongst others). Vdirsyncer was unable '
|
||||||
|
'to find an equivalent collection for the other '
|
||||||
|
'storage.'.format(
|
||||||
|
collection=collection, shortcut=shortcut,
|
||||||
|
storage_name=storage_name, pair_name=pair_name
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
shortcuts = set(_parse_old_config_list_value(pair_options, 'collections'))
|
||||||
|
if not shortcuts:
|
||||||
|
yield None, (config_a, config_b)
|
||||||
|
else:
|
||||||
|
a_discovered = _discover_from_config(config_a)
|
||||||
|
b_discovered = _discover_from_config(config_b)
|
||||||
|
|
||||||
|
for shortcut in shortcuts:
|
||||||
|
if shortcut in ('from a', 'from b'):
|
||||||
|
collections = (a_discovered if shortcut == 'from a'
|
||||||
|
else b_discovered)
|
||||||
|
else:
|
||||||
|
collections = [shortcut]
|
||||||
|
|
||||||
|
for collection in collections:
|
||||||
|
a_args = _get_coll(a_discovered, collection, name_a, config_a)
|
||||||
|
b_args = _get_coll(b_discovered, collection, name_b, config_b)
|
||||||
|
yield collection, (a_args, b_args)
|
||||||
|
|
||||||
|
|
||||||
def validate_general_section(general_config):
|
def validate_general_section(general_config):
|
||||||
if general_config is None:
|
if general_config is None:
|
||||||
raise CliError(
|
raise CliError(
|
||||||
|
|
@ -143,19 +253,43 @@ def load_config(f, pair_options=('collections', 'conflict_resolution')):
|
||||||
return general, pairs, storages
|
return general, pairs, storages
|
||||||
|
|
||||||
|
|
||||||
def load_status(path, status_name):
|
def load_status(base_path, pair, collection=None, data_type=None):
|
||||||
full_path = expand_path(os.path.join(path, status_name))
|
assert data_type is not None
|
||||||
if not os.path.exists(full_path):
|
status_name = get_status_name(pair, collection)
|
||||||
return {}
|
path = expand_path(os.path.join(base_path, status_name))
|
||||||
with open(full_path) as f:
|
if os.path.isfile(path) and data_type == 'items':
|
||||||
return dict(json.loads(line) for line in f)
|
new_path = path + '.items'
|
||||||
|
cli_logger.warning('Migrating statuses: Renaming {} to {}'
|
||||||
|
.format(path, new_path))
|
||||||
|
os.rename(path, new_path)
|
||||||
|
|
||||||
|
path += '.' + data_type
|
||||||
|
if not os.path.exists(path):
|
||||||
|
return None
|
||||||
|
|
||||||
|
with open(path) as f:
|
||||||
|
try:
|
||||||
|
return dict(json.load(f))
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
f.seek(0)
|
||||||
|
try:
|
||||||
|
return dict(json.loads(line) for line in f)
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return {}
|
||||||
|
|
||||||
|
|
||||||
def save_status(path, status_name, status):
|
def save_status(base_path, pair, collection=None, data_type=None, data=None):
|
||||||
full_path = expand_path(os.path.join(path, status_name))
|
assert data_type is not None
|
||||||
base_path = os.path.dirname(full_path)
|
assert data is not None
|
||||||
|
status_name = get_status_name(pair, collection)
|
||||||
|
path = expand_path(os.path.join(base_path, status_name)) + '.' + data_type
|
||||||
|
base_path = os.path.dirname(path)
|
||||||
|
|
||||||
if os.path.isfile(base_path):
|
if collection is not None and os.path.isfile(base_path):
|
||||||
raise CliError('{} is probably a legacy file and could be removed '
|
raise CliError('{} is probably a legacy file and could be removed '
|
||||||
'automatically, but this choice is left to the '
|
'automatically, but this choice is left to the '
|
||||||
'user. If you think this is an error, please file '
|
'user. If you think this is an error, please file '
|
||||||
|
|
@ -167,10 +301,8 @@ def save_status(path, status_name, status):
|
||||||
if e.errno != errno.EEXIST:
|
if e.errno != errno.EEXIST:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
with safe_write(full_path, 'w+') as f:
|
with safe_write(path, 'w+') as f:
|
||||||
for k, v in status.items():
|
json.dump(data, f)
|
||||||
json.dump((k, v), f)
|
|
||||||
f.write('\n')
|
|
||||||
|
|
||||||
|
|
||||||
def storage_class_from_config(config):
|
def storage_class_from_config(config):
|
||||||
|
|
@ -226,33 +358,24 @@ def handle_storage_init_error(cls, config):
|
||||||
def parse_pairs_args(pairs_args, all_pairs):
|
def parse_pairs_args(pairs_args, all_pairs):
|
||||||
'''
|
'''
|
||||||
Expand the various CLI shortforms ("pair, pair/collection") to an iterable
|
Expand the various CLI shortforms ("pair, pair/collection") to an iterable
|
||||||
of (pair, collection).
|
of (pair, collections).
|
||||||
'''
|
'''
|
||||||
if not pairs_args:
|
rv = {}
|
||||||
pairs_args = list(all_pairs)
|
for pair_and_collection in (pairs_args or all_pairs):
|
||||||
for pair_and_collection in pairs_args:
|
|
||||||
pair, collection = pair_and_collection, None
|
pair, collection = pair_and_collection, None
|
||||||
if '/' in pair:
|
if '/' in pair:
|
||||||
pair, collection = pair.split('/')
|
pair, collection = pair.split('/')
|
||||||
|
|
||||||
try:
|
if pair not in all_pairs:
|
||||||
a_name, b_name, pair_options, storage_defaults = \
|
|
||||||
all_pairs[pair]
|
|
||||||
except KeyError:
|
|
||||||
raise CliError('Pair not found: {}\n'
|
raise CliError('Pair not found: {}\n'
|
||||||
'These are the pairs found: {}'
|
'These are the pairs found: {}'
|
||||||
.format(pair, list(all_pairs)))
|
.format(pair, list(all_pairs)))
|
||||||
|
|
||||||
if collection is None:
|
collections = rv.setdefault(pair, set())
|
||||||
collections = set(
|
if collection:
|
||||||
_parse_old_config_list_value(pair_options, 'collections')
|
collections.add(collection)
|
||||||
or [None]
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
collections = [collection]
|
|
||||||
|
|
||||||
for c in collections:
|
return rv.items()
|
||||||
yield pair, c
|
|
||||||
|
|
||||||
# We create the app inside a factory and destroy that factory after first use
|
# We create the app inside a factory and destroy that factory after first use
|
||||||
# to avoid pollution of the module namespace.
|
# to avoid pollution of the module namespace.
|
||||||
|
|
@ -337,12 +460,12 @@ def _create_app():
|
||||||
wq = WorkerQueue(max_workers)
|
wq = WorkerQueue(max_workers)
|
||||||
wq.handled_jobs = set()
|
wq.handled_jobs = set()
|
||||||
|
|
||||||
for pair_name, collection in parse_pairs_args(pairs, all_pairs):
|
for pair_name, collections in parse_pairs_args(pairs, all_pairs):
|
||||||
wq.spawn_worker()
|
wq.spawn_worker()
|
||||||
wq.put(
|
wq.put(
|
||||||
functools.partial(prepare_sync, pair_name=pair_name,
|
functools.partial(sync_pair, pair_name=pair_name,
|
||||||
collection=collection, general=general,
|
collections_to_sync=collections,
|
||||||
all_pairs=all_pairs,
|
general=general, all_pairs=all_pairs,
|
||||||
all_storages=all_storages,
|
all_storages=all_storages,
|
||||||
force_delete=force_delete))
|
force_delete=force_delete))
|
||||||
|
|
||||||
|
|
@ -354,76 +477,40 @@ app = main = _create_app()
|
||||||
del _create_app
|
del _create_app
|
||||||
|
|
||||||
|
|
||||||
def expand_collection(pair_name, collection, general, all_pairs, all_storages):
|
def sync_pair(wq, pair_name, collections_to_sync, general, all_pairs,
|
||||||
a_name, b_name, pair_options, storage_defaults = all_pairs[pair_name]
|
all_storages, force_delete):
|
||||||
if collection in ('from a', 'from b'):
|
key = ('prepare', pair_name)
|
||||||
# from_name: name of the storage which should be used for discovery
|
|
||||||
# other_name: the other storage's name
|
|
||||||
if collection == 'from a':
|
|
||||||
from_name, other_name = a_name, b_name
|
|
||||||
else:
|
|
||||||
from_name, other_name = b_name, a_name
|
|
||||||
|
|
||||||
cli_logger.info('Syncing {}: Discovering collections from {}'
|
|
||||||
.format(pair_name, from_name))
|
|
||||||
|
|
||||||
config = dict(storage_defaults)
|
|
||||||
config.update(all_storages[from_name])
|
|
||||||
cls, config = storage_class_from_config(config)
|
|
||||||
try:
|
|
||||||
storages = list(cls.discover(**config))
|
|
||||||
except Exception:
|
|
||||||
handle_storage_init_error(cls, config)
|
|
||||||
|
|
||||||
for storage in storages:
|
|
||||||
config = dict(storage_defaults)
|
|
||||||
config.update(all_storages[other_name])
|
|
||||||
config['collection'] = actual_collection = storage.collection
|
|
||||||
other_storage = storage_instance_from_config(config)
|
|
||||||
|
|
||||||
if collection == 'from a':
|
|
||||||
a, b = storage, other_storage
|
|
||||||
else:
|
|
||||||
b, a = storage, other_storage
|
|
||||||
|
|
||||||
yield actual_collection, a, b
|
|
||||||
else:
|
|
||||||
config = dict(storage_defaults)
|
|
||||||
config.update(all_storages[a_name])
|
|
||||||
config['collection'] = collection
|
|
||||||
a = storage_instance_from_config(config)
|
|
||||||
|
|
||||||
config = dict(storage_defaults)
|
|
||||||
config.update(all_storages[b_name])
|
|
||||||
config['collection'] = collection
|
|
||||||
b = storage_instance_from_config(config)
|
|
||||||
|
|
||||||
yield collection, a, b
|
|
||||||
|
|
||||||
|
|
||||||
def prepare_sync(wq, pair_name, collection, general, all_pairs, all_storages,
|
|
||||||
force_delete):
|
|
||||||
key = ('prepare', pair_name, collection)
|
|
||||||
if key in wq.handled_jobs:
|
if key in wq.handled_jobs:
|
||||||
status_name = get_status_name(pair_name, collection)
|
cli_logger.warning('Already prepared {}, skipping'.format(pair_name))
|
||||||
cli_logger.warning('Already prepared {}, skipping'.format(status_name))
|
|
||||||
return
|
return
|
||||||
wq.handled_jobs.add(key)
|
wq.handled_jobs.add(key)
|
||||||
|
|
||||||
a_name, b_name, pair_options, storage_defaults = all_pairs[pair_name]
|
a_name, b_name, pair_options, storage_defaults = all_pairs[pair_name]
|
||||||
jobs = list(expand_collection(pair_name, collection, general, all_pairs,
|
|
||||||
all_storages))
|
|
||||||
|
|
||||||
for i in range(len(jobs) - 1):
|
all_collections = dict(collections_for_pair(
|
||||||
# spawn one worker less because we can reuse the current one
|
general['status_path'], a_name, b_name, pair_name,
|
||||||
|
all_storages[a_name], all_storages[b_name], pair_options
|
||||||
|
))
|
||||||
|
|
||||||
|
# spawn one worker less because we can reuse the current one
|
||||||
|
new_workers = -1
|
||||||
|
for collection in (collections_to_sync or all_collections):
|
||||||
|
try:
|
||||||
|
config_a, config_b = all_collections[collection]
|
||||||
|
except KeyError:
|
||||||
|
raise CliError('Pair {}: Collection {} not found. These are the '
|
||||||
|
'configured collections:\n{}'.format(
|
||||||
|
pair_name, collection, list(all_collections)))
|
||||||
|
new_workers += 1
|
||||||
|
wq.put(functools.partial(
|
||||||
|
sync_collection, pair_name=pair_name, collection=collection,
|
||||||
|
config_a=config_a, config_b=config_b, pair_options=pair_options,
|
||||||
|
general=general, force_delete=force_delete
|
||||||
|
))
|
||||||
|
|
||||||
|
for i in range(new_workers):
|
||||||
wq.spawn_worker()
|
wq.spawn_worker()
|
||||||
|
|
||||||
for collection, a, b in jobs:
|
|
||||||
wq.put(functools.partial(sync_collection, pair_name=pair_name,
|
|
||||||
collection=collection, a=a, b=b,
|
|
||||||
pair_options=pair_options, general=general,
|
|
||||||
force_delete=force_delete))
|
|
||||||
|
|
||||||
|
|
||||||
def handle_cli_error(status_name='sync'):
|
def handle_cli_error(status_name='sync'):
|
||||||
try:
|
try:
|
||||||
|
|
@ -460,8 +547,8 @@ def handle_cli_error(status_name='sync'):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def sync_collection(wq, pair_name, collection, a, b, pair_options, general,
|
def sync_collection(wq, pair_name, collection, config_a, config_b,
|
||||||
force_delete):
|
pair_options, general, force_delete):
|
||||||
status_name = get_status_name(pair_name, collection)
|
status_name = get_status_name(pair_name, collection)
|
||||||
|
|
||||||
key = ('sync', pair_name, collection)
|
key = ('sync', pair_name, collection)
|
||||||
|
|
@ -473,8 +560,12 @@ def sync_collection(wq, pair_name, collection, a, b, pair_options, general,
|
||||||
try:
|
try:
|
||||||
cli_logger.info('Syncing {}'.format(status_name))
|
cli_logger.info('Syncing {}'.format(status_name))
|
||||||
|
|
||||||
status = load_status(general['status_path'], status_name)
|
status = load_status(general['status_path'], pair_name,
|
||||||
|
collection, data_type='items') or {}
|
||||||
cli_logger.debug('Loaded status for {}'.format(status_name))
|
cli_logger.debug('Loaded status for {}'.format(status_name))
|
||||||
|
|
||||||
|
a = storage_instance_from_config(config_a)
|
||||||
|
b = storage_instance_from_config(config_b)
|
||||||
sync(
|
sync(
|
||||||
a, b, status,
|
a, b, status,
|
||||||
conflict_resolution=pair_options.get('conflict_resolution', None),
|
conflict_resolution=pair_options.get('conflict_resolution', None),
|
||||||
|
|
@ -484,7 +575,8 @@ def sync_collection(wq, pair_name, collection, a, b, pair_options, general,
|
||||||
if not handle_cli_error(status_name):
|
if not handle_cli_error(status_name):
|
||||||
raise JobFailed()
|
raise JobFailed()
|
||||||
|
|
||||||
save_status(general['status_path'], status_name, status)
|
save_status(general['status_path'], pair_name, collection,
|
||||||
|
data_type='items', data=status)
|
||||||
|
|
||||||
|
|
||||||
class WorkerQueue(object):
|
class WorkerQueue(object):
|
||||||
|
|
|
||||||
|
|
@ -30,10 +30,6 @@ class Storage(object):
|
||||||
Strings can be either unicode strings or bytestrings. If bytestrings, an
|
Strings can be either unicode strings or bytestrings. If bytestrings, an
|
||||||
ASCII encoding is assumed.
|
ASCII encoding is assumed.
|
||||||
|
|
||||||
:param collection: If None, the given URL or path is already directly
|
|
||||||
referring to a collection. Otherwise it will be treated as a basepath
|
|
||||||
to many collections (e.g. a vdir) and the given collection name will be
|
|
||||||
looked for.
|
|
||||||
:param read_only: Whether the synchronization algorithm should avoid writes
|
:param read_only: Whether the synchronization algorithm should avoid writes
|
||||||
to this storage. Some storages accept no value other than ``True``.
|
to this storage. Some storages accept no value other than ``True``.
|
||||||
'''
|
'''
|
||||||
|
|
@ -44,10 +40,16 @@ class Storage(object):
|
||||||
# overridden by subclasses.
|
# overridden by subclasses.
|
||||||
storage_name = None
|
storage_name = None
|
||||||
|
|
||||||
# The string used in the config to denote a particular instance. Should be
|
# The string used in the config to denote a particular instance. Will be
|
||||||
# overridden during instantiation.
|
# overridden during instantiation.
|
||||||
instance_name = None
|
instance_name = None
|
||||||
|
|
||||||
|
# The machine-readable name of this collection.
|
||||||
|
collection = None
|
||||||
|
|
||||||
|
# The human-readable name of this collection.
|
||||||
|
collection_human = None
|
||||||
|
|
||||||
# A value of True means the storage does not support write-methods such as
|
# A value of True means the storage does not support write-methods such as
|
||||||
# upload, update and delete. A value of False means the storage does
|
# upload, update and delete. A value of False means the storage does
|
||||||
# support those methods.
|
# support those methods.
|
||||||
|
|
@ -56,7 +58,8 @@ class Storage(object):
|
||||||
# The attribute values to show in the representation of the storage.
|
# The attribute values to show in the representation of the storage.
|
||||||
_repr_attributes = ()
|
_repr_attributes = ()
|
||||||
|
|
||||||
def __init__(self, instance_name=None, read_only=None, collection=None):
|
def __init__(self, instance_name=None, read_only=None, collection=None,
|
||||||
|
collection_human=None):
|
||||||
if read_only is None:
|
if read_only is None:
|
||||||
read_only = self.read_only
|
read_only = self.read_only
|
||||||
if self.read_only and not read_only:
|
if self.read_only and not read_only:
|
||||||
|
|
@ -66,6 +69,7 @@ class Storage(object):
|
||||||
instance_name = '{}/{}'.format(instance_name, collection)
|
instance_name = '{}/{}'.format(instance_name, collection)
|
||||||
self.instance_name = instance_name
|
self.instance_name = instance_name
|
||||||
self.collection = collection
|
self.collection = collection
|
||||||
|
self.collection_human = collection_human
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def discover(cls, **kwargs):
|
def discover(cls, **kwargs):
|
||||||
|
|
@ -74,8 +78,21 @@ class Storage(object):
|
||||||
:param **kwargs: Keyword arguments to additionally pass to the storage
|
:param **kwargs: Keyword arguments to additionally pass to the storage
|
||||||
instances returned. You shouldn't pass `collection` here, otherwise
|
instances returned. You shouldn't pass `collection` here, otherwise
|
||||||
TypeError will be raised.
|
TypeError will be raised.
|
||||||
:returns: Iterable of storages which represent the discovered
|
:returns: iterable of ``storage_args``.
|
||||||
collections, all of which are passed kwargs during initialization.
|
``storage_args`` is a dictionary of ``**kwargs`` to pass to this
|
||||||
|
class to obtain a storage instance pointing to this collection. It
|
||||||
|
also must contain a ``"collection"`` key. That key's value is used
|
||||||
|
to match two collections together for synchronization. IOW it is a
|
||||||
|
machine-readable identifier for the collection, usually obtained
|
||||||
|
from the last segment of a URL or filesystem path.
|
||||||
|
|
||||||
|
'''
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def join_collection(cls, collection, **kwargs):
|
||||||
|
'''Append the collection to the URL or path specified in ``**kwargs``
|
||||||
|
and return the new arguments.
|
||||||
'''
|
'''
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -250,9 +250,6 @@ class DavStorage(Storage):
|
||||||
super(DavStorage, self).__init__(**kwargs)
|
super(DavStorage, self).__init__(**kwargs)
|
||||||
|
|
||||||
url = url.rstrip('/') + '/'
|
url = url.rstrip('/') + '/'
|
||||||
collection = kwargs.get('collection')
|
|
||||||
if collection is not None:
|
|
||||||
url = utils.urlparse.urljoin(url, collection)
|
|
||||||
self.session = DavSession(url, username, password, verify, auth,
|
self.session = DavSession(url, username, password, verify, auth,
|
||||||
useragent, verify_fingerprint)
|
useragent, verify_fingerprint)
|
||||||
self.unsafe_href_chars = unsafe_href_chars
|
self.unsafe_href_chars = unsafe_href_chars
|
||||||
|
|
@ -271,10 +268,12 @@ class DavStorage(Storage):
|
||||||
))
|
))
|
||||||
d = cls.discovery_class(DavSession(url=url, **discover_args))
|
d = cls.discovery_class(DavSession(url=url, **discover_args))
|
||||||
for c in d.discover():
|
for c in d.discover():
|
||||||
base, collection = c['href'].rstrip('/').rsplit('/', 1)
|
url = c['href']
|
||||||
s = cls(url=base, collection=collection, **kwargs)
|
_, collection = url.rstrip('/').rsplit('/', 1)
|
||||||
s.displayname = c['displayname']
|
storage_args = {'url': url, 'collection': collection,
|
||||||
yield s
|
'collection_human': c['displayname']}
|
||||||
|
storage_args.update(kwargs)
|
||||||
|
yield storage_args
|
||||||
|
|
||||||
def _normalize_href(self, *args, **kwargs):
|
def _normalize_href(self, *args, **kwargs):
|
||||||
return _normalize_href(self.session.url, *args, **kwargs)
|
return _normalize_href(self.session.url, *args, **kwargs)
|
||||||
|
|
|
||||||
|
|
@ -40,9 +40,6 @@ class FilesystemStorage(Storage):
|
||||||
def __init__(self, path, fileext, encoding='utf-8', create=True, **kwargs):
|
def __init__(self, path, fileext, encoding='utf-8', create=True, **kwargs):
|
||||||
super(FilesystemStorage, self).__init__(**kwargs)
|
super(FilesystemStorage, self).__init__(**kwargs)
|
||||||
path = expand_path(path)
|
path = expand_path(path)
|
||||||
collection = kwargs.get('collection')
|
|
||||||
if collection is not None:
|
|
||||||
path = os.path.join(path, collection)
|
|
||||||
checkdir(path, create=create)
|
checkdir(path, create=create)
|
||||||
self.path = path
|
self.path = path
|
||||||
self.encoding = encoding
|
self.encoding = encoding
|
||||||
|
|
@ -54,9 +51,16 @@ class FilesystemStorage(Storage):
|
||||||
raise TypeError('collection argument must not be given.')
|
raise TypeError('collection argument must not be given.')
|
||||||
path = expand_path(path)
|
path = expand_path(path)
|
||||||
for collection in os.listdir(path):
|
for collection in os.listdir(path):
|
||||||
if os.path.isdir(os.path.join(path, collection)):
|
collection_path = os.path.join(path, collection)
|
||||||
s = cls(path=path, collection=collection, **kwargs)
|
if os.path.isdir(collection_path):
|
||||||
yield s
|
args = dict(collection=collection, path=collection_path,
|
||||||
|
**kwargs)
|
||||||
|
yield args
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def join_collection(cls, collection, **kwargs):
|
||||||
|
kwargs['path'] = os.path.join(kwargs['path'], collection)
|
||||||
|
return kwargs
|
||||||
|
|
||||||
def _get_filepath(self, href):
|
def _get_filepath(self, href):
|
||||||
return os.path.join(self.path, href)
|
return os.path.join(self.path, href)
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue