Factor out worker queue code from sync function

This commit is contained in:
Markus Unterwaditzer 2014-12-15 20:14:22 +01:00
parent ddd8ab675e
commit 6f959b3bd3

View file

@ -313,48 +313,20 @@ def _create_app():
general, all_pairs, all_storages = ctx.obj['config']
force_delete = set(force_delete)
queue = Queue()
workers = []
cli_logger.debug('Using {} maximal workers.'.format(max_workers))
exceptions = []
handled_collections = set()
def process_job():
func = queue.get()
try:
func(queue=queue, spawn_worker=spawn_worker,
handled_collections=handled_collections)
except:
if not handle_cli_error():
exceptions.append(sys.exc_info()[1])
finally:
queue.task_done()
def spawn_worker():
if max_workers and len(workers) >= max_workers:
return
def worker():
while True:
process_job()
t = threading.Thread(target=worker)
t.daemon = True
t.start()
workers.append(t)
wq = WorkerQueue(max_workers)
wq.handled_jobs = set()
for pair_name, collection in parse_pairs_args(pairs, all_pairs):
spawn_worker()
queue.put(
wq.spawn_worker()
wq.put(
functools.partial(prepare_sync, pair_name=pair_name,
collection=collection, general=general,
all_pairs=all_pairs,
all_storages=all_storages,
force_delete=force_delete))
queue.join()
if exceptions:
sys.exit(1)
wq.join()
return app
@ -409,14 +381,14 @@ def expand_collection(pair_name, collection, general, all_pairs, all_storages):
yield collection, a, b
def prepare_sync(queue, spawn_worker, handled_collections, pair_name,
collection, general, all_pairs, all_storages, force_delete):
def prepare_sync(wq, pair_name, collection, general, all_pairs, all_storages,
force_delete):
key = ('prepare', pair_name, collection)
if key in handled_collections:
if key in wq.handled_jobs:
status_name = get_status_name(pair_name, collection)
cli_logger.warning('Already prepared {}, skipping'.format(status_name))
return
handled_collections.add(key)
wq.handled_jobs.add(key)
a_name, b_name, pair_options, storage_defaults = all_pairs[pair_name]
jobs = list(expand_collection(pair_name, collection, general, all_pairs,
@ -424,13 +396,13 @@ def prepare_sync(queue, spawn_worker, handled_collections, pair_name,
for i in range(len(jobs) - 1):
# spawn one worker less because we can reuse the current one
spawn_worker()
wq.spawn_worker()
for collection, a, b in jobs:
queue.put(functools.partial(sync_collection, pair_name=pair_name,
collection=collection, a=a, b=b,
pair_options=pair_options, general=general,
force_delete=force_delete))
wq.put(functools.partial(sync_collection, pair_name=pair_name,
collection=collection, a=a, b=b,
pair_options=pair_options, general=general,
force_delete=force_delete))
def handle_cli_error(status_name='sync'):
@ -468,15 +440,15 @@ def handle_cli_error(status_name='sync'):
return True
def sync_collection(queue, spawn_worker, handled_collections, pair_name,
collection, a, b, pair_options, general, force_delete):
def sync_collection(wq, pair_name, collection, a, b, pair_options, general,
force_delete):
status_name = get_status_name(pair_name, collection)
key = ('sync', pair_name, collection)
if key in handled_collections:
if key in wq.handled_jobs:
cli_logger.warning('Already syncing {}, skipping'.format(status_name))
return
handled_collections.add(key)
wq.handled_jobs.add(key)
try:
cli_logger.info('Syncing {}'.format(status_name))
@ -493,3 +465,42 @@ def sync_collection(queue, spawn_worker, handled_collections, pair_name,
raise JobFailed()
save_status(general['status_path'], status_name, status)
class WorkerQueue(object):
    '''A job queue served by a lazily grown pool of daemon threads.

    Jobs are callables that are invoked as ``job(wq=<this queue>)``, so a
    running job can enqueue follow-up jobs with :meth:`put` and request more
    threads with :meth:`spawn_worker`.

    :param max_workers: Upper bound on the number of worker threads. A falsy
        value (``0`` or ``None``) disables the limit.
    '''

    def __init__(self, max_workers):
        self._queue = Queue()
        self._workers = []
        self._exceptions = []
        self._max_workers = max_workers
        # spawn_worker() is called from inside running jobs, i.e. from
        # several threads at once; the lock keeps the limit check and the
        # bookkeeping atomic.
        self._spawn_lock = threading.Lock()
        # Keys of jobs that were already started. Jobs consult and update
        # this set themselves to skip duplicate work; initialising it here
        # means callers no longer have to attach the attribute by hand.
        self.handled_jobs = set()

    def _process_job(self):
        '''Take one job from the queue (blocking) and run it.

        If the job raises and ``handle_cli_error()`` returns a falsy value,
        the exception is recorded so that :meth:`join` can report failure.
        The job is always marked as done, even when it fails.
        '''
        func = self._queue.get()
        try:
            func(wq=self)
        except BaseException as exc:
            # Deliberately as broad as a bare ``except``: an exception that
            # escapes here would kill the worker thread, and with no worker
            # left join() would block forever.
            if not handle_cli_error():
                self._exceptions.append(exc)
        finally:
            self._queue.task_done()

    def spawn_worker(self):
        '''Start one more worker thread unless the limit is reached.

        Workers are daemon threads that process jobs in an endless loop.
        '''
        def worker():
            while True:
                self._process_job()

        with self._spawn_lock:
            if self._max_workers and len(self._workers) >= self._max_workers:
                return

            t = threading.Thread(target=worker)
            t.daemon = True
            t.start()
            self._workers.append(t)

    def join(self):
        '''Block until every queued job has been processed.

        Exits the process with status 1 (``sys.exit(1)``) if any job raised
        an exception that was recorded by :meth:`_process_job`.
        '''
        self._queue.join()
        if self._exceptions:
            sys.exit(1)

    def put(self, f):
        '''Enqueue the job ``f``; it will be called as ``f(wq=self)``.'''
        return self._queue.put(f)