Skip to content

Commit

Permalink
fix(dash): allow all hostnames for object storage ingresses
Browse files Browse the repository at this point in the history
  • Loading branch information
HoKim98 committed Sep 8, 2024
1 parent f84c112 commit 7dcb2e8
Show file tree
Hide file tree
Showing 8 changed files with 68 additions and 38 deletions.
2 changes: 2 additions & 0 deletions crates/dash/api/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ pub struct ModelStorageSpec {

impl ModelStorageCrd {
pub const FINALIZER_NAME: &'static str = "dash.ulagbulag.io/finalizer-model-storages";

pub const LABEL_IS_EXTERNAL: &'static str = "ark.ulagbulag.io/is-external";
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)]
Expand Down
14 changes: 3 additions & 11 deletions crates/dash/api/src/storage/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,17 +207,9 @@ impl ModelStorageObjectRefSpec {
}
}

#[inline]
pub const fn get_default_tenant_name() -> &'static str {
"minio"
}

#[inline]
pub fn get_kubernetes_minio_endpoint(namespace: &str) -> Option<Url> {
format!(
"http://{service}.{namespace}.svc",
service = get_default_tenant_name()
)
.parse()
.ok()
format!("http://object-storage.{namespace}.svc")
.parse()
.ok()
}
5 changes: 4 additions & 1 deletion crates/dash/operator/src/ctx/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ impl ::ark_core_k8s::manager::Ctx for Ctx {
.unwrap_or_default()
{
ModelStorageState::Pending => {
match validator.validate_model_storage(&name, &data.spec).await {
match validator
.validate_model_storage(&name, &data.metadata, &data.spec)
.await
{
Ok(total_quota) => {
Self::update_state_or_requeue(
&namespace,
Expand Down
4 changes: 2 additions & 2 deletions crates/dash/operator/src/optimizer/model_claim/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl super::GetCapacity for ModelStorageObjectSpec {
target_name: &storage_name,
};

let client = ObjectStorageClient::try_new(kube, namespace, storage).await?;
let client = ObjectStorageClient::try_new(kube, namespace, None, storage).await?;
let session = client.get_session(kube, namespace, model);
session.get_capacity().map_ok(Some).await
}
Expand All @@ -40,7 +40,7 @@ impl super::GetCapacity for ModelStorageObjectSpec {
storage_name: String,
) -> Result<Option<Capacity>> {
let storage =
ObjectStorageSession::load_storage_provider(kube, namespace, &storage_name, self)
ObjectStorageSession::load_storage_provider(kube, namespace, &storage_name, None, self)
.await?;
storage.get_capacity_global().map_ok(Some).await
}
Expand Down
12 changes: 8 additions & 4 deletions crates/dash/operator/src/validator/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use dash_provider::storage::{
use futures::TryFutureExt;
use itertools::Itertools;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference;
use kube::{Resource, ResourceExt};
use kube::{api::ObjectMeta, Resource, ResourceExt};
use tracing::{instrument, Level};

pub struct ModelStorageValidator<'namespace, 'kube> {
Expand All @@ -29,6 +29,7 @@ impl<'namespace, 'kube> ModelStorageValidator<'namespace, 'kube> {
pub async fn validate_model_storage(
&self,
name: &str,
metadata: &ObjectMeta,
spec: &ModelStorageSpec,
) -> Result<Option<u128>> {
if spec.kind.is_unique() {
Expand All @@ -42,7 +43,8 @@ impl<'namespace, 'kube> ModelStorageValidator<'namespace, 'kube> {
}
ModelStorageKindSpec::Kubernetes(spec) => self.validate_model_storage_kubernetes(spec),
ModelStorageKindSpec::ObjectStorage(spec) => {
self.validate_model_storage_object(name, spec).await
self.validate_model_storage_object(name, metadata, spec)
.await
}
}
}
Expand Down Expand Up @@ -88,6 +90,7 @@ impl<'namespace, 'kube> ModelStorageValidator<'namespace, 'kube> {
async fn validate_model_storage_object(
&self,
name: &str,
metadata: &ObjectMeta,
storage: &ModelStorageObjectSpec,
) -> Result<Option<u128>> {
let storage = ModelStorageBindingStorageSpec {
Expand All @@ -99,6 +102,7 @@ impl<'namespace, 'kube> ModelStorageValidator<'namespace, 'kube> {
ObjectStorageClient::try_new(
self.kubernetes_storage.kube,
self.kubernetes_storage.namespace,
Some(metadata),
storage,
)
.and_then(|client| async move {
Expand Down Expand Up @@ -205,7 +209,7 @@ impl<'namespace, 'kube> ModelStorageValidator<'namespace, 'kube> {
}]
};

ObjectStorageClient::try_new(kube, namespace, storage)
ObjectStorageClient::try_new(kube, namespace, None, storage)
.await?
.get_session(kube, namespace, model)
.create_bucket(owner_references)
Expand Down Expand Up @@ -315,7 +319,7 @@ impl<'namespace, 'kube> ModelStorageValidator<'namespace, 'kube> {
) -> Result<()> {
let KubernetesStorageClient { kube, namespace } = self.kubernetes_storage;

let client = ObjectStorageClient::try_new(kube, namespace, storage).await?;
let client = ObjectStorageClient::try_new(kube, namespace, None, storage).await?;
let session = client.get_session(kube, namespace, model);
match deletion_policy {
ModelStorageBindingDeletionPolicy::Delete => session.delete_bucket().await,
Expand Down
4 changes: 2 additions & 2 deletions crates/dash/provider/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ impl<'namespace, 'kube> StorageClient<'namespace, 'kube> {
model: &ModelCrd,
ref_name: &str,
) -> Result<Option<Value>> {
ObjectStorageClient::try_new(self.kube, self.namespace, storage)
ObjectStorageClient::try_new(self.kube, self.namespace, None, storage)
.await?
.get_session(self.kube, self.namespace, model)
.get(ref_name)
Expand Down Expand Up @@ -344,7 +344,7 @@ impl<'namespace, 'kube> StorageClient<'namespace, 'kube> {
storage: ModelStorageBindingStorageSpec<'_, &ModelStorageObjectSpec>,
model: &ModelCrd,
) -> Result<Vec<Value>> {
ObjectStorageClient::try_new(self.kube, self.namespace, storage)
ObjectStorageClient::try_new(self.kube, self.namespace, None, storage)
.await?
.get_session(self.kube, self.namespace, model)
.get_list()
Expand Down
64 changes: 46 additions & 18 deletions crates/dash/provider/src/storage/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@ use dash_api::{
ModelStorageBindingSyncPolicyPush,
},
model_user::ModelUserAccessTokenSecretRefSpec,
storage::object::{
get_default_tenant_name, get_kubernetes_minio_endpoint, ModelStorageObjectBorrowedSpec,
ModelStorageObjectClonedSpec, ModelStorageObjectOwnedReplicationSpec,
ModelStorageObjectOwnedSpec, ModelStorageObjectRefSpec, ModelStorageObjectSpec,
storage::{
object::{
get_kubernetes_minio_endpoint, ModelStorageObjectBorrowedSpec,
ModelStorageObjectClonedSpec, ModelStorageObjectOwnedReplicationSpec,
ModelStorageObjectOwnedSpec, ModelStorageObjectRefSpec, ModelStorageObjectSpec,
},
ModelStorageCrd,
},
};
use dash_provider_api::data::Capacity;
Expand Down Expand Up @@ -81,6 +84,7 @@ impl ObjectStorageClient {
pub async fn try_new<'source>(
kube: &Client,
namespace: &str,
metadata: Option<&ObjectMeta>,
storage: ModelStorageBindingStorageSpec<'source, &ModelStorageObjectSpec>,
) -> Result<Self> {
Ok(Self {
Expand All @@ -94,6 +98,7 @@ impl ObjectStorageClient {
kube,
namespace,
source_name,
metadata,
source,
)
.await
Expand All @@ -106,6 +111,7 @@ impl ObjectStorageClient {
kube,
namespace,
storage.target_name,
metadata,
storage.target,
)
.await?,
Expand Down Expand Up @@ -158,20 +164,21 @@ impl<'model> ObjectStorageSession {
kube: &Client,
namespace: &str,
name: &str,
metadata: Option<&ObjectMeta>,
storage: &ModelStorageObjectSpec,
) -> Result<Self> {
match storage {
ModelStorageObjectSpec::Borrowed(storage) => {
let _ = Self::create_or_get_storage(kube, namespace).await?;
let _ = Self::create_or_get_storage(kube, namespace, metadata).await?;
Self::load_storage_provider_by_borrowed(kube, namespace, name, storage).await
}
ModelStorageObjectSpec::Cloned(storage) => {
let metadata = Self::create_or_get_storage(kube, namespace).await?;
let metadata = Self::create_or_get_storage(kube, namespace, metadata).await?;
Self::load_storage_provider_by_cloned(kube, namespace, name, &metadata, storage)
.await
}
ModelStorageObjectSpec::Owned(storage) => {
let metadata = Self::create_or_get_storage(kube, namespace).await?;
let metadata = Self::create_or_get_storage(kube, namespace, metadata).await?;
Self::load_storage_provider_by_owned(kube, namespace, name, &metadata, storage)
.await
}
Expand Down Expand Up @@ -286,7 +293,11 @@ impl<'model> ObjectStorageSession {

#[must_use]
#[instrument(level = Level::INFO, skip(kube), err(Display))]
async fn create_or_get_storage(kube: &Client, namespace: &str) -> Result<ObjectMeta> {
async fn create_or_get_storage(
kube: &Client,
namespace: &str,
metadata: Option<&ObjectMeta>,
) -> Result<ObjectMeta> {
async fn get_latest_nginx_controller_image() -> Result<String> {
Ok("registry.k8s.io/ingress-nginx/controller:v1.10.0".into())
}
Expand All @@ -297,20 +308,37 @@ impl<'model> ObjectStorageSession {
let cluster_role_name = format!("dash:{tenant_name}");
let ingress_class_controller = format!("k8s.io/dash/{tenant_name}/{namespace}");
let ingress_class_name = get_ingress_class_name(namespace, tenant_name);
let ingress_host = get_ingress_host(namespace, tenant_name);
let service_metrics_name = format!("{tenant_name}-metrics");

let port_http_name = "http";
let port_https_name = "https";
let port_metrics_name = "metrics";

let labels = btreemap! {
"app".into() => tenant_name.into(),
"dash.ulagbulag.io/modelstorage-type".into() => tenant_name.into(),
let annotations = metadata
.as_ref()
.and_then(|metadata| metadata.annotations.clone());
let mut labels = metadata
.as_ref()
.and_then(|metadata| metadata.labels.clone())
.unwrap_or_default();
labels.insert("app".into(), tenant_name.into());
labels.insert(
"dash.ulagbulag.io/modelstorage-type".into(),
tenant_name.into(),
);

let service_type = match labels
.get(ModelStorageCrd::LABEL_IS_EXTERNAL)
.map(|label| label.as_str())
{
Some("true") => Some("LoadBalancer".into()),
Some(_) | None => None,
};

let metadata = ObjectMeta {
name: Some(tenant_name.into()),
namespace: Some(namespace.into()),
annotations,
labels: Some(labels.clone()),
..Default::default()
};
Expand Down Expand Up @@ -707,6 +735,7 @@ impl<'model> ObjectStorageSession {
metadata: metadata.clone(),
spec: Some(ServiceSpec {
selector: Some(labels.clone()),
type_: service_type,
ports: Some(vec![
ServicePort {
name: Some(port_http_name.into()),
Expand Down Expand Up @@ -784,15 +813,15 @@ impl<'model> ObjectStorageSession {
ingress_class_name: Some(ingress_class_name.clone()),
tls: None,
rules: Some(vec![IngressRule {
host: Some(ingress_host),
host: None,
http: Some(HTTPIngressRuleValue {
paths: vec![HTTPIngressPath {
path: Some("/".into()),
path_type: "Prefix".into(),
backend: IngressBackend {
resource: None,
service: Some(IngressServiceBackend {
name: get_default_tenant_name().into(),
name: "minio".into(),
port: Some(ServiceBackendPort {
name: Some("http-minio".into()),
number: None,
Expand Down Expand Up @@ -1075,7 +1104,6 @@ impl<'client, 'model, 'source> ObjectStorageRef<'client, 'model, 'source> {
{
let api = Api::namespaced(self.kube.clone(), self.namespace);
let ingress_class_name = get_ingress_class_name(self.namespace, tenant_name);
let ingress_host = get_ingress_host(self.namespace, tenant_name);
let data = || Ingress {
metadata: ObjectMeta {
annotations: Some(btreemap! {
Expand All @@ -1091,7 +1119,7 @@ impl<'client, 'model, 'source> ObjectStorageRef<'client, 'model, 'source> {
ingress_class_name: Some(ingress_class_name.clone()),
tls: None,
rules: Some(vec![IngressRule {
host: Some(ingress_host),
host: None,
http: Some(HTTPIngressRuleValue {
paths: vec![HTTPIngressPath {
path: Some(format!("/{bucket_name}/")),
Expand Down Expand Up @@ -2411,8 +2439,8 @@ fn get_default_pod_affinity(tenant_name: &str) -> PodAffinity {
}
}

fn get_ingress_host(namespace: &str, tenant_name: &str) -> String {
format!("{tenant_name}.{namespace}.svc")
const fn get_default_tenant_name() -> &'static str {
"object-storage"
}

fn get_ingress_class_name(namespace: &str, tenant_name: &str) -> String {
Expand Down
1 change: 1 addition & 0 deletions crates/dash/query/provider/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ async fn load_models<'a>(
&kube,
namespace,
&model_name,
None,
&storage,
)
.await
Expand Down

0 comments on commit 7dcb2e8

Please sign in to comment.