diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml
index 9439d1177..6adac2fd9 100644
--- a/kube-runtime/Cargo.toml
+++ b/kube-runtime/Cargo.toml
@@ -14,8 +14,12 @@ categories = ["web-programming::http-client", "caching", "network-programming"]
 rust-version = "1.63.0"
 edition = "2021"

+[features]
+unstable-runtime = ["unstable-runtime-subscribe"]
+unstable-runtime-subscribe = []
+
 [package.metadata.docs.rs]
-features = ["k8s-openapi/v1_26"]
+features = ["k8s-openapi/v1_26", "unstable-runtime"]
 # Define the configuration attribute `docsrs`. Used to enable `doc_cfg` feature.
 rustdoc-args = ["--cfg", "docsrs"]
diff --git a/kube-runtime/src/utils/mod.rs b/kube-runtime/src/utils/mod.rs
index 3ebc2f74d..d40dd295e 100644
--- a/kube-runtime/src/utils/mod.rs
+++ b/kube-runtime/src/utils/mod.rs
@@ -3,11 +3,14 @@
 mod backoff_reset_timer;
 mod event_flatten;
 mod stream_backoff;
+#[cfg(feature = "unstable-runtime-subscribe")]
 pub mod stream_subscribe;
 mod watch_ext;

 pub use backoff_reset_timer::ResetTimerBackoff;
 pub use event_flatten::EventFlatten;
 pub use stream_backoff::StreamBackoff;
+#[cfg(feature = "unstable-runtime-subscribe")]
+pub use stream_subscribe::StreamSubscribe;
 pub use watch_ext::WatchStreamExt;

 use futures::{
diff --git a/kube-runtime/src/utils/stream_subscribe.rs b/kube-runtime/src/utils/stream_subscribe.rs
new file mode 100644
index 000000000..3391ee0cf
--- /dev/null
+++ b/kube-runtime/src/utils/stream_subscribe.rs
@@ -0,0 +1,243 @@
+use core::{
+    pin::Pin,
+    task::{Context, Poll},
+};
+use futures::{stream, Stream};
+use pin_project::pin_project;
+use std::{fmt, sync::Arc};
+use tokio::sync::{broadcast, broadcast::error::RecvError};
+
+const CHANNEL_CAPACITY: usize = 128;
+
+/// Exposes the [`StreamSubscribe::subscribe()`] method which allows additional
+/// consumers of events from a stream without consuming the stream itself.
+///
+/// If a subscriber begins to lag behind the stream, it will receive an [`Error::Lagged`]
+/// error. The subscriber can then decide to abort its task or tolerate the lost events.
+///
+/// If the [`Stream`] is dropped or ends, any [`StreamSubscribe::subscribe()`] streams
+/// will also end.
+///
+/// ## Warning
+///
+/// If the primary [`Stream`] is not polled, the [`StreamSubscribe::subscribe()`] streams
+/// will never receive any events.
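+///
+/// ## Example
+///
+/// A minimal sketch of wiring this up (an illustrative integer stream stands in
+/// for a real watcher stream):
+///
+/// ```
+/// use futures::StreamExt;
+/// use kube_runtime::utils::StreamSubscribe;
+///
+/// # async fn sketch() {
+/// // Wrap any stream; here an illustrative stream of integers.
+/// let stream_subscribe = StreamSubscribe::new(futures::stream::iter([1, 2, 3]));
+///
+/// // Subscribers receive `Result<Arc<T>, Error>` items.
+/// let subscriber = stream_subscribe.subscribe();
+/// let log = subscriber.for_each(|item| async move { println!("subscriber saw {item:?}") });
+///
+/// // The primary stream must be polled for subscribers to make progress.
+/// let primary = stream_subscribe.for_each(|item| async move { println!("primary saw {item}") });
+///
+/// futures::join!(primary, log);
+/// # }
+/// ```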
+#[pin_project]
+#[must_use = "subscribers will not get events unless this stream is polled"]
+pub struct StreamSubscribe<S>
+where
+    S: Stream,
+{
+    #[pin]
+    stream: S,
+    sender: broadcast::Sender<Option<Arc<S::Item>>>,
+}
+
+impl<S: Stream> StreamSubscribe<S> {
+    pub fn new(stream: S) -> Self {
+        let (sender, _) = broadcast::channel(CHANNEL_CAPACITY);
+
+        Self { stream, sender }
+    }
+
+    /// Subscribe to events from this stream
+    #[must_use = "streams do nothing unless polled"]
+    pub fn subscribe(&self) -> impl Stream<Item = Result<Arc<S::Item>, Error>> {
+        stream::unfold(self.sender.subscribe(), |mut rx| async {
+            match rx.recv().await {
+                Ok(Some(obj)) => Some((Ok(obj), rx)),
+                Err(RecvError::Lagged(amt)) => Some((Err(Error::Lagged(amt)), rx)),
+                _ => None,
+            }
+        })
+    }
+}
+
+impl<S: Stream> Stream for StreamSubscribe<S> {
+    type Item = Arc<S::Item>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+        let this = self.project();
+        let item = this.stream.poll_next(cx);
+
+        match item {
+            Poll::Ready(Some(item)) => {
+                let item = Arc::new(item);
+                this.sender.send(Some(item.clone())).ok();
+                Poll::Ready(Some(item))
+            }
+            Poll::Ready(None) => {
+                this.sender.send(None).ok();
+                Poll::Ready(None)
+            }
+            Poll::Pending => Poll::Pending,
+        }
+    }
+}
+
+/// An error returned from the inner stream of a [`StreamSubscribe`].
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub enum Error {
+    /// The subscriber lagged too far behind. Polling again will return
+    /// the oldest event still retained.
+    ///
+    /// Includes the number of skipped events.
+    Lagged(u64),
+}
+
+impl fmt::Display for Error {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Error::Lagged(amt) => write!(f, "subscriber lagged by {amt}"),
+        }
+    }
+}
+
+impl std::error::Error for Error {}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use futures::{pin_mut, poll, stream, StreamExt};
+
+    #[tokio::test]
+    async fn stream_subscribe_continues_to_propagate_values() {
+        let rx = stream::iter([Ok(0), Ok(1), Err(2), Ok(3), Ok(4)]);
+        let rx = StreamSubscribe::new(rx);
+
+        pin_mut!(rx);
+        assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Ok(0)))));
+        assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Ok(1)))));
+        assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Err(2)))));
+        assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Ok(3)))));
+        assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Ok(4)))));
+        assert_eq!(poll!(rx.next()), Poll::Ready(None));
+    }
+
+    #[tokio::test]
+    async fn all_subscribers_get_events() {
+        let events = [Ok(0), Ok(1), Err(2), Ok(3), Ok(4)];
+        let rx = stream::iter(events);
+        let rx = StreamSubscribe::new(rx);
+
+        let rx_s1 = rx.subscribe();
+        let rx_s2 = rx.subscribe();
+
+        pin_mut!(rx);
+        pin_mut!(rx_s1);
+        pin_mut!(rx_s2);
+
+        // Subscribers are pending until we start consuming the stream
+        assert_eq!(poll!(rx_s1.next()), Poll::Pending, "rx_s1");
+        assert_eq!(poll!(rx_s2.next()), Poll::Pending, "rx_s2");
+
+        for item in events {
+            assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(item))), "rx");
+            let expected = Poll::Ready(Some(Ok(Arc::new(item))));
+            assert_eq!(poll!(rx_s1.next()), expected, "rx_s1");
+            assert_eq!(poll!(rx_s2.next()), expected, "rx_s2");
+        }
+
+        // Ensure that if the stream is closed, all subscribers are closed
+        assert_eq!(poll!(rx.next()), Poll::Ready(None), "rx");
+        assert_eq!(poll!(rx_s1.next()), Poll::Ready(None), "rx_s1");
+        assert_eq!(poll!(rx_s2.next()), Poll::Ready(None), "rx_s2");
+    }
+
+    #[tokio::test]
+    async fn subscribers_can_catch_up_to_the_main_stream() {
+        let events = (0..CHANNEL_CAPACITY).map(Ok::<_, ()>).collect::<Vec<_>>();
+        let rx = stream::iter(events.clone());
+        let rx = StreamSubscribe::new(rx);
+
+        let rx_s1 = rx.subscribe();
+
+        pin_mut!(rx);
+        pin_mut!(rx_s1);
+
+        for item in events.clone() {
+            assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(item))), "rx",);
+        }
+
+        for item in events {
+            assert_eq!(
+                poll!(rx_s1.next()),
+                Poll::Ready(Some(Ok(Arc::new(item)))),
+                "rx_s1"
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn if_the_subscribers_lag_they_get_a_lagged_error_as_the_next_event() {
+        // The broadcast channel rounds the capacity up to the next power of two.
+        let max_capacity = CHANNEL_CAPACITY.next_power_of_two();
+        let overflow = 5;
+        let events = (0..max_capacity + overflow).collect::<Vec<_>>();
+        let rx = stream::iter(events.clone());
+        let rx = StreamSubscribe::new(rx);
+
+        let rx_s1 = rx.subscribe();
+
+        pin_mut!(rx);
+        pin_mut!(rx_s1);
+
+        // Consume the entire stream, overflowing the inner channel
+        for _ in events {
+            rx.next().await;
+        }
+
+        assert_eq!(
+            poll!(rx_s1.next()),
+            Poll::Ready(Some(Err(Error::Lagged(overflow as u64)))),
+        );
+
+        let expected_next_event = overflow;
+        assert_eq!(
+            poll!(rx_s1.next()),
+            Poll::Ready(Some(Ok(Arc::new(expected_next_event)))),
+        );
+    }
+
+    #[tokio::test]
+    async fn a_lagging_subscriber_does_not_impact_a_well_behaved_subscriber() {
+        // The broadcast channel rounds the capacity up to the next power of two.
+        let max_capacity = CHANNEL_CAPACITY.next_power_of_two();
+        let overflow = 5;
+        let events = (0..max_capacity + overflow).collect::<Vec<_>>();
+        let rx = stream::iter(events.clone());
+        let rx = StreamSubscribe::new(rx);
+
+        let rx_s1 = rx.subscribe();
+        let rx_s2 = rx.subscribe();
+
+        pin_mut!(rx);
+        pin_mut!(rx_s1);
+        pin_mut!(rx_s2);
+
+        for event in events {
+            assert_eq!(poll!(rx_s1.next()), Poll::Pending, "rx_s1");
+
+            rx.next().await;
+
+            assert_eq!(
+                poll!(rx_s1.next()),
+                Poll::Ready(Some(Ok(Arc::new(event)))),
+                "rx_s1"
+            );
+        }
+
+        assert_eq!(
+            poll!(rx_s2.next()),
+            Poll::Ready(Some(Err(Error::Lagged(overflow as u64)))),
+            "rx_s2"
+        );
+
+        let expected_next_event = overflow;
+        assert_eq!(
+            poll!(rx_s2.next()),
+            Poll::Ready(Some(Ok(Arc::new(expected_next_event)))),
+            "rx_s2"
+        );
+    }
+}
diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs
index 54a6d45bc..7cda50245 100644
--- a/kube-runtime/src/utils/watch_ext.rs
+++ b/kube-runtime/src/utils/watch_ext.rs
@@ -1,9 +1,10 @@
+#[cfg(feature = "unstable-runtime-subscribe")]
+use crate::utils::stream_subscribe::StreamSubscribe;
 use crate::{
     utils::{event_flatten::EventFlatten, stream_backoff::StreamBackoff},
     watcher,
 };
 use backoff::backoff::Backoff;
-
 use futures::{Stream, TryStream};

 /// Extension trait for streams returned by [`watcher`](watcher()) or [`reflector`](crate::reflector::reflector)
@@ -36,5 +37,69 @@ pub trait WatchStreamExt: Stream {
     {
         EventFlatten::new(self, true)
     }
+
+    /// Create a [`StreamSubscribe`] from a [`watcher()`] stream.
+    ///
+    /// The [`StreamSubscribe::subscribe()`] method allows additional consumers
+    /// of events from a stream without consuming the stream itself.
+    ///
+    /// If a subscriber begins to lag behind the stream, it will receive an [`Error::Lagged`]
+    /// error. The subscriber can then decide to abort its task or tolerate the lost events.
+    ///
+    /// If the [`Stream`] is dropped or ends, any [`StreamSubscribe::subscribe()`] streams
+    /// will also end.
+    ///
+    /// ## Warning
+    ///
+    /// If the primary [`Stream`] is not polled, the [`StreamSubscribe::subscribe()`] streams
+    /// will never receive any events.
+    ///
+    /// # Usage
+    ///
+    /// ```
+    /// use futures::{Stream, StreamExt};
+    /// use std::{fmt::Debug, sync::Arc};
+    /// use kube_runtime::{watcher, WatchStreamExt};
+    ///
+    /// fn explain_events<K, S>(
+    ///     stream: S,
+    /// ) -> (
+    ///     impl Stream<Item = Arc<Result<watcher::Event<K>, watcher::Error>>> + Send + Sized + 'static,
+    ///     impl Stream<Item = String> + Send + Sized + 'static,
+    /// )
+    /// where
+    ///     K: Debug + Send + Sync + 'static,
+    ///     S: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Send + Sized + 'static,
+    /// {
+    ///     // Create a stream that can be subscribed to
+    ///     let stream_subscribe = stream.stream_subscribe();
+    ///     // Create a subscription to that stream
+    ///     let subscription = stream_subscribe.subscribe();
+    ///
+    ///     // Create a stream of descriptions of the events
+    ///     let explain_stream = subscription.filter_map(|event| async move {
+    ///         // We don't care about lagged events so we can throw that error away
+    ///         match event.ok()?.as_ref() {
+    ///             Ok(watcher::Event::Applied(event)) => {
+    ///                 Some(format!("An object was added or modified: {event:?}"))
+    ///             }
+    ///             Ok(_) => todo!("explain other events"),
+    ///             // We don't care about watcher errors either
+    ///             Err(_) => None,
+    ///         }
+    ///     });
+    ///
+    ///     // We now still have the original stream, and a secondary stream of explanations
+    ///     (stream_subscribe, explain_stream)
+    /// }
+    /// ```
+    #[cfg(feature = "unstable-runtime-subscribe")]
+    fn stream_subscribe<K>(self) -> StreamSubscribe<Self>
+    where
+        Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Send + Sized + 'static,
+    {
+        StreamSubscribe::new(self)
+    }
 }
+
 impl<St> WatchStreamExt for St where St: Stream {}
diff --git a/kube/Cargo.toml b/kube/Cargo.toml
index ae3ee3395..148152e8a 100644
--- a/kube/Cargo.toml
+++ b/kube/Cargo.toml
@@ -28,6 +28,7 @@ admission = ["kube-core/admission"]
 derive = ["kube-derive", "kube-core/schema"]
 config = ["kube-client/config"]
 runtime = ["kube-runtime"]
+unstable-runtime = ["kube-runtime/unstable-runtime"]

 [package.metadata.docs.rs]
 features = ["client", "rustls-tls", "openssl-tls", "derive", "ws", "oauth", "jsonpatch", "admission", "runtime", "k8s-openapi/v1_26"]
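Taken together, the changes are opt-in: a downstream crate sees none of this unless it enables the new `unstable-runtime` feature, which `kube` simply forwards to `kube-runtime/unstable-runtime`. The sketch below is one way a consumer might wire this up; the dependency lines in the comment, the `Pod` watcher, and the logging are illustrative assumptions rather than part of this changeset, and it assumes the `watcher(api, ListParams)` signature used by kube at the time of this diff.

```rust
// Downstream consumer sketch (illustrative, not part of this changeset).
// Assumes something like:
//   kube = { version = "...", features = ["client", "runtime", "unstable-runtime"] }
//   k8s-openapi = { version = "...", features = ["v1_26"] }
use futures::StreamExt;
use k8s_openapi::api::core::v1::Pod;
use kube::{
    api::ListParams,
    runtime::{watcher, WatchStreamExt},
    Api, Client,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = Client::try_default().await?;
    let pods: Api<Pod> = Api::default_namespaced(client);

    // Wrap the watcher stream so additional consumers can subscribe to it.
    let stream_subscribe = watcher(pods, ListParams::default()).stream_subscribe();

    // A secondary consumer: it only advances while the primary stream is polled,
    // and it may observe `Error::Lagged` if it falls too far behind.
    let subscription = stream_subscribe.subscribe();
    tokio::spawn(async move {
        futures::pin_mut!(subscription);
        while let Some(event) = subscription.next().await {
            match event {
                Ok(event) => println!("subscriber saw: {event:?}"),
                Err(lag) => eprintln!("subscriber lagged: {lag}"),
            }
        }
    });

    // Drive the primary stream; without this, subscribers never receive events.
    futures::pin_mut!(stream_subscribe);
    while let Some(event) = stream_subscribe.next().await {
        if let Err(err) = event.as_ref() {
            eprintln!("watcher error: {err}");
        }
    }
    Ok(())
}
```

Keeping the surface behind `unstable-runtime`/`unstable-runtime-subscribe` lets the subscribe API keep evolving without committing to semver stability, while docs.rs still builds the documentation for it.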