Skip to content
This repository has been archived by the owner on May 22, 2024. It is now read-only.

Commit

Permalink
Get everything working
Browse files Browse the repository at this point in the history
  • Loading branch information
kflansburg committed Oct 12, 2021
1 parent 6460a04 commit 221ff4b
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 96 deletions.
40 changes: 16 additions & 24 deletions krator/examples/moose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,25 +352,13 @@ impl Operator for MooseTracker {
Arc::clone(&self.shared)
}

// #[cfg(feature = "admission-webhook")]
// async fn admission_hook(
// &self,
// manifest: Self::Manifest,
// ) -> krator::admission::AdmissionResult<Self::Manifest> {
// use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status;
// // All moose names start with "M"
// let name = manifest.meta().name.clone().unwrap();
// info!("Processing admission hook for moose named {}", name);
// match name.chars().next() {
// Some('m') | Some('M') => krator::admission::AdmissionResult::Allow(manifest),
// _ => krator::admission::AdmissionResult::Deny(Status {
// code: Some(400),
// message: Some("Mooses may only have names starting with 'M'.".to_string()),
// status: Some("Failure".to_string()),
// ..Default::default()
// }),
// }
// }
#[cfg(feature = "admission-webhook")]
async fn admission_hook(
&self,
_manifest: Self::Manifest,
) -> crate::admission::AdmissionResult<Self::Manifest> {
unimplemented!()
}

