Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Arc wrap watcher output #1266

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions examples/dynamic_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use kube::{
use serde::de::DeserializeOwned;
use tracing::*;

use std::{env, fmt::Debug};
use std::{env, fmt::Debug, sync::Arc};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand Down Expand Up @@ -39,9 +39,9 @@ async fn main() -> anyhow::Result<()> {
}

async fn handle_events<
K: Resource<DynamicType = ApiResource> + Clone + Debug + Send + DeserializeOwned + 'static,
K: Resource<DynamicType = ApiResource> + Clone + Debug + Send + Sync + DeserializeOwned + 'static,
>(
stream: impl Stream<Item = watcher::Result<Event<K>>> + Send + 'static,
stream: impl Stream<Item = watcher::Result<Event<Arc<K>>>> + Send + 'static,
ar: &ApiResource,
) -> anyhow::Result<()> {
let mut items = stream.applied_objects().boxed();
Expand Down
11 changes: 11 additions & 0 deletions examples/event_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use futures::{pin_mut, TryStreamExt};
use k8s_openapi::{
api::{core::v1::ObjectReference, events::v1::Event},
Expand Down Expand Up @@ -50,6 +52,15 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}

// This function lets the app handle an added/modified event from k8s
fn handle_event(ev: Arc<Event>) -> anyhow::Result<()> {
info!(
"Event: \"{}\" via {} {}",
ev.message.as_ref().unwrap().trim(),
ev.involved_object.kind.as_ref().unwrap(),
ev.involved_object.name.as_ref().unwrap()
);
Ok(())
fn format_objref(oref: ObjectReference) -> Option<String> {
Some(format!("{}/{}", oref.kind?, oref.name?))
}
Expand Down
8 changes: 5 additions & 3 deletions examples/multi_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use futures::{stream, StreamExt, TryStreamExt};
use k8s_openapi::api::{
apps::v1::Deployment,
Expand Down Expand Up @@ -31,9 +33,9 @@ async fn main() -> anyhow::Result<()> {
// SelectAll Stream elements must have the same Item, so all packed in this:
#[allow(clippy::large_enum_variant)]
enum Watched {
Config(ConfigMap),
Deploy(Deployment),
Secret(Secret),
Config(Arc<ConfigMap>),
Deploy(Arc<Deployment>),
Secret(Arc<Secret>),
}
while let Some(o) = combo_stream.try_next().await? {
match o {
Expand Down
7 changes: 5 additions & 2 deletions examples/node_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use futures::{pin_mut, TryStreamExt};
use k8s_openapi::api::core::v1::{Event, Node};
use kube::{
Expand Down Expand Up @@ -31,12 +33,13 @@ async fn main() -> anyhow::Result<()> {
}

// A simple node problem detector
async fn check_for_node_failures(events: &Api<Event>, o: Node) -> anyhow::Result<()> {
async fn check_for_node_failures(events: &Api<Event>, o: Arc<Node>) -> anyhow::Result<()> {
let name = o.name_any();
// Nodes often modify a lot - only print broken nodes
if let Some(true) = o.spec.unwrap().unschedulable {
if let Some(true) = o.spec.clone().unwrap().unschedulable {
let failed = o
.status
.clone()
.unwrap()
.conditions
.unwrap()
Expand Down
76 changes: 39 additions & 37 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,10 @@
/// Helper for building custom trigger filters, see the implementations of [`trigger_self`] and [`trigger_owners`] for some examples.
pub fn trigger_with<T, K, I, S>(
stream: S,
mapper: impl Fn(T) -> I,
mapper: impl Fn(Arc<T>) -> I,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
S: TryStream<Ok = T>,
S: TryStream<Ok = Arc<T>>,
I: IntoIterator,
I::Item: Into<ReconcileRequest<K>>,
K: Resource,
Expand All @@ -110,29 +110,29 @@
dyntype: K::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
S: TryStream<Ok = K>,
S: TryStream<Ok = Arc<K>>,
K: Resource,
K::DynamicType: Clone,
{
trigger_with(stream, move |obj| {
Some(ReconcileRequest {
obj_ref: ObjectRef::from_obj_with(&obj, dyntype.clone()),
obj_ref: ObjectRef::from_obj_with(obj.as_ref(), dyntype.clone()),
reason: ReconcileReason::ObjectUpdated,
})
})
}

/// Enqueues any mapper returned `K` types for reconciliation
fn trigger_others<S, K, I>(
fn trigger_others<T, S, K, I>(
stream: S,
mapper: impl Fn(S::Ok) -> I + Sync + Send + 'static,
dyntype: <S::Ok as Resource>::DynamicType,
mapper: impl Fn(Arc<T>) -> I + Sync + Send + 'static,
dyntype: T::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
// Input stream has items as some Resource (via Controller::watches)
S: TryStream,
S::Ok: Resource,
<S::Ok as Resource>::DynamicType: Clone,
S: TryStream<Ok = Arc<T>>,
T: Resource,
T::DynamicType: Clone,
// Output stream is requests for the root type K
K: Resource,
K::DynamicType: Clone,
Expand All @@ -141,7 +141,7 @@
I::IntoIter: Send,
{
trigger_with(stream, move |obj| {
let watch_ref = ObjectRef::from_obj_with(&obj, dyntype.clone()).erase();
let watch_ref = ObjectRef::from_obj_with(obj.as_ref(), dyntype.clone()).erase();
mapper(obj)
.into_iter()
.map(move |mapped_obj_ref| ReconcileRequest {
Expand All @@ -154,19 +154,19 @@
}

/// Enqueues any owners of type `KOwner` for reconciliation
pub fn trigger_owners<KOwner, S>(
pub fn trigger_owners<KOwner, S, T>(
stream: S,
owner_type: KOwner::DynamicType,
child_type: <S::Ok as Resource>::DynamicType,
child_type: T::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<KOwner>, S::Error>>
where
S: TryStream,
S::Ok: Resource,
<S::Ok as Resource>::DynamicType: Clone,
S: TryStream<Ok = Arc<T>>,
T: Resource,
T::DynamicType: Clone,
KOwner: Resource,
KOwner::DynamicType: Clone,
{
let mapper = move |obj: S::Ok| {
let mapper = move |obj: Arc<T>| {
let meta = obj.meta().clone();
let ns = meta.namespace;
let owner_type = owner_type.clone();
Expand Down Expand Up @@ -653,7 +653,7 @@
/// Prefer [`Controller::new`] if you do not need to share the stream, or do not need pre-filtering.
#[cfg(feature = "unstable-runtime-stream-control")]
pub fn for_stream(
trigger: impl Stream<Item = Result<K, watcher::Error>> + Send + 'static,
trigger: impl Stream<Item = Result<Arc<K>, watcher::Error>> + Send + 'static,
reader: Store<K>,
) -> Self
where
Expand All @@ -677,7 +677,7 @@
/// [`dynamic`]: kube_client::core::dynamic
#[cfg(feature = "unstable-runtime-stream-control")]
pub fn for_stream_with(
trigger: impl Stream<Item = Result<K, watcher::Error>> + Send + 'static,
trigger: impl Stream<Item = Result<Arc<K>, watcher::Error>> + Send + 'static,
reader: Store<K>,
dyntype: K::DynamicType,
) -> Self {
Expand Down Expand Up @@ -736,7 +736,9 @@
///
/// [`OwnerReference`]: k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference
#[must_use]
pub fn owns<Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static>(
pub fn owns<
Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static + Sync,
>(
self,
api: Api<Child>,
wc: watcher::Config,
Expand All @@ -748,7 +750,7 @@
///
/// Same as [`Controller::owns`], but accepts a `DynamicType` so it can be used with dynamic resources.
#[must_use]
pub fn owns_with<Child: Clone + Resource + DeserializeOwned + Debug + Send + 'static>(
pub fn owns_with<Child: Clone + Resource + DeserializeOwned + Debug + Send + 'static + Sync>(
mut self,
api: Api<Child>,
dyntype: Child::DynamicType,
Expand Down Expand Up @@ -806,7 +808,7 @@
#[must_use]
pub fn owns_stream<Child: Resource<DynamicType = ()> + Send + 'static>(
self,
trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
trigger: impl Stream<Item = Result<Arc<Child>, watcher::Error>> + Send + 'static,
) -> Self {
self.owns_stream_with(trigger, ())
}
Expand All @@ -824,7 +826,7 @@
#[must_use]
pub fn owns_stream_with<Child: Resource + Send + 'static>(
mut self,
trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
trigger: impl Stream<Item = Result<Arc<Child>, watcher::Error>> + Send + 'static,
dyntype: Child::DynamicType,
) -> Self
where
Expand Down Expand Up @@ -903,11 +905,11 @@
pub fn watches<Other, I>(
self,
api: Api<Other>,
wc: watcher::Config,
mapper: impl Fn(Other) -> I + Sync + Send + 'static,
wc: Config,
mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
) -> Self
where
Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static + Sync,
Other::DynamicType: Default + Debug + Clone + Eq + Hash,
I: 'static + IntoIterator<Item = ObjectRef<K>>,
I::IntoIter: Send,
Expand All @@ -923,16 +925,16 @@
mut self,
api: Api<Other>,
dyntype: Other::DynamicType,
wc: watcher::Config,
mapper: impl Fn(Other) -> I + Sync + Send + 'static,
wc: Config,
mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
) -> Self
where
Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static + Sync,
I: 'static + IntoIterator<Item = ObjectRef<K>>,
I::IntoIter: Send,
Other::DynamicType: Debug + Clone + Eq + Hash,
{
let other_watcher = trigger_others(watcher(api, wc).touched_objects(), mapper, dyntype);

Check failure on line 937 in kube-runtime/src/controller/mod.rs

View workflow job for this annotation

GitHub Actions / msrv

mismatched types
self.trigger_selector.push(other_watcher.boxed());
self
}
Expand Down Expand Up @@ -960,7 +962,7 @@
/// # type CustomResource = ConfigMap;
/// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
/// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
/// fn mapper(_: DaemonSet) -> Option<ObjectRef<CustomResource>> { todo!() }
/// fn mapper(_: Arc<DaemonSet>) -> Option<ObjectRef<CustomResource>> { todo!() }
/// # async fn doc(client: kube::Client) {
/// let api: Api<DaemonSet> = Api::all(client.clone());
/// let cr: Api<CustomResource> = Api::all(client.clone());
Expand All @@ -979,8 +981,8 @@
#[must_use]
pub fn watches_stream<Other, I>(
self,
trigger: impl Stream<Item = Result<Other, watcher::Error>> + Send + 'static,
mapper: impl Fn(Other) -> I + Sync + Send + 'static,
trigger: impl Stream<Item = Result<Arc<Other>, watcher::Error>> + Send + 'static,
mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
) -> Self
where
Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
Expand All @@ -1004,8 +1006,8 @@
#[must_use]
pub fn watches_stream_with<Other, I>(
mut self,
trigger: impl Stream<Item = Result<Other, watcher::Error>> + Send + 'static,
mapper: impl Fn(Other) -> I + Sync + Send + 'static,
trigger: impl Stream<Item = Result<Arc<Other>, watcher::Error>> + Send + 'static,
mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
dyntype: Other::DynamicType,
) -> Self
where
Expand Down Expand Up @@ -1297,7 +1299,7 @@
// and returns a WatchEvent generic over a resource `K`
fn assert_stream<T, K>(x: T) -> T
where
T: Stream<Item = watcher::Result<Event<K>>> + Send,
T: Stream<Item = watcher::Result<Event<Arc<K>>>> + Send,
K: Resource + Clone + DeserializeOwned + std::fmt::Debug + Send + 'static,
{
x
Expand Down Expand Up @@ -1364,14 +1366,14 @@
);
pin_mut!(applier);
for i in 0..items {
let obj = ConfigMap {
let obj = Arc::new(ConfigMap {
metadata: ObjectMeta {
name: Some(format!("cm-{i}")),
namespace: Some("default".to_string()),
..Default::default()
},
..Default::default()
};
});
store_tx.apply_watcher_event(&watcher::Event::Applied(obj.clone()));
queue_tx.unbounded_send(ObjectRef::from_obj(&obj)).unwrap();
}
Expand Down
Loading
Loading