From 4930b5f389459e028a098fc3b66aa91739bdc7a9 Mon Sep 17 00:00:00 2001 From: Hugo Osvaldo Barrera Date: Thu, 17 Jun 2021 18:47:13 +0200 Subject: [PATCH] Drop multithreading support This is mainly in preparation to moving to an async architecture. --- tests/system/cli/test_sync.py | 35 ------------ vdirsyncer/cli/__init__.py | 105 +++++++++------------------------- vdirsyncer/cli/config.py | 8 +-- vdirsyncer/cli/tasks.py | 21 ++----- vdirsyncer/cli/utils.py | 89 ---------------------------- vdirsyncer/storage/google.py | 4 +- 6 files changed, 33 insertions(+), 229 deletions(-) diff --git a/tests/system/cli/test_sync.py b/tests/system/cli/test_sync.py index ad009ac..bcffa1d 100644 --- a/tests/system/cli/test_sync.py +++ b/tests/system/cli/test_sync.py @@ -50,41 +50,6 @@ def test_sync_inexistant_pair(tmpdir, runner): assert "pair foo does not exist." in result.output.lower() -def test_debug_connections(tmpdir, runner): - runner.write_with_general( - dedent( - """ - [pair my_pair] - a = "my_a" - b = "my_b" - collections = null - - [storage my_a] - type = "filesystem" - path = "{0}/path_a/" - fileext = ".txt" - - [storage my_b] - type = "filesystem" - path = "{0}/path_b/" - fileext = ".txt" - """ - ).format(str(tmpdir)) - ) - - tmpdir.mkdir("path_a") - tmpdir.mkdir("path_b") - - result = runner.invoke(["discover"]) - assert not result.exception - - result = runner.invoke(["-vdebug", "sync", "--max-workers=3"]) - assert "using 3 maximal workers" in result.output.lower() - - result = runner.invoke(["-vdebug", "sync"]) - assert "using 1 maximal workers" in result.output.lower() - - def test_empty_storage(tmpdir, runner): runner.write_with_general( dedent( diff --git a/vdirsyncer/cli/__init__.py b/vdirsyncer/cli/__init__.py index d1cb738..02eb41b 100644 --- a/vdirsyncer/cli/__init__.py +++ b/vdirsyncer/cli/__init__.py @@ -65,33 +65,6 @@ def app(ctx, config): main = app -def max_workers_callback(ctx, param, value): - if value == 0 and logging.getLogger("vdirsyncer").level == logging.DEBUG: - value = 1 - - cli_logger.debug(f"Using {value} maximal workers.") - return value - - -def max_workers_option(default=0): - help = "Use at most this many connections. " - if default == 0: - help += ( - 'The default is 0, which means "as many as necessary". ' - "With -vdebug enabled, the default is 1." - ) - else: - help += f"The default is {default}." - - return click.option( - "--max-workers", - default=default, - type=click.IntRange(min=0, max=None), - callback=max_workers_callback, - help=help, - ) - - def collections_arg_callback(ctx, param, value): """ Expand the various CLI shortforms ("pair, pair/collection") to an iterable @@ -126,10 +99,9 @@ collections_arg = click.argument( "to be deleted from both sides." ), ) -@max_workers_option() @pass_context @catch_errors -def sync(ctx, collections, force_delete, max_workers): +def sync(ctx, collections, force_delete): """ Synchronize the given collections or pairs. If no arguments are given, all will be synchronized. @@ -151,53 +123,36 @@ def sync(ctx, collections, force_delete, max_workers): vdirsyncer sync bob/first_collection """ from .tasks import prepare_pair, sync_collection - from .utils import WorkerQueue - wq = WorkerQueue(max_workers) - - with wq.join(): - for pair_name, collections in collections: - wq.put( - functools.partial( - prepare_pair, - pair_name=pair_name, - collections=collections, - config=ctx.config, - force_delete=force_delete, - callback=sync_collection, - ) - ) - wq.spawn_worker() + for pair_name, collections in collections: + prepare_pair( + pair_name=pair_name, + collections=collections, + config=ctx.config, + force_delete=force_delete, + callback=sync_collection, + ) @app.command() @collections_arg -@max_workers_option() @pass_context @catch_errors -def metasync(ctx, collections, max_workers): +def metasync(ctx, collections): """ Synchronize metadata of the given collections or pairs. See the `sync` command for usage. """ from .tasks import prepare_pair, metasync_collection - from .utils import WorkerQueue - wq = WorkerQueue(max_workers) - - with wq.join(): - for pair_name, collections in collections: - wq.put( - functools.partial( - prepare_pair, - pair_name=pair_name, - collections=collections, - config=ctx.config, - callback=metasync_collection, - ) - ) - wq.spawn_worker() + for pair_name, collections in collections: + prepare_pair( + pair_name=pair_name, + collections=collections, + config=ctx.config, + callback=metasync_collection, + ) @app.command() @@ -210,33 +165,25 @@ def metasync(ctx, collections, max_workers): "for debugging. This is slow and may crash for broken servers." ), ) -@max_workers_option(default=1) @pass_context @catch_errors -def discover(ctx, pairs, max_workers, list): +def discover(ctx, pairs, list): """ Refresh collection cache for the given pairs. """ from .tasks import discover_collections - from .utils import WorkerQueue config = ctx.config - wq = WorkerQueue(max_workers) - with wq.join(): - for pair_name in pairs or config.pairs: - pair = config.get_pair(pair_name) + for pair_name in pairs or config.pairs: + pair = config.get_pair(pair_name) - wq.put( - functools.partial( - discover_collections, - status_path=config.general["status_path"], - pair=pair, - from_cache=False, - list_collections=list, - ) - ) - wq.spawn_worker() + discover_collections( + status_path=config.general["status_path"], + pair=pair, + from_cache=False, + list_collections=list, + ) @app.command() diff --git a/vdirsyncer/cli/config.py b/vdirsyncer/cli/config.py index 79a5484..40833c4 100644 --- a/vdirsyncer/cli/config.py +++ b/vdirsyncer/cli/config.py @@ -4,8 +4,6 @@ import string from configparser import RawConfigParser from itertools import chain -from click_threading import get_ui_worker - from .. import exceptions from .. import PROJECT_HOME from ..utils import cached_property @@ -257,11 +255,7 @@ class PairConfig: b_name = self.config_b["instance_name"] command = conflict_resolution[1:] - def inner(): - return _resolve_conflict_via_command(a, b, command, a_name, b_name) - - ui_worker = get_ui_worker() - return ui_worker.put(inner) + return _resolve_conflict_via_command(a, b, command, a_name, b_name) return resolve else: diff --git a/vdirsyncer/cli/tasks.py b/vdirsyncer/cli/tasks.py index 818b0ae..24907db 100644 --- a/vdirsyncer/cli/tasks.py +++ b/vdirsyncer/cli/tasks.py @@ -1,4 +1,3 @@ -import functools import json from .. import exceptions @@ -16,15 +15,13 @@ from .utils import manage_sync_status from .utils import save_status -def prepare_pair(wq, pair_name, collections, config, callback, **kwargs): +def prepare_pair(pair_name, collections, config, callback, **kwargs): pair = config.get_pair(pair_name) all_collections = dict( collections_for_pair(status_path=config.general["status_path"], pair=pair) ) - # spawn one worker less because we can reuse the current one - new_workers = -1 for collection_name in collections or all_collections: try: config_a, config_b = all_collections[collection_name] @@ -35,20 +32,12 @@ def prepare_pair(wq, pair_name, collections, config, callback, **kwargs): pair_name, json.dumps(collection_name), list(all_collections) ) ) - new_workers += 1 collection = CollectionConfig(pair, collection_name, config_a, config_b) - wq.put( - functools.partial( - callback, collection=collection, general=config.general, **kwargs - ) - ) - - for _ in range(new_workers): - wq.spawn_worker() + callback(collection=collection, general=config.general, **kwargs) -def sync_collection(wq, collection, general, force_delete): +def sync_collection(collection, general, force_delete): pair = collection.pair status_name = get_status_name(pair.name, collection.name) @@ -87,7 +76,7 @@ def sync_collection(wq, collection, general, force_delete): raise JobFailed() -def discover_collections(wq, pair, **kwargs): +def discover_collections(pair, **kwargs): rv = collections_for_pair(pair=pair, **kwargs) collections = list(c for c, (a, b) in rv) if collections == [None]: @@ -128,7 +117,7 @@ def repair_collection(config, collection, repair_unsafe_uid): repair_storage(storage, repair_unsafe_uid=repair_unsafe_uid) -def metasync_collection(wq, collection, general): +def metasync_collection(collection, general): from ..metasync import metasync pair = collection.pair diff --git a/vdirsyncer/cli/utils.py b/vdirsyncer/cli/utils.py index a9d244c..3f7dd50 100644 --- a/vdirsyncer/cli/utils.py +++ b/vdirsyncer/cli/utils.py @@ -1,14 +1,11 @@ import contextlib import errno import importlib -import itertools import json import os -import queue import sys import click -import click_threading from atomicwrites import atomic_write from . import cli_logger @@ -311,92 +308,6 @@ def handle_storage_init_error(cls, config): ) -class WorkerQueue: - """ - A simple worker-queue setup. - - Note that workers quit if queue is empty. That means you have to first put - things into the queue before spawning the worker! - """ - - def __init__(self, max_workers): - self._queue = queue.Queue() - self._workers = [] - self._max_workers = max_workers - self._shutdown_handlers = [] - - # According to http://stackoverflow.com/a/27062830, those are - # threadsafe compared to increasing a simple integer variable. - self.num_done_tasks = itertools.count() - self.num_failed_tasks = itertools.count() - - def shutdown(self): - while self._shutdown_handlers: - try: - self._shutdown_handlers.pop()() - except Exception: - pass - - def _worker(self): - while True: - try: - func = self._queue.get(False) - except queue.Empty: - break - - try: - func(wq=self) - except Exception: - handle_cli_error() - next(self.num_failed_tasks) - finally: - self._queue.task_done() - next(self.num_done_tasks) - if not self._queue.unfinished_tasks: - self.shutdown() - - def spawn_worker(self): - if self._max_workers and len(self._workers) >= self._max_workers: - return - - t = click_threading.Thread(target=self._worker) - t.start() - self._workers.append(t) - - @contextlib.contextmanager - def join(self): - assert self._workers or not self._queue.unfinished_tasks - ui_worker = click_threading.UiWorker() - self._shutdown_handlers.append(ui_worker.shutdown) - _echo = click.echo - - with ui_worker.patch_click(): - yield - - if not self._workers: - # Ugly hack, needed because ui_worker is not running. - click.echo = _echo - cli_logger.critical("Nothing to do.") - sys.exit(5) - - ui_worker.run() - self._queue.join() - for worker in self._workers: - worker.join() - - tasks_failed = next(self.num_failed_tasks) - tasks_done = next(self.num_done_tasks) - - if tasks_failed > 0: - cli_logger.error( - "{} out of {} tasks failed.".format(tasks_failed, tasks_done) - ) - sys.exit(1) - - def put(self, f): - return self._queue.put(f) - - def assert_permissions(path, wanted): permissions = os.stat(path).st_mode & 0o777 if permissions > wanted: diff --git a/vdirsyncer/storage/google.py b/vdirsyncer/storage/google.py index 09d3535..610bad4 100644 --- a/vdirsyncer/storage/google.py +++ b/vdirsyncer/storage/google.py @@ -5,7 +5,6 @@ import urllib.parse as urlparse import click from atomicwrites import atomic_write -from click_threading import get_ui_worker from . import base from . import dav @@ -41,8 +40,7 @@ class GoogleSession(dav.DAVSession): raise exceptions.UserError("requests-oauthlib not installed") token_file = expand_path(token_file) - ui_worker = get_ui_worker() - ui_worker.put(lambda: self._init_token(token_file, client_id, client_secret)) + return self._init_token(token_file, client_id, client_secret) def _init_token(self, token_file, client_id, client_secret): token = None