Merge pull request #126 from untitaker/issue124

Stop using multiprocessing.dummy.Pool
This commit is contained in:
Markus Unterwaditzer 2014-10-18 17:22:04 +02:00
commit 6ac71e0e7c
3 changed files with 208 additions and 229 deletions

View file

@ -20,17 +20,6 @@ General Section
next sync. The data is needed to determine whether a new item means it has
been added on one side or deleted on the other.
- ``processes``: Optional, defines the maximum number of threads to use for
syncing. By default there is no limit, which means vdirsyncer will try to
open a connection for each collection to be synced. The value ``0`` is
ignored. Setting this to ``1`` will only synchronize one collection at a
time.
While this often greatly increases performance, you might have valid reasons
to set this to a smaller number. For example, your DAV server running on a
Raspberry Pi is so slow that multiple connections don't help much, since the
CPU and not the network is the bottleneck.
.. note::
Due to restrictions in Python's threading module, setting ``processes``

View file

@ -70,48 +70,6 @@ def test_storage_instance_from_config(monkeypatch):
assert cli.storage_instance_from_config(config) == 'OK'
def test_expand_collection(monkeypatch):
    # Purpose: verify that expand_collection passes plain collection names
    # through unchanged, and expands the "from a" / "from b" placeholders by
    # running discovery on the corresponding storage class.
    x = lambda *a: list(cli.expand_collection(*a))
    # A non-placeholder collection is returned as-is, even with no config.
    assert x(None, 'foo', None, None) == ['foo']
    # Only the exact strings "from a"/"from b" are placeholders.
    assert x(None, 'from lol', None, None) == ['from lol']
    all_pairs = {'mypair': ('my_a', 'my_b', None, {'lol': True})}
    all_storages = {'my_a': {'type': 'mytype_a', 'is_a': True},
                    'my_b': {'type': 'mytype_b', 'is_b': True}}

    class TypeA(object):
        @classmethod
        def discover(cls, **config):
            # Discovery must receive the merged storage defaults + storage
            # config (minus the 'type' key, which selects the class).
            assert config == {
                'is_a': True,
                'lol': True
            }
            for i in range(1, 4):
                s = cls()
                s.collection = 'a{}'.format(i)
                yield s

    class TypeB(object):
        @classmethod
        def discover(cls, **config):
            assert config == {
                'is_b': True,
                'lol': True
            }
            for i in range(1, 4):
                s = cls()
                s.collection = 'b{}'.format(i)
                yield s

    import vdirsyncer.storage
    # Register the fake storage types so storage_class_from_config finds them.
    monkeypatch.setitem(vdirsyncer.storage.storage_names, 'mytype_a', TypeA)
    monkeypatch.setitem(vdirsyncer.storage.storage_names, 'mytype_b', TypeB)
    assert x('mypair', 'mycoll', all_pairs, all_storages) == ['mycoll']
    assert x('mypair', 'from a', all_pairs, all_storages) == ['a1', 'a2', 'a3']
    assert x('mypair', 'from b', all_pairs, all_storages) == ['b1', 'b2', 'b3']
