This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add resilient streaming #107

Merged
merged 17 commits into from
Jan 19, 2024

@urimandujano urimandujano commented Jan 17, 2024

This PR introduces a new ResilientStreamWatcher utility to swap in for kubernetes.watch.Watch objects. The class "resiliently" watches streams produced by Kubernetes client functions: if a retryable exception is raised during the stream, the utility sleeps for 1 second and then restarts the stream from the beginning. All other exceptions are raised and halt the stream.
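
For readers who want the retry behavior spelled out, here is a minimal sketch of the restart-on-error loop described above. It is not the code from this PR: `resilient_stream`, `retry_on`, `delay`, `sleep`, and `make_flaky_source` are hypothetical names, `ConnectionError` stands in for whichever exceptions the real utility retries, and the stream is a plain Python generator rather than a Kubernetes client call.

```python
import time
from typing import Callable, Iterator, Tuple, Type


def resilient_stream(
    func: Callable[..., Iterator],
    *args,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError,),  # assumed retryable set
    delay: float = 1.0,  # the description says the utility sleeps for 1 second
    sleep: Callable[[float], None] = time.sleep,  # injectable so tests need not wait
    **kwargs,
) -> Iterator:
    """Yield items from func(*args, **kwargs), restarting from the beginning on retryable errors."""
    while True:
        try:
            yield from func(*args, **kwargs)
            return  # the stream ended normally, so stop retrying
        except retry_on:
            # Retryable failure: pause, then call func again from scratch.
            # Any other exception propagates and halts the stream.
            sleep(delay)


def make_flaky_source(items, fail_at):
    """Hypothetical stream factory that drops the connection once, on its first call."""
    state = {"calls": 0}

    def source():
        state["calls"] += 1
        for index, item in enumerate(items):
            if state["calls"] == 1 and index == fail_at:
                raise ConnectionError("stream dropped")
            yield item

    return source


if __name__ == "__main__":
    source = make_flaky_source(["a", "b", "c"], fail_at=2)
    for item in resilient_stream(source, sleep=lambda seconds: None):
        print(item)
```

Because the restart replays the stream from its first item, the consumer in this sketch sees "a" and "b" twice. The sketch also retries without limit, which may or may not match the real utility.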

Because the stream restarts from the beginning, it's possible to produce duplicate objects while streaming. This PR addresses that by adding a cache (specifically for streaming API objects) that tracks which data a stream has already produced and filters out redundant data. This is the functionality of the api_object_stream method: it tracks Kubernetes object IDs in an internal cache. Another method, log_stream, is intended for client functions that return a stream of logs. Since there is no way to reliably deduplicate logs, logs will be repeated on restarted streams.
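
The deduplication idea can be sketched roughly as follows. This is an illustration under assumptions, not the PR's implementation: `SeenCache` and `dedupe` are hypothetical names, the bounded first-in-first-out cache is built on the standard library's `OrderedDict`, and events are plain dicts with a `uid` key instead of Kubernetes watch events.

```python
from collections import OrderedDict
from typing import Callable, Hashable, Iterable, Iterator


class SeenCache:
    """Bounded FIFO set of keys: once full, the oldest key is evicted first."""

    def __init__(self, maxsize: int = 1000):
        self.maxsize = maxsize
        self._seen: "OrderedDict[Hashable, None]" = OrderedDict()

    def add(self, key: Hashable) -> bool:
        """Record key and return True if it was not already in the cache."""
        if key in self._seen:
            return False
        self._seen[key] = None
        if len(self._seen) > self.maxsize:
            self._seen.popitem(last=False)  # drop the oldest entry
        return True


def dedupe(
    events: Iterable[dict],
    cache: SeenCache,
    key: Callable[[dict], Hashable] = lambda event: event["uid"],  # assumed ID field
) -> Iterator[dict]:
    """Yield only the events whose ID has not been produced before."""
    for event in events:
        if cache.add(key(event)):
            yield event


if __name__ == "__main__":
    # A restarted stream replays "a" and "b" before reaching "c".
    replayed = [{"uid": u} for u in ["a", "b", "a", "b", "c"]]
    for event in dedupe(replayed, SeenCache(maxsize=100)):
        print(event["uid"])
```

A bounded cache keeps memory use fixed, at the cost that an ID evicted from the cache can be emitted again on a later restart. Logs carry no such ID, which is why the description says they cannot be deduplicated this way.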

Closes #96 and #106

Example

# Using `ResilientStreamWatcher` on its own

from kubernetes import client, config

from prefect_kubernetes.utilities import ResilientStreamWatcher

config.load_kube_config(context="docker-desktop")


def list_namespace():
    watcher = ResilientStreamWatcher()
    for ns in watcher.api_object_stream(
        client.CoreV1Api().list_namespace, timeout_seconds=5
    ):
        print(
            f"Got item {ns['object'].metadata.name} with ID {ns['object'].metadata.uid} "
        )


def list_logs():
    watcher = ResilientStreamWatcher()
    for log in watcher.log_stream(
        client.CoreV1Api().read_namespaced_pod_log,
        name="mypod",
        namespace="default",
        follow=True,
        _preload_content=False,
    ):
        print(log)


if __name__ == "__main__":
    list_namespace()
    list_logs()

Screenshots

Checklist

  • References any related issue by including "Closes #" or "Closes ".
    • If no issue exists and your change is not a small fix, please create an issue first.
  • Includes tests or only affects documentation.
  • Passes pre-commit checks.
    • Run pre-commit install && pre-commit run --all locally for formatting and linting.
  • Includes screenshots of documentation updates.
    • Run mkdocs serve to view documentation locally.
  • Summarizes PR's changes in CHANGELOG.md

@urimandujano urimandujano changed the title Ft/add resilient streaming Add resilient streaming Jan 17, 2024
@urimandujano urimandujano marked this pull request as ready for review January 17, 2024 18:38
@urimandujano urimandujano requested a review from a team as a code owner January 17, 2024 18:38

@desertaxle desertaxle left a comment

This is really cool! I have a couple of stylistic comments/questions. Also, +1 to @abrookins's comment on using dequeue.

prefect_kubernetes/utilities.py
cache = FIFOCache(maxsize=self.max_cache_size)
yield from self._stream(cache, func, *args, **kwargs)

def log_stream(self, func: Callable, *args, **kwargs):
Member

This function looks like it's a very thin wrapper around _stream and doesn't inherently have any knowledge of logs. Would we be able to remove this method and make _stream 'public' instead?

Contributor Author

I think it'll be a good experience to have stream publicly available for a more general use case. I removed log_stream and updated the stream docstring to note its behavior on log stream client methods.


@desertaxle desertaxle left a comment

Very cool implementation!

@urimandujano urimandujano merged commit f2e4d88 into main Jan 19, 2024
6 checks passed
@urimandujano urimandujano deleted the ft/add-resilient-streaming branch January 19, 2024 22:21
urimandujano added a commit that referenced this pull request Jan 30, 2024
urimandujano added a commit that referenced this pull request Feb 1, 2024
urimandujano added a commit that referenced this pull request Feb 1, 2024
* Revert "Add resilient streaming (#107)"

This reverts commit f2e4d88.

* ft: update changelog

* ft: unreverts some testing fixes

* fix: pre-commit formatting
Development

Successfully merging this pull request may close these issues.

Worker raises InvalidChunkLength when replicating pod events
3 participants