Merge pull request #985 from telotortium/gcal-oauth-remove-oob

gcal: replace oob OAuth2 with local server redirect
This commit is contained in:
Hugo 2022-06-26 18:42:37 +00:00 committed by GitHub
commit 1f7497c9d1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 91 additions and 4 deletions

View file

@ -2,7 +2,10 @@ import json
import logging
import os
import urllib.parse as urlparse
import wsgiref.simple_server
import wsgiref.util
from pathlib import Path
from threading import Thread
import aiohttp
import click
@ -14,6 +17,8 @@ from ..utils import expand_path
from ..utils import open_graphical_browser
from . import base
from . import dav
from .google_helpers import _RedirectWSGIApp
from .google_helpers import _WSGIRequestHandler
logger = logging.getLogger(__name__)
@ -54,6 +59,7 @@ class GoogleSession(dav.DAVSession):
self._client_id = client_id
self._client_secret = client_secret
self._token = None
self._redirect_uri = None
async def request(self, method, path, **kwargs):
if not self._token:
@ -69,12 +75,18 @@ class GoogleSession(dav.DAVSession):
@property
def _session(self):
"""Return a new OAuth session for requests."""
"""Return a new OAuth session for requests.
Accesses the self.redirect_uri field (str): the URI to redirect
authentication to. Should be a loopback address for a local server that
follows the process detailed in
https://developers.google.com/identity/protocols/oauth2/native-app.
"""
return OAuth2Session(
client_id=self._client_id,
token=self._token,
redirect_uri="urn:ietf:wg:oauth:2.0:oob",
redirect_uri=self._redirect_uri,
scope=self.scope,
auto_refresh_url=REFRESH_URL,
auto_refresh_kwargs={
@ -102,7 +114,18 @@ class GoogleSession(dav.DAVSession):
# Some times a task stops at this `async`, and another continues the flow.
# At this point, the user has already completed the flow, but is prompeted
# for a second one.
wsgi_app = _RedirectWSGIApp("Successfully obtained token.")
wsgiref.simple_server.WSGIServer.allow_reuse_address = False
host = "127.0.0.1"
local_server = wsgiref.simple_server.make_server(
host, 0, wsgi_app, handler_class=_WSGIRequestHandler
)
thread = Thread(target=local_server.handle_request)
thread.start()
self._redirect_uri = f"http://{host}:{local_server.server_port}"
async with self._session as session:
# Fail fast if the address is occupied
authorization_url, state = session.authorization_url(
TOKEN_URL,
# access_type and approval_prompt are Google specific
@ -117,14 +140,23 @@ class GoogleSession(dav.DAVSession):
logger.warning(str(e))
click.echo("Follow the instructions on the page.")
code = click.prompt("Paste obtained code")
thread.join()
logger.debug("server handled request!")
# Note: using https here because oauthlib is very picky that
# OAuth 2.0 should only occur over https.
authorization_response = wsgi_app.last_request_uri.replace(
"http", "https", 1
)
logger.debug(f"authorization_response: {authorization_response}")
self._token = await session.fetch_token(
REFRESH_URL,
code=code,
authorization_response=authorization_response,
# Google specific extra param used for client authentication:
client_secret=self._client_secret,
)
logger.debug(f"token: {self._token}")
local_server.server_close()
# FIXME: Ugly
await self._save_token(self._token)

View file

@ -0,0 +1,55 @@
# SPDX-License-Identifier: Apache-2.0
#
# Based on:
# https://github.com/googleapis/google-auth-library-python-oauthlib/blob/1fb16be1bad9050ee29293541be44e41e82defd7/google_auth_oauthlib/flow.py#L513
import logging
import wsgiref.simple_server
import wsgiref.util
from typing import Any
from typing import Callable
from typing import Dict
from typing import Iterable
from typing import Optional
logger = logging.getLogger(__name__)
class _WSGIRequestHandler(wsgiref.simple_server.WSGIRequestHandler):
"""Custom WSGIRequestHandler."""
def log_message(self, format, *args):
# (format is the argument name defined in the superclass.)
logger.info(format, *args)
class _RedirectWSGIApp:
"""WSGI app to handle the authorization redirect.
Stores the request URI and displays the given success message.
"""
last_request_uri: Optional[str]
def __init__(self, success_message: str):
"""
:param success_message: The message to display in the web browser the
authorization flow is complete.
"""
self.last_request_uri = None
self._success_message = success_message
def __call__(
self,
environ: Dict[str, Any],
start_response: Callable[[str, list], None],
) -> Iterable[bytes]:
"""WSGI Callable.
:param environ: The WSGI environment.
:param start_response: The WSGI start_response callable.
:returns: The response body.
"""
start_response("200 OK", [("Content-type", "text/plain; charset=utf-8")])
self.last_request_uri = wsgiref.util.request_uri(environ)
return [self._success_message.encode("utf-8")]