Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
ft: updates logs and docstrings
Browse files Browse the repository at this point in the history
  • Loading branch information
urimandujano committed Jan 17, 2024
1 parent c2a4ca4 commit d85d661
Showing 1 changed file with 53 additions and 7 deletions.
60 changes: 53 additions & 7 deletions prefect_kubernetes/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ def _slugify_label_key(key: str, max_length: int = 63, prefix_max_length=253) ->
prefix,
max_length=prefix_max_length,
regex_pattern=r"[^a-zA-Z0-9-\.]+",
).strip("_-.") # Must start or end with alphanumeric characters
).strip(
"_-."
) # Must start or end with alphanumeric characters
or prefix
)

Expand Down Expand Up @@ -199,6 +201,17 @@ def __init__(
max_cache_size: int = 50000,
reconnect_exceptions: Optional[List[Type[Exception]]] = None,
) -> None:
"""
A utility class for managing streams of Kuberenetes API objects and logs
Attributes:
logger: A logger which will be used interally to log errors
max_cache_size: The maximum number of API objects to track in an
internal cache to help deduplicate results on stream reconnects
reconnect_exceptions: A list of exceptions that will cause the stream
to reconnect.
"""

self.max_cache_size = max_cache_size
self.logger = logger
self.watch = watch.Watch()
Expand All @@ -211,6 +224,12 @@ def __init__(
self.reconnect_exceptions = tuple(reconnect_exceptions)

def _stream(self, cache: Optional[FIFOCache], func: Callable, *args, **kwargs):
"""
A private method for streaming API objects or logs from a Kubernetes
client function. This method will reconnect the stream on certain
configurable exceptions and deduplicate results on reconnects if
streaming API objects.
"""
keep_streaming = True
while keep_streaming:
try:
Expand All @@ -228,14 +247,14 @@ def _stream(self, cache: Optional[FIFOCache], func: Callable, *args, **kwargs):
else:
yield event
else:
# Case: we've completed the iteration
# Case: we've finished iterating
keep_streaming = False
except self.reconnect_exceptions:
# Case: We've hit an exception we're willing to retry on
if self.logger:
self.logger.exception("Unable to connect, retrying...")
self.logger.error("Unable to connect, retrying...", exc_info=True)
time.sleep(1)
except:
except Exception:
# Case: We hit an exception we're unwilling to retry on
if self.logger:
self.logger.exception(
Expand All @@ -247,18 +266,45 @@ def _stream(self, cache: Optional[FIFOCache], func: Callable, *args, **kwargs):

self.stop()

def api_object_stream(self, func, *args, **kwargs):
def api_object_stream(self, func: Callable, *args, **kwargs):
"""
Create a FIFOCache to maintain a record of API objects that have been
seen. This is useful because `_stream` will reconnect a stream on
`RECONNECT_EXCEPTIONS` and on reconnect it will restart streaming all
`self.reconnect_exceptions` and on reconnect it will restart streaming all
objects. This cache prevents the same object from being yielded twice.
Args:
func: A Kubernetes client function to call which produces a stream of API o
bjects
*args: Positional arguments to pass to `func`
**kwargs: Keyword arguments to pass to `func`
Returns:
An iterator of API objects
"""
cache = FIFOCache(maxsize=self.max_cache_size)
yield from self._stream(cache, func, *args, **kwargs)

def log_stream(self, func, *args, **kwargs):
def log_stream(self, func: Callable, *args, **kwargs):
"""
Stream logs genererated by a function in a way that will reconnect the
stream on `self.reconnect_exceptions`. On reconnect, the stream will
restart streaming starting from the beginning of the log's history. It
is possible for duplicate logs to be yielded.
Args:
func: A Kubernetes client function to call which produces a stream
of logs
*args: Positional arguments to pass to `func`
**kwargs: Keyword arguments to pass to `func`
Returns:
An iterator of log
"""
yield from self._stream(None, func, *args, **kwargs)

def stop(self):
"""
Shut down the internal Watch object.
"""
self.watch.stop()

0 comments on commit d85d661

Please sign in to comment.