Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Runtime: Add WatchStreamExt::subscribe #1131

Merged
merged 15 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,12 @@ categories = ["web-programming::http-client", "caching", "network-programming"]
rust-version = "1.60.0"
edition = "2021"

[features]
unstable-runtime = ["unstable-runtime-subscribe"]
unstable-runtime-subscribe = []

[package.metadata.docs.rs]
features = ["k8s-openapi/v1_26"]
features = ["k8s-openapi/v1_26", "unstable-runtime"]
# Define the configuration attribute `docsrs`. Used to enable `doc_cfg` feature.
rustdoc-args = ["--cfg", "docsrs"]

Expand Down
4 changes: 4 additions & 0 deletions kube-runtime/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@
mod backoff_reset_timer;
mod event_flatten;
mod stream_backoff;
#[cfg(feature = "unstable_runtime_subscribe")]
mod stream_subscribe;
danrspencer marked this conversation as resolved.
Show resolved Hide resolved
mod watch_ext;

pub use backoff_reset_timer::ResetTimerBackoff;
pub use event_flatten::EventFlatten;
pub use stream_backoff::StreamBackoff;
#[cfg(feature = "unstable_runtime_subscribe")]
pub use stream_subscribe::StreamSubscribe;
pub use watch_ext::WatchStreamExt;

use futures::{
Expand Down
243 changes: 243 additions & 0 deletions kube-runtime/src/utils/stream_subscribe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
use core::{
pin::Pin,
task::{Context, Poll},
};
use futures::{stream, Stream};
use pin_project::pin_project;
use std::{fmt, sync::Arc};
use tokio::sync::{broadcast, broadcast::error::RecvError};

const CHANNEL_CAPACITY: usize = 128;

/// Exposes the [`StreamSubscribe::subscribe()`] method which allows additional
/// consumers of events from a stream without consuming the stream itself.
///
/// If a subscriber begins to lag behind the stream, it will receive an [`Error::Lagged`]
/// error. The subscriber can then decide to abort its task or tolerate the lost events.
///
/// If the [`Stream`] is dropped or ends, any [`StreamSubscribe::subscribe()`] streams
/// will also end.
///
/// ## Warning
///
/// If the primary [`Stream`] is not polled, the [`StreamSubscribe::subscribe()`] streams
/// will never receive any events.
#[pin_project]
#[must_use = "subscribers will not get events unless this stream is polled"]
pub struct StreamSubscribe<S>
where
S: Stream,
{
#[pin]
stream: S,
sender: broadcast::Sender<Option<Arc<S::Item>>>,
}

impl<S: Stream> StreamSubscribe<S> {
pub fn new(stream: S) -> Self {
let (sender, _) = broadcast::channel(CHANNEL_CAPACITY);

Self { stream, sender }
}

/// Subscribe to events from this stream
#[must_use = "streams do nothing unless polled"]
pub fn subscribe(&self) -> impl Stream<Item = Result<Arc<S::Item>, Error>> {
stream::unfold(self.sender.subscribe(), |mut rx| async {
match rx.recv().await {
Ok(Some(obj)) => Some((Ok(obj), rx)),
Err(RecvError::Lagged(amt)) => Some((Err(Error::Lagged(amt)), rx)),
_ => None,
clux marked this conversation as resolved.
Show resolved Hide resolved
}
})
}
}

impl<S: Stream> Stream for StreamSubscribe<S> {
type Item = Arc<S::Item>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let item = this.stream.poll_next(cx);

match item {
Poll::Ready(Some(item)) => {
let item = Arc::new(item);
this.sender.send(Some(item.clone())).ok();
Poll::Ready(Some(item))
}
Poll::Ready(None) => {
this.sender.send(None).ok();
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
}
}
}

/// An error returned from the inner stream of a [`StreamSubscribe`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum Error {
/// The subscriber lagged too far behind. Polling again will return
/// the oldest event still retained.
///
/// Includes the number of skipped events.
Lagged(u64),
}
clux marked this conversation as resolved.
Show resolved Hide resolved

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::Lagged(amt) => write!(f, "subscriber lagged by {amt}"),
}
}
}

impl std::error::Error for Error {}

