Catch more errors in WorkerQueue

See #167
This commit is contained in:
Markus Unterwaditzer 2015-01-18 21:20:15 +01:00
parent 86bcf206fd
commit a02fd034e4

View file

@ -30,9 +30,9 @@ except ImportError:
from configparser import RawConfigParser from configparser import RawConfigParser
try: try:
from Queue import Queue import Queue as queue
except ImportError: except ImportError:
from queue import Queue import queue
cli_logger = log.get(__name__) cli_logger = log.get(__name__)
@ -451,30 +451,36 @@ def parse_pairs_args(pairs_args, all_pairs):
class WorkerQueue(object): class WorkerQueue(object):
def __init__(self, max_workers): def __init__(self, max_workers):
self._queue = Queue() self._queue = queue.Queue()
self._workers = [] self._workers = []
self._exceptions = [] self._exceptions = []
self._max_workers = max_workers self._max_workers = max_workers
def _process_job(self): def _worker(self):
func = self._queue.get() while True:
try: try:
func(wq=self) func = self._queue.get()
except: except (TypeError, queue.Empty):
if not handle_cli_error(): # TypeError might be raised if vdirsyncer just finished
self._exceptions.append(sys.exc_info()[1]) # processing all items and the interpreter is shutting down,
finally: # yet the workers try to get new tasks.
self._queue.task_done() # https://github.com/untitaker/vdirsyncer/issues/167
# http://bugs.python.org/issue14623
break
try:
func(wq=self)
except:
if not handle_cli_error():
self._exceptions.append(sys.exc_info()[1])
finally:
self._queue.task_done()
def spawn_worker(self): def spawn_worker(self):
if self._max_workers and len(self._workers) >= self._max_workers: if self._max_workers and len(self._workers) >= self._max_workers:
return return
def worker(): t = threading.Thread(target=self._worker)
while True:
self._process_job()
t = threading.Thread(target=worker)
t.daemon = True t.daemon = True
t.start() t.start()
self._workers.append(t) self._workers.append(t)