diff --git a/vdirsyncer/cli/utils.py b/vdirsyncer/cli/utils.py index 9245c58..5d22440 100644 --- a/vdirsyncer/cli/utils.py +++ b/vdirsyncer/cli/utils.py @@ -30,9 +30,9 @@ except ImportError: from configparser import RawConfigParser try: - from Queue import Queue + import Queue as queue except ImportError: - from queue import Queue + import queue cli_logger = log.get(__name__) @@ -451,30 +451,36 @@ def parse_pairs_args(pairs_args, all_pairs): class WorkerQueue(object): def __init__(self, max_workers): - self._queue = Queue() + self._queue = queue.Queue() self._workers = [] self._exceptions = [] self._max_workers = max_workers - def _process_job(self): - func = self._queue.get() - try: - func(wq=self) - except: - if not handle_cli_error(): - self._exceptions.append(sys.exc_info()[1]) - finally: - self._queue.task_done() + def _worker(self): + while True: + try: + func = self._queue.get() + except (TypeError, queue.Empty): + # TypeError might be raised if vdirsyncer just finished + # processing all items and the interpreter is shutting down, + # yet the workers try to get new tasks. + # https://github.com/untitaker/vdirsyncer/issues/167 + # http://bugs.python.org/issue14623 + break + + try: + func(wq=self) + except: + if not handle_cli_error(): + self._exceptions.append(sys.exc_info()[1]) + finally: + self._queue.task_done() def spawn_worker(self): if self._max_workers and len(self._workers) >= self._max_workers: return - def worker(): - while True: - self._process_job() - - t = threading.Thread(target=worker) + t = threading.Thread(target=self._worker) t.daemon = True t.start() self._workers.append(t)