Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
ft: support custom reconnect exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
urimandujano committed Jan 17, 2024
1 parent d3e9f2e commit 3839119
Showing 1 changed file with 23 additions and 13 deletions.
36 changes: 23 additions & 13 deletions prefect_kubernetes/utilities.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
""" Utilities for working with the Python Kubernetes API. """
import logging
import time
from pathlib import Path
from typing import List, Optional, Type, TypeVar, Union
from typing import Callable, List, Optional, Type, TypeVar, Union

import urllib3
from cachetools import FIFOCache
Expand Down Expand Up @@ -148,9 +149,7 @@ def _slugify_label_key(key: str, max_length: int = 63, prefix_max_length=253) ->
prefix,
max_length=prefix_max_length,
regex_pattern=r"[^a-zA-Z0-9-\.]+",
).strip(
"_-."
) # Must start or end with alphanumeric characters
).strip("_-.") # Must start or end with alphanumeric characters
or prefix
)

Expand Down Expand Up @@ -192,7 +191,7 @@ class ResilientStreamWatcher:
certain exceptions.
"""

RECONNECT_EXCEPTIONS = (urllib3.exceptions.ProtocolError,)
DEFAULT_RECONNECT_EXCEPTIONS = (urllib3.exceptions.ProtocolError,)

def __init__(
self,
Expand All @@ -204,10 +203,14 @@ def __init__(
self.logger = logger
self.watch = watch.Watch()

reconnect_exceptions = reconnect_exceptions or self.RECONNECT_EXCEPTIONS
reconnect_exceptions = (
reconnect_exceptions
if reconnect_exceptions is not None
else self.DEFAULT_RECONNECT_EXCEPTIONS
)
self.reconnect_exceptions = tuple(reconnect_exceptions)

def _stream(self, cache: Optional[FIFOCache], func, *args, **kwargs):
def _stream(self, cache: Optional[FIFOCache], func: Callable, *args, **kwargs):
keep_streaming = True
while keep_streaming:
try:
Expand All @@ -224,18 +227,25 @@ def _stream(self, cache: Optional[FIFOCache], func, *args, **kwargs):
yield event
else:
yield event
else:
# Case: we've completed the iteration
keep_streaming = False
except self.reconnect_exceptions:
# Case: We've hit an exception we're willing to retry on
if self.logger:
self.logger.exception("Unable to connect, retrying...")
except Exception:
time.sleep(1)
except:
# Case: We hit an exception we're unwilling to retry on
if self.logger:
self.logger.exception("Unexpected error")
self.logger.exception(
f"Unexpected error while streaming {func.__name__}"
)
keep_streaming = False
self.stop()
raise
finally:
if self.logger:
self.logger.debug("Completed stream.")
keep_streaming = False

self.stop()

def api_object_stream(self, func, *args, **kwargs):
"""
Expand Down

0 comments on commit 3839119

Please sign in to comment.