From f2d34f478474422f8c9686b4f634a93e0d26e3ca Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Thu, 25 Dec 2014 23:38:09 +0100 Subject: [PATCH] Create cli.tasks --- tests/test_cli.py | 2 +- vdirsyncer/cli/__init__.py | 3 +- vdirsyncer/cli/tasks.py | 83 ++++++++++++++++++++++++++++++++++++ vdirsyncer/cli/utils.py | 86 ++++---------------------------------- 4 files changed, 95 insertions(+), 79 deletions(-) create mode 100644 vdirsyncer/cli/tasks.py diff --git a/tests/test_cli.py b/tests/test_cli.py index b1d2500..d4970db 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -90,7 +90,7 @@ def test_storage_instance_from_config(monkeypatch): import vdirsyncer.storage monkeypatch.setitem(vdirsyncer.storage.storage_names, 'lol', lol) config = {'type': 'lol', 'foo': 'bar', 'baz': 1} - assert cli.utils._storage_instance_from_config(config) == 'OK' + assert cli.utils.storage_instance_from_config(config) == 'OK' def test_parse_pairs_args(): diff --git a/vdirsyncer/cli/__init__.py b/vdirsyncer/cli/__init__.py index 418b4db..3769ca8 100644 --- a/vdirsyncer/cli/__init__.py +++ b/vdirsyncer/cli/__init__.py @@ -11,8 +11,9 @@ import functools import os import sys +from .tasks import sync_pair from .utils import CliError, WorkerQueue, cli_logger, collections_for_pair, \ - handle_cli_error, load_config, parse_pairs_args, sync_pair + handle_cli_error, load_config, parse_pairs_args from .. import __version__, log from ..doubleclick import click from ..utils import expand_path diff --git a/vdirsyncer/cli/tasks.py b/vdirsyncer/cli/tasks.py new file mode 100644 index 0000000..6992d71 --- /dev/null +++ b/vdirsyncer/cli/tasks.py @@ -0,0 +1,83 @@ +# -*- coding: utf-8 -*- +''' + vdirsyncer.cli.tasks + ~~~~~~~~~~~~~~~~~~~~ + + :copyright: (c) 2014 Markus Unterwaditzer & contributors + :license: MIT, see LICENSE for more details. +''' + +import functools + +from .utils import CliError, JobFailed, cli_logger, collections_for_pair, \ + get_status_name, handle_cli_error, load_status, save_status, \ + storage_instance_from_config + +from ..sync import sync + + +def sync_pair(wq, pair_name, collections_to_sync, general, all_pairs, + all_storages, force_delete): + key = ('prepare', pair_name) + if key in wq.handled_jobs: + cli_logger.warning('Already prepared {}, skipping'.format(pair_name)) + return + wq.handled_jobs.add(key) + + a_name, b_name, pair_options = all_pairs[pair_name] + + all_collections = dict(collections_for_pair( + general['status_path'], a_name, b_name, pair_name, + all_storages[a_name], all_storages[b_name], pair_options + )) + + # spawn one worker less because we can reuse the current one + new_workers = -1 + for collection in (collections_to_sync or all_collections): + try: + config_a, config_b = all_collections[collection] + except KeyError: + raise CliError('Pair {}: Collection {} not found. These are the ' + 'configured collections:\n{}'.format( + pair_name, collection, list(all_collections))) + new_workers += 1 + wq.put(functools.partial( + sync_collection, pair_name=pair_name, collection=collection, + config_a=config_a, config_b=config_b, pair_options=pair_options, + general=general, force_delete=force_delete + )) + + for i in range(new_workers): + wq.spawn_worker() + + +def sync_collection(wq, pair_name, collection, config_a, config_b, + pair_options, general, force_delete): + status_name = get_status_name(pair_name, collection) + + key = ('sync', pair_name, collection) + if key in wq.handled_jobs: + cli_logger.warning('Already syncing {}, skipping'.format(status_name)) + return + wq.handled_jobs.add(key) + + try: + cli_logger.info('Syncing {}'.format(status_name)) + + status = load_status(general['status_path'], pair_name, + collection, data_type='items') or {} + cli_logger.debug('Loaded status for {}'.format(status_name)) + + a = storage_instance_from_config(config_a) + b = storage_instance_from_config(config_b) + sync( + a, b, status, + conflict_resolution=pair_options.get('conflict_resolution', None), + force_delete=force_delete + ) + except: + if not handle_cli_error(status_name): + raise JobFailed() + + save_status(general['status_path'], pair_name, collection, + data_type='items', data=status) diff --git a/vdirsyncer/cli/utils.py b/vdirsyncer/cli/utils.py index fdf3093..59115c2 100644 --- a/vdirsyncer/cli/utils.py +++ b/vdirsyncer/cli/utils.py @@ -8,7 +8,6 @@ ''' import errno -import functools import hashlib import json import os @@ -20,7 +19,7 @@ from itertools import chain from .. import DOCS_HOME, PROJECT_HOME, log from ..doubleclick import click from ..storage import storage_names -from ..sync import StorageEmpty, SyncConflict, sync +from ..sync import StorageEmpty, SyncConflict from ..utils import expand_path, get_class_init_args, parse_options, \ safe_write @@ -109,7 +108,7 @@ def _parse_old_config_list_value(d, key): return value -def _get_status_name(pair, collection): +def get_status_name(pair, collection): if collection is None: return pair return pair + '/' + collection @@ -163,7 +162,7 @@ def collections_for_pair(status_path, name_a, name_b, pair_name, config_a, def _discover_from_config(config): storage_type = config['type'] - cls, config = _storage_class_from_config(config) + cls, config = storage_class_from_config(config) try: discovered = list(cls.discover(**config)) @@ -182,7 +181,7 @@ def _get_coll(pair_name, storage_name, collection, discovered, config): return discovered[collection] except KeyError: storage_type = config['type'] - cls, config = _storage_class_from_config(config) + cls, config = storage_class_from_config(config) try: args = cls.join_collection(collection=collection, **config) args['type'] = storage_type @@ -293,7 +292,7 @@ def load_config(f): def load_status(base_path, pair, collection=None, data_type=None): assert data_type is not None - status_name = _get_status_name(pair, collection) + status_name = get_status_name(pair, collection) path = expand_path(os.path.join(base_path, status_name)) if os.path.isfile(path) and data_type == 'items': new_path = path + '.items' @@ -323,7 +322,7 @@ def load_status(base_path, pair, collection=None, data_type=None): def save_status(base_path, pair, collection=None, data_type=None, data=None): assert data_type is not None assert data is not None - status_name = _get_status_name(pair, collection) + status_name = get_status_name(pair, collection) path = expand_path(os.path.join(base_path, status_name)) + '.' + data_type base_path = os.path.dirname(path) @@ -343,7 +342,7 @@ def save_status(base_path, pair, collection=None, data_type=None, data=None): json.dump(data, f) -def _storage_class_from_config(config): +def storage_class_from_config(config): config = dict(config) storage_name = config.pop('type') cls = storage_names.get(storage_name, None) @@ -352,14 +351,14 @@ def _storage_class_from_config(config): return cls, config -def _storage_instance_from_config(config): +def storage_instance_from_config(config): ''' :param config: A configuration dictionary to pass as kwargs to the class corresponding to config['type'] :param description: A name for the storage for debugging purposes ''' - cls, config = _storage_class_from_config(config) + cls, config = storage_class_from_config(config) try: return cls(**config) @@ -416,73 +415,6 @@ def parse_pairs_args(pairs_args, all_pairs): return rv.items() -def sync_pair(wq, pair_name, collections_to_sync, general, all_pairs, - all_storages, force_delete): - key = ('prepare', pair_name) - if key in wq.handled_jobs: - cli_logger.warning('Already prepared {}, skipping'.format(pair_name)) - return - wq.handled_jobs.add(key) - - a_name, b_name, pair_options = all_pairs[pair_name] - - all_collections = dict(collections_for_pair( - general['status_path'], a_name, b_name, pair_name, - all_storages[a_name], all_storages[b_name], pair_options - )) - - # spawn one worker less because we can reuse the current one - new_workers = -1 - for collection in (collections_to_sync or all_collections): - try: - config_a, config_b = all_collections[collection] - except KeyError: - raise CliError('Pair {}: Collection {} not found. These are the ' - 'configured collections:\n{}'.format( - pair_name, collection, list(all_collections))) - new_workers += 1 - wq.put(functools.partial( - sync_collection, pair_name=pair_name, collection=collection, - config_a=config_a, config_b=config_b, pair_options=pair_options, - general=general, force_delete=force_delete - )) - - for i in range(new_workers): - wq.spawn_worker() - - -def sync_collection(wq, pair_name, collection, config_a, config_b, - pair_options, general, force_delete): - status_name = _get_status_name(pair_name, collection) - - key = ('sync', pair_name, collection) - if key in wq.handled_jobs: - cli_logger.warning('Already syncing {}, skipping'.format(status_name)) - return - wq.handled_jobs.add(key) - - try: - cli_logger.info('Syncing {}'.format(status_name)) - - status = load_status(general['status_path'], pair_name, - collection, data_type='items') or {} - cli_logger.debug('Loaded status for {}'.format(status_name)) - - a = _storage_instance_from_config(config_a) - b = _storage_instance_from_config(config_b) - sync( - a, b, status, - conflict_resolution=pair_options.get('conflict_resolution', None), - force_delete=force_delete - ) - except: - if not handle_cli_error(status_name): - raise JobFailed() - - save_status(general['status_path'], pair_name, collection, - data_type='items', data=status) - - class WorkerQueue(object): def __init__(self, max_workers): self._queue = Queue()