Use icalendar for parsing

This commit is contained in:
Markus Unterwaditzer 2014-05-15 15:14:06 +02:00
parent 069641dfab
commit 61ef8b3ee3
4 changed files with 150 additions and 159 deletions

View file

@ -10,6 +10,7 @@
'''
import vdirsyncer.log
from vdirsyncer.utils import text_type
vdirsyncer.log.set_level(vdirsyncer.log.logging.DEBUG)
@ -18,15 +19,71 @@ def normalize_item(item):
# in their filesystem backend
# - PRODID is changed by radicale for some reason after upload, but nobody
# cares about that anyway
rv = set()
for line in item.raw.splitlines():
rv = []
if not isinstance(item, text_type):
item = item.raw
for line in item.splitlines():
line = line.strip()
line = line.strip().split(u':', 1)
if line[0] in ('X-RADICALE-NAME', 'PRODID', 'REV'):
continue
rv.add(u':'.join(line))
return rv
rv.append(u':'.join(line))
return tuple(sorted(rv))
def assert_item_equals(a, b):
assert normalize_item(a) == normalize_item(b)
VCARD_TEMPLATE = u'''BEGIN:VCARD
VERSION:3.0
FN:Cyrus Daboo
N:Daboo;Cyrus
ADR;TYPE=POSTAL:;2822 Email HQ;Suite 2821;RFCVille;PA;15213;USA
EMAIL;TYPE=INTERNET;TYPE=PREF:cyrus@example.com
NICKNAME:me
NOTE:Example VCard.
ORG:Self Employed
TEL;TYPE=WORK;TYPE=VOICE:412 605 0499
TEL;TYPE=FAX:412 605 0705
URL:http://www.example.com
X-SOMETHING:{r}
UID:{r}
END:VCARD'''
TASK_TEMPLATE = u'''BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//dmfs.org//mimedir.icalendar//EN
BEGIN:VTODO
CREATED:20130721T142233Z
DTSTAMP:20130730T074543Z
LAST-MODIFIED;VALUE=DATE-TIME:20140122T151338Z
SEQUENCE:2
SUMMARY:Book: Kowlani - Tödlicher Staub
X-SOMETHING:{r}
UID:{r}
END:VTODO
END:VCALENDAR'''
BARE_EVENT_TEMPLATE = u'''BEGIN:VEVENT
DTSTART:19970714T170000Z
DTEND:19970715T035959Z
SUMMARY:Bastille Day Party
X-SOMETHING:{r}
UID:{r}
END:VEVENT'''
EVENT_TEMPLATE = u'''BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//hacksw/handcal//NONSGML v1.0//EN
''' + BARE_EVENT_TEMPLATE + u'''
END:VCALENDAR'''
SIMPLE_TEMPLATE = u'''BEGIN:FOO
UID:{r}
END:FOO
'''

View file

@ -17,6 +17,7 @@ import requests
import requests.exceptions
from .. import StorageTests
from tests import VCARD_TEMPLATE, TASK_TEMPLATE, EVENT_TEMPLATE
import vdirsyncer.exceptions as exceptions
from vdirsyncer.storage.base import Item
from vdirsyncer.storage.dav import CaldavStorage, CarddavStorage
@ -32,49 +33,6 @@ def _get_server_mixin(server_name):
ServerMixin = _get_server_mixin(dav_server)
VCARD_TEMPLATE = u'''BEGIN:VCARD
VERSION:3.0
FN:Cyrus Daboo
N:Daboo;Cyrus
ADR;TYPE=POSTAL:;2822 Email HQ;Suite 2821;RFCVille;PA;15213;USA
EMAIL;TYPE=INTERNET;TYPE=PREF:cyrus@example.com
NICKNAME:me
NOTE:Example VCard.
ORG:Self Employed
TEL;TYPE=WORK;TYPE=VOICE:412 605 0499
TEL;TYPE=FAX:412 605 0705
URL:http://www.example.com
X-SOMETHING:{r}
UID:{r}
END:VCARD'''
TASK_TEMPLATE = u'''BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//dmfs.org//mimedir.icalendar//EN
BEGIN:VTODO
CREATED:20130721T142233Z
DTSTAMP:20130730T074543Z
LAST-MODIFIED;VALUE=DATE-TIME:20140122T151338Z
SEQUENCE:2
SUMMARY:Book: Kowlani - Tödlicher Staub
X-SOMETHING:{r}
UID:{r}
END:VTODO
END:VCALENDAR'''
EVENT_TEMPLATE = u'''BEGIN:VCALENDAR
VERSION:2.0
PRODID:-//hacksw/handcal//NONSGML v1.0//EN
BEGIN:VEVENT
DTSTART:19970714T170000Z
DTEND:19970715T035959Z
SUMMARY:Bastille Day Party
X-SOMETHING:{r}
UID:{r}
END:VEVENT
END:VCALENDAR'''
templates = {
'VCARD': VCARD_TEMPLATE,

View file

@ -9,55 +9,64 @@
from requests import Response
from tests import normalize_item, SIMPLE_TEMPLATE, BARE_EVENT_TEMPLATE
from vdirsyncer.storage.http import HttpStorage, split_collection
def test_split_collection_simple():
input = u'\r\n'.join((
u'BEGIN:VADDRESSBOOK',
SIMPLE_TEMPLATE.format(r=123),
SIMPLE_TEMPLATE.format(r=345),
SIMPLE_TEMPLATE.format(r=678),
u'END:VADDRESSBOOK'
))
given = split_collection(input)
expected = [
SIMPLE_TEMPLATE.format(r=123),
SIMPLE_TEMPLATE.format(r=345),
SIMPLE_TEMPLATE.format(r=678)
]
assert set(normalize_item(item) for item in split_collection(input)) == \
set(normalize_item(item) for item in expected)
def test_split_collection_timezones():
items = [
(
u'BEGIN:VEVENT',
u'SUMMARY:Eine Kurzinfo',
u'DESCRIPTION:Beschreibung des Termines',
u'END:VEVENT'
),
(
u'BEGIN:VEVENT',
u'SUMMARY:Eine zweite Kurzinfo',
u'DESCRIPTION:Beschreibung des anderen Termines',
u' With an extra line for description',
u'BEGIN:VALARM',
u'ACTION:AUDIO',
u'TRIGGER:19980403T120000',
u'ATTACH;FMTTYPE=audio/basic:http://host.com/pub/ssbanner.aud',
u'REPEAT:4',
u'DURATION:PT1H',
u'END:VALARM',
u'END:VEVENT'
)
BARE_EVENT_TEMPLATE.format(r=123),
BARE_EVENT_TEMPLATE.format(r=345)
]
timezone = (
u'BEGIN:VTIMEZONE',
u'TZID:/mozilla.org/20070129_1/Asia/Tokyo',
u'X-LIC-LOCATION:Asia/Tokyo',
u'BEGIN:STANDARD',
u'TZOFFSETFROM:+0900',
u'TZOFFSETTO:+0900',
u'TZNAME:JST',
u'DTSTART:19700101T000000',
u'END:STANDARD',
u'BEGIN:VTIMEZONE\r\n'
u'TZID:/mozilla.org/20070129_1/Asia/Tokyo\r\n'
u'X-LIC-LOCATION:Asia/Tokyo\r\n'
u'BEGIN:STANDARD\r\n'
u'TZOFFSETFROM:+0900\r\n'
u'TZOFFSETTO:+0900\r\n'
u'TZNAME:JST\r\n'
u'DTSTART:19700101T000000\r\n'
u'END:STANDARD\r\n'
u'END:VTIMEZONE'
)
full = list(
(u'BEGIN:VCALENDAR',) +
timezone + tuple(line for item in items for line in item) +
(u'END:VCALENDAR',)
full = u'\r\n'.join(
[u'BEGIN:VCALENDAR'] +
items +
[timezone, u'END:VCALENDAR']
)
given = set(normalize_item(item) for item in split_collection(full))
expected = set(
normalize_item(u'\r\n'.join((
u'BEGIN:VCALENDAR', item, timezone, u'END:VCALENDAR'
)))
for item in items
)
given = [tuple(x) for x in split_collection(full)]
expected = [(u'BEGIN:VCALENDAR',) + timezone + item + (u'END:VCALENDAR',)
for item in items]
assert given == expected
@ -72,7 +81,6 @@ def test_list(monkeypatch):
(u'BEGIN:VEVENT\n'
u'SUMMARY:Eine zweite Küèrzinfo\n'
u'DESCRIPTION:Beschreibung des anderen Termines\n'
u' With an extra line for description\n'
u'BEGIN:VALARM\n'
u'ACTION:AUDIO\n'
u'TRIGGER:19980403T120000\n'
@ -108,13 +116,15 @@ def test_list(monkeypatch):
item, etag2 = s.get(href)
assert item.uid is None
assert etag2 == etag
found_items[item.raw.strip()] = href
found_items[normalize_item(item)] = href
assert set(found_items) == set(u'BEGIN:VCALENDAR\n' + x + '\nEND:VCALENDAR'
for x in items)
expected = set(normalize_item(u'BEGIN:VCALENDAR\n' + x + '\nEND:VCALENDAR')
for x in items)
assert set(found_items) == expected
for href, etag in s.list():
item, etag2 = s.get(href)
assert item.uid is None
assert etag2 == etag
assert found_items[item.raw.strip()] == href
assert found_items[normalize_item(item)] == href

View file

@ -7,89 +7,55 @@
:license: MIT, see LICENSE for more details.
'''
import itertools
import icalendar.cal
import icalendar.parser
from .base import Item, Storage
from ..utils import expand_path, get_password, request, text_type, urlparse
from ..utils import expand_path, get_password, itervalues, request, \
text_type, urlparse
USERAGENT = 'vdirsyncer'
def split_simple_collection(lines):
item = []
collection_type = None
item_type = None
for line in lines:
if u':' not in line:
key = line
value = None
else:
key, value = (x.strip() for x in line.split(u':', 1))
def split_collection(text, inline=(u'VTIMEZONE',),
wrap_items_with=(u'VCALENDAR',)):
assert isinstance(text, text_type)
collection = icalendar.cal.Component.from_ical(text)
items = collection.subcomponents
if key == u'BEGIN':
if collection_type is None:
collection_type = value
elif item_type is None:
item_type = value
item.append(line)
else:
item.append(line)
elif key == u'END':
if value == collection_type:
break
elif value == item_type:
item.append(line)
yield item
item = []
item_type = None
else:
item.append(line)
else:
if item_type is not None:
item.append(line)
if collection.name in wrap_items_with:
start = u'BEGIN:{}'.format(collection.name)
end = u'END:{}'.format(collection.name)
else:
start = end = u''
def wrap_items(items, collection_type, exclude=(u'VTIMEZONE',)):
inlined_items = {}
for item in items:
key, value = (x.strip() for x in item[0].split(u':'))
if value in exclude:
yield item
else:
yield ([u'BEGIN:' + collection_type] + item +
[u'END:' + collection_type])
if item.name in inline:
inlined_items[item.name] = item
def inline_timezones(items):
timezone = None
for item in items:
if u':' not in item[0]:
import pdb
pdb.set_trace()
if item.name not in inline:
lines = []
lines.append(start)
for inlined_item in itervalues(inlined_items):
lines.extend(to_unicode_lines(inlined_item))
key, value = (x.strip() for x in item[0].split(u':'))
if value == u'VTIMEZONE':
if timezone is not None:
raise ValueError('Multiple timezones.')
timezone = item
else:
if timezone is not None:
item = [item[0]] + timezone + item[1:]
yield item
lines.extend(to_unicode_lines(item))
lines.append(end)
lines.append(u'')
yield u''.join(line + u'\r\n' for line in lines if line)
def split_collection(lines):
collection_type = None
for line in lines:
key, value = (x.strip() for x in line.split(u':'))
if key == u'BEGIN':
collection_type = value
break
def to_unicode_lines(item):
'''icalendar doesn't provide an efficient way of getting the ical data as
unicode. So let's do it ourselves.'''
is_calendar = collection_type == u'VCALENDAR'
rv = split_simple_collection(lines)
if is_calendar:
rv = inline_timezones(wrap_items(rv, collection_type))
return rv
for content_line in item.content_lines():
if content_line:
yield icalendar.parser.foldline(content_line)
def prepare_auth(auth, username, password):
@ -152,8 +118,8 @@ class HttpStorage(Storage):
r = request('GET', self.url, **self._settings)
r.raise_for_status()
self._items.clear()
for i, item in enumerate(split_collection(r.text.splitlines())):
item = Item(u'\n'.join(item))
for i, item in enumerate(split_collection(r.text)):
item = Item(item)
self._items[self._get_href(item)] = item, item.hash
for href, (item, etag) in self._items.items():