Stop using multiprocessing.dummy.Pool

- Custom job queue with workers based on threads.
- Collection discovery is now done in a separate thread. The gained
  flexibility means the sync actions themselves could also be moved into
  separate threads later.
- The processes parameter has been removed, the related new option is
  only available on the CLI.
This commit is contained in:
Markus Unterwaditzer 2014-10-15 17:24:58 +02:00
parent ac942bff67
commit 29d80b7be0
3 changed files with 141 additions and 209 deletions

View file

@ -20,17 +20,6 @@ General Section
next sync. The data is needed to determine whether a new item means it has
been added on one side or deleted on the other.
- ``processes``: Optional, defines the maximal amount of threads to use for
syncing. By default there is no limit, which means vdirsyncer will try to
open a connection for each collection to be synced. The value ``0`` is
ignored. Setting this to ``1`` will only synchronize one collection at a
time.
While this often greatly increases performance, you might have valid reasons
to set this to a smaller number. For example, your DAV server running on a
Raspberry Pi is so slow that multiple connections don't help much, since the
CPU and not the network is the bottleneck.
.. note::
Due to restrictions in Python's threading module, setting ``processes``

View file

@ -70,48 +70,6 @@ def test_storage_instance_from_config(monkeypatch):
assert cli.storage_instance_from_config(config) == 'OK'
def test_expand_collection(monkeypatch):
    """The placeholders "from a"/"from b" expand to the collections
    discovered on that side; any other collection passes through unchanged.
    """
    # PEP 8 (E731): use a def instead of assigning a lambda to a name.
    def expand(*args):
        return list(cli.expand_collection(*args))

    # Non-placeholder collections are yielded back as-is; the pair/storage
    # mappings are never touched in that case.
    assert expand(None, 'foo', None, None) == ['foo']
    assert expand(None, 'from lol', None, None) == ['from lol']

    all_pairs = {'mypair': ('my_a', 'my_b', None, {'lol': True})}
    all_storages = {'my_a': {'type': 'mytype_a', 'is_a': True},
                    'my_b': {'type': 'mytype_b', 'is_b': True}}

    class TypeA(object):
        @classmethod
        def discover(cls, **config):
            # Storage defaults must be merged into the side's config.
            assert config == {
                'is_a': True,
                'lol': True
            }
            for i in range(1, 4):
                s = cls()
                s.collection = 'a{}'.format(i)
                yield s

    class TypeB(object):
        @classmethod
        def discover(cls, **config):
            assert config == {
                'is_b': True,
                'lol': True
            }
            for i in range(1, 4):
                s = cls()
                s.collection = 'b{}'.format(i)
                yield s

    import vdirsyncer.storage
    monkeypatch.setitem(vdirsyncer.storage.storage_names, 'mytype_a', TypeA)
    monkeypatch.setitem(vdirsyncer.storage.storage_names, 'mytype_b', TypeB)

    assert expand('mypair', 'mycoll', all_pairs, all_storages) == ['mycoll']
    assert expand('mypair', 'from a', all_pairs, all_storages) == \
        ['a1', 'a2', 'a3']
    assert expand('mypair', 'from b', all_pairs, all_storages) == \
        ['b1', 'b2', 'b3']
