Skip to content

Commit

Permalink
feat: implement compute data runner
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Jul 17, 2024
1 parent 00037a7 commit e1a0285
Show file tree
Hide file tree
Showing 13 changed files with 320 additions and 151 deletions.
13 changes: 8 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[workspace]
members = [
"runner/dc_runner",
"runner/compute_runner",
"runner/compute_data_runner",
]

[workspace.package]
Expand All @@ -24,18 +24,18 @@ tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread"] }
tokio-retry = "0.3"
tokio-stream = "0.1.15"
tonic = "0.11.0"

serde_json = {version = "1.0.117"}
serde = {version ="1.0.203", features = ["derive"]}
uuid = {version="1.8.0", features = ["v4","serde"]}

[package]
name = "jz_action"
version = "0.1.0"
edition = "2021"

[dependencies]
serde = {version ="1.0.203", features = ["derive"]}
serde_json = {version = "1.0.117"}

bimap = "0.6.3"
uuid = {version="1.8.0", features = ["v4","serde"]}
kube = { version = "0.91.0", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.22.0", features = ["latest"] }
handlebars = "5.1.2"
Expand All @@ -44,10 +44,13 @@ prost = "0.12.6"
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tokio-retry = {workspace = true}
tokio-stream = {workspace = true}
uuid = {workspace = true}
anyhow = {workspace = true}
tracing = {workspace = true}
tracing-subscriber = {workspace = true}
tonic = {workspace = true}
serde = {workspace = true}
serde_json = {workspace = true}

[dev-dependencies]
arrayvec = {version="0.7.4", features= ["serde"]}
Expand Down
Binary file added docs/drawio/compute_unit_pod.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 2 additions & 0 deletions docs/架构.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ scheduler根据graph从依赖图中自动插入数据通道,在从输入到输

2. 用户容器 允许用户自定义脚本。从特定位置读取数据,处理数据,并写入到指定的输出位置。

![计算节点pod](./drawio/compute_unit_pod.png)

### 数据通道节点

数据通道节点采用单容器pod。订阅多个前序节点的数据输出。把数据合并成一个batch,并把数据通过扇出的方式传给后面的计算节点。
Expand Down
26 changes: 0 additions & 26 deletions docs/进度规划.md

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
[package]
name = "compute_runner"
name = "compute_data_runner"
version = "0.1.0"
edition = "2021"

[dependencies]
jz_action = { path = "../../"}

tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
tokio = { workspace = true }
tokio-retry = {workspace = true}
tokio-stream = {workspace = true}
anyhow = {workspace = true}
tracing = {workspace = true}
tracing-subscriber = {workspace = true}
tonic = {workspace = true}
serde = {workspace = true}
serde_json = {workspace = true}
uuid = {workspace = true}

clap = {version="4.5.7", features=["derive"]}

actix-web = "4.8.0"
walkdir = "2.5.0"
75 changes: 75 additions & 0 deletions runner/compute_data_runner/src/ipc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use anyhow::Result;

use actix_web::{web, App, HttpResponse, HttpServer};

use crate::program::BatchProgram;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::oneshot;
use tokio::sync::Mutex;

#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct DataResponse {
pub(crate) path: String,
}

#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct SubmitResultReq {
pub(crate) path: String,
}

async fn process_data_request(program_mutex: web::Data<Arc<Mutex<BatchProgram>>>) -> HttpResponse {
let (tx, mut rx) = oneshot::channel::<DataResponse>();
let program = program_mutex.lock().await;
program
.ipc_process_data_req_tx
.as_ref()
.unwrap()
.send(((), tx))
.await
.unwrap();

match rx.try_recv() {
Ok(resp) => HttpResponse::Ok().json(resp),
Err(e) => HttpResponse::ServiceUnavailable().body(e.to_string()),
}
}

async fn process_submit_result_request(
program_mutex: web::Data<Arc<Mutex<BatchProgram>>>,
data: web::Json<SubmitResultReq>,
) -> HttpResponse {
let (tx, mut rx) = oneshot::channel::<()>();
let program = program_mutex.lock().await;
//read request
program
.ipc_process_submit_result_tx
.as_ref()
.unwrap()
.send((data.0, tx))
.await
.unwrap();
match rx.try_recv() {
Ok(resp) => {
//response result
HttpResponse::Ok().body(resp)
}
Err(e) => HttpResponse::ServiceUnavailable().body(e.to_string()),
}
}

pub(crate) fn start_ipc_server(
unix_socket_addr: String,
program: Arc<Mutex<BatchProgram>>,
) -> Result<()> {
HttpServer::new(move || {
App::new()
.app_data(program.clone())
.service(web::resource("/api/v1/data").get(process_data_request))
.service(web::resource("/api/v1/submit").post(process_submit_result_request))
})
.bind_uds(unix_socket_addr)
.unwrap();

Ok(())
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![feature(future_join)]

mod ipc;
mod program;
mod unit;

Expand All @@ -10,6 +11,7 @@ use jz_action::utils::StdIntoAnyhowResult;
use anyhow::{anyhow, Result};
use clap::Parser;
use program::BatchProgram;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use tokio::select;
Expand Down Expand Up @@ -45,8 +47,9 @@ async fn main() -> Result<()> {
.anyhow()?;

let addr = args.host_port.parse()?;
let program = BatchProgram::new();
let program = BatchProgram::new(PathBuf::new());
let program_safe = Arc::new(Mutex::new(program));

let node_controller = DataNodeControllerServer {
program: program_safe.clone(),
};
Expand Down
Loading

0 comments on commit e1a0285

Please sign in to comment.