Drop multithreading support

This is mainly in preparation for moving to an async architecture.
This commit is contained in:
Hugo Osvaldo Barrera 2021-06-17 18:47:13 +02:00
parent 25435ce11d
commit 4930b5f389
6 changed files with 33 additions and 229 deletions

View file

@ -50,41 +50,6 @@ def test_sync_inexistant_pair(tmpdir, runner):
assert "pair foo does not exist." in result.output.lower()
def test_debug_connections(tmpdir, runner):
    """End-to-end check of the --max-workers debug log line.

    With -vdebug, an explicit ``--max-workers=3`` is reported as 3 workers,
    and the implicit default is reported as 1 worker.
    """
    runner.write_with_general(
        dedent(
            """
            [pair my_pair]
            a = "my_a"
            b = "my_b"
            collections = null
            [storage my_a]
            type = "filesystem"
            path = "{0}/path_a/"
            fileext = ".txt"
            [storage my_b]
            type = "filesystem"
            path = "{0}/path_b/"
            fileext = ".txt"
            """
        ).format(str(tmpdir))
    )
    tmpdir.mkdir("path_a")
    tmpdir.mkdir("path_b")

    result = runner.invoke(["discover"])
    assert not result.exception

    result = runner.invoke(["-vdebug", "sync", "--max-workers=3"])
    assert "using 3 maximal workers" in result.output.lower()

    result = runner.invoke(["-vdebug", "sync"])
    assert "using 1 maximal workers" in result.output.lower()
