From 0e89753757efec5aec2826722849f211e9f0b835 Mon Sep 17 00:00:00 2001 From: Markus Unterwaditzer Date: Mon, 27 Mar 2017 00:08:23 +0200 Subject: [PATCH] Use sqlite for sync internally (#609) See #546 --- tests/unit/test_sync.py | 7 +- vdirsyncer/sync.py | 465 ++++++++++++++++++++++++++++++---------- 2 files changed, 352 insertions(+), 120 deletions(-) diff --git a/tests/unit/test_sync.py b/tests/unit/test_sync.py index 4949003..b7f99cd 100644 --- a/tests/unit/test_sync.py +++ b/tests/unit/test_sync.py @@ -184,7 +184,7 @@ def test_deletion(): assert items(a) == items(b) == {item2.raw} -def test_insert_hash(): +def test_broken_status(): a = MemoryStorage() b = MemoryStorage() status = {} @@ -197,8 +197,8 @@ def test_insert_hash(): del d['hash'] a.update(href, Item('UID:1\nHAHA:YES'), etag) - sync(a, b, status) - assert 'hash' in status['1'][0] and 'hash' in status['1'][1] + with pytest.raises(SyncConflict): + sync(a, b, status) def test_already_synced(): @@ -390,6 +390,7 @@ def test_partial_sync_revert(): a.items[next(iter(a.items))] = ('foo', Item('UID:2\nupdated')) assert items(a) == {'UID:2\nupdated'} sync(a, b, status, partial_sync='revert') + assert len(status) == 1 assert items(a) == {'UID:2\nupdated'} sync(a, b, status, partial_sync='revert') assert items(a) == {'UID:2'} diff --git a/vdirsyncer/sync.py b/vdirsyncer/sync.py index 7c0e249..ce59d6e 100644 --- a/vdirsyncer/sync.py +++ b/vdirsyncer/sync.py @@ -12,9 +12,11 @@ Yang: http://blog.ezyang.com/2012/08/how-offlineimap-works/ Some modifications to it are explained in https://unterwaditzer.net/2016/sync-algorithm.html ''' +import abc import contextlib import itertools import logging +import sqlite3 from . 
import exceptions from .utils import uniq @@ -99,107 +101,345 @@ class _IdentAlreadyExists(SyncError): hrefs=[self.old_href, self.new_href]) -class _Status(object): - def __init__(self, ident_to_props): - self._ident_to_props = ident_to_props - self._new_ident_to_props = {} - - self._href_to_status_a = dict((meta['href'], (ident, meta)) - for ident, (meta, _) - in self._ident_to_props.items()) - - self._href_to_status_b = dict((meta['href'], (ident, meta)) - for ident, (_, meta) - in self._ident_to_props.items()) +class _StatusBase(metaclass=abc.ABCMeta): + @abc.abstractmethod + def transaction(self): + raise NotImplementedError() + @abc.abstractmethod def insert_ident_a(self, ident, props): - props_a, props_b = self._new_ident_to_props.get(ident, (None, None)) - if props_a is not None: - raise _IdentAlreadyExists(old_href=props.href, - new_href=props_a['href']) - self._new_ident_to_props[ident] = props.to_status(), props_b + raise NotImplementedError() + @abc.abstractmethod def insert_ident_b(self, ident, props): - props_a, props_b = self._new_ident_to_props.get(ident, (None, None)) - if props_b is not None: - raise _IdentAlreadyExists(old_href=props.href, - new_href=props_b['href']) - self._new_ident_to_props[ident] = props_a, props.to_status() + raise NotImplementedError() + + @abc.abstractmethod + def update_ident_a(self, ident, props): + raise NotImplementedError() + + @abc.abstractmethod + def update_ident_b(self, ident, props): + raise NotImplementedError() + + @abc.abstractmethod + def remove_ident(self, ident): + raise NotImplementedError() + + @abc.abstractmethod + def get_a(self, ident): + raise NotImplementedError() + + @abc.abstractmethod + def get_b(self, ident): + raise NotImplementedError() + + @abc.abstractmethod + def get_new_a(self, ident): + raise NotImplementedError() + + @abc.abstractmethod + def get_new_b(self, ident): + raise NotImplementedError() + + @abc.abstractmethod + def iter_old(self): + raise NotImplementedError() + + 
@abc.abstractmethod + def iter_new(self): + raise NotImplementedError() + + @abc.abstractmethod + def get_by_href_a(self, href, default=(None, None)): + raise NotImplementedError() + + @abc.abstractmethod + def get_by_href_b(self, href, default=(None, None)): + raise NotImplementedError() + + @abc.abstractmethod + def rollback(self, ident): + raise NotImplementedError() + + +class _SqliteStatus(_StatusBase): + SCHEMA_VERSION = 1 + + def __init__(self, path): + self._path = path + self._c = sqlite3.connect(path) + self._c.row_factory = sqlite3.Row + self._update_schema() + + def _update_schema(self): + if self._is_latest_version(): + return + + # If we ever bump the schema version, we will need a way to migrate + # data. + + self._c.execute('''CREATE TABLE meta ( + "version" INTEGER PRIMARY KEY + ); ''') + self._c.execute('INSERT INTO meta (version) VALUES (?)', + (self.SCHEMA_VERSION,)) + + # I know that this is a bad schema, but right now there is just too + # little gain in deduplicating the .._a and .._b columns. + self._c.execute('''CREATE TABLE status ( + "ident" TEXT PRIMARY KEY NOT NULL, + "href_a" TEXT, + "href_b" TEXT, + "hash_a" TEXT NOT NULL, + "hash_b" TEXT NOT NULL, + "etag_a" TEXT, + "etag_b" TEXT + ); ''') + self._c.execute('CREATE UNIQUE INDEX by_href_a ON status(href_a)') + self._c.execute('CREATE UNIQUE INDEX by_href_b ON status(href_b)') + + # We cannot add NOT NULL here because data is first fetched for the + # storage a, then storage b. In between, the `_b`-columns are filled + # with NULL. + # + # In an ideal world we would be able to start a transaction with one + # cursor, write our new data into status and simultaneously query the + # old status data using a different cursor. Unfortunately sqlite + # enforces NOT NULL constraints immediately, not just at commit. 
Since + # there is also no way to alter constraints on a table (disable + # constraints on start of transaction and reenable on end), it's a + # separate table now that just gets copied over before we commit. + # That's a lot of copying, sadly. + self._c.execute('''CREATE TABLE new_status ( + "ident" TEXT PRIMARY KEY NOT NULL, + "href_a" TEXT, + "href_b" TEXT, + "hash_a" TEXT, + "hash_b" TEXT, + "etag_a" TEXT, + "etag_b" TEXT + ); ''') + + def _is_latest_version(self): + try: + return bool(self._c.execute( + 'SELECT version FROM meta WHERE version = ?', + (self.SCHEMA_VERSION,) + ).fetchone()) + except sqlite3.OperationalError: + return False + + @contextlib.contextmanager + def transaction(self): + with self._c: + try: + yield + self._c.execute('DELETE FROM status') + self._c.execute('INSERT INTO status ' + 'SELECT * FROM new_status') + finally: + self._c.execute('DELETE FROM new_status') + + def insert_ident_a(self, ident, a_props): + # FIXME: Super inefficient + old_props = self.get_new_a(ident) + if old_props is not None: + raise _IdentAlreadyExists(old_href=old_props.href, + new_href=a_props.href) + b_props = self.get_new_b(ident) or _ItemMetadata() + self._c.execute( + 'INSERT OR REPLACE INTO new_status ' + 'VALUES(?, ?, ?, ?, ?, ?, ?)', + (ident, a_props.href, b_props.href, a_props.hash, b_props.hash, + a_props.etag, b_props.etag) + ) + + def insert_ident_b(self, ident, b_props): + # FIXME: Super inefficient + old_props = self.get_new_b(ident) + if old_props is not None: + raise _IdentAlreadyExists(old_href=old_props.href, + new_href=b_props.href) + a_props = self.get_new_a(ident) or _ItemMetadata() + self._c.execute( + 'INSERT OR REPLACE INTO new_status ' + 'VALUES(?, ?, ?, ?, ?, ?, ?)', + (ident, a_props.href, b_props.href, a_props.hash, b_props.hash, + a_props.etag, b_props.etag) + ) def update_ident_a(self, ident, props): - self._new_ident_to_props[ident] = ( - props.to_status(), - self._new_ident_to_props[ident][1], + c = self._c.cursor() + c.execute( + 
'UPDATE new_status' + ' SET href_a=?, hash_a=?, etag_a=?' + ' WHERE ident=?', + (props.href, props.hash, props.etag, ident) ) + self._c.commit() + assert c.rowcount > 0 def update_ident_b(self, ident, props): - self._new_ident_to_props[ident] = ( - self._new_ident_to_props[ident][0], - props.to_status(), + c = self._c.cursor() + c.execute( + 'UPDATE new_status' + ' SET href_b=?, hash_b=?, etag_b=?' + ' WHERE ident=?', + (props.href, props.hash, props.etag, ident) ) + self._c.commit() + assert c.rowcount > 0 def remove_ident(self, ident): - del self._new_ident_to_props[ident] + self._c.execute('DELETE FROM new_status WHERE ident=?', (ident,)) + + def _get_impl(self, ident, side, table): + res = self._c.execute('SELECT href_{side} AS href,' + ' hash_{side} AS hash,' + ' etag_{side} AS etag ' + 'FROM {table} WHERE ident=?' + .format(side=side, table=table), + (ident,)).fetchone() + if res is None: + return None + + if res['hash'] is None: # FIXME: Implement as constraint in db + assert res['href'] is None + assert res['etag'] is None + return None + + res = dict(res) + return _ItemMetadata(**res) def get_a(self, ident): - rv = self._ident_to_props[ident][0] - if rv is None: - raise KeyError() - return _ItemMetadata(**rv) + return self._get_impl(ident, side='a', table='status') def get_b(self, ident): - rv = self._ident_to_props[ident][1] - if rv is None: - raise KeyError() - return _ItemMetadata(**rv) + return self._get_impl(ident, side='b', table='status') def get_new_a(self, ident): - rv = self._new_ident_to_props[ident][0] - if rv is None: - raise KeyError() - return _ItemMetadata(**rv) + return self._get_impl(ident, side='a', table='new_status') def get_new_b(self, ident): - rv = self._new_ident_to_props[ident][1] - if rv is None: - raise KeyError() - return _ItemMetadata(**rv) + return self._get_impl(ident, side='b', table='new_status') def iter_old(self): - return iter(self._ident_to_props) + return iter(res['ident'] for res in + self._c.execute('SELECT ident 
FROM status').fetchall()) def iter_new(self): - return iter(self._new_ident_to_props) + return iter(res['ident'] for res in + self._c.execute('SELECT ident FROM new_status').fetchall()) def rollback(self, ident): - if ident in self._ident_to_props: - self._new_ident_to_props[ident] = self._ident_to_props[ident] - else: - self._new_ident_to_props.pop(ident, None) + a = self.get_a(ident) + b = self.get_b(ident) + assert (a is None) == (b is None) + + if a is None and b is None: + self.remove_ident(ident) + return + + self._c.execute( + 'INSERT OR REPLACE INTO new_status' + ' VALUES (?, ?, ?, ?, ?, ?, ?)', + (ident, a.href, b.href, a.hash, b.hash, a.etag, b.etag) + ) + + def _get_by_href_impl(self, href, default=(None, None), side=None): + res = self._c.execute( + 'SELECT ident, hash_{side} AS hash, etag_{side} AS etag ' + 'FROM status WHERE href_{side}=?'.format(side=side), + (href,)).fetchone() + if not res: + return default + return res['ident'], _ItemMetadata( + href=href, + hash=res['hash'], + etag=res['etag'], + ) + + def get_by_href_a(self, *a, **kw): + kw['side'] = 'a' + return self._get_by_href_impl(*a, **kw) + + def get_by_href_b(self, *a, **kw): + kw['side'] = 'b' + return self._get_by_href_impl(*a, **kw) + + +class _Status(_StatusBase): + + def __init__(self, ident_to_props): + self._db = _SqliteStatus(':memory:') + self._ident_to_props = ident_to_props + + def insert_ident_a(self, ident, props): + return self._db.insert_ident_a(ident, props) + + def insert_ident_b(self, ident, props): + return self._db.insert_ident_b(ident, props) + + def update_ident_a(self, ident, props): + return self._db.update_ident_a(ident, props) + + def update_ident_b(self, ident, props): + return self._db.update_ident_b(ident, props) + + def remove_ident(self, ident): + return self._db.remove_ident(ident) + + def get_a(self, ident): + return self._db.get_a(ident) + + def get_b(self, ident): + return self._db.get_b(ident) + + def get_new_a(self, ident): + return 
self._db.get_new_a(ident) + + def get_new_b(self, ident): + return self._db.get_new_b(ident) + + def iter_old(self): + return self._db.iter_old() + + def iter_new(self): + return self._db.iter_new() def get_by_href_a(self, href, default=(None, None)): - try: - ident, meta = self._href_to_status_a[href] - except KeyError: - return default - else: - return ident, _ItemMetadata(**meta) + return self._db.get_by_href_a(href, default) def get_by_href_b(self, href, default=(None, None)): - try: - ident, meta = self._href_to_status_b[href] - except KeyError: - return default - else: - return ident, _ItemMetadata(**meta) + return self._db.get_by_href_b(href, default) - def new_to_old_status(self): - for meta_a, meta_b in self._new_ident_to_props.values(): - assert meta_a is not None - assert meta_b is not None + def rollback(self, ident): + return self._db.rollback(ident) + + @contextlib.contextmanager + def transaction(self): + with self._db.transaction(): + for ident, (a, b) in self._ident_to_props.items(): + if a.get('hash') is None or b.get('hash') is None: + continue + params = (ident, a.get('href'), a.get('hash'), a.get('etag'), + b.get('href'), b.get('hash'), b.get('etag')) + + self._db._c.execute( + 'INSERT INTO status' + ' (ident, href_a, hash_a, etag_a,' + ' href_b, hash_b, etag_b)' + ' VALUES (?, ?, ?, ?, ?, ?, ?)', + params + ) + yield self._ident_to_props.clear() - self._ident_to_props.update(self._new_ident_to_props) + for ident in self._db.iter_old(): + a = self._db.get_a(ident) + b = self._db.get_b(ident) + self._ident_to_props[ident] = a.to_status(), b.to_status() class _SubStatus(object): @@ -264,16 +504,12 @@ class _StorageInfo(object): for href, etag in self.storage.list(): ident, meta = self.status.get_by_href(href) - if meta is None: - meta = _ItemMetadata() - - if meta.href != href or meta.etag != etag: + if meta is None or meta.href != href or meta.etag != etag: # Either the item is completely new, or updated # In both cases we should prefetch 
prefetch.append(href) else: - meta.href = href - meta.etag = etag + # Metadata is completely identical _store_props(ident, meta) # Prefetch items @@ -289,24 +525,21 @@ class _StorageInfo(object): return storage_nonempty def is_changed(self, ident): - try: - status = self.status.get(ident) - except KeyError: # new item + old_meta = self.status.get(ident) + if old_meta is None: # new item return True - meta = self.status.get_new(ident) + new_meta = self.status.get_new(ident) - if meta.etag != status.etag: # etag changed - old_hash = status.hash - if old_hash is None or meta.hash != old_hash: - # item actually changed - return True - else: - # only etag changed - return False + return ( + new_meta.etag != old_meta.etag and # etag changed + # item actually changed + (old_meta.hash is None or new_meta.hash != old_meta.hash) + ) def set_item_cache(self, ident, item): - assert self.status.get_new(ident).hash == item.hash + actual_hash = self.status.get_new(ident).hash + assert actual_hash == item.hash self._item_cache[ident] = item def get_item_cache(self, ident): @@ -370,31 +603,35 @@ def sync(storage_a, storage_b, status, conflict_resolution=None, status_nonempty = bool(status) status = _Status(status) - a_info = _StorageInfo(storage_a, _SubStatus(status, 'a')) - b_info = _StorageInfo(storage_b, _SubStatus(status, 'b')) + with status.transaction(): + a_info = _StorageInfo(storage_a, _SubStatus(status, 'a')) + b_info = _StorageInfo(storage_b, _SubStatus(status, 'b')) - a_nonempty = a_info.prepare_new_status() - b_nonempty = b_info.prepare_new_status() + a_nonempty = a_info.prepare_new_status() + b_nonempty = b_info.prepare_new_status() - if status_nonempty and not force_delete: - if a_nonempty and not b_nonempty: - raise StorageEmpty(empty_storage=storage_b) - elif not a_nonempty and b_nonempty: - raise StorageEmpty(empty_storage=storage_a) + if status_nonempty and not force_delete: + if a_nonempty and not b_nonempty: + raise StorageEmpty(empty_storage=storage_b) + elif 
not a_nonempty and b_nonempty: + raise StorageEmpty(empty_storage=storage_a) - actions = list(_get_actions(a_info, b_info)) + actions = list(_get_actions(a_info, b_info)) - with storage_a.at_once(), storage_b.at_once(): - for action in actions: - try: - action.run(a_info, b_info, conflict_resolution, partial_sync) - except Exception as e: - if error_callback: - error_callback(e) - else: - raise - - status.new_to_old_status() + with storage_a.at_once(), storage_b.at_once(): + for action in actions: + try: + action.run( + a_info, + b_info, + conflict_resolution, + partial_sync + ) + except Exception as e: + if error_callback: + error_callback(e) + else: + raise class Action: @@ -440,6 +677,7 @@ class Upload(Action): sync_logger.info(u'Copying (uploading) item {} to {}' .format(self.ident, self.dest.storage)) href, etag = self.dest.storage.upload(self.item) + assert href is not None self.dest.status.insert_ident(self.ident, _ItemMetadata( href=href, @@ -518,15 +756,8 @@ class ResolveConflict(Action): def _get_actions(a_info, b_info): for ident in uniq(itertools.chain(a_info.status.parent.iter_new(), a_info.status.parent.iter_old())): - try: - a = a_info.status.get_new(ident) - except KeyError: - a = None - - try: - b = b_info.status.get_new(ident) - except KeyError: - b = None + a = a_info.status.get_new(ident) + b = b_info.status.get_new(ident) if a and b: a_changed = a_info.is_changed(ident)