From 6460a04f01c21b5c312d4ee7d0cf891d46278024 Mon Sep 17 00:00:00 2001 From: Kevin Flansburg Date: Mon, 11 Oct 2021 10:19:23 -0700 Subject: [PATCH] Initial webhook spike --- krator/examples/moose.rs | 84 ++++++++++++++++++------------ krator/src/admission.rs | 89 +++++++++++++++++++++----------- krator/src/lib.rs | 4 -- krator/src/manager.rs | 50 +++++++++++++++++- krator/src/manager/controller.rs | 38 +++++++------- krator/src/manager/tasks.rs | 12 +++-- krator/src/operator.rs | 12 ++--- krator/src/runtime.rs | 5 +- krator/src/store.rs | 4 -- 9 files changed, 190 insertions(+), 108 deletions(-) diff --git a/krator/examples/moose.rs b/krator/examples/moose.rs index f7e25aa..8e33539 100644 --- a/krator/examples/moose.rs +++ b/krator/examples/moose.rs @@ -352,25 +352,25 @@ impl Operator for MooseTracker { Arc::clone(&self.shared) } - #[cfg(feature = "admission-webhook")] - async fn admission_hook( - &self, - manifest: Self::Manifest, - ) -> krator::admission::AdmissionResult { - use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status; - // All moose names start with "M" - let name = manifest.meta().name.clone().unwrap(); - info!("Processing admission hook for moose named {}", name); - match name.chars().next() { - Some('m') | Some('M') => krator::admission::AdmissionResult::Allow(manifest), - _ => krator::admission::AdmissionResult::Deny(Status { - code: Some(400), - message: Some("Mooses may only have names starting with 'M'.".to_string()), - status: Some("Failure".to_string()), - ..Default::default() - }), - } - } + // #[cfg(feature = "admission-webhook")] + // async fn admission_hook( + // &self, + // manifest: Self::Manifest, + // ) -> krator::admission::AdmissionResult { + // use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status; + // // All moose names start with "M" + // let name = manifest.meta().name.clone().unwrap(); + // info!("Processing admission hook for moose named {}", name); + // match name.chars().next() { + // Some('m') | Some('M') => krator::admission::AdmissionResult::Allow(manifest), + // _ => krator::admission::AdmissionResult::Deny(Status { + // code: Some(400), + // message: Some("Mooses may only have names starting with 'M'.".to_string()), + // status: Some("Failure".to_string()), + // ..Default::default() + // }), + // } + // } #[cfg(feature = "admission-webhook")] async fn admission_hook_tls(&self) -> anyhow::Result { @@ -386,6 +386,26 @@ impl Operator for MooseTracker { } } +#[cfg(feature = "admission-webhook")] +fn admission_hook( + manifest: Moose, + _shared: &SharedMooseState, +) -> krator::admission::AdmissionResult { + use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status; + // All moose names start with "M" + let name = manifest.meta().name.clone().unwrap(); + info!("Processing admission hook for moose named {}", name); + match name.chars().next() { + Some('m') | Some('M') => krator::admission::AdmissionResult::Allow(manifest), + _ => krator::admission::AdmissionResult::Deny(Status { + code: Some(400), + message: Some("Mooses may only have names starting with 'M'.".to_string()), + status: Some("Failure".to_string()), + ..Default::default() + }), + } +} + #[derive(Debug, StructOpt)] #[structopt( name = "moose", @@ -528,20 +548,16 @@ Running moose example. Try to install some of the manifests provided in examples "# ); - // New API does not currently support Webhooks, so use legacy API if enabled. - #[cfg(feature = "admission-webhook")] - { - use krator::OperatorRuntime; - let mut runtime = OperatorRuntime::new(&kubeconfig, tracker, Some(params)); - runtime.start().await; - } - #[cfg(not(feature = "admission-webhook"))] - { - use krator::{ControllerBuilder, Manager}; - let mut manager = Manager::new(&kubeconfig); - let controller = ControllerBuilder::new(tracker).with_params(params); - manager.register_controller(controller); - manager.start().await; - } + use krator::{ControllerBuilder, Manager}; + let mut manager = Manager::new(&kubeconfig); + let controller = if cfg!(admission_webhook) { + ControllerBuilder::new(tracker) + .with_params(params) + .with_webhook(admission_hook) + } else { + ControllerBuilder::new(tracker).with_params(params) + }; + manager.register_controller(controller); + manager.start().await; Ok(()) } diff --git a/krator/src/admission.rs b/krator/src/admission.rs index 2bf8bbc..dcba1e2 100644 --- a/krator/src/admission.rs +++ b/krator/src/admission.rs @@ -20,7 +20,6 @@ use std::{ sync::Arc, }; use tracing::{info, trace, warn}; -use tracing_futures::Instrument; /// WebhookResources encapsulates Kubernetes resources necessary to register the admission webhook. /// and provides some convenience functions @@ -321,9 +320,9 @@ impl AdmissionTls { } /// Type signature for validating or mutating webhooks. -pub type WebhookFn = dyn Fn( +pub type WebhookFn = fn( ::Manifest, - <::ObjectState as ObjectState>::SharedState, + &<::ObjectState as ObjectState>::SharedState, ) -> AdmissionResult<::Manifest>; /// Result of admission hook. @@ -443,7 +442,7 @@ struct AdmissionReviewResponse { #[tracing::instrument( level="debug", - skip(operator, request), + skip(operator, request, hook), fields( name=%request.request.name(), namespace=?request.request.namespace(), @@ -455,7 +454,8 @@ struct AdmissionReviewResponse { async fn review( operator: Arc, request: AdmissionReviewRequest, -) -> warp::reply::Json { + hook: &WebhookFn, +) -> warp::reply::WithStatus { let manifest = match request.request.operation { AdmissionRequestOperation::Create { object, .. } => object, AdmissionRequestOperation::Update { @@ -478,12 +478,14 @@ async fn review( let name = manifest.name(); let namespace = manifest.namespace(); - let span = tracing::debug_span!("Operator::admission_hook",); + // let span = tracing::debug_span!("hook",); - let result = operator - .admission_hook(manifest.clone()) - .instrument(span) - .await; + let shared = operator.shared_state().await; + + let result = { + let shared = shared.read().await; + hook(manifest.clone(), &*shared) + }; let response = match result { AdmissionResult::Allow(new_manifest) => { @@ -530,35 +532,60 @@ async fn review( } } }; - warp::reply::json(&AdmissionReviewResponse { - api_version: request.api_version, - kind: request.kind, - response, - }) + warp::reply::with_status( + warp::reply::json(&AdmissionReviewResponse { + api_version: request.api_version, + kind: request.kind, + response, + }), + warp::http::StatusCode::OK, + ) } -pub(crate) async fn endpoint(operator: Arc) { - let tls = operator - .admission_hook_tls() - .await - .expect("getting webhook tls AdmissionTls failed"); - +use warp::Rejection; +pub(crate) fn create_endpoint( + operator: Arc, + path: String, + f: WebhookFn, +) -> impl warp::Filter,), Error = Rejection> + Clone +{ use warp::Filter; - let routes = warp::any() + warp::path(path) + .and(warp::path::end()) .and(warp::post()) .and(warp::body::json()) .and_then(move |request: AdmissionReviewRequest| { let operator = Arc::clone(&operator); async move { - let response = review(operator, request).await; + let response = review(operator, request, &f).await; Ok::<_, std::convert::Infallible>(response) } - }); - - warp::serve(routes) - .tls() - .cert(tls.cert) - .key(tls.private_key) - .run(([0, 0, 0, 0], 8443)) - .await; + }) +} + +pub(crate) async fn endpoint(_operator: Arc) { + todo!() + // let tls = operator + // .admission_hook_tls() + // .await + // .expect("getting webhook tls AdmissionTls failed"); + + // use warp::Filter; + // let routes = warp::any() + // .and(warp::post()) + // .and(warp::body::json()) + // .and_then(move |request: AdmissionReviewRequest| { + // let operator = Arc::clone(&operator); + // async move { + // let response = review(operator, request).await; + // Ok::<_, std::convert::Infallible>(response) + // } + // }); + + // warp::serve(routes) + // .tls() + // .cert(tls.cert) + // .key(tls.private_key) + // .run(([0, 0, 0, 0], 8443)) + // .await; } diff --git a/krator/src/lib.rs b/krator/src/lib.rs index 92be9f5..bebd662 100644 --- a/krator/src/lib.rs +++ b/krator/src/lib.rs @@ -14,12 +14,8 @@ pub mod admission; pub mod state; -// TODO: Remove once webhooks are supported. -#[cfg(not(feature = "admission-webhook"))] mod manager; -#[cfg(not(feature = "admission-webhook"))] pub use manager::controller::ControllerBuilder; -#[cfg(not(feature = "admission-webhook"))] pub use manager::Manager; pub use manifest::Manifest; diff --git a/krator/src/manager.rs b/krator/src/manager.rs index d56e84f..5089b8b 100644 --- a/krator/src/manager.rs +++ b/krator/src/manager.rs @@ -1,6 +1,8 @@ //! Defines types for registering controllers with runtime. use crate::{operator::Operator, store::Store}; - +use std::sync::Arc; +#[cfg(feature = "admission-webhook")] +use warp::Filter; pub mod tasks; use tasks::{controller_tasks, OperatorTask}; @@ -20,23 +22,57 @@ pub struct Manager { controllers: Vec, controller_tasks: Vec, store: Store, + #[cfg(feature = "admission-webhook")] + filter: warp::filters::BoxedFilter<(warp::reply::WithStatus,)>, +} + +fn not_found() -> warp::reply::WithStatus { + warp::reply::with_status(warp::reply::json(&()), warp::http::StatusCode::NOT_FOUND) } impl Manager { /// Create a new controller manager. pub fn new(kubeconfig: &kube::Config) -> Self { + #[cfg(feature = "admission-webhook")] + let filter = { warp::any().map(not_found).boxed() }; + Manager { controllers: vec![], controller_tasks: vec![], kubeconfig: kubeconfig.clone(), store: Store::new(), + #[cfg(feature = "admission-webhook")] + filter, } } /// Register a controller with the manager. pub fn register_controller(&mut self, builder: ControllerBuilder) { - let (controller, tasks) = + let webhooks = builder.webhooks.clone(); + + let (operator, controller, tasks) = controller_tasks(self.kubeconfig.clone(), builder, self.store.clone()); + + #[cfg(feature = "admission-webhook")] + for (path, f) in webhooks { + let endpoint = + crate::admission::create_endpoint::(Arc::clone(&operator), path.to_string(), f); + + // Create temporary variable w/ throwaway filter of correct type. + let mut temp = warp::any().map(not_found).boxed(); + + // Swap self.filter into temporary. + std::mem::swap(&mut temp, &mut self.filter); + + // Compose new filter from new endpoint and temporary (now holding original self.filter). + let mut new_filter = endpoint.or(temp).unify().boxed(); + + // Swap new filter back into self.filter. + std::mem::swap(&mut new_filter, &mut self.filter); + + // Throwaway filter stored in new_filter implicitly dropped. + } + self.controllers.push(controller); self.controller_tasks.extend(tasks); } @@ -62,6 +98,16 @@ impl Manager { } } + #[cfg(feature = "admission-webhook")] + { + let task = warp::serve(self.filter) + // .tls() + // .cert(tls.cert) + // .key(tls.private_key) + .run(([0, 0, 0, 0], 8443)); + tasks.push(task.boxed()); + } + futures::future::join_all(tasks).await; } } diff --git a/krator/src/manager/controller.rs b/krator/src/manager/controller.rs index 06265b8..7bd540f 100644 --- a/krator/src/manager/controller.rs +++ b/krator/src/manager/controller.rs @@ -4,6 +4,8 @@ use crate::admission::WebhookFn; use crate::operator::Watchable; use crate::Operator; use kube::api::ListParams; +use kube::Resource; +use std::collections::BTreeMap; /// Builder pattern for registering a controller or operator. pub struct ControllerBuilder { @@ -23,6 +25,8 @@ pub struct ControllerBuilder { /// The buffer length for Tokio channels used to communicate between /// watcher tasks and runtime tasks. buffer: usize, + /// Registered webhooks. + pub(crate) webhooks: BTreeMap>, } impl ControllerBuilder { @@ -35,6 +39,7 @@ impl ControllerBuilder { namespace: None, list_params: Default::default(), buffer: 32, + webhooks: BTreeMap::new(), } } @@ -165,30 +170,25 @@ impl ControllerBuilder { self } - /// Registers a validating webhook at the path "/$GROUP/$VERSION/$KIND". + /// Registers a webhook at the path "/$GROUP/$VERSION/$KIND". /// Multiple webhooks can be registered, but must be at different paths. #[cfg(feature = "admission-webhook")] - pub(crate) fn validates(self, _f: &WebhookFn) -> Self { - todo!() - } - - /// Registers a validating webhook at the supplied path. - #[cfg(feature = "admission-webhook")] - pub(crate) fn validates_at_path(self, _path: &str, _f: &WebhookFn) -> Self { - todo!() - } - - /// Registers a mutating webhook at the path "/$GROUP/$VERSION/$KIND". - /// Multiple webhooks can be registered, but must be at different paths. - #[cfg(feature = "admission-webhook")] - pub(crate) fn mutates(self, _f: &WebhookFn) -> Self { - todo!() + pub fn with_webhook(mut self, f: WebhookFn) -> Self { + let path = format!( + "/{}/{}/{}", + O::Manifest::group(&()), + O::Manifest::version(&()), + O::Manifest::kind(&()) + ); + self.webhooks.insert(path, f); + self } - /// Registers a mutating webhook at the supplied path. + /// Registers a webhook at the supplied path. #[cfg(feature = "admission-webhook")] - pub(crate) fn mutates_at_path(self, _path: &str, _f: &WebhookFn) -> Self { - todo!() + pub fn with_webhook_at_path(mut self, path: &str, f: WebhookFn) -> Self { + self.webhooks.insert(path.to_string(), f); + self } } diff --git a/krator/src/manager/tasks.rs b/krator/src/manager/tasks.rs index c67f9bd..7b6c578 100644 --- a/krator/src/manager/tasks.rs +++ b/krator/src/manager/tasks.rs @@ -4,9 +4,9 @@ use std::future::Future; use futures::FutureExt; - use kube::{api::ApiResource, api::GroupVersionKind, Resource}; use kube_runtime::watcher::Event; +use std::sync::Arc; use tracing::{debug, info, warn}; use crate::{ @@ -64,7 +64,7 @@ pub(crate) async fn launch_watcher(client: kube::Client, handle: WatchHandle) { /// concrete `Event`. async fn launch_runtime( kubeconfig: kube::Config, - controller: O, + controller: Arc, mut rx: tokio::sync::mpsc::Receiver, store: Store, ) { @@ -197,15 +197,16 @@ pub(crate) fn controller_tasks( kubeconfig: kube::Config, controller: ControllerBuilder, store: Store, -) -> (Controller, Vec) { +) -> (Arc, Controller, Vec) { let mut watches = Vec::new(); let mut owns = Vec::new(); let mut tasks = Vec::new(); let buffer = controller.buffer(); + let (manages, rx) = controller.manages().handle(buffer); + let operator = Arc::new(controller.controller); // Create main Operator task. - let (manages, rx) = controller.manages().handle(buffer); - let task = launch_runtime(kubeconfig, controller.controller, rx, store.clone()).boxed(); + let task = launch_runtime(kubeconfig, Arc::clone(&operator), rx, store.clone()).boxed(); tasks.push(task); for watch in controller.watches { @@ -223,6 +224,7 @@ pub(crate) fn controller_tasks( } ( + operator, Controller { manages, owns, diff --git a/krator/src/operator.rs b/krator/src/operator.rs index 13cde9a..6299fa2 100644 --- a/krator/src/operator.rs +++ b/krator/src/operator.rs @@ -55,12 +55,12 @@ pub trait Operator: 'static + Sync + Send { Ok(()) } - #[cfg(feature = "admission-webhook")] - /// Invoked when object is created or modified. Can mutate the and / or deny the request. - async fn admission_hook( - &self, - manifest: Self::Manifest, - ) -> crate::admission::AdmissionResult; + // #[cfg(feature = "admission-webhook")] + // /// Invoked when object is created or modified. Can mutate the and / or deny the request. + // async fn admission_hook( + // &self, + // manifest: Self::Manifest, + // ) -> crate::admission::AdmissionResult; #[cfg(feature = "admission-webhook")] /// Gets called by the operator if the admission-webhook feature is enabled. The function should diff --git a/krator/src/runtime.rs b/krator/src/runtime.rs index df79d17..7f4c9ec 100644 --- a/krator/src/runtime.rs +++ b/krator/src/runtime.rs @@ -76,10 +76,9 @@ impl OperatorRuntime { } } - #[cfg(not(feature = "admission-webhook"))] pub(crate) fn new_with_store( kubeconfig: &kube::Config, - operator: O, + operator: Arc, params: Option, store: Store, ) -> Self { @@ -89,7 +88,7 @@ impl OperatorRuntime { OperatorRuntime { client, handlers: HashMap::new(), - operator: Arc::new(operator), + operator, list_params, signal: None, store, diff --git a/krator/src/store.rs b/krator/src/store.rs index 2449795..51f59c6 100644 --- a/krator/src/store.rs +++ b/krator/src/store.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; -#[cfg(not(feature = "admission-webhook"))] use kube::api::DynamicObject; use kube::api::GroupVersionKind; @@ -51,7 +50,6 @@ impl Store { } /// Clear cache for specified object kind. - #[cfg(not(feature = "admission-webhook"))] pub(crate) async fn reset(&self, gvk: &GroupVersionKind) { let mut objects = self.objects.write().await; let key = gvk.clone(); @@ -60,7 +58,6 @@ impl Store { } /// Delete a cached object. - #[cfg(not(feature = "admission-webhook"))] pub(crate) async fn delete_gvk( &self, namespace: Option, @@ -75,7 +72,6 @@ impl Store { } /// Insert an object that has already been type erased. - #[cfg(not(feature = "admission-webhook"))] pub(crate) async fn insert_gvk( &self, namespace: Option,