def test_empty_storage(tmpdir, runner):
runner.write_with_general(
dedent(

View file

@ -65,33 +65,6 @@ def app(ctx, config):
main = app
def max_workers_callback(ctx, param, value):
    """Click callback for ``--max-workers``.

    A value of 0 means "as many as necessary", except under ``-vdebug``
    where it is coerced to a single worker. Logs and returns the
    effective value.
    """
    debugging = logging.getLogger("vdirsyncer").level == logging.DEBUG
    if debugging and value == 0:
        value = 1
    cli_logger.debug(f"Using {value} maximal workers.")
    return value
def max_workers_option(default=0):
    """Build the ``--max-workers`` click option decorator.

    :param default: default worker count. 0 means "as many as necessary"
        (but 1 under ``-vdebug`` — see ``max_workers_callback``).
    """
    # ``help_text`` rather than ``help`` so the builtin is not shadowed.
    help_text = "Use at most this many connections. "
    if default == 0:
        help_text += (
            'The default is 0, which means "as many as necessary". '
            "With -vdebug enabled, the default is 1."
        )
    else:
        help_text += f"The default is {default}."
    return click.option(
        "--max-workers",
        default=default,
        type=click.IntRange(min=0, max=None),
        callback=max_workers_callback,
        help=help_text,
    )
def collections_arg_callback(ctx, param, value):
"""
Expand the various CLI shortforms ("pair, pair/collection") to an iterable
@ -126,10 +99,9 @@ collections_arg = click.argument(
"to be deleted from both sides."
),
)
@max_workers_option()
@pass_context
@catch_errors
def sync(ctx, collections, force_delete, max_workers):
def sync(ctx, collections, force_delete):
"""
Synchronize the given collections or pairs. If no arguments are given, all
will be synchronized.
@ -151,53 +123,36 @@ def sync(ctx, collections, force_delete, max_workers):
vdirsyncer sync bob/first_collection
"""
from .tasks import prepare_pair, sync_collection
from .utils import WorkerQueue
wq = WorkerQueue(max_workers)
with wq.join():
for pair_name, collections in collections:
wq.put(
functools.partial(
prepare_pair,
pair_name=pair_name,
collections=collections,
config=ctx.config,
force_delete=force_delete,
callback=sync_collection,
)
)
wq.spawn_worker()
for pair_name, collections in collections:
prepare_pair(
pair_name=pair_name,
collections=collections,
config=ctx.config,
force_delete=force_delete,
callback=sync_collection,
)
@app.command()
@collections_arg
@pass_context
@catch_errors
def metasync(ctx, collections):
    """
    Synchronize metadata of the given collections or pairs.

    See the `sync` command for usage.
    """
    from .tasks import prepare_pair, metasync_collection

    # The rendered diff interleaved the pre- and post-commit versions of this
    # command; this is the coherent post-commit version, which runs each pair
    # sequentially instead of through the removed WorkerQueue.
    for pair_name, collections in collections:
        prepare_pair(
            pair_name=pair_name,
            collections=collections,
            config=ctx.config,
            callback=metasync_collection,
        )
@app.command()
@ -210,33 +165,25 @@ def metasync(ctx, collections, max_workers):
"for debugging. This is slow and may crash for broken servers."
),
)
@pass_context
@catch_errors
def discover(ctx, pairs, list):
    """
    Refresh collection cache for the given pairs.
    """
    # Note: the ``list`` parameter shadows the builtin, but it is part of the
    # click-facing interface (the --list flag) and must keep its name.
    from .tasks import discover_collections

    config = ctx.config
    # The rendered diff interleaved the pre- and post-commit versions of this
    # command; this is the coherent post-commit version, which discovers each
    # pair sequentially instead of through the removed WorkerQueue.
    for pair_name in pairs or config.pairs:
        pair = config.get_pair(pair_name)
        discover_collections(
            status_path=config.general["status_path"],
            pair=pair,
            from_cache=False,
            list_collections=list,
        )
@app.command()

View file

@ -4,8 +4,6 @@ import string
from configparser import RawConfigParser
from itertools import chain
from click_threading import get_ui_worker
from .. import exceptions
from .. import PROJECT_HOME
from ..utils import cached_property
@ -257,11 +255,7 @@ class PairConfig:
b_name = self.config_b["instance_name"]
command = conflict_resolution[1:]
def inner():
return _resolve_conflict_via_command(a, b, command, a_name, b_name)
ui_worker = get_ui_worker()
return ui_worker.put(inner)
return _resolve_conflict_via_command(a, b, command, a_name, b_name)
return resolve
else:

View file

@ -1,4 +1,3 @@
import functools
import json
from .. import exceptions
@ -16,15 +15,13 @@ from .utils import manage_sync_status
from .utils import save_status
def prepare_pair(wq, pair_name, collections, config, callback, **kwargs):
def prepare_pair(pair_name, collections, config, callback, **kwargs):
pair = config.get_pair(pair_name)
all_collections = dict(
collections_for_pair(status_path=config.general["status_path"], pair=pair)
)
# spawn one worker less because we can reuse the current one
new_workers = -1
for collection_name in collections or all_collections:
try:
config_a, config_b = all_collections[collection_name]
@ -35,20 +32,12 @@ def prepare_pair(wq, pair_name, collections, config, callback, **kwargs):
pair_name, json.dumps(collection_name), list(all_collections)
)
)
new_workers += 1
collection = CollectionConfig(pair, collection_name, config_a, config_b)
wq.put(
functools.partial(
callback, collection=collection, general=config.general, **kwargs
)
)
for _ in range(new_workers):
wq.spawn_worker()
callback(collection=collection, general=config.general, **kwargs)
def sync_collection(wq, collection, general, force_delete):
def sync_collection(collection, general, force_delete):
pair = collection.pair
status_name = get_status_name(pair.name, collection.name)
@ -87,7 +76,7 @@ def sync_collection(wq, collection, general, force_delete):
raise JobFailed()
def discover_collections(wq, pair, **kwargs):
def discover_collections(pair, **kwargs):
rv = collections_for_pair(pair=pair, **kwargs)
collections = list(c for c, (a, b) in rv)
if collections == [None]:
@ -128,7 +117,7 @@ def repair_collection(config, collection, repair_unsafe_uid):
repair_storage(storage, repair_unsafe_uid=repair_unsafe_uid)
def metasync_collection(wq, collection, general):
def metasync_collection(collection, general):
from ..metasync import metasync
pair = collection.pair

View file

@ -1,14 +1,11 @@
import contextlib
import errno
import importlib
import itertools
import json
import os
import queue
import sys
import click
import click_threading
from atomicwrites import atomic_write
from . import cli_logger
@ -311,92 +308,6 @@ def handle_storage_init_error(cls, config):
)
class WorkerQueue:
    """
    A simple worker-queue setup.

    Note that workers quit if queue is empty. That means you have to first put
    things into the queue before spawning the worker!
    """

    # NOTE(review): indentation was lost in extraction; block structure below
    # is a reconstruction from statement order — confirm against history.

    def __init__(self, max_workers):
        # max_workers == 0 (falsy) means "no limit" — see spawn_worker().
        self._queue = queue.Queue()
        self._workers = []
        self._max_workers = max_workers
        self._shutdown_handlers = []
        # According to http://stackoverflow.com/a/27062830, those are
        # threadsafe compared to increasing a simple integer variable.
        self.num_done_tasks = itertools.count()
        self.num_failed_tasks = itertools.count()

    def shutdown(self):
        # Run registered shutdown handlers best-effort: a failing handler
        # must not prevent the remaining ones from running.
        while self._shutdown_handlers:
            try:
                self._shutdown_handlers.pop()()
            except Exception:
                pass

    def _worker(self):
        # Thread target: drain the queue until it is empty, then exit.
        while True:
            try:
                func = self._queue.get(False)
            except queue.Empty:
                break
            try:
                # Each task receives the queue itself so it can enqueue
                # follow-up work and spawn more workers.
                func(wq=self)
            except Exception:
                handle_cli_error()
                next(self.num_failed_tasks)
            finally:
                self._queue.task_done()
                next(self.num_done_tasks)
                # When the last task finishes, tear down (this unblocks the
                # UI worker loop in join()). Assumed to live inside the
                # ``finally`` block — TODO confirm.
                if not self._queue.unfinished_tasks:
                    self.shutdown()

    def spawn_worker(self):
        # Enforce the cap only when one was set (0 means unlimited).
        if self._max_workers and len(self._workers) >= self._max_workers:
            return
        t = click_threading.Thread(target=self._worker)
        t.start()
        self._workers.append(t)

    @contextlib.contextmanager
    def join(self):
        """Context manager: run all queued tasks to completion.

        The wrapped block enqueues tasks and spawns workers; on exit the UI
        worker runs until everything finishes, then the process exits
        non-zero if any task failed (or with status 5 if nothing was queued).
        """
        assert self._workers or not self._queue.unfinished_tasks
        ui_worker = click_threading.UiWorker()
        self._shutdown_handlers.append(ui_worker.shutdown)
        _echo = click.echo
        with ui_worker.patch_click():
            yield
            if not self._workers:
                # Ugly hack, needed because ui_worker is not running.
                click.echo = _echo
                cli_logger.critical("Nothing to do.")
                sys.exit(5)
            ui_worker.run()
            self._queue.join()
            for worker in self._workers:
                worker.join()
        tasks_failed = next(self.num_failed_tasks)
        tasks_done = next(self.num_done_tasks)
        if tasks_failed > 0:
            cli_logger.error(
                "{} out of {} tasks failed.".format(tasks_failed, tasks_done)
            )
            sys.exit(1)

    def put(self, f):
        # Tasks must be callables accepting a ``wq=`` keyword — see _worker.
        return self._queue.put(f)
def assert_permissions(path, wanted):
permissions = os.stat(path).st_mode & 0o777
if permissions > wanted:

View file

@ -5,7 +5,6 @@ import urllib.parse as urlparse
import click
from atomicwrites import atomic_write
from click_threading import get_ui_worker
from . import base
from . import dav
@ -41,8 +40,7 @@ class GoogleSession(dav.DAVSession):
raise exceptions.UserError("requests-oauthlib not installed")
token_file = expand_path(token_file)
ui_worker = get_ui_worker()
ui_worker.put(lambda: self._init_token(token_file, client_id, client_secret))
return self._init_token(token_file, client_id, client_secret)
def _init_token(self, token_file, client_id, client_secret):
token = None