Skip to content

Commit

Permalink
Begin implementation of dash pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
HoKim98 committed Oct 1, 2023
1 parent 209d23c commit 891c5d3
Show file tree
Hide file tree
Showing 15 changed files with 767 additions and 18 deletions.
11 changes: 10 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ members = [
"dash/controller",
"dash/gateway",
"dash/openapi",
"dash/plugins/storage-sync",
"dash/pipe/functions/identity",
"dash/pipe/functions/loader-webcam",
"dash/pipe/provider",
"dash/provider",
"dash/provider/api",
"dash/provider/client",
Expand Down Expand Up @@ -63,6 +65,12 @@ chrono = { version = "=0.4", features = ["serde"] }
clap = { version = "=4.4", features = ["env", "derive"] }
csv = { version = "=1.2" }
ctrlc = { version = "=3.4" }
deltalake = { version = "0.16", features = [
"arrow",
"datafusion",
"parquet",
"s3",
] }
email_address = { version = "=0.2" }
futures = { version = "=0.3" }
gethostname = { version = "=0.4" }
Expand All @@ -87,6 +95,7 @@ mime = { version = "=0.3" }
minio = { git = "https://github.com/ulagbulag/minio-rs.git", default-features = false, rev = "06d98675b4457e9139f6c01cea4a0659cb82c82c", features = [
"rustls-tls",
] } # not deployed to crates.io
nats = { package = "async-nats", version = "0.32" }
ndarray = { version = "=0.15", features = ["serde"] }
num-traits = { version = "=0.2" }
octocrab = { git = "https://github.com/ulagbulag/octocrab.git", default-features = false, features = [
Expand Down
20 changes: 20 additions & 0 deletions dash/pipe/functions/identity/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "dash-pipe-function-identity"
version = "0.1.0"
edition = "2021"

authors = ["Ho Kim <[email protected]>"]
description = "Kubernetes Is Simple, Stupid which a part of OpenARK"
documentation = "https://docs.rs/kiss-api"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
readme = "../../README.md"
homepage = "https://github.com/ulagbulag/OpenARK"
repository = "https://github.com/ulagbulag/OpenARK"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dash-pipe-provider = { path = "../../provider" }

anyhow = { workspace = true }
serde_json = { workspace = true }
11 changes: 11 additions & 0 deletions dash/pipe/functions/identity/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use anyhow::Result;
use dash_pipe_provider::{PipeEngine, PipeMessages};
use serde_json::Value;

fn main() {
PipeEngine::from_env().loop_forever(tick)
}

async fn tick(input: PipeMessages<Value>) -> Result<PipeMessages<Value>> {
Ok(input)
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
[package]
name = "dash-plugin-storage-sync"
name = "dash-pipe-function-loader-webcam"
version = "0.1.0"
edition = "2021"

authors = ["Ho Kim <[email protected]>"]
description = "Kubernetes Is Simple, Stupid which a part of OpenARK"
documentation = "https://docs.rs/kiss-api"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
readme = "../../../README.md"
readme = "../../README.md"
homepage = "https://github.com/ulagbulag/OpenARK"
repository = "https://github.com/ulagbulag/OpenARK"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dash-pipe-provider = { path = "../../provider" }

anyhow = { workspace = true }
12 changes: 12 additions & 0 deletions dash/pipe/functions/loader-webcam/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use anyhow::Result;
use dash_pipe_provider::{PipeEngine, PipeMessages};

fn main() {
PipeEngine::from_env().loop_forever(tick)
}

async fn tick(input: PipeMessages<String>) -> Result<PipeMessages<String>> {
// TODO: to be implemented
dbg!(&input);
Ok(input)
}
30 changes: 30 additions & 0 deletions dash/pipe/provider/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
[package]
name = "dash-pipe-provider"
version = "0.1.0"
edition = "2021"

authors = ["Ho Kim <[email protected]>"]
description = "Kubernetes Is Simple, Stupid which a part of OpenARK"
documentation = "https://docs.rs/kiss-api"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
readme = "../../README.md"
homepage = "https://github.com/ulagbulag/OpenARK"
repository = "https://github.com/ulagbulag/OpenARK"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
ark-core = { path = "../../../ark/core" }

anyhow = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
clap = { workspace = true }
deltalake = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
nats = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["full"] }
url = { workspace = true }
215 changes: 215 additions & 0 deletions dash/pipe/provider/src/engine.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
use std::future::Future;

use anyhow::{anyhow, bail, Result};
use clap::Parser;
use futures::{StreamExt, TryFutureExt};
use log::warn;
use nats::ToServerAddrs;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tokio::task::yield_now;

use crate::{
message::PipeMessages,
storage::{StorageSet, StorageType},
};

#[derive(Clone, Debug, Parser, Serialize, Deserialize)]
pub struct PipeEngine {
#[arg(long, env = "NATS_ADDRS", value_name = "ADDR")]
addrs: Vec<String>,

#[arg(long, env = "PIPE_BATCH_SIZE", value_name = "BATCH_SIZE")]
#[serde(default)]
batch_size: Option<usize>,

#[arg(long, env = "PIPE_PERSISTENCE")]
#[serde(default)]
persistence: Option<bool>,

#[arg(long, env = "PIPE_REPLY")]
#[serde(default)]
reply: Option<bool>,

#[command(flatten)]
storage: crate::storage::StorageArgs,

#[arg(long, env = "PIPE_STREAM_IN", value_name = "NAME")]
#[serde(default)]
stream_in: Option<String>,

#[arg(long, env = "PIPE_STREAM_OUT", value_name = "NAME")]
#[serde(default)]
stream_out: Option<String>,
}

impl PipeEngine {
pub fn from_env() -> Self {
Self::parse()
}

pub fn with_batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = Some(batch_size);
self
}

pub fn with_persistence(mut self, persistence: bool) -> Self {
self.persistence = Some(persistence);
self
}

pub fn with_reply(mut self, reply: bool) -> Self {
self.reply = Some(reply);
self
}

pub fn with_stream_in(mut self, stream_in: String) -> Self {
self.stream_in = Some(stream_in);
self
}

pub fn with_stream_out(mut self, stream_out: String) -> Self {
self.stream_out = Some(stream_out);
self
}
}

impl PipeEngine {
pub fn loop_forever<F, Fut, Input, Output>(&self, tick: F)
where
F: Fn(PipeMessages<Input>) -> Fut,
Fut: Future<Output = Result<PipeMessages<Output>>>,
Input: DeserializeOwned,
Output: Serialize,
{
::ark_core::logger::init_once();

if let Err(error) = ::tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("failed to init tokio runtime")
.block_on(self.loop_forever_async(tick))
{
panic!("{error}")
}
}

async fn loop_forever_async<F, Fut, Input, Output>(&self, tick: F) -> Result<()>
where
F: Fn(PipeMessages<Input>) -> Fut,
Fut: Future<Output = Result<PipeMessages<Output>>>,
Input: DeserializeOwned,
Output: Serialize,
{
// init client
let client = {
let addrs = self
.addrs
.iter()
.flat_map(|addr| {
addr.to_server_addrs()
.map_err(|error| anyhow!("failed to parse NATS address: {error}"))
})
.flatten()
.collect::<Vec<_>>();
match ::nats::connect(addrs).await {
Ok(client) => client,
Err(error) => bail!("failed to init NATS client: {error}"),
}
};

// init streams
let mut input_stream = match &self.stream_in {
Some(stream) => match client.subscribe(stream.clone()).await {
Ok(stream) => Some(stream),
Err(error) => bail!("failed to init NATS input stream: {error}"),
},
None => None,
};

// init storages
let storage = {
let default_output = match self.persistence {
Some(true) => StorageType::LakeHouse,
Some(false) | None => StorageType::Nats,
};
StorageSet::try_new(&self.storage, &client, "myobjbucket", default_output).await?
};

'main: loop {
// yield per every loop
yield_now().await;

let inputs = match &mut input_stream {
// TODO: to be implemented
Some(stream) => match self.batch_size {
Some(batch_size) => {
let mut inputs = vec![];
for _ in 0..batch_size {
match stream.next().await.map(TryInto::try_into).transpose() {
Ok(Some(input)) => {
inputs.push(input);
}
Ok(None) => break,
Err(error) => {
warn!("failed to parse NATS batch input: {error}");
continue 'main;
}
}
}

if inputs.is_empty() {
continue 'main;
} else {
PipeMessages::Batch(inputs)
}
}
None => match stream.next().await.map(TryInto::try_into).transpose() {
Ok(Some(input)) => PipeMessages::Single(input),
Ok(None) => continue 'main,
Err(error) => {
warn!("failed to parse NATS input: {error}");
continue 'main;
}
},
},
None => PipeMessages::None,
};

let inputs = match inputs.load_payloads(&storage).await {
Ok(inputs) => inputs,
Err(error) => {
warn!("failed to get NATS payloads: {error}");
continue 'main;
}
};

let outputs = match tick(inputs)
.and_then(|inputs| inputs.dump_payloads(&storage))
.await
{
Ok(PipeMessages::None) => continue 'main,
Ok(outputs) => outputs,
Err(error) => {
warn!("{error}");
continue 'main;
}
};

if let Some(output_stream) = &self.stream_out {
for output in outputs.into_vec() {
match output.try_into() {
Ok(output) => {
if let Err(error) = client.publish(output_stream.clone(), output).await
{
warn!("failed to send NATS output: {error}");
}
}
Err(error) => {
warn!("failed to parse NATS output: {error}");
}
}
}
}
}
}
}
6 changes: 6 additions & 0 deletions dash/pipe/provider/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
mod engine;
mod message;
mod storage;

pub use self::engine::PipeEngine;
pub use self::message::{PipeMessage, PipeMessages, PipePayload};
Loading

0 comments on commit 891c5d3

Please sign in to comment.