def test_parse_pairs_args():
pairs = {
'foo': ('bar', 'baz', {'conflict_resolution': 'a wins'},
@ -162,6 +120,54 @@ def test_simple_run(tmpdir):
'Copying (uploading) item haha to my_b\n')
assert tmpdir.join('path_b/haha.txt').read() == 'UID:haha'
result = runner.invoke(cli.app, ['sync', 'my_pair', 'my_pair'])
assert set(result.output.splitlines()) == set([
'Syncing my_pair',
'warning: Already prepared my_pair, skipping'
])
assert not result.exception
def test_empty_storage(tmpdir):
    # Purpose: after a successful sync, emptying one side completely must
    # abort with a data-loss protection error that names the emptied storage
    # and tells the user about --force-delete.
    config_file = tmpdir.join('config')
    config_file.write(dedent('''
    [general]
    status_path = {0}/status/
    [pair my_pair]
    a = my_a
    b = my_b
    [storage my_a]
    type = filesystem
    path = {0}/path_a/
    fileext = .txt
    [storage my_b]
    type = filesystem
    path = {0}/path_b/
    fileext = .txt
    ''').format(str(tmpdir)))
    runner = CliRunner(env={'VDIRSYNCER_CONFIG': str(config_file)})
    result = runner.invoke(cli.app, ['sync'])
    assert not result.exception
    assert result.output.lower().strip() == 'syncing my_pair'
    # Create an item on side a and sync it over to side b.
    tmpdir.join('path_a/haha.txt').write('UID:haha')
    result = runner.invoke(cli.app, ['sync'])
    # Now empty side b entirely; the next sync must refuse to propagate
    # the deletion without --force-delete.
    tmpdir.join('path_b/haha.txt').remove()
    result = runner.invoke(cli.app, ['sync'])
    assert result.output.splitlines() == [
        'Syncing my_pair',
        'error: {status_name}: Storage "{name}" was completely emptied. Use '
        '"--force-delete {status_name}" to synchronize that emptyness to '
        'the other side, or delete the status by yourself to restore the '
        'items from the non-empty side.'.format(status_name='my_pair',
                                                name='my_b')
    ]
    assert result.exception
def test_missing_general_section(tmpdir):
config_file = tmpdir.join('config')
@ -228,62 +234,3 @@ def test_verbosity(tmpdir):
)
assert result.exception
assert 'invalid verbosity value' in result.output.lower()
def test_fail_fast(tmpdir):
    # Purpose: with two pairs where pair "a" fails, a plain sync should
    # still attempt pair "b", while --fail-fast must stop after the first
    # failure.
    # NOTE(review): pair "a" presumably fails because storage a2's path is
    # never created on disk and has create = False — confirm against the
    # filesystem storage's init behavior.
    runner = CliRunner()
    config_file = tmpdir.join('config')
    config_file.write(dedent('''
    [general]
    status_path = {status}
    processes = 1
    [storage a1]
    type = filesystem
    fileext = .txt
    path = {a1}
    [storage a2]
    type = filesystem
    fileext = .txt
    path = {a2}
    create = False
    [storage b1]
    type = filesystem
    fileext = .txt
    path = {b1}
    [storage b2]
    type = filesystem
    fileext = .txt
    path = {b2}
    [pair a]
    a = a1
    b = a2
    [pair b]
    a = b1
    b = b2
    ''').format(
        status=str(tmpdir.mkdir('status')),
        a1=str(tmpdir.mkdir('a1')),
        # a2 is only join()ed, not mkdir()ed — the directory does not exist.
        a2=str(tmpdir.join('a2')),
        b1=str(tmpdir.mkdir('b1')),
        b2=str(tmpdir.mkdir('b2'))
    ))
    # Without --fail-fast both pairs are attempted, and the run still fails.
    result = runner.invoke(cli.app, ['sync', 'a', 'b'],
                           env={'VDIRSYNCER_CONFIG': str(config_file)})
    lines = result.output.splitlines()
    assert 'Syncing a' in lines
    assert 'Syncing b' in lines
    assert result.exception
    # With --fail-fast the run stops before pair "b" is synced.
    result = runner.invoke(cli.app, ['sync', '--fail-fast', 'a', 'b'],
                           env={'VDIRSYNCER_CONFIG': str(config_file)})
    lines = result.output.splitlines()
    assert 'Syncing a' in lines
    assert 'Syncing b' not in lines
    assert result.exception

View file

