Create cli.tasks

This commit is contained in:
Markus Unterwaditzer 2014-12-25 23:38:09 +01:00
parent e297e6848a
commit f2d34f4784
4 changed files with 95 additions and 79 deletions

View file

@@ -90,7 +90,7 @@ def test_storage_instance_from_config(monkeypatch):
import vdirsyncer.storage
monkeypatch.setitem(vdirsyncer.storage.storage_names, 'lol', lol)
config = {'type': 'lol', 'foo': 'bar', 'baz': 1}
assert cli.utils._storage_instance_from_config(config) == 'OK'
assert cli.utils.storage_instance_from_config(config) == 'OK'
def test_parse_pairs_args():

View file

@@ -11,8 +11,9 @@ import functools
import os
import sys
from .tasks import sync_pair
from .utils import CliError, WorkerQueue, cli_logger, collections_for_pair, \
handle_cli_error, load_config, parse_pairs_args, sync_pair
handle_cli_error, load_config, parse_pairs_args
from .. import __version__, log
from ..doubleclick import click
from ..utils import expand_path

83
vdirsyncer/cli/tasks.py Normal file
View file

@@ -0,0 +1,83 @@
# -*- coding: utf-8 -*-
'''
vdirsyncer.cli.tasks
~~~~~~~~~~~~~~~~~~~~
:copyright: (c) 2014 Markus Unterwaditzer & contributors
:license: MIT, see LICENSE for more details.
'''
import functools
from .utils import CliError, JobFailed, cli_logger, collections_for_pair, \
get_status_name, handle_cli_error, load_status, save_status, \
storage_instance_from_config
from ..sync import sync
def sync_pair(wq, pair_name, collections_to_sync, general, all_pairs,
              all_storages, force_delete):
    '''Prepare a pair for syncing and enqueue one job per collection.

    Discovers the pair's collections, then puts a ``sync_collection`` job
    on the worker queue for each one. Pairs already prepared are skipped
    (deduplicated through ``wq.handled_jobs``).
    '''
    job_key = ('prepare', pair_name)
    if job_key in wq.handled_jobs:
        cli_logger.warning('Already prepared {}, skipping'.format(pair_name))
        return
    wq.handled_jobs.add(job_key)

    a_name, b_name, pair_options = all_pairs[pair_name]
    all_collections = dict(collections_for_pair(
        general['status_path'], a_name, b_name, pair_name,
        all_storages[a_name], all_storages[b_name], pair_options
    ))

    job_count = 0
    for coll in (collections_to_sync or all_collections):
        if coll not in all_collections:
            raise CliError('Pair {}: Collection {} not found. These are the '
                           'configured collections:\n{}'.format(
                               pair_name, coll, list(all_collections)))
        config_a, config_b = all_collections[coll]
        job_count += 1
        wq.put(functools.partial(
            sync_collection, pair_name=pair_name, collection=coll,
            config_a=config_a, config_b=config_b, pair_options=pair_options,
            general=general, force_delete=force_delete
        ))

    # Spawn one worker fewer than there are jobs: the worker running this
    # prepare step is free afterwards and can take one of them itself.
    for _ in range(job_count - 1):
        wq.spawn_worker()
def sync_collection(wq, pair_name, collection, config_a, config_b,
                    pair_options, general, force_delete):
    '''Sync a single collection of a pair.

    Loads the persisted item status, instantiates both storages, runs the
    sync and saves the new status. Collections already being handled are
    skipped (deduplicated through ``wq.handled_jobs``).

    :raises JobFailed: if an error occurred and ``handle_cli_error``
        decided it is not recoverable.
    '''
    status_name = get_status_name(pair_name, collection)
    key = ('sync', pair_name, collection)
    if key in wq.handled_jobs:
        cli_logger.warning('Already syncing {}, skipping'.format(status_name))
        return
    wq.handled_jobs.add(key)

    # Initialize up front: if an error fires before load_status completes
    # and handle_cli_error swallows it, the save below must not hit an
    # unbound name.
    status = {}
    try:
        cli_logger.info('Syncing {}'.format(status_name))
        status = load_status(general['status_path'], pair_name,
                             collection, data_type='items') or {}
        cli_logger.debug('Loaded status for {}'.format(status_name))
        a = storage_instance_from_config(config_a)
        b = storage_instance_from_config(config_b)
        sync(
            a, b, status,
            conflict_resolution=pair_options.get('conflict_resolution', None),
            force_delete=force_delete
        )
    # Was a bare ``except:``, which also swallowed KeyboardInterrupt and
    # SystemExit; only application-level errors should be routed through
    # the CLI error handler.
    except Exception:
        if not handle_cli_error(status_name):
            raise JobFailed()
    save_status(general['status_path'], pair_name, collection,
                data_type='items', data=status)

View file

