From 891c5d35944e0051b08b1bd31fe8210f37622283 Mon Sep 17 00:00:00 2001 From: Ho Kim Date: Mon, 2 Oct 2023 03:42:36 +0900 Subject: [PATCH] Begin implementation of dash pipe --- Cargo.toml | 11 +- dash/pipe/functions/identity/Cargo.toml | 20 ++ dash/pipe/functions/identity/src/main.rs | 11 + .../functions/loader-webcam}/Cargo.toml | 9 +- dash/pipe/functions/loader-webcam/src/main.rs | 12 + dash/pipe/provider/Cargo.toml | 30 +++ dash/pipe/provider/src/engine.rs | 215 ++++++++++++++++++ dash/pipe/provider/src/lib.rs | 6 + dash/pipe/provider/src/message.rs | 162 +++++++++++++ dash/pipe/provider/src/storage/lakehouse.rs | 113 +++++++++ dash/pipe/provider/src/storage/mod.rs | 82 +++++++ dash/pipe/provider/src/storage/nats.rs | 76 +++++++ dash/plugins/storage-sync/src/lib.rs | 14 -- dash/router/Cargo.toml | 1 - templates/dash/nats/values.yaml | 23 ++ 15 files changed, 767 insertions(+), 18 deletions(-) create mode 100644 dash/pipe/functions/identity/Cargo.toml create mode 100644 dash/pipe/functions/identity/src/main.rs rename dash/{plugins/storage-sync => pipe/functions/loader-webcam}/Cargo.toml (73%) create mode 100644 dash/pipe/functions/loader-webcam/src/main.rs create mode 100644 dash/pipe/provider/Cargo.toml create mode 100644 dash/pipe/provider/src/engine.rs create mode 100644 dash/pipe/provider/src/lib.rs create mode 100644 dash/pipe/provider/src/message.rs create mode 100644 dash/pipe/provider/src/storage/lakehouse.rs create mode 100644 dash/pipe/provider/src/storage/mod.rs create mode 100644 dash/pipe/provider/src/storage/nats.rs delete mode 100644 dash/plugins/storage-sync/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 1175e7ce..19a0840c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,9 @@ members = [ "dash/controller", "dash/gateway", "dash/openapi", - "dash/plugins/storage-sync", + "dash/pipe/functions/identity", + "dash/pipe/functions/loader-webcam", + "dash/pipe/provider", "dash/provider", "dash/provider/api", "dash/provider/client", @@ -63,6 +65,12 @@ chrono = { version = "=0.4", features = ["serde"] } clap = { version = "=4.4", features = ["env", "derive"] } csv = { version = "=1.2" } ctrlc = { version = "=3.4" } +deltalake = { version = "0.16", features = [ + "arrow", + "datafusion", + "parquet", + "s3", +] } email_address = { version = "=0.2" } futures = { version = "=0.3" } gethostname = { version = "=0.4" } @@ -87,6 +95,7 @@ mime = { version = "=0.3" } minio = { git = "https://github.com/ulagbulag/minio-rs.git", default-features = false, rev = "06d98675b4457e9139f6c01cea4a0659cb82c82c", features = [ "rustls-tls", ] } # not deployed to crates.io +nats = { package = "async-nats", version = "0.32" } ndarray = { version = "=0.15", features = ["serde"] } num-traits = { version = "=0.2" } octocrab = { git = "https://github.com/ulagbulag/octocrab.git", default-features = false, features = [ diff --git a/dash/pipe/functions/identity/Cargo.toml b/dash/pipe/functions/identity/Cargo.toml new file mode 100644 index 00000000..4bf7ed18 --- /dev/null +++ b/dash/pipe/functions/identity/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "dash-pipe-function-identity" +version = "0.1.0" +edition = "2021" + +authors = ["Ho Kim "] +description = "Kubernetes Is Simple, Stupid which a part of OpenARK" +documentation = "https://docs.rs/kiss-api" +license = "GPL-3.0-or-later WITH Classpath-exception-2.0" +readme = "../../README.md" +homepage = "https://github.com/ulagbulag/OpenARK" +repository = "https://github.com/ulagbulag/OpenARK" + +# See more keys and their definitions at 
https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dash-pipe-provider = { path = "../../provider" } + +anyhow = { workspace = true } +serde_json = { workspace = true } diff --git a/dash/pipe/functions/identity/src/main.rs b/dash/pipe/functions/identity/src/main.rs new file mode 100644 index 00000000..e7e1e3ff --- /dev/null +++ b/dash/pipe/functions/identity/src/main.rs @@ -0,0 +1,11 @@ +use anyhow::Result; +use dash_pipe_provider::{PipeEngine, PipeMessages}; +use serde_json::Value; + +fn main() { + PipeEngine::from_env().loop_forever(tick) +} + +async fn tick(input: PipeMessages) -> Result> { + Ok(input) +} diff --git a/dash/plugins/storage-sync/Cargo.toml b/dash/pipe/functions/loader-webcam/Cargo.toml similarity index 73% rename from dash/plugins/storage-sync/Cargo.toml rename to dash/pipe/functions/loader-webcam/Cargo.toml index a1895c2b..b03ffef6 100644 --- a/dash/plugins/storage-sync/Cargo.toml +++ b/dash/pipe/functions/loader-webcam/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "dash-plugin-storage-sync" +name = "dash-pipe-function-loader-webcam" version = "0.1.0" edition = "2021" @@ -7,8 +7,13 @@ authors = ["Ho Kim "] description = "Kubernetes Is Simple, Stupid which a part of OpenARK" documentation = "https://docs.rs/kiss-api" license = "GPL-3.0-or-later WITH Classpath-exception-2.0" -readme = "../../../README.md" +readme = "../../README.md" homepage = "https://github.com/ulagbulag/OpenARK" repository = "https://github.com/ulagbulag/OpenARK" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +dash-pipe-provider = { path = "../../provider" } + +anyhow = { workspace = true } diff --git a/dash/pipe/functions/loader-webcam/src/main.rs b/dash/pipe/functions/loader-webcam/src/main.rs new file mode 100644 index 00000000..759994f5 --- /dev/null +++ b/dash/pipe/functions/loader-webcam/src/main.rs @@ -0,0 +1,12 @@ +use anyhow::Result; +use dash_pipe_provider::{PipeEngine, PipeMessages}; + +fn main() { + PipeEngine::from_env().loop_forever(tick) +} + +async fn tick(input: PipeMessages) -> Result> { + // TODO: to be implemented + dbg!(&input); + Ok(input) +} diff --git a/dash/pipe/provider/Cargo.toml b/dash/pipe/provider/Cargo.toml new file mode 100644 index 00000000..a3895b79 --- /dev/null +++ b/dash/pipe/provider/Cargo.toml @@ -0,0 +1,30 @@ +[package] +name = "dash-pipe-provider" +version = "0.1.0" +edition = "2021" + +authors = ["Ho Kim "] +description = "Kubernetes Is Simple, Stupid which a part of OpenARK" +documentation = "https://docs.rs/kiss-api" +license = "GPL-3.0-or-later WITH Classpath-exception-2.0" +readme = "../../README.md" +homepage = "https://github.com/ulagbulag/OpenARK" +repository = "https://github.com/ulagbulag/OpenARK" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +ark-core = { path = "../../../ark/core" } + +anyhow = { workspace = true } +async-trait = { workspace = true } +bytes = { workspace = true } +clap = { workspace = true } +deltalake = { workspace = true } +futures = { workspace = true } +log = { workspace = true } +nats = { workspace = true } +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } +tokio = { workspace = true, features = ["full"] } +url = { workspace = true } diff --git a/dash/pipe/provider/src/engine.rs b/dash/pipe/provider/src/engine.rs new file mode 100644 index 00000000..90769d99 --- /dev/null +++ b/dash/pipe/provider/src/engine.rs @@ -0,0 +1,215 @@ +use 
std::future::Future; + +use anyhow::{anyhow, bail, Result}; +use clap::Parser; +use futures::{StreamExt, TryFutureExt}; +use log::warn; +use nats::ToServerAddrs; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use tokio::task::yield_now; + +use crate::{ + message::PipeMessages, + storage::{StorageSet, StorageType}, +}; + +#[derive(Clone, Debug, Parser, Serialize, Deserialize)] +pub struct PipeEngine { + #[arg(long, env = "NATS_ADDRS", value_name = "ADDR")] + addrs: Vec, + + #[arg(long, env = "PIPE_BATCH_SIZE", value_name = "BATCH_SIZE")] + #[serde(default)] + batch_size: Option, + + #[arg(long, env = "PIPE_PERSISTENCE")] + #[serde(default)] + persistence: Option, + + #[arg(long, env = "PIPE_REPLY")] + #[serde(default)] + reply: Option, + + #[command(flatten)] + storage: crate::storage::StorageArgs, + + #[arg(long, env = "PIPE_STREAM_IN", value_name = "NAME")] + #[serde(default)] + stream_in: Option, + + #[arg(long, env = "PIPE_STREAM_OUT", value_name = "NAME")] + #[serde(default)] + stream_out: Option, +} + +impl PipeEngine { + pub fn from_env() -> Self { + Self::parse() + } + + pub fn with_batch_size(mut self, batch_size: usize) -> Self { + self.batch_size = Some(batch_size); + self + } + + pub fn with_persistence(mut self, persistence: bool) -> Self { + self.persistence = Some(persistence); + self + } + + pub fn with_reply(mut self, reply: bool) -> Self { + self.reply = Some(reply); + self + } + + pub fn with_stream_in(mut self, stream_in: String) -> Self { + self.stream_in = Some(stream_in); + self + } + + pub fn with_stream_out(mut self, stream_out: String) -> Self { + self.stream_out = Some(stream_out); + self + } +} + +impl PipeEngine { + pub fn loop_forever(&self, tick: F) + where + F: Fn(PipeMessages) -> Fut, + Fut: Future>>, + Input: DeserializeOwned, + Output: Serialize, + { + ::ark_core::logger::init_once(); + + if let Err(error) = ::tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .expect("failed to init tokio runtime") + .block_on(self.loop_forever_async(tick)) + { + panic!("{error}") + } + } + + async fn loop_forever_async(&self, tick: F) -> Result<()> + where + F: Fn(PipeMessages) -> Fut, + Fut: Future>>, + Input: DeserializeOwned, + Output: Serialize, + { + // init client + let client = { + let addrs = self + .addrs + .iter() + .flat_map(|addr| { + addr.to_server_addrs() + .map_err(|error| anyhow!("failed to parse NATS address: {error}")) + }) + .flatten() + .collect::>(); + match ::nats::connect(addrs).await { + Ok(client) => client, + Err(error) => bail!("failed to init NATS client: {error}"), + } + }; + + // init streams + let mut input_stream = match &self.stream_in { + Some(stream) => match client.subscribe(stream.clone()).await { + Ok(stream) => Some(stream), + Err(error) => bail!("failed to init NATS input stream: {error}"), + }, + None => None, + }; + + // init storages + let storage = { + let default_output = match self.persistence { + Some(true) => StorageType::LakeHouse, + Some(false) | None => StorageType::Nats, + }; + StorageSet::try_new(&self.storage, &client, "myobjbucket", default_output).await? 
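+            // Output payload routing: PIPE_PERSISTENCE=true selects the Delta Lake
+            // (S3-compatible) store, otherwise the NATS JetStream object store is used.
+            // The object-store bucket name is hard-coded to "myobjbucket" for now.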
+ }; + + 'main: loop { + // yield per every loop + yield_now().await; + + let inputs = match &mut input_stream { + // TODO: to be implemented + Some(stream) => match self.batch_size { + Some(batch_size) => { + let mut inputs = vec![]; + for _ in 0..batch_size { + match stream.next().await.map(TryInto::try_into).transpose() { + Ok(Some(input)) => { + inputs.push(input); + } + Ok(None) => break, + Err(error) => { + warn!("failed to parse NATS batch input: {error}"); + continue 'main; + } + } + } + + if inputs.is_empty() { + continue 'main; + } else { + PipeMessages::Batch(inputs) + } + } + None => match stream.next().await.map(TryInto::try_into).transpose() { + Ok(Some(input)) => PipeMessages::Single(input), + Ok(None) => continue 'main, + Err(error) => { + warn!("failed to parse NATS input: {error}"); + continue 'main; + } + }, + }, + None => PipeMessages::None, + }; + + let inputs = match inputs.load_payloads(&storage).await { + Ok(inputs) => inputs, + Err(error) => { + warn!("failed to get NATS payloads: {error}"); + continue 'main; + } + }; + + let outputs = match tick(inputs) + .and_then(|inputs| inputs.dump_payloads(&storage)) + .await + { + Ok(PipeMessages::None) => continue 'main, + Ok(outputs) => outputs, + Err(error) => { + warn!("{error}"); + continue 'main; + } + }; + + if let Some(output_stream) = &self.stream_out { + for output in outputs.into_vec() { + match output.try_into() { + Ok(output) => { + if let Err(error) = client.publish(output_stream.clone(), output).await + { + warn!("failed to send NATS output: {error}"); + } + } + Err(error) => { + warn!("failed to parse NATS output: {error}"); + } + } + } + } + } + } +} diff --git a/dash/pipe/provider/src/lib.rs b/dash/pipe/provider/src/lib.rs new file mode 100644 index 00000000..721f1e68 --- /dev/null +++ b/dash/pipe/provider/src/lib.rs @@ -0,0 +1,6 @@ +mod engine; +mod message; +mod storage; + +pub use self::engine::PipeEngine; +pub use self::message::{PipeMessage, PipeMessages, PipePayload}; diff --git a/dash/pipe/provider/src/message.rs b/dash/pipe/provider/src/message.rs new file mode 100644 index 00000000..333b7388 --- /dev/null +++ b/dash/pipe/provider/src/message.rs @@ -0,0 +1,162 @@ +use anyhow::{bail, Error, Result}; +use bytes::Bytes; +use futures::future::try_join_all; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; + +use crate::storage::{StorageSet, StorageType}; + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +#[serde(untagged)] +pub enum PipeMessages +where + Payload: Default, +{ + None, + Single(PipeMessage), + Batch(Vec>), +} + +impl PipeMessages { + pub async fn dump_payloads(self, storage: &StorageSet) -> Result> { + match self { + Self::None => Ok(PipeMessages::None), + Self::Single(value) => value.dump_payloads(storage).await.map(PipeMessages::Single), + Self::Batch(values) => { + try_join_all(values.into_iter().map(|value| value.dump_payloads(storage))) + .await + .map(PipeMessages::Batch) + } + } + } +} + +impl PipeMessages +where + Payload: Default, +{ + pub async fn load_payloads(self, storage: &StorageSet) -> Result> { + match self { + Self::None => Ok(PipeMessages::None), + Self::Single(value) => value.load_payloads(storage).await.map(PipeMessages::Single), + Self::Batch(values) => { + try_join_all(values.into_iter().map(|value| value.load_payloads(storage))) + .await + .map(PipeMessages::Batch) + } + } + } + + pub fn into_vec(self) -> Vec> { + match self { + Self::None => Default::default(), + Self::Single(value) => vec![value], + Self::Batch(values) => values, + } + } +} + 
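+/// A single unit of work exchanged between pipe functions.
+///
+/// Only the JSON-encodable `value` and the payload keys travel over NATS:
+/// each `PipePayload`'s raw bytes are offloaded to the configured object
+/// store by `dump_payloads` before publishing and fetched back with
+/// `load_payloads` on the consuming side, keeping large blobs out of the
+/// message bus itself.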
+#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct PipeMessage { + #[serde(default)] + pub payloads: Vec>, + pub value: Value, +} + +impl TryFrom for PipeMessage +where + Value: DeserializeOwned, +{ + type Error = Error; + + fn try_from(value: Bytes) -> Result { + ::serde_json::from_reader(&*value).map_err(Into::into) + } +} + +impl TryFrom<::nats::Message> for PipeMessage +where + Value: DeserializeOwned, +{ + type Error = Error; + + fn try_from(message: ::nats::Message) -> Result { + message.payload.try_into() + } +} + +impl TryFrom> for Bytes +where + Payload: Serialize, + Value: Serialize, +{ + type Error = Error; + + fn try_from(value: PipeMessage) -> Result { + ::serde_json::to_vec(&value) + .map(Into::into) + .map_err(Into::into) + } +} + +impl PipeMessage { + pub async fn dump_payloads(self, storage: &StorageSet) -> Result> { + Ok(PipeMessage { + payloads: try_join_all( + self.payloads + .into_iter() + .map(|payload| payload.dump(storage)), + ) + .await?, + value: self.value, + }) + } +} + +impl PipeMessage { + pub async fn load_payloads(self, storage: &StorageSet) -> Result> { + Ok(PipeMessage { + payloads: try_join_all( + self.payloads + .into_iter() + .map(|payload| payload.load(storage)), + ) + .await?, + value: self.value, + }) + } +} + +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] +pub struct PipePayload { + pub key: String, + #[serde(default)] + pub storage: Option, + #[serde(skip)] + pub value: Value, +} + +impl PipePayload { + pub async fn dump(self, storage: &StorageSet) -> Result> { + Ok(PipePayload { + value: storage + .get_default_output() + .put_with_str(&self.key, self.value) + .await?, + key: self.key, + storage: Some(storage.get_default_output().storage_type()), + }) + } +} + +impl PipePayload { + pub async fn load(self, storage: &StorageSet) -> Result { + Ok(PipePayload { + value: match self.storage { + Some(type_) => storage.get(type_).get_with_str(&self.key).await?, + None => bail!("storage type not defined"), + }, + key: self.key, + storage: self.storage, + }) + } +} diff --git a/dash/pipe/provider/src/storage/lakehouse.rs b/dash/pipe/provider/src/storage/lakehouse.rs new file mode 100644 index 00000000..f8c7efe6 --- /dev/null +++ b/dash/pipe/provider/src/storage/lakehouse.rs @@ -0,0 +1,113 @@ +use std::collections::HashMap; + +use anyhow::{anyhow, bail, Error, Result}; +use async_trait::async_trait; +use bytes::Bytes; +use clap::Parser; +use deltalake::{DeltaTable, DeltaTableBuilder, ObjectStore, Path}; +use futures::TryFutureExt; +use serde::{Deserialize, Serialize}; +use url::Url; + +pub struct Storage { + table: DeltaTable, +} + +impl Storage { + pub async fn try_new( + StorageLakeHouseArgs { + access_key, + deltalake_endpoint, + region, + secret_key, + }: &StorageLakeHouseArgs, + bucket_name: &str, + ) -> Result { + Ok(Self { + table: { + let allow_http = deltalake_endpoint.scheme() == "http"; + let table_uri = format!("s3a://{bucket_name}/"); + + let mut backend_config: HashMap = HashMap::new(); + backend_config.insert("AWS_ACCESS_KEY_ID".to_string(), access_key.clone()); + backend_config.insert( + "AWS_ENDPOINT_URL".to_string(), + deltalake_endpoint.to_string(), + ); + backend_config.insert("AWS_REGION".to_string(), region.clone()); + backend_config.insert("AWS_SECRET_ACCESS_KEY".to_string(), secret_key.clone()); + backend_config.insert("AWS_S3_ALLOW_UNSAFE_RENAME".to_string(), "true".into()); + + DeltaTableBuilder::from_uri(table_uri) + .with_allow_http(allow_http) + .with_storage_options(backend_config) + .build() + 
.unwrap() + }, + }) + } +} + +#[async_trait] +impl super::Storage for Storage { + fn storage_type(&self) -> super::StorageType { + super::StorageType::LakeHouse + } + + async fn get(&self, path: &Path) -> Result { + self.table + .object_store() + .get(path) + .map_err(Error::from) + .and_then(|object| async move { + match object.bytes().await { + Ok(bytes) => Ok(bytes), + Err(error) => { + bail!("failed to get object data from DeltaLake object store: {error}") + } + } + }) + .await + .map_err(|error| anyhow!("failed to get object from DeltaLake object store: {error}")) + } + + async fn put(&self, path: &Path, bytes: Bytes) -> Result<()> { + self.table + .object_store() + .put(path, bytes) + .await + .map(|_| ()) + .map_err(|error| anyhow!("failed to put object into DeltaLake object store: {error}")) + } + + async fn delete(&self, path: &Path) -> Result<()> { + self.table + .object_store() + .delete(path) + .await + .map(|_| ()) + .map_err(|error| { + anyhow!("failed to delete object from DeltaLake object store: {error}") + }) + } +} + +#[derive(Clone, Debug, Parser, Serialize, Deserialize)] +pub struct StorageLakeHouseArgs { + #[arg(long, env = "AWS_ACCESS_KEY_ID", value_name = "VALUE")] + access_key: String, + + #[arg(long, env = "AWS_ENDPOINT_URL", value_name = "URL")] + deltalake_endpoint: Url, + + #[arg( + long, + env = "AWS_REGION", + value_name = "REGION", + default_value = "us-east-1" + )] + region: String, + + #[arg(long, env = "AWS_SECRET_ACCESS_KEY", value_name = "VALUE")] + secret_key: String, +} diff --git a/dash/pipe/provider/src/storage/mod.rs b/dash/pipe/provider/src/storage/mod.rs new file mode 100644 index 00000000..6b8110fb --- /dev/null +++ b/dash/pipe/provider/src/storage/mod.rs @@ -0,0 +1,82 @@ +mod lakehouse; +mod nats; + +use anyhow::{anyhow, Result}; +use async_trait::async_trait; +use bytes::Bytes; +use clap::Parser; +use deltalake::Path; +use serde::{Deserialize, Serialize}; + +pub struct StorageSet { + default_output: StorageType, + lakehouse: self::lakehouse::Storage, + nats: self::nats::Storage, +} + +impl StorageSet { + pub async fn try_new( + args: &StorageArgs, + client: &::nats::Client, + bucket_name: &str, + default_output: StorageType, + ) -> Result { + Ok(Self { + default_output, + lakehouse: self::lakehouse::Storage::try_new(&args.lakehouse, bucket_name).await?, + nats: self::nats::Storage::try_new(&args.nats, client, bucket_name).await?, + }) + } + + pub const fn get(&self, type_: StorageType) -> &(dyn Send + Sync + Storage) { + match type_ { + StorageType::LakeHouse => &self.lakehouse, + StorageType::Nats => &self.nats, + } + } + + pub const fn get_default_output(&self) -> &(dyn Send + Sync + Storage) { + self.get(self.default_output) + } +} + +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] +pub enum StorageType { + LakeHouse, + Nats, +} + +#[async_trait] +pub trait Storage { + fn storage_type(&self) -> StorageType; + + async fn get(&self, path: &Path) -> Result; + + async fn get_with_str(&self, path: &str) -> Result { + self.get(&parse_path(path)?).await + } + + async fn put(&self, path: &Path, bytes: Bytes) -> Result<()>; + + async fn put_with_str(&self, path: &str, bytes: Bytes) -> Result<()> { + self.put(&parse_path(path)?, bytes).await + } + + async fn delete(&self, path: &Path) -> Result<()>; +} + +#[derive(Clone, Debug, Parser, Serialize, Deserialize)] +pub struct StorageArgs { + #[arg(long, env = "BUCKET", value_name = "NAME")] + bucket_name: String, + + #[command(flatten)] + lakehouse: 
self::lakehouse::StorageLakeHouseArgs, + + #[command(flatten)] + nats: self::nats::StorageNatsArgs, +} + +fn parse_path(path: impl AsRef) -> Result { + Path::parse(path).map_err(|error| anyhow!("failed to parse path: {error}")) +} diff --git a/dash/pipe/provider/src/storage/nats.rs b/dash/pipe/provider/src/storage/nats.rs new file mode 100644 index 00000000..fd557600 --- /dev/null +++ b/dash/pipe/provider/src/storage/nats.rs @@ -0,0 +1,76 @@ +use std::io::Cursor; + +use anyhow::{anyhow, bail, Error, Result}; +use async_trait::async_trait; +use bytes::Bytes; +use clap::Parser; +use deltalake::Path; +use futures::TryFutureExt; +use nats::jetstream::object_store::ObjectStore; +use serde::{Deserialize, Serialize}; +use tokio::io::AsyncReadExt; + +pub struct Storage { + store: ObjectStore, +} + +impl Storage { + pub async fn try_new( + StorageNatsArgs {}: &StorageNatsArgs, + client: &::nats::Client, + bucket_name: &str, + ) -> Result { + Ok(Self { + store: { + let context = ::nats::jetstream::new(client.clone()); + context + .get_object_store(bucket_name) + .await + .map_err(|error| anyhow!("failed to init NATS object store: {error}"))? + }, + }) + } +} + +#[async_trait] +impl super::Storage for Storage { + fn storage_type(&self) -> super::StorageType { + super::StorageType::Nats + } + + async fn get(&self, path: &Path) -> Result { + self.store + .get(path.as_ref()) + .map_err(Error::from) + .and_then(|mut object| async move { + let mut buf = Vec::with_capacity(object.info().size); + match object.read_to_end(&mut buf).await { + Ok(_) => Ok(buf.into()), + Err(error) => { + bail!("failed to get object data from NATS object store: {error}") + } + } + }) + .await + .map_err(|error| anyhow!("failed to get object from NATS object store: {error}")) + } + + async fn put(&self, path: &Path, bytes: Bytes) -> Result<()> { + self.store + .put(path.as_ref(), &mut Cursor::new(bytes)) + .await + .map(|_| ()) + .map_err(|error| anyhow!("failed to put object into NATS object store: {error}")) + } + + async fn delete(&self, path: &Path) -> Result<()> { + self.store + .delete(path.as_ref()) + .await + .map(|_| ()) + .map_err(|error| anyhow!("failed to delete object from NATS object store: {error}")) + } +} + +#[derive(Clone, Debug, Parser, Serialize, Deserialize)] +pub struct StorageNatsArgs {} diff --git a/dash/plugins/storage-sync/src/lib.rs b/dash/plugins/storage-sync/src/lib.rs deleted file mode 100644 index 7d12d9af..00000000 --- a/dash/plugins/storage-sync/src/lib.rs +++ /dev/null @@ -1,14 +0,0 @@ -pub fn add(left: usize, right: usize) -> usize { - left + right -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn it_works() { - let result = add(2, 2); - assert_eq!(result, 4); - } -} diff --git a/dash/router/Cargo.toml b/dash/router/Cargo.toml index 1fbc0de0..8429cb9c 100644 --- a/dash/router/Cargo.toml +++ b/dash/router/Cargo.toml @@ -16,7 +16,6 @@ repository = "https://github.com/ulagbulag/OpenARK" [dependencies] ark-core = { path = "../../ark/core", features = ["actix-web"] } dash-api = { path = "../api" } -dash-plugin-storage-sync = { path = "../plugins/storage-sync" } actix-web = { workspace = true } anyhow = { workspace = true } diff --git a/templates/dash/nats/values.yaml b/templates/dash/nats/values.yaml index a1d33ffe..d69a9231 100644 --- a/templates/dash/nats/values.yaml +++ b/templates/dash/nats/values.yaml @@ -7,6 +7,29 @@ config: # routeURLs: # k8sClusterDomain: ops.openark + jetstream: + enabled: true + + fileStore: + enabled: true + dir: /data + + 
############################################################
+      # stateful set -> volume claim templates -> jetstream pvc
+      ############################################################
+      pvc:
+        enabled: true
+        size: 1Ti
+        storageClassName: ceph-block
+
+      # defaults to the PVC size
+      maxSize:
+
+    memoryStore:
+      enabled: true
+      # ensure that container has a sufficient memory limit greater than maxSize
+      maxSize: 1Gi
+
   # websocket:
   #   enabled: true
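
For reference, a minimal sketch of a downstream pipe function built on this provider, modeled on the identity function above; the stream names and batch size are illustrative placeholders, and the builder calls simply override what `PipeEngine::from_env` already reads from `NATS_ADDRS`, `PIPE_*`, `BUCKET`, and the `AWS_*` variables:

    use anyhow::Result;
    use dash_pipe_provider::{PipeEngine, PipeMessages};
    use serde_json::Value;

    fn main() {
        // loop_forever builds its own tokio runtime, so main stays synchronous.
        PipeEngine::from_env()
            .with_batch_size(8)                 // read up to 8 messages per tick
            .with_stream_in("my-input".into())  // hypothetical NATS subjects
            .with_stream_out("my-output".into())
            .loop_forever(tick)
    }

    // Each tick receives the input messages with their payloads already loaded
    // from the object store, and returns the messages to publish downstream.
    async fn tick(input: PipeMessages<Value>) -> Result<PipeMessages<Value>> {
        Ok(input)
    }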