@ -7,10 +7,12 @@
:license: MIT, see LICENSE for more details.
'''
import contextlib
import functools
import json
import os
import sys
import threading
from . import __version__, log
from .doubleclick import click
@ -24,13 +26,18 @@ try:
except ImportError:
from configparser import RawConfigParser
try:
from Queue import Queue
except ImportError:
from queue import Queue
cli_logger = log.get(__name__)
PROJECT_HOME = 'https://github.com/untitaker/vdirsyncer'
DOCS_HOME = 'https://vdirsyncer.readthedocs.org/en/latest'
GENERAL_ALL = set(['processes', 'status_path', 'passwordeval'])
GENERAL_ALL = set(['status_path', 'passwordeval'])
GENERAL_REQUIRED = set(['status_path'])
@ -38,6 +45,10 @@ class CliError(RuntimeError):
pass
class JobFailed(RuntimeError):
    '''Raised when a single sync job fails; collected by the worker loop.'''
def get_status_name(pair, collection):
if collection is None:
return pair
@ -184,38 +195,6 @@ def handle_storage_init_error(cls, config):
raise CliError('Failed to initialize {}.'.format(config['instance_name']))
def expand_collection(pair, collection, all_pairs, all_storages):
    '''
    Replace the placeholder collections "from a" and "from b" with actual
    ones.

    :param pair: The name of the pair the collection belongs to.
    :param collection: The collection (possibly the "from a"/"from b"
        placeholder).
    :param all_pairs: dictionary: pair_name => (name of storage a,
                                                name of storage b,
                                                pair config,
                                                storage defaults)
    :param all_storages: dictionary: storage name => storage config.
    :returns: One or more collections that replace the given one. The original
        collection is returned unmodified if the given collection is neither
        "from a" nor "from b".
    '''
    if collection in ('from a', 'from b'):
        a_name, b_name, _, storage_defaults = all_pairs[pair]
        # Build the config for the side named by the placeholder and run
        # discovery on it; each discovered storage contributes a collection.
        config = dict(storage_defaults)
        if collection == 'from a':
            config.update(all_storages[a_name])
        else:
            config.update(all_storages[b_name])
        cls, config = storage_class_from_config(config)
        try:
            for s in cls.discover(**config):
                yield s.collection
        except Exception:
            # Any discovery failure is routed through the CLI error helper.
            handle_storage_init_error(cls, config)
    else:
        yield collection
def parse_pairs_args(pairs_args, all_pairs):
'''
Expand the various CLI shortforms ("pair, pair/collection") to an iterable
@ -300,11 +279,13 @@ def _create_app():
@click.option('--force-delete', multiple=True,
help=('Disable data-loss protection for the given pairs. '
'Can be passed multiple times'))
@click.option('--fail-fast', is_flag=True,
help='Exit immediately on first error.')
@click.option('--max-workers',
default=0, type=click.IntRange(min=0, max=None),
help=('Use at most this many connections, 0 means '
'unlimited.'))
@click.pass_context
@catch_errors
def sync(ctx, pairs, force_delete, fail_fast):
def sync(ctx, pairs, force_delete, max_workers):
'''
Synchronize the given pairs. If no pairs are given, all will be
synchronized.
@ -316,58 +297,51 @@ def _create_app():
from the pair "bob".
'''
general, all_pairs, all_storages = ctx.obj['config']
actions = []
handled_collections = set()
force_delete = set(force_delete)
for pair_name, _collection in parse_pairs_args(pairs, all_pairs):
for collection in expand_collection(pair_name, _collection,
all_pairs, all_storages):
if (pair_name, collection) in handled_collections:
continue
handled_collections.add((pair_name, collection))
a_name, b_name, pair_options, storage_defaults = \
all_pairs[pair_name]
queue = Queue()
workers = []
cli_logger.debug('Using {} maximal workers.'.format(max_workers))
exceptions = []
handled_collections = set()
config_a = dict(storage_defaults)
config_a['collection'] = collection
config_a.update(all_storages[a_name])
def process_job():
func = queue.get()
try:
func(queue=queue, spawn_worker=spawn_worker,
handled_collections=handled_collections)
except JobFailed as e:
exceptions.append(e)
except CliError as e:
cli_logger.critical(str(e))
exceptions.append(e)
finally:
queue.task_done()
config_b = dict(storage_defaults)
config_b['collection'] = collection
config_b.update(all_storages[b_name])
def spawn_worker():
if max_workers and len(workers) >= max_workers:
return
actions.append({
'config_a': config_a,
'config_b': config_b,
'pair_name': pair_name,
'collection': collection,
'pair_options': pair_options,
'general': general,
'force_delete': force_delete
})
def worker():
while True:
process_job()
processes = general.get('processes', 0) or len(actions)
cli_logger.debug('Using {} processes.'.format(processes))
t = threading.Thread(target=worker)
t.daemon = True
t.start()
workers.append(t)
if processes == 1:
cli_logger.debug('Not using threads.')
rv = (_sync_collection(x) for x in actions)
else:
cli_logger.debug('Using threads.')
from multiprocessing.dummy import Pool
p = Pool(processes=general.get('processes', 0) or len(actions))
for pair_name, collection in parse_pairs_args(pairs, all_pairs):
spawn_worker()
queue.put(
functools.partial(prepare_sync, pair_name=pair_name,
collection=collection, general=general,
all_pairs=all_pairs,
all_storages=all_storages,
force_delete=force_delete))
rv = p.imap_unordered(_sync_collection, actions)
if not fail_fast:
# exhaust iterator before calling all(...), which would return
# after it encounters a False item.
# In other words, all(rv) fails fast.
rv = list(rv)
if not all(rv):
queue.join()
if exceptions:
sys.exit(1)
return app
@ -376,62 +350,131 @@ app = main = _create_app()
del _create_app
def _sync_collection(x):
    # Unpack a kwargs dict into sync_collection; needed because the pool's
    # map functions pass exactly one positional argument per job.
    return sync_collection(**x)
