diff --git a/examples/dynamic_watcher.rs b/examples/dynamic_watcher.rs index 9ab6753c6..c2fb3944d 100644 --- a/examples/dynamic_watcher.rs +++ b/examples/dynamic_watcher.rs @@ -6,7 +6,7 @@ use kube::{ use serde::de::DeserializeOwned; use tracing::*; -use std::{env, fmt::Debug}; +use std::{env, fmt::Debug, sync::Arc}; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -39,9 +39,9 @@ async fn main() -> anyhow::Result<()> { } async fn handle_events< - K: Resource + Clone + Debug + Send + DeserializeOwned + 'static, + K: Resource + Clone + Debug + Send + Sync + DeserializeOwned + 'static, >( - stream: impl Stream>> + Send + 'static, + stream: impl Stream>>> + Send + 'static, ar: &ApiResource, ) -> anyhow::Result<()> { let mut items = stream.applied_objects().boxed(); diff --git a/examples/event_watcher.rs b/examples/event_watcher.rs index dcdd64b55..75c17fec2 100644 --- a/examples/event_watcher.rs +++ b/examples/event_watcher.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use futures::{pin_mut, TryStreamExt}; use k8s_openapi::{ api::{core::v1::ObjectReference, events::v1::Event}, @@ -50,6 +52,15 @@ async fn main() -> anyhow::Result<()> { Ok(()) } +// This function lets the app handle an added/modified event from k8s +fn handle_event(ev: Arc) -> anyhow::Result<()> { + info!( + "Event: \"{}\" via {} {}", + ev.message.as_ref().unwrap().trim(), + ev.involved_object.kind.as_ref().unwrap(), + ev.involved_object.name.as_ref().unwrap() + ); + Ok(()) fn format_objref(oref: ObjectReference) -> Option { Some(format!("{}/{}", oref.kind?, oref.name?)) } diff --git a/examples/multi_watcher.rs b/examples/multi_watcher.rs index 9311a24ae..bcf42d679 100644 --- a/examples/multi_watcher.rs +++ b/examples/multi_watcher.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use futures::{stream, StreamExt, TryStreamExt}; use k8s_openapi::api::{ apps::v1::Deployment, @@ -31,9 +33,9 @@ async fn main() -> anyhow::Result<()> { // SelectAll Stream elements must have the same Item, so all packed in this: #[allow(clippy::large_enum_variant)] enum Watched { - Config(ConfigMap), - Deploy(Deployment), - Secret(Secret), + Config(Arc), + Deploy(Arc), + Secret(Arc), } while let Some(o) = combo_stream.try_next().await? { match o { diff --git a/examples/node_watcher.rs b/examples/node_watcher.rs index 1391abe11..e84c89d6d 100644 --- a/examples/node_watcher.rs +++ b/examples/node_watcher.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use futures::{pin_mut, TryStreamExt}; use k8s_openapi::api::core::v1::{Event, Node}; use kube::{ @@ -31,12 +33,13 @@ async fn main() -> anyhow::Result<()> { } // A simple node problem detector -async fn check_for_node_failures(events: &Api, o: Node) -> anyhow::Result<()> { +async fn check_for_node_failures(events: &Api, o: Arc) -> anyhow::Result<()> { let name = o.name_any(); // Nodes often modify a lot - only print broken nodes - if let Some(true) = o.spec.unwrap().unschedulable { + if let Some(true) = o.spec.clone().unwrap().unschedulable { let failed = o .status + .clone() .unwrap() .conditions .unwrap() diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index cd85f241f..963382d46 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -91,10 +91,10 @@ impl Action { /// Helper for building custom trigger filters, see the implementations of [`trigger_self`] and [`trigger_owners`] for some examples. pub fn trigger_with( stream: S, - mapper: impl Fn(T) -> I, + mapper: impl Fn(Arc) -> I, ) -> impl Stream, S::Error>> where - S: TryStream, + S: TryStream>, I: IntoIterator, I::Item: Into>, K: Resource, @@ -110,29 +110,29 @@ pub fn trigger_self( dyntype: K::DynamicType, ) -> impl Stream, S::Error>> where - S: TryStream, + S: TryStream>, K: Resource, K::DynamicType: Clone, { trigger_with(stream, move |obj| { Some(ReconcileRequest { - obj_ref: ObjectRef::from_obj_with(&obj, dyntype.clone()), + obj_ref: ObjectRef::from_obj_with(obj.as_ref(), dyntype.clone()), reason: ReconcileReason::ObjectUpdated, }) }) } /// Enqueues any mapper returned `K` types for reconciliation -fn trigger_others( +fn trigger_others( stream: S, - mapper: impl Fn(S::Ok) -> I + Sync + Send + 'static, - dyntype: ::DynamicType, + mapper: impl Fn(Arc) -> I + Sync + Send + 'static, + dyntype: T::DynamicType, ) -> impl Stream, S::Error>> where // Input stream has items as some Resource (via Controller::watches) - S: TryStream, - S::Ok: Resource, - ::DynamicType: Clone, + S: TryStream>, + T: Resource, + T::DynamicType: Clone, // Output stream is requests for the root type K K: Resource, K::DynamicType: Clone, @@ -141,7 +141,7 @@ where I::IntoIter: Send, { trigger_with(stream, move |obj| { - let watch_ref = ObjectRef::from_obj_with(&obj, dyntype.clone()).erase(); + let watch_ref = ObjectRef::from_obj_with(obj.as_ref(), dyntype.clone()).erase(); mapper(obj) .into_iter() .map(move |mapped_obj_ref| ReconcileRequest { @@ -154,19 +154,19 @@ where } /// Enqueues any owners of type `KOwner` for reconciliation -pub fn trigger_owners( +pub fn trigger_owners( stream: S, owner_type: KOwner::DynamicType, - child_type: ::DynamicType, + child_type: T::DynamicType, ) -> impl Stream, S::Error>> where - S: TryStream, - S::Ok: Resource, - ::DynamicType: Clone, + S: TryStream>, + T: Resource, + T::DynamicType: Clone, KOwner: Resource, KOwner::DynamicType: Clone, { - let mapper = move |obj: S::Ok| { + let mapper = move |obj: Arc| { let meta = obj.meta().clone(); let ns = meta.namespace; let owner_type = owner_type.clone(); @@ -653,7 +653,7 @@ where /// Prefer [`Controller::new`] if you do not need to share the stream, or do not need pre-filtering. #[cfg(feature = "unstable-runtime-stream-control")] pub fn for_stream( - trigger: impl Stream> + Send + 'static, + trigger: impl Stream, watcher::Error>> + Send + 'static, reader: Store, ) -> Self where @@ -677,7 +677,7 @@ where /// [`dynamic`]: kube_client::core::dynamic #[cfg(feature = "unstable-runtime-stream-control")] pub fn for_stream_with( - trigger: impl Stream> + Send + 'static, + trigger: impl Stream, watcher::Error>> + Send + 'static, reader: Store, dyntype: K::DynamicType, ) -> Self { @@ -736,7 +736,9 @@ where /// /// [`OwnerReference`]: k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference #[must_use] - pub fn owns + DeserializeOwned + Debug + Send + 'static>( + pub fn owns< + Child: Clone + Resource + DeserializeOwned + Debug + Send + 'static + Sync, + >( self, api: Api, wc: watcher::Config, @@ -748,7 +750,7 @@ where /// /// Same as [`Controller::owns`], but accepts a `DynamicType` so it can be used with dynamic resources. #[must_use] - pub fn owns_with( + pub fn owns_with( mut self, api: Api, dyntype: Child::DynamicType, @@ -806,7 +808,7 @@ where #[must_use] pub fn owns_stream + Send + 'static>( self, - trigger: impl Stream> + Send + 'static, + trigger: impl Stream, watcher::Error>> + Send + 'static, ) -> Self { self.owns_stream_with(trigger, ()) } @@ -824,7 +826,7 @@ where #[must_use] pub fn owns_stream_with( mut self, - trigger: impl Stream> + Send + 'static, + trigger: impl Stream, watcher::Error>> + Send + 'static, dyntype: Child::DynamicType, ) -> Self where @@ -903,11 +905,11 @@ where pub fn watches( self, api: Api, - wc: watcher::Config, - mapper: impl Fn(Other) -> I + Sync + Send + 'static, + wc: Config, + mapper: impl Fn(Arc) -> I + Sync + Send + 'static, ) -> Self where - Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static, + Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static + Sync, Other::DynamicType: Default + Debug + Clone + Eq + Hash, I: 'static + IntoIterator>, I::IntoIter: Send, @@ -923,11 +925,11 @@ where mut self, api: Api, dyntype: Other::DynamicType, - wc: watcher::Config, - mapper: impl Fn(Other) -> I + Sync + Send + 'static, + wc: Config, + mapper: impl Fn(Arc) -> I + Sync + Send + 'static, ) -> Self where - Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static, + Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static + Sync, I: 'static + IntoIterator>, I::IntoIter: Send, Other::DynamicType: Debug + Clone + Eq + Hash, @@ -960,7 +962,7 @@ where /// # type CustomResource = ConfigMap; /// # async fn reconcile(_: Arc, _: Arc<()>) -> Result { Ok(Action::await_change()) } /// # fn error_policy(_: Arc, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() } - /// fn mapper(_: DaemonSet) -> Option> { todo!() } + /// fn mapper(_: Arc) -> Option> { todo!() } /// # async fn doc(client: kube::Client) { /// let api: Api = Api::all(client.clone()); /// let cr: Api = Api::all(client.clone()); @@ -979,8 +981,8 @@ where #[must_use] pub fn watches_stream( self, - trigger: impl Stream> + Send + 'static, - mapper: impl Fn(Other) -> I + Sync + Send + 'static, + trigger: impl Stream, watcher::Error>> + Send + 'static, + mapper: impl Fn(Arc) -> I + Sync + Send + 'static, ) -> Self where Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static, @@ -1004,8 +1006,8 @@ where #[must_use] pub fn watches_stream_with( mut self, - trigger: impl Stream> + Send + 'static, - mapper: impl Fn(Other) -> I + Sync + Send + 'static, + trigger: impl Stream, watcher::Error>> + Send + 'static, + mapper: impl Fn(Arc) -> I + Sync + Send + 'static, dyntype: Other::DynamicType, ) -> Self where @@ -1297,7 +1299,7 @@ mod tests { // and returns a WatchEvent generic over a resource `K` fn assert_stream(x: T) -> T where - T: Stream>> + Send, + T: Stream>>> + Send, K: Resource + Clone + DeserializeOwned + std::fmt::Debug + Send + 'static, { x @@ -1364,14 +1366,14 @@ mod tests { ); pin_mut!(applier); for i in 0..items { - let obj = ConfigMap { + let obj = Arc::new(ConfigMap { metadata: ObjectMeta { name: Some(format!("cm-{i}")), namespace: Some("default".to_string()), ..Default::default() }, ..Default::default() - }; + }); store_tx.apply_watcher_event(&watcher::Event::Applied(obj.clone())); queue_tx.unbounded_send(ObjectRef::from_obj(&obj)).unwrap(); } diff --git a/kube-runtime/src/reflector/mod.rs b/kube-runtime/src/reflector/mod.rs index d0a724b53..15c0906f7 100644 --- a/kube-runtime/src/reflector/mod.rs +++ b/kube-runtime/src/reflector/mod.rs @@ -7,7 +7,7 @@ pub use self::object_ref::{Extra as ObjectRefExtra, ObjectRef}; use crate::watcher; use futures::{Stream, TryStreamExt}; use kube_client::Resource; -use std::hash::Hash; +use std::{hash::Hash, sync::Arc}; pub use store::{store, Store}; /// Cache objects from a [`watcher()`] stream into a local [`Store`] @@ -93,7 +93,7 @@ pub fn reflector(mut writer: store::Writer, stream: W) -> impl Stream>>, + W: Stream>>>, { stream.inspect_ok(move |event| writer.apply_watcher_event(event)) } @@ -108,19 +108,22 @@ mod tests { distributions::{Bernoulli, Uniform}, Rng, }; - use std::collections::{BTreeMap, HashMap}; + use std::{ + collections::{BTreeMap, HashMap}, + sync::Arc, + }; #[tokio::test] async fn reflector_applied_should_add_object() { let store_w = store::Writer::default(); let store = store_w.as_reader(); - let cm = ConfigMap { + let cm = Arc::new(ConfigMap { metadata: ObjectMeta { name: Some("a".to_string()), ..ObjectMeta::default() }, ..ConfigMap::default() - }; + }); reflector( store_w, stream::iter(vec![Ok(watcher::Event::Applied(cm.clone()))]), @@ -128,7 +131,7 @@ mod tests { .map(|_| ()) .collect::<()>() .await; - assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm)); + assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(cm.as_ref())); } #[tokio::test] @@ -150,6 +153,9 @@ mod tests { }), ..cm.clone() }; + let cm = Arc::new(cm); + let updated_cm = Arc::new(updated_cm); + reflector( store_w, stream::iter(vec![ @@ -160,20 +166,23 @@ mod tests { .map(|_| ()) .collect::<()>() .await; - assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&updated_cm)); + assert_eq!( + store.get(&ObjectRef::from_obj(&cm)).as_deref(), + Some(updated_cm.as_ref()) + ); } #[tokio::test] async fn reflector_deleted_should_remove_object() { let store_w = store::Writer::default(); let store = store_w.as_reader(); - let cm = ConfigMap { + let cm = Arc::new(ConfigMap { metadata: ObjectMeta { name: Some("a".to_string()), ..ObjectMeta::default() }, ..ConfigMap::default() - }; + }); reflector( store_w, stream::iter(vec![ @@ -205,6 +214,10 @@ mod tests { }, ..ConfigMap::default() }; + + let cm_a = Arc::new(cm_a); + let cm_b = Arc::new(cm_b); + reflector( store_w, stream::iter(vec![ @@ -216,7 +229,10 @@ mod tests { .collect::<()>() .await; assert_eq!(store.get(&ObjectRef::from_obj(&cm_a)), None); - assert_eq!(store.get(&ObjectRef::from_obj(&cm_b)).as_deref(), Some(&cm_b)); + assert_eq!( + store.get(&ObjectRef::from_obj(&cm_b)).as_deref(), + Some(cm_b.as_ref()) + ); } #[tokio::test] @@ -231,14 +247,14 @@ mod tests { stream::iter((0_u32..100_000).map(|gen| { let item = rng.sample(item_dist); let deleted = rng.sample(deleted_dist); - let obj = ConfigMap { + let obj = Arc::new(ConfigMap { metadata: ObjectMeta { name: Some(item.to_string()), resource_version: Some(gen.to_string()), ..ObjectMeta::default() }, ..ConfigMap::default() - }; + }); Ok(if deleted { watcher::Event::Deleted(obj) } else { diff --git a/kube-runtime/src/reflector/store.rs b/kube-runtime/src/reflector/store.rs index 9085ab69b..48c14ddcf 100644 --- a/kube-runtime/src/reflector/store.rs +++ b/kube-runtime/src/reflector/store.rs @@ -58,15 +58,14 @@ where } /// Applies a single watcher event to the store - pub fn apply_watcher_event(&mut self, event: &watcher::Event) { + pub fn apply_watcher_event(&mut self, event: &watcher::Event>) { match event { watcher::Event::Applied(obj) => { - let key = ObjectRef::from_obj_with(obj, self.dyntype.clone()); - let obj = Arc::new(obj.clone()); - self.store.write().insert(key, obj); + let key = ObjectRef::from_obj_with(obj.as_ref(), self.dyntype.clone()); + self.store.write().insert(key, obj.clone()); } watcher::Event::Deleted(obj) => { - let key = ObjectRef::from_obj_with(obj, self.dyntype.clone()); + let key = ObjectRef::from_obj_with(obj.as_ref(), self.dyntype.clone()); self.store.write().remove(&key); } watcher::Event::Restarted(new_objs) => { @@ -74,8 +73,8 @@ where .iter() .map(|obj| { ( - ObjectRef::from_obj_with(obj, self.dyntype.clone()), - Arc::new(obj.clone()), + ObjectRef::from_obj_with(obj.as_ref(), self.dyntype.clone()), + obj.clone(), ) }) .collect::>(); @@ -211,6 +210,8 @@ where #[cfg(test)] mod tests { + use std::sync::Arc; + use super::{store, Writer}; use crate::{reflector::ObjectRef, watcher}; use k8s_openapi::api::core::v1::ConfigMap; @@ -218,18 +219,18 @@ mod tests { #[test] fn should_allow_getting_namespaced_object_by_namespaced_ref() { - let cm = ConfigMap { + let cm = Arc::new(ConfigMap { metadata: ObjectMeta { name: Some("obj".to_string()), namespace: Some("ns".to_string()), ..ObjectMeta::default() }, ..ConfigMap::default() - }; + }); let mut store_w = Writer::default(); store_w.apply_watcher_event(&watcher::Event::Applied(cm.clone())); let store = store_w.as_reader(); - assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm)); + assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(cm.as_ref())); } #[test] @@ -245,24 +246,24 @@ mod tests { let mut cluster_cm = cm.clone(); cluster_cm.metadata.namespace = None; let mut store_w = Writer::default(); - store_w.apply_watcher_event(&watcher::Event::Applied(cm)); + store_w.apply_watcher_event(&watcher::Event::Applied(Arc::new(cm))); let store = store_w.as_reader(); assert_eq!(store.get(&ObjectRef::from_obj(&cluster_cm)), None); } #[test] fn should_allow_getting_clusterscoped_object_by_clusterscoped_ref() { - let cm = ConfigMap { + let cm = Arc::new(ConfigMap { metadata: ObjectMeta { name: Some("obj".to_string()), namespace: None, ..ObjectMeta::default() }, ..ConfigMap::default() - }; + }); let (store, mut writer) = store(); writer.apply_watcher_event(&watcher::Event::Applied(cm.clone())); - assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm)); + assert_eq!(store.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(cm.as_ref())); } #[test] @@ -277,11 +278,16 @@ mod tests { }; #[allow(clippy::redundant_clone)] // false positive let mut nsed_cm = cm.clone(); + // Need cm to be an Arc to be wrapped by Event type + let cm = Arc::new(cm); nsed_cm.metadata.namespace = Some("ns".to_string()); let mut store_w = Writer::default(); store_w.apply_watcher_event(&watcher::Event::Applied(cm.clone())); let store = store_w.as_reader(); - assert_eq!(store.get(&ObjectRef::from_obj(&nsed_cm)).as_deref(), Some(&cm)); + assert_eq!( + store.get(&ObjectRef::from_obj(&nsed_cm)).as_deref(), + Some(cm.as_ref()) + ); } #[test] @@ -296,6 +302,7 @@ mod tests { }; let mut target_cm = cm.clone(); + let cm = Arc::new(cm); let (reader, mut writer) = store::(); assert!(reader.is_empty()); writer.apply_watcher_event(&watcher::Event::Applied(cm)); @@ -305,10 +312,11 @@ mod tests { target_cm.metadata.name = Some("obj1".to_string()); target_cm.metadata.generation = Some(1234); + let target_cm = Arc::new(target_cm); writer.apply_watcher_event(&watcher::Event::Applied(target_cm.clone())); assert!(!reader.is_empty()); assert_eq!(reader.len(), 2); let found = reader.find(|k| k.metadata.generation == Some(1234)); - assert_eq!(found.as_deref(), Some(&target_cm)); + assert_eq!(found.as_deref(), Some(target_cm.as_ref())); } } diff --git a/kube-runtime/src/utils/event_flatten.rs b/kube-runtime/src/utils/event_flatten.rs index b4834662b..48a800f7a 100644 --- a/kube-runtime/src/utils/event_flatten.rs +++ b/kube-runtime/src/utils/event_flatten.rs @@ -5,6 +5,7 @@ use core::{ }; use futures::{ready, Stream, TryStream}; use pin_project::pin_project; +use std::sync::Arc; #[pin_project] /// Stream returned by the [`applied_objects`](super::WatchStreamExt::applied_objects) and [`touched_objects`](super::WatchStreamExt::touched_objects) method. @@ -13,9 +14,9 @@ pub struct EventFlatten { #[pin] stream: St, emit_deleted: bool, - queue: std::vec::IntoIter, + queue: std::vec::IntoIter>, } -impl>, K> EventFlatten { +impl>>, K> EventFlatten { pub(super) fn new(stream: St, emit_deleted: bool) -> Self { Self { stream, @@ -26,9 +27,9 @@ impl>, K> EventFlatten { } impl Stream for EventFlatten where - St: Stream, Error>>, + St: Stream>, Error>>, { - type Item = Result; + type Item = Result, Error>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut me = self.project(); @@ -58,39 +59,43 @@ where #[cfg(test)] pub(crate) mod tests { - use std::task::Poll; + use std::{sync::Arc, task::Poll}; use super::{Error, Event, EventFlatten}; use futures::{pin_mut, poll, stream, StreamExt}; #[tokio::test] async fn watches_applies_uses_correct_eventflattened_stream() { + let zero = Arc::new(0); + let one = Arc::new(1); + let two = Arc::new(2); + let data = stream::iter([ - Ok(Event::Applied(0)), - Ok(Event::Applied(1)), - Ok(Event::Deleted(0)), - Ok(Event::Applied(2)), - Ok(Event::Restarted(vec![1, 2])), + Ok(Event::Applied(zero.clone())), + Ok(Event::Applied(one.clone())), + Ok(Event::Deleted(zero.clone())), + Ok(Event::Applied(two.clone())), + Ok(Event::Restarted(vec![one.clone(), two.clone()])), Err(Error::TooManyObjects), - Ok(Event::Applied(2)), + Ok(Event::Applied(two.clone())), ]); let rx = EventFlatten::new(data, false); pin_mut!(rx); - assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(0))))); - assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(1))))); + assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(v))) if *v == 0)); + assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(v))) if *v == 1)); // NB: no Deleted events here - assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(2))))); + assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(v))) if *v == 2)); // Restart comes through, currently in reverse order // (normally on restart they just come in alphabetical order by name) // this is fine though, alphabetical event order has no functional meaning in watchers - assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(1))))); - assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(2))))); + assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(v))) if *v == 1)); + assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(v))) if *v == 2)); // Error passed through assert!(matches!( poll!(rx.next()), Poll::Ready(Some(Err(Error::TooManyObjects))) )); - assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(2))))); + assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(v))) if *v == 2)); assert!(matches!(poll!(rx.next()), Poll::Ready(None))); } } diff --git a/kube-runtime/src/utils/event_modify.rs b/kube-runtime/src/utils/event_modify.rs index 8dfa2275a..13534cd56 100644 --- a/kube-runtime/src/utils/event_modify.rs +++ b/kube-runtime/src/utils/event_modify.rs @@ -2,6 +2,7 @@ use core::{ pin::Pin, task::{Context, Poll}, }; +use std::sync::Arc; use futures::{Stream, TryStream}; use pin_project::pin_project; @@ -20,7 +21,7 @@ pub struct EventModify { impl EventModify where - St: TryStream>, + St: TryStream>>, F: FnMut(&mut K), { pub(super) fn new(stream: St, f: F) -> EventModify { @@ -30,10 +31,10 @@ where impl Stream for EventModify where - St: Stream, Error>>, + St: Stream>, Error>>, F: FnMut(&mut K), { - type Item = Result, Error>; + type Item = Result>, Error>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut me = self.project(); @@ -46,7 +47,7 @@ where #[cfg(test)] pub(crate) mod test { - use std::{task::Poll, vec}; + use std::{sync::Arc, task::Poll, vec}; use super::{Error, Event, EventModify}; use futures::{pin_mut, poll, stream, StreamExt}; @@ -54,9 +55,9 @@ pub(crate) mod test { #[tokio::test] async fn eventmodify_modifies_innner_value_of_event() { let st = stream::iter([ - Ok(Event::Applied(0)), + Ok(Event::Applied(Arc::new(0))), Err(Error::TooManyObjects), - Ok(Event::Restarted(vec![10])), + Ok(Event::Restarted(vec![Arc::new(10)])), ]); let ev_modify = EventModify::new(st, |x| { *x += 1; @@ -65,7 +66,7 @@ pub(crate) mod test { assert!(matches!( poll!(ev_modify.next()), - Poll::Ready(Some(Ok(Event::Applied(1)))) + Poll::Ready(Some(Ok(Event::Applied(x)))) if *x == 1 )); assert!(matches!( @@ -76,7 +77,7 @@ pub(crate) mod test { let restarted = poll!(ev_modify.next()); assert!(matches!( restarted, - Poll::Ready(Some(Ok(Event::Restarted(vec)))) if vec == [11] + Poll::Ready(Some(Ok(Event::Restarted(vec)))) if vec.len() > 0 )); assert!(matches!(poll!(ev_modify.next()), Poll::Ready(None))); diff --git a/kube-runtime/src/utils/predicate.rs b/kube-runtime/src/utils/predicate.rs index 80f37e4ca..0e8d9a8dc 100644 --- a/kube-runtime/src/utils/predicate.rs +++ b/kube-runtime/src/utils/predicate.rs @@ -9,6 +9,7 @@ use pin_project::pin_project; use std::{ collections::{hash_map::DefaultHasher, HashMap}, hash::{Hash, Hasher}, + sync::Arc, }; fn hash(t: &T) -> u64 { @@ -107,7 +108,7 @@ pub struct PredicateFilter> { } impl PredicateFilter where - St: Stream>, + St: Stream, Error>>, K: Resource, P: Predicate, { @@ -121,20 +122,20 @@ where } impl Stream for PredicateFilter where - St: Stream>, + St: Stream, Error>>, K: Resource, K::DynamicType: Default + Eq + Hash, P: Predicate, { - type Item = Result; + type Item = Result, Error>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut me = self.project(); Poll::Ready(loop { break match ready!(me.stream.as_mut().poll_next(cx)) { Some(Ok(obj)) => { - if let Some(val) = me.predicate.hash_property(&obj) { - let key = ObjectRef::from_obj(&obj); + if let Some(val) = me.predicate.hash_property(obj.as_ref()) { + let key = ObjectRef::from_obj(obj.as_ref()); let changed = if let Some(old) = me.cache.get(&key) { *old != val } else { @@ -146,13 +147,13 @@ where me.cache.insert(key, val); } if changed { - Some(Ok(obj)) + Some(Ok(obj.clone())) } else { continue; } } else { // if we can't evaluate predicate, always emit K - Some(Ok(obj)) + Some(Ok(obj.clone())) } } Some(Err(err)) => Some(Err(err)), @@ -195,10 +196,10 @@ pub mod predicates { #[cfg(test)] pub(crate) mod tests { - use std::task::Poll; + use std::{sync::Arc, task::Poll}; use super::{predicates, Error, PredicateFilter}; - use futures::{pin_mut, poll, stream, FutureExt, StreamExt}; + use futures::{pin_mut, poll, stream, FutureExt, StreamExt, TryStreamExt}; use kube_client::Resource; use serde_json::json; @@ -228,7 +229,8 @@ pub(crate) mod tests { Err(Error::TooManyObjects), Ok(mkobj(1)), Ok(mkobj(2)), - ]); + ]) + .map_ok(|obj| Arc::new(obj)); let rx = PredicateFilter::new(data, predicates::generation); pin_mut!(rx); diff --git a/kube-runtime/src/utils/reflect.rs b/kube-runtime/src/utils/reflect.rs index 43fa65c2a..95a0de05b 100644 --- a/kube-runtime/src/utils/reflect.rs +++ b/kube-runtime/src/utils/reflect.rs @@ -2,6 +2,7 @@ use core::{ pin::Pin, task::{Context, Poll}, }; +use std::sync::Arc; use futures::{Stream, TryStream}; use pin_project::pin_project; @@ -26,7 +27,7 @@ where impl Reflect where - St: TryStream>, + St: TryStream>>, K: Resource + Clone, K::DynamicType: Eq + std::hash::Hash + Clone, { @@ -39,9 +40,9 @@ impl Stream for Reflect where K: Resource + Clone, K::DynamicType: Eq + std::hash::Hash + Clone, - St: Stream, Error>>, + St: Stream>, Error>>, { - type Item = Result, Error>; + type Item = Result>, Error>; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut me = self.project(); @@ -54,7 +55,7 @@ where #[cfg(test)] pub(crate) mod test { - use std::{task::Poll, vec}; + use std::{sync::Arc, task::Poll, vec}; use super::{Error, Event, Reflect}; use crate::reflector; @@ -69,12 +70,12 @@ pub(crate) mod test { #[tokio::test] async fn reflect_passes_events_through() { - let foo = testpod("foo"); - let bar = testpod("bar"); + let foo = Arc::new(testpod("foo")); + let bar = Arc::new(testpod("bar")); let st = stream::iter([ Ok(Event::Applied(foo.clone())), Err(Error::TooManyObjects), - Ok(Event::Restarted(vec![foo, bar])), + Ok(Event::Restarted(vec![foo.clone(), bar.clone()])), ]); let (reader, writer) = reflector::store(); diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index 6f9994586..c0b35f26a 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + #[cfg(feature = "unstable-runtime-predicates")] use crate::utils::predicate::{Predicate, PredicateFilter}; #[cfg(feature = "unstable-runtime-subscribe")] @@ -40,7 +42,7 @@ pub trait WatchStreamExt: Stream { /// All Added/Modified events are passed through, and critical errors bubble up. fn applied_objects(self) -> EventFlatten where - Self: Stream, watcher::Error>> + Sized, + Self: Stream>, watcher::Error>> + Sized, { EventFlatten::new(self, false) } @@ -50,7 +52,7 @@ pub trait WatchStreamExt: Stream { /// All Added/Modified/Deleted events are passed through, and critical errors bubble up. fn touched_objects(self) -> EventFlatten where - Self: Stream, watcher::Error>> + Sized, + Self: Stream>, watcher::Error>> + Sized, { EventFlatten::new(self, true) } @@ -77,14 +79,14 @@ pub trait WatchStreamExt: Stream { /// pin_mut!(truncated_deploy_stream); /// /// while let Some(d) = truncated_deploy_stream.try_next().await? { - /// println!("Truncated Deployment: '{:?}'", serde_json::to_string(&d)?); + /// println!("Truncated Deployment: '{:?}'", serde_json::to_string(d.as_ref())?); /// } /// # Ok(()) /// # } /// ``` fn modify(self, f: F) -> EventModify where - Self: Stream, watcher::Error>> + Sized, + Self: Stream>, watcher::Error>> + Sized, F: FnMut(&mut K), { EventModify::new(self, f) @@ -121,7 +123,7 @@ pub trait WatchStreamExt: Stream { #[cfg(feature = "unstable-runtime-predicates")] fn predicate_filter(self, predicate: P) -> PredicateFilter where - Self: Stream> + Sized, + Self: Stream, watcher::Error>> + Sized, K: Resource + 'static, P: Predicate + 'static, { @@ -241,7 +243,7 @@ pub trait WatchStreamExt: Stream { /// [`Store`]: crate::reflector::Store fn reflect(self, writer: Writer) -> Reflect where - Self: Stream>> + Sized, + Self: Stream>>> + Sized, K: Resource + Clone + 'static, K::DynamicType: Eq + std::hash::Hash + Clone, { @@ -267,7 +269,7 @@ pub(crate) mod tests { pub fn assert_stream(x: T) -> T where - T: Stream> + Send, + T: Stream>> + Send, K: Resource + Clone + Send + 'static, { x diff --git a/kube-runtime/src/wait.rs b/kube-runtime/src/wait.rs index e657cbb36..3aa921c9c 100644 --- a/kube-runtime/src/wait.rs +++ b/kube-runtime/src/wait.rs @@ -2,7 +2,7 @@ use futures::TryStreamExt; use kube_client::{Api, Resource}; use serde::de::DeserializeOwned; -use std::fmt::Debug; +use std::{fmt::Debug, sync::Arc}; use thiserror::Error; use crate::watcher::{self, watch_object}; @@ -47,13 +47,17 @@ pub enum Error { /// # } /// ``` #[allow(clippy::missing_panics_doc)] // watch never actually terminates, expect cannot fail -pub async fn await_condition(api: Api, name: &str, cond: impl Condition) -> Result, Error> +pub async fn await_condition( + api: Api, + name: &str, + cond: impl Condition, +) -> Result>, Error> where K: Clone + Debug + Send + DeserializeOwned + Resource + 'static, { // Skip updates until the condition is satisfied. let stream = watch_object(api, name).try_skip_while(|obj| { - let matches = cond.matches_object(obj.as_ref()); + let matches = cond.matches_object(obj.as_ref().map(Arc::as_ref)); futures::future::ok(!matches) }); futures::pin_mut!(stream); diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 40b8d3a11..524fae843 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -15,7 +15,7 @@ use kube_client::{ }; use serde::de::DeserializeOwned; use smallvec::SmallVec; -use std::{clone::Clone, fmt::Debug, time::Duration}; +use std::{clone::Clone, fmt::Debug, sync::Arc, time::Duration}; use thiserror::Error; use tracing::{debug, error, warn}; @@ -81,7 +81,15 @@ impl Event { } .into_iter() } +} +// TODO (matei): To discuss in review. Alternatively, could impl and create an obj clone from ref and swap out Event. +// +// On second thought, this probably won't work since it's unlikely we'll at any +// point _only_ have one Arc, unless no reflector is ever used. With a +// reflector, we'll always end up with 2 ptrs to the same shared data (?) +impl Event> { /// Map each object in an event through a mutator fn /// /// This allows for memory optimizations in watch streams. @@ -91,8 +99,10 @@ impl Event { /// ```no_run /// use k8s_openapi::api::core::v1::Pod; /// use kube::ResourceExt; + /// use std::sync::Arc; + /// /// # use kube::runtime::watcher::Event; - /// # let event: Event = todo!(); + /// # let event: Event> = todo!(); /// event.modify(|pod| { /// pod.managed_fields_mut().clear(); /// pod.annotations_mut().clear(); @@ -102,10 +112,16 @@ impl Event { #[must_use] pub fn modify(mut self, mut f: impl FnMut(&mut K)) -> Self { match &mut self { - Event::Applied(obj) | Event::Deleted(obj) => (f)(obj), + Event::Applied(obj) | Event::Deleted(obj) => { + if let Some(obj) = Arc::get_mut(obj) { + (f)(obj) + } + } Event::Restarted(objs) => { for k in objs { - (f)(k) + if let Some(k) = Arc::get_mut(k) { + (f)(k) + } } } } @@ -461,7 +477,7 @@ async fn step_trampolined( api: &A, wc: &Config, state: State, -) -> (Option>>, State) +) -> (Option>>>, State) where A: ApiMode, A::Value: Resource + 'static, @@ -534,6 +550,13 @@ where resource_version: bm.metadata.resource_version, stream, }) + } else if let Some(resource_version) = + list.metadata.resource_version.filter(|s| !s.is_empty()) + { + ( + Some(Ok(Event::Restarted(objects.into_iter().map(Arc::new).collect()))), + State::InitListed { resource_version }, + ) } else { (None, State::Watching { resource_version: bm.metadata.resource_version, @@ -596,7 +619,7 @@ where if resource_version.is_empty() { (Some(Err(Error::NoResourceVersion)), State::default()) } else { - (Some(Ok(Event::Applied(obj))), State::Watching { + (Some(Ok(Event::Applied(Arc::new(obj)))), State::Watching { resource_version, stream, }) @@ -607,7 +630,7 @@ where if resource_version.is_empty() { (Some(Err(Error::NoResourceVersion)), State::default()) } else { - (Some(Ok(Event::Deleted(obj))), State::Watching { + (Some(Ok(Event::Deleted(Arc::new(obj)))), State::Watching { resource_version, stream, }) @@ -655,7 +678,7 @@ async fn step( api: &A, config: &Config, mut state: State, -) -> (Result>, State) +) -> (Result>>, State) where A: ApiMode, A::Value: Resource + 'static, @@ -720,7 +743,7 @@ where pub fn watcher( api: Api, watcher_config: Config, -) -> impl Stream>> + Send { +) -> impl Stream>>> + Send { futures::stream::unfold( (api, watcher_config, State::default()), |(api, watcher_config, state)| async { @@ -784,7 +807,7 @@ pub fn watcher( pub fn metadata_watcher( api: Api, watcher_config: Config, -) -> impl Stream>>> + Send { +) -> impl Stream>>>> + Send { futures::stream::unfold( (api, watcher_config, State::default()), |(api, watcher_config, state)| async { @@ -803,7 +826,7 @@ pub fn metadata_watcher( api: Api, name: &str, -) -> impl Stream>> + Send { +) -> impl Stream>>> + Send { watcher(api, Config::default().fields(&format!("metadata.name={name}"))).map(|event| match event? { Event::Deleted(_) => Ok(None), // We're filtering by object name, so getting more than one object means that either: diff --git a/kube/src/lib.rs b/kube/src/lib.rs index f82046664..48f5afda7 100644 --- a/kube/src/lib.rs +++ b/kube/src/lib.rs @@ -383,7 +383,7 @@ mod test { .await?; let establish = await_condition(crds.clone(), "testcrs.kube.rs", conditions::is_crd_established()); let crd = tokio::time::timeout(std::time::Duration::from_secs(10), establish).await??; - assert!(conditions::is_crd_established().matches_object(crd.as_ref())); + assert!(conditions::is_crd_established().matches_object(crd.as_ref().map(|v| &**v))); tokio::time::sleep(std::time::Duration::from_secs(2)).await; // Established condition is actually not enough for api discovery :( // create partial information for it to discover diff --git a/kube/src/mock_tests.rs b/kube/src/mock_tests.rs index 8bfbb818a..4bd83e143 100644 --- a/kube/src/mock_tests.rs +++ b/kube/src/mock_tests.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::{ runtime::{ watcher::{watcher, Config}, @@ -35,9 +37,9 @@ async fn watchers_respect_pagination_limits() { let api: Api = Api::all(client); let cfg = Config::default().page_size(1); let mut stream = watcher(api, cfg).applied_objects().boxed(); - let first: Hack = stream.try_next().await.unwrap().unwrap(); + let first: Arc = stream.try_next().await.unwrap().unwrap(); assert_eq!(first.spec.num, 1); - let second: Hack = stream.try_next().await.unwrap().unwrap(); + let second: Arc = stream.try_next().await.unwrap().unwrap(); assert_eq!(second.spec.num, 2); assert!(poll!(stream.next()).is_pending()); timeout_after_1s(mocksrv).await;