Switch sync tests to function-based

This commit is contained in:
Markus Unterwaditzer 2014-03-16 10:47:28 +01:00
parent 27888317d1
commit b3a720be75

View file

@ -8,6 +8,7 @@
''' '''
from unittest import TestCase from unittest import TestCase
import pytest
from vdirsyncer.storage.base import Item from vdirsyncer.storage.base import Item
from vdirsyncer.storage.memory import MemoryStorage from vdirsyncer.storage.memory import MemoryStorage
from vdirsyncer.sync import sync from vdirsyncer.sync import sync
@ -19,128 +20,127 @@ def empty_storage(x):
return list(x.list()) == [] return list(x.list()) == []
class SyncTests(TestCase): def test_irrelevant_status():
a = MemoryStorage()
b = MemoryStorage()
status = {'1': ('1.txt', 1234, '1.ics', 2345)}
sync(a, b, status)
assert not status
assert empty_storage(a)
assert empty_storage(b)
def test_irrelevant_status(self): def test_missing_status():
a = MemoryStorage() a = MemoryStorage()
b = MemoryStorage() b = MemoryStorage()
status = {'1': ('1.txt', 1234, '1.ics', 2345)} status = {}
sync(a, b, status) item = Item(u'UID:1')
assert not status a.upload(item)
assert empty_storage(a) b.upload(item)
assert empty_storage(b) sync(a, b, status)
assert len(status) == 1
assert a.has('1.txt')
assert b.has('1.txt')
def test_missing_status(self): def test_missing_status_and_different_items():
a = MemoryStorage() return # TODO
b = MemoryStorage() a = MemoryStorage()
status = {} b = MemoryStorage()
item = Item(u'UID:1') status = {}
a.upload(item) item1 = Item(u'UID:1\nhaha')
b.upload(item) item2 = Item(u'UID:1\nhoho')
sync(a, b, status) a.upload(item1)
assert len(status) == 1 b.upload(item2)
assert a.has('1.txt') sync(a, b, status)
assert b.has('1.txt') assert status
assert_item_equals(a.get('1.txt')[0], b.get('1.txt')[0])
def test_missing_status_and_different_items(self): def test_upload_and_update():
return # TODO a = MemoryStorage()
a = MemoryStorage() b = MemoryStorage()
b = MemoryStorage() status = {}
status = {}
item1 = Item(u'UID:1\nhaha')
item2 = Item(u'UID:1\nhoho')
a.upload(item1)
b.upload(item2)
sync(a, b, status)
assert status
assert_item_equals(a.get('1.txt')[0], b.get('1.txt')[0])
def test_upload_and_update(self): item = Item(u'UID:1') # new item 1 in a
a = MemoryStorage() a.upload(item)
b = MemoryStorage() sync(a, b, status)
status = {} assert_item_equals(b.get('1.txt')[0], item)
item = Item(u'UID:1') # new item 1 in a item = Item(u'UID:1\nASDF:YES') # update of item 1 in b
a.upload(item) b.update('1.txt', item, b.get('1.txt')[1])
sync(a, b, status) sync(a, b, status)
assert_item_equals(b.get('1.txt')[0], item) assert_item_equals(a.get('1.txt')[0], item)
item = Item(u'UID:1\nASDF:YES') # update of item 1 in b item2 = Item(u'UID:2') # new item 2 in b
b.update('1.txt', item, b.get('1.txt')[1]) b.upload(item2)
sync(a, b, status) sync(a, b, status)
assert_item_equals(a.get('1.txt')[0], item) assert_item_equals(a.get('2.txt')[0], item2)
item2 = Item(u'UID:2') # new item 2 in b item2 = Item(u'UID:2\nASDF:YES') # update of item 2 in a
b.upload(item2) a.update('2.txt', item2, a.get('2.txt')[1])
sync(a, b, status) sync(a, b, status)
assert_item_equals(a.get('2.txt')[0], item2) assert_item_equals(b.get('2.txt')[0], item2)
item2 = Item(u'UID:2\nASDF:YES') # update of item 2 in a def test_deletion():
a.update('2.txt', item2, a.get('2.txt')[1]) a = MemoryStorage()
sync(a, b, status) b = MemoryStorage()
assert_item_equals(b.get('2.txt')[0], item2) status = {}
def test_deletion(self): item = Item(u'UID:1')
a = MemoryStorage() a.upload(item)
b = MemoryStorage() sync(a, b, status)
status = {} b.delete('1.txt', b.get('1.txt')[1])
sync(a, b, status)
assert not a.has('1.txt') and not b.has('1.txt')
item = Item(u'UID:1') a.upload(item)
a.upload(item) sync(a, b, status)
sync(a, b, status) assert a.has('1.txt') and b.has('1.txt')
b.delete('1.txt', b.get('1.txt')[1]) a.delete('1.txt', a.get('1.txt')[1])
sync(a, b, status) sync(a, b, status)
assert not a.has('1.txt') and not b.has('1.txt') assert not a.has('1.txt') and not b.has('1.txt')
a.upload(item) def test_already_synced():
sync(a, b, status) a = MemoryStorage()
assert a.has('1.txt') and b.has('1.txt') b = MemoryStorage()
a.delete('1.txt', a.get('1.txt')[1]) item = Item(u'UID:1')
sync(a, b, status) a.upload(item)
assert not a.has('1.txt') and not b.has('1.txt') b.upload(item)
status = {
'1': ('1.txt', a.get('1.txt')[1], '1.txt', b.get('1.txt')[1])
}
old_status = dict(status)
a.update = b.update = a.upload = b.upload = \
lambda *a, **kw: pytest.fail('Method shouldn\'t have been called.')
sync(a, b, status)
assert status == old_status
assert a.has('1.txt') and b.has('1.txt')
def test_already_synced(self): def test_conflict_resolution_both_etags_new():
a = MemoryStorage() a = MemoryStorage()
b = MemoryStorage() b = MemoryStorage()
item = Item(u'UID:1') item = Item(u'UID:1')
a.upload(item) href_a, etag_a = a.upload(item)
b.upload(item) href_b, etag_b = b.upload(item)
status = { status = {}
'1': ('1.txt', a.get('1.txt')[1], '1.txt', b.get('1.txt')[1]) sync(a, b, status)
} assert status
old_status = dict(status) a.update(href_a, Item(u'UID:1\nASDASD'), etag_a)
a.update = b.update = a.upload = b.upload = \ b.update(href_b, Item(u'UID:1\nHUEHUE'), etag_b)
lambda *a, **kw: self.fail('Method shouldn\'t have been called.') with pytest.raises(exceptions.SyncConflict):
sync(a, b, status) sync(a, b, status)
assert status == old_status sync(a, b, status, conflict_resolution='a wins')
assert a.has('1.txt') and b.has('1.txt') obj_a, _ = a.get(href_a)
obj_b, _ = b.get(href_b)
assert_item_equals(obj_a, obj_b)
n = normalize_item(obj_a)
assert u'UID:1' in n
assert u'ASDASD' in n
def test_conflict_resolution_both_etags_new(self): def test_conflict_resolution_new_etags_without_changes():
a = MemoryStorage() a = MemoryStorage()
b = MemoryStorage() b = MemoryStorage()
item = Item(u'UID:1') item = Item(u'UID:1')
href_a, etag_a = a.upload(item) href_a, etag_a = a.upload(item)
href_b, etag_b = b.upload(item) href_b, etag_b = b.upload(item)
status = {} status = {'1': (href_a, 'BOGUS_a', href_b, 'BOGUS_b')}
sync(a, b, status) sync(a, b, status)
assert status assert status == {'1': (href_a, etag_a, href_b, etag_b)}
a.update(href_a, Item(u'UID:1\nASDASD'), etag_a)
b.update(href_b, Item(u'UID:1\nHUEHUE'), etag_b)
self.assertRaises(exceptions.SyncConflict, sync, a, b, status)
sync(a, b, status, conflict_resolution='a wins')
obj_a, _ = a.get(href_a)
obj_b, _ = b.get(href_b)
assert_item_equals(obj_a, obj_b)
n = normalize_item(obj_a)
assert u'UID:1' in n
assert u'ASDASD' in n
def tset_conflict_resolution_new_etags_without_changes(self):
a = MemoryStorage()
b = MemoryStorage()
item = Item(u'UID:1')
href_a, etag_a = a.upload(item)
href_b, etag_b = b.upload(item)
status = {'1': (href_a, 'BOGUS_a', href_b, 'BOGUS_b')}
sync(a, b, status)
assert status == {'1': (href_a, etag_a, href_b, etag_b)}