diff --git a/Cargo.lock b/Cargo.lock
index d07f23eb..9c6af944 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -174,8 +174,10 @@ checksum = "7f2c685bad3eb3d45a01354cedb7d5faa66194d1d58ba6e267a8de788f79db38"
 dependencies = [
  "android-tzdata",
  "iana-time-zone",
+ "js-sys",
  "num-traits",
  "serde",
+ "wasm-bindgen",
  "windows-targets 0.48.5",
 ]
 
@@ -864,8 +866,9 @@ name = "linkerd-cni-repair-controller"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "chrono",
  "clap",
- "futures-util",
+ "futures",
  "k8s-openapi",
  "kube",
  "kubert",
diff --git a/cni-repair-controller/Cargo.toml b/cni-repair-controller/Cargo.toml
index 6611e7ef..39cf0895 100644
--- a/cni-repair-controller/Cargo.toml
+++ b/cni-repair-controller/Cargo.toml
@@ -15,7 +15,7 @@ rustls-tls = ["kube/rustls-tls"]
 
 [dependencies]
 anyhow = "1"
-futures-util = "0.3"
+futures = "0.3"
 k8s-openapi = { version = "0.20", features = ["v1_22"] }
 kube = { version = "0.87", features = ["runtime"] }
 openssl = { version = "0.10.60", optional = true }
@@ -35,3 +35,6 @@ features = ["admin", "clap", "prometheus-client", "runtime"]
 [dependencies.tokio]
 version = "1"
 features = ["macros", "parking_lot", "rt", "rt-multi-thread"]
+
+[dev-dependencies]
+chrono = "0.4"
diff --git a/cni-repair-controller/src/lib.rs b/cni-repair-controller/src/lib.rs
index 9ee5d4c2..5529fb74 100644
--- a/cni-repair-controller/src/lib.rs
+++ b/cni-repair-controller/src/lib.rs
@@ -1,4 +1,4 @@
-use futures_util::{Stream, StreamExt};
+use futures::{Stream, StreamExt};
 use k8s_openapi::api::core::v1::{ObjectReference, Pod};
 use kube::{
     runtime::{
@@ -163,3 +163,84 @@ impl Metrics {
         }
     }
 }
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use chrono::Utc;
+    use k8s_openapi::api::core::v1::{
+        ContainerState, ContainerStateTerminated, ContainerStatus, PodStatus,
+    };
+    use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ObjectMeta, Time};
+    use tokio::{
+        sync::mpsc::error::TryRecvError,
+        time::{self, Duration},
+    };
+
+    #[tokio::test]
+    async fn test_process_events() {
+        let mut prom = prometheus_client::registry::Registry::default();
+        let metrics =
+            Metrics::register(prom.sub_registry_with_prefix("linkerd_cni_repair_controller"));
+
+        // This pod should be ignored
+        let pod1 = Pod {
+            metadata: ObjectMeta {
+                name: Some("pod1".to_string()),
+                namespace: Some("default".to_string()),
+                ..Default::default()
+            },
+            ..Default::default()
+        };
+
+        // This pod should be processed
+        let pod2 = Pod {
+            metadata: ObjectMeta {
+                name: Some("pod2".to_string()),
+                namespace: Some("default".to_string()),
+                ..Default::default()
+            },
+            status: Some(PodStatus {
+                init_container_statuses: Some(vec![ContainerStatus {
+                    name: "linkerd-network-validator".to_string(),
+                    last_state: Some(ContainerState {
+                        terminated: Some(ContainerStateTerminated {
+                            exit_code: UNSUCCESSFUL_EXIT_CODE,
+                            ..Default::default()
+                        }),
+                        ..Default::default()
+                    }),
+                    ..Default::default()
+                }]),
+                ..Default::default()
+            }),
+            ..Default::default()
+        };
+
+        // This pod should be ignored
+        let pod3 = Pod {
+            metadata: ObjectMeta {
+                name: Some("pod2".to_string()),
+                namespace: Some("default".to_string()),
+                deletion_timestamp: Some(Time(Utc::now())),
+                ..Default::default()
+            },
+            ..pod2.clone()
+        };
+
+        let (tx, mut rx) = mpsc::channel(EVENT_CHANNEL_CAPACITY);
+        let stream = futures::stream::iter(vec![
+            watcher::Event::Applied(pod1),
+            watcher::Event::Applied(pod2),
+            watcher::Event::Applied(pod3),
+        ]);
+
+        tokio::spawn(process_events(stream, tx, metrics));
+        time::sleep(Duration::from_secs(2)).await;
+        let msg = rx.try_recv();
+        let object_ref = msg.unwrap();
+        assert_eq!(object_ref.name, Some("pod2".to_string()));
+        let msg = rx.try_recv();
+        assert_eq!(msg, Err(TryRecvError::Disconnected));
+    }
+}