diff --git a/krator/examples/moose.rs b/krator/examples/moose.rs index 8e33539..4810520 100644 --- a/krator/examples/moose.rs +++ b/krator/examples/moose.rs @@ -352,25 +352,13 @@ impl Operator for MooseTracker { Arc::clone(&self.shared) } - // #[cfg(feature = "admission-webhook")] - // async fn admission_hook( - // &self, - // manifest: Self::Manifest, - // ) -> krator::admission::AdmissionResult { - // use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status; - // // All moose names start with "M" - // let name = manifest.meta().name.clone().unwrap(); - // info!("Processing admission hook for moose named {}", name); - // match name.chars().next() { - // Some('m') | Some('M') => krator::admission::AdmissionResult::Allow(manifest), - // _ => krator::admission::AdmissionResult::Deny(Status { - // code: Some(400), - // message: Some("Mooses may only have names starting with 'M'.".to_string()), - // status: Some("Failure".to_string()), - // ..Default::default() - // }), - // } - // } + #[cfg(feature = "admission-webhook")] + async fn admission_hook( + &self, + _manifest: Self::Manifest, + ) -> crate::admission::AdmissionResult { + unimplemented!() + } #[cfg(feature = "admission-webhook")] async fn admission_hook_tls(&self) -> anyhow::Result { @@ -387,9 +375,9 @@ impl Operator for MooseTracker { } #[cfg(feature = "admission-webhook")] -fn admission_hook( +async fn admission_hook( manifest: Moose, - _shared: &SharedMooseState, + _shared: Arc>, ) -> krator::admission::AdmissionResult { use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status; // All moose names start with "M" @@ -550,13 +538,17 @@ Running moose example. Try to install some of the manifests provided in examples use krator::{ControllerBuilder, Manager}; let mut manager = Manager::new(&kubeconfig); - let controller = if cfg!(admission_webhook) { + + #[cfg(feature = "admission-webhook")] + let controller = { ControllerBuilder::new(tracker) .with_params(params) .with_webhook(admission_hook) - } else { - ControllerBuilder::new(tracker).with_params(params) }; + + #[cfg(not(feature = "admission-webhook"))] + let controller = { ControllerBuilder::new(tracker).with_params(params) }; + manager.register_controller(controller); manager.start().await; Ok(()) diff --git a/krator/src/admission.rs b/krator/src/admission.rs index dcba1e2..ed93c0b 100644 --- a/krator/src/admission.rs +++ b/krator/src/admission.rs @@ -1,5 +1,5 @@ //! Basic implementation of Kubernetes Admission API -use crate::ObjectState; +use crate::manager::controller::{GenericAsyncFn, GenericFuture}; use crate::Operator; use anyhow::{bail, ensure, Context}; use k8s_openapi::{ @@ -320,11 +320,12 @@ impl AdmissionTls { } /// Type signature for validating or mutating webhooks. -pub type WebhookFn = fn( - ::Manifest, - &<::ObjectState as ObjectState>::SharedState, -) -> AdmissionResult<::Manifest>; +// pub type WebhookFn = fn( +// O::Manifest, +// &::SharedState, +// ) -> std::pin::Pin> + Send + 'static>>; +//std::pin::Pin::Manifest>> + Send>> /// Result of admission hook. #[allow(clippy::large_enum_variant)] pub enum AdmissionResult { @@ -451,11 +452,15 @@ struct AdmissionReviewResponse { user_info=?request.request.user_info ) )] -async fn review( +async fn review( operator: Arc, request: AdmissionReviewRequest, - hook: &WebhookFn, -) -> warp::reply::WithStatus { + hook: F, +) -> warp::reply::WithStatus +where + R: GenericFuture, + F: GenericAsyncFn, +{ let manifest = match request.request.operation { AdmissionRequestOperation::Create { object, .. } => object, AdmissionRequestOperation::Update { @@ -482,10 +487,7 @@ async fn review( let shared = operator.shared_state().await; - let result = { - let shared = shared.read().await; - hook(manifest.clone(), &*shared) - }; + let result = hook(manifest.clone(), shared).await; let response = match result { AdmissionResult::Allow(new_manifest) => { @@ -542,50 +544,62 @@ async fn review( ) } -use warp::Rejection; -pub(crate) fn create_endpoint( +pub(crate) fn create_boxed_endpoint( operator: Arc, path: String, - f: WebhookFn, -) -> impl warp::Filter,), Error = Rejection> + Clone + f: F, +) -> warp::filters::BoxedFilter<(warp::reply::WithStatus,)> +where + R: GenericFuture, + F: GenericAsyncFn, { use warp::Filter; + warp::path(path) .and(warp::path::end()) .and(warp::post()) + .map(move || f.clone()) .and(warp::body::json()) - .and_then(move |request: AdmissionReviewRequest| { + .and_then(move |f: F, request: AdmissionReviewRequest| { let operator = Arc::clone(&operator); async move { - let response = review(operator, request, &f).await; + let response = review(operator, request, f).await; Ok::<_, std::convert::Infallible>(response) } }) + .boxed() } -pub(crate) async fn endpoint(_operator: Arc) { - todo!() - // let tls = operator - // .admission_hook_tls() - // .await - // .expect("getting webhook tls AdmissionTls failed"); - - // use warp::Filter; - // let routes = warp::any() - // .and(warp::post()) - // .and(warp::body::json()) - // .and_then(move |request: AdmissionReviewRequest| { - // let operator = Arc::clone(&operator); - // async move { - // let response = review(operator, request).await; - // Ok::<_, std::convert::Infallible>(response) - // } - // }); - - // warp::serve(routes) - // .tls() - // .cert(tls.cert) - // .key(tls.private_key) - // .run(([0, 0, 0, 0], 8443)) - // .await; +pub(crate) async fn endpoint(operator: Arc) { + let tls = operator + .admission_hook_tls() + .await + .expect("getting webhook tls AdmissionTls failed"); + + use warp::Filter; + let routes = warp::any() + .map(move || Arc::clone(&operator)) + .map(|operator: Arc| { + let hook_operator = Arc::clone(&operator); + let func = move |manifest: O::Manifest, _| { + let inner_operator = Arc::clone(&hook_operator); + async move { inner_operator.admission_hook(manifest).await } + }; + (operator, func) + }) + .and(warp::post()) + .and(warp::body::json()) + .and_then( + move |(operator, f), request: AdmissionReviewRequest| async move { + let response = review(operator, request, f).await; + Ok::<_, std::convert::Infallible>(response) + }, + ); + + warp::serve(routes) + .tls() + .cert(tls.cert) + .key(tls.private_key) + .run(([0, 0, 0, 0], 8443)) + .await; } diff --git a/krator/src/manager.rs b/krator/src/manager.rs index 5089b8b..83dcdae 100644 --- a/krator/src/manager.rs +++ b/krator/src/manager.rs @@ -1,6 +1,6 @@ //! Defines types for registering controllers with runtime. use crate::{operator::Operator, store::Store}; -use std::sync::Arc; +// use std::sync::Arc; #[cfg(feature = "admission-webhook")] use warp::Filter; pub mod tasks; @@ -26,6 +26,7 @@ pub struct Manager { filter: warp::filters::BoxedFilter<(warp::reply::WithStatus,)>, } +#[cfg(feature = "admission-webhook")] fn not_found() -> warp::reply::WithStatus { warp::reply::with_status(warp::reply::json(&()), warp::http::StatusCode::NOT_FOUND) } @@ -48,16 +49,8 @@ impl Manager { /// Register a controller with the manager. pub fn register_controller(&mut self, builder: ControllerBuilder) { - let webhooks = builder.webhooks.clone(); - - let (operator, controller, tasks) = - controller_tasks(self.kubeconfig.clone(), builder, self.store.clone()); - #[cfg(feature = "admission-webhook")] - for (path, f) in webhooks { - let endpoint = - crate::admission::create_endpoint::(Arc::clone(&operator), path.to_string(), f); - + for endpoint in builder.webhooks.values() { // Create temporary variable w/ throwaway filter of correct type. let mut temp = warp::any().map(not_found).boxed(); @@ -65,7 +58,7 @@ impl Manager { std::mem::swap(&mut temp, &mut self.filter); // Compose new filter from new endpoint and temporary (now holding original self.filter). - let mut new_filter = endpoint.or(temp).unify().boxed(); + let mut new_filter = endpoint.clone().or(temp).unify().boxed(); // Swap new filter back into self.filter. std::mem::swap(&mut new_filter, &mut self.filter); @@ -73,6 +66,9 @@ impl Manager { // Throwaway filter stored in new_filter implicitly dropped. } + let (controller, tasks) = + controller_tasks(self.kubeconfig.clone(), builder, self.store.clone()); + self.controllers.push(controller); self.controller_tasks.extend(tasks); } diff --git a/krator/src/manager/controller.rs b/krator/src/manager/controller.rs index 7bd540f..440a43e 100644 --- a/krator/src/manager/controller.rs +++ b/krator/src/manager/controller.rs @@ -1,16 +1,25 @@ use super::watch::{Watch, WatchHandle}; #[cfg(feature = "admission-webhook")] -use crate::admission::WebhookFn; +use crate::admission::create_boxed_endpoint; +#[cfg(feature = "admission-webhook")] +use crate::admission::AdmissionResult; use crate::operator::Watchable; +#[cfg(feature = "admission-webhook")] +use crate::ObjectState; use crate::Operator; use kube::api::ListParams; +#[cfg(feature = "admission-webhook")] use kube::Resource; +#[cfg(feature = "admission-webhook")] use std::collections::BTreeMap; +use std::sync::Arc; +#[cfg(feature = "admission-webhook")] +use tokio::sync::RwLock; /// Builder pattern for registering a controller or operator. pub struct ControllerBuilder { /// The controller or operator singleton. - pub(crate) controller: C, + pub(crate) controller: Arc, /// List of watch configurations for objects that will simply be cached /// locally. pub(crate) watches: Vec, @@ -26,19 +35,35 @@ pub struct ControllerBuilder { /// watcher tasks and runtime tasks. buffer: usize, /// Registered webhooks. - pub(crate) webhooks: BTreeMap>, + #[cfg(feature = "admission-webhook")] + pub(crate) webhooks: + BTreeMap,)>>, } +// pub trait AsyncPtrFuture: std::future::Future> + Send + 'static {} + +// pub type AsyncFnPtrReturn = std::pin::Pin +// > +// >; + +// pub trait AsyncFn: Fn(O::Manifest, &::SharedState) -> AsyncFnPtrReturn {} + +// pub type AsyncFnPtr = Box< +// dyn AsyncFn +// >; + impl ControllerBuilder { /// Create builder from operator singleton. pub fn new(operator: O) -> Self { ControllerBuilder { - controller: operator, + controller: Arc::new(operator), watches: vec![], owns: vec![], namespace: None, list_params: Default::default(), buffer: 32, + #[cfg(feature = "admission-webhook")] webhooks: BTreeMap::new(), } } @@ -173,25 +198,72 @@ impl ControllerBuilder { /// Registers a webhook at the path "/$GROUP/$VERSION/$KIND". /// Multiple webhooks can be registered, but must be at different paths. #[cfg(feature = "admission-webhook")] - pub fn with_webhook(mut self, f: WebhookFn) -> Self { + pub fn with_webhook(mut self, f: F) -> Self + where + R: GenericFuture, + F: GenericAsyncFn, + { let path = format!( "/{}/{}/{}", O::Manifest::group(&()), O::Manifest::version(&()), O::Manifest::kind(&()) ); - self.webhooks.insert(path, f); + let filter = create_boxed_endpoint(Arc::clone(&self.controller), path.to_string(), f); + self.webhooks.insert(path.to_string(), filter); self } /// Registers a webhook at the supplied path. #[cfg(feature = "admission-webhook")] - pub fn with_webhook_at_path(mut self, path: &str, f: WebhookFn) -> Self { - self.webhooks.insert(path.to_string(), f); + pub fn with_webhook_at_path(mut self, path: &str, f: F) -> Self + where + R: GenericFuture, + F: GenericAsyncFn, + { + let filter = create_boxed_endpoint(Arc::clone(&self.controller), path.to_string(), f); + self.webhooks.insert(path.to_string(), filter); self } } +#[cfg(feature = "admission-webhook")] +pub trait GenericFuture: + 'static + std::future::Future> + Send +{ +} + +#[cfg(feature = "admission-webhook")] +impl< + O: Operator, + T: 'static + std::future::Future> + Send, + > GenericFuture for T +{ +} + +#[cfg(feature = "admission-webhook")] +pub trait GenericAsyncFn: + 'static + + Clone + + Send + + Sync + + Fn(O::Manifest, Arc::SharedState>>) -> R +{ +} + +#[cfg(feature = "admission-webhook")] +impl< + O: Operator, + R, + T: 'static + + Clone + + Send + + Sync + + Fn(O::Manifest, Arc::SharedState>>) -> R, + > GenericAsyncFn for T +{ +} + #[derive(Clone)] pub struct Controller { pub manages: WatchHandle, diff --git a/krator/src/manager/tasks.rs b/krator/src/manager/tasks.rs index 7b6c578..eeba4d4 100644 --- a/krator/src/manager/tasks.rs +++ b/krator/src/manager/tasks.rs @@ -197,16 +197,21 @@ pub(crate) fn controller_tasks( kubeconfig: kube::Config, controller: ControllerBuilder, store: Store, -) -> (Arc, Controller, Vec) { +) -> (Controller, Vec) { let mut watches = Vec::new(); let mut owns = Vec::new(); let mut tasks = Vec::new(); let buffer = controller.buffer(); let (manages, rx) = controller.manages().handle(buffer); - let operator = Arc::new(controller.controller); // Create main Operator task. - let task = launch_runtime(kubeconfig, Arc::clone(&operator), rx, store.clone()).boxed(); + let task = launch_runtime( + kubeconfig, + Arc::clone(&controller.controller), + rx, + store.clone(), + ) + .boxed(); tasks.push(task); for watch in controller.watches { @@ -224,7 +229,6 @@ pub(crate) fn controller_tasks( } ( - operator, Controller { manages, owns, diff --git a/krator/src/operator.rs b/krator/src/operator.rs index 6299fa2..13cde9a 100644 --- a/krator/src/operator.rs +++ b/krator/src/operator.rs @@ -55,12 +55,12 @@ pub trait Operator: 'static + Sync + Send { Ok(()) } - // #[cfg(feature = "admission-webhook")] - // /// Invoked when object is created or modified. Can mutate the and / or deny the request. - // async fn admission_hook( - // &self, - // manifest: Self::Manifest, - // ) -> crate::admission::AdmissionResult; + #[cfg(feature = "admission-webhook")] + /// Invoked when object is created or modified. Can mutate the and / or deny the request. + async fn admission_hook( + &self, + manifest: Self::Manifest, + ) -> crate::admission::AdmissionResult; #[cfg(feature = "admission-webhook")] /// Gets called by the operator if the admission-webhook feature is enabled. The function should