From 38391196b594929f4fb76dedd21f853eb2ea9580 Mon Sep 17 00:00:00 2001 From: Uriel Mandujano Date: Wed, 17 Jan 2024 11:17:46 -0600 Subject: [PATCH] ft: support custom reconnect exceptions --- prefect_kubernetes/utilities.py | 36 +++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/prefect_kubernetes/utilities.py b/prefect_kubernetes/utilities.py index c6024a9..40f65df 100644 --- a/prefect_kubernetes/utilities.py +++ b/prefect_kubernetes/utilities.py @@ -1,7 +1,8 @@ """ Utilities for working with the Python Kubernetes API. """ import logging +import time from pathlib import Path -from typing import List, Optional, Type, TypeVar, Union +from typing import Callable, List, Optional, Type, TypeVar, Union import urllib3 from cachetools import FIFOCache @@ -148,9 +149,7 @@ def _slugify_label_key(key: str, max_length: int = 63, prefix_max_length=253) -> prefix, max_length=prefix_max_length, regex_pattern=r"[^a-zA-Z0-9-\.]+", - ).strip( - "_-." - ) # Must start or end with alphanumeric characters + ).strip("_-.") # Must start or end with alphanumeric characters or prefix ) @@ -192,7 +191,7 @@ class ResilientStreamWatcher: certain exceptions. """ - RECONNECT_EXCEPTIONS = (urllib3.exceptions.ProtocolError,) + DEFAULT_RECONNECT_EXCEPTIONS = (urllib3.exceptions.ProtocolError,) def __init__( self, @@ -204,10 +203,14 @@ def __init__( self.logger = logger self.watch = watch.Watch() - reconnect_exceptions = reconnect_exceptions or self.RECONNECT_EXCEPTIONS + reconnect_exceptions = ( + reconnect_exceptions + if reconnect_exceptions is not None + else self.DEFAULT_RECONNECT_EXCEPTIONS + ) self.reconnect_exceptions = tuple(reconnect_exceptions) - def _stream(self, cache: Optional[FIFOCache], func, *args, **kwargs): + def _stream(self, cache: Optional[FIFOCache], func: Callable, *args, **kwargs): keep_streaming = True while keep_streaming: try: @@ -224,18 +227,25 @@ def _stream(self, cache: Optional[FIFOCache], func, *args, **kwargs): yield event else: yield event + else: + # Case: we've completed the iteration + keep_streaming = False except self.reconnect_exceptions: + # Case: We've hit an exception we're willing to retry on if self.logger: self.logger.exception("Unable to connect, retrying...") - except Exception: + time.sleep(1) + except: + # Case: We hit an exception we're unwilling to retry on if self.logger: - self.logger.exception("Unexpected error") + self.logger.exception( + f"Unexpected error while streaming {func.__name__}" + ) keep_streaming = False + self.stop() raise - finally: - if self.logger: - self.logger.debug("Completed stream.") - keep_streaming = False + + self.stop() def api_object_stream(self, func, *args, **kwargs): """