Add PairConfig and CollectionConfig

This commit is contained in:
Markus Unterwaditzer 2015-08-20 16:42:43 +02:00
parent d59376e231
commit 8e2070e42d
4 changed files with 88 additions and 77 deletions

View file

@ -180,21 +180,13 @@ def discover(ctx, pairs, max_workers):
wq = WorkerQueue(max_workers) wq = WorkerQueue(max_workers)
with wq.join(): with wq.join():
for pair in (pairs or config.pairs): for pair_name in (pairs or config.pairs):
try: pair = config.get_pair(pair_name)
name_a, name_b, pair_options = config.pairs[pair]
except KeyError:
raise CliError('Pair not found: {}\n'
'These are the pairs found: {}'
.format(pair, list(config.pairs)))
wq.put(functools.partial( wq.put(functools.partial(
discover_collections, discover_collections,
status_path=config.general['status_path'], status_path=config.general['status_path'],
pair_name=pair, pair=pair,
config_a=config.get_storage_args(name_a),
config_b=config.get_storage_args(name_b),
pair_options=pair_options,
skip_cache=True, skip_cache=True,
)) ))
wq.spawn_worker() wq.spawn_worker()

View file

@ -183,11 +183,37 @@ class Config(object):
self.pairs = pairs self.pairs = pairs
self.storages = storages self.storages = storages
def get_storage_args(self, storage_name): def get_storage_args(self, storage_name, pair_name=None):
try: try:
return self.storages[storage_name] return self.storages[storage_name]
except KeyError: except KeyError:
pair_pref = 'Pair {}: '.format(pair_name) if pair_name else ''
raise CliError( raise CliError(
'Storage {!r} not found. These are the configured storages: {}' '{}Storage {!r} not found. '
.format(storage_name, list(self.storages)) 'These are the configured storages: {}'
.format(pair_pref, storage_name, list(self.storages))
) )
def get_pair(self, pair_name):
    '''Build the :class:`PairConfig` for the pair called ``pair_name``.

    :param pair_name: Key into ``self.pairs``.
    :returns: A ``PairConfig`` built from this config, the pair name and
        the values stored under that name in ``self.pairs``.
    :raises CliError: If no pair with that name is configured.
    '''
    try:
        pair_args = self.pairs[pair_name]
    except KeyError:
        # Report unknown pairs the same way unknown storages are reported
        # in get_storage_args, instead of leaking a bare KeyError.
        raise CliError('Pair not found: {}\n'
                       'These are the pairs found: {}'
                       .format(pair_name, list(self.pairs)))
    return PairConfig(self, pair_name, *pair_args)
class PairConfig(object):
    '''Configuration of one pair: its name, the names and resolved
    arguments of both storages, and the pair-specific options.
    '''

    def __init__(self, config, name, name_a, name_b, pair_options):
        '''
        :param config: Object providing
            ``get_storage_args(storage_name, pair_name=...)``; kept as
            ``_config``.
        :param name: Name of the pair.
        :param name_a: Name of storage A.
        :param name_b: Name of storage B.
        :param pair_options: Pair-specific options, stored as ``options``.

        Any error raised by ``config.get_storage_args`` while resolving
        either storage propagates to the caller.
        '''
        self._config = config
        self.name = name
        self.name_a = name_a
        # Previously assigned to ``name_a`` a second time, which clobbered
        # storage A's name and left ``name_b`` undefined.
        self.name_b = name_b
        self.options = pair_options

        # The pair name is passed along so a lookup failure can mention
        # which pair referenced the storage.
        self.config_a = config.get_storage_args(name_a, pair_name=name)
        self.config_b = config.get_storage_args(name_b, pair_name=name)
class CollectionConfig(object):
    '''One collection within a pair, together with the storage arguments
    for both sides.
    '''

    def __init__(self, pair, name, config_a, config_b):
        '''
        :param pair: The owning pair object; must expose ``_config``.
        :param name: Name of the collection.
        :param config_a: Arguments for side A of this collection.
        :param config_b: Arguments for side B of this collection.
        '''
        self.name = name
        self.config_a, self.config_b = config_a, config_b
        # Keep the pair and the config object it carries reachable from
        # the collection.
        self.pair = pair
        self._config = pair._config

View file

@ -3,6 +3,7 @@
import functools import functools
import json import json
from .config import CollectionConfig
from .utils import CliError, JobFailed, cli_logger, collections_for_pair, \ from .utils import CliError, JobFailed, cli_logger, collections_for_pair, \
get_status_name, handle_cli_error, load_status, save_status, \ get_status_name, handle_cli_error, load_status, save_status, \
storage_class_from_config, storage_instance_from_config storage_class_from_config, storage_instance_from_config
@ -11,69 +12,66 @@ from ..sync import sync
def prepare_pair(wq, pair_name, collections, config, callback, **kwargs): def prepare_pair(wq, pair_name, collections, config, callback, **kwargs):
a_name, b_name, pair_options = config.pairs[pair_name] pair = config.get_pair(pair_name)
config_a = config.get_storage_args(a_name)
config_b = config.get_storage_args(b_name)
all_collections = dict(collections_for_pair( all_collections = dict(collections_for_pair(
status_path=config.general['status_path'], pair_name=pair_name, status_path=config.general['status_path'], pair=pair
config_a=config_a, config_b=config_b, pair_options=pair_options
)) ))
# spawn one worker less because we can reuse the current one # spawn one worker less because we can reuse the current one
new_workers = -1 new_workers = -1
for collection in (collections or all_collections): for collection_name in (collections or all_collections):
try: try:
config_a, config_b = all_collections[collection] config_a, config_b = all_collections[collection_name]
except KeyError: except KeyError:
raise CliError('Pair {}: Collection {} not found. These are the ' raise CliError('Pair {}: Collection {} not found. These are the '
'configured collections:\n{}'.format( 'configured collections:\n{}'
pair_name, collection, list(all_collections))) .format(pair_name, collection_name,
list(all_collections)))
new_workers += 1 new_workers += 1
wq.put(functools.partial(
callback, pair_name=pair_name, collection=collection, collection = CollectionConfig(pair, collection_name, config_a,
config_a=config_a, config_b=config_b, pair_options=pair_options, config_b)
general=config.general, **kwargs wq.put(functools.partial(callback, collection=collection,
)) general=config.general, **kwargs))
for i in range(new_workers): for i in range(new_workers):
wq.spawn_worker() wq.spawn_worker()
def sync_collection(wq, pair_name, collection, config_a, config_b, def sync_collection(wq, collection, general, force_delete):
pair_options, general, force_delete): pair = collection.pair
status_name = get_status_name(pair_name, collection) status_name = get_status_name(pair.name, collection.name)
try: try:
cli_logger.info('Syncing {}'.format(status_name)) cli_logger.info('Syncing {}'.format(status_name))
status = load_status(general['status_path'], pair_name, status = load_status(general['status_path'], pair.name,
collection, data_type='items') or {} collection.name, data_type='items') or {}
cli_logger.debug('Loaded status for {}'.format(status_name)) cli_logger.debug('Loaded status for {}'.format(status_name))
a = storage_instance_from_config(config_a) a = storage_instance_from_config(collection.config_a)
b = storage_instance_from_config(config_b) b = storage_instance_from_config(collection.config_b)
sync( sync(
a, b, status, a, b, status,
conflict_resolution=pair_options.get('conflict_resolution', None), conflict_resolution=pair.options.get('conflict_resolution', None),
force_delete=force_delete force_delete=force_delete
) )
except: except:
handle_cli_error(status_name) handle_cli_error(status_name)
raise JobFailed() raise JobFailed()
save_status(general['status_path'], pair_name, collection, save_status(general['status_path'], pair.name, collection.name,
data_type='items', data=status) data_type='items', data=status)
def discover_collections(wq, pair_name, **kwargs): def discover_collections(wq, pair, **kwargs):
rv = collections_for_pair(pair_name=pair_name, **kwargs) rv = collections_for_pair(pair=pair, **kwargs)
collections = list(c for c, (a, b) in rv) collections = list(c for c, (a, b) in rv)
if collections == [None]: if collections == [None]:
collections = None collections = None
cli_logger.info('Saved for {}: collections = {}' cli_logger.info('Saved for {}: collections = {}'
.format(pair_name, json.dumps(collections))) .format(pair.name, json.dumps(collections)))
def repair_collection(config, collection): def repair_collection(config, collection):
@ -104,28 +102,27 @@ def repair_collection(config, collection):
repair_storage(storage) repair_storage(storage)
def metasync_collection(wq, pair_name, collection, config_a, config_b, def metasync_collection(wq, pair, collection, general):
pair_options, general):
from ..metasync import metasync from ..metasync import metasync
status_name = get_status_name(pair_name, collection) status_name = get_status_name(pair.name, collection)
try: try:
cli_logger.info('Metasyncing {}'.format(status_name)) cli_logger.info('Metasyncing {}'.format(status_name))
status = load_status(general['status_path'], pair_name, status = load_status(general['status_path'], pair.name,
collection, data_type='metadata') or {} collection, data_type='metadata') or {}
a = storage_instance_from_config(config_a) a = storage_instance_from_config(pair.config_a)
b = storage_instance_from_config(config_b) b = storage_instance_from_config(pair.config_b)
metasync( metasync(
a, b, status, a, b, status,
conflict_resolution=pair_options.get('conflict_resolution', None), conflict_resolution=pair.options.get('conflict_resolution', None),
keys=pair_options.get('metadata', None) or () keys=pair.options.get('metadata', None) or ()
) )
except: except:
handle_cli_error(status_name) handle_cli_error(status_name)
raise JobFailed() raise JobFailed()
save_status(general['status_path'], pair_name, collection, save_status(general['status_path'], pair.name, collection,
data_type='metadata', data=status) data_type='metadata', data=status)

View file

@ -127,56 +127,51 @@ def get_status_name(pair, collection):
return pair + '/' + collection return pair + '/' + collection
def _get_collections_cache_key(pair_options, config_a, config_b): def _get_collections_cache_key(pair):
m = hashlib.sha256() m = hashlib.sha256()
j = json.dumps([ j = json.dumps([
DISCOVERY_CACHE_VERSION, DISCOVERY_CACHE_VERSION,
pair_options.get('collections', None), pair.options.get('collections', None),
config_a, pair.config_a,
config_b pair.config_b,
], sort_keys=True) ], sort_keys=True)
m.update(j.encode('utf-8')) m.update(j.encode('utf-8'))
return m.hexdigest() return m.hexdigest()
def collections_for_pair(status_path, pair_name, config_a, def collections_for_pair(status_path, pair, skip_cache=False):
config_b, pair_options, skip_cache=False):
'''Determine all configured collections for a given pair. Takes care of '''Determine all configured collections for a given pair. Takes care of
shortcut expansion and result caching. shortcut expansion and result caching.
:param status_path: The path to the status directory. :param status_path: The path to the status directory.
:param pair_name: The config name of the pair.
:param config_a: The configuration for storage A.
:param config_b: The configuration for storage B.
:param pair_options: Pair-specific options.
:param skip_cache: Whether to skip the cached data and always do discovery. :param skip_cache: Whether to skip the cached data and always do discovery.
Even with this option enabled, the new cache is written. Even with this option enabled, the new cache is written.
:returns: iterable of (collection, (a_args, b_args)) :returns: iterable of (collection, (a_args, b_args))
''' '''
rv = load_status(status_path, pair_name, data_type='collections') rv = load_status(status_path, pair.name, data_type='collections')
cache_key = _get_collections_cache_key(pair_options, config_a, config_b) cache_key = _get_collections_cache_key(pair)
if rv and not skip_cache: if rv and not skip_cache:
if rv.get('cache_key', None) == cache_key: if rv.get('cache_key', None) == cache_key:
return list(_expand_collections_cache( return list(_expand_collections_cache(
rv['collections'], config_a, config_b rv['collections'], pair.config_a, pair.config_b
)) ))
elif rv: elif rv:
cli_logger.info('Detected change in config file, discovering ' cli_logger.info('Detected change in config file, discovering '
'collections for {}'.format(pair_name)) 'collections for {}'.format(pair.name))
cli_logger.info('Discovering collections for pair {}' cli_logger.info('Discovering collections for pair {}'
.format(pair_name)) .format(pair.name))
# We have to use a list here because the special None/null value would get # We have to use a list here because the special None/null value would get
# mangled to string (because JSON objects always have string keys). # mangled to string (because JSON objects always have string keys).
rv = list(_collections_for_pair_impl(status_path, pair_name, config_a, rv = list(_collections_for_pair_impl(status_path, pair))
config_b, pair_options))
save_status(status_path, pair_name, data_type='collections', save_status(status_path, pair.name, data_type='collections',
data={ data={
'collections': list( 'collections': list(
_compress_collections_cache(rv, config_a, config_b) _compress_collections_cache(rv, pair.config_a,
pair.config_b)
), ),
'cache_key': cache_key 'cache_key': cache_key
}) })
@ -247,15 +242,14 @@ def _handle_collection_not_found(config, collection, e=None):
storage=storage_name)) storage=storage_name))
def _collections_for_pair_impl(status_path, pair_name, config_a, config_b, def _collections_for_pair_impl(status_path, pair):
pair_options):
shortcuts = set(pair_options.get('collections', ())) shortcuts = set(pair.options.get('collections', ()))
if not shortcuts: if not shortcuts:
yield None, (config_a, config_b) yield None, (pair.config_a, pair.config_b)
else: else:
a_discovered = _discover_from_config(config_a) a_discovered = _discover_from_config(pair.config_a)
b_discovered = _discover_from_config(config_b) b_discovered = _discover_from_config(pair.config_b)
for shortcut in shortcuts: for shortcut in shortcuts:
if shortcut == 'from a': if shortcut == 'from a':
@ -269,12 +263,14 @@ def _collections_for_pair_impl(status_path, pair_name, config_a, config_b,
try: try:
a_args = a_discovered[collection] a_args = a_discovered[collection]
except KeyError: except KeyError:
a_args = _handle_collection_not_found(config_a, collection) a_args = _handle_collection_not_found(pair.config_a,
collection)
try: try:
b_args = b_discovered[collection] b_args = b_discovered[collection]
except KeyError: except KeyError:
b_args = _handle_collection_not_found(config_b, collection) b_args = _handle_collection_not_found(pair.config_b,
collection)
yield collection, (a_args, b_args) yield collection, (a_args, b_args)