feat(dash): add storage quota collecting support
HoKim98 committed Jul 29, 2024
1 parent 289124d · commit 30b7b87
Showing 4 changed files with 46 additions and 11 deletions.
crates/dash/api/src/storage/mod.rs (2 additions, 0 deletions)
@@ -112,6 +112,8 @@ pub struct ModelStorageStatus {
     pub state: ModelStorageState,
     pub kind: Option<ModelStorageKindSpec>,
     pub last_updated: DateTime<Utc>,
+    #[serde(default)]
+    pub total_quota: Option<u128>,
 }
 
 #[derive(
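Taken together, the four files thread a newly collected capacity figure from the validator into the ModelStorage status: this file adds the field, the operator passes it through its state-update path, the validator queries the object-storage backend for it, and the provider exposes the accessor that makes the query possible. As a minimal sketch of how a consumer of the API crate might read the new field (assuming the crate is importable as dash_api; the import path and the helper below are illustrative, not part of the commit):

use dash_api::storage::ModelStorageStatus;

/// Render the collected quota for display. `total_quota` stays `None` for
/// storages that report no capacity (the database and Kubernetes kinds in
/// this commit), and `#[serde(default)]` keeps older stored statuses, which
/// lack the field entirely, deserializing cleanly.
fn describe_quota(status: &ModelStorageStatus) -> String {
    match status.total_quota {
        Some(bytes) => format!("{:.1} GiB total", bytes as f64 / (1u64 << 30) as f64),
        None => "capacity unknown".to_string(),
    }
}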
crates/dash/operator/src/ctx/storage.rs (7 additions, 2 deletions)
@@ -56,6 +56,7 @@ impl ::ark_core_k8s::manager::Ctx for Ctx {
                     &name,
                     status.and_then(|status| status.kind.clone()),
                     ModelStorageState::Deleting,
+                    None,
                 )
                 .await;
             } else if !data
@@ -86,13 +87,14 @@ impl ::ark_core_k8s::manager::Ctx for Ctx {
         {
             ModelStorageState::Pending => {
                 match validator.validate_model_storage(&name, &data.spec).await {
-                    Ok(()) => {
+                    Ok(total_quota) => {
                         Self::update_state_or_requeue(
                             &namespace,
                             &manager.kube,
                             &name,
                             Some(data.spec.kind.clone()),
                             ModelStorageState::Ready,
+                            total_quota,
                         )
                         .await
                     }
@@ -136,8 +138,9 @@ impl Ctx {
         name: &str,
         kind: Option<ModelStorageKindSpec>,
         state: ModelStorageState,
+        total_quota: Option<u128>,
     ) -> Result<Action, Error> {
-        match Self::update_state(namespace, kube, name, kind, state).await {
+        match Self::update_state(namespace, kube, name, kind, state, total_quota).await {
             Ok(()) => {
                 info!("model storage is ready: {namespace}/{name}");
                 Ok(Action::requeue(
@@ -160,6 +163,7 @@ impl Ctx {
         name: &str,
         kind: Option<ModelStorageKindSpec>,
         state: ModelStorageState,
+        total_quota: Option<u128>,
     ) -> Result<()> {
         let api = Api::<<Self as ::ark_core_k8s::manager::Ctx>::Data>::namespaced(
             kube.clone(),
@@ -174,6 +178,7 @@ impl Ctx {
                 state,
                 kind,
                 last_updated: Utc::now(),
+                total_quota,
             },
         }));
         let pp = PatchParams::apply(<Self as ::ark_core_k8s::manager::Ctx>::NAME);
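The threaded value ends up in the patch that update_state applies with PatchParams::apply. A rough sketch of what that patch body could look like once total_quota is included; the group/version string, the JSON field casing, and the manager name are hypothetical, since the CRD definition and its serde attributes are not part of this diff:

use kube::api::{Patch, PatchParams};
use serde_json::{json, Value};

// Illustrative only: build a status patch carrying the collected quota.
fn status_patch(total_quota: Option<u128>) -> (PatchParams, Patch<Value>) {
    let pp = PatchParams::apply("dash-operator"); // hypothetical manager name
    let patch = Patch::Apply(json!({
        "apiVersion": "dash.ulagbulag.io/v1alpha1", // hypothetical group/version
        "kind": "ModelStorage",
        "status": {
            "state": "Ready",
            "lastUpdated": "2024-07-29T00:00:00Z",
            // rendered as a string here to sidestep JSON integer-width limits;
            // the real struct leaves this to serde's u128 handling
            "totalQuota": total_quota.map(|quota| quota.to_string()),
        },
    }));
    (pp, patch)
}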
crates/dash/operator/src/validator/storage.rs (18 additions, 7 deletions)
@@ -14,6 +14,7 @@ use dash_provider::storage::{
     assert_source_is_none, assert_source_is_same, DatabaseStorageClient, KubernetesStorageClient,
     ObjectStorageClient,
 };
+use futures::TryFutureExt;
 use itertools::Itertools;
 use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
 use kube::{Resource, ResourceExt};
@@ -25,7 +26,11 @@ pub struct ModelStorageValidator<'namespace, 'kube> {
 
 impl<'namespace, 'kube> ModelStorageValidator<'namespace, 'kube> {
     #[instrument(level = Level::INFO, skip_all, err(Display))]
-    pub async fn validate_model_storage(&self, name: &str, spec: &ModelStorageSpec) -> Result<()> {
+    pub async fn validate_model_storage(
+        &self,
+        name: &str,
+        spec: &ModelStorageSpec,
+    ) -> Result<Option<u128>> {
         if spec.kind.is_unique() {
             self.validate_model_storage_conflict(name, spec.kind.to_kind())
                 .await?;
@@ -67,24 +72,24 @@ impl<'namespace, 'kube> ModelStorageValidator<'namespace, 'kube> {
     async fn validate_model_storage_database(
         &self,
         storage: &ModelStorageDatabaseSpec,
-    ) -> Result<()> {
-        DatabaseStorageClient::try_new(storage).await.map(|_| ())
+    ) -> Result<Option<u128>> {
+        DatabaseStorageClient::try_new(storage).await.map(|_| None)
     }
 
     fn validate_model_storage_kubernetes(
         &self,
         storage: &ModelStorageKubernetesSpec,
-    ) -> Result<()> {
+    ) -> Result<Option<u128>> {
         let ModelStorageKubernetesSpec {} = storage;
-        Ok(())
+        Ok(None)
     }
 
     #[instrument(level = Level::INFO, skip_all, err(Display))]
     async fn validate_model_storage_object(
         &self,
         name: &str,
         storage: &ModelStorageObjectSpec,
-    ) -> Result<()> {
+    ) -> Result<Option<u128>> {
         let storage = ModelStorageBindingStorageSpec {
             source: None,
             source_binding_name: None,
@@ -96,8 +101,14 @@ impl<'namespace, 'kube> ModelStorageValidator<'namespace, 'kube> {
             self.kubernetes_storage.namespace,
             storage,
         )
+        .and_then(|client| async move {
+            client
+                .target()
+                .get_capacity_global()
+                .await
+                .map(|capacity| Some(capacity.capacity.as_u128()))
+        })
         .await
-        .map(|_| ())
     }
 
     #[instrument(level = Level::INFO, skip_all, err(Display))]
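The futures::TryFutureExt import added at the top exists for the and_then chain in the last hunk, which builds the object-storage client and then asks its target session for the global capacity in a single future. For callers, the signature change means validation now yields an optional byte capacity instead of unit; a hedged sketch of that contract (the helper is illustrative, the types come from the dash crates):

use dash_api::storage::ModelStorageSpec;

// Illustrative helper: run validation and keep whatever capacity was reported.
// Only the object-storage path returns Some(bytes) in this commit; the
// database and Kubernetes paths return Ok(None).
async fn quota_of(
    validator: &ModelStorageValidator<'_, '_>,
    name: &str,
    spec: &ModelStorageSpec,
) -> Option<u128> {
    match validator.validate_model_storage(name, spec).await {
        Ok(quota) => quota, // Some(total bytes) or None
        Err(_) => None,     // validation failed, so there is nothing to record
    }
}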
crates/dash/provider/src/storage/object.rs (19 additions, 2 deletions)
@@ -112,6 +112,10 @@ impl ObjectStorageClient {
         })
     }
 
+    pub const fn target(&self) -> &ObjectStorageSession {
+        &self.target
+    }
+
     pub fn get_session<'model>(
         &self,
         kube: &'model Client,
@@ -2332,6 +2336,18 @@ fn get_default_node_affinity() -> NodeAffinity {
                     },
                     weight: 1,
                 },
+                // KISS normal control plane nodes should be preferred
+                PreferredSchedulingTerm {
+                    preference: NodeSelectorTerm {
+                        match_expressions: Some(vec![NodeSelectorRequirement {
+                            key: "node-role.kubernetes.io/kiss".into(),
+                            operator: "In".into(),
+                            values: Some(vec!["ControlPlane".into()]),
+                        }]),
+                        match_fields: None,
+                    },
+                    weight: 2,
+                },
                 // KISS compute nodes should be preferred
                 PreferredSchedulingTerm {
                     preference: NodeSelectorTerm {
@@ -2342,7 +2358,7 @@ fn get_default_node_affinity() -> NodeAffinity {
                         }]),
                         match_fields: None,
                     },
-                    weight: 2,
+                    weight: 4,
                 },
                 // KISS gateway nodes should be more preferred
                 PreferredSchedulingTerm {
@@ -2354,7 +2370,7 @@ fn get_default_node_affinity() -> NodeAffinity {
                         }]),
                         match_fields: None,
                     },
-                    weight: 4,
+                    weight: 8,
                 },
             ]),
             required_during_scheduling_ignored_during_execution: Some(NodeSelector {
@@ -2365,6 +2381,7 @@ fn get_default_node_affinity() -> NodeAffinity {
                         values: Some(vec![
                             "Compute".into(),
                             "ControlPlane".into(),
+                            "Desktop".into(),
                             "Gateway".into(),
                         ]),
                     }]),
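The new target() accessor is what lets the validator reach the backend session and ask for its global capacity. A small usage sketch, assuming the error type is anyhow-compatible (target, get_capacity_global, and the capacity field all appear in this diff; the helper itself is illustrative):

// Illustrative helper: report the backend's total capacity in bytes.
async fn total_capacity_bytes(client: &ObjectStorageClient) -> anyhow::Result<u128> {
    let capacity = client.target().get_capacity_global().await?;
    Ok(capacity.capacity.as_u128())
}

The node-affinity change is independent of the quota work: kube-scheduler sums the weights of all preferred terms a node matches, so Gateway nodes (weight 8) now outrank Compute nodes (weight 4), which outrank the new kiss ControlPlane term (weight 2) and the existing weight-1 term, and Desktop joins the node roles accepted by the required term.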
