From cc66e32d5d76d4a87ec6070786f5a7dbbd339ff3 Mon Sep 17 00:00:00 2001 From: hunjixin <1084400399@qq.com> Date: Sat, 27 Jul 2024 13:27:29 +0000 Subject: [PATCH] feat: remove input output relationship --- crates/compute_unit_runner/src/ipc.rs | 76 ++++++++++++++++--- .../src/media_data_tracker.rs | 76 ++++++++++--------- nodes/jz_reader/src/main.rs | 11 ++- nodes/jz_writer/src/main.rs | 10 +-- 4 files changed, 120 insertions(+), 53 deletions(-) diff --git a/crates/compute_unit_runner/src/ipc.rs b/crates/compute_unit_runner/src/ipc.rs index 56d2ee5..648d6b7 100644 --- a/crates/compute_unit_runner/src/ipc.rs +++ b/crates/compute_unit_runner/src/ipc.rs @@ -22,13 +22,24 @@ pub(crate) struct AvaiableDataResponse { } #[derive(Debug, Serialize, Deserialize)] -pub(crate) struct SubmitResultReq { +pub(crate) struct CompleteDataReq { pub(crate) id: String, } -impl SubmitResultReq { +impl CompleteDataReq { fn new(id: &str) -> Self { - SubmitResultReq { id: id.to_string() } + CompleteDataReq { id: id.to_string() } + } +} + +#[derive(Debug, Serialize, Deserialize)] +pub(crate) struct SubmitOuputDataReq { + pub(crate) id: String, +} + +impl SubmitOuputDataReq { + fn new(id: &str) -> Self { + SubmitOuputDataReq { id: id.to_string() } } } @@ -54,9 +65,9 @@ where } } -async fn process_submit_result_request( +async fn process_completed_request( program_mutex: web::Data>>>, - data: web::Json, + data: web::Json, ) -> HttpResponse where R: NodeRepo, { let (tx, mut rx) = oneshot::channel::<()>(); let program = program_mutex.lock().await; //read request program - .ipc_process_submit_result_tx + .ipc_process_completed_data_tx .as_ref() .unwrap() .send((data.0, tx)) .await .unwrap(); match rx.try_recv() { Ok(resp) => { //response result HttpResponse::Ok().body(resp) } Err(e) => HttpResponse::ServiceUnavailable().body(e.to_string()), } } + +async fn process_submit_output_request( + program_mutex: web::Data>>>, + data: web::Json, +) -> HttpResponse +where + R: NodeRepo, +{ + let (tx, mut 
rx) = oneshot::channel::<()>(); + let program = program_mutex.lock().await; + //read request + program + .ipc_process_submit_output_tx .as_ref() .unwrap() .send((data.0, tx)) @@ -91,7 +128,8 @@ where App::new() .app_data(program.clone()) .service(web::resource("/api/v1/data").get(process_data_request::)) - .service(web::resource("/api/v1/submit").post(process_submit_result_request::)) + .service(web::resource("/api/v1/data").post(process_completed_request::)) + .service(web::resource("/api/v1/submit").post(process_submit_output_request::)) }) .bind_uds(unix_socket_addr) .unwrap(); @@ -100,7 +138,8 @@ where } pub trait IPCClient { - async fn submit_result(&self, id: &str) -> Result<()>; + async fn submit_output(&self, id: &str) -> Result<()>; + async fn complete_result(&self, id: &str) -> Result<()>; async fn request_avaiable_data(&self) -> Result; } @@ -119,8 +158,8 @@ impl IPCClientImpl { } impl IPCClient for IPCClientImpl { - async fn submit_result(&self, id: &str) -> Result<()> { - let req = SubmitResultReq::new(id); + async fn submit_output(&self, id: &str) -> Result<()> { + let req = SubmitOuputDataReq::new(id); let json = serde_json::to_string(&req)?; let req: Request> = Request::builder() @@ -148,4 +187,21 @@ impl IPCClient for IPCClientImpl { let avaiabel_data: AvaiableDataResponse = serde_json::from_slice(contents.as_ref())?; Ok(avaiabel_data.id) } + + async fn complete_result(&self, id: &str) -> Result<()> { + let req = CompleteDataReq::new(id); + let json = serde_json::to_string(&req)?; + + let req: Request> = Request::builder() + .method(Method::POST) + .uri(self.unix_socket_addr.clone() + "/api/v1/data") + .body(Full::from(json))?; + + let resp = self.client.request(req).await.anyhow()?; + if resp.status().is_success() { + return Ok(()); + } + + Err(anyhow!("submit data fail {}", resp.status())) + } } diff --git a/crates/compute_unit_runner/src/media_data_tracker.rs b/crates/compute_unit_runner/src/media_data_tracker.rs index d574ad7..25bbe93 100644 
--- a/crates/compute_unit_runner/src/media_data_tracker.rs +++ b/crates/compute_unit_runner/src/media_data_tracker.rs @@ -1,4 +1,4 @@ -use crate::ipc::{AvaiableDataResponse, SubmitResultReq}; +use crate::ipc::{AvaiableDataResponse, CompleteDataReq, SubmitOuputDataReq}; use anyhow::Result; use jz_action::core::models::{NodeRepo, TrackerState}; use jz_action::network::common::Empty; @@ -51,10 +51,18 @@ where pub(crate) upstreams: Option>, - pub(crate) ipc_process_submit_result_tx: - Option)>>, + // channel for processing available data requests pub(crate) ipc_process_data_req_tx: Option)>>, + + // channel for completed data; do cleanup work when this request is received + pub(crate) ipc_process_completed_data_tx: + Option)>>, + + // channel for submitting output data + pub(crate) ipc_process_submit_output_tx: + Option)>>, + pub(crate) out_going_tx: broadcast::Sender, //receive data from upstream and send it to program with this } @@ -72,7 +80,8 @@ where node_type: NodeType::Input, local_state: TrackerState::Init, upstreams: None, - ipc_process_submit_result_tx: None, + ipc_process_submit_output_tx: None, + ipc_process_completed_data_tx: None, ipc_process_data_req_tx: None, out_going_tx: out_going_tx, } @@ -86,9 +95,10 @@ where } } + /// data is transferred from the user container to the data container pub(crate) async fn track_input_data(&mut self) -> Result<()> { let (ipc_process_submit_result_tx, mut ipc_process_submit_result_rx) = mpsc::channel(1024); - self.ipc_process_submit_result_tx = Some(ipc_process_submit_result_tx); + self.ipc_process_submit_output_tx = Some(ipc_process_submit_result_tx); //TODO this make a async process to be sync process. 
got a low performance, //if have any bottleneck here, we should refrator this one @@ -107,7 +117,7 @@ where // respose with nothing resp.send(()).expect("channel only read once"); - let tmp_out_path = tmp_store.join(req.id.clone()+"-out"); + let tmp_out_path = tmp_store.join(req.id.clone()); let mut new_batch =MediaDataBatchResponse::default(); let mut entry_count = 0 ; @@ -155,6 +165,7 @@ where Ok(()) } + /// data is transferred from data container -> user container -> data container pub(crate) async fn track_input_output_data(&mut self) -> Result<()> { let upstreams = self .upstreams @@ -167,7 +178,11 @@ self.ipc_process_data_req_tx = Some(ipc_process_data_req_tx); let (ipc_process_submit_result_tx, mut ipc_process_submit_result_rx) = mpsc::channel(1024); - self.ipc_process_submit_result_tx = Some(ipc_process_submit_result_tx); + self.ipc_process_submit_output_tx = Some(ipc_process_submit_result_tx); + + let (ipc_process_completed_data_tx, mut ipc_process_completed_data_rx) = + mpsc::channel(1024); + self.ipc_process_completed_data_tx = Some(ipc_process_completed_data_tx); for upstream in upstreams { { @@ -201,18 +216,12 @@ where if let Some(data_batch) = data_batch_result { //create input directory let id = Uuid::new_v4().to_string(); - let tmp_in_path = tmp_store.join(id.clone()+"-input"); + let tmp_in_path = tmp_store.join(id.clone()); if let Err(e) = fs::create_dir_all(&tmp_in_path).await { error!("create input dir {:?} fail {}", tmp_in_path, e); return } - //create output directory at the same time - let tmp_out_path = tmp_store.join(id.clone()+"-output"); - if let Err(e) = fs::create_dir_all(&tmp_out_path).await { - error!("create output dir {:?} fail {}", tmp_out_path, e); - return - } //write batch files for entry in data_batch.cells.iter() { let entry_path = tmp_in_path.join(entry.path.clone()); @@ -238,7 +247,7 @@ where } } }, - Some((req, resp)) = ipc_process_submit_result_rx.recv() => { + Some((req, resp)) = ipc_process_completed_data_rx.recv() => 
{ //mark this data as completed match state_map.get_mut(&req.id) { Some(state)=>{ @@ -248,10 +257,16 @@ where } // respose with nothing resp.send(()).expect("channel only read once"); - + //remove input data + let tmp_path = tmp_store.join(req.id.clone()); + if let Err(e) = fs::remove_dir_all(&tmp_path).await { + error!("remove tmp dir{:?} fail {}", tmp_path, e); + } + }, + Some((req, resp)) = ipc_process_submit_result_rx.recv() => { //reconstruct batch //TODO combine multiple batch - let tmp_out_path = tmp_store.join(req.id.clone()+"-out"); + let tmp_out_path = tmp_store.join(req.id.clone()); let mut new_batch = MediaDataBatchResponse::default(); let mut entry_count = 0 ; @@ -280,23 +295,22 @@ where } new_batch.size = entry_count; + // respond with nothing + resp.send(()).expect("channel only read once"); //write outgoing if new_batch.size >0 { if let Err(e) = out_going_tx.send(new_batch) { error!("send data {}", e); continue; } - let entry = state_map.get_mut(&req.id) - .expect("this value has been inserted before"); - entry.state = DataStateEnum::Sent; } - //remove data - let tmp_path = tmp_store.join(req.id.clone()+"-input"); - if let Err(e) = fs::remove_dir(&tmp_path).await { + //remove output data + // TODO keep outgoing data until all downstream channels have received this data + let tmp_path = tmp_store.join(req.id.clone()); + if let Err(e) = fs::remove_dir_all(&tmp_path).await { error!("remove tmp dir{:?} fail {}", tmp_path, e); } - let _ = state_map.remove(&req.id); }, } } @@ -316,7 +330,7 @@ where self.ipc_process_data_req_tx = Some(ipc_process_data_req_tx); let (ipc_process_submit_result_tx, mut ipc_process_submit_result_rx) = mpsc::channel(1024); - self.ipc_process_submit_result_tx = Some(ipc_process_submit_result_tx); + self.ipc_process_submit_output_tx = Some(ipc_process_submit_result_tx); for upstream in upstreams { { @@ -350,18 +364,12 @@ where if let Some(data_batch) = data_batch_result { //create input directory let id = Uuid::new_v4().to_string(); - 
let tmp_in_path = tmp_store.join(id.clone()+"-input"); + let tmp_in_path = tmp_store.join(id.clone()); if let Err(e) = fs::create_dir_all(&tmp_in_path).await { error!("create input dir {:?} fail {}", tmp_in_path, e); return } - //create output directory at the same time - let tmp_out_path = tmp_store.join(id.clone()+"-output"); - if let Err(e) = fs::create_dir_all(&tmp_out_path).await { - error!("create output dir {:?} fail {}", tmp_out_path, e); - return - } //write batch files for entry in data_batch.cells.iter() { let entry_path = tmp_in_path.join(entry.path.clone()); @@ -396,8 +404,8 @@ where None=>error!("id({:?}) not found", &req.id) } //remove data - let tmp_path = tmp_store.join(req.id.clone()+"-input"); - if let Err(e) = fs::remove_dir(&tmp_path).await { + let tmp_path = tmp_store.join(req.id.clone()); + if let Err(e) = fs::remove_dir_all(&tmp_path).await { error!("remove tmp dir{:?} fail {}", tmp_path, e); } //remove state diff --git a/nodes/jz_reader/src/main.rs b/nodes/jz_reader/src/main.rs index bcb4381..4267505 100644 --- a/nodes/jz_reader/src/main.rs +++ b/nodes/jz_reader/src/main.rs @@ -4,6 +4,7 @@ use compute_unit_runner::ipc::{self, IPCClient}; use jiaozifs_client_rs::apis; use jiaozifs_client_rs::models::RefType; use jz_action::utils::StdIntoAnyhowResult; +use std::path::Path; use std::str::FromStr; use tokio::fs; use tokio::select; @@ -119,11 +120,12 @@ async fn read_jz_fs(args: Args) -> Result<()> { .await?; let client = ipc::IPCClientImpl::new(args.unix_socket_addr); + let tmp_path = Path::new(&args.tmp_path); for batch in file_paths.chunks(args.batch_size) { //create temp output directory let id = uuid::Uuid::new_v4().to_string(); - let tmp_dir = args.tmp_path.clone() + "/" + &id + "-output"; - fs::create_dir_all(tmp_dir.clone()).await?; + let output_dir = tmp_path.join(&id); + fs::create_dir_all(output_dir.clone()).await?; //read file from jzfs and write to output directory let mut files = vec![]; @@ -139,7 +141,8 @@ async fn 
read_jz_fs(args: Args) -> Result<()> { ) .await?; - let mut tmp_file = fs::File::create(tmp_dir.clone() + &path).await?; + let file_path = output_dir.as_path().join(&path); + let mut tmp_file = fs::File::create(file_path).await?; while let Some(item) = byte_stream.next().await { tokio::io::copy(&mut item.unwrap().as_ref(), &mut tmp_file) .await @@ -149,7 +152,7 @@ async fn read_jz_fs(args: Args) -> Result<()> { } //submit directory after completed a batch - client.submit_result(&id).await?; + client.submit_output(&id).await?; } Ok(()) } diff --git a/nodes/jz_writer/src/main.rs b/nodes/jz_writer/src/main.rs index 1bda310..013b0f5 100644 --- a/nodes/jz_writer/src/main.rs +++ b/nodes/jz_writer/src/main.rs @@ -6,7 +6,7 @@ use jiaozifs_client_rs::apis::{self}; use jiaozifs_client_rs::models::RefType; use jz_action::utils::IntoAnyhowResult; use jz_action::utils::StdIntoAnyhowResult; -use std::path::Path; +use std::path::{Path, PathBuf}; use std::str::FromStr; use tokio::fs; use tokio::select; @@ -117,11 +117,11 @@ async fn write_jz_fs(args: Args) -> Result<()> { }?; let client = ipc::IPCClientImpl::new(args.unix_socket_addr); + let tmp_path = Path::new(&args.tmp_path); loop { let id = client.request_avaiable_data().await?; - let path_str = args.tmp_path.clone() + "/" + &id + "-input"; - let root_input_dir = Path::new(&path_str); - + let path_str = tmp_path.join(&id); + let root_input_dir = path_str.as_path(); for entry in WalkDir::new(root_input_dir) { let entry = entry?; if entry.file_type().is_file() { @@ -140,6 +140,6 @@ async fn write_jz_fs(args: Args) -> Result<()> { .await?; } } - client.submit_result(&id).await?; + client.complete_result(&id).await?; } }