Port google storage to use asyncio

This commit is contained in:
Hugo Osvaldo Barrera 2021-06-26 11:46:46 +02:00
parent 8d69b73c9e
commit dfed9794cb
4 changed files with 80 additions and 53 deletions

View file

@ -14,6 +14,7 @@ packages:
- python-click-threading
- python-requests
- python-requests-toolbelt
- python-aiohttp-oauthlib
# Test dependencies:
- python-hypothesis
- python-pytest-cov

View file

@ -56,7 +56,7 @@ setup(
install_requires=requirements,
# Optional dependencies
extras_require={
"google": ["requests-oauthlib"],
"google": ["aiohttp-oauthlib"],
"etesync": ["etesync==0.5.2", "django<2.0"],
},
# Build dependencies

View file

@ -411,12 +411,18 @@ class DAVSession:
# XXX: This is a temporary hack to pin-point bad refactoring.
assert self.connector is not None
async with aiohttp.ClientSession(
async with self._session as session:
return await http.request(method, url, session=session, **more)
@property
def _session(self):
"""Return a new session for requests."""
return aiohttp.ClientSession(
connector=self.connector,
connector_owner=False,
# TODO use `raise_for_status=true`, though this needs traces first,
) as session:
return await http.request(method, url, session=session, **more)
)
def get_default_headers(self):
return {

View file

@ -2,6 +2,7 @@ import json
import logging
import os
import urllib.parse as urlparse
from pathlib import Path
import aiohttp
import click
@ -21,7 +22,7 @@ TOKEN_URL = "https://accounts.google.com/o/oauth2/v2/auth"
REFRESH_URL = "https://www.googleapis.com/oauth2/v4/token"
try:
from requests_oauthlib import OAuth2Session
from aiohttp_oauthlib import OAuth2Session
have_oauth2 = True
except ImportError:
@ -37,6 +38,9 @@ class GoogleSession(dav.DAVSession):
url=None,
connector: aiohttp.BaseConnector = None,
):
if not have_oauth2:
raise exceptions.UserError("aiohttp-oauthlib not installed")
# Required for discovering collections
if url is not None:
self.url = url
@ -45,68 +49,84 @@ class GoogleSession(dav.DAVSession):
self._settings = {}
self.connector = connector
if not have_oauth2:
raise exceptions.UserError("requests-oauthlib not installed")
self._token_file = Path(expand_path(token_file))
self._client_id = client_id
self._client_secret = client_secret
self._token = None
token_file = expand_path(token_file)
return self._init_token(token_file, client_id, client_secret)
async def request(self, method, path, **kwargs):
if not self._token:
await self._init_token()
def _init_token(self, token_file, client_id, client_secret):
token = None
try:
with open(token_file) as f:
token = json.load(f)
except OSError:
pass
except ValueError as e:
raise exceptions.UserError(
"Failed to load token file {}, try deleting it. "
"Original error: {}".format(token_file, e)
)
return await super().request(method, path, **kwargs)
def _save_token(token):
checkdir(expand_path(os.path.dirname(token_file)), create=True)
with atomic_write(token_file, mode="w", overwrite=True) as f:
json.dump(token, f)
def _save_token(self, token):
"""Helper function called by OAuth2Session when a token is updated."""
checkdir(expand_path(os.path.dirname(self._token_file)), create=True)
with atomic_write(self._token_file, mode="w", overwrite=True) as f:
json.dump(token, f)
self._session = OAuth2Session(
client_id=client_id,
token=token,
@property
def _session(self):
"""Return a new OAuth session for requests."""
return OAuth2Session(
client_id=self._client_id,
token=self._token,
redirect_uri="urn:ietf:wg:oauth:2.0:oob",
scope=self.scope,
auto_refresh_url=REFRESH_URL,
auto_refresh_kwargs={
"client_id": client_id,
"client_secret": client_secret,
"client_id": self._client_id,
"client_secret": self._client_secret,
},
token_updater=_save_token,
token_updater=lambda token: self._save_token(token),
connector=self.connector,
connector_owner=False,
)
if not token:
authorization_url, state = self._session.authorization_url(
TOKEN_URL,
# access_type and approval_prompt are Google specific
# extra parameters.
access_type="offline",
approval_prompt="force",
async def _init_token(self):
try:
with self._token_file.open() as f:
self._token = json.load(f)
except FileNotFoundError:
pass
except ValueError as e:
raise exceptions.UserError(
"Failed to load token file {}, try deleting it. "
"Original error: {}".format(self._token_file, e)
)
click.echo(f"Opening {authorization_url} ...")
try:
open_graphical_browser(authorization_url)
except Exception as e:
logger.warning(str(e))
click.echo("Follow the instructions on the page.")
code = click.prompt("Paste obtained code")
token = self._session.fetch_token(
REFRESH_URL,
code=code,
# Google specific extra parameter used for client
# authentication
client_secret=client_secret,
)
if not self._token:
# Some times a task stops at this `async`, and another continues the flow.
# At this point, the user has already completed the flow, but is prompeted
# for a second one.
async with self._session as session:
authorization_url, state = session.authorization_url(
TOKEN_URL,
# access_type and approval_prompt are Google specific
# extra parameters.
access_type="offline",
approval_prompt="force",
)
click.echo(f"Opening {authorization_url} ...")
try:
open_graphical_browser(authorization_url)
except Exception as e:
logger.warning(str(e))
click.echo("Follow the instructions on the page.")
code = click.prompt("Paste obtained code")
self._token = await session.fetch_token(
REFRESH_URL,
code=code,
# Google specific extra param used for client authentication:
client_secret=self._client_secret,
)
# FIXME: Ugly
_save_token(token)
self._save_token(self._token)
class GoogleCalendarStorage(dav.CalDAVStorage):