Don't leak worker threads

This commit is contained in:
Markus Unterwaditzer 2015-07-09 09:46:08 +02:00
parent b58935c90d
commit fcec7e1efd
2 changed files with 11 additions and 4 deletions

View file

@ -124,13 +124,13 @@ def sync(pairs, force_delete, max_workers):
wq = WorkerQueue(max_workers)
for pair_name, collections in parse_pairs_args(pairs, all_pairs):
wq.spawn_worker()
wq.put(functools.partial(prepare_pair, pair_name=pair_name,
collections=collections,
general=general, all_pairs=all_pairs,
all_storages=all_storages,
force_delete=force_delete,
callback=sync_collection))
wq.spawn_worker()
wq.join()
@ -152,12 +152,12 @@ def metasync(pairs, max_workers):
wq = WorkerQueue(max_workers)
for pair_name, collections in parse_pairs_args(pairs, all_pairs):
wq.spawn_worker()
wq.put(functools.partial(prepare_pair, pair_name=pair_name,
collections=collections,
general=general, all_pairs=all_pairs,
all_storages=all_storages,
callback=metasync_collection))
wq.spawn_worker()
wq.join()
@ -183,7 +183,6 @@ def discover(pairs, max_workers):
'These are the pairs found: {}'
.format(pair, list(all_pairs)))
wq.spawn_worker()
wq.put(functools.partial(
discover_collections,
status_path=general['status_path'], name_a=name_a, name_b=name_b,
@ -191,6 +190,7 @@ def discover(pairs, max_workers):
config_b=all_storages[name_b], pair_options=pair_options,
skip_cache=True
))
wq.spawn_worker()
wq.join()

View file

@ -546,6 +546,12 @@ def parse_pairs_args(pairs_args, all_pairs):
class WorkerQueue(object):
'''
A simple worker-queue setup.
Note that workers quit if queue is empty. That means you have to first put
things into the queue before spawning the worker!
'''
def __init__(self, max_workers):
self._queue = queue.Queue()
self._workers = []
@ -562,7 +568,7 @@ class WorkerQueue(object):
while True:
try:
func = self._queue.get()
func = self._queue.get(False)
except (_TypeError, _Empty):
# Any kind of error might be raised if vdirsyncer just finished
# processing all items and the interpreter is shutting down,
@ -589,6 +595,7 @@ class WorkerQueue(object):
self._workers.append(t)
def join(self):
assert self._workers or self._queue.empty()
self._queue.join()
if self._exceptions:
sys.exit(1)