#[cfg(test)]
mod tests {
use super::*;
use futures::{pin_mut, poll, stream, StreamExt};

#[tokio::test]
async fn stream_subscribe_continues_to_propagate_values() {
clux marked this conversation as resolved.
Show resolved Hide resolved
let rx = stream::iter([Ok(0), Ok(1), Err(2), Ok(3), Ok(4)]);
let rx = StreamSubscribe::new(rx);

pin_mut!(rx);
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Ok(0)))));
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Ok(1)))));
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Err(2)))));
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Ok(3)))));
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Ok(4)))));
assert_eq!(poll!(rx.next()), Poll::Ready(None));
}

#[tokio::test]
async fn all_subscribers_get_events() {
let events = [Ok(0), Ok(1), Err(2), Ok(3), Ok(4)];
let rx = stream::iter(events);
let rx = StreamSubscribe::new(rx);

let rx_s1 = rx.subscribe();
let rx_s2 = rx.subscribe();

pin_mut!(rx);
pin_mut!(rx_s1);
pin_mut!(rx_s2);

// Subscribers are pending until we start consuming the stream
assert_eq!(poll!(rx_s1.next()), Poll::Pending, "rx_s1");
assert_eq!(poll!(rx_s2.next()), Poll::Pending, "rx_s2");

for item in events {
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(item))), "rx");
let expected = Poll::Ready(Some(Ok(Arc::new(item))));
clux marked this conversation as resolved.
Show resolved Hide resolved
assert_eq!(poll!(rx_s1.next()), expected, "rx_s1");
assert_eq!(poll!(rx_s2.next()), expected, "rx_s2");
}

// Ensure that if the stream is closed, all subscribers are closed
clux marked this conversation as resolved.
Show resolved Hide resolved
assert_eq!(poll!(rx.next()), Poll::Ready(None), "rx");
assert_eq!(poll!(rx_s1.next()), Poll::Ready(None), "rx_s1");
assert_eq!(poll!(rx_s2.next()), Poll::Ready(None), "rx_s2");
}

#[tokio::test]
async fn subscribers_can_catch_up_to_the_main_stream() {
let events = (0..CHANNEL_CAPACITY).map(Ok::<_, ()>).collect::<Vec<_>>();
let rx = stream::iter(events.clone());
let rx = StreamSubscribe::new(rx);

let rx_s1 = rx.subscribe();

pin_mut!(rx);
pin_mut!(rx_s1);

for item in events.clone() {
assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(item))), "rx",);
}

for item in events {
assert_eq!(
poll!(rx_s1.next()),
Poll::Ready(Some(Ok(Arc::new(item)))),
"rx_s1"
);
}
}

#[tokio::test]
async fn if_the_subscribers_lag_they_get_a_lagged_error_as_the_next_event() {
// The broadcast channel rounds the capacity up to the next power of two.
let max_capacity = CHANNEL_CAPACITY.next_power_of_two();
clux marked this conversation as resolved.
Show resolved Hide resolved
let overflow = 5;
let events = (0..max_capacity + overflow).collect::<Vec<_>>();
let rx = stream::iter(events.clone());
let rx = StreamSubscribe::new(rx);

let rx_s1 = rx.subscribe();

pin_mut!(rx);
pin_mut!(rx_s1);

// Consume the entire stream, overflowing the inner channel
for _ in events {
rx.next().await;
}

assert_eq!(
poll!(rx_s1.next()),
Poll::Ready(Some(Err(Error::Lagged(overflow as u64)))),
);

let expected_next_event = overflow;
clux marked this conversation as resolved.
Show resolved Hide resolved
assert_eq!(
poll!(rx_s1.next()),
Poll::Ready(Some(Ok(Arc::new(expected_next_event)))),
);
}

