diff --git a/crates/dash/api/src/storage/mod.rs b/crates/dash/api/src/storage/mod.rs index 4a7cb53d..43941ba1 100644 --- a/crates/dash/api/src/storage/mod.rs +++ b/crates/dash/api/src/storage/mod.rs @@ -47,6 +47,8 @@ pub struct ModelStorageSpec { impl ModelStorageCrd { pub const FINALIZER_NAME: &'static str = "dash.ulagbulag.io/finalizer-model-storages"; + + pub const LABEL_IS_EXTERNAL: &'static str = "ark.ulagbulag.io/is-external"; } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)] diff --git a/crates/dash/api/src/storage/object.rs b/crates/dash/api/src/storage/object.rs index 071d178d..453e8f17 100644 --- a/crates/dash/api/src/storage/object.rs +++ b/crates/dash/api/src/storage/object.rs @@ -207,17 +207,9 @@ impl ModelStorageObjectRefSpec { } } -#[inline] -pub const fn get_default_tenant_name() -> &'static str { - "minio" -} - #[inline] pub fn get_kubernetes_minio_endpoint(namespace: &str) -> Option { - format!( - "http://{service}.{namespace}.svc", - service = get_default_tenant_name() - ) - .parse() - .ok() + format!("http://object-storage.{namespace}.svc") + .parse() + .ok() } diff --git a/crates/dash/operator/src/ctx/storage.rs b/crates/dash/operator/src/ctx/storage.rs index 54712064..68458cb5 100644 --- a/crates/dash/operator/src/ctx/storage.rs +++ b/crates/dash/operator/src/ctx/storage.rs @@ -86,7 +86,10 @@ impl ::ark_core_k8s::manager::Ctx for Ctx { .unwrap_or_default() { ModelStorageState::Pending => { - match validator.validate_model_storage(&name, &data.spec).await { + match validator + .validate_model_storage(&name, &data.metadata, &data.spec) + .await + { Ok(total_quota) => { Self::update_state_or_requeue( &namespace, diff --git a/crates/dash/operator/src/optimizer/model_claim/object.rs b/crates/dash/operator/src/optimizer/model_claim/object.rs index da4e68f5..ba13aead 100644 --- a/crates/dash/operator/src/optimizer/model_claim/object.rs +++ b/crates/dash/operator/src/optimizer/model_claim/object.rs @@ -27,7 +27,7 @@ impl super::GetCapacity for ModelStorageObjectSpec { target_name: &storage_name, }; - let client = ObjectStorageClient::try_new(kube, namespace, storage).await?; + let client = ObjectStorageClient::try_new(kube, namespace, None, storage).await?; let session = client.get_session(kube, namespace, model); session.get_capacity().map_ok(Some).await } @@ -40,7 +40,7 @@ impl super::GetCapacity for ModelStorageObjectSpec { storage_name: String, ) -> Result> { let storage = - ObjectStorageSession::load_storage_provider(kube, namespace, &storage_name, self) + ObjectStorageSession::load_storage_provider(kube, namespace, &storage_name, None, self) .await?; storage.get_capacity_global().map_ok(Some).await } diff --git a/crates/dash/operator/src/validator/storage.rs b/crates/dash/operator/src/validator/storage.rs index 0e2b6db2..ccb2352d 100644 --- a/crates/dash/operator/src/validator/storage.rs +++ b/crates/dash/operator/src/validator/storage.rs @@ -17,7 +17,7 @@ use dash_provider::storage::{ use futures::TryFutureExt; use itertools::Itertools; use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference; -use kube::{Resource, ResourceExt}; +use kube::{api::ObjectMeta, Resource, ResourceExt}; use tracing::{instrument, Level}; pub struct ModelStorageValidator<'namespace, 'kube> { @@ -29,6 +29,7 @@ impl<'namespace, 'kube> ModelStorageValidator<'namespace, 'kube> { pub async fn validate_model_storage( &self, name: &str, + metadata: &ObjectMeta, spec: &ModelStorageSpec, ) -> Result> { if spec.kind.is_unique() { @@ -42,7 +43,8 @@ impl<'namespace, 'kube> ModelStorageValidator<'namespace, 'kube> { } ModelStorageKindSpec::Kubernetes(spec) => self.validate_model_storage_kubernetes(spec), ModelStorageKindSpec::ObjectStorage(spec) => { - self.validate_model_storage_object(name, spec).await + self.validate_model_storage_object(name, metadata, spec) + .await } } } @@ -88,6 +90,7 @@ impl<'namespace, 'kube> ModelStorageValidator<'namespace, 'kube> { async fn validate_model_storage_object( &self, name: &str, + metadata: &ObjectMeta, storage: &ModelStorageObjectSpec, ) -> Result> { let storage = ModelStorageBindingStorageSpec { @@ -99,6 +102,7 @@ impl<'namespace, 'kube> ModelStorageValidator<'namespace, 'kube> { ObjectStorageClient::try_new( self.kubernetes_storage.kube, self.kubernetes_storage.namespace, + Some(metadata), storage, ) .and_then(|client| async move { @@ -205,7 +209,7 @@ impl<'namespace, 'kube> ModelStorageValidator<'namespace, 'kube> { }] }; - ObjectStorageClient::try_new(kube, namespace, storage) + ObjectStorageClient::try_new(kube, namespace, None, storage) .await? .get_session(kube, namespace, model) .create_bucket(owner_references) @@ -315,7 +319,7 @@ impl<'namespace, 'kube> ModelStorageValidator<'namespace, 'kube> { ) -> Result<()> { let KubernetesStorageClient { kube, namespace } = self.kubernetes_storage; - let client = ObjectStorageClient::try_new(kube, namespace, storage).await?; + let client = ObjectStorageClient::try_new(kube, namespace, None, storage).await?; let session = client.get_session(kube, namespace, model); match deletion_policy { ModelStorageBindingDeletionPolicy::Delete => session.delete_bucket().await, diff --git a/crates/dash/provider/src/storage/mod.rs b/crates/dash/provider/src/storage/mod.rs index 69885d03..8b39e39f 100644 --- a/crates/dash/provider/src/storage/mod.rs +++ b/crates/dash/provider/src/storage/mod.rs @@ -215,7 +215,7 @@ impl<'namespace, 'kube> StorageClient<'namespace, 'kube> { model: &ModelCrd, ref_name: &str, ) -> Result> { - ObjectStorageClient::try_new(self.kube, self.namespace, storage) + ObjectStorageClient::try_new(self.kube, self.namespace, None, storage) .await? .get_session(self.kube, self.namespace, model) .get(ref_name) @@ -344,7 +344,7 @@ impl<'namespace, 'kube> StorageClient<'namespace, 'kube> { storage: ModelStorageBindingStorageSpec<'_, &ModelStorageObjectSpec>, model: &ModelCrd, ) -> Result> { - ObjectStorageClient::try_new(self.kube, self.namespace, storage) + ObjectStorageClient::try_new(self.kube, self.namespace, None, storage) .await? .get_session(self.kube, self.namespace, model) .get_list() diff --git a/crates/dash/provider/src/storage/object.rs b/crates/dash/provider/src/storage/object.rs index bdfb7922..5b254f2a 100644 --- a/crates/dash/provider/src/storage/object.rs +++ b/crates/dash/provider/src/storage/object.rs @@ -13,10 +13,13 @@ use dash_api::{ ModelStorageBindingSyncPolicyPush, }, model_user::ModelUserAccessTokenSecretRefSpec, - storage::object::{ - get_default_tenant_name, get_kubernetes_minio_endpoint, ModelStorageObjectBorrowedSpec, - ModelStorageObjectClonedSpec, ModelStorageObjectOwnedReplicationSpec, - ModelStorageObjectOwnedSpec, ModelStorageObjectRefSpec, ModelStorageObjectSpec, + storage::{ + object::{ + get_kubernetes_minio_endpoint, ModelStorageObjectBorrowedSpec, + ModelStorageObjectClonedSpec, ModelStorageObjectOwnedReplicationSpec, + ModelStorageObjectOwnedSpec, ModelStorageObjectRefSpec, ModelStorageObjectSpec, + }, + ModelStorageCrd, }, }; use dash_provider_api::data::Capacity; @@ -81,6 +84,7 @@ impl ObjectStorageClient { pub async fn try_new<'source>( kube: &Client, namespace: &str, + metadata: Option<&ObjectMeta>, storage: ModelStorageBindingStorageSpec<'source, &ModelStorageObjectSpec>, ) -> Result { Ok(Self { @@ -94,6 +98,7 @@ impl ObjectStorageClient { kube, namespace, source_name, + metadata, source, ) .await @@ -106,6 +111,7 @@ impl ObjectStorageClient { kube, namespace, storage.target_name, + metadata, storage.target, ) .await?, @@ -158,20 +164,21 @@ impl<'model> ObjectStorageSession { kube: &Client, namespace: &str, name: &str, + metadata: Option<&ObjectMeta>, storage: &ModelStorageObjectSpec, ) -> Result { match storage { ModelStorageObjectSpec::Borrowed(storage) => { - let _ = Self::create_or_get_storage(kube, namespace).await?; + let _ = Self::create_or_get_storage(kube, namespace, metadata).await?; Self::load_storage_provider_by_borrowed(kube, namespace, name, storage).await } ModelStorageObjectSpec::Cloned(storage) => { - let metadata = Self::create_or_get_storage(kube, namespace).await?; + let metadata = Self::create_or_get_storage(kube, namespace, metadata).await?; Self::load_storage_provider_by_cloned(kube, namespace, name, &metadata, storage) .await } ModelStorageObjectSpec::Owned(storage) => { - let metadata = Self::create_or_get_storage(kube, namespace).await?; + let metadata = Self::create_or_get_storage(kube, namespace, metadata).await?; Self::load_storage_provider_by_owned(kube, namespace, name, &metadata, storage) .await } @@ -286,7 +293,11 @@ impl<'model> ObjectStorageSession { #[must_use] #[instrument(level = Level::INFO, skip(kube), err(Display))] - async fn create_or_get_storage(kube: &Client, namespace: &str) -> Result { + async fn create_or_get_storage( + kube: &Client, + namespace: &str, + metadata: Option<&ObjectMeta>, + ) -> Result { async fn get_latest_nginx_controller_image() -> Result { Ok("registry.k8s.io/ingress-nginx/controller:v1.10.0".into()) } @@ -297,20 +308,37 @@ impl<'model> ObjectStorageSession { let cluster_role_name = format!("dash:{tenant_name}"); let ingress_class_controller = format!("k8s.io/dash/{tenant_name}/{namespace}"); let ingress_class_name = get_ingress_class_name(namespace, tenant_name); - let ingress_host = get_ingress_host(namespace, tenant_name); let service_metrics_name = format!("{tenant_name}-metrics"); let port_http_name = "http"; let port_https_name = "https"; let port_metrics_name = "metrics"; - let labels = btreemap! { - "app".into() => tenant_name.into(), - "dash.ulagbulag.io/modelstorage-type".into() => tenant_name.into(), + let annotations = metadata + .as_ref() + .and_then(|metadata| metadata.annotations.clone()); + let mut labels = metadata + .as_ref() + .and_then(|metadata| metadata.labels.clone()) + .unwrap_or_default(); + labels.insert("app".into(), tenant_name.into()); + labels.insert( + "dash.ulagbulag.io/modelstorage-type".into(), + tenant_name.into(), + ); + + let service_type = match labels + .get(ModelStorageCrd::LABEL_IS_EXTERNAL) + .map(|label| label.as_str()) + { + Some("true") => Some("LoadBalancer".into()), + Some(_) | None => None, }; + let metadata = ObjectMeta { name: Some(tenant_name.into()), namespace: Some(namespace.into()), + annotations, labels: Some(labels.clone()), ..Default::default() }; @@ -707,6 +735,7 @@ impl<'model> ObjectStorageSession { metadata: metadata.clone(), spec: Some(ServiceSpec { selector: Some(labels.clone()), + type_: service_type, ports: Some(vec![ ServicePort { name: Some(port_http_name.into()), @@ -784,7 +813,7 @@ impl<'model> ObjectStorageSession { ingress_class_name: Some(ingress_class_name.clone()), tls: None, rules: Some(vec![IngressRule { - host: Some(ingress_host), + host: None, http: Some(HTTPIngressRuleValue { paths: vec![HTTPIngressPath { path: Some("/".into()), @@ -792,7 +821,7 @@ impl<'model> ObjectStorageSession { backend: IngressBackend { resource: None, service: Some(IngressServiceBackend { - name: get_default_tenant_name().into(), + name: "minio".into(), port: Some(ServiceBackendPort { name: Some("http-minio".into()), number: None, @@ -1075,7 +1104,6 @@ impl<'client, 'model, 'source> ObjectStorageRef<'client, 'model, 'source> { { let api = Api::namespaced(self.kube.clone(), self.namespace); let ingress_class_name = get_ingress_class_name(self.namespace, tenant_name); - let ingress_host = get_ingress_host(self.namespace, tenant_name); let data = || Ingress { metadata: ObjectMeta { annotations: Some(btreemap! { @@ -1091,7 +1119,7 @@ impl<'client, 'model, 'source> ObjectStorageRef<'client, 'model, 'source> { ingress_class_name: Some(ingress_class_name.clone()), tls: None, rules: Some(vec![IngressRule { - host: Some(ingress_host), + host: None, http: Some(HTTPIngressRuleValue { paths: vec![HTTPIngressPath { path: Some(format!("/{bucket_name}/")), @@ -2411,8 +2439,8 @@ fn get_default_pod_affinity(tenant_name: &str) -> PodAffinity { } } -fn get_ingress_host(namespace: &str, tenant_name: &str) -> String { - format!("{tenant_name}.{namespace}.svc") +const fn get_default_tenant_name() -> &'static str { + "object-storage" } fn get_ingress_class_name(namespace: &str, tenant_name: &str) -> String { diff --git a/crates/dash/query/provider/src/lib.rs b/crates/dash/query/provider/src/lib.rs index 298749aa..95b97b28 100644 --- a/crates/dash/query/provider/src/lib.rs +++ b/crates/dash/query/provider/src/lib.rs @@ -208,6 +208,7 @@ async fn load_models<'a>( &kube, namespace, &model_name, + None, &storage, ) .await