def expand_collection(pair_name, collection, general, all_pairs, all_storages):
    '''
    Expand one collection entry of a pair into concrete sync jobs.

    Yields ``(collection_name, storage_a, storage_b)`` tuples. If
    ``collection`` is the placeholder "from a" or "from b", collections are
    discovered on that side and a matching storage is instantiated on the
    other side for each; otherwise a single tuple for the given collection
    is yielded.

    NOTE(review): ``general`` is accepted but never used in this function —
    presumably kept for a uniform call signature; confirm against callers.
    '''
    a_name, b_name, pair_options, storage_defaults = all_pairs[pair_name]
    if collection in ('from a', 'from b'):
        # from_name: name of the storage which should be used for discovery
        # other_name: the other storage's name
        if collection == 'from a':
            from_name, other_name = a_name, b_name
        else:
            from_name, other_name = b_name, a_name
        cli_logger.info('Syncing {}: Discovering collections from {}'
                        .format(pair_name, from_name))
        config = dict(storage_defaults)
        config.update(all_storages[from_name])
        cls, config = storage_class_from_config(config)
        try:
            storages = list(cls.discover(**config))
        except Exception:
            handle_storage_init_error(cls, config)
        for storage in storages:
            # Instantiate the opposite side for each discovered collection.
            config = dict(storage_defaults)
            config.update(all_storages[other_name])
            config['collection'] = actual_collection = storage.collection
            other_storage = storage_instance_from_config(config)
            # Preserve a/b orientation regardless of which side discovered.
            if collection == 'from a':
                a, b = storage, other_storage
            else:
                b, a = storage, other_storage
            yield actual_collection, a, b
    else:
        # No placeholder: instantiate both sides for the named collection.
        config = dict(storage_defaults)
        config.update(all_storages[a_name])
        config['collection'] = collection
        a = storage_instance_from_config(config)
        config = dict(storage_defaults)
        config.update(all_storages[b_name])
        config['collection'] = collection
        b = storage_instance_from_config(config)
        yield collection, a, b
