diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index ec1e3f529..1718610ac 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -15,8 +15,9 @@ rust-version = "1.63.0" edition = "2021" [features] -unstable-runtime = ["unstable-runtime-subscribe"] +unstable-runtime = ["unstable-runtime-subscribe", "unstable-runtime-owns-stream"] unstable-runtime-subscribe = [] +unstable-runtime-owns-stream = [] [package.metadata.docs.rs] features = ["k8s-openapi/v1_26", "unstable-runtime"] diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index 98affaece..7d1562469 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -584,6 +584,63 @@ where self } + /// Trigger the reconciliation process for a stream of `Child` objects of the owner `K` + /// + /// Same as [`Controller::owns`], but instad of a resource a stream of resources is used. + /// This allows for customized and pre-filtered watch streams to be used as a trigger. + /// + /// # Example: + /// + /// ```no_run + /// # use futures::StreamExt; + /// # use k8s_openapi::api::core::v1::ConfigMap; + /// # use k8s_openapi::api::apps::v1::StatefulSet; + /// # use kube::runtime::controller::Action; + /// # use kube::runtime::{watcher, Controller, WatchStreamExt}; + /// # use kube::{Api, Client, Error, ResourceExt}; + /// # use std::sync::Arc; + /// # type CustomResource = ConfigMap; + /// # async fn reconcile(_: Arc, _: Arc<()>) -> Result { Ok(Action::await_change()) } + /// # fn error_policy(_: Arc, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() } + /// # async fn doc(client: kube::Client) { + /// let sts_stream = watcher(Api::::all(client.clone()), watcher::Config::default()) + /// .touched_objects() + /// .predicate_filter(predicates::generation); + /// + /// Controller::new(Api::::all(client), watcher::Config::default()) + /// .owns_stream(sts_stream) + /// .run(reconcile, error_policy, Arc::new(())) + /// .for_each(|_| std::future::ready(())) + /// .await; + /// # } + /// ``` + #[cfg(feature = "unstable-runtime-owns-stream")] + #[must_use] + pub fn owns_stream + Send + 'static>( + self, + trigger: impl Stream> + Send + 'static, + ) -> Self { + self.owns_stream_with(trigger, ()) + } + + /// Trigger the reconciliation process for a stream of `Child` objects of the owner `K` + /// + /// Same as [`Controller::owns_stream`], but accepts a `DynamicType` so it can be used with dynamic resources. + #[cfg(feature = "unstable-runtime-owns-stream")] + #[must_use] + pub fn owns_stream_with( + mut self, + trigger: impl Stream> + Send + 'static, + dyntype: Child::DynamicType, + ) -> Self + where + Child::DynamicType: Debug + Eq + Hash + Clone, + { + let child_watcher = trigger_owners(trigger, self.dyntype.clone(), dyntype); + self.trigger_selector.push(child_watcher.boxed()); + self + } + /// Specify `Watched` object which `K` has a custom relation to and should be watched /// /// To define the `Watched` relation with `K`, you **must** define a custom relation mapper, which,