def test_parse_pairs_args():
pairs = {
'foo': ('bar', 'baz', {'conflict_resolution': 'a wins'},
@ -162,6 +120,11 @@ def test_simple_run(tmpdir):
'Copying (uploading) item haha to my_b\n')
assert tmpdir.join('path_b/haha.txt').read() == 'UID:haha'
result = runner.invoke(cli.app, ['sync', 'my_pair', 'my_pair'])
assert set(result.output.splitlines()) == set([
'Syncing my_pair',
'warning: Already prepared my_pair, skipping'
])
def test_missing_general_section(tmpdir):
config_file = tmpdir.join('config')
@ -228,62 +191,3 @@ def test_verbosity(tmpdir):
)
assert result.exception
assert 'invalid verbosity value' in result.output.lower()
def test_fail_fast(tmpdir):
    # Two pairs: "a" is set up to fail (storage a2 points at a directory
    # that is never created, with create = False), "b" is healthy.  A plain
    # `sync` should still attempt pair "b" after "a" fails, while
    # `sync --fail-fast` should stop before reaching "b".
    runner = CliRunner()
    config_file = tmpdir.join('config')
    # processes = 1 syncs one collection at a time, making the output order
    # of the two pairs deterministic.
    config_file.write(dedent('''
[general]
status_path = {status}
processes = 1
[storage a1]
type = filesystem
fileext = .txt
path = {a1}
[storage a2]
type = filesystem
fileext = .txt
path = {a2}
create = False
[storage b1]
type = filesystem
fileext = .txt
path = {b1}
[storage b2]
type = filesystem
fileext = .txt
path = {b2}
[pair a]
a = a1
b = a2
[pair b]
a = b1
b = b2
''').format(
        status=str(tmpdir.mkdir('status')),
        a1=str(tmpdir.mkdir('a1')),
        # a2 is deliberately NOT created: join() only constructs the path.
        a2=str(tmpdir.join('a2')),
        b1=str(tmpdir.mkdir('b1')),
        b2=str(tmpdir.mkdir('b2'))
    ))

    # Without --fail-fast, both pairs are attempted despite the failure.
    result = runner.invoke(cli.app, ['sync', 'a', 'b'],
                           env={'VDIRSYNCER_CONFIG': str(config_file)})
    lines = result.output.splitlines()
    assert 'Syncing a' in lines
    assert 'Syncing b' in lines
    assert result.exception

    # With --fail-fast, the run aborts after the first failing pair.
    result = runner.invoke(cli.app, ['sync', '--fail-fast', 'a', 'b'],
                           env={'VDIRSYNCER_CONFIG': str(config_file)})
    lines = result.output.splitlines()
    assert 'Syncing a' in lines
    assert 'Syncing b' not in lines
    assert result.exception

View file

@ -11,6 +11,7 @@ import functools
import json
import os
import sys
import threading
from . import __version__, log
from .doubleclick import click
@ -24,13 +25,18 @@ try:
except ImportError:
from configparser import RawConfigParser
try:
from Queue import Queue
except ImportError:
from queue import Queue
cli_logger = log.get(__name__)
PROJECT_HOME = 'https://github.com/untitaker/vdirsyncer'
DOCS_HOME = 'https://vdirsyncer.readthedocs.org/en/latest'
GENERAL_ALL = set(['processes', 'status_path', 'passwordeval'])
GENERAL_ALL = set(['status_path', 'passwordeval'])
GENERAL_REQUIRED = set(['status_path'])
@ -38,6 +44,10 @@ class CliError(RuntimeError):
pass
class JobFailed(RuntimeError):
pass
def get_status_name(pair, collection):
if collection is None:
return pair
@ -184,38 +194,6 @@ def handle_storage_init_error(cls, config):
raise CliError('Failed to initialize {}.'.format(config['instance_name']))
def expand_collection(pair, collection, all_pairs, all_storages):
    '''
    Resolve the placeholder collections ``"from a"`` and ``"from b"``.

    :param pair: Name of the pair the collection belongs to.
    :param collection: The collection name, possibly a placeholder.
    :param all_pairs: dictionary: pair_name => (name of storage a,
                                                name of storage b,
                                                pair config,
                                                storage defaults)
    :param all_storages: dictionary: storage name => storage config.
    :returns: Yields the concrete collection names.  Anything other than
        the two placeholders is yielded back unmodified.
    '''
    if collection not in ('from a', 'from b'):
        yield collection
        return

    a_name, b_name, _, storage_defaults = all_pairs[pair]
    source_name = a_name if collection == 'from a' else b_name

    config = dict(storage_defaults)
    config.update(all_storages[source_name])
    cls, config = storage_class_from_config(config)

    try:
        for storage in cls.discover(**config):
            yield storage.collection
    except Exception:
        handle_storage_init_error(cls, config)
