diff --git a/Cargo.toml b/Cargo.toml index a36b592b..7cffdcce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,6 +40,8 @@ members = [ "proxyx/provider/api", # exclude(alpine) "proxyx/provider/chatgpt", # exclude(alpine) "proxyx/storage/influxdb", # exclude(alpine) + "straw/api", + "straw/provider", "vine/api", "vine/bastion", "vine/cli", @@ -87,7 +89,11 @@ chrono = { version = "0.4", features = ["serde"] } clap = { version = "4.4", features = ["env", "derive"] } csv = { version = "1.3" } ctrlc = { version = "3.4" } -deltalake = { git = "https://github.com/delta-io/delta-rs.git", version = "0.17", default-features = false } +deltalake = { git = "https://github.com/delta-io/delta-rs.git", version = "0.17", default-features = false, features = [ + "arrow", + "datafusion", + "parquet", +] } email_address = { version = "0.2" } futures = { version = "0.3" } gethostname = { version = "0.4" } diff --git a/ark/core/k8s/Cargo.toml b/ark/core/k8s/Cargo.toml index b393720e..994cc710 100644 --- a/ark/core/k8s/Cargo.toml +++ b/ark/core/k8s/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" [features] default = [] -data = ["schemars", "serde", "strum"] +data = ["anyhow", "regex", "schemars", "serde", "strum", "url"] domain = ["anyhow", "resolv-conf", "tokio/fs"] manager = [ "anyhow", @@ -35,6 +35,7 @@ kube = { workspace = true, optional = true, features = [ "rustls-tls", "ws", ] } +regex = { workspace = true, optional = true } resolv-conf = { workspace = true, optional = true } schemars = { workspace = true, optional = true } serde = { workspace = true, optional = true } @@ -42,3 +43,4 @@ serde_json = { workspace = true, optional = true } strum = { workspace = true, optional = true } tokio = { workspace = true, optional = true } tracing = { workspace = true, optional = true } +url = { workspace = true, optional = true } diff --git a/ark/core/k8s/src/data.rs b/ark/core/k8s/src/data.rs index 12a732df..de7c9fbc 100644 --- a/ark/core/k8s/src/data.rs +++ b/ark/core/k8s/src/data.rs @@ -1,5 +1,9 @@ +use std::{borrow::Borrow, cmp::Ordering, fmt, ops, str::FromStr}; + +use anyhow::{bail, Error}; +use regex::Regex; use schemars::JsonSchema; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize}; use strum::{Display, EnumString}; #[derive( @@ -10,3 +14,177 @@ pub enum ImagePullPolicy { IfNotPresent, Never, } + +#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, JsonSchema)] +#[serde(transparent)] +pub struct Name(String); + +impl FromStr for Name { + type Err = Error; + + fn from_str(name: &str) -> Result::Err> { + let re = Regex::new(r"^[a-z][a-z0-9_-]*[a-z0-9]?$")?; + if re.is_match(name) { + Ok(Self(name.into())) + } else { + bail!("invalid name: {name:?}") + } + } +} + +impl From for String { + fn from(value: Name) -> Self { + value.0 + } +} + +// #[cfg(feature = "nats")] +// impl From for ::async_nats::Subject { +// fn from(value: Name) -> Self { +// value.0.into() +// } +// } + +impl Borrow for Name { + fn borrow(&self) -> &str { + &self.0 + } +} + +impl Borrow for Name { + fn borrow(&self) -> &String { + &self.0 + } +} + +impl ops::Deref for Name { + type Target = String; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl fmt::Debug for Name { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + ::fmt(&self.0, f) + } +} + +impl fmt::Display for Name { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + ::fmt(&self.0, f) + } +} + +impl PartialEq for Name { + fn eq(&self, other: &String) -> bool { + self.0.eq(other) + } +} + +impl PartialEq for String { + fn eq(&self, other: &Name) -> bool { + self.eq(&other.0) + } +} + +impl PartialOrd for Name { + fn partial_cmp(&self, other: &String) -> Option { + self.0.partial_cmp(other) + } + + fn lt(&self, other: &String) -> bool { + self.0.lt(other) + } + + fn le(&self, other: &String) -> bool { + self.0.le(other) + } + + fn gt(&self, other: &String) -> bool { + self.0.gt(other) + } + + fn ge(&self, other: &String) -> bool { + self.0.ge(other) + } +} + +impl PartialOrd for String { + fn partial_cmp(&self, other: &Name) -> Option { + self.partial_cmp(&other.0) + } + + fn lt(&self, other: &Name) -> bool { + self.lt(&other.0) + } + + fn le(&self, other: &Name) -> bool { + self.le(&other.0) + } + + fn gt(&self, other: &Name) -> bool { + self.gt(&other.0) + } + + fn ge(&self, other: &Name) -> bool { + self.ge(&other.0) + } +} + +impl<'de> Deserialize<'de> for Name { + fn deserialize(deserializer: D) -> Result>::Error> + where + D: Deserializer<'de>, + { + >::deserialize(deserializer) + .and_then(|name| Self::from_str(&name).map_err(::serde::de::Error::custom)) + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(transparent)] +pub struct Url(pub ::url::Url); + +impl FromStr for Url { + type Err = <::url::Url as FromStr>::Err; + + fn from_str(s: &str) -> Result { + <::url::Url as FromStr>::from_str(s).map(Self) + } +} + +impl ops::Deref for Url { + type Target = ::url::Url; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl PartialOrd for Url { + fn partial_cmp(&self, other: &Self) -> Option { + Some(::cmp(self, other)) + } +} + +impl Ord for Url { + fn cmp(&self, other: &Self) -> Ordering { + ::cmp(self.0.as_str(), other.0.as_str()) + } +} + +impl JsonSchema for Url { + fn is_referenceable() -> bool { + false + } + + fn schema_name() -> String { + "Url".into() + } + + fn json_schema(gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema { + String::json_schema(gen) + } +} diff --git a/dash/api/Cargo.toml b/dash/api/Cargo.toml index 8e82cf0f..fab058fc 100644 --- a/dash/api/Cargo.toml +++ b/dash/api/Cargo.toml @@ -17,8 +17,9 @@ version = { workspace = true } # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +ark-core-k8s = { path = "../../ark/core/k8s" } dash-provider-api = { path = "../provider/api" } -vine-api = { path = "../../vine/api" } +straw-api = { path = "../../straw/api" } anyhow = { workspace = true } chrono = { workspace = true } diff --git a/dash/api/src/lib.rs b/dash/api/src/lib.rs index 3fde04bb..51116772 100644 --- a/dash/api/src/lib.rs +++ b/dash/api/src/lib.rs @@ -6,5 +6,5 @@ pub mod storage; pub mod task; pub mod consts { - pub use ::vine_api::consts::NAMESPACE; + pub const NAMESPACE: &str = "dash"; } diff --git a/dash/api/src/pipe.rs b/dash/api/src/pipe.rs index d6f2772d..6b8ada34 100644 --- a/dash/api/src/pipe.rs +++ b/dash/api/src/pipe.rs @@ -1,7 +1,9 @@ +use ark_core_k8s::data::Name; use chrono::{DateTime, Utc}; use kube::CustomResource; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use straw_api::pipe::StrawPipe; use strum::{Display, EnumString}; use crate::model::ModelFieldsNativeSpec; @@ -35,9 +37,24 @@ use crate::model::ModelFieldsNativeSpec; }"# )] #[serde(rename_all = "camelCase")] -pub struct PipeSpec { +pub struct PipeSpec { pub input: Spec, pub output: Spec, + #[serde(default, flatten)] + pub exec: Exec, +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub enum PipeExec { + Placeholder {}, + Straw(StrawPipe), +} + +impl Default for PipeExec { + fn default() -> Self { + Self::Placeholder {} + } } #[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)] diff --git a/dash/api/src/storage/db.rs b/dash/api/src/storage/db.rs index 74baac54..c0285b7e 100644 --- a/dash/api/src/storage/db.rs +++ b/dash/api/src/storage/db.rs @@ -1,6 +1,6 @@ +use ark_core_k8s::data::Url; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use vine_api::user_auth::Url; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] diff --git a/dash/api/src/storage/object.rs b/dash/api/src/storage/object.rs index ed52ca78..a6070c96 100644 --- a/dash/api/src/storage/object.rs +++ b/dash/api/src/storage/object.rs @@ -1,11 +1,11 @@ use std::{collections::BTreeMap, net::Ipv4Addr}; +use ark_core_k8s::data::Url; use k8s_openapi::{ api::core::v1::ResourceRequirements, apimachinery::pkg::api::resource::Quantity, }; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use vine_api::user_auth::Url; #[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)] #[serde(rename_all = "camelCase")] diff --git a/dash/controller/Cargo.toml b/dash/controller/Cargo.toml index 1b0f7674..96f7dded 100644 --- a/dash/controller/Cargo.toml +++ b/dash/controller/Cargo.toml @@ -21,6 +21,7 @@ ark-core-k8s = { path = "../../ark/core/k8s", features = ["manager"] } dash-api = { path = "../api" } dash-provider = { path = "../provider" } dash-provider-api = { path = "../provider/api" } +straw-api = { path = "../../straw/api" } anyhow = { workspace = true } async-trait = { workspace = true } diff --git a/dash/controller/src/validator/pipe.rs b/dash/controller/src/validator/pipe.rs index 77c965b3..f49d9207 100644 --- a/dash/controller/src/validator/pipe.rs +++ b/dash/controller/src/validator/pipe.rs @@ -3,10 +3,11 @@ use dash_api::{ model::{ ModelFieldKindExtendedSpec, ModelFieldKindSpec, ModelFieldSpec, ModelFieldsNativeSpec, }, - pipe::PipeSpec, + pipe::{PipeExec, PipeSpec}, }; use dash_provider::storage::KubernetesStorageClient; use kube::Client; +use straw_api::pipe::StrawPipe; use super::model::ModelValidator; @@ -33,11 +34,28 @@ impl<'namespace, 'kube> PipeValidator<'namespace, 'kube> { .await }; - let PipeSpec { input, output } = spec; + let PipeSpec { + input, + output, + exec, + } = spec; Ok(PipeSpec { - input: validate_model(input).await?, - output: validate_model(output).await?, + input: validate_model(input.into()).await?, + output: validate_model(output.into()).await?, + exec: self.validate_exec(exec).await?, }) } + + async fn validate_exec(&self, exec: PipeExec) -> Result { + match exec { + PipeExec::Placeholder {} => Ok(exec), + PipeExec::Straw(exec) => self.validate_exec_straw(exec).await.map(PipeExec::Straw), + } + } + + async fn validate_exec_straw(&self, exec: StrawPipe) -> Result { + // TODO: to be implemented! + Ok(exec) + } } diff --git a/dash/pipe/provider/Cargo.toml b/dash/pipe/provider/Cargo.toml index aaf94d50..261ad244 100644 --- a/dash/pipe/provider/Cargo.toml +++ b/dash/pipe/provider/Cargo.toml @@ -36,6 +36,7 @@ s3 = ["chrono", "minio"] [dependencies] ark-core = { path = "../../../ark/core" } +ark-core-k8s = { path = "../../../ark/core/k8s", features = ["data"] } dash-api = { path = "../../api", optional = true } anyhow = { workspace = true } @@ -47,13 +48,7 @@ bytes = { workspace = true } chrono = { workspace = true, optional = true } clap = { workspace = true } ctrlc = { workspace = true } -deltalake = { workspace = true, optional = true, features = [ - "arrow", - "datafusion", - "json", - "parquet", - "s3", -] } +deltalake = { workspace = true, optional = true, features = ["json", "s3"] } futures = { workspace = true } gethostname = { workspace = true } inflector = { workspace = true, optional = true } @@ -62,7 +57,6 @@ map-macro = { workspace = true, optional = true } minio = { workspace = true, optional = true } pyo3 = { workspace = true, optional = true } rdkafka = { workspace = true, optional = true } -regex = { workspace = true } rmp-serde = { workspace = true } schemars = { workspace = true, features = ["bytes"] } serde = { workspace = true, features = ["derive"] } diff --git a/dash/pipe/provider/src/function.rs b/dash/pipe/provider/src/function.rs index b5cbcc8d..fcd4f5b3 100644 --- a/dash/pipe/provider/src/function.rs +++ b/dash/pipe/provider/src/function.rs @@ -8,6 +8,7 @@ use std::{ }; use anyhow::{anyhow, Error, Result}; +use ark_core_k8s::data::Name; use async_trait::async_trait; use clap::Args; use futures::future::try_join_all; @@ -16,7 +17,7 @@ use serde::{de::DeserializeOwned, Serialize}; use tracing::info; use crate::{ - message::{Name, PipeMessages}, + message::PipeMessages, messengers::{Messenger, MessengerType, Publisher}, storage::StorageIO, }; diff --git a/dash/pipe/provider/src/lib.rs b/dash/pipe/provider/src/lib.rs index 39d94c9c..d7f63da3 100644 --- a/dash/pipe/provider/src/lib.rs +++ b/dash/pipe/provider/src/lib.rs @@ -8,6 +8,8 @@ pub mod messengers; mod pipe; pub mod storage; +pub use ark_core_k8s::data::Name; + #[cfg(feature = "deltalake")] pub use self::function::deltalake::DeltaFunction; pub use self::function::{ @@ -16,6 +18,6 @@ pub use self::function::{ }; #[cfg(feature = "pyo3")] pub use self::message::PyPipeMessage; -pub use self::message::{Name, PipeMessage, PipeMessages, PipePayload}; +pub use self::message::{PipeMessage, PipeMessages, PipePayload}; pub use self::messengers::MessengerType; pub use self::pipe::{DefaultModelIn, PipeArgs}; diff --git a/dash/pipe/provider/src/message.rs b/dash/pipe/provider/src/message.rs index 92b66280..e4e25509 100644 --- a/dash/pipe/provider/src/message.rs +++ b/dash/pipe/provider/src/message.rs @@ -1,12 +1,12 @@ -use std::{borrow::Borrow, cmp::Ordering, collections::HashMap, fmt, ops, str::FromStr}; +use std::collections::HashMap; use anyhow::{bail, Error, Result}; +use ark_core_k8s::data::Name; use bytes::Bytes; use chrono::{DateTime, Utc}; use futures::future::try_join_all; -use regex::Regex; use schemars::JsonSchema; -use serde::{de::DeserializeOwned, Deserialize, Deserializer, Serialize}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; use serde_json::Value as DynValue; use crate::storage::{StorageSet, StorageType}; @@ -590,140 +590,6 @@ impl PipePayload { } } -#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, JsonSchema)] -#[serde(transparent)] -pub struct Name(String); - -impl FromStr for Name { - type Err = Error; - - fn from_str(name: &str) -> Result::Err> { - let re = Regex::new(r"^[a-z][a-z0-9_-]*[a-z0-9]?$")?; - if re.is_match(name) { - Ok(Self(name.into())) - } else { - bail!("invalid name: {name:?}") - } - } -} - -impl From for String { - fn from(value: Name) -> Self { - value.0 - } -} - -// #[cfg(feature = "nats")] -// impl From for ::async_nats::Subject { -// fn from(value: Name) -> Self { -// value.0.into() -// } -// } - -impl Borrow for Name { - fn borrow(&self) -> &str { - &self.0 - } -} - -impl Borrow for Name { - fn borrow(&self) -> &String { - &self.0 - } -} - -impl ops::Deref for Name { - type Target = String; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl fmt::Debug for Name { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - ::fmt(&self.0, f) - } -} - -impl fmt::Display for Name { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - ::fmt(&self.0, f) - } -} - -impl PartialEq for Name { - fn eq(&self, other: &String) -> bool { - self.0.eq(other) - } -} - -impl PartialEq for String { - fn eq(&self, other: &Name) -> bool { - self.eq(&other.0) - } -} - -impl PartialOrd for Name { - fn partial_cmp(&self, other: &String) -> Option { - self.0.partial_cmp(other) - } - - fn lt(&self, other: &String) -> bool { - self.0.lt(other) - } - - fn le(&self, other: &String) -> bool { - self.0.le(other) - } - - fn gt(&self, other: &String) -> bool { - self.0.gt(other) - } - - fn ge(&self, other: &String) -> bool { - self.0.ge(other) - } -} - -impl PartialOrd for String { - fn partial_cmp(&self, other: &Name) -> Option { - self.partial_cmp(&other.0) - } - - fn lt(&self, other: &Name) -> bool { - self.lt(&other.0) - } - - fn le(&self, other: &Name) -> bool { - self.le(&other.0) - } - - fn gt(&self, other: &Name) -> bool { - self.gt(&other.0) - } - - fn ge(&self, other: &Name) -> bool { - self.ge(&other.0) - } -} - -impl<'de> Deserialize<'de> for Name { - fn deserialize(deserializer: D) -> Result>::Error> - where - D: Deserializer<'de>, - { - >::deserialize(deserializer) - .and_then(|name| Self::from_str(&name).map_err(::serde::de::Error::custom)) - } -} - -impl Name { - pub(crate) fn new(name: String) -> Self { - Self(name) - } -} - #[derive(Copy, Clone, Debug, PartialEq, Eq)] enum OpCode { // Special opcodes diff --git a/dash/pipe/provider/src/messengers/kafka.rs b/dash/pipe/provider/src/messengers/kafka.rs index 5818fdd5..28f878d0 100644 --- a/dash/pipe/provider/src/messengers/kafka.rs +++ b/dash/pipe/provider/src/messengers/kafka.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use anyhow::{anyhow, bail, Result}; +use ark_core_k8s::data::Name; use async_trait::async_trait; use bytes::Bytes; use clap::Parser; @@ -13,7 +14,7 @@ use rdkafka::{ use serde::{de::DeserializeOwned, Deserialize, Serialize}; use tracing::debug; -use crate::message::{Name, PipeMessage}; +use crate::message::PipeMessage; pub struct Messenger { config: ClientConfig, diff --git a/dash/pipe/provider/src/messengers/mod.rs b/dash/pipe/provider/src/messengers/mod.rs index 995ca844..55ea1987 100644 --- a/dash/pipe/provider/src/messengers/mod.rs +++ b/dash/pipe/provider/src/messengers/mod.rs @@ -6,6 +6,7 @@ mod nats; use std::sync::Arc; use anyhow::Result; +use ark_core_k8s::data::Name; use async_trait::async_trait; use bytes::Bytes; use clap::Parser; @@ -14,7 +15,7 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize}; use strum::{Display, EnumString}; use tracing::debug; -use crate::message::{Name, PipeMessage}; +use crate::message::PipeMessage; pub async fn init_messenger( args: &MessengerArgs, diff --git a/dash/pipe/provider/src/messengers/nats.rs b/dash/pipe/provider/src/messengers/nats.rs index b051be2e..611e1aab 100644 --- a/dash/pipe/provider/src/messengers/nats.rs +++ b/dash/pipe/provider/src/messengers/nats.rs @@ -1,6 +1,7 @@ use std::{path::PathBuf, sync::Arc}; use anyhow::{anyhow, bail, Result}; +use ark_core_k8s::data::Name; use async_nats::{Client, ServerAddr, ToServerAddrs}; use async_trait::async_trait; use bytes::Bytes; @@ -9,7 +10,7 @@ use serde::{de::DeserializeOwned, Deserialize, Serialize}; use tokio_stream::StreamExt; use tracing::debug; -use crate::message::{Name, PipeMessage}; +use crate::message::PipeMessage; pub struct Messenger { client: Arc, diff --git a/dash/pipe/provider/src/pipe.rs b/dash/pipe/provider/src/pipe.rs index 9839b1d2..ac1d9596 100644 --- a/dash/pipe/provider/src/pipe.rs +++ b/dash/pipe/provider/src/pipe.rs @@ -9,6 +9,7 @@ use std::{ }; use anyhow::{anyhow, Result}; +use ark_core_k8s::data::Name; use clap::{ArgAction, Parser}; use futures::Future; use schemars::JsonSchema; @@ -24,7 +25,7 @@ use tracing::{debug, error, info, warn}; use crate::{ function::{Function, FunctionBuilder, FunctionContext}, - message::{Name, PipeMessages}, + message::PipeMessages, messengers::{init_messenger, MessengerArgs, Publisher, Subscriber}, storage::{MetadataStorageArgs, MetadataStorageType, StorageIO, StorageSet}, PipeMessage, PipePayload, diff --git a/dash/pipe/provider/src/storage/lakehouse/mod.rs b/dash/pipe/provider/src/storage/lakehouse/mod.rs index ca7c220e..b1be0e21 100644 --- a/dash/pipe/provider/src/storage/lakehouse/mod.rs +++ b/dash/pipe/provider/src/storage/lakehouse/mod.rs @@ -4,6 +4,7 @@ pub mod schema; use std::{collections::HashMap, io::Cursor, ops, sync::Arc}; use anyhow::{anyhow, bail, Result}; +use ark_core_k8s::data::Name; use async_recursion::async_recursion; use async_trait::async_trait; use deltalake::{ @@ -20,7 +21,7 @@ use serde_json::json; use tokio::sync::Mutex; use tracing::{debug, info}; -use crate::message::{Name, PipeMessage}; +use crate::message::PipeMessage; use self::{decoder::TryIntoTableDecoder, schema::FieldColumns}; @@ -123,7 +124,7 @@ impl super::MetadataStorageMut for MaybeStorageTable { pub struct StorageTable { ctx: StorageContext, - model: Name, + model: String, table: Arc, writer: Option, } @@ -283,9 +284,9 @@ impl StorageContext { region, secret_key, }: super::StorageS3Args, - model: &Name, + model: &str, fields: Option, - ) -> Result<(Name, Arc, bool)> { + ) -> Result<(String, Arc, bool)> { let mut table = { let allow_http = s3_endpoint.scheme() == "http"; let table_uri = format!( @@ -344,12 +345,12 @@ impl StorageContext { } }; - let model = Name::new(model.to_snake_case()); + let model = model.to_snake_case(); let table = Arc::new(table); if has_inited { self.session - .register_table(model.as_str(), table.clone()) + .register_table(&model, table.clone()) .map_err(|error| anyhow!("failed to load DeltaLake metadata session: {error}"))?; } diff --git a/dash/pipe/provider/src/storage/mod.rs b/dash/pipe/provider/src/storage/mod.rs index 65c2d7c5..cb63d8f4 100644 --- a/dash/pipe/provider/src/storage/mod.rs +++ b/dash/pipe/provider/src/storage/mod.rs @@ -6,6 +6,7 @@ pub mod s3; use std::{marker::PhantomData, pin::Pin, sync::Arc}; use anyhow::{anyhow, Result}; +use ark_core_k8s::data::Name; use async_stream::try_stream; use async_trait::async_trait; use bytes::Bytes; @@ -17,10 +18,7 @@ use strum::{Display, EnumString}; use tracing::debug; use url::Url; -use crate::{ - function::FunctionContext, - message::{Name, PipeMessage}, -}; +use crate::{function::FunctionContext, message::PipeMessage}; pub struct StorageIO { pub input: Arc, diff --git a/dash/pipe/provider/src/storage/s3.rs b/dash/pipe/provider/src/storage/s3.rs index fc729b0a..d1367598 100644 --- a/dash/pipe/provider/src/storage/s3.rs +++ b/dash/pipe/provider/src/storage/s3.rs @@ -1,4 +1,5 @@ use anyhow::{anyhow, bail, Error, Result}; +use ark_core_k8s::data::Name; use async_trait::async_trait; use bytes::Bytes; use chrono::{SecondsFormat, Utc}; @@ -11,8 +12,6 @@ use minio::s3::{ }; use tracing::debug; -use crate::message::Name; - #[derive(Clone)] pub struct Storage { base_url: BaseUrl, diff --git a/dash/query/cli/src/main.rs b/dash/query/cli/src/main.rs index 21cf5a03..c425666b 100644 --- a/dash/query/cli/src/main.rs +++ b/dash/query/cli/src/main.rs @@ -2,7 +2,7 @@ use std::{cell::RefCell, process::exit}; use anyhow::{anyhow, bail, Result}; use clap::{ArgAction, Parser}; -use dash_query_provider::{Name, QueryClient, QueryClientArgs}; +use dash_query_provider::{QueryClient, QueryClientArgs}; use futures::TryStreamExt; use inquire::{autocompletion::Replacement, Autocomplete, CustomUserError, Text}; use serde_json::Value; @@ -49,7 +49,7 @@ async fn try_main() -> Result<()> { } } -async fn try_main_interactive(client: QueryClient, table_sample: Name) { +async fn try_main_interactive(client: QueryClient, table_sample: String) { #[derive(Clone, Default)] struct History(RefCell>); diff --git a/dash/query/provider/src/function.rs b/dash/query/provider/src/function.rs index 81ae647a..4815f587 100644 --- a/dash/query/provider/src/function.rs +++ b/dash/query/provider/src/function.rs @@ -1,6 +1,7 @@ use std::{fmt, sync::Arc}; use anyhow::Result; +use dash_api::pipe::PipeSpec; use dash_pipe_provider::{ deltalake::{ arrow::datatypes::{DataType, Schema}, @@ -21,22 +22,15 @@ use tracing::debug; pub(crate) struct DashFunctionTemplate { name: Name, model_in: Name, - input: Arc, - output: Arc, + spec: RemotePipeSpec, } impl DashFunctionTemplate { - pub(crate) fn new( - name: Name, - model_in: Name, - input: Arc, - output: Arc, - ) -> Result { + pub(crate) fn new(name: Name, model_in: Name, spec: RemotePipeSpec) -> Result { Ok(Self { name, model_in, - input, - output, + spec, }) } @@ -46,8 +40,12 @@ impl DashFunctionTemplate { let Self { name, model_in, - input: input_schema, - output: output_schema, + spec: + PipeSpec { + input: input_schema, + output: output_schema, + exec: (), + }, } = self; let input = DataType::Struct(input_schema.fields().clone()); @@ -102,8 +100,12 @@ impl fmt::Display for DashFunctionTemplate { let Self { name, model_in: _, - input, - output: _, + spec: + PipeSpec { + input, + output: _, + exec: _, + }, } = self; write!(f, "{name}(")?; @@ -116,3 +118,5 @@ impl fmt::Display for DashFunctionTemplate { write!(f, ")") } } + +type RemotePipeSpec = PipeSpec, ()>; diff --git a/dash/query/provider/src/lib.rs b/dash/query/provider/src/lib.rs index 22b37ab0..69cd4e8f 100644 --- a/dash/query/provider/src/lib.rs +++ b/dash/query/provider/src/lib.rs @@ -47,7 +47,7 @@ pub struct QueryClientArgs { #[derive(Clone)] pub struct QueryClient { ctx: StorageContext, - tables: BTreeMap>, + tables: BTreeMap>, } impl QueryClient { @@ -93,7 +93,7 @@ impl QueryClient { Ok(Self { ctx, tables }) } - pub fn list_table_names(&self) -> Keys<'_, Name, Arc> { + pub fn list_table_names(&self) -> Keys<'_, String, Arc> { self.tables.keys() } @@ -137,8 +137,15 @@ impl QueryClient { async fn load_models<'a>( kube: &'a Client, namespace: &'a str, -) -> Result> + 'a)> + 'a> -{ +) -> Result< + impl Iterator< + Item = ( + String, + String, + impl Future> + 'a, + ), + > + 'a, +> { let api = Api::::namespaced(kube.clone(), namespace); let lp = ListParams::default(); let bindings = api.list(&lp).await?.items; @@ -151,8 +158,8 @@ async fn load_models<'a>( (model_name, storage_name) }) .filter_map(move |binding| { - let model_name: Name = binding.spec.model.parse().ok()?; - let storage_name: Name = binding.spec.storage.target().parse().ok()?; + let model_name = binding.spec.model; + let storage_name = binding.spec.storage.target().clone(); let status = binding.status?; if !matches!(status.state, ModelStorageBindingState::Ready) { @@ -196,11 +203,11 @@ async fn load_models<'a>( async fn load_functions( kube: &Client, - tables: &BTreeMap>, + tables: &BTreeMap>, namespace: &str, ) -> Result> { async fn get_model_schema( - tables: &BTreeMap>, + tables: &BTreeMap>, name: &str, ) -> Result> { match tables.get(&name.to_snake_case()) { @@ -228,16 +235,16 @@ async fn load_functions( let PipeSpec { input: model_in, output: model_out, + exec: _, } = function.spec; - let model_in: Name = model_in.parse().ok()?; - let model_out: Name = model_out.parse().ok()?; - Some(async move { - let input = get_model_schema(tables, &model_in).await?; - let output = get_model_schema(tables, &model_out).await?; - - self::function::DashFunctionTemplate::new(name, model_in, input, output) + let spec = PipeSpec { + input: get_model_schema(tables, &model_in).await?, + output: get_model_schema(tables, &model_out).await?, + exec: (), + }; + self::function::DashFunctionTemplate::new(name, model_in, spec) }) })) .await diff --git a/straw/api/Cargo.toml b/straw/api/Cargo.toml new file mode 100644 index 00000000..e8718898 --- /dev/null +++ b/straw/api/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "straw-api" + +authors = { workspace = true } +description = { workspace = true } +documentation = { workspace = true } +edition = { workspace = true } +include = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +readme = { workspace = true } +rust-version = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +version = { workspace = true } + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +ark-core-k8s = { path = "../../ark/core/k8s", features = ["data"] } + +schemars = { workspace = true } +serde = { workspace = true } diff --git a/straw/api/src/lib.rs b/straw/api/src/lib.rs new file mode 100644 index 00000000..97a59c2c --- /dev/null +++ b/straw/api/src/lib.rs @@ -0,0 +1 @@ +pub mod pipe; diff --git a/straw/api/src/pipe.rs b/straw/api/src/pipe.rs new file mode 100644 index 00000000..82839045 --- /dev/null +++ b/straw/api/src/pipe.rs @@ -0,0 +1,23 @@ +use std::collections::BTreeMap; + +use ark_core_k8s::data::Url; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)] +#[serde(transparent)] +pub struct StrawPipe { + pub straw: Vec, +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct StrawNode { + pub name: String, + #[serde(default)] + pub params: StrawParams, + #[serde(default)] + pub repo: Option, +} + +pub type StrawParams = BTreeMap; diff --git a/straw/provider/Cargo.toml b/straw/provider/Cargo.toml new file mode 100644 index 00000000..ad5c1d84 --- /dev/null +++ b/straw/provider/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "straw-provider" + +authors = { workspace = true } +description = { workspace = true } +documentation = { workspace = true } +edition = { workspace = true } +include = { workspace = true } +keywords = { workspace = true } +license = { workspace = true } +readme = { workspace = true } +rust-version = { workspace = true } +homepage = { workspace = true } +repository = { workspace = true } +version = { workspace = true } + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +straw-api = { path = "../api" } + +anyhow = { workspace = true } diff --git a/straw/provider/src/lib.rs b/straw/provider/src/lib.rs new file mode 100644 index 00000000..8f735f4e --- /dev/null +++ b/straw/provider/src/lib.rs @@ -0,0 +1,3 @@ +pub struct StrawSession {} + +// TODO: Straw Offloading 엔진 구현 diff --git a/templates/dash/examples/test-pipe-identity.yaml b/templates/dash/examples/test-pipe-identity.yaml index 02ae5a44..b5c131db 100644 --- a/templates/dash/examples/test-pipe-identity.yaml +++ b/templates/dash/examples/test-pipe-identity.yaml @@ -23,3 +23,4 @@ metadata: spec: input: identity-input output: identity-output + placeholder: {} diff --git a/vine/api/src/user_auth.rs b/vine/api/src/user_auth.rs index bf522e59..508c37f0 100644 --- a/vine/api/src/user_auth.rs +++ b/vine/api/src/user_auth.rs @@ -1,6 +1,5 @@ -use std::{cmp::Ordering, ops::Deref, str::FromStr}; - use anyhow::{bail, Result}; +use ark_core_k8s::data::Url; use k8s_openapi::api::core::v1::NodeSpec; use kube::CustomResource; use schemars::JsonSchema; @@ -149,49 +148,3 @@ pub enum UserAuthError { PrimaryKeyMalformed, UserNotRegistered, } - -#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] -#[serde(transparent)] -pub struct Url(pub ::url::Url); - -impl FromStr for Url { - type Err = <::url::Url as FromStr>::Err; - - fn from_str(s: &str) -> Result { - <::url::Url as FromStr>::from_str(s).map(Self) - } -} - -impl Deref for Url { - type Target = ::url::Url; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl PartialOrd for Url { - fn partial_cmp(&self, other: &Self) -> Option { - Some(::cmp(self, other)) - } -} - -impl Ord for Url { - fn cmp(&self, other: &Self) -> Ordering { - ::cmp(self.0.as_str(), other.0.as_str()) - } -} - -impl JsonSchema for Url { - fn is_referenceable() -> bool { - false - } - - fn schema_name() -> String { - "Url".into() - } - - fn json_schema(gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema { - String::json_schema(gen) - } -}