Skip to content

Commit

Permalink
New project: Straw
Browse files Browse the repository at this point in the history
  • Loading branch information
HoKim98 committed Oct 30, 2023
1 parent f7cd62b commit b4e434f
Show file tree
Hide file tree
Showing 31 changed files with 379 additions and 254 deletions.
8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ members = [
"proxyx/provider/api", # exclude(alpine)
"proxyx/provider/chatgpt", # exclude(alpine)
"proxyx/storage/influxdb", # exclude(alpine)
"straw/api",
"straw/provider",
"vine/api",
"vine/bastion",
"vine/cli",
Expand Down Expand Up @@ -87,7 +89,11 @@ chrono = { version = "0.4", features = ["serde"] }
clap = { version = "4.4", features = ["env", "derive"] }
csv = { version = "1.3" }
ctrlc = { version = "3.4" }
deltalake = { git = "https://github.com/delta-io/delta-rs.git", version = "0.17", default-features = false }
deltalake = { git = "https://github.com/delta-io/delta-rs.git", version = "0.17", default-features = false, features = [
"arrow",
"datafusion",
"parquet",
] }
email_address = { version = "0.2" }
futures = { version = "0.3" }
gethostname = { version = "0.4" }
Expand Down
4 changes: 3 additions & 1 deletion ark/core/k8s/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ edition = "2021"

[features]
default = []
data = ["schemars", "serde", "strum"]
data = ["anyhow", "regex", "schemars", "serde", "strum", "url"]
domain = ["anyhow", "resolv-conf", "tokio/fs"]
manager = [
"anyhow",
Expand Down Expand Up @@ -35,10 +35,12 @@ kube = { workspace = true, optional = true, features = [
"rustls-tls",
"ws",
] }
regex = { workspace = true, optional = true }
resolv-conf = { workspace = true, optional = true }
schemars = { workspace = true, optional = true }
serde = { workspace = true, optional = true }
serde_json = { workspace = true, optional = true }
strum = { workspace = true, optional = true }
tokio = { workspace = true, optional = true }
tracing = { workspace = true, optional = true }
url = { workspace = true, optional = true }
180 changes: 179 additions & 1 deletion ark/core/k8s/src/data.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use std::{borrow::Borrow, cmp::Ordering, fmt, ops, str::FromStr};

use anyhow::{bail, Error};
use regex::Regex;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Deserializer, Serialize};
use strum::{Display, EnumString};

#[derive(
Expand All @@ -10,3 +14,177 @@ pub enum ImagePullPolicy {
IfNotPresent,
Never,
}

#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, JsonSchema)]
#[serde(transparent)]
pub struct Name(String);

impl FromStr for Name {
type Err = Error;

fn from_str(name: &str) -> Result<Self, <Self as FromStr>::Err> {
let re = Regex::new(r"^[a-z][a-z0-9_-]*[a-z0-9]?$")?;
if re.is_match(name) {
Ok(Self(name.into()))
} else {
bail!("invalid name: {name:?}")
}
}
}

impl From<Name> for String {
fn from(value: Name) -> Self {
value.0
}
}

// #[cfg(feature = "nats")]
// impl From<Name> for ::async_nats::Subject {
// fn from(value: Name) -> Self {
// value.0.into()
// }
// }

impl Borrow<str> for Name {
fn borrow(&self) -> &str {
&self.0
}
}

impl Borrow<String> for Name {
fn borrow(&self) -> &String {
&self.0
}
}

impl ops::Deref for Name {
type Target = String;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl fmt::Debug for Name {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
<String as fmt::Debug>::fmt(&self.0, f)
}
}

impl fmt::Display for Name {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
<String as fmt::Display>::fmt(&self.0, f)
}
}

impl PartialEq<String> for Name {
fn eq(&self, other: &String) -> bool {
self.0.eq(other)
}
}

impl PartialEq<Name> for String {
fn eq(&self, other: &Name) -> bool {
self.eq(&other.0)
}
}

impl PartialOrd<String> for Name {
fn partial_cmp(&self, other: &String) -> Option<Ordering> {
self.0.partial_cmp(other)
}

fn lt(&self, other: &String) -> bool {
self.0.lt(other)
}

fn le(&self, other: &String) -> bool {
self.0.le(other)
}

fn gt(&self, other: &String) -> bool {
self.0.gt(other)
}

fn ge(&self, other: &String) -> bool {
self.0.ge(other)
}
}

impl PartialOrd<Name> for String {
fn partial_cmp(&self, other: &Name) -> Option<Ordering> {
self.partial_cmp(&other.0)
}

fn lt(&self, other: &Name) -> bool {
self.lt(&other.0)
}

fn le(&self, other: &Name) -> bool {
self.le(&other.0)
}

fn gt(&self, other: &Name) -> bool {
self.gt(&other.0)
}

fn ge(&self, other: &Name) -> bool {
self.ge(&other.0)
}
}

impl<'de> Deserialize<'de> for Name {
fn deserialize<D>(deserializer: D) -> Result<Self, <D as Deserializer<'de>>::Error>
where
D: Deserializer<'de>,
{
<String as Deserialize<'de>>::deserialize(deserializer)
.and_then(|name| Self::from_str(&name).map_err(::serde::de::Error::custom))
}
}

#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(transparent)]
pub struct Url(pub ::url::Url);

impl FromStr for Url {
type Err = <::url::Url as FromStr>::Err;

fn from_str(s: &str) -> Result<Self, Self::Err> {
<::url::Url as FromStr>::from_str(s).map(Self)
}
}

