From 30b7b8748017c29e2e5e156aadc31b6edfade786 Mon Sep 17 00:00:00 2001 From: Ho Kim Date: Mon, 29 Jul 2024 06:02:16 +0000 Subject: [PATCH] feat(dash): add storage quota collecting support --- crates/dash/api/src/storage/mod.rs | 2 ++ crates/dash/operator/src/ctx/storage.rs | 9 +++++-- crates/dash/operator/src/validator/storage.rs | 25 +++++++++++++------ crates/dash/provider/src/storage/object.rs | 21 ++++++++++++++-- 4 files changed, 46 insertions(+), 11 deletions(-) diff --git a/crates/dash/api/src/storage/mod.rs b/crates/dash/api/src/storage/mod.rs index 4715b0a7..4a7cb53d 100644 --- a/crates/dash/api/src/storage/mod.rs +++ b/crates/dash/api/src/storage/mod.rs @@ -112,6 +112,8 @@ pub struct ModelStorageStatus { pub state: ModelStorageState, pub kind: Option, pub last_updated: DateTime, + #[serde(default)] + pub total_quota: Option, } #[derive( diff --git a/crates/dash/operator/src/ctx/storage.rs b/crates/dash/operator/src/ctx/storage.rs index 540bf405..54712064 100644 --- a/crates/dash/operator/src/ctx/storage.rs +++ b/crates/dash/operator/src/ctx/storage.rs @@ -56,6 +56,7 @@ impl ::ark_core_k8s::manager::Ctx for Ctx { &name, status.and_then(|status| status.kind.clone()), ModelStorageState::Deleting, + None, ) .await; } else if !data @@ -86,13 +87,14 @@ impl ::ark_core_k8s::manager::Ctx for Ctx { { ModelStorageState::Pending => { match validator.validate_model_storage(&name, &data.spec).await { - Ok(()) => { + Ok(total_quota) => { Self::update_state_or_requeue( &namespace, &manager.kube, &name, Some(data.spec.kind.clone()), ModelStorageState::Ready, + total_quota, ) .await } @@ -136,8 +138,9 @@ impl Ctx { name: &str, kind: Option, state: ModelStorageState, + total_quota: Option, ) -> Result { - match Self::update_state(namespace, kube, name, kind, state).await { + match Self::update_state(namespace, kube, name, kind, state, total_quota).await { Ok(()) => { info!("model storage is ready: {namespace}/{name}"); Ok(Action::requeue( @@ -160,6 +163,7 @@ impl Ctx { name: &str, kind: Option, state: ModelStorageState, + total_quota: Option, ) -> Result<()> { let api = Api::<::Data>::namespaced( kube.clone(), @@ -174,6 +178,7 @@ impl Ctx { state, kind, last_updated: Utc::now(), + total_quota, }, })); let pp = PatchParams::apply(::NAME); diff --git a/crates/dash/operator/src/validator/storage.rs b/crates/dash/operator/src/validator/storage.rs index 7b231fee..0e2b6db2 100644 --- a/crates/dash/operator/src/validator/storage.rs +++ b/crates/dash/operator/src/validator/storage.rs @@ -14,6 +14,7 @@ use dash_provider::storage::{ assert_source_is_none, assert_source_is_same, DatabaseStorageClient, KubernetesStorageClient, ObjectStorageClient, }; +use futures::TryFutureExt; use itertools::Itertools; use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference; use kube::{Resource, ResourceExt}; @@ -25,7 +26,11 @@ pub struct ModelStorageValidator<'namespace, 'kube> { impl<'namespace, 'kube> ModelStorageValidator<'namespace, 'kube> { #[instrument(level = Level::INFO, skip_all, err(Display))] - pub async fn validate_model_storage(&self, name: &str, spec: &ModelStorageSpec) -> Result<()> { + pub async fn validate_model_storage( + &self, + name: &str, + spec: &ModelStorageSpec, + ) -> Result> { if spec.kind.is_unique() { self.validate_model_storage_conflict(name, spec.kind.to_kind()) .await?; @@ -67,16 +72,16 @@ impl<'namespace, 'kube> ModelStorageValidator<'namespace, 'kube> { async fn validate_model_storage_database( &self, storage: &ModelStorageDatabaseSpec, - ) -> Result<()> { - DatabaseStorageClient::try_new(storage).await.map(|_| ()) + ) -> Result> { + DatabaseStorageClient::try_new(storage).await.map(|_| None) } fn validate_model_storage_kubernetes( &self, storage: &ModelStorageKubernetesSpec, - ) -> Result<()> { + ) -> Result> { let ModelStorageKubernetesSpec {} = storage; - Ok(()) + Ok(None) } #[instrument(level = Level::INFO, skip_all, err(Display))] @@ -84,7 +89,7 @@ impl<'namespace, 'kube> ModelStorageValidator<'namespace, 'kube> { &self, name: &str, storage: &ModelStorageObjectSpec, - ) -> Result<()> { + ) -> Result> { let storage = ModelStorageBindingStorageSpec { source: None, source_binding_name: None, @@ -96,8 +101,14 @@ impl<'namespace, 'kube> ModelStorageValidator<'namespace, 'kube> { self.kubernetes_storage.namespace, storage, ) + .and_then(|client| async move { + client + .target() + .get_capacity_global() + .await + .map(|capacity| Some(capacity.capacity.as_u128())) + }) .await - .map(|_| ()) } #[instrument(level = Level::INFO, skip_all, err(Display))] diff --git a/crates/dash/provider/src/storage/object.rs b/crates/dash/provider/src/storage/object.rs index 5b8bf5fb..69a9ebe2 100644 --- a/crates/dash/provider/src/storage/object.rs +++ b/crates/dash/provider/src/storage/object.rs @@ -112,6 +112,10 @@ impl ObjectStorageClient { }) } + pub const fn target(&self) -> &ObjectStorageSession { + &self.target + } + pub fn get_session<'model>( &self, kube: &'model Client, @@ -2332,6 +2336,18 @@ fn get_default_node_affinity() -> NodeAffinity { }, weight: 1, }, + // KISS normal control plane nodes should be preferred + PreferredSchedulingTerm { + preference: NodeSelectorTerm { + match_expressions: Some(vec![NodeSelectorRequirement { + key: "node-role.kubernetes.io/kiss".into(), + operator: "In".into(), + values: Some(vec!["ControlPlane".into()]), + }]), + match_fields: None, + }, + weight: 2, + }, // KISS compute nodes should be preferred PreferredSchedulingTerm { preference: NodeSelectorTerm { @@ -2342,7 +2358,7 @@ fn get_default_node_affinity() -> NodeAffinity { }]), match_fields: None, }, - weight: 2, + weight: 4, }, // KISS gateway nodes should be more preferred PreferredSchedulingTerm { @@ -2354,7 +2370,7 @@ fn get_default_node_affinity() -> NodeAffinity { }]), match_fields: None, }, - weight: 4, + weight: 8, }, ]), required_during_scheduling_ignored_during_execution: Some(NodeSelector { @@ -2365,6 +2381,7 @@ fn get_default_node_affinity() -> NodeAffinity { values: Some(vec![ "Compute".into(), "ControlPlane".into(), + "Desktop".into(), "Gateway".into(), ]), }]),