#[tokio::test]
async fn a_lagging_subscriber_does_not_impact_a_well_behaved_subscriber() {
// The broadcast channel rounds the capacity up to the next power of two.
let max_capacity = CHANNEL_CAPACITY.next_power_of_two();
let overflow = 5;
let events = (0..max_capacity + overflow).collect::<Vec<_>>();
let rx = stream::iter(events.clone());
let rx = StreamSubscribe::new(rx);

let rx_s1 = rx.subscribe();
let rx_s2 = rx.subscribe();

pin_mut!(rx);
pin_mut!(rx_s1);
pin_mut!(rx_s2);

for event in events {
assert_eq!(poll!(rx_s1.next()), Poll::Pending, "rx_s1");

rx.next().await;

assert_eq!(
poll!(rx_s1.next()),
Poll::Ready(Some(Ok(Arc::new(event)))),
"rx_s1"
);
}

assert_eq!(
poll!(rx_s2.next()),
Poll::Ready(Some(Err(Error::Lagged(overflow as u64)))),
"rx_s2"
);

let expected_next_event = overflow;
assert_eq!(
poll!(rx_s2.next()),
Poll::Ready(Some(Ok(Arc::new(expected_next_event)))),
"rx_s2"
);
}
}
67 changes: 66 additions & 1 deletion kube-runtime/src/utils/watch_ext.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
#[cfg(feature = "unstable_runtime_subscribe")]
use crate::utils::stream_subscribe::StreamSubscribe;
use crate::{
utils::{event_flatten::EventFlatten, stream_backoff::StreamBackoff},
watcher,
};
use backoff::backoff::Backoff;

use futures::{Stream, TryStream};

/// Extension trait for streams returned by [`watcher`](watcher()) or [`reflector`](crate::reflector::reflector)
Expand Down Expand Up @@ -36,5 +37,69 @@ pub trait WatchStreamExt: Stream {
{
EventFlatten::new(self, true)
}

/// Create a [`StreamSubscribe`] from a [`watcher()`] stream.
///
/// The [`StreamSubscribe::subscribe()`] method which allows additional consumers
/// of events from a stream without consuming the stream itself.
///
/// If a subscriber begins to lag behind the stream, it will receive an [`Error::Lagged`]
/// error. The subscriber can then decide to abort its task or tolerate the lost events.
///
/// If the [`Stream`] is dropped or ends, any [`StreamSubscribe::subscribe()`] streams
/// will also end.
///
/// ## Warning
///
/// If the primary [`Stream`] is not polled, the [`StreamSubscribe::subscribe()`] streams
/// will never receive any events.
///
/// # Usage
///
/// ```
/// use futures::{Stream, StreamExt};
/// use std::{fmt::Debug, sync::Arc};
/// use kube_runtime::{watcher, WatchStreamExt};
///
/// fn explain_events<K, S>(
/// stream: S,
/// ) -> (
/// impl Stream<Item = Arc<Result<watcher::Event<K>, watcher::Error>>> + Send + Sized + 'static,
/// impl Stream<Item = String> + Send + Sized + 'static,
/// )
/// where
/// K: Debug + Send + Sync + 'static,
/// S: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Send + Sized + 'static,
/// {
/// // Create a stream that can be subscribed to
/// let stream_subscribe = stream.stream_subscribe();
/// // Create a subscription to that stream
/// let subscription = stream_subscribe.subscribe();
///
/// // Create a stream of descriptions of the events
/// let explain_stream = subscription.filter_map(|event| async move {
/// // We don't care about lagged events so we can throw that error away
/// match event.ok()?.as_deref() {
/// Ok(watcher::Event::Applied(event)) => {
/// Some(format!("An object was added or modified: {event:?}"))
/// }
/// Ok(_) => todo!("explain other events"),
/// // We don't care about watcher errors either
/// Err(_) => None,
/// }
/// });
///
/// // We now still have the original stream, and a secondary stream of explanations
/// (stream_subscribe, explain_stream)
/// }
/// ```
#[cfg(feature = "unstable_runtime_subscribe")]
fn stream_subscribe<K>(self) -> StreamSubscribe<Self>
where
Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Send + Sized + 'static,
{
StreamSubscribe::new(self)
}
}

impl<St: ?Sized> WatchStreamExt for St where St: Stream {}
1 change: 1 addition & 0 deletions kube/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ admission = ["kube-core/admission"]
derive = ["kube-derive", "kube-core/schema"]
config = ["kube-client/config"]
runtime = ["kube-runtime"]
unstable-runtime = ["kube-runtime/unstable-runtime"]

[package.metadata.docs.rs]
features = ["client", "rustls-tls", "openssl-tls", "derive", "ws", "oauth", "jsonpatch", "admission", "runtime", "k8s-openapi/v1_26"]
Expand Down