From dfed9794cb342100e68e4f02b437a1bbb005f441 Mon Sep 17 00:00:00 2001 From: Hugo Osvaldo Barrera Date: Sat, 26 Jun 2021 11:46:46 +0200 Subject: [PATCH] Port google storage to use asyncio --- .builds/archlinux.yaml | 1 + setup.py | 2 +- vdirsyncer/storage/dav.py | 12 +++- vdirsyncer/storage/google.py | 118 ++++++++++++++++++++--------------- 4 files changed, 80 insertions(+), 53 deletions(-) diff --git a/.builds/archlinux.yaml b/.builds/archlinux.yaml index 4c122dc..cd70509 100644 --- a/.builds/archlinux.yaml +++ b/.builds/archlinux.yaml @@ -14,6 +14,7 @@ packages: - python-click-threading - python-requests - python-requests-toolbelt + - python-aiohttp-oauthlib # Test dependencies: - python-hypothesis - python-pytest-cov diff --git a/setup.py b/setup.py index ea71894..6d291a1 100644 --- a/setup.py +++ b/setup.py @@ -56,7 +56,7 @@ setup( install_requires=requirements, # Optional dependencies extras_require={ - "google": ["requests-oauthlib"], + "google": ["aiohttp-oauthlib"], "etesync": ["etesync==0.5.2", "django<2.0"], }, # Build dependencies diff --git a/vdirsyncer/storage/dav.py b/vdirsyncer/storage/dav.py index 8dc4319..1743265 100644 --- a/vdirsyncer/storage/dav.py +++ b/vdirsyncer/storage/dav.py @@ -411,12 +411,18 @@ class DAVSession: # XXX: This is a temporary hack to pin-point bad refactoring. assert self.connector is not None - async with aiohttp.ClientSession( + async with self._session as session: + return await http.request(method, url, session=session, **more) + + @property + def _session(self): + """Return a new session for requests.""" + + return aiohttp.ClientSession( connector=self.connector, connector_owner=False, # TODO use `raise_for_status=true`, though this needs traces first, - ) as session: - return await http.request(method, url, session=session, **more) + ) def get_default_headers(self): return { diff --git a/vdirsyncer/storage/google.py b/vdirsyncer/storage/google.py index 2829379..fa1199e 100644 --- a/vdirsyncer/storage/google.py +++ b/vdirsyncer/storage/google.py @@ -2,6 +2,7 @@ import json import logging import os import urllib.parse as urlparse +from pathlib import Path import aiohttp import click @@ -21,7 +22,7 @@ TOKEN_URL = "https://accounts.google.com/o/oauth2/v2/auth" REFRESH_URL = "https://www.googleapis.com/oauth2/v4/token" try: - from requests_oauthlib import OAuth2Session + from aiohttp_oauthlib import OAuth2Session have_oauth2 = True except ImportError: @@ -37,6 +38,9 @@ class GoogleSession(dav.DAVSession): url=None, connector: aiohttp.BaseConnector = None, ): + if not have_oauth2: + raise exceptions.UserError("aiohttp-oauthlib not installed") + # Required for discovering collections if url is not None: self.url = url @@ -45,68 +49,84 @@ class GoogleSession(dav.DAVSession): self._settings = {} self.connector = connector - if not have_oauth2: - raise exceptions.UserError("requests-oauthlib not installed") + self._token_file = Path(expand_path(token_file)) + self._client_id = client_id + self._client_secret = client_secret + self._token = None - token_file = expand_path(token_file) - return self._init_token(token_file, client_id, client_secret) + async def request(self, method, path, **kwargs): + if not self._token: + await self._init_token() - def _init_token(self, token_file, client_id, client_secret): - token = None - try: - with open(token_file) as f: - token = json.load(f) - except OSError: - pass - except ValueError as e: - raise exceptions.UserError( - "Failed to load token file {}, try deleting it. " - "Original error: {}".format(token_file, e) - ) + return await super().request(method, path, **kwargs) - def _save_token(token): - checkdir(expand_path(os.path.dirname(token_file)), create=True) - with atomic_write(token_file, mode="w", overwrite=True) as f: - json.dump(token, f) + def _save_token(self, token): + """Helper function called by OAuth2Session when a token is updated.""" + checkdir(expand_path(os.path.dirname(self._token_file)), create=True) + with atomic_write(self._token_file, mode="w", overwrite=True) as f: + json.dump(token, f) - self._session = OAuth2Session( - client_id=client_id, - token=token, + @property + def _session(self): + """Return a new OAuth session for requests.""" + + return OAuth2Session( + client_id=self._client_id, + token=self._token, redirect_uri="urn:ietf:wg:oauth:2.0:oob", scope=self.scope, auto_refresh_url=REFRESH_URL, auto_refresh_kwargs={ - "client_id": client_id, - "client_secret": client_secret, + "client_id": self._client_id, + "client_secret": self._client_secret, }, - token_updater=_save_token, + token_updater=lambda token: self._save_token(token), + connector=self.connector, + connector_owner=False, ) - if not token: - authorization_url, state = self._session.authorization_url( - TOKEN_URL, - # access_type and approval_prompt are Google specific - # extra parameters. - access_type="offline", - approval_prompt="force", + async def _init_token(self): + try: + with self._token_file.open() as f: + self._token = json.load(f) + except FileNotFoundError: + pass + except ValueError as e: + raise exceptions.UserError( + "Failed to load token file {}, try deleting it. " + "Original error: {}".format(self._token_file, e) ) - click.echo(f"Opening {authorization_url} ...") - try: - open_graphical_browser(authorization_url) - except Exception as e: - logger.warning(str(e)) - click.echo("Follow the instructions on the page.") - code = click.prompt("Paste obtained code") - token = self._session.fetch_token( - REFRESH_URL, - code=code, - # Google specific extra parameter used for client - # authentication - client_secret=client_secret, - ) + if not self._token: + # Some times a task stops at this `async`, and another continues the flow. + # At this point, the user has already completed the flow, but is prompeted + # for a second one. + async with self._session as session: + authorization_url, state = session.authorization_url( + TOKEN_URL, + # access_type and approval_prompt are Google specific + # extra parameters. + access_type="offline", + approval_prompt="force", + ) + click.echo(f"Opening {authorization_url} ...") + try: + open_graphical_browser(authorization_url) + except Exception as e: + logger.warning(str(e)) + + click.echo("Follow the instructions on the page.") + code = click.prompt("Paste obtained code") + + self._token = await session.fetch_token( + REFRESH_URL, + code=code, + # Google specific extra param used for client authentication: + client_secret=self._client_secret, + ) + # FIXME: Ugly - _save_token(token) + self._save_token(self._token) class GoogleCalendarStorage(dav.CalDAVStorage):