def sync_collection(config_a, config_b, pair_name, collection, pair_options,
general, force_delete):
status_name = get_status_name(pair_name, collection)
collection_description = pair_name if collection is None \
else '{} from {}'.format(collection, pair_name)
def prepare_sync(queue, spawn_worker, handled_collections, pair_name,
                 collection, general, all_pairs, all_storages, force_delete):
    # Expand one pair/collection into concrete sync jobs and enqueue them.
    # Runs as a job on a worker thread; ``handled_collections`` deduplicates
    # pairs that were requested more than once on the command line.
    key = ('prepare', pair_name, collection)
    if key in handled_collections:
        status_name = get_status_name(pair_name, collection)
        cli_logger.warning('Already prepared {}, skipping'.format(status_name))
        return
    handled_collections.add(key)
    rv = True  # NOTE(review): never used below — looks like leftover; confirm
    a_name, b_name, pair_options, storage_defaults = all_pairs[pair_name]
    jobs = list(expand_collection(pair_name, collection, general, all_pairs,
                                  all_storages))
    for i in range(len(jobs) - 1):
        # spawn one worker less because we can reuse the current one
        spawn_worker()
    for collection, a, b in jobs:
        queue.put(functools.partial(sync_collection, pair_name=pair_name,
                                    collection=collection, a=a, b=b,
                                    pair_options=pair_options, general=general,
                                    force_delete=force_delete))
@contextlib.contextmanager
def catch_sync_errors(status_name):
try:
cli_logger.info('Syncing {}'.format(collection_description))
a = storage_instance_from_config(config_a)
b = storage_instance_from_config(config_b)
status = load_status(general['status_path'], status_name)
cli_logger.debug('Loaded status for {}'.format(collection_description))
sync(
a, b, status,
conflict_resolution=pair_options.get('conflict_resolution', None),
force_delete=status_name in force_delete
)
yield
except StorageEmpty as e:
rv = False
cli_logger.error(
'{collection}: Storage "{side}" ({storage}) was completely '
'emptied. Use "--force-delete {status_name}" to synchronize that '
'emptyness to the other side, or delete the status by yourself to '
'restore the items from the non-empty side.'.format(
collection=collection_description,
side='a' if e.empty_storage is a else 'b',
storage=e.empty_storage,
'{status_name}: Storage "{name}" was completely emptied. Use '
'"--force-delete {status_name}" to synchronize that emptyness to '
'the other side, or delete the status by yourself to restore the '
'items from the non-empty side.'.format(
name=e.empty_storage.instance_name,
status_name=status_name
)
)
raise JobFailed()
except SyncConflict as e:
rv = False
cli_logger.error(
'{collection}: One item changed on both sides. Resolve this '
'{status_name}: One item changed on both sides. Resolve this '
'conflict manually, or by setting the `conflict_resolution` '
'parameter in your config file.\n'
'See also {docs}/api.html#pair-section\n'
'Item ID: {e.ident}\n'
'Item href on side A: {e.href_a}\n'
'Item href on side B: {e.href_b}\n'
.format(collection=collection_description, e=e, docs=DOCS_HOME)
.format(status_name=status_name, e=e, docs=DOCS_HOME)
)
raise JobFailed()
except (click.Abort, KeyboardInterrupt):
rv = False
raise JobFailed()
except Exception as e:
rv = False
cli_logger.exception('Unhandled exception occured while syncing {}.'
.format(collection_description))
.format(status_name))
raise JobFailed()
if rv:
save_status(general['status_path'], status_name, status)
return rv
def sync_collection(queue, spawn_worker, handled_collections, pair_name,
                    collection, a, b, pair_options, general, force_delete):
    # Synchronize one collection of a pair.  ``queue`` and ``spawn_worker``
    # are unused here; they keep the job signature uniform with prepare_sync.
    status_name = get_status_name(pair_name, collection)
    key = ('sync', pair_name, collection)
    if key in handled_collections:
        # The same collection can be enqueued twice when a pair is listed
        # more than once on the command line.
        cli_logger.warning('Already syncing {}, skipping'.format(status_name))
        return
    handled_collections.add(key)
    # catch_sync_errors prints a helpful message for known failure modes and
    # re-raises them as JobFailed for the worker loop to collect.
    with catch_sync_errors(status_name):
        cli_logger.info('Syncing {}'.format(status_name))
        status = load_status(general['status_path'], status_name)
        cli_logger.debug('Loaded status for {}'.format(status_name))
        sync(
            a, b, status,
            conflict_resolution=pair_options.get('conflict_resolution', None),
            force_delete=status_name in force_delete
        )
        # Only persist the new status if the sync ran to completion.
        save_status(general['status_path'], status_name, status)