#[cfg(feature = "admission-webhook")]
async fn admission_hook_tls(&self) -> anyhow::Result<krator::admission::AdmissionTls> {
Expand All @@ -387,9 +375,9 @@ impl Operator for MooseTracker {
}

#[cfg(feature = "admission-webhook")]
fn admission_hook(
async fn admission_hook(
manifest: Moose,
_shared: &SharedMooseState,
_shared: Arc<RwLock<SharedMooseState>>,
) -> krator::admission::AdmissionResult<Moose> {
use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status;
// All moose names start with "M"
Expand Down Expand Up @@ -550,13 +538,17 @@ Running moose example. Try to install some of the manifests provided in examples

use krator::{ControllerBuilder, Manager};
let mut manager = Manager::new(&kubeconfig);
let controller = if cfg!(admission_webhook) {

#[cfg(feature = "admission-webhook")]
let controller = {
ControllerBuilder::new(tracker)
.with_params(params)
.with_webhook(admission_hook)
} else {
ControllerBuilder::new(tracker).with_params(params)
};

#[cfg(not(feature = "admission-webhook"))]
let controller = { ControllerBuilder::new(tracker).with_params(params) };

manager.register_controller(controller);
manager.start().await;
Ok(())
Expand Down
100 changes: 57 additions & 43 deletions krator/src/admission.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! Basic implementation of Kubernetes Admission API
use crate::ObjectState;
use crate::manager::controller::{GenericAsyncFn, GenericFuture};
use crate::Operator;
use anyhow::{bail, ensure, Context};
use k8s_openapi::{
Expand Down Expand Up @@ -320,11 +320,12 @@ impl AdmissionTls {
}

/// Type signature for validating or mutating webhooks.
pub type WebhookFn<C> = fn(
<C as Operator>::Manifest,
&<<C as Operator>::ObjectState as ObjectState>::SharedState,
) -> AdmissionResult<<C as Operator>::Manifest>;
// pub type WebhookFn<O: Operator> = fn(
// O::Manifest,
// &<O::ObjectState as ObjectState>::SharedState,
// ) -> std::pin::Pin<Box<dyn std::future::Future<Output=AdmissionResult<O::Manifest>> + Send + 'static>>;

//std::pin::Pin<Box<dyn std::future::Future<Output=AdmissionResult<<C as Operator>::Manifest>> + Send>>
/// Result of admission hook.
#[allow(clippy::large_enum_variant)]
pub enum AdmissionResult<T> {
Expand Down Expand Up @@ -451,11 +452,15 @@ struct AdmissionReviewResponse {
user_info=?request.request.user_info
)
)]
async fn review<O: Operator>(
async fn review<O: Operator, R, F>(
operator: Arc<O>,
request: AdmissionReviewRequest<O::Manifest>,
hook: &WebhookFn<O>,
) -> warp::reply::WithStatus<warp::reply::Json> {
hook: F,
) -> warp::reply::WithStatus<warp::reply::Json>
where
R: GenericFuture<O>,
F: GenericAsyncFn<O, R>,
{
let manifest = match request.request.operation {
AdmissionRequestOperation::Create { object, .. } => object,
AdmissionRequestOperation::Update {
Expand All @@ -482,10 +487,7 @@ async fn review<O: Operator>(

let shared = operator.shared_state().await;

let result = {
let shared = shared.read().await;
hook(manifest.clone(), &*shared)
};
let result = hook(manifest.clone(), shared).await;

let response = match result {
AdmissionResult::Allow(new_manifest) => {
Expand Down Expand Up @@ -542,50 +544,62 @@ async fn review<O: Operator>(
)
}

use warp::Rejection;
pub(crate) fn create_endpoint<O: Operator>(
pub(crate) fn create_boxed_endpoint<O: Operator, R, F>(
operator: Arc<O>,
path: String,
f: WebhookFn<O>,
) -> impl warp::Filter<Extract = (warp::reply::WithStatus<warp::reply::Json>,), Error = Rejection> + Clone
f: F,
) -> warp::filters::BoxedFilter<(warp::reply::WithStatus<warp::reply::Json>,)>
where
R: GenericFuture<O>,
F: GenericAsyncFn<O, R>,
{
use warp::Filter;

warp::path(path)
.and(warp::path::end())
.and(warp::post())
.map(move || f.clone())
.and(warp::body::json())
.and_then(move |request: AdmissionReviewRequest<O::Manifest>| {
.and_then(move |f: F, request: AdmissionReviewRequest<O::Manifest>| {
let operator = Arc::clone(&operator);
async move {
let response = review(operator, request, &f).await;
let response = review(operator, request, f).await;
Ok::<_, std::convert::Infallible>(response)
}
})
.boxed()
}

pub(crate) async fn endpoint<O: Operator>(_operator: Arc<O>) {
todo!()
// let tls = operator
// .admission_hook_tls()
// .await
// .expect("getting webhook tls AdmissionTls failed");

// use warp::Filter;
// let routes = warp::any()
// .and(warp::post())
// .and(warp::body::json())
// .and_then(move |request: AdmissionReviewRequest<O::Manifest>| {
// let operator = Arc::clone(&operator);
// async move {
// let response = review(operator, request).await;
// Ok::<_, std::convert::Infallible>(response)
// }
// });

// warp::serve(routes)
// .tls()
// .cert(tls.cert)
// .key(tls.private_key)
// .run(([0, 0, 0, 0], 8443))
// .await;
pub(crate) async fn endpoint<O: Operator>(operator: Arc<O>) {
let tls = operator
.admission_hook_tls()
.await
.expect("getting webhook tls AdmissionTls failed");

use warp::Filter;
let routes = warp::any()
.map(move || Arc::clone(&operator))
.map(|operator: Arc<O>| {
let hook_operator = Arc::clone(&operator);
let func = move |manifest: O::Manifest, _| {
let inner_operator = Arc::clone(&hook_operator);
async move { inner_operator.admission_hook(manifest).await }
};
(operator, func)
})
.and(warp::post())
.and(warp::body::json())
.and_then(
move |(operator, f), request: AdmissionReviewRequest<O::Manifest>| async move {
let response = review(operator, request, f).await;
Ok::<_, std::convert::Infallible>(response)
},
);

warp::serve(routes)
.tls()
.cert(tls.cert)
.key(tls.private_key)
.run(([0, 0, 0, 0], 8443))
.await;
}
18 changes: 7 additions & 11 deletions krator/src/manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Defines types for registering controllers with runtime.
use crate::{operator::Operator, store::Store};
use std::sync::Arc;
// use std::sync::Arc;
#[cfg(feature = "admission-webhook")]
use warp::Filter;
pub mod tasks;
Expand All @@ -26,6 +26,7 @@ pub struct Manager {
filter: warp::filters::BoxedFilter<(warp::reply::WithStatus<warp::reply::Json>,)>,
}

#[cfg(feature = "admission-webhook")]
fn not_found() -> warp::reply::WithStatus<warp::reply::Json> {
warp::reply::with_status(warp::reply::json(&()), warp::http::StatusCode::NOT_FOUND)
}
Expand All @@ -48,31 +49,26 @@ impl Manager {

/// Register a controller with the manager.
pub fn register_controller<C: Operator>(&mut self, builder: ControllerBuilder<C>) {
let webhooks = builder.webhooks.clone();

let (operator, controller, tasks) =
controller_tasks(self.kubeconfig.clone(), builder, self.store.clone());

#[cfg(feature = "admission-webhook")]
for (path, f) in webhooks {
let endpoint =
crate::admission::create_endpoint::<C>(Arc::clone(&operator), path.to_string(), f);

for endpoint in builder.webhooks.values() {
// Create temporary variable w/ throwaway filter of correct type.
let mut temp = warp::any().map(not_found).boxed();

// Swap self.filter into temporary.
std::mem::swap(&mut temp, &mut self.filter);

// Compose new filter from new endpoint and temporary (now holding original self.filter).
let mut new_filter = endpoint.or(temp).unify().boxed();
let mut new_filter = endpoint.clone().or(temp).unify().boxed();

// Swap new filter back into self.filter.
std::mem::swap(&mut new_filter, &mut self.filter);

// Throwaway filter stored in new_filter implicitly dropped.
}

let (controller, tasks) =
controller_tasks(self.kubeconfig.clone(), builder, self.store.clone());

self.controllers.push(controller);
self.controller_tasks.extend(tasks);
}
Expand Down
Loading

0 comments on commit 221ff4b

Please sign in to comment.