Skip to content

Commit

Permalink
feat: remove input output relationship
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Jul 27, 2024
1 parent 4c6c640 commit cc66e32
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 53 deletions.
76 changes: 66 additions & 10 deletions crates/compute_unit_runner/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,24 @@ pub(crate) struct AvaiableDataResponse {
}

#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct SubmitResultReq {
pub(crate) struct CompleteDataReq {
pub(crate) id: String,
}

impl SubmitResultReq {
impl CompleteDataReq {
fn new(id: &str) -> Self {
SubmitResultReq { id: id.to_string() }
CompleteDataReq { id: id.to_string() }
}
}

#[derive(Debug, Serialize, Deserialize)]
pub(crate) struct SubmitOuputDataReq {
pub(crate) id: String,
}

impl SubmitOuputDataReq {
fn new(id: &str) -> Self {
SubmitOuputDataReq { id: id.to_string() }
}
}

Expand All @@ -54,9 +65,9 @@ where
}
}

async fn process_submit_result_request<R>(
async fn process_completed_request<R>(
program_mutex: web::Data<Arc<Mutex<MediaDataTracker<R>>>>,
data: web::Json<SubmitResultReq>,
data: web::Json<CompleteDataReq>,
) -> HttpResponse
where
R: NodeRepo,
Expand All @@ -65,7 +76,33 @@ where
let program = program_mutex.lock().await;
//read request
program
.ipc_process_submit_result_tx
.ipc_process_completed_data_tx
.as_ref()
.unwrap()
.send((data.0, tx))
.await
.unwrap();
match rx.try_recv() {
Ok(resp) => {
//response result
HttpResponse::Ok().body(resp)
}
Err(e) => HttpResponse::ServiceUnavailable().body(e.to_string()),
}
}

async fn process_submit_output_request<R>(
program_mutex: web::Data<Arc<Mutex<MediaDataTracker<R>>>>,
data: web::Json<SubmitOuputDataReq>,
) -> HttpResponse
where
R: NodeRepo,
{
let (tx, mut rx) = oneshot::channel::<()>();
let program = program_mutex.lock().await;
//read request
program
.ipc_process_submit_output_tx
.as_ref()
.unwrap()
.send((data.0, tx))
Expand All @@ -91,7 +128,8 @@ where
App::new()
.app_data(program.clone())
.service(web::resource("/api/v1/data").get(process_data_request::<R>))
.service(web::resource("/api/v1/submit").post(process_submit_result_request::<R>))
.service(web::resource("/api/v1/data").post(process_completed_request::<R>))
.service(web::resource("/api/v1/submit").post(process_submit_output_request::<R>))
})
.bind_uds(unix_socket_addr)
.unwrap();
Expand All @@ -100,7 +138,8 @@ where
}

pub trait IPCClient {
async fn submit_result(&self, id: &str) -> Result<()>;
async fn submit_output(&self, id: &str) -> Result<()>;
async fn complete_result(&self, id: &str) -> Result<()>;
async fn request_avaiable_data(&self) -> Result<String>;
}

Expand All @@ -119,8 +158,8 @@ impl IPCClientImpl {
}

impl IPCClient for IPCClientImpl {
async fn submit_result(&self, id: &str) -> Result<()> {
let req = SubmitResultReq::new(id);
async fn submit_output(&self, id: &str) -> Result<()> {
let req = SubmitOuputDataReq::new(id);
let json = serde_json::to_string(&req)?;

let req: Request<Full<Bytes>> = Request::builder()
Expand Down Expand Up @@ -148,4 +187,21 @@ impl IPCClient for IPCClientImpl {
let avaiabel_data: AvaiableDataResponse = serde_json::from_slice(contents.as_ref())?;
Ok(avaiabel_data.id)
}

async fn complete_result(&self, id: &str) -> Result<()> {
let req = CompleteDataReq::new(id);
let json = serde_json::to_string(&req)?;

let req: Request<Full<Bytes>> = Request::builder()
.method(Method::POST)
.uri(self.unix_socket_addr.clone() + "/api/v1/data")
.body(Full::from(json))?;

let resp = self.client.request(req).await.anyhow()?;
if resp.status().is_success() {
return Ok(());
}

Err(anyhow!("submit data fail {}", resp.status()))
}
}
76 changes: 42 additions & 34 deletions crates/compute_unit_runner/src/media_data_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::ipc::{AvaiableDataResponse, SubmitResultReq};
use crate::ipc::{AvaiableDataResponse, CompleteDataReq, SubmitOuputDataReq};
use anyhow::Result;
use jz_action::core::models::{NodeRepo, TrackerState};
use jz_action::network::common::Empty;
Expand Down Expand Up @@ -51,10 +51,18 @@ where

pub(crate) upstreams: Option<Vec<String>>,

pub(crate) ipc_process_submit_result_tx:
Option<mpsc::Sender<(SubmitResultReq, oneshot::Sender<()>)>>,
// channel for processing available-data requests
pub(crate) ipc_process_data_req_tx:
Option<mpsc::Sender<((), oneshot::Sender<AvaiableDataResponse>)>>,

// channel for completed-data responses; do cleanup work when this request is received
pub(crate) ipc_process_completed_data_tx:
Option<mpsc::Sender<(CompleteDataReq, oneshot::Sender<()>)>>,

// channel for submit output data
pub(crate) ipc_process_submit_output_tx:
Option<mpsc::Sender<(SubmitOuputDataReq, oneshot::Sender<()>)>>,

pub(crate) out_going_tx: broadcast::Sender<MediaDataBatchResponse>, //receive data from upstream and send it to program with this
}

Expand All @@ -72,7 +80,8 @@ where
node_type: NodeType::Input,
local_state: TrackerState::Init,
upstreams: None,
ipc_process_submit_result_tx: None,
ipc_process_submit_output_tx: None,
ipc_process_completed_data_tx: None,
ipc_process_data_req_tx: None,
out_going_tx: out_going_tx,
}
Expand All @@ -86,9 +95,10 @@ where
}
}

