mirror of
https://github.com/samsonjs/vdirsyncer.git
synced 2026-04-27 14:57:41 +00:00
http: retry safe DAV methods on transient aiohttp disconnects; cli: gather with return_exceptions to allow in-flight backoffs to finish
- Retry ServerDisconnectedError/ServerTimeoutError/ClientConnectionError/asyncio.TimeoutError for GET/HEAD/OPTIONS/PROPFIND/REPORT - Keep original rate-limit handling (429, Google 403 usageLimits) - In CLI, avoid cancelling sibling tasks so per-request backoff can complete; re-raise first failure after all tasks finish
This commit is contained in:
parent
4c2c60402e
commit
4990cdf229
2 changed files with 45 additions and 3 deletions
|
|
@ -147,7 +147,14 @@ def sync(ctx, collections, force_delete):
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
await asyncio.gather(*tasks)
|
# `return_exceptions=True` ensures that the event loop lives long enough for
|
||||||
|
# backoffs to be able to finish
|
||||||
|
gathered = await asyncio.gather(*tasks, return_exceptions=True)
|
||||||
|
# but now we need to manually check for and propogate a single failure after
|
||||||
|
# allowing all tasks to finish in order to keep exit status non-zero
|
||||||
|
failures = [e for e in gathered if isinstance(e, BaseException)]
|
||||||
|
if failures:
|
||||||
|
raise failures[0]
|
||||||
|
|
||||||
asyncio.run(main(collections))
|
asyncio.run(main(collections))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,6 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import platform
|
import platform
|
||||||
|
|
@ -11,6 +12,9 @@ from ssl import create_default_context
|
||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
import requests.auth
|
import requests.auth
|
||||||
|
from aiohttp import ClientConnectionError
|
||||||
|
from aiohttp import ServerDisconnectedError
|
||||||
|
from aiohttp import ServerTimeoutError
|
||||||
from requests.utils import parse_dict_header
|
from requests.utils import parse_dict_header
|
||||||
from tenacity import retry
|
from tenacity import retry
|
||||||
from tenacity import retry_if_exception_type
|
from tenacity import retry_if_exception_type
|
||||||
|
|
@ -152,6 +156,20 @@ def prepare_client_cert(cert):
|
||||||
return cert
|
return cert
|
||||||
|
|
||||||
|
|
||||||
|
class TransientNetworkError(exceptions.Error):
|
||||||
|
"""Transient network condition that should be retried."""
|
||||||
|
|
||||||
|
|
||||||
|
def _is_safe_to_retry_method(method: str) -> bool:
|
||||||
|
"""Returns True if the HTTP method is safe/idempotent to retry.
|
||||||
|
|
||||||
|
We consider these safe for our WebDAV usage:
|
||||||
|
- GET, HEAD, OPTIONS: standard safe methods
|
||||||
|
- PROPFIND, REPORT: read-only DAV queries used for listing/fetching
|
||||||
|
"""
|
||||||
|
return method.upper() in {"GET", "HEAD", "OPTIONS", "PROPFIND", "REPORT"}
|
||||||
|
|
||||||
|
|
||||||
class UsageLimitReached(exceptions.Error):
|
class UsageLimitReached(exceptions.Error):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
@ -190,7 +208,10 @@ async def _is_quota_exceeded_google(response: aiohttp.ClientResponse) -> bool:
|
||||||
@retry(
|
@retry(
|
||||||
stop=stop_after_attempt(5),
|
stop=stop_after_attempt(5),
|
||||||
wait=wait_exponential(multiplier=1, min=4, max=10),
|
wait=wait_exponential(multiplier=1, min=4, max=10),
|
||||||
retry=retry_if_exception_type(UsageLimitReached),
|
retry=(
|
||||||
|
retry_if_exception_type(UsageLimitReached)
|
||||||
|
| retry_if_exception_type(TransientNetworkError)
|
||||||
|
),
|
||||||
reraise=True,
|
reraise=True,
|
||||||
)
|
)
|
||||||
async def request(
|
async def request(
|
||||||
|
|
@ -242,7 +263,21 @@ async def request(
|
||||||
while num_401 < 2:
|
while num_401 < 2:
|
||||||
if auth:
|
if auth:
|
||||||
headers["Authorization"] = auth.get_auth_header(method, url)
|
headers["Authorization"] = auth.get_auth_header(method, url)
|
||||||
response = await session.request(method, url, headers=headers, **kwargs)
|
try:
|
||||||
|
response = await session.request(method, url, headers=headers, **kwargs)
|
||||||
|
except (
|
||||||
|
ServerDisconnectedError,
|
||||||
|
ServerTimeoutError,
|
||||||
|
ClientConnectionError,
|
||||||
|
asyncio.TimeoutError,
|
||||||
|
) as e:
|
||||||
|
# Retry only if the method is safe/idempotent for our DAV use
|
||||||
|
if _is_safe_to_retry_method(method):
|
||||||
|
logger.debug(
|
||||||
|
f"Transient network error on {method} {url}: {e}. Will retry."
|
||||||
|
)
|
||||||
|
raise TransientNetworkError(str(e)) from e
|
||||||
|
raise e from None
|
||||||
|
|
||||||
if response.ok or not auth:
|
if response.ok or not auth:
|
||||||
# we don't need to do the 401-loop if we don't do auth in the first place
|
# we don't need to do the 401-loop if we don't do auth in the first place
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue