Skip to content
This repository has been archived by the owner on May 22, 2024. It is now read-only.

Webhooks for Controller Manager API #55

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 33 additions & 13 deletions krator/examples/moose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,26 @@ impl Operator for MooseTracker {
}
}

#[cfg(feature = "admission-webhook")]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Figure out how to remove the original trait method, but still support the old API.

async fn admission_hook(
manifest: Moose,
_shared: Arc<RwLock<SharedMooseState>>,
) -> krator::admission::AdmissionResult<Moose> {
use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status;
// All moose names start with "M"
let name = manifest.meta().name.clone().unwrap();
info!("Processing admission hook for moose named {}", name);
match name.chars().next() {
Some('m') | Some('M') => krator::admission::AdmissionResult::Allow(manifest),
_ => krator::admission::AdmissionResult::Deny(Status {
code: Some(400),
message: Some("Mooses may only have names starting with 'M'.".to_string()),
status: Some("Failure".to_string()),
..Default::default()
}),
}
}

#[derive(Debug, StructOpt)]
#[structopt(
name = "moose",
Expand Down Expand Up @@ -528,20 +548,20 @@ Running moose example. Try to install some of the manifests provided in examples
"#
);

// New API does not currently support Webhooks, so use legacy API if enabled.
use krator::{ControllerBuilder, Manager};
let mut manager = Manager::new(&kubeconfig);

#[cfg(feature = "admission-webhook")]
{
use krator::OperatorRuntime;
let mut runtime = OperatorRuntime::new(&kubeconfig, tracker, Some(params));
runtime.start().await;
}
let controller = {
ControllerBuilder::new(tracker)
.with_params(params)
.with_webhook(admission_hook)
};

#[cfg(not(feature = "admission-webhook"))]
{
use krator::{ControllerBuilder, Manager};
let mut manager = Manager::new(&kubeconfig);
let controller = ControllerBuilder::new(tracker).with_params(params);
manager.register_controller(controller);
manager.start().await;
}
let controller = { ControllerBuilder::new(tracker).with_params(params) };

manager.register_controller(controller);
manager.start().await;
Ok(())
}
120 changes: 94 additions & 26 deletions krator/src/admission.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ use std::{
fmt::{Display, Formatter},
sync::Arc,
};
use tokio::sync::RwLock;
use tracing::{info, trace, warn};
use tracing_futures::Instrument;

/// WebhookResources encapsulates Kubernetes resources necessary to register the admission webhook.
/// and provides some convenience functions
Expand Down Expand Up @@ -320,12 +320,6 @@ impl AdmissionTls {
}
}

/// Type signature for validating or mutating webhooks.
pub type WebhookFn<C> = dyn Fn(
<C as Operator>::Manifest,
<<C as Operator>::ObjectState as ObjectState>::SharedState,
) -> AdmissionResult<<C as Operator>::Manifest>;

/// Result of admission hook.
#[allow(clippy::large_enum_variant)]
pub enum AdmissionResult<T> {
Expand Down Expand Up @@ -441,9 +435,44 @@ struct AdmissionReviewResponse {
response: AdmissionResponse,
}

/// Trait for Future which returns AdmissionResult.
pub trait GenericFuture<O: Operator>:
'static + std::future::Future<Output = AdmissionResult<O::Manifest>> + Send
{
}

impl<
O: Operator,
T: 'static + std::future::Future<Output = AdmissionResult<O::Manifest>> + Send,
> GenericFuture<O> for T
{
}

/// Trait to represent an admission review function.
pub trait GenericAsyncFn<O: Operator, R>:
'static
+ Clone
+ Send
+ Sync
+ Fn(O::Manifest, Arc<RwLock<<O::ObjectState as ObjectState>::SharedState>>) -> R
{
}

impl<
O: Operator,
R,
T: 'static
+ Clone
+ Send
+ Sync
+ Fn(O::Manifest, Arc<RwLock<<O::ObjectState as ObjectState>::SharedState>>) -> R,
> GenericAsyncFn<O, R> for T
{
}

#[tracing::instrument(
level="debug",
skip(operator, request),
skip(operator, request, hook),
fields(
name=%request.request.name(),
namespace=?request.request.namespace(),
Expand All @@ -452,10 +481,15 @@ struct AdmissionReviewResponse {
user_info=?request.request.user_info
)
)]
async fn review<O: Operator>(
async fn review<O: Operator, R, F>(
operator: Arc<O>,
request: AdmissionReviewRequest<O::Manifest>,
) -> warp::reply::Json {
hook: F,
) -> warp::reply::WithStatus<warp::reply::Json>
where
R: GenericFuture<O>,
F: GenericAsyncFn<O, R>,
{
let manifest = match request.request.operation {
AdmissionRequestOperation::Create { object, .. } => object,
AdmissionRequestOperation::Update {
Expand All @@ -478,12 +512,9 @@ async fn review<O: Operator>(
let name = manifest.name();
let namespace = manifest.namespace();

let span = tracing::debug_span!("Operator::admission_hook",);
let shared = operator.shared_state().await;

let result = operator
.admission_hook(manifest.clone())
.instrument(span)
.await;
let result = hook(manifest.clone(), shared).await;

let response = match result {
AdmissionResult::Allow(new_manifest) => {
Expand Down Expand Up @@ -530,11 +561,40 @@ async fn review<O: Operator>(
}
}
};
warp::reply::json(&AdmissionReviewResponse {
api_version: request.api_version,
kind: request.kind,
response,
})
warp::reply::with_status(
warp::reply::json(&AdmissionReviewResponse {
api_version: request.api_version,
kind: request.kind,
response,
}),
warp::http::StatusCode::OK,
)
}

pub(crate) fn create_boxed_endpoint<O: Operator, R, F>(
operator: Arc<O>,
path: String,
f: F,
) -> warp::filters::BoxedFilter<(warp::reply::WithStatus<warp::reply::Json>,)>
where
R: GenericFuture<O>,
F: GenericAsyncFn<O, R>,
{
use warp::Filter;

warp::path(path)
.and(warp::path::end())
.and(warp::post())
.map(move || f.clone())
.and(warp::body::json())
.and_then(move |f: F, request: AdmissionReviewRequest<O::Manifest>| {
let operator = Arc::clone(&operator);
async move {
let response = review(operator, request, f).await;
Ok::<_, std::convert::Infallible>(response)
}
})
.boxed()
}

pub(crate) async fn endpoint<O: Operator>(operator: Arc<O>) {
Expand All @@ -545,15 +605,23 @@ pub(crate) async fn endpoint<O: Operator>(operator: Arc<O>) {

use warp::Filter;
let routes = warp::any()
.map(move || Arc::clone(&operator))
.map(|operator: Arc<O>| {
let hook_operator = Arc::clone(&operator);
let func = move |manifest: O::Manifest, _| {
let inner_operator = Arc::clone(&hook_operator);
async move { inner_operator.admission_hook(manifest).await }
};
(operator, func)
})
.and(warp::post())
.and(warp::body::json())
.and_then(move |request: AdmissionReviewRequest<O::Manifest>| {
let operator = Arc::clone(&operator);
async move {
let response = review(operator, request).await;
.and_then(
move |(operator, f), request: AdmissionReviewRequest<O::Manifest>| async move {
let response = review(operator, request, f).await;
Ok::<_, std::convert::Infallible>(response)
}
});
},
);

warp::serve(routes)
.tls()
Expand Down
4 changes: 0 additions & 4 deletions krator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,8 @@ pub mod admission;

pub mod state;

// TODO: Remove once webhooks are supported.
#[cfg(not(feature = "admission-webhook"))]
mod manager;
#[cfg(not(feature = "admission-webhook"))]
pub use manager::controller::ControllerBuilder;
#[cfg(not(feature = "admission-webhook"))]
pub use manager::Manager;

pub use manifest::Manifest;
Expand Down
44 changes: 43 additions & 1 deletion krator/src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! Defines types for registering controllers with runtime.
use crate::{operator::Operator, store::Store};

// use std::sync::Arc;
#[cfg(feature = "admission-webhook")]
use warp::Filter;
pub mod tasks;
use tasks::{controller_tasks, OperatorTask};

Expand All @@ -20,23 +22,53 @@ pub struct Manager {
controllers: Vec<Controller>,
controller_tasks: Vec<OperatorTask>,
store: Store,
#[cfg(feature = "admission-webhook")]
filter: warp::filters::BoxedFilter<(warp::reply::WithStatus<warp::reply::Json>,)>,
}

#[cfg(feature = "admission-webhook")]
fn not_found() -> warp::reply::WithStatus<warp::reply::Json> {
warp::reply::with_status(warp::reply::json(&()), warp::http::StatusCode::NOT_FOUND)
}

impl Manager {
/// Create a new controller manager.
pub fn new(kubeconfig: &kube::Config) -> Self {
#[cfg(feature = "admission-webhook")]
let filter = { warp::any().map(not_found).boxed() };

Manager {
controllers: vec![],
controller_tasks: vec![],
kubeconfig: kubeconfig.clone(),
store: Store::new(),
#[cfg(feature = "admission-webhook")]
filter,
}
}

/// Register a controller with the manager.
pub fn register_controller<C: Operator>(&mut self, builder: ControllerBuilder<C>) {
#[cfg(feature = "admission-webhook")]
for endpoint in builder.webhooks.values() {
// Create temporary variable w/ throwaway filter of correct type.
let mut temp = warp::any().map(not_found).boxed();

// Swap self.filter into temporary.
std::mem::swap(&mut temp, &mut self.filter);

// Compose new filter from new endpoint and temporary (now holding original self.filter).
let mut new_filter = endpoint.clone().or(temp).unify().boxed();

// Swap new filter back into self.filter.
std::mem::swap(&mut new_filter, &mut self.filter);

// Throwaway filter stored in new_filter implicitly dropped.
}

let (controller, tasks) =
controller_tasks(self.kubeconfig.clone(), builder, self.store.clone());

self.controllers.push(controller);
self.controller_tasks.extend(tasks);
}
Expand All @@ -62,6 +94,16 @@ impl Manager {
}
}

#[cfg(feature = "admission-webhook")]
{
let task = warp::serve(self.filter)
// .tls()
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Figure out where to load certs from.

// .cert(tls.cert)
// .key(tls.private_key)
.run(([0, 0, 0, 0], 8443));
tasks.push(task.boxed());
}

futures::future::join_all(tasks).await;
}
}
Loading