From 29d80b7be07e01a2cc10ad33832bee6695c7555f Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Wed, 15 Oct 2014 17:24:58 +0200 Subject: [PATCH] Stop using multiprocessing.dummy.Pool - Custom job queue with workers based on threads. - Collection discovery is now done in a separate thread. Due to the gained flexibility, we could do the sync actions in separate threads too? - The processes parameter has been removed, the related new option is only available on the CLI. --- docs/config.rst | 11 --- tests/test_cli.py | 106 +-------------------- vdirsyncer/cli.py | 233 +++++++++++++++++++++++++++------------------- 3 files changed, 141 insertions(+), 209 deletions(-) diff --git a/docs/config.rst b/docs/config.rst index d4ab114..eb1d3e8 100644 --- a/docs/config.rst +++ b/docs/config.rst @@ -20,17 +20,6 @@ General Section next sync. The data is needed to determine whether a new item means it has been added on one side or deleted on the other. -- ``processes``: Optional, defines the maximal amount of threads to use for - syncing. By default there is no limit, which means vdirsyncer will try to - open a connection for each collection to be synced. The value ``0`` is - ignored. Setting this to ``1`` will only synchronize one collection at a - time. - - While this often greatly increases performance, you might have valid reasons - to set this to a smaller number. For example, your DAV server running on a - Raspberry Pi is so slow that multiple connections don't help much, since the - CPU and not the network is the bottleneck. - .. note:: Due to restrictions in Python's threading module, setting ``processes`` diff --git a/tests/test_cli.py b/tests/test_cli.py index ca872b2..ca1a753 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -70,48 +70,6 @@ def test_storage_instance_from_config(monkeypatch): assert cli.storage_instance_from_config(config) == 'OK' -def test_expand_collection(monkeypatch): - x = lambda *a: list(cli.expand_collection(*a)) - assert x(None, 'foo', None, None) == ['foo'] - assert x(None, 'from lol', None, None) == ['from lol'] - - all_pairs = {'mypair': ('my_a', 'my_b', None, {'lol': True})} - all_storages = {'my_a': {'type': 'mytype_a', 'is_a': True}, - 'my_b': {'type': 'mytype_b', 'is_b': True}} - - class TypeA(object): - @classmethod - def discover(cls, **config): - assert config == { - 'is_a': True, - 'lol': True - } - for i in range(1, 4): - s = cls() - s.collection = 'a{}'.format(i) - yield s - - class TypeB(object): - @classmethod - def discover(cls, **config): - assert config == { - 'is_b': True, - 'lol': True - } - for i in range(1, 4): - s = cls() - s.collection = 'b{}'.format(i) - yield s - - import vdirsyncer.storage - monkeypatch.setitem(vdirsyncer.storage.storage_names, 'mytype_a', TypeA) - monkeypatch.setitem(vdirsyncer.storage.storage_names, 'mytype_b', TypeB) - - assert x('mypair', 'mycoll', all_pairs, all_storages) == ['mycoll'] - assert x('mypair', 'from a', all_pairs, all_storages) == ['a1', 'a2', 'a3'] - assert x('mypair', 'from b', all_pairs, all_storages) == ['b1', 'b2', 'b3'] - - def test_parse_pairs_args(): pairs = { 'foo': ('bar', 'baz', {'conflict_resolution': 'a wins'}, @@ -162,6 +120,11 @@ def test_simple_run(tmpdir): 'Copying (uploading) item haha to my_b\n') assert tmpdir.join('path_b/haha.txt').read() == 'UID:haha' + result = runner.invoke(cli.app, ['sync', 'my_pair', 'my_pair']) + assert set(result.output.splitlines()) == set([ + 'Syncing my_pair', + 'warning: Already prepared my_pair, skipping' + ]) def test_missing_general_section(tmpdir): config_file = tmpdir.join('config') @@ -228,62 +191,3 @@ def test_verbosity(tmpdir): ) assert result.exception assert 'invalid verbosity value' in result.output.lower() - - -def test_fail_fast(tmpdir): - runner = CliRunner() - config_file = tmpdir.join('config') - config_file.write(dedent(''' - [general] - status_path = {status} - processes = 1 - - [storage a1] - type = filesystem - fileext = .txt - path = {a1} - - [storage a2] - type = filesystem - fileext = .txt - path = {a2} - create = False - - [storage b1] - type = filesystem - fileext = .txt - path = {b1} - - [storage b2] - type = filesystem - fileext = .txt - path = {b2} - - [pair a] - a = a1 - b = a2 - - [pair b] - a = b1 - b = b2 - ''').format( - status=str(tmpdir.mkdir('status')), - a1=str(tmpdir.mkdir('a1')), - a2=str(tmpdir.join('a2')), - b1=str(tmpdir.mkdir('b1')), - b2=str(tmpdir.mkdir('b2')) - )) - - result = runner.invoke(cli.app, ['sync', 'a', 'b'], - env={'VDIRSYNCER_CONFIG': str(config_file)}) - lines = result.output.splitlines() - assert 'Syncing a' in lines - assert 'Syncing b' in lines - assert result.exception - - result = runner.invoke(cli.app, ['sync', '--fail-fast', 'a', 'b'], - env={'VDIRSYNCER_CONFIG': str(config_file)}) - lines = result.output.splitlines() - assert 'Syncing a' in lines - assert 'Syncing b' not in lines - assert result.exception diff --git a/vdirsyncer/cli.py b/vdirsyncer/cli.py index 87f53e4..4f24e01 100644 --- a/vdirsyncer/cli.py +++ b/vdirsyncer/cli.py @@ -11,6 +11,7 @@ import functools import json import os import sys +import threading from . import __version__, log from .doubleclick import click @@ -24,13 +25,18 @@ try: except ImportError: from configparser import RawConfigParser +try: + from Queue import Queue +except ImportError: + from queue import Queue + cli_logger = log.get(__name__) PROJECT_HOME = 'https://github.com/untitaker/vdirsyncer' DOCS_HOME = 'https://vdirsyncer.readthedocs.org/en/latest' -GENERAL_ALL = set(['processes', 'status_path', 'passwordeval']) +GENERAL_ALL = set(['status_path', 'passwordeval']) GENERAL_REQUIRED = set(['status_path']) @@ -38,6 +44,10 @@ class CliError(RuntimeError): pass +class JobFailed(RuntimeError): + pass + + def get_status_name(pair, collection): if collection is None: return pair @@ -184,38 +194,6 @@ def handle_storage_init_error(cls, config): raise CliError('Failed to initialize {}.'.format(config['instance_name'])) -def expand_collection(pair, collection, all_pairs, all_storages): - ''' - Replace the placeholder collections "from a" and "from b" with actual - ones. - - :param collection: The collection. - :param pair: The pair the collection belongs to. - :param all_pairs: dictionary: pair_name => (name of storage a, - name of storage b, - pair config, - storage defaults) - :returns: One or more collections that replace the given one. The original - collection is returned unmodified if the given collection is neither - "from a" nor "from b". - ''' - if collection in ('from a', 'from b'): - a_name, b_name, _, storage_defaults = all_pairs[pair] - config = dict(storage_defaults) - if collection == 'from a': - config.update(all_storages[a_name]) - else: - config.update(all_storages[b_name]) - cls, config = storage_class_from_config(config) - try: - for s in cls.discover(**config): - yield s.collection - except Exception: - handle_storage_init_error(cls, config) - else: - yield collection - - def parse_pairs_args(pairs_args, all_pairs): ''' Expand the various CLI shortforms ("pair, pair/collection") to an iterable @@ -300,11 +278,13 @@ def _create_app(): @click.option('--force-delete', multiple=True, help=('Disable data-loss protection for the given pairs. ' 'Can be passed multiple times')) - @click.option('--fail-fast', is_flag=True, - help='Exit immediately on first error.') + @click.option('--max-workers', + default=0, type=click.IntRange(min=0, max=None), + help=('Use at most this many connections, 0 means ' + 'unlimited.')) @click.pass_context @catch_errors - def sync(ctx, pairs, force_delete, fail_fast): + def sync(ctx, pairs, force_delete, max_workers): ''' Synchronize the given pairs. If no pairs are given, all will be synchronized. @@ -316,58 +296,51 @@ def _create_app(): from the pair "bob". ''' general, all_pairs, all_storages = ctx.obj['config'] - - actions = [] - handled_collections = set() force_delete = set(force_delete) - for pair_name, _collection in parse_pairs_args(pairs, all_pairs): - for collection in expand_collection(pair_name, _collection, - all_pairs, all_storages): - if (pair_name, collection) in handled_collections: - continue - handled_collections.add((pair_name, collection)) - a_name, b_name, pair_options, storage_defaults = \ - all_pairs[pair_name] + queue = Queue() + workers = [] + cli_logger.debug('Using {} maximal workers.'.format(max_workers)) + exceptions = [] + handled_collections = set() - config_a = dict(storage_defaults) - config_a['collection'] = collection - config_a.update(all_storages[a_name]) + def process_job(): + func = queue.get() + try: + func(queue=queue, spawn_worker=spawn_worker, + handled_collections=handled_collections) + except JobFailed as e: + exceptions.append(e) + except CliError as e: + cli_logger.critical(str(e)) + exceptions.append(e) + finally: + queue.task_done() - config_b = dict(storage_defaults) - config_b['collection'] = collection - config_b.update(all_storages[b_name]) + def spawn_worker(): + if max_workers and len(workers) >= max_workers: + return - actions.append({ - 'config_a': config_a, - 'config_b': config_b, - 'pair_name': pair_name, - 'collection': collection, - 'pair_options': pair_options, - 'general': general, - 'force_delete': force_delete - }) + def worker(): + while True: + process_job() - processes = general.get('processes', 0) or len(actions) - cli_logger.debug('Using {} processes.'.format(processes)) + t = threading.Thread(target=worker) + t.daemon = True + t.start() + workers.append(t) - if processes == 1: - cli_logger.debug('Not using threads.') - rv = (_sync_collection(x) for x in actions) - else: - cli_logger.debug('Using threads.') - from multiprocessing.dummy import Pool - p = Pool(processes=general.get('processes', 0) or len(actions)) + for pair_name, collection in parse_pairs_args(pairs, all_pairs): + spawn_worker() + queue.put( + functools.partial(prepare_sync, pair_name=pair_name, + collection=collection, general=general, + all_pairs=all_pairs, + all_storages=all_storages, + force_delete=force_delete)) - rv = p.imap_unordered(_sync_collection, actions) - - if not fail_fast: - # exhaust iterator before calling all(...), which would return - # after it encounters a False item. - # In other words, all(rv) fails fast. - rv = list(rv) - - if not all(rv): + queue.join() + if exceptions: sys.exit(1) return app @@ -376,25 +349,91 @@ app = main = _create_app() del _create_app -def _sync_collection(x): - return sync_collection(**x) +def prepare_sync(queue, spawn_worker, handled_collections, pair_name, + collection, general, all_pairs, all_storages, force_delete): + + key = ('prepare', pair_name, collection) + if key in handled_collections: + status_name = get_status_name(pair_name, collection) + cli_logger.warning('Already prepared {}, skipping'.format(status_name)) + return + handled_collections.add(key) + + a_name, b_name, pair_options, storage_defaults = all_pairs[pair_name] + if collection in ('from a', 'from b'): + # from_name: name of the storage which should be used for discovery + # other_name: the other storage's name + if collection == 'from a': + from_name, other_name = a_name, b_name + else: + from_name, other_name = b_name, a_name + + cli_logger.info('Syncing {}: Discovering collections from {}' + .format(pair_name, from_name)) + + config = dict(storage_defaults) + config.update(all_storages[from_name]) + cls, config = storage_class_from_config(config) + try: + storages = list(cls.discover(**config)) + except Exception: + handle_storage_init_error(cls, config) + + jobs = [] + for storage in storages: + config = dict(storage_defaults) + config.update(all_storages[other_name]) + config['collection'] = actual_collection = storage.collection + other_storage = storage_instance_from_config(config) + + if collection == 'from a': + a, b = storage, other_storage + else: + b, a = storage, other_storage + + jobs.append((actual_collection, a, b)) + else: + config = dict(storage_defaults) + config.update(all_storages[a_name]) + config['collection'] = collection + a = storage_instance_from_config(config) + + config = dict(storage_defaults) + config.update(all_storages[b_name]) + config['collection'] = collection + b = storage_instance_from_config(config) + + jobs = [ + (collection, a, b) + ] + + for i in range(len(jobs) - 1): + # spawn one worker less because we can reuse the current one + spawn_worker() + + for collection, a, b in jobs: + queue.put(functools.partial(sync_collection, pair_name=pair_name, + collection=collection, a=a, b=b, + pair_options=pair_options, general=general, + force_delete=force_delete)) -def sync_collection(config_a, config_b, pair_name, collection, pair_options, - general, force_delete): +def sync_collection(queue, spawn_worker, handled_collections, pair_name, + collection, a, b, pair_options, general, force_delete): status_name = get_status_name(pair_name, collection) - collection_description = pair_name if collection is None \ - else '{} from {}'.format(collection, pair_name) + + key = ('sync', pair_name, collection) + if key in handled_collections: + cli_logger.warning('Already syncing {}, skipping'.format(status_name)) + return + handled_collections.add(key) rv = True try: - cli_logger.info('Syncing {}'.format(collection_description)) - - a = storage_instance_from_config(config_a) - b = storage_instance_from_config(config_b) + cli_logger.info('Syncing {}'.format(status_name)) status = load_status(general['status_path'], status_name) - cli_logger.debug('Loaded status for {}'.format(collection_description)) + cli_logger.debug('Loaded status for {}'.format(status_name)) sync( a, b, status, conflict_resolution=pair_options.get('conflict_resolution', None), @@ -403,11 +442,10 @@ def sync_collection(config_a, config_b, pair_name, collection, pair_options, except StorageEmpty as e: rv = False cli_logger.error( - '{collection}: Storage "{side}" ({storage}) was completely ' + '{status_name}: Storage "{side}" ({storage}) was completely ' 'emptied. Use "--force-delete {status_name}" to synchronize that ' 'emptyness to the other side, or delete the status by yourself to ' 'restore the items from the non-empty side.'.format( - collection=collection_description, side='a' if e.empty_storage is a else 'b', storage=e.empty_storage, status_name=status_name @@ -416,22 +454,23 @@ def sync_collection(config_a, config_b, pair_name, collection, pair_options, except SyncConflict as e: rv = False cli_logger.error( - '{collection}: One item changed on both sides. Resolve this ' + '{status_name}: One item changed on both sides. Resolve this ' 'conflict manually, or by setting the `conflict_resolution` ' 'parameter in your config file.\n' 'See also {docs}/api.html#pair-section\n' 'Item ID: {e.ident}\n' 'Item href on side A: {e.href_a}\n' 'Item href on side B: {e.href_b}\n' - .format(collection=collection_description, e=e, docs=DOCS_HOME) + .format(status_name=status_name, e=e, docs=DOCS_HOME) ) except (click.Abort, KeyboardInterrupt): rv = False except Exception as e: rv = False cli_logger.exception('Unhandled exception occured while syncing {}.' - .format(collection_description)) + .format(status_name)) if rv: save_status(general['status_path'], status_name, status) - return rv + else: + raise JobFailed()