From fcec7e1efdd3e43faebfd796fceee9bf7719b6bb Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Thu, 9 Jul 2015 09:46:08 +0200 Subject: [PATCH] Don't leak worker threads --- vdirsyncer/cli/__init__.py | 6 +++--- vdirsyncer/cli/utils.py | 9 ++++++++- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/vdirsyncer/cli/__init__.py b/vdirsyncer/cli/__init__.py index 6f2a854..fd0c6b3 100644 --- a/vdirsyncer/cli/__init__.py +++ b/vdirsyncer/cli/__init__.py @@ -124,13 +124,13 @@ def sync(pairs, force_delete, max_workers): wq = WorkerQueue(max_workers) for pair_name, collections in parse_pairs_args(pairs, all_pairs): - wq.spawn_worker() wq.put(functools.partial(prepare_pair, pair_name=pair_name, collections=collections, general=general, all_pairs=all_pairs, all_storages=all_storages, force_delete=force_delete, callback=sync_collection)) + wq.spawn_worker() wq.join() @@ -152,12 +152,12 @@ def metasync(pairs, max_workers): wq = WorkerQueue(max_workers) for pair_name, collections in parse_pairs_args(pairs, all_pairs): - wq.spawn_worker() wq.put(functools.partial(prepare_pair, pair_name=pair_name, collections=collections, general=general, all_pairs=all_pairs, all_storages=all_storages, callback=metasync_collection)) + wq.spawn_worker() wq.join() @@ -183,7 +183,6 @@ def discover(pairs, max_workers): 'These are the pairs found: {}' .format(pair, list(all_pairs))) - wq.spawn_worker() wq.put(functools.partial( discover_collections, status_path=general['status_path'], name_a=name_a, name_b=name_b, @@ -191,6 +190,7 @@ def discover(pairs, max_workers): config_b=all_storages[name_b], pair_options=pair_options, skip_cache=True )) + wq.spawn_worker() wq.join() diff --git a/vdirsyncer/cli/utils.py b/vdirsyncer/cli/utils.py index 547df4e..84a4b14 100644 --- a/vdirsyncer/cli/utils.py +++ b/vdirsyncer/cli/utils.py @@ -546,6 +546,12 @@ def parse_pairs_args(pairs_args, all_pairs): class WorkerQueue(object): + ''' + A simple worker-queue setup. + + Note that workers quit if queue is empty. That means you have to first put + things into the queue before spawning the worker! + ''' def __init__(self, max_workers): self._queue = queue.Queue() self._workers = [] @@ -562,7 +568,7 @@ class WorkerQueue(object): while True: try: - func = self._queue.get() + func = self._queue.get(False) except (_TypeError, _Empty): # Any kind of error might be raised if vdirsyncer just finished # processing all items and the interpreter is shutting down, @@ -589,6 +595,7 @@ class WorkerQueue(object): self._workers.append(t) def join(self): + assert self._workers or self._queue.empty() self._queue.join() if self._exceptions: sys.exit(1)