Skip to content

Commit

Permalink
Begin implementation of storage connector
Browse files Browse the repository at this point in the history
  • Loading branch information
HoKim98 committed Oct 6, 2023
1 parent 410a435 commit 18b70ec
Show file tree
Hide file tree
Showing 16 changed files with 350 additions and 102 deletions.
11 changes: 7 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ members = [
"dash/controller",
"dash/gateway",
"dash/openapi",
"dash/pipe/connectors/storage",
"dash/pipe/connectors/webcam", # exclude(alpine)
"dash/pipe/functions/identity",
"dash/pipe/functions/python", # exclude(alpine)
Expand Down Expand Up @@ -57,6 +58,7 @@ actix-web = { version = "=4.4", default-features = false, features = [
anyhow = { version = "=1.0", features = ["backtrace"] }
argon2 = { version = "=0.5" }
async-recursion = { version = "=1.0" }
async-stream = { version = "=0.3" }
async-trait = { version = "=0.1" }
base64 = { version = "=0.21" }
byteorder = { version = "=1.4" }
Expand All @@ -67,7 +69,7 @@ chrono = { version = "=0.4", features = ["serde"] }
clap = { version = "=4.4", features = ["env", "derive"] }
csv = { version = "=1.2" }
ctrlc = { version = "=3.4" }
deltalake = { version = "0.16", default-features = false }
deltalake = { version = "=0.16", default-features = false }
email_address = { version = "=0.2" }
futures = { version = "=0.3" }
gethostname = { version = "=0.4" }
Expand All @@ -87,7 +89,7 @@ k8s-openapi = { version = "=0.20", features = ["schemars", "v1_26"] }
kube = { version = "=0.86", default-features = false }
language-tags = { version = "=0.3", features = ["serde"] }
tracing = { version = "=0.1" }
tracing-subscriber = { version = "0.3" }
tracing-subscriber = { version = "=0.3" }
mime = { version = "=0.3" }
# FIXME: push a PR: rustls-tls feature support
minio = { git = "https://github.com/ulagbulag/minio-rs.git", default-features = false, rev = "5be4686e307b058aa4190134a555c925301c59b2", features = [
Expand All @@ -99,8 +101,8 @@ num-traits = { version = "=0.2" }
octocrab = { git = "https://github.com/ulagbulag/octocrab.git", default-features = false, features = [
"rustls-tls",
] }
opencv = { version = "0.84", default-features = false }
ordered-float = { version = "4.0", default-features = false, features = [
opencv = { version = "=0.84", default-features = false }
ordered-float = { version = "=4.0", default-features = false, features = [
"bytemuck",
"schemars",
"serde",
Expand Down Expand Up @@ -147,6 +149,7 @@ sio = { git = "https://github.com/ulagbulag/sio-rs.git" }
strum = { version = "=0.25", features = ["derive"] }
tera = { version = "=1.19" }
tokio = { version = "=1.32", features = ["macros", "rt"] }
tokio-stream = { version = "=0.1" }
url = { version = "=2.4", features = ["serde"] }
uuid = { version = "=1.4", features = ["js", "serde", "v4"] }
which = { version = "=4.4" }
24 changes: 24 additions & 0 deletions dash/pipe/connectors/storage/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "dash-pipe-connector-storage"
version = "0.1.0"
edition = "2021"

authors = ["Ho Kim <[email protected]>"]
description = "Kubernetes Is Simple, Stupid which a part of OpenARK"
documentation = "https://docs.rs/kiss-api"
license = "GPL-3.0-or-later WITH Classpath-exception-2.0"
readme = "../../README.md"
homepage = "https://github.com/ulagbulag/OpenARK"
repository = "https://github.com/ulagbulag/OpenARK"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
dash-pipe-provider = { path = "../../provider" }

anyhow = { workspace = true }
async-trait = { workspace = true }
clap = { workspace = true }
futures = { workspace = true }
serde = { workspace = true }
tokio = { workspace = true, features = ["time"] }
65 changes: 65 additions & 0 deletions dash/pipe/connectors/storage/src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use std::sync::Arc;

use anyhow::{bail, Result};
use async_trait::async_trait;
use clap::{ArgAction, Parser};
use dash_pipe_provider::{
FunctionContext, PipeArgs, PipeMessage, PipeMessages, PipePayload, StorageSet, StorageType,
Stream,
};
use futures::StreamExt;
use serde::{Deserialize, Serialize};

fn main() {
PipeArgs::<Function>::from_env().loop_forever()
}

#[derive(Clone, Debug, Serialize, Deserialize, Parser)]
pub struct FunctionArgs {
#[arg(long, env = "PIPE_PERSISTENCE", action = ArgAction::SetTrue)]
#[serde(default)]
persistence: Option<bool>,
}

pub struct Function {
ctx: FunctionContext,
items: Stream,
}

#[async_trait(?Send)]
impl ::dash_pipe_provider::Function for Function {
type Args = FunctionArgs;
type Input = ();
type Output = usize;

async fn try_new(
args: &<Self as ::dash_pipe_provider::Function>::Args,
ctx: &mut FunctionContext,
storage: &Arc<StorageSet>,
) -> Result<Self> {
let storage_type = match args.persistence {
Some(true) => StorageType::PERSISTENT,
Some(false) | None => StorageType::TEMPORARY,
};

Ok(Self {
ctx: ctx.clone(),
items: storage.get(storage_type).list().await?,
})
}

async fn tick(
&mut self,
_inputs: PipeMessages<<Self as ::dash_pipe_provider::Function>::Input>,
) -> Result<PipeMessages<<Self as ::dash_pipe_provider::Function>::Output>> {
match self.items.next().await {
// TODO: stream이 JSON 메타데이터를 포함한 PipeMessage Object를 배출
Some(Ok((path, value))) => Ok(PipeMessages::Single(PipeMessage {
payloads: vec![PipePayload::new(path.to_string(), value)],
value: Default::default(),
})),
Some(Err(error)) => bail!("failed to load data: {error}"),
None => self.ctx.terminate_ok(),
}
}
}
1 change: 0 additions & 1 deletion dash/pipe/connectors/webcam/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,3 @@ image = { workspace = true, features = ["png"] }
opencv = { workspace = true, features = ["imgcodecs", "videoio"] }
serde = { workspace = true }
tokio = { workspace = true, features = ["time"] }
tracing = { workspace = true }
22 changes: 14 additions & 8 deletions dash/pipe/connectors/webcam/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use std::time::Duration;
use std::sync::Arc;

use anyhow::{anyhow, bail, Result};
use async_trait::async_trait;
use clap::{Parser, ValueEnum};
use dash_pipe_provider::{PipeArgs, PipeMessage, PipeMessages, PipePayload};
use dash_pipe_provider::{
FunctionContext, PipeArgs, PipeMessage, PipeMessages, PipePayload, StorageSet,
};
use image::{codecs, RgbImage};
use opencv::{
core::{Mat, MatTraitConst, MatTraitConstManual, Vec3b, Vector},
imgcodecs,
videoio::{self, VideoCapture, VideoCaptureTrait, VideoCaptureTraitConst},
};
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
use tracing::error;

fn main() {
PipeArgs::<Function>::from_env().loop_forever()
Expand Down Expand Up @@ -69,6 +69,7 @@ impl CameraEncoder {
pub struct Function {
camera_encoder: CameraEncoder,
capture: VideoCapture,
ctx: FunctionContext,
frame: Mat,
frame_counter: FrameCounter,
frame_size: FrameSize,
Expand All @@ -81,7 +82,11 @@ impl ::dash_pipe_provider::Function for Function {
type Input = ();
type Output = usize;

async fn try_new(args: &<Self as ::dash_pipe_provider::Function>::Args) -> Result<Self> {
async fn try_new(
args: &<Self as ::dash_pipe_provider::Function>::Args,
ctx: &mut FunctionContext,
_storage: &Arc<StorageSet>,
) -> Result<Self> {
let FunctionArgs {
camera_device,
camera_encoder,
Expand All @@ -96,6 +101,7 @@ impl ::dash_pipe_provider::Function for Function {
Ok(Self {
camera_encoder,
capture,
ctx: ctx.clone(),
frame: Default::default(),
frame_counter: Default::default(),
frame_size: Default::default(),
Expand Down Expand Up @@ -158,9 +164,9 @@ impl ::dash_pipe_provider::Function for Function {
}
}
Ok(false) => {
error!("video capture is disconnected!");
sleep(Duration::from_millis(u64::MAX)).await;
return Ok(PipeMessages::None);
return self
.ctx
.terminate_err(anyhow!("video capture is disconnected!"))
}
Err(error) => bail!("failed to capture a frame: {error}"),
};
Expand Down
12 changes: 10 additions & 2 deletions dash/pipe/functions/identity/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
use clap::{ArgAction, Parser};
use dash_pipe_provider::{PipeArgs, PipeMessage, PipeMessages, PipePayload};
use dash_pipe_provider::{
FunctionContext, PipeArgs, PipeMessage, PipeMessages, PipePayload, StorageSet,
};
use serde::{Deserialize, Serialize};
use serde_json::Value;

Expand All @@ -26,7 +30,11 @@ impl ::dash_pipe_provider::Function for Function {
type Input = Value;
type Output = Value;

async fn try_new(args: &<Self as ::dash_pipe_provider::Function>::Args) -> Result<Self> {
async fn try_new(
args: &<Self as ::dash_pipe_provider::Function>::Args,
_ctx: &mut FunctionContext,
_storage: &Arc<StorageSet>,
) -> Result<Self> {
Ok(Self { args: args.clone() })
}

Expand Down
10 changes: 7 additions & 3 deletions dash/pipe/functions/python/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::path::PathBuf;
use std::{path::PathBuf, sync::Arc};

use anyhow::{anyhow, Error, Result};
use async_trait::async_trait;
use clap::Parser;
use dash_pipe_provider::{PipeArgs, PipeMessages, PyPipeMessage};
use dash_pipe_provider::{FunctionContext, PipeArgs, PipeMessages, PyPipeMessage, StorageSet};
use pyo3::{types::PyModule, PyObject, Python};
use serde::{Deserialize, Serialize};
use serde_json::Value;
Expand All @@ -29,7 +29,11 @@ impl ::dash_pipe_provider::Function for Function {
type Input = Value;
type Output = Value;

async fn try_new(args: &<Self as ::dash_pipe_provider::Function>::Args) -> Result<Self> {
async fn try_new(
args: &<Self as ::dash_pipe_provider::Function>::Args,
_ctx: &mut FunctionContext,
_storage: &Arc<StorageSet>,
) -> Result<Self> {
let FunctionArgs {
python_script: file_path,
} = args;
Expand Down
3 changes: 3 additions & 0 deletions dash/pipe/provider/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ s3 = ["minio"]
ark-core = { path = "../../../ark/core" }

anyhow = { workspace = true }
async-stream = { workspace = true }
async-trait = { workspace = true }
bytes = { workspace = true }
clap = { workspace = true }
ctrlc = { workspace = true }
deltalake = { workspace = true }
futures = { workspace = true }
minio = { workspace = true, optional = true }
Expand All @@ -36,5 +38,6 @@ pyo3 = { workspace = true, optional = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio-stream = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }
48 changes: 44 additions & 4 deletions dash/pipe/provider/src/function.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
use std::fmt;
use std::{
fmt,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};

use anyhow::Result;
use anyhow::{anyhow, Error, Result};
use async_trait::async_trait;
use clap::Args;
use serde::{de::DeserializeOwned, Serialize};

use crate::PipeMessages;
use crate::{PipeMessages, StorageSet};

#[async_trait(?Send)]
pub trait Function {
type Args: Clone + fmt::Debug + Serialize + DeserializeOwned + Args;
type Input: 'static + Send + Sync + DeserializeOwned;
type Output: 'static + Send + Serialize;

async fn try_new(args: &<Self as Function>::Args) -> Result<Self>
async fn try_new(
args: &<Self as Function>::Args,
ctx: &mut FunctionContext,
storage: &Arc<StorageSet>,
) -> Result<Self>
where
Self: Sized;

Expand All @@ -22,3 +32,33 @@ pub trait Function {
inputs: PipeMessages<<Self as Function>::Input>,
) -> Result<PipeMessages<<Self as Function>::Output>>;
}

#[derive(Clone, Debug, Default)]
pub struct FunctionContext {
is_terminating: Arc<AtomicBool>,
}

impl FunctionContext {
pub(crate) fn trap_on_sigint(self) -> Result<()> {
::ctrlc::set_handler(move || self.terminate())
.map_err(|error| anyhow!("failed to set SIGINT handler: {error}"))
}

pub(crate) fn terminate(&self) {
self.is_terminating.store(true, Ordering::SeqCst)
}

pub fn terminate_ok<T>(&self) -> Result<PipeMessages<T>> {
self.terminate();
Ok(PipeMessages::None)
}

pub fn terminate_err<T>(&self, error: impl Into<Error>) -> Result<PipeMessages<T>> {
self.terminate();
Err(error.into())
}

pub(crate) fn is_terminating(&self) -> bool {
self.is_terminating.load(Ordering::SeqCst)
}
}
3 changes: 2 additions & 1 deletion dash/pipe/provider/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ mod message;
mod pipe;
mod storage;

pub use self::function::Function;
pub use self::function::{Function, FunctionContext};
#[cfg(feature = "pyo3")]
pub use self::message::PyPipeMessage;
pub use self::message::{PipeMessage, PipeMessages, PipePayload};
pub use self::pipe::PipeArgs;
pub use self::storage::{Storage, StorageSet, StorageType, Stream};
Loading

0 comments on commit 18b70ec

Please sign in to comment.