From 6bb2f3ac1725b3ca191e0b2781127881feaa30c4 Mon Sep 17 00:00:00 2001 From: Kevin Flansburg Date: Mon, 11 Oct 2021 10:19:23 -0700 Subject: [PATCH 1/4] Initial webhook spike Signed-off-by: Kevin Flansburg --- krator/examples/moose.rs | 84 ++++++++++++++++++------------ krator/src/admission.rs | 89 +++++++++++++++++++++----------- krator/src/lib.rs | 4 -- krator/src/manager.rs | 50 +++++++++++++++++- krator/src/manager/controller.rs | 38 +++++++------- krator/src/manager/tasks.rs | 12 +++-- krator/src/operator.rs | 12 ++--- krator/src/runtime.rs | 5 +- krator/src/store.rs | 4 -- 9 files changed, 190 insertions(+), 108 deletions(-) diff --git a/krator/examples/moose.rs b/krator/examples/moose.rs index f7e25aa..8e33539 100644 --- a/krator/examples/moose.rs +++ b/krator/examples/moose.rs @@ -352,25 +352,25 @@ impl Operator for MooseTracker { Arc::clone(&self.shared) } - #[cfg(feature = "admission-webhook")] - async fn admission_hook( - &self, - manifest: Self::Manifest, - ) -> krator::admission::AdmissionResult { - use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status; - // All moose names start with "M" - let name = manifest.meta().name.clone().unwrap(); - info!("Processing admission hook for moose named {}", name); - match name.chars().next() { - Some('m') | Some('M') => krator::admission::AdmissionResult::Allow(manifest), - _ => krator::admission::AdmissionResult::Deny(Status { - code: Some(400), - message: Some("Mooses may only have names starting with 'M'.".to_string()), - status: Some("Failure".to_string()), - ..Default::default() - }), - } - } + // #[cfg(feature = "admission-webhook")] + // async fn admission_hook( + // &self, + // manifest: Self::Manifest, + // ) -> krator::admission::AdmissionResult { + // use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status; + // // All moose names start with "M" + // let name = manifest.meta().name.clone().unwrap(); + // info!("Processing admission hook for moose named {}", name); + // match name.chars().next() { + // Some('m') | Some('M') => krator::admission::AdmissionResult::Allow(manifest), + // _ => krator::admission::AdmissionResult::Deny(Status { + // code: Some(400), + // message: Some("Mooses may only have names starting with 'M'.".to_string()), + // status: Some("Failure".to_string()), + // ..Default::default() + // }), + // } + // } #[cfg(feature = "admission-webhook")] async fn admission_hook_tls(&self) -> anyhow::Result { @@ -386,6 +386,26 @@ impl Operator for MooseTracker { } } +#[cfg(feature = "admission-webhook")] +fn admission_hook( + manifest: Moose, + _shared: &SharedMooseState, +) -> krator::admission::AdmissionResult { + use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status; + // All moose names start with "M" + let name = manifest.meta().name.clone().unwrap(); + info!("Processing admission hook for moose named {}", name); + match name.chars().next() { + Some('m') | Some('M') => krator::admission::AdmissionResult::Allow(manifest), + _ => krator::admission::AdmissionResult::Deny(Status { + code: Some(400), + message: Some("Mooses may only have names starting with 'M'.".to_string()), + status: Some("Failure".to_string()), + ..Default::default() + }), + } +} + #[derive(Debug, StructOpt)] #[structopt( name = "moose", @@ -528,20 +548,16 @@ Running moose example. Try to install some of the manifests provided in examples "# ); - // New API does not currently support Webhooks, so use legacy API if enabled. - #[cfg(feature = "admission-webhook")] - { - use krator::OperatorRuntime; - let mut runtime = OperatorRuntime::new(&kubeconfig, tracker, Some(params)); - runtime.start().await; - } - #[cfg(not(feature = "admission-webhook"))] - { - use krator::{ControllerBuilder, Manager}; - let mut manager = Manager::new(&kubeconfig); - let controller = ControllerBuilder::new(tracker).with_params(params); - manager.register_controller(controller); - manager.start().await; - } + use krator::{ControllerBuilder, Manager}; + let mut manager = Manager::new(&kubeconfig); + let controller = if cfg!(admission_webhook) { + ControllerBuilder::new(tracker) + .with_params(params) + .with_webhook(admission_hook) + } else { + ControllerBuilder::new(tracker).with_params(params) + }; + manager.register_controller(controller); + manager.start().await; Ok(()) } diff --git a/krator/src/admission.rs b/krator/src/admission.rs index 2bf8bbc..dcba1e2 100644 --- a/krator/src/admission.rs +++ b/krator/src/admission.rs @@ -20,7 +20,6 @@ use std::{ sync::Arc, }; use tracing::{info, trace, warn}; -use tracing_futures::Instrument; /// WebhookResources encapsulates Kubernetes resources necessary to register the admission webhook. /// and provides some convenience functions @@ -321,9 +320,9 @@ impl AdmissionTls { } /// Type signature for validating or mutating webhooks. -pub type WebhookFn = dyn Fn( +pub type WebhookFn = fn( ::Manifest, - <::ObjectState as ObjectState>::SharedState, + &<::ObjectState as ObjectState>::SharedState, ) -> AdmissionResult<::Manifest>; /// Result of admission hook. @@ -443,7 +442,7 @@ struct AdmissionReviewResponse { #[tracing::instrument( level="debug", - skip(operator, request), + skip(operator, request, hook), fields( name=%request.request.name(), namespace=?request.request.namespace(), @@ -455,7 +454,8 @@ struct AdmissionReviewResponse { async fn review( operator: Arc, request: AdmissionReviewRequest, -) -> warp::reply::Json { + hook: &WebhookFn, +) -> warp::reply::WithStatus { let manifest = match request.request.operation { AdmissionRequestOperation::Create { object, .. } => object, AdmissionRequestOperation::Update { @@ -478,12 +478,14 @@ async fn review( let name = manifest.name(); let namespace = manifest.namespace(); - let span = tracing::debug_span!("Operator::admission_hook",); + // let span = tracing::debug_span!("hook",); - let result = operator - .admission_hook(manifest.clone()) - .instrument(span) - .await; + let shared = operator.shared_state().await; + + let result = { + let shared = shared.read().await; + hook(manifest.clone(), &*shared) + }; let response = match result { AdmissionResult::Allow(new_manifest) => { @@ -530,35 +532,60 @@ async fn review( } } }; - warp::reply::json(&AdmissionReviewResponse { - api_version: request.api_version, - kind: request.kind, - response, - }) + warp::reply::with_status( + warp::reply::json(&AdmissionReviewResponse { + api_version: request.api_version, + kind: request.kind, + response, + }), + warp::http::StatusCode::OK, + ) } -pub(crate) async fn endpoint(operator: Arc) { - let tls = operator - .admission_hook_tls() - .await - .expect("getting webhook tls AdmissionTls failed"); - +use warp::Rejection; +pub(crate) fn create_endpoint( + operator: Arc, + path: String, + f: WebhookFn, +) -> impl warp::Filter,), Error = Rejection> + Clone +{ use warp::Filter; - let routes = warp::any() + warp::path(path) + .and(warp::path::end()) .and(warp::post()) .and(warp::body::json()) .and_then(move |request: AdmissionReviewRequest| { let operator = Arc::clone(&operator); async move { - let response = review(operator, request).await; + let response = review(operator, request, &f).await; Ok::<_, std::convert::Infallible>(response) } - }); - - warp::serve(routes) - .tls() - .cert(tls.cert) - .key(tls.private_key) - .run(([0, 0, 0, 0], 8443)) - .await; + }) +} + +pub(crate) async fn endpoint(_operator: Arc) { + todo!() + // let tls = operator + // .admission_hook_tls() + // .await + // .expect("getting webhook tls AdmissionTls failed"); + + // use warp::Filter; + // let routes = warp::any() + // .and(warp::post()) + // .and(warp::body::json()) + // .and_then(move |request: AdmissionReviewRequest| { + // let operator = Arc::clone(&operator); + // async move { + // let response = review(operator, request).await; + // Ok::<_, std::convert::Infallible>(response) + // } + // }); + + // warp::serve(routes) + // .tls() + // .cert(tls.cert) + // .key(tls.private_key) + // .run(([0, 0, 0, 0], 8443)) + // .await; } diff --git a/krator/src/lib.rs b/krator/src/lib.rs index 92be9f5..bebd662 100644 --- a/krator/src/lib.rs +++ b/krator/src/lib.rs @@ -14,12 +14,8 @@ pub mod admission; pub mod state; -// TODO: Remove once webhooks are supported. -#[cfg(not(feature = "admission-webhook"))] mod manager; -#[cfg(not(feature = "admission-webhook"))] pub use manager::controller::ControllerBuilder; -#[cfg(not(feature = "admission-webhook"))] pub use manager::Manager; pub use manifest::Manifest; diff --git a/krator/src/manager.rs b/krator/src/manager.rs index d56e84f..5089b8b 100644 --- a/krator/src/manager.rs +++ b/krator/src/manager.rs @@ -1,6 +1,8 @@ //! Defines types for registering controllers with runtime. use crate::{operator::Operator, store::Store}; - +use std::sync::Arc; +#[cfg(feature = "admission-webhook")] +use warp::Filter; pub mod tasks; use tasks::{controller_tasks, OperatorTask}; @@ -20,23 +22,57 @@ pub struct Manager { controllers: Vec, controller_tasks: Vec, store: Store, + #[cfg(feature = "admission-webhook")] + filter: warp::filters::BoxedFilter<(warp::reply::WithStatus,)>, +} + +fn not_found() -> warp::reply::WithStatus { + warp::reply::with_status(warp::reply::json(&()), warp::http::StatusCode::NOT_FOUND) } impl Manager { /// Create a new controller manager. pub fn new(kubeconfig: &kube::Config) -> Self { + #[cfg(feature = "admission-webhook")] + let filter = { warp::any().map(not_found).boxed() }; + Manager { controllers: vec![], controller_tasks: vec![], kubeconfig: kubeconfig.clone(), store: Store::new(), + #[cfg(feature = "admission-webhook")] + filter, } } /// Register a controller with the manager. pub fn register_controller(&mut self, builder: ControllerBuilder) { - let (controller, tasks) = + let webhooks = builder.webhooks.clone(); + + let (operator, controller, tasks) = controller_tasks(self.kubeconfig.clone(), builder, self.store.clone()); + + #[cfg(feature = "admission-webhook")] + for (path, f) in webhooks { + let endpoint = + crate::admission::create_endpoint::(Arc::clone(&operator), path.to_string(), f); + + // Create temporary variable w/ throwaway filter of correct type. + let mut temp = warp::any().map(not_found).boxed(); + + // Swap self.filter into temporary. + std::mem::swap(&mut temp, &mut self.filter); + + // Compose new filter from new endpoint and temporary (now holding original self.filter). + let mut new_filter = endpoint.or(temp).unify().boxed(); + + // Swap new filter back into self.filter. + std::mem::swap(&mut new_filter, &mut self.filter); + + // Throwaway filter stored in new_filter implicitly dropped. + } + self.controllers.push(controller); self.controller_tasks.extend(tasks); } @@ -62,6 +98,16 @@ impl Manager { } } + #[cfg(feature = "admission-webhook")] + { + let task = warp::serve(self.filter) + // .tls() + // .cert(tls.cert) + // .key(tls.private_key) + .run(([0, 0, 0, 0], 8443)); + tasks.push(task.boxed()); + } + futures::future::join_all(tasks).await; } } diff --git a/krator/src/manager/controller.rs b/krator/src/manager/controller.rs index 06265b8..7bd540f 100644 --- a/krator/src/manager/controller.rs +++ b/krator/src/manager/controller.rs @@ -4,6 +4,8 @@ use crate::admission::WebhookFn; use crate::operator::Watchable; use crate::Operator; use kube::api::ListParams; +use kube::Resource; +use std::collections::BTreeMap; /// Builder pattern for registering a controller or operator. pub struct ControllerBuilder { @@ -23,6 +25,8 @@ pub struct ControllerBuilder { /// The buffer length for Tokio channels used to communicate between /// watcher tasks and runtime tasks. buffer: usize, + /// Registered webhooks. + pub(crate) webhooks: BTreeMap>, } impl ControllerBuilder { @@ -35,6 +39,7 @@ impl ControllerBuilder { namespace: None, list_params: Default::default(), buffer: 32, + webhooks: BTreeMap::new(), } } @@ -165,30 +170,25 @@ impl ControllerBuilder { self } - /// Registers a validating webhook at the path "/$GROUP/$VERSION/$KIND". + /// Registers a webhook at the path "/$GROUP/$VERSION/$KIND". /// Multiple webhooks can be registered, but must be at different paths. #[cfg(feature = "admission-webhook")] - pub(crate) fn validates(self, _f: &WebhookFn) -> Self { - todo!() - } - - /// Registers a validating webhook at the supplied path. - #[cfg(feature = "admission-webhook")] - pub(crate) fn validates_at_path(self, _path: &str, _f: &WebhookFn) -> Self { - todo!() - } - - /// Registers a mutating webhook at the path "/$GROUP/$VERSION/$KIND". - /// Multiple webhooks can be registered, but must be at different paths. - #[cfg(feature = "admission-webhook")] - pub(crate) fn mutates(self, _f: &WebhookFn) -> Self { - todo!() + pub fn with_webhook(mut self, f: WebhookFn) -> Self { + let path = format!( + "/{}/{}/{}", + O::Manifest::group(&()), + O::Manifest::version(&()), + O::Manifest::kind(&()) + ); + self.webhooks.insert(path, f); + self } - /// Registers a mutating webhook at the supplied path. + /// Registers a webhook at the supplied path. #[cfg(feature = "admission-webhook")] - pub(crate) fn mutates_at_path(self, _path: &str, _f: &WebhookFn) -> Self { - todo!() + pub fn with_webhook_at_path(mut self, path: &str, f: WebhookFn) -> Self { + self.webhooks.insert(path.to_string(), f); + self } } diff --git a/krator/src/manager/tasks.rs b/krator/src/manager/tasks.rs index c67f9bd..7b6c578 100644 --- a/krator/src/manager/tasks.rs +++ b/krator/src/manager/tasks.rs @@ -4,9 +4,9 @@ use std::future::Future; use futures::FutureExt; - use kube::{api::ApiResource, api::GroupVersionKind, Resource}; use kube_runtime::watcher::Event; +use std::sync::Arc; use tracing::{debug, info, warn}; use crate::{ @@ -64,7 +64,7 @@ pub(crate) async fn launch_watcher(client: kube::Client, handle: WatchHandle) { /// concrete `Event`. async fn launch_runtime( kubeconfig: kube::Config, - controller: O, + controller: Arc, mut rx: tokio::sync::mpsc::Receiver, store: Store, ) { @@ -197,15 +197,16 @@ pub(crate) fn controller_tasks( kubeconfig: kube::Config, controller: ControllerBuilder, store: Store, -) -> (Controller, Vec) { +) -> (Arc, Controller, Vec) { let mut watches = Vec::new(); let mut owns = Vec::new(); let mut tasks = Vec::new(); let buffer = controller.buffer(); + let (manages, rx) = controller.manages().handle(buffer); + let operator = Arc::new(controller.controller); // Create main Operator task. - let (manages, rx) = controller.manages().handle(buffer); - let task = launch_runtime(kubeconfig, controller.controller, rx, store.clone()).boxed(); + let task = launch_runtime(kubeconfig, Arc::clone(&operator), rx, store.clone()).boxed(); tasks.push(task); for watch in controller.watches { @@ -223,6 +224,7 @@ pub(crate) fn controller_tasks( } ( + operator, Controller { manages, owns, diff --git a/krator/src/operator.rs b/krator/src/operator.rs index 13cde9a..6299fa2 100644 --- a/krator/src/operator.rs +++ b/krator/src/operator.rs @@ -55,12 +55,12 @@ pub trait Operator: 'static + Sync + Send { Ok(()) } - #[cfg(feature = "admission-webhook")] - /// Invoked when object is created or modified. Can mutate the and / or deny the request. - async fn admission_hook( - &self, - manifest: Self::Manifest, - ) -> crate::admission::AdmissionResult; + // #[cfg(feature = "admission-webhook")] + // /// Invoked when object is created or modified. Can mutate the and / or deny the request. + // async fn admission_hook( + // &self, + // manifest: Self::Manifest, + // ) -> crate::admission::AdmissionResult; #[cfg(feature = "admission-webhook")] /// Gets called by the operator if the admission-webhook feature is enabled. The function should diff --git a/krator/src/runtime.rs b/krator/src/runtime.rs index df79d17..7f4c9ec 100644 --- a/krator/src/runtime.rs +++ b/krator/src/runtime.rs @@ -76,10 +76,9 @@ impl OperatorRuntime { } } - #[cfg(not(feature = "admission-webhook"))] pub(crate) fn new_with_store( kubeconfig: &kube::Config, - operator: O, + operator: Arc, params: Option, store: Store, ) -> Self { @@ -89,7 +88,7 @@ impl OperatorRuntime { OperatorRuntime { client, handlers: HashMap::new(), - operator: Arc::new(operator), + operator, list_params, signal: None, store, diff --git a/krator/src/store.rs b/krator/src/store.rs index 2449795..51f59c6 100644 --- a/krator/src/store.rs +++ b/krator/src/store.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; -#[cfg(not(feature = "admission-webhook"))] use kube::api::DynamicObject; use kube::api::GroupVersionKind; @@ -51,7 +50,6 @@ impl Store { } /// Clear cache for specified object kind. - #[cfg(not(feature = "admission-webhook"))] pub(crate) async fn reset(&self, gvk: &GroupVersionKind) { let mut objects = self.objects.write().await; let key = gvk.clone(); @@ -60,7 +58,6 @@ impl Store { } /// Delete a cached object. - #[cfg(not(feature = "admission-webhook"))] pub(crate) async fn delete_gvk( &self, namespace: Option, @@ -75,7 +72,6 @@ impl Store { } /// Insert an object that has already been type erased. - #[cfg(not(feature = "admission-webhook"))] pub(crate) async fn insert_gvk( &self, namespace: Option, From bb2564fafcfd7ae3989d1b86d8a66f7178404ea4 Mon Sep 17 00:00:00 2001 From: Kevin Flansburg Date: Tue, 12 Oct 2021 14:11:27 -0700 Subject: [PATCH 2/4] Get everything working Signed-off-by: Kevin Flansburg --- krator/examples/moose.rs | 40 +++++-------- krator/src/admission.rs | 100 ++++++++++++++++++------------- krator/src/manager.rs | 18 +++--- krator/src/manager/controller.rs | 88 ++++++++++++++++++++++++--- krator/src/manager/tasks.rs | 12 ++-- krator/src/operator.rs | 12 ++-- 6 files changed, 174 insertions(+), 96 deletions(-) diff --git a/krator/examples/moose.rs b/krator/examples/moose.rs index 8e33539..4810520 100644 --- a/krator/examples/moose.rs +++ b/krator/examples/moose.rs @@ -352,25 +352,13 @@ impl Operator for MooseTracker { Arc::clone(&self.shared) } - // #[cfg(feature = "admission-webhook")] - // async fn admission_hook( - // &self, - // manifest: Self::Manifest, - // ) -> krator::admission::AdmissionResult { - // use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status; - // // All moose names start with "M" - // let name = manifest.meta().name.clone().unwrap(); - // info!("Processing admission hook for moose named {}", name); - // match name.chars().next() { - // Some('m') | Some('M') => krator::admission::AdmissionResult::Allow(manifest), - // _ => krator::admission::AdmissionResult::Deny(Status { - // code: Some(400), - // message: Some("Mooses may only have names starting with 'M'.".to_string()), - // status: Some("Failure".to_string()), - // ..Default::default() - // }), - // } - // } + #[cfg(feature = "admission-webhook")] + async fn admission_hook( + &self, + _manifest: Self::Manifest, + ) -> crate::admission::AdmissionResult { + unimplemented!() + } #[cfg(feature = "admission-webhook")] async fn admission_hook_tls(&self) -> anyhow::Result { @@ -387,9 +375,9 @@ impl Operator for MooseTracker { } #[cfg(feature = "admission-webhook")] -fn admission_hook( +async fn admission_hook( manifest: Moose, - _shared: &SharedMooseState, + _shared: Arc>, ) -> krator::admission::AdmissionResult { use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status; // All moose names start with "M" @@ -550,13 +538,17 @@ Running moose example. Try to install some of the manifests provided in examples use krator::{ControllerBuilder, Manager}; let mut manager = Manager::new(&kubeconfig); - let controller = if cfg!(admission_webhook) { + + #[cfg(feature = "admission-webhook")] + let controller = { ControllerBuilder::new(tracker) .with_params(params) .with_webhook(admission_hook) - } else { - ControllerBuilder::new(tracker).with_params(params) }; + + #[cfg(not(feature = "admission-webhook"))] + let controller = { ControllerBuilder::new(tracker).with_params(params) }; + manager.register_controller(controller); manager.start().await; Ok(()) diff --git a/krator/src/admission.rs b/krator/src/admission.rs index dcba1e2..ed93c0b 100644 --- a/krator/src/admission.rs +++ b/krator/src/admission.rs @@ -1,5 +1,5 @@ //! Basic implementation of Kubernetes Admission API -use crate::ObjectState; +use crate::manager::controller::{GenericAsyncFn, GenericFuture}; use crate::Operator; use anyhow::{bail, ensure, Context}; use k8s_openapi::{ @@ -320,11 +320,12 @@ impl AdmissionTls { } /// Type signature for validating or mutating webhooks. -pub type WebhookFn = fn( - ::Manifest, - &<::ObjectState as ObjectState>::SharedState, -) -> AdmissionResult<::Manifest>; +// pub type WebhookFn = fn( +// O::Manifest, +// &::SharedState, +// ) -> std::pin::Pin> + Send + 'static>>; +//std::pin::Pin::Manifest>> + Send>> /// Result of admission hook. #[allow(clippy::large_enum_variant)] pub enum AdmissionResult { @@ -451,11 +452,15 @@ struct AdmissionReviewResponse { user_info=?request.request.user_info ) )] -async fn review( +async fn review( operator: Arc, request: AdmissionReviewRequest, - hook: &WebhookFn, -) -> warp::reply::WithStatus { + hook: F, +) -> warp::reply::WithStatus +where + R: GenericFuture, + F: GenericAsyncFn, +{ let manifest = match request.request.operation { AdmissionRequestOperation::Create { object, .. } => object, AdmissionRequestOperation::Update { @@ -482,10 +487,7 @@ async fn review( let shared = operator.shared_state().await; - let result = { - let shared = shared.read().await; - hook(manifest.clone(), &*shared) - }; + let result = hook(manifest.clone(), shared).await; let response = match result { AdmissionResult::Allow(new_manifest) => { @@ -542,50 +544,62 @@ async fn review( ) } -use warp::Rejection; -pub(crate) fn create_endpoint( +pub(crate) fn create_boxed_endpoint( operator: Arc, path: String, - f: WebhookFn, -) -> impl warp::Filter,), Error = Rejection> + Clone + f: F, +) -> warp::filters::BoxedFilter<(warp::reply::WithStatus,)> +where + R: GenericFuture, + F: GenericAsyncFn, { use warp::Filter; + warp::path(path) .and(warp::path::end()) .and(warp::post()) + .map(move || f.clone()) .and(warp::body::json()) - .and_then(move |request: AdmissionReviewRequest| { + .and_then(move |f: F, request: AdmissionReviewRequest| { let operator = Arc::clone(&operator); async move { - let response = review(operator, request, &f).await; + let response = review(operator, request, f).await; Ok::<_, std::convert::Infallible>(response) } }) + .boxed() } -pub(crate) async fn endpoint(_operator: Arc) { - todo!() - // let tls = operator - // .admission_hook_tls() - // .await - // .expect("getting webhook tls AdmissionTls failed"); - - // use warp::Filter; - // let routes = warp::any() - // .and(warp::post()) - // .and(warp::body::json()) - // .and_then(move |request: AdmissionReviewRequest| { - // let operator = Arc::clone(&operator); - // async move { - // let response = review(operator, request).await; - // Ok::<_, std::convert::Infallible>(response) - // } - // }); - - // warp::serve(routes) - // .tls() - // .cert(tls.cert) - // .key(tls.private_key) - // .run(([0, 0, 0, 0], 8443)) - // .await; +pub(crate) async fn endpoint(operator: Arc) { + let tls = operator + .admission_hook_tls() + .await + .expect("getting webhook tls AdmissionTls failed"); + + use warp::Filter; + let routes = warp::any() + .map(move || Arc::clone(&operator)) + .map(|operator: Arc| { + let hook_operator = Arc::clone(&operator); + let func = move |manifest: O::Manifest, _| { + let inner_operator = Arc::clone(&hook_operator); + async move { inner_operator.admission_hook(manifest).await } + }; + (operator, func) + }) + .and(warp::post()) + .and(warp::body::json()) + .and_then( + move |(operator, f), request: AdmissionReviewRequest| async move { + let response = review(operator, request, f).await; + Ok::<_, std::convert::Infallible>(response) + }, + ); + + warp::serve(routes) + .tls() + .cert(tls.cert) + .key(tls.private_key) + .run(([0, 0, 0, 0], 8443)) + .await; } diff --git a/krator/src/manager.rs b/krator/src/manager.rs index 5089b8b..83dcdae 100644 --- a/krator/src/manager.rs +++ b/krator/src/manager.rs @@ -1,6 +1,6 @@ //! Defines types for registering controllers with runtime. use crate::{operator::Operator, store::Store}; -use std::sync::Arc; +// use std::sync::Arc; #[cfg(feature = "admission-webhook")] use warp::Filter; pub mod tasks; @@ -26,6 +26,7 @@ pub struct Manager { filter: warp::filters::BoxedFilter<(warp::reply::WithStatus,)>, } +#[cfg(feature = "admission-webhook")] fn not_found() -> warp::reply::WithStatus { warp::reply::with_status(warp::reply::json(&()), warp::http::StatusCode::NOT_FOUND) } @@ -48,16 +49,8 @@ impl Manager { /// Register a controller with the manager. pub fn register_controller(&mut self, builder: ControllerBuilder) { - let webhooks = builder.webhooks.clone(); - - let (operator, controller, tasks) = - controller_tasks(self.kubeconfig.clone(), builder, self.store.clone()); - #[cfg(feature = "admission-webhook")] - for (path, f) in webhooks { - let endpoint = - crate::admission::create_endpoint::(Arc::clone(&operator), path.to_string(), f); - + for endpoint in builder.webhooks.values() { // Create temporary variable w/ throwaway filter of correct type. let mut temp = warp::any().map(not_found).boxed(); @@ -65,7 +58,7 @@ impl Manager { std::mem::swap(&mut temp, &mut self.filter); // Compose new filter from new endpoint and temporary (now holding original self.filter). - let mut new_filter = endpoint.or(temp).unify().boxed(); + let mut new_filter = endpoint.clone().or(temp).unify().boxed(); // Swap new filter back into self.filter. std::mem::swap(&mut new_filter, &mut self.filter); @@ -73,6 +66,9 @@ impl Manager { // Throwaway filter stored in new_filter implicitly dropped. } + let (controller, tasks) = + controller_tasks(self.kubeconfig.clone(), builder, self.store.clone()); + self.controllers.push(controller); self.controller_tasks.extend(tasks); } diff --git a/krator/src/manager/controller.rs b/krator/src/manager/controller.rs index 7bd540f..440a43e 100644 --- a/krator/src/manager/controller.rs +++ b/krator/src/manager/controller.rs @@ -1,16 +1,25 @@ use super::watch::{Watch, WatchHandle}; #[cfg(feature = "admission-webhook")] -use crate::admission::WebhookFn; +use crate::admission::create_boxed_endpoint; +#[cfg(feature = "admission-webhook")] +use crate::admission::AdmissionResult; use crate::operator::Watchable; +#[cfg(feature = "admission-webhook")] +use crate::ObjectState; use crate::Operator; use kube::api::ListParams; +#[cfg(feature = "admission-webhook")] use kube::Resource; +#[cfg(feature = "admission-webhook")] use std::collections::BTreeMap; +use std::sync::Arc; +#[cfg(feature = "admission-webhook")] +use tokio::sync::RwLock; /// Builder pattern for registering a controller or operator. pub struct ControllerBuilder { /// The controller or operator singleton. - pub(crate) controller: C, + pub(crate) controller: Arc, /// List of watch configurations for objects that will simply be cached /// locally. pub(crate) watches: Vec, @@ -26,19 +35,35 @@ pub struct ControllerBuilder { /// watcher tasks and runtime tasks. buffer: usize, /// Registered webhooks. - pub(crate) webhooks: BTreeMap>, + #[cfg(feature = "admission-webhook")] + pub(crate) webhooks: + BTreeMap,)>>, } +// pub trait AsyncPtrFuture: std::future::Future> + Send + 'static {} + +// pub type AsyncFnPtrReturn = std::pin::Pin +// > +// >; + +// pub trait AsyncFn: Fn(O::Manifest, &::SharedState) -> AsyncFnPtrReturn {} + +// pub type AsyncFnPtr = Box< +// dyn AsyncFn +// >; + impl ControllerBuilder { /// Create builder from operator singleton. pub fn new(operator: O) -> Self { ControllerBuilder { - controller: operator, + controller: Arc::new(operator), watches: vec![], owns: vec![], namespace: None, list_params: Default::default(), buffer: 32, + #[cfg(feature = "admission-webhook")] webhooks: BTreeMap::new(), } } @@ -173,25 +198,72 @@ impl ControllerBuilder { /// Registers a webhook at the path "/$GROUP/$VERSION/$KIND". /// Multiple webhooks can be registered, but must be at different paths. #[cfg(feature = "admission-webhook")] - pub fn with_webhook(mut self, f: WebhookFn) -> Self { + pub fn with_webhook(mut self, f: F) -> Self + where + R: GenericFuture, + F: GenericAsyncFn, + { let path = format!( "/{}/{}/{}", O::Manifest::group(&()), O::Manifest::version(&()), O::Manifest::kind(&()) ); - self.webhooks.insert(path, f); + let filter = create_boxed_endpoint(Arc::clone(&self.controller), path.to_string(), f); + self.webhooks.insert(path.to_string(), filter); self } /// Registers a webhook at the supplied path. #[cfg(feature = "admission-webhook")] - pub fn with_webhook_at_path(mut self, path: &str, f: WebhookFn) -> Self { - self.webhooks.insert(path.to_string(), f); + pub fn with_webhook_at_path(mut self, path: &str, f: F) -> Self + where + R: GenericFuture, + F: GenericAsyncFn, + { + let filter = create_boxed_endpoint(Arc::clone(&self.controller), path.to_string(), f); + self.webhooks.insert(path.to_string(), filter); self } } +#[cfg(feature = "admission-webhook")] +pub trait GenericFuture: + 'static + std::future::Future> + Send +{ +} + +#[cfg(feature = "admission-webhook")] +impl< + O: Operator, + T: 'static + std::future::Future> + Send, + > GenericFuture for T +{ +} + +#[cfg(feature = "admission-webhook")] +pub trait GenericAsyncFn: + 'static + + Clone + + Send + + Sync + + Fn(O::Manifest, Arc::SharedState>>) -> R +{ +} + +#[cfg(feature = "admission-webhook")] +impl< + O: Operator, + R, + T: 'static + + Clone + + Send + + Sync + + Fn(O::Manifest, Arc::SharedState>>) -> R, + > GenericAsyncFn for T +{ +} + #[derive(Clone)] pub struct Controller { pub manages: WatchHandle, diff --git a/krator/src/manager/tasks.rs b/krator/src/manager/tasks.rs index 7b6c578..eeba4d4 100644 --- a/krator/src/manager/tasks.rs +++ b/krator/src/manager/tasks.rs @@ -197,16 +197,21 @@ pub(crate) fn controller_tasks( kubeconfig: kube::Config, controller: ControllerBuilder, store: Store, -) -> (Arc, Controller, Vec) { +) -> (Controller, Vec) { let mut watches = Vec::new(); let mut owns = Vec::new(); let mut tasks = Vec::new(); let buffer = controller.buffer(); let (manages, rx) = controller.manages().handle(buffer); - let operator = Arc::new(controller.controller); // Create main Operator task. - let task = launch_runtime(kubeconfig, Arc::clone(&operator), rx, store.clone()).boxed(); + let task = launch_runtime( + kubeconfig, + Arc::clone(&controller.controller), + rx, + store.clone(), + ) + .boxed(); tasks.push(task); for watch in controller.watches { @@ -224,7 +229,6 @@ pub(crate) fn controller_tasks( } ( - operator, Controller { manages, owns, diff --git a/krator/src/operator.rs b/krator/src/operator.rs index 6299fa2..13cde9a 100644 --- a/krator/src/operator.rs +++ b/krator/src/operator.rs @@ -55,12 +55,12 @@ pub trait Operator: 'static + Sync + Send { Ok(()) } - // #[cfg(feature = "admission-webhook")] - // /// Invoked when object is created or modified. Can mutate the and / or deny the request. - // async fn admission_hook( - // &self, - // manifest: Self::Manifest, - // ) -> crate::admission::AdmissionResult; + #[cfg(feature = "admission-webhook")] + /// Invoked when object is created or modified. Can mutate the and / or deny the request. + async fn admission_hook( + &self, + manifest: Self::Manifest, + ) -> crate::admission::AdmissionResult; #[cfg(feature = "admission-webhook")] /// Gets called by the operator if the admission-webhook feature is enabled. The function should From 19fa57e8b5bb2f937f9a9d2fe0f772093e7a0ac9 Mon Sep 17 00:00:00 2001 From: Kevin Flansburg Date: Tue, 12 Oct 2021 14:44:09 -0700 Subject: [PATCH 3/4] Some cleanup Signed-off-by: Kevin Flansburg --- krator/examples/moose.rs | 16 +++++++- krator/src/admission.rs | 47 +++++++++++++++++++----- krator/src/manager/controller.rs | 63 ++------------------------------ 3 files changed, 54 insertions(+), 72 deletions(-) diff --git a/krator/examples/moose.rs b/krator/examples/moose.rs index 4810520..6e95ad0 100644 --- a/krator/examples/moose.rs +++ b/krator/examples/moose.rs @@ -355,9 +355,21 @@ impl Operator for MooseTracker { #[cfg(feature = "admission-webhook")] async fn admission_hook( &self, - _manifest: Self::Manifest, + manifest: Self::Manifest, ) -> crate::admission::AdmissionResult { - unimplemented!() + use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status; + // All moose names start with "M" + let name = manifest.meta().name.clone().unwrap(); + info!("Processing admission hook for moose named {}", name); + match name.chars().next() { + Some('m') | Some('M') => krator::admission::AdmissionResult::Allow(manifest), + _ => krator::admission::AdmissionResult::Deny(Status { + code: Some(400), + message: Some("Mooses may only have names starting with 'M'.".to_string()), + status: Some("Failure".to_string()), + ..Default::default() + }), + } } #[cfg(feature = "admission-webhook")] diff --git a/krator/src/admission.rs b/krator/src/admission.rs index ed93c0b..a7df23f 100644 --- a/krator/src/admission.rs +++ b/krator/src/admission.rs @@ -1,5 +1,5 @@ //! Basic implementation of Kubernetes Admission API -use crate::manager::controller::{GenericAsyncFn, GenericFuture}; +use crate::ObjectState; use crate::Operator; use anyhow::{bail, ensure, Context}; use k8s_openapi::{ @@ -19,6 +19,7 @@ use std::{ fmt::{Display, Formatter}, sync::Arc, }; +use tokio::sync::RwLock; use tracing::{info, trace, warn}; /// WebhookResources encapsulates Kubernetes resources necessary to register the admission webhook. @@ -319,13 +320,6 @@ impl AdmissionTls { } } -/// Type signature for validating or mutating webhooks. -// pub type WebhookFn = fn( -// O::Manifest, -// &::SharedState, -// ) -> std::pin::Pin> + Send + 'static>>; - -//std::pin::Pin::Manifest>> + Send>> /// Result of admission hook. #[allow(clippy::large_enum_variant)] pub enum AdmissionResult { @@ -441,6 +435,41 @@ struct AdmissionReviewResponse { response: AdmissionResponse, } +/// Trait for Future which returns AdmissionResult. +pub trait GenericFuture: + 'static + std::future::Future> + Send +{ +} + +impl< + O: Operator, + T: 'static + std::future::Future> + Send, + > GenericFuture for T +{ +} + +/// Trait to represent an admission review function. +pub trait GenericAsyncFn: + 'static + + Clone + + Send + + Sync + + Fn(O::Manifest, Arc::SharedState>>) -> R +{ +} + +impl< + O: Operator, + R, + T: 'static + + Clone + + Send + + Sync + + Fn(O::Manifest, Arc::SharedState>>) -> R, + > GenericAsyncFn for T +{ +} + #[tracing::instrument( level="debug", skip(operator, request, hook), @@ -483,8 +512,6 @@ where let name = manifest.name(); let namespace = manifest.namespace(); - // let span = tracing::debug_span!("hook",); - let shared = operator.shared_state().await; let result = hook(manifest.clone(), shared).await; diff --git a/krator/src/manager/controller.rs b/krator/src/manager/controller.rs index 440a43e..4501522 100644 --- a/krator/src/manager/controller.rs +++ b/krator/src/manager/controller.rs @@ -1,20 +1,12 @@ use super::watch::{Watch, WatchHandle}; #[cfg(feature = "admission-webhook")] -use crate::admission::create_boxed_endpoint; -#[cfg(feature = "admission-webhook")] -use crate::admission::AdmissionResult; +use crate::admission::{create_boxed_endpoint, GenericAsyncFn, GenericFuture}; use crate::operator::Watchable; -#[cfg(feature = "admission-webhook")] -use crate::ObjectState; use crate::Operator; use kube::api::ListParams; #[cfg(feature = "admission-webhook")] -use kube::Resource; -#[cfg(feature = "admission-webhook")] use std::collections::BTreeMap; use std::sync::Arc; -#[cfg(feature = "admission-webhook")] -use tokio::sync::RwLock; /// Builder pattern for registering a controller or operator. pub struct ControllerBuilder { @@ -40,19 +32,6 @@ pub struct ControllerBuilder { BTreeMap,)>>, } -// pub trait AsyncPtrFuture: std::future::Future> + Send + 'static {} - -// pub type AsyncFnPtrReturn = std::pin::Pin -// > -// >; - -// pub trait AsyncFn: Fn(O::Manifest, &::SharedState) -> AsyncFnPtrReturn {} - -// pub type AsyncFnPtr = Box< -// dyn AsyncFn -// >; - impl ControllerBuilder { /// Create builder from operator singleton. pub fn new(operator: O) -> Self { @@ -203,6 +182,7 @@ impl ControllerBuilder { R: GenericFuture, F: GenericAsyncFn, { + use kube::Resource; let path = format!( "/{}/{}/{}", O::Manifest::group(&()), @@ -210,7 +190,7 @@ impl ControllerBuilder { O::Manifest::kind(&()) ); let filter = create_boxed_endpoint(Arc::clone(&self.controller), path.to_string(), f); - self.webhooks.insert(path.to_string(), filter); + self.webhooks.insert(path, filter); self } @@ -227,43 +207,6 @@ impl ControllerBuilder { } } -#[cfg(feature = "admission-webhook")] -pub trait GenericFuture: - 'static + std::future::Future> + Send -{ -} - -#[cfg(feature = "admission-webhook")] -impl< - O: Operator, - T: 'static + std::future::Future> + Send, - > GenericFuture for T -{ -} - -#[cfg(feature = "admission-webhook")] -pub trait GenericAsyncFn: - 'static - + Clone - + Send - + Sync - + Fn(O::Manifest, Arc::SharedState>>) -> R -{ -} - -#[cfg(feature = "admission-webhook")] -impl< - O: Operator, - R, - T: 'static - + Clone - + Send - + Sync - + Fn(O::Manifest, Arc::SharedState>>) -> R, - > GenericAsyncFn for T -{ -} - #[derive(Clone)] pub struct Controller { pub manages: WatchHandle, From 1656a904872cd78e84bb67b5370dc6e5097ac82e Mon Sep 17 00:00:00 2001 From: Kevin Flansburg Date: Tue, 12 Oct 2021 14:45:02 -0700 Subject: [PATCH 4/4] Typo Signed-off-by: Kevin Flansburg --- krator/examples/moose.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/krator/examples/moose.rs b/krator/examples/moose.rs index 6e95ad0..38aacfa 100644 --- a/krator/examples/moose.rs +++ b/krator/examples/moose.rs @@ -356,7 +356,7 @@ impl Operator for MooseTracker { async fn admission_hook( &self, manifest: Self::Manifest, - ) -> crate::admission::AdmissionResult { + ) -> krator::admission::AdmissionResult { use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status; // All moose names start with "M" let name = manifest.meta().name.clone().unwrap();