/// data is transferred from the user container to the data container
pub(crate) async fn track_input_data(&mut self) -> Result<()> {
let (ipc_process_submit_result_tx, mut ipc_process_submit_result_rx) = mpsc::channel(1024);
self.ipc_process_submit_result_tx = Some(ipc_process_submit_result_tx);
self.ipc_process_submit_output_tx = Some(ipc_process_submit_result_tx);

//TODO this turns an async process into a sync one, which yields low performance;
//if there is any bottleneck here, we should refactor this
Expand All @@ -107,7 +117,7 @@ where
// respond with nothing
resp.send(()).expect("channel only read once");

let tmp_out_path = tmp_store.join(req.id.clone()+"-out");
let tmp_out_path = tmp_store.join(req.id.clone());
let mut new_batch =MediaDataBatchResponse::default();

let mut entry_count = 0 ;
Expand Down Expand Up @@ -155,6 +165,7 @@ where
Ok(())
}

/// data is transferred from data container -> user container -> data container
pub(crate) async fn track_input_output_data(&mut self) -> Result<()> {
let upstreams = self
.upstreams
Expand All @@ -167,7 +178,11 @@ where
self.ipc_process_data_req_tx = Some(ipc_process_data_req_tx);

let (ipc_process_submit_result_tx, mut ipc_process_submit_result_rx) = mpsc::channel(1024);
self.ipc_process_submit_result_tx = Some(ipc_process_submit_result_tx);
self.ipc_process_submit_output_tx = Some(ipc_process_submit_result_tx);

let (ipc_process_completed_data_tx, mut ipc_process_completed_data_rx) =
mpsc::channel(1024);
self.ipc_process_completed_data_tx = Some(ipc_process_completed_data_tx);

for upstream in upstreams {
{
Expand Down Expand Up @@ -201,18 +216,12 @@ where
if let Some(data_batch) = data_batch_result {
//create input directory
let id = Uuid::new_v4().to_string();
let tmp_in_path = tmp_store.join(id.clone()+"-input");
let tmp_in_path = tmp_store.join(id.clone());
if let Err(e) = fs::create_dir_all(&tmp_in_path).await {
error!("create input dir {:?} fail {}", tmp_in_path, e);
return
}

//create output directory at the same time
let tmp_out_path = tmp_store.join(id.clone()+"-output");
if let Err(e) = fs::create_dir_all(&tmp_out_path).await {
error!("create output dir {:?} fail {}", tmp_out_path, e);
return
}
//write batch files
for entry in data_batch.cells.iter() {
let entry_path = tmp_in_path.join(entry.path.clone());
Expand All @@ -238,7 +247,7 @@ where
}
}
},
Some((req, resp)) = ipc_process_submit_result_rx.recv() => {
Some((req, resp)) = ipc_process_completed_data_rx.recv() => {
//mark this data as completed
match state_map.get_mut(&req.id) {
Some(state)=>{
Expand All @@ -248,10 +257,16 @@ where
}
// respond with nothing
resp.send(()).expect("channel only read once");

//remove input data
let tmp_path = tmp_store.join(req.id.clone());
if let Err(e) = fs::remove_dir_all(&tmp_path).await {
error!("remove tmp dir{:?} fail {}", tmp_path, e);
}
},
Some((req, resp)) = ipc_process_submit_result_rx.recv() => {
//reconstruct batch
//TODO combine multiple batch
let tmp_out_path = tmp_store.join(req.id.clone()+"-out");
let tmp_out_path = tmp_store.join(req.id.clone());
let mut new_batch = MediaDataBatchResponse::default();

let mut entry_count = 0 ;
Expand Down Expand Up @@ -280,23 +295,22 @@ where
}
new_batch.size = entry_count;

// respond with nothing
resp.send(()).expect("channel only read once");
//write outgoing
if new_batch.size >0 {
if let Err(e) = out_going_tx.send(new_batch) {
error!("send data {}", e);
continue;
}
let entry = state_map.get_mut(&req.id)
.expect("this value has been inserted before");
entry.state = DataStateEnum::Sent;
}

//remove data
let tmp_path = tmp_store.join(req.id.clone()+"-input");
if let Err(e) = fs::remove_dir(&tmp_path).await {
//remove output data
// TODO keep outgoing data until all downstream channels have received this data
let tmp_path = tmp_store.join(req.id.clone());
if let Err(e) = fs::remove_dir_all(&tmp_path).await {
error!("remove tmp dir{:?} fail {}", tmp_path, e);
}
let _ = state_map.remove(&req.id);
},
}
}
Expand All @@ -316,7 +330,7 @@ where
self.ipc_process_data_req_tx = Some(ipc_process_data_req_tx);

let (ipc_process_submit_result_tx, mut ipc_process_submit_result_rx) = mpsc::channel(1024);
self.ipc_process_submit_result_tx = Some(ipc_process_submit_result_tx);
self.ipc_process_submit_output_tx = Some(ipc_process_submit_result_tx);

for upstream in upstreams {
{
Expand Down Expand Up @@ -350,18 +364,12 @@ where
if let Some(data_batch) = data_batch_result {
//create input directory
let id = Uuid::new_v4().to_string();
let tmp_in_path = tmp_store.join(id.clone()+"-input");
let tmp_in_path = tmp_store.join(id.clone());
if let Err(e) = fs::create_dir_all(&tmp_in_path).await {
error!("create input dir {:?} fail {}", tmp_in_path, e);
return
}

//create output directory at the same time
let tmp_out_path = tmp_store.join(id.clone()+"-output");
if let Err(e) = fs::create_dir_all(&tmp_out_path).await {
error!("create output dir {:?} fail {}", tmp_out_path, e);
return
}
//write batch files
for entry in data_batch.cells.iter() {
let entry_path = tmp_in_path.join(entry.path.clone());
Expand Down Expand Up @@ -396,8 +404,8 @@ where
None=>error!("id({:?}) not found", &req.id)
}
//remove data
let tmp_path = tmp_store.join(req.id.clone()+"-input");
if let Err(e) = fs::remove_dir(&tmp_path).await {
let tmp_path = tmp_store.join(req.id.clone());
if let Err(e) = fs::remove_dir_all(&tmp_path).await {
error!("remove tmp dir{:?} fail {}", tmp_path, e);
}
//remove state
Expand Down
11 changes: 7 additions & 4 deletions nodes/jz_reader/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use compute_unit_runner::ipc::{self, IPCClient};
use jiaozifs_client_rs::apis;
use jiaozifs_client_rs::models::RefType;
use jz_action::utils::StdIntoAnyhowResult;
use std::path::Path;
use std::str::FromStr;
use tokio::fs;
use tokio::select;
Expand Down Expand Up @@ -119,11 +120,12 @@ async fn read_jz_fs(args: Args) -> Result<()> {
.await?;

let client = ipc::IPCClientImpl::new(args.unix_socket_addr);
let tmp_path = Path::new(&args.tmp_path);
for batch in file_paths.chunks(args.batch_size) {
//create temp output directory
let id = uuid::Uuid::new_v4().to_string();
let tmp_dir = args.tmp_path.clone() + "/" + &id + "-output";
fs::create_dir_all(tmp_dir.clone()).await?;
let output_dir = tmp_path.join(&id);
fs::create_dir_all(output_dir.clone()).await?;

//read file from jzfs and write to output directory
let mut files = vec![];
Expand All @@ -139,7 +141,8 @@ async fn read_jz_fs(args: Args) -> Result<()> {
)
.await?;

let mut tmp_file = fs::File::create(tmp_dir.clone() + &path).await?;
let file_path = output_dir.as_path().join(&path);
let mut tmp_file = fs::File::create(file_path).await?;
while let Some(item) = byte_stream.next().await {
tokio::io::copy(&mut item.unwrap().as_ref(), &mut tmp_file)
.await
Expand All @@ -149,7 +152,7 @@ async fn read_jz_fs(args: Args) -> Result<()> {
}

//submit directory after completed a batch
client.submit_result(&id).await?;
client.submit_output(&id).await?;
}
Ok(())
}
10 changes: 5 additions & 5 deletions nodes/jz_writer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use jiaozifs_client_rs::apis::{self};
use jiaozifs_client_rs::models::RefType;
use jz_action::utils::IntoAnyhowResult;
use jz_action::utils::StdIntoAnyhowResult;
use std::path::Path;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use tokio::fs;
use tokio::select;
Expand Down Expand Up @@ -117,11 +117,11 @@ async fn write_jz_fs(args: Args) -> Result<()> {
}?;

let client = ipc::IPCClientImpl::new(args.unix_socket_addr);
let tmp_path = Path::new(&args.tmp_path);
loop {
let id = client.request_avaiable_data().await?;
let path_str = args.tmp_path.clone() + "/" + &id + "-input";
let root_input_dir = Path::new(&path_str);

let path_str = tmp_path.join(&id);
let root_input_dir = path_str.as_path();
for entry in WalkDir::new(root_input_dir) {
let entry = entry?;
if entry.file_type().is_file() {
Expand All @@ -140,6 +140,6 @@ async fn write_jz_fs(args: Args) -> Result<()> {
.await?;
}
}
client.submit_result(&id).await?;
client.complete_result(&id).await?;
}
}

0 comments on commit cc66e32

Please sign in to comment.