impl ops::Deref for Url {
type Target = ::url::Url;

fn deref(&self) -> &Self::Target {
&self.0
}
}

impl PartialOrd for Url {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(<Self as Ord>::cmp(self, other))
}
}

impl Ord for Url {
fn cmp(&self, other: &Self) -> Ordering {
<str as Ord>::cmp(self.0.as_str(), other.0.as_str())
}
}

impl JsonSchema for Url {
fn is_referenceable() -> bool {
false
}

fn schema_name() -> String {
"Url".into()
}

fn json_schema(gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema {
String::json_schema(gen)
}
}
3 changes: 2 additions & 1 deletion dash/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ version = { workspace = true }
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
ark-core-k8s = { path = "../../ark/core/k8s" }
dash-provider-api = { path = "../provider/api" }
vine-api = { path = "../../vine/api" }
straw-api = { path = "../../straw/api" }

anyhow = { workspace = true }
chrono = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion dash/api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ pub mod storage;
pub mod task;

pub mod consts {
pub use ::vine_api::consts::NAMESPACE;
pub const NAMESPACE: &str = "dash";
}
19 changes: 18 additions & 1 deletion dash/api/src/pipe.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use ark_core_k8s::data::Name;
use chrono::{DateTime, Utc};
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use straw_api::pipe::StrawPipe;
use strum::{Display, EnumString};

use crate::model::ModelFieldsNativeSpec;
Expand Down Expand Up @@ -35,9 +37,24 @@ use crate::model::ModelFieldsNativeSpec;
}"#
)]
#[serde(rename_all = "camelCase")]
pub struct PipeSpec<Spec = String> {
pub struct PipeSpec<Spec = Name, Exec = PipeExec> {
pub input: Spec,
pub output: Spec,
#[serde(default, flatten)]
pub exec: Exec,
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub enum PipeExec {
Placeholder {},
Straw(StrawPipe),
}

impl Default for PipeExec {
fn default() -> Self {
Self::Placeholder {}
}
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)]
Expand Down
2 changes: 1 addition & 1 deletion dash/api/src/storage/db.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use ark_core_k8s::data::Url;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use vine_api::user_auth::Url;

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
Expand Down
2 changes: 1 addition & 1 deletion dash/api/src/storage/object.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::{collections::BTreeMap, net::Ipv4Addr};

use ark_core_k8s::data::Url;
use k8s_openapi::{
api::core::v1::ResourceRequirements, apimachinery::pkg::api::resource::Quantity,
};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use vine_api::user_auth::Url;

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
Expand Down
1 change: 1 addition & 0 deletions dash/controller/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ ark-core-k8s = { path = "../../ark/core/k8s", features = ["manager"] }
dash-api = { path = "../api" }
dash-provider = { path = "../provider" }
dash-provider-api = { path = "../provider/api" }
straw-api = { path = "../../straw/api" }

anyhow = { workspace = true }
async-trait = { workspace = true }
Expand Down
26 changes: 22 additions & 4 deletions dash/controller/src/validator/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ use dash_api::{
model::{
ModelFieldKindExtendedSpec, ModelFieldKindSpec, ModelFieldSpec, ModelFieldsNativeSpec,
},
pipe::PipeSpec,
pipe::{PipeExec, PipeSpec},
};
use dash_provider::storage::KubernetesStorageClient;
use kube::Client;
use straw_api::pipe::StrawPipe;

use super::model::ModelValidator;

Expand All @@ -33,11 +34,28 @@ impl<'namespace, 'kube> PipeValidator<'namespace, 'kube> {
.await
};

let PipeSpec { input, output } = spec;
let PipeSpec {
input,
output,
exec,
} = spec;

Ok(PipeSpec {
input: validate_model(input).await?,
output: validate_model(output).await?,
input: validate_model(input.into()).await?,
output: validate_model(output.into()).await?,
exec: self.validate_exec(exec).await?,
})
}

async fn validate_exec(&self, exec: PipeExec) -> Result<PipeExec> {
match exec {
PipeExec::Placeholder {} => Ok(exec),
PipeExec::Straw(exec) => self.validate_exec_straw(exec).await.map(PipeExec::Straw),
}
}

async fn validate_exec_straw(&self, exec: StrawPipe) -> Result<StrawPipe> {
// TODO: to be implemented!
Ok(exec)
}
}
10 changes: 2 additions & 8 deletions dash/pipe/provider/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ s3 = ["chrono", "minio"]

[dependencies]
ark-core = { path = "../../../ark/core" }
ark-core-k8s = { path = "../../../ark/core/k8s", features = ["data"] }
dash-api = { path = "../../api", optional = true }

anyhow = { workspace = true }
Expand All @@ -47,13 +48,7 @@ bytes = { workspace = true }
chrono = { workspace = true, optional = true }
clap = { workspace = true }
ctrlc = { workspace = true }
deltalake = { workspace = true, optional = true, features = [
"arrow",
"datafusion",
"json",
"parquet",
"s3",
] }
deltalake = { workspace = true, optional = true, features = ["json", "s3"] }
futures = { workspace = true }
gethostname = { workspace = true }
inflector = { workspace = true, optional = true }
Expand All @@ -62,7 +57,6 @@ map-macro = { workspace = true, optional = true }
minio = { workspace = true, optional = true }
pyo3 = { workspace = true, optional = true }
rdkafka = { workspace = true, optional = true }
regex = { workspace = true }
rmp-serde = { workspace = true }
schemars = { workspace = true, features = ["bytes"] }
serde = { workspace = true, features = ["derive"] }
Expand Down
Loading

0 comments on commit b4e434f

Please sign in to comment.