From d59270d0308e16644f871c3afe977afa33a2420e Mon Sep 17 00:00:00 2001 From: David Herberth Date: Thu, 30 Mar 2023 10:33:29 +0200 Subject: [PATCH] implements owns_stream and owns_stream_with for Controller Signed-off-by: David Herberth --- kube-runtime/Cargo.toml | 3 +- kube-runtime/src/controller/mod.rs | 57 ++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index bbefb28ab..589897a9c 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -15,9 +15,10 @@ rust-version = "1.63.0" edition = "2021" [features] -unstable-runtime = ["unstable-runtime-subscribe", "unstable-runtime-predicates"] +unstable-runtime = ["unstable-runtime-subscribe", "unstable-runtime-predicates", "unstable-runtime-owns-stream"] unstable-runtime-subscribe = [] unstable-runtime-predicates = [] +unstable-runtime-owns-stream = [] [package.metadata.docs.rs] features = ["k8s-openapi/v1_26", "unstable-runtime"] diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index 98affaece..11049db4a 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -584,6 +584,63 @@ where self } + /// Trigger the reconciliation process for a stream of `Child` objects of the owner `K` + /// + /// Same as [`Controller::owns`], but instad of a resource a stream of resources is used. + /// This allows for customized and pre-filtered watch streams to be used as a trigger. + /// + /// # Example: + /// + /// ```no_run + /// # use futures::StreamExt; + /// # use k8s_openapi::api::core::v1::ConfigMap; + /// # use k8s_openapi::api::apps::v1::StatefulSet; + /// # use kube::runtime::controller::Action; + /// # use kube::runtime::{predicates, watcher, Controller, WatchStreamExt}; + /// # use kube::{Api, Client, Error, ResourceExt}; + /// # use std::sync::Arc; + /// # type CustomResource = ConfigMap; + /// # async fn reconcile(_: Arc, _: Arc<()>) -> Result { Ok(Action::await_change()) } + /// # fn error_policy(_: Arc, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() } + /// # async fn doc(client: kube::Client) { + /// let sts_stream = watcher(Api::::all(client.clone()), watcher::Config::default()) + /// .touched_objects() + /// .predicate_filter(predicates::generation); + /// + /// Controller::new(Api::::all(client), watcher::Config::default()) + /// .owns_stream(sts_stream) + /// .run(reconcile, error_policy, Arc::new(())) + /// .for_each(|_| std::future::ready(())) + /// .await; + /// # } + /// ``` + #[cfg(feature = "unstable-runtime-owns-stream")] + #[must_use] + pub fn owns_stream + Send + 'static>( + self, + trigger: impl Stream> + Send + 'static, + ) -> Self { + self.owns_stream_with(trigger, ()) + } + + /// Trigger the reconciliation process for a stream of `Child` objects of the owner `K` + /// + /// Same as [`Controller::owns_stream`], but accepts a `DynamicType` so it can be used with dynamic resources. + #[cfg(feature = "unstable-runtime-owns-stream")] + #[must_use] + pub fn owns_stream_with( + mut self, + trigger: impl Stream> + Send + 'static, + dyntype: Child::DynamicType, + ) -> Self + where + Child::DynamicType: Debug + Eq + Hash + Clone, + { + let child_watcher = trigger_owners(trigger, self.dyntype.clone(), dyntype); + self.trigger_selector.push(child_watcher.boxed()); + self + } + /// Specify `Watched` object which `K` has a custom relation to and should be watched /// /// To define the `Watched` relation with `K`, you **must** define a custom relation mapper, which,