Move exception handling into context manager

This commit is contained in:
Markus Unterwaditzer 2014-10-16 21:35:32 +02:00
parent 93d29972ec
commit aba0a40fbc

View file

@ -7,6 +7,7 @@
:license: MIT, see LICENSE for more details. :license: MIT, see LICENSE for more details.
''' '''
import contextlib
import functools import functools
import json import json
import os import os
@ -349,16 +350,7 @@ app = main = _create_app()
del _create_app del _create_app
def prepare_sync(queue, spawn_worker, handled_collections, pair_name, def expand_collection(pair_name, collection, general, all_pairs, all_storages):
collection, general, all_pairs, all_storages, force_delete):
key = ('prepare', pair_name, collection)
if key in handled_collections:
status_name = get_status_name(pair_name, collection)
cli_logger.warning('Already prepared {}, skipping'.format(status_name))
return
handled_collections.add(key)
a_name, b_name, pair_options, storage_defaults = all_pairs[pair_name] a_name, b_name, pair_options, storage_defaults = all_pairs[pair_name]
if collection in ('from a', 'from b'): if collection in ('from a', 'from b'):
# from_name: name of the storage which should be used for discovery # from_name: name of the storage which should be used for discovery
@ -379,7 +371,6 @@ def prepare_sync(queue, spawn_worker, handled_collections, pair_name,
except Exception: except Exception:
handle_storage_init_error(cls, config) handle_storage_init_error(cls, config)
jobs = []
for storage in storages: for storage in storages:
config = dict(storage_defaults) config = dict(storage_defaults)
config.update(all_storages[other_name]) config.update(all_storages[other_name])
@ -391,7 +382,7 @@ def prepare_sync(queue, spawn_worker, handled_collections, pair_name,
else: else:
b, a = storage, other_storage b, a = storage, other_storage
jobs.append((actual_collection, a, b)) yield actual_collection, a, b
else: else:
config = dict(storage_defaults) config = dict(storage_defaults)
config.update(all_storages[a_name]) config.update(all_storages[a_name])
@ -403,9 +394,21 @@ def prepare_sync(queue, spawn_worker, handled_collections, pair_name,
config['collection'] = collection config['collection'] = collection
b = storage_instance_from_config(config) b = storage_instance_from_config(config)
jobs = [ yield collection, a, b
(collection, a, b)
]
def prepare_sync(queue, spawn_worker, handled_collections, pair_name,
collection, general, all_pairs, all_storages, force_delete):
key = ('prepare', pair_name, collection)
if key in handled_collections:
status_name = get_status_name(pair_name, collection)
cli_logger.warning('Already prepared {}, skipping'.format(status_name))
return
handled_collections.add(key)
a_name, b_name, pair_options, storage_defaults = all_pairs[pair_name]
jobs = list(expand_collection(pair_name, collection, general, all_pairs,
all_storages))
for i in range(len(jobs) - 1): for i in range(len(jobs) - 1):
# spawn one worker less because we can reuse the current one # spawn one worker less because we can reuse the current one
@ -418,41 +421,22 @@ def prepare_sync(queue, spawn_worker, handled_collections, pair_name,
force_delete=force_delete)) force_delete=force_delete))
def sync_collection(queue, spawn_worker, handled_collections, pair_name, @contextlib.contextmanager
collection, a, b, pair_options, general, force_delete): def catch_sync_errors(status_name):
status_name = get_status_name(pair_name, collection)
key = ('sync', pair_name, collection)
if key in handled_collections:
cli_logger.warning('Already syncing {}, skipping'.format(status_name))
return
handled_collections.add(key)
rv = True
try: try:
cli_logger.info('Syncing {}'.format(status_name)) yield
status = load_status(general['status_path'], status_name)
cli_logger.debug('Loaded status for {}'.format(status_name))
sync(
a, b, status,
conflict_resolution=pair_options.get('conflict_resolution', None),
force_delete=status_name in force_delete
)
except StorageEmpty as e: except StorageEmpty as e:
rv = False
cli_logger.error( cli_logger.error(
'{status_name}: Storage "{side}" ({storage}) was completely ' '{status_name}: Storage "{name}" was completely emptied. Use '
'emptied. Use "--force-delete {status_name}" to synchronize that ' '"--force-delete {status_name}" to synchronize that emptyness to '
'emptyness to the other side, or delete the status by yourself to ' 'the other side, or delete the status by yourself to restore the '
'restore the items from the non-empty side.'.format( 'items from the non-empty side.'.format(
side='a' if e.empty_storage is a else 'b', name=e.empty_storage.instance_name,
storage=e.empty_storage,
status_name=status_name status_name=status_name
) )
) )
raise JobFailed()
except SyncConflict as e: except SyncConflict as e:
rv = False
cli_logger.error( cli_logger.error(
'{status_name}: One item changed on both sides. Resolve this ' '{status_name}: One item changed on both sides. Resolve this '
'conflict manually, or by setting the `conflict_resolution` ' 'conflict manually, or by setting the `conflict_resolution` '
@ -463,14 +447,34 @@ def sync_collection(queue, spawn_worker, handled_collections, pair_name,
'Item href on side B: {e.href_b}\n' 'Item href on side B: {e.href_b}\n'
.format(status_name=status_name, e=e, docs=DOCS_HOME) .format(status_name=status_name, e=e, docs=DOCS_HOME)
) )
raise JobFailed()
except (click.Abort, KeyboardInterrupt): except (click.Abort, KeyboardInterrupt):
rv = False raise JobFailed()
except Exception as e: except Exception as e:
rv = False
cli_logger.exception('Unhandled exception occured while syncing {}.' cli_logger.exception('Unhandled exception occured while syncing {}.'
.format(status_name)) .format(status_name))
if rv:
save_status(general['status_path'], status_name, status)
else:
raise JobFailed() raise JobFailed()
def sync_collection(queue, spawn_worker, handled_collections, pair_name,
collection, a, b, pair_options, general, force_delete):
status_name = get_status_name(pair_name, collection)
key = ('sync', pair_name, collection)
if key in handled_collections:
cli_logger.warning('Already syncing {}, skipping'.format(status_name))
return
handled_collections.add(key)
with catch_sync_errors(status_name):
cli_logger.info('Syncing {}'.format(status_name))
status = load_status(general['status_path'], status_name)
cli_logger.debug('Loaded status for {}'.format(status_name))
sync(
a, b, status,
conflict_resolution=pair_options.get('conflict_resolution', None),
force_delete=status_name in force_delete
)
save_status(general['status_path'], status_name, status)