def parse_pairs_args(pairs_args, all_pairs):
'''
Expand the various CLI shortforms ("pair, pair/collection") to an iterable
@ -300,11 +278,13 @@ def _create_app():
@click.option('--force-delete', multiple=True,
help=('Disable data-loss protection for the given pairs. '
'Can be passed multiple times'))
@click.option('--fail-fast', is_flag=True,
help='Exit immediately on first error.')
@click.option('--max-workers',
default=0, type=click.IntRange(min=0, max=None),
help=('Use at most this many connections, 0 means '
'unlimited.'))
@click.pass_context
@catch_errors
def sync(ctx, pairs, force_delete, fail_fast):
def sync(ctx, pairs, force_delete, max_workers):
'''
Synchronize the given pairs. If no pairs are given, all will be
synchronized.
@ -316,58 +296,51 @@ def _create_app():
from the pair "bob".
'''
general, all_pairs, all_storages = ctx.obj['config']
actions = []
handled_collections = set()
force_delete = set(force_delete)
for pair_name, _collection in parse_pairs_args(pairs, all_pairs):
for collection in expand_collection(pair_name, _collection,
all_pairs, all_storages):
if (pair_name, collection) in handled_collections:
continue
handled_collections.add((pair_name, collection))
a_name, b_name, pair_options, storage_defaults = \
all_pairs[pair_name]
queue = Queue()
workers = []
cli_logger.debug('Using {} maximal workers.'.format(max_workers))
exceptions = []
handled_collections = set()
config_a = dict(storage_defaults)
config_a['collection'] = collection
config_a.update(all_storages[a_name])
def process_job():
func = queue.get()
try:
func(queue=queue, spawn_worker=spawn_worker,
handled_collections=handled_collections)
except JobFailed as e:
exceptions.append(e)
except CliError as e:
cli_logger.critical(str(e))
exceptions.append(e)
finally:
queue.task_done()
config_b = dict(storage_defaults)
config_b['collection'] = collection
config_b.update(all_storages[b_name])
def spawn_worker():
if max_workers and len(workers) >= max_workers:
return
actions.append({
'config_a': config_a,
'config_b': config_b,
'pair_name': pair_name,
'collection': collection,
'pair_options': pair_options,
'general': general,
'force_delete': force_delete
})
def worker():
while True:
process_job()
processes = general.get('processes', 0) or len(actions)
cli_logger.debug('Using {} processes.'.format(processes))
t = threading.Thread(target=worker)
t.daemon = True
t.start()
workers.append(t)
if processes == 1:
cli_logger.debug('Not using threads.')
rv = (_sync_collection(x) for x in actions)
else:
cli_logger.debug('Using threads.')
from multiprocessing.dummy import Pool
p = Pool(processes=general.get('processes', 0) or len(actions))
for pair_name, collection in parse_pairs_args(pairs, all_pairs):
spawn_worker()
queue.put(
functools.partial(prepare_sync, pair_name=pair_name,
collection=collection, general=general,
all_pairs=all_pairs,
all_storages=all_storages,
force_delete=force_delete))
rv = p.imap_unordered(_sync_collection, actions)
if not fail_fast:
# exhaust iterator before calling all(...), which would return
# after it encounters a False item.
# In other words, all(rv) fails fast.
rv = list(rv)
if not all(rv):
queue.join()
if exceptions:
sys.exit(1)
return app
@ -376,25 +349,91 @@ app = main = _create_app()
del _create_app
def _sync_collection(x):
    # Adapter that unpacks an action dict into sync_collection's keyword
    # arguments; needed because Pool.imap_unordered passes a single argument.
    return sync_collection(**x)
def prepare_sync(queue, spawn_worker, handled_collections, pair_name,
                 collection, general, all_pairs, all_storages, force_delete):
    '''
    Worker job: instantiate the two storages for one pair/collection and
    enqueue one ``sync_collection`` job per concrete collection.

    If ``collection`` is the placeholder ``'from a'`` or ``'from b'``, the
    named side is used to discover the actual collections first.

    :param queue: Job queue that the follow-up sync jobs are put on.
    :param spawn_worker: Zero-argument callable that starts one additional
        worker thread (subject to the worker limit; see the ``sync``
        command).
    :param handled_collections: Shared set of already-seen job keys, used
        to skip duplicate pair/collection arguments.
    :param pair_name: Name of the pair to prepare.
    :param collection: Concrete collection name, ``None``, or one of the
        two placeholders.
    :param general: The ``[general]`` config section.
    :param all_pairs: pair name => (a_name, b_name, pair options,
        storage defaults).
    :param all_storages: storage name => storage config dict.
    :param force_delete: Passed through unchanged to ``sync_collection``.
    '''
    # Deduplicate: the same pair/collection may be requested more than once
    # on the command line.
    key = ('prepare', pair_name, collection)
    if key in handled_collections:
        status_name = get_status_name(pair_name, collection)
        cli_logger.warning('Already prepared {}, skipping'.format(status_name))
        return
    handled_collections.add(key)

    a_name, b_name, pair_options, storage_defaults = all_pairs[pair_name]

    if collection in ('from a', 'from b'):
        # from_name: name of the storage which should be used for discovery
        # other_name: the other storage's name
        if collection == 'from a':
            from_name, other_name = a_name, b_name
        else:
            from_name, other_name = b_name, a_name

        cli_logger.info('Syncing {}: Discovering collections from {}'
                        .format(pair_name, from_name))

        config = dict(storage_defaults)
        config.update(all_storages[from_name])
        cls, config = storage_class_from_config(config)

        try:
            storages = list(cls.discover(**config))
        except Exception:
            # handle_storage_init_error raises CliError, so `storages` is
            # always bound when we get past this point.
            handle_storage_init_error(cls, config)

        jobs = []
        for storage in storages:
            # Build the matching storage instance on the *other* side for
            # each discovered collection.
            config = dict(storage_defaults)
            config.update(all_storages[other_name])
            config['collection'] = actual_collection = storage.collection
            other_storage = storage_instance_from_config(config)
            if collection == 'from a':
                a, b = storage, other_storage
            else:
                b, a = storage, other_storage
            jobs.append((actual_collection, a, b))
    else:
        # Concrete collection (or None): instantiate both sides directly.
        config = dict(storage_defaults)
        config.update(all_storages[a_name])
        config['collection'] = collection
        a = storage_instance_from_config(config)

        config = dict(storage_defaults)
        config.update(all_storages[b_name])
        config['collection'] = collection
        b = storage_instance_from_config(config)

        jobs = [
            (collection, a, b)
        ]

    for i in range(len(jobs) - 1):
        # spawn one worker less because we can reuse the current one
        spawn_worker()

    for collection, a, b in jobs:
        queue.put(functools.partial(sync_collection, pair_name=pair_name,
                                    collection=collection, a=a, b=b,
                                    pair_options=pair_options, general=general,
                                    force_delete=force_delete))
def sync_collection(config_a, config_b, pair_name, collection, pair_options,
general, force_delete):
def sync_collection(queue, spawn_worker, handled_collections, pair_name,
collection, a, b, pair_options, general, force_delete):
status_name = get_status_name(pair_name, collection)
collection_description = pair_name if collection is None \
else '{} from {}'.format(collection, pair_name)
key = ('sync', pair_name, collection)
if key in handled_collections:
cli_logger.warning('Already syncing {}, skipping'.format(status_name))
return
handled_collections.add(key)
rv = True
try:
cli_logger.info('Syncing {}'.format(collection_description))
a = storage_instance_from_config(config_a)
b = storage_instance_from_config(config_b)
cli_logger.info('Syncing {}'.format(status_name))
status = load_status(general['status_path'], status_name)
cli_logger.debug('Loaded status for {}'.format(collection_description))
cli_logger.debug('Loaded status for {}'.format(status_name))
sync(
a, b, status,
conflict_resolution=pair_options.get('conflict_resolution', None),
@ -403,11 +442,10 @@ def sync_collection(config_a, config_b, pair_name, collection, pair_options,
except StorageEmpty as e:
rv = False
cli_logger.error(
'{collection}: Storage "{side}" ({storage}) was completely '
'{status_name}: Storage "{side}" ({storage}) was completely '
'emptied. Use "--force-delete {status_name}" to synchronize that '
'emptyness to the other side, or delete the status by yourself to '
'restore the items from the non-empty side.'.format(
collection=collection_description,
side='a' if e.empty_storage is a else 'b',
storage=e.empty_storage,
status_name=status_name
@ -416,22 +454,23 @@ def sync_collection(config_a, config_b, pair_name, collection, pair_options,
except SyncConflict as e:
rv = False
cli_logger.error(
'{collection}: One item changed on both sides. Resolve this '
'{status_name}: One item changed on both sides. Resolve this '
'conflict manually, or by setting the `conflict_resolution` '
'parameter in your config file.\n'
'See also {docs}/api.html#pair-section\n'
'Item ID: {e.ident}\n'
'Item href on side A: {e.href_a}\n'
'Item href on side B: {e.href_b}\n'
.format(collection=collection_description, e=e, docs=DOCS_HOME)
.format(status_name=status_name, e=e, docs=DOCS_HOME)
)
except (click.Abort, KeyboardInterrupt):
rv = False
except Exception as e:
rv = False
cli_logger.exception('Unhandled exception occured while syncing {}.'
.format(collection_description))
.format(status_name))
if rv:
save_status(general['status_path'], status_name, status)
return rv
else:
raise JobFailed()