vdirsyncer/vdirsyncer/storage/memory.py
Markus Unterwaditzer 0d9ec646d7 Initial etesync support (#614)
* Initial etesync support

* Optimized get()

* Bugfixes

* bugfixes

* Add writing operations

* collection creation WIP

* Fix collection creation

* tests wip

* wip

* Final touch for test setup

* actually skip tests

* Disable metadata tests

* Avoid polluting working tree

* Avoid importing wsgi-intercept if not installed

* Fix collection tests

* Proper teardown

* Skip bad test

* Remove vtodo

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* wip

* Add docs

* Remove useless pkg

* Don't crash if etesync isn't imported

* Stylefix

* Fix more import errors

* More import errors

* Only run etesync on latest python

* Fix etesync check

* journal-manager migration
2017-04-13 16:27:11 +02:00

75 lines
2 KiB
Python

# -*- coding: utf-8 -*-
import random
from .base import Storage, normalize_meta_value
from .. import exceptions
def _random_string():
    '''
    Return a fresh pseudo-random float rendered with nine decimal places.

    Used below as a cheap source of new etag values.
    '''
    sample = random.random()
    return format(sample, '.9f')
class MemoryStorage(Storage):
    '''
    Saves data in RAM, only useful for testing.

    Items are kept in a dict keyed by href and metadata in a second dict,
    both owned by the instance.

    :param fileext: Suffix appended to an item's ``ident`` to build its href.
    :param kwargs: Forwarded to the ``Storage`` constructor. Passing a
        ``collection`` that is not ``None`` raises ``exceptions.UserError``.
    '''

    storage_name = 'memory'

    def __init__(self, fileext='', **kwargs):
        if kwargs.get('collection') is not None:
            raise exceptions.UserError('MemoryStorage does not support '
                                       'collections.')
        self.items = {}  # href => (etag, item)
        self.metadata = {}
        self.fileext = fileext
        super(MemoryStorage, self).__init__(**kwargs)

    def _get_href(self, item):
        '''Build the href for ``item``: its ``ident`` plus ``fileext``.'''
        return item.ident + self.fileext

    def list(self):
        '''Yield an ``(href, etag)`` pair for every stored item.'''
        for href, (etag, _item) in self.items.items():
            yield href, etag

    def get(self, href):
        '''
        Return ``(item, etag)`` for ``href``.

        An unknown href surfaces as the ``KeyError`` of the underlying dict
        lookup.
        '''
        etag, item = self.items[href]
        return item, etag

    def has(self, href):
        '''Return whether an item is stored under ``href``.'''
        return href in self.items

    def upload(self, item):
        '''
        Store a new item and return its ``(href, etag)``.

        :raises exceptions.AlreadyExistingError: if the href derived from
            the item is already taken.
        '''
        href = self._get_href(item)
        if href in self.items:
            raise exceptions.AlreadyExistingError(existing_href=href)
        etag = _random_string()
        self.items[href] = (etag, item)
        return href, etag

    def update(self, href, item, etag):
        '''
        Replace the item stored under ``href`` and return its new etag.

        :raises exceptions.NotFoundError: if nothing is stored under
            ``href``.
        :raises exceptions.WrongEtagError: if ``etag`` differs from the
            stored etag.
        '''
        if href not in self.items:
            raise exceptions.NotFoundError(href)
        actual_etag, _ = self.items[href]
        if etag != actual_etag:
            raise exceptions.WrongEtagError(etag, actual_etag)
        new_etag = _random_string()
        self.items[href] = (new_etag, item)
        return new_etag

    def delete(self, href, etag):
        '''
        Remove the item stored under ``href``.

        :raises exceptions.NotFoundError: if nothing is stored under
            ``href``.
        :raises exceptions.WrongEtagError: if ``etag`` differs from the
            stored etag.
        '''
        if href not in self.items:
            raise exceptions.NotFoundError(href)
        actual_etag, _ = self.items[href]
        if etag != actual_etag:
            # Report both etags, exactly as update() does for the same error.
            raise exceptions.WrongEtagError(etag, actual_etag)
        del self.items[href]

    def get_meta(self, key):
        '''
        Return the metadata value for ``key`` after passing it through
        ``normalize_meta_value``; a missing key is looked up as ``None``.
        '''
        return normalize_meta_value(self.metadata.get(key))

    def set_meta(self, key, value):
        '''Store ``value`` under ``key`` after ``normalize_meta_value``.'''
        self.metadata[key] = normalize_meta_value(value)