@@ -8,7 +8,6 @@
'''
import errno
import functools
import hashlib
import json
import os
@@ -20,7 +19,7 @@ from itertools import chain
from .. import DOCS_HOME, PROJECT_HOME, log
from ..doubleclick import click
from ..storage import storage_names
from ..sync import StorageEmpty, SyncConflict, sync
from ..sync import StorageEmpty, SyncConflict
from ..utils import expand_path, get_class_init_args, parse_options, \
safe_write
@@ -109,7 +108,7 @@ def _parse_old_config_list_value(d, key):
return value
def _get_status_name(pair, collection):
def get_status_name(pair, collection):
    '''Return the name used for a pair's status files.

    A collection-less pair maps to the bare pair name; otherwise the
    collection is appended after a slash.
    '''
    return pair if collection is None else pair + '/' + collection
@@ -163,7 +162,7 @@ def collections_for_pair(status_path, name_a, name_b, pair_name, config_a,
def _discover_from_config(config):
storage_type = config['type']
cls, config = _storage_class_from_config(config)
cls, config = storage_class_from_config(config)
try:
discovered = list(cls.discover(**config))
@@ -182,7 +181,7 @@ def _get_coll(pair_name, storage_name, collection, discovered, config):
return discovered[collection]
except KeyError:
storage_type = config['type']
cls, config = _storage_class_from_config(config)
cls, config = storage_class_from_config(config)
try:
args = cls.join_collection(collection=collection, **config)
args['type'] = storage_type
@@ -293,7 +292,7 @@ def load_config(f):
def load_status(base_path, pair, collection=None, data_type=None):
assert data_type is not None
status_name = _get_status_name(pair, collection)
status_name = get_status_name(pair, collection)
path = expand_path(os.path.join(base_path, status_name))
if os.path.isfile(path) and data_type == 'items':
new_path = path + '.items'
@@ -323,7 +322,7 @@ def load_status(base_path, pair, collection=None, data_type=None):
def save_status(base_path, pair, collection=None, data_type=None, data=None):
assert data_type is not None
assert data is not None
status_name = _get_status_name(pair, collection)
status_name = get_status_name(pair, collection)
path = expand_path(os.path.join(base_path, status_name)) + '.' + data_type
base_path = os.path.dirname(path)
@@ -343,7 +342,7 @@ def save_status(base_path, pair, collection=None, data_type=None, data=None):
json.dump(data, f)
def _storage_class_from_config(config):
def storage_class_from_config(config):
config = dict(config)
storage_name = config.pop('type')
cls = storage_names.get(storage_name, None)
@@ -352,14 +351,14 @@ def _storage_class_from_config(config):
return cls, config
def _storage_instance_from_config(config):
def storage_instance_from_config(config):
'''
:param config: A configuration dictionary to pass as kwargs to the class
corresponding to config['type']
:param description: A name for the storage for debugging purposes
'''
cls, config = _storage_class_from_config(config)
cls, config = storage_class_from_config(config)
try:
return cls(**config)
@@ -416,73 +415,6 @@ def parse_pairs_args(pairs_args, all_pairs):
return rv.items()
def sync_pair(wq, pair_name, collections_to_sync, general, all_pairs,
              all_storages, force_delete):
    '''Prepare a pair for syncing and enqueue one job per collection.

    NOTE(review): this is the pre-move copy removed by this commit; it is
    relocated verbatim to vdirsyncer/cli/tasks.py.
    '''
    # Deduplicate preparation across workers via the shared job set.
    key = ('prepare', pair_name)
    if key in wq.handled_jobs:
        cli_logger.warning('Already prepared {}, skipping'.format(pair_name))
        return
    wq.handled_jobs.add(key)
    a_name, b_name, pair_options = all_pairs[pair_name]
    # Discover all collections configured for this pair.
    all_collections = dict(collections_for_pair(
        general['status_path'], a_name, b_name, pair_name,
        all_storages[a_name], all_storages[b_name], pair_options
    ))
    # spawn one worker less because we can reuse the current one
    new_workers = -1
    for collection in (collections_to_sync or all_collections):
        try:
            config_a, config_b = all_collections[collection]
        except KeyError:
            # An explicitly requested collection is unknown: hard error.
            raise CliError('Pair {}: Collection {} not found. These are the '
                           'configured collections:\n{}'.format(
                               pair_name, collection, list(all_collections)))
        new_workers += 1
        # One queue job per collection; workers pick these up concurrently.
        wq.put(functools.partial(
            sync_collection, pair_name=pair_name, collection=collection,
            config_a=config_a, config_b=config_b, pair_options=pair_options,
            general=general, force_delete=force_delete
        ))
    for i in range(new_workers):
        wq.spawn_worker()
def sync_collection(wq, pair_name, collection, config_a, config_b,
                    pair_options, general, force_delete):
    '''Sync a single collection of a pair.

    NOTE(review): this is the pre-move copy removed by this commit; it is
    relocated (with renamed helpers) to vdirsyncer/cli/tasks.py.
    '''
    status_name = _get_status_name(pair_name, collection)
    # Deduplicate concurrent syncs of the same collection.
    key = ('sync', pair_name, collection)
    if key in wq.handled_jobs:
        cli_logger.warning('Already syncing {}, skipping'.format(status_name))
        return
    wq.handled_jobs.add(key)
    try:
        cli_logger.info('Syncing {}'.format(status_name))
        status = load_status(general['status_path'], pair_name,
                             collection, data_type='items') or {}
        cli_logger.debug('Loaded status for {}'.format(status_name))
        a = _storage_instance_from_config(config_a)
        b = _storage_instance_from_config(config_b)
        sync(
            a, b, status,
            conflict_resolution=pair_options.get('conflict_resolution', None),
            force_delete=force_delete
        )
    # NOTE(review): bare except also catches KeyboardInterrupt/SystemExit,
    # and if load_status itself raised, `status` is unbound when the save
    # below runs — both worth fixing in the relocated copy.
    except:
        if not handle_cli_error(status_name):
            raise JobFailed()
    save_status(general['status_path'], pair_name, collection,
                data_type='items', data=status)
class WorkerQueue(object):
def __init__(self, max_workers):
self._queue = Queue()