From 6f959b3bd3b6047328ed285598b2d151b2ab15e2 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 15 Dec 2014 20:14:22 +0100 Subject: [PATCH] Factor out worker queue code from sync function --- vdirsyncer/cli.py | 103 +++++++++++++++++++++++++--------------------- 1 file changed, 57 insertions(+), 46 deletions(-) diff --git a/vdirsyncer/cli.py b/vdirsyncer/cli.py index 07a6b6f..de3dcfc 100644 --- a/vdirsyncer/cli.py +++ b/vdirsyncer/cli.py @@ -313,48 +313,20 @@ def _create_app(): general, all_pairs, all_storages = ctx.obj['config'] force_delete = set(force_delete) - queue = Queue() - workers = [] cli_logger.debug('Using {} maximal workers.'.format(max_workers)) - exceptions = [] - handled_collections = set() - - def process_job(): - func = queue.get() - try: - func(queue=queue, spawn_worker=spawn_worker, - handled_collections=handled_collections) - except: - if not handle_cli_error(): - exceptions.append(sys.exc_info()[1]) - finally: - queue.task_done() - - def spawn_worker(): - if max_workers and len(workers) >= max_workers: - return - - def worker(): - while True: - process_job() - - t = threading.Thread(target=worker) - t.daemon = True - t.start() - workers.append(t) + wq = WorkerQueue(max_workers) + wq.handled_jobs = set() for pair_name, collection in parse_pairs_args(pairs, all_pairs): - spawn_worker() - queue.put( + wq.spawn_worker() + wq.put( functools.partial(prepare_sync, pair_name=pair_name, collection=collection, general=general, all_pairs=all_pairs, all_storages=all_storages, force_delete=force_delete)) - queue.join() - if exceptions: - sys.exit(1) + wq.join() return app @@ -409,14 +381,14 @@ def expand_collection(pair_name, collection, general, all_pairs, all_storages): yield collection, a, b -def prepare_sync(queue, spawn_worker, handled_collections, pair_name, - collection, general, all_pairs, all_storages, force_delete): +def prepare_sync(wq, pair_name, collection, general, all_pairs, all_storages, + force_delete): key = ('prepare', pair_name, collection) - if key in handled_collections: + if key in wq.handled_jobs: status_name = get_status_name(pair_name, collection) cli_logger.warning('Already prepared {}, skipping'.format(status_name)) return - handled_collections.add(key) + wq.handled_jobs.add(key) a_name, b_name, pair_options, storage_defaults = all_pairs[pair_name] jobs = list(expand_collection(pair_name, collection, general, all_pairs, @@ -424,13 +396,13 @@ def prepare_sync(queue, spawn_worker, handled_collections, pair_name, for i in range(len(jobs) - 1): # spawn one worker less because we can reuse the current one - spawn_worker() + wq.spawn_worker() for collection, a, b in jobs: - queue.put(functools.partial(sync_collection, pair_name=pair_name, - collection=collection, a=a, b=b, - pair_options=pair_options, general=general, - force_delete=force_delete)) + wq.put(functools.partial(sync_collection, pair_name=pair_name, + collection=collection, a=a, b=b, + pair_options=pair_options, general=general, + force_delete=force_delete)) def handle_cli_error(status_name='sync'): @@ -468,15 +440,15 @@ def handle_cli_error(status_name='sync'): return True -def sync_collection(queue, spawn_worker, handled_collections, pair_name, - collection, a, b, pair_options, general, force_delete): +def sync_collection(wq, pair_name, collection, a, b, pair_options, general, + force_delete): status_name = get_status_name(pair_name, collection) key = ('sync', pair_name, collection) - if key in handled_collections: + if key in wq.handled_jobs: cli_logger.warning('Already syncing {}, skipping'.format(status_name)) return - handled_collections.add(key) + wq.handled_jobs.add(key) try: cli_logger.info('Syncing {}'.format(status_name)) @@ -493,3 +465,42 @@ def sync_collection(queue, spawn_worker, handled_collections, pair_name, raise JobFailed() save_status(general['status_path'], status_name, status) + + +class WorkerQueue(object): + def __init__(self, max_workers): + self._queue = Queue() + self._workers = [] + self._exceptions = [] + self._max_workers = max_workers + + def _process_job(self): + func = self._queue.get() + try: + func(wq=self) + except: + if not handle_cli_error(): + self._exceptions.append(sys.exc_info()[1]) + finally: + self._queue.task_done() + + def spawn_worker(self): + if self._max_workers and len(self._workers) >= self._max_workers: + return + + def worker(): + while True: + self._process_job() + + t = threading.Thread(target=worker) + t.daemon = True + t.start() + self._workers.append(t) + + def join(self): + self._queue.join() + if self._exceptions: + sys.exit(1) + + def put(self, f): + return self._queue.put(f)