From 1efbcf77f7ea8475c5dfa90b8d492b0aa4575b0b Mon Sep 17 00:00:00 2001 From: Sergey Vasilyev Date: Sun, 11 Jun 2023 18:50:17 +0200 Subject: [PATCH] Re-authenticate if the session is closed by a concurrent request Signed-off-by: Sergey Vasilyev --- kopf/_cogs/clients/api.py | 10 ++++++++++ kopf/_cogs/clients/auth.py | 2 +- kopf/_cogs/clients/errors.py | 13 +++++++++++++ 3 files changed, 24 insertions(+), 1 deletion(-) diff --git a/kopf/_cogs/clients/api.py b/kopf/_cogs/clients/api.py index 351f1213..1d712f8f 100644 --- a/kopf/_cogs/clients/api.py +++ b/kopf/_cogs/clients/api.py @@ -84,6 +84,16 @@ async def request( ) await errors.check_response(response) # but do not parse it! + # aiohttp raises a generic error if the session/transport is closed, so we try to guess. + except RuntimeError as e: + if context.session.closed: + # NB: this will reset the retry counter and do the full cycle with the new creds. + # TODO: find a way to gracefully replace the active session in the existing context, + # so that all ongoing requests would switch to new session & credentials. + logger.error(f"Request attempt {idx} failed; will try re-authenticating: {what}") + raise errors.APISessionClosed("Session is closed.") from e + raise + except (aiohttp.ClientConnectionError, errors.APIServerError, asyncio.TimeoutError) as e: if backoff is None: # i.e. the last or the only attempt. logger.error(f"Request attempt {idx} failed; escalating: {what} -> {e!r}") diff --git a/kopf/_cogs/clients/auth.py b/kopf/_cogs/clients/auth.py index b92d101b..230799c5 100644 --- a/kopf/_cogs/clients/auth.py +++ b/kopf/_cogs/clients/auth.py @@ -43,7 +43,7 @@ async def wrapper(*args: Any, **kwargs: Any) -> Any: async for key, info, context in vault.extended(APIContext, 'contexts'): try: return await fn(*args, **kwargs, context=context) - except errors.APIUnauthorizedError as e: + except (errors.APIUnauthorizedError, errors.APISessionClosed) as e: await vault.invalidate(key, exc=e) # Normally, either `vault.extended()` or `vault.invalidate()` raise the login errors. diff --git a/kopf/_cogs/clients/errors.py b/kopf/_cogs/clients/errors.py index 45317efd..a5919234 100644 --- a/kopf/_cogs/clients/errors.py +++ b/kopf/_cogs/clients/errors.py @@ -113,6 +113,19 @@ class APIConflictError(APIClientError): pass +class APISessionClosed(Exception): + """ + A helper to escalate from inside the requests to cause re-authentication. + + This happens when credentials expire while multiple concurrent requests + are ongoing (including their retries, mostly their back-off timeouts): + one random request will raise HTTP 401 and cause the re-authentication, + while others will retry their requests with the old session (now closed!) + and get a generic RuntimeError from aiohttp, thus failing their whole task. + """ + pass + + async def check_response( response: aiohttp.ClientResponse, ) -> None: