Skip to content
This repository has been archived by the owner on May 22, 2024. It is now read-only.

Commit

Permalink
Initial webhook spike
Browse files Browse the repository at this point in the history
  • Loading branch information
kflansburg committed Oct 12, 2021
1 parent 047eb1e commit 6460a04
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 108 deletions.
84 changes: 50 additions & 34 deletions krator/examples/moose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,25 +352,25 @@ impl Operator for MooseTracker {
Arc::clone(&self.shared)
}

#[cfg(feature = "admission-webhook")]
async fn admission_hook(
&self,
manifest: Self::Manifest,
) -> krator::admission::AdmissionResult<Self::Manifest> {
use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status;
// All moose names start with "M"
let name = manifest.meta().name.clone().unwrap();
info!("Processing admission hook for moose named {}", name);
match name.chars().next() {
Some('m') | Some('M') => krator::admission::AdmissionResult::Allow(manifest),
_ => krator::admission::AdmissionResult::Deny(Status {
code: Some(400),
message: Some("Mooses may only have names starting with 'M'.".to_string()),
status: Some("Failure".to_string()),
..Default::default()
}),
}
}
// #[cfg(feature = "admission-webhook")]
// async fn admission_hook(
// &self,
// manifest: Self::Manifest,
// ) -> krator::admission::AdmissionResult<Self::Manifest> {
// use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status;
// // All moose names start with "M"
// let name = manifest.meta().name.clone().unwrap();
// info!("Processing admission hook for moose named {}", name);
// match name.chars().next() {
// Some('m') | Some('M') => krator::admission::AdmissionResult::Allow(manifest),
// _ => krator::admission::AdmissionResult::Deny(Status {
// code: Some(400),
// message: Some("Mooses may only have names starting with 'M'.".to_string()),
// status: Some("Failure".to_string()),
// ..Default::default()
// }),
// }
// }

#[cfg(feature = "admission-webhook")]
async fn admission_hook_tls(&self) -> anyhow::Result<krator::admission::AdmissionTls> {
Expand All @@ -386,6 +386,26 @@ impl Operator for MooseTracker {
}
}

#[cfg(feature = "admission-webhook")]
fn admission_hook(
manifest: Moose,
_shared: &SharedMooseState,
) -> krator::admission::AdmissionResult<Moose> {
use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status;
// All moose names start with "M"
let name = manifest.meta().name.clone().unwrap();
info!("Processing admission hook for moose named {}", name);
match name.chars().next() {
Some('m') | Some('M') => krator::admission::AdmissionResult::Allow(manifest),
_ => krator::admission::AdmissionResult::Deny(Status {
code: Some(400),
message: Some("Mooses may only have names starting with 'M'.".to_string()),
status: Some("Failure".to_string()),
..Default::default()
}),
}
}

#[derive(Debug, StructOpt)]
#[structopt(
name = "moose",
Expand Down Expand Up @@ -528,20 +548,16 @@ Running moose example. Try to install some of the manifests provided in examples
"#
);

// New API does not currently support Webhooks, so use legacy API if enabled.
#[cfg(feature = "admission-webhook")]
{
use krator::OperatorRuntime;
let mut runtime = OperatorRuntime::new(&kubeconfig, tracker, Some(params));
runtime.start().await;
}
#[cfg(not(feature = "admission-webhook"))]
{
use krator::{ControllerBuilder, Manager};
let mut manager = Manager::new(&kubeconfig);
let controller = ControllerBuilder::new(tracker).with_params(params);
manager.register_controller(controller);
manager.start().await;
}
use krator::{ControllerBuilder, Manager};
let mut manager = Manager::new(&kubeconfig);
let controller = if cfg!(admission_webhook) {
ControllerBuilder::new(tracker)
.with_params(params)
.with_webhook(admission_hook)
} else {
ControllerBuilder::new(tracker).with_params(params)
};
manager.register_controller(controller);
manager.start().await;
Ok(())
}
89 changes: 58 additions & 31 deletions krator/src/admission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::{
sync::Arc,
};
use tracing::{info, trace, warn};
use tracing_futures::Instrument;

/// WebhookResources encapsulates Kubernetes resources necessary to register the admission webhook.
/// and provides some convenience functions
Expand Down Expand Up @@ -321,9 +320,9 @@ impl AdmissionTls {
}

/// Type signature for validating or mutating webhooks.
pub type WebhookFn<C> = dyn Fn(
pub type WebhookFn<C> = fn(
<C as Operator>::Manifest,
<<C as Operator>::ObjectState as ObjectState>::SharedState,
&<<C as Operator>::ObjectState as ObjectState>::SharedState,
) -> AdmissionResult<<C as Operator>::Manifest>;

/// Result of admission hook.
Expand Down Expand Up @@ -443,7 +442,7 @@ struct AdmissionReviewResponse {

#[tracing::instrument(
level="debug",
skip(operator, request),
skip(operator, request, hook),
fields(
name=%request.request.name(),
namespace=?request.request.namespace(),
Expand All @@ -455,7 +454,8 @@ struct AdmissionReviewResponse {
async fn review<O: Operator>(
operator: Arc<O>,
request: AdmissionReviewRequest<O::Manifest>,
) -> warp::reply::Json {
hook: &WebhookFn<O>,
) -> warp::reply::WithStatus<warp::reply::Json> {
let manifest = match request.request.operation {
AdmissionRequestOperation::Create { object, .. } => object,
AdmissionRequestOperation::Update {
Expand All @@ -478,12 +478,14 @@ async fn review<O: Operator>(
let name = manifest.name();
let namespace = manifest.namespace();

let span = tracing::debug_span!("Operator::admission_hook",);
// let span = tracing::debug_span!("hook",);

let result = operator
.admission_hook(manifest.clone())
.instrument(span)
.await;
let shared = operator.shared_state().await;

let result = {
let shared = shared.read().await;
hook(manifest.clone(), &*shared)
};

let response = match result {
AdmissionResult::Allow(new_manifest) => {
Expand Down Expand Up @@ -530,35 +532,60 @@ async fn review<O: Operator>(
}
}
};
warp::reply::json(&AdmissionReviewResponse {
api_version: request.api_version,
kind: request.kind,
response,
})
warp::reply::with_status(
warp::reply::json(&AdmissionReviewResponse {
api_version: request.api_version,
kind: request.kind,
response,
}),
warp::http::StatusCode::OK,
)
}

pub(crate) async fn endpoint<O: Operator>(operator: Arc<O>) {
let tls = operator
.admission_hook_tls()
.await
.expect("getting webhook tls AdmissionTls failed");

use warp::Rejection;
pub(crate) fn create_endpoint<O: Operator>(
operator: Arc<O>,
path: String,
f: WebhookFn<O>,
) -> impl warp::Filter<Extract = (warp::reply::WithStatus<warp::reply::Json>,), Error = Rejection> + Clone
{
use warp::Filter;
let routes = warp::any()
warp::path(path)
.and(warp::path::end())
.and(warp::post())
.and(warp::body::json())
.and_then(move |request: AdmissionReviewRequest<O::Manifest>| {
let operator = Arc::clone(&operator);
async move {
let response = review(operator, request).await;
let response = review(operator, request, &f).await;
Ok::<_, std::convert::Infallible>(response)
}
});

warp::serve(routes)
.tls()
.cert(tls.cert)
.key(tls.private_key)
.run(([0, 0, 0, 0], 8443))
.await;
})
}

pub(crate) async fn endpoint<O: Operator>(_operator: Arc<O>) {
todo!()
// let tls = operator
// .admission_hook_tls()
// .await
// .expect("getting webhook tls AdmissionTls failed");

// use warp::Filter;
// let routes = warp::any()
// .and(warp::post())
// .and(warp::body::json())
// .and_then(move |request: AdmissionReviewRequest<O::Manifest>| {
// let operator = Arc::clone(&operator);
// async move {
// let response = review(operator, request).await;
// Ok::<_, std::convert::Infallible>(response)
// }
// });

// warp::serve(routes)
// .tls()
// .cert(tls.cert)
// .key(tls.private_key)
// .run(([0, 0, 0, 0], 8443))
// .await;
}
4 changes: 0 additions & 4 deletions krator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,8 @@ pub mod admission;

pub mod state;

// TODO: Remove once webhooks are supported.
#[cfg(not(feature = "admission-webhook"))]
mod manager;
#[cfg(not(feature = "admission-webhook"))]
pub use manager::controller::ControllerBuilder;
#[cfg(not(feature = "admission-webhook"))]
pub use manager::Manager;

pub use manifest::Manifest;
Expand Down
50 changes: 48 additions & 2 deletions krator/src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! Defines types for registering controllers with runtime.
use crate::{operator::Operator, store::Store};

use std::sync::Arc;
#[cfg(feature = "admission-webhook")]
use warp::Filter;
pub mod tasks;
use tasks::{controller_tasks, OperatorTask};

Expand All @@ -20,23 +22,57 @@ pub struct Manager {
controllers: Vec<Controller>,
controller_tasks: Vec<OperatorTask>,
store: Store,
#[cfg(feature = "admission-webhook")]
filter: warp::filters::BoxedFilter<(warp::reply::WithStatus<warp::reply::Json>,)>,
}

fn not_found() -> warp::reply::WithStatus<warp::reply::Json> {
warp::reply::with_status(warp::reply::json(&()), warp::http::StatusCode::NOT_FOUND)
}

impl Manager {
/// Create a new controller manager.
pub fn new(kubeconfig: &kube::Config) -> Self {
#[cfg(feature = "admission-webhook")]
let filter = { warp::any().map(not_found).boxed() };

Manager {
controllers: vec![],
controller_tasks: vec![],
kubeconfig: kubeconfig.clone(),
store: Store::new(),
#[cfg(feature = "admission-webhook")]
filter,
}
}

/// Register a controller with the manager.
pub fn register_controller<C: Operator>(&mut self, builder: ControllerBuilder<C>) {
let (controller, tasks) =
let webhooks = builder.webhooks.clone();

let (operator, controller, tasks) =
controller_tasks(self.kubeconfig.clone(), builder, self.store.clone());

#[cfg(feature = "admission-webhook")]
for (path, f) in webhooks {
let endpoint =
crate::admission::create_endpoint::<C>(Arc::clone(&operator), path.to_string(), f);

// Create temporary variable w/ throwaway filter of correct type.
let mut temp = warp::any().map(not_found).boxed();

// Swap self.filter into temporary.
std::mem::swap(&mut temp, &mut self.filter);

// Compose new filter from new endpoint and temporary (now holding original self.filter).
let mut new_filter = endpoint.or(temp).unify().boxed();

// Swap new filter back into self.filter.
std::mem::swap(&mut new_filter, &mut self.filter);

// Throwaway filter stored in new_filter implicitly dropped.
}

self.controllers.push(controller);
self.controller_tasks.extend(tasks);
}
Expand All @@ -62,6 +98,16 @@ impl Manager {
}
}

#[cfg(feature = "admission-webhook")]
{
let task = warp::serve(self.filter)
// .tls()
// .cert(tls.cert)
// .key(tls.private_key)
.run(([0, 0, 0, 0], 8443));
tasks.push(task.boxed());
}

futures::future::join_all(tasks).await;
}
}
Loading

0 comments on commit 6460a04

Please sign in to comment.