From 2ba0032347b7773e28b961f6c6b9ef0734fdd5ad Mon Sep 17 00:00:00 2001
From: hunjixin <1084400399@qq.com>
Date: Sat, 10 Aug 2024 14:42:21 +0000
Subject: [PATCH] feat: support metadata data batch

---
 Cargo.toml                                    |  12 +-
 .../{dp_runner => channel_runner}/Cargo.toml  |   2 +-
 .../{dp_runner => channel_runner}/dockerfile  |   2 +-
 .../src/channel_tracker.rs                    |  78 ++++---
 .../{dp_runner => channel_runner}/src/main.rs |   2 +-
 .../src/state_controller.rs                   |   0
 .../src/stream.rs                             |   8 -
 crates/compute_unit_runner/src/ipc.rs         |  15 +-
 .../src/media_data_tracker.rs                 | 203 ++++++++++++------
 crates/nodes_sdk/src/lib.rs                   |   1 +
 crates/nodes_sdk/src/metadata.rs              |   7 +
 makefile                                      |   6 +-
 nodes/make_article/src/main.rs                |   2 +-
 src/core/job_db_models.rs                     |   3 +-
 src/dbrepo/job_db_mongo.rs                    |  26 ++-
 src/driver/kube.rs                            |  10 +-
 src/driver/kubetpl/channel_statefulset.tpl    |   4 +-
 src/network/protos/datatransfer.proto         |  35 +--
 18 files changed, 256 insertions(+), 160 deletions(-)
 rename crates/{dp_runner => channel_runner}/Cargo.toml (95%)
 rename crates/{dp_runner => channel_runner}/dockerfile (56%)
 rename crates/{dp_runner => channel_runner}/src/channel_tracker.rs (79%)
 rename crates/{dp_runner => channel_runner}/src/main.rs (99%)
 rename crates/{dp_runner => channel_runner}/src/state_controller.rs (100%)
 rename crates/{dp_runner => channel_runner}/src/stream.rs (91%)
 create mode 100644 crates/nodes_sdk/src/metadata.rs

diff --git a/Cargo.toml b/Cargo.toml
index 79dd604..714b163 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,9 +1,15 @@
 [workspace]
 members = [
-    "crates/dp_runner",
+    "crates/channel_runner",
     "crates/compute_unit_runner",
-    "crates/jiaozifs_client_rs", "nodes/jz_reader", "nodes/jz_writer", "nodes/make_article", "nodes/list_files"
-, "nodes/copy_in_place", "crates/nodes_sdk"]
+    "crates/jiaozifs_client_rs",
+    "nodes/jz_reader",
+    "nodes/jz_writer",
+    "nodes/make_article",
+    "nodes/list_files",
+    "nodes/copy_in_place",
+    "crates/nodes_sdk"
+]
 
 [workspace.package]
 repository = "https://github.com/GitDataAI/jz-flow"
diff --git a/crates/dp_runner/Cargo.toml b/crates/channel_runner/Cargo.toml
similarity index 95%
rename from crates/dp_runner/Cargo.toml
rename to crates/channel_runner/Cargo.toml
index 2a7a13b..ad2ca69 100644
--- a/crates/dp_runner/Cargo.toml
+++ b/crates/channel_runner/Cargo.toml
@@ -1,5 +1,5 @@
 [package]
-name = "dp_runner"
+name = "channel_runner"
 version = "0.1.0"
 edition = "2021"
 
diff --git a/crates/dp_runner/dockerfile b/crates/channel_runner/dockerfile
similarity index 56%
rename from crates/dp_runner/dockerfile
rename to crates/channel_runner/dockerfile
index 6b77abe..ff9c398 100644
--- a/crates/dp_runner/dockerfile
+++ b/crates/channel_runner/dockerfile
@@ -4,4 +4,4 @@ WORKDIR /app
 
 RUN mkdir -p /app
 
-ADD dist/dp_runner /dp_runner
+ADD dist/channel_runner /channel_runner
diff --git a/crates/dp_runner/src/channel_tracker.rs b/crates/channel_runner/src/channel_tracker.rs
similarity index 79%
rename from crates/dp_runner/src/channel_tracker.rs
rename to crates/channel_runner/src/channel_tracker.rs
index d32a83d..3e22847 100644
--- a/crates/dp_runner/src/channel_tracker.rs
+++ b/crates/channel_runner/src/channel_tracker.rs
@@ -16,6 +16,7 @@ use jz_flow::{
 };
 use nodes_sdk::{
     fs_cache::FileCache,
+    metadata::is_metadata,
     MessageSender,
 };
 use std::{
@@ -51,7 +52,7 @@ where
 
     pub(crate) buf_size: usize,
 
-    pub(crate) fs_cache: Arc<dyn FileCache>,
+    pub(crate) data_cache: Arc<dyn FileCache>,
 
     pub(crate) local_state: TrackerState,
 
@@ -70,7 +71,7 @@ where
 {
     pub(crate) fn new(
         repo: R,
-        fs_cache: Arc<dyn FileCache>,
+        data_cache: Arc<dyn FileCache>,
         name: &str,
         buf_size: usize,
         up_nodes: Vec<String>,
@@ -79,7 +80,7 @@ where
         ChannelTracker {
             name: name.to_string(),
             repo,
-            fs_cache,
+            data_cache,
             buf_size,
             local_state: TrackerState::Init,
             up_nodes,
@@ -97,7 +98,7 @@ where
         //process backent task to do revert clean data etc
         let db_repo = self.repo.clone();
         let node_name = self.name.clone();
-        let fs_cache = self.fs_cache.clone();
+        let data_cache = self.data_cache.clone();
         let token = token.clone();
 
         let up_nodes = self.up_nodes.clone();
@@ -106,7 +107,7 @@ where
             loop {
                 let now = Instant::now();
                 info!("backend thread start");
-                tokio::select! {
+                select! {
                     _ = token.cancelled() => {
                         return Ok(());
                     }
@@ -116,15 +117,23 @@ where
                         match db_repo.list_by_node_name_and_state(&node_name, &DataState::EndRecieved).await {
                             Ok(datas) => {
                                 for data in datas {
-                                    match fs_cache.remove(&data.id).await {
-                                        Ok(_)=>{
-                                            if let Err(err) = db_repo.update_state(&node_name, &data.id, &Direction::In, &DataState::Clean, None).await{
-                                                error!("mark data as client receive {err}");
-                                                continue;
-                                            }
-                                            debug!("remove data {}", &data.id);
-                                        },
-                                        Err(err)=> error!("remove data {err}")
+                                    if data.is_metadata {
+                                        if let Err(err) = db_repo.update_state(&node_name, &data.id, &Direction::In, &DataState::KeptForMetadata, None).await{
+                                            error!("mark metadata as client received {err}");
+                                            continue;
+                                        }
+                                        info!("mark metadata as client received");
+                                    } else {
+                                        match data_cache.remove(&data.id).await {
+                                            Ok(_)=>{
+                                                if let Err(err) = db_repo.update_state(&node_name, &data.id, &Direction::In, &DataState::Clean, None).await{
+                                                    error!("mark data as client receive {err}");
+                                                    continue;
+                                                }
+                                                debug!("remove data {}", &data.id);
+                                            },
+                                            Err(err)=> error!("remove data {err}")
+                                        }
                                     }
                                 }
                             },
@@ -186,7 +195,7 @@ where
         //process request data
         let db_repo = self.repo.clone();
         let node_name = self.name.clone();
-        let fs_cache = self.fs_cache.clone();
+        let data_cache = self.data_cache.clone();
         let token = token.clone();
 
         join_set.spawn(async move {
@@ -200,7 +209,7 @@ where
                         match db_repo.find_data_and_mark_state(&node_name, &Direction::In, &DataState::SelectForSend).await {
                             std::result::Result::Ok(Some(record)) =>{
                                 info!("return downstream's datarequest and start response data {}", &record.id);
-                                match fs_cache.read(&record.id).await {
+                                match data_cache.read(&record.id).await {
                                     Ok(databatch)=>{
                                         //response this data's position
                                         resp.send(Ok(Some(databatch))).expect("channel only read once");
@@ -214,7 +223,6 @@ where
                                         continue;
                                     }
                                 }
-
                             },
                             std::result::Result::Ok(None)=>{
                                 resp.send(Ok(None)).expect("channel only read once");
@@ -239,7 +247,7 @@ where
         let db_repo = self.repo.clone();
         let node_name = self.name.clone();
         let buf_size = self.buf_size;
-        let fs_cache = self.fs_cache.clone();
+        let data_cache = self.data_cache.clone();
         let token = token.clone();
 
         join_set.spawn(async move {
@@ -254,17 +262,6 @@ where
                         let now = Instant::now();
                         let id = data_batch.id.clone();
                         let size = data_batch.size;
-                        //check limit
-                        if let Err(err) = db_repo.count(&node_name,&[&DataState::Received], Some(&Direction::In)).await.and_then(|count|{
-                            if count > buf_size {
-                                Err(anyhow!("has reach limit current:{count} limit:{buf_size}"))
-                            } else {
-                                Ok(())
-                            }
-                        }){
-                            resp.send(Err(anyhow!("cannt query limit from mongo {err}"))).expect("request alread listen this channel");
-                            continue;
-                        }
 
                         // processed before
                         match db_repo.find_by_node_id(&node_name,&id, &Direction::In).await {
@@ -278,13 +275,27 @@ where
                                 resp.send(Err(err)).expect("request alread listen this channel");
                                 continue;
                             }
-                            _=>{}
+                            _=>{}
                         }
-
-                        // code below this can be move another coroutine
+                        let is_data_metadata = if is_metadata(&id) {
+                            true
+                        } else {
+                            //check limit for plain data
+                            if let Err(err) = db_repo.count(&node_name,&[&DataState::Received], Some(&Direction::In)).await.and_then(|count|{
+                                if count > buf_size {
+                                    Err(anyhow!("has reached limit current:{count} limit:{buf_size}"))
+                                } else {
+                                    Ok(())
+                                }
+                            }){
+                                resp.send(Err(anyhow!("cannot query limit from mongo {err}"))).expect("request already listens on this channel");
+                                continue;
+                            }
+                            false
+                        };
 
                         //write batch files
-                        if let Err(err) = fs_cache.write(data_batch).await {
+                        if let Err(err) = data_cache.write(data_batch).await {
                             error!("write files to disk fail {}", err);
                             resp.send(Err(err)).expect("request alread listen this channel");
                             continue;
@@ -296,6 +307,7 @@ where
                             node_name: node_name.clone(),
                             id:id.clone(),
                             size,
+                            is_metadata:is_data_metadata,
                             state: DataState::Received,
                             sent: vec![],
                             direction:Direction::In,
diff --git a/crates/dp_runner/src/main.rs b/crates/channel_runner/src/main.rs
similarity index 99%
rename from crates/dp_runner/src/main.rs
rename to crates/channel_runner/src/main.rs
index 37c3cba..a566c81 100644
--- a/crates/dp_runner/src/main.rs
+++ b/crates/channel_runner/src/main.rs
@@ -37,7 +37,7 @@ use tracing::{
 
 #[derive(Debug, Parser)]
 #[command(
-    name = "dp_runner",
+    name = "channel_runner",
     version = "0.0.1",
     author = "Author Name ",
     about = "embed in k8s images. make process data input output"
diff --git a/crates/dp_runner/src/state_controller.rs b/crates/channel_runner/src/state_controller.rs
similarity index 100%
rename from crates/dp_runner/src/state_controller.rs
rename to crates/channel_runner/src/state_controller.rs
diff --git a/crates/dp_runner/src/stream.rs b/crates/channel_runner/src/stream.rs
similarity index 91%
rename from crates/dp_runner/src/stream.rs
rename to crates/channel_runner/src/stream.rs
index df6fe3e..4aae0dd 100644
--- a/crates/dp_runner/src/stream.rs
+++ b/crates/channel_runner/src/stream.rs
@@ -6,7 +6,6 @@ use jz_flow::{
         datatransfer::{
             data_stream_server::DataStream,
             MediaDataBatchResponse,
-            TabularDataBatchResponse,
         },
     },
 };
@@ -84,11 +83,4 @@ where
             Err(err) => Err(Status::from_error(Box::new(err))),
         }
     }
-
-    async fn transfer_tabular_data(
-        &self,
-        _req: Request<TabularDataBatchResponse>,
-    ) -> Result<Response<Empty>, tonic::Status> {
-        todo!()
-    }
 }
diff --git a/crates/compute_unit_runner/src/ipc.rs b/crates/compute_unit_runner/src/ipc.rs
index b06f903..5c86f31 100644
--- a/crates/compute_unit_runner/src/ipc.rs
+++ b/crates/compute_unit_runner/src/ipc.rs
@@ -232,6 +232,17 @@ impl SubmitOuputDataReq {
     }
 }
 
+#[derive(Deserialize)]
+pub struct RequestDataReq {
+    pub id: String,
+}
+
+impl RequestDataReq {
+    pub fn new(id: &str) -> Self {
+        RequestDataReq { id: id.to_string() }
+    }
+}
+
 #[derive(Debug, Serialize, Deserialize)]
 pub struct Status {
     pub state: TrackerState,
@@ -254,6 +265,7 @@ where
 
 async fn process_data_request<R>(
     program_mutex: web::Data<Arc<Mutex<MediaDataTracker<R>>>>,
+    req: web::Query<RequestDataReq>,
 ) -> HttpResponse
 where
     R: JobDbRepo + Clone,
@@ -276,11 +288,12 @@ where
         drop(program);
         sleep(Duration::from_secs(5)).await;
     };
+
     //read request
     let (tx, rx) = oneshot::channel::<Result<Option<AvaiableDataResponse>>>();
     match sender {
         Some(sender) => {
-            if let Err(err) = sender.send(((), tx)).await {
+            if let Err(err) = sender.send((req.into_inner(), tx)).await {
                 return HttpResponse::InternalServerError()
                     .json(format!("send to avaiable data channel {err}"));
             }
diff --git a/crates/compute_unit_runner/src/media_data_tracker.rs b/crates/compute_unit_runner/src/media_data_tracker.rs
index 4886f6e..a3dc407 100644
--- a/crates/compute_unit_runner/src/media_data_tracker.rs
+++ b/crates/compute_unit_runner/src/media_data_tracker.rs
@@ -1,6 +1,7 @@
 use crate::ipc::{
     AvaiableDataResponse,
     CompleteDataReq,
+    RequestDataReq,
     SubmitOuputDataReq,
 };
 use anyhow::{
@@ -41,6 +42,7 @@ use tracing::{
 
 use nodes_sdk::{
     fs_cache::FileCache,
+    metadata::is_metadata,
     multi_sender::MultiSender,
     MessageSender,
 };
@@ -70,7 +72,7 @@ where
 
     pub(crate) buf_size: usize,
 
-    pub(crate) fs_cache: Arc<dyn FileCache>,
+    pub(crate) data_cache: Arc<dyn FileCache>,
 
     pub(crate) repo: R,
 
@@ -83,7 +85,8 @@ where
     pub(crate) downstreams: Vec<String>,
 
     // channel for process avaiable data request
-    pub(crate) ipc_process_data_req_tx: Option<MessageSender<(), Option<AvaiableDataResponse>>>,
+    pub(crate) ipc_process_data_req_tx:
+        Option<MessageSender<RequestDataReq, Option<AvaiableDataResponse>>>,
 
     // channel for response complete data. do clean work when receive this request
     pub(crate) ipc_process_completed_data_tx: Option<MessageSender<CompleteDataReq, ()>>,
@@ -101,14 +104,14 @@ where
     pub fn new(
         repo: R,
         name: &str,
-        fs_cache: Arc<dyn FileCache>,
+        data_cache: Arc<dyn FileCache>,
         buf_size: usize,
         up_nodes: Vec<String>,
         upstreams: Vec<String>,
         downstreams: Vec<String>,
     ) -> Self {
         MediaDataTracker {
-            fs_cache,
+            data_cache,
             buf_size,
             name: name.to_string(),
             repo,
@@ -213,7 +216,7 @@ where
             let new_data_tx = new_data_tx.clone();
             let token = token.clone();
             join_set.spawn(async move {
-                let mut interval = time::interval(Duration::from_secs(2));
+                let mut interval = time::interval(Duration::from_secs(5));
                 loop {
                     select! {
                         _ = token.cancelled() => {
@@ -230,7 +233,7 @@ where
         for _ in 0..10 {
             let downstreams = self.downstreams.clone();
             let mut multi_sender = MultiSender::new(downstreams.clone());
-            let fs_cache = self.fs_cache.clone();
+            let data_cache = self.data_cache.clone();
             let token = token.clone();
             let node_name = self.name.clone();
             let db_repo = self.repo.clone();
@@ -254,7 +257,7 @@ where
                         let now = Instant::now();
                         match db_repo.find_data_and_mark_state(&node_name, &Direction::Out, &DataState::SelectForSend).await {
                             Ok(Some(req)) =>{
-                                let new_batch = match fs_cache.read(&req.id).await {
+                                let new_batch = match data_cache.read(&req.id).await {
                                     Ok(batch) => batch,
                                     Err(err) => {
                                         warn!("failed to read batch: {}", err);
@@ -284,7 +287,7 @@ where
                                 match db_repo.update_state(&node_name, &req.id, &Direction::Out, &DataState::Sent, Some(downstreams.iter().map(|key|key.as_str()).collect())).await{
                                     Ok(_) =>{
                                         //remove input data
-                                        if let Err(err) = fs_cache.remove(&req.id).await {
+                                        if let Err(err) = data_cache.remove(&req.id).await {
                                             error!("remove template batch fail {}", err);
                                         }
 
@@ -327,28 +330,28 @@ where
                     }
                     Some((req, resp)) = ipc_process_submit_result_rx.recv() => {
                         //check finish state
-                        if matches!(*local_state.read().await, TrackerState::Finish) {
+                        if *local_state.read().await == TrackerState::Finish {
                             resp.send(Err(anyhow!("node was finished"))).expect("channel only read once");
                             continue;
                         }
-
-                        loop {
-                            if let Err(err) = db_repo.count(&node_name, &[&DataState::Received, &DataState::PartialSent], Some(&Direction::Out)).await.and_then(|count|{
-                                if count > buf_size {
-                                    Err(anyhow!("has reach limit current:{count} limit:{buf_size}"))
-                                } else {
-                                    Ok(())
-                                }
-                            }){
-                                warn!("fail with limit {err}");
-                                sleep(Duration::from_secs(10)).await;
-                                continue;
-                            }
-                            break;
-                        }
-
-                        info!("start to insert data {}", &req.id);
+                        let is_metadata = is_metadata(&req.id);
+                        if !is_metadata {
+                            loop {
+                                if let Err(err) = db_repo.count(&node_name, &[&DataState::Received, &DataState::PartialSent], Some(&Direction::Out)).await.and_then(|count|{
+                                    if count > buf_size {
+                                        Err(anyhow!("has reached limit current:{count} limit:{buf_size}"))
+                                    } else {
+                                        Ok(())
+                                    }
+                                }){
+                                    warn!("fail with limit {err}");
+                                    sleep(Duration::from_secs(10)).await;
+                                    continue;
+                                }
+                                break;
+                            }
+                        }
                         // respose with nothing
                         let tm =Utc::now().timestamp();
@@ -357,6 +360,7 @@ where
                             id:req.id.clone(),
                             size: req.size,
                             sent: vec![],
+                            is_metadata,
                             state: DataState::Received,
                             direction: Direction::Out,
                             created_at:tm,
@@ -378,31 +382,53 @@ where
             });
         }
 
-        //TODO this make a async process to be sync process. got a low performance,
-        //if have any bottleneck here, we should refrator this one
+        let incoming_data_tx = broadcast::Sender::new(1);
         if !self.upstreams.is_empty() {
-            //process user contaienr request
+            //fetch data from channel
             let db_repo = self.repo.clone();
             let node_name = self.name.clone();
             let url = self.upstreams[0].clone();
-            let fs_cache = self.fs_cache.clone();
+            let data_cache = self.data_cache.clone();
             let token = token.clone();
             let local_state = self.local_state.clone();
 
+            {
+                let incoming_data_tx = incoming_data_tx.clone();
+                let token = token.clone();
+                join_set.spawn(async move {
+                    let mut interval = time::interval(Duration::from_secs(2));
+                    loop {
+                        select! {
+                            _ = token.cancelled() => {
+                                return anyhow::Ok(());
+                            }
+                            _ = interval.tick() => {
+                                if matches!(*local_state.read().await, TrackerState::Finish) {
+                                    warn!("node is finished, exit fetch data ticker");
+                                    return anyhow::Ok(());
+                                }
+                                let _ = incoming_data_tx.send(());
+                            }
+                        }
+                    }
+                });
+            }
+
+            let local_state = self.local_state.clone();
+            let mut incoming_data_rx = incoming_data_tx.subscribe();
             join_set.spawn(async move {
-                // let mut client = DataStreamClient::connect(url.clone()).await;
                 let mut client: Option<DataStreamClient<Channel>> = None;
                 loop {
                     select! {
                         _ = token.cancelled() => {
                             return anyhow::Ok(());
                         }
-                        Some((_, resp)) = ipc_process_data_req_rx.recv() => {
+                        _ = incoming_data_rx.recv() => {
                             //check finish state
-                            if matches!(*local_state.read().await, TrackerState::Finish) {
-                                resp.send(Err(anyhow!("node was finished"))).expect("channel only read once");
-                                continue;
+                            if *local_state.read().await == TrackerState::Finish {
+                                warn!("node is finished, exit fetch data from channel");
+                                return anyhow::Ok(());
                             }
+
+                            //select an unassigned data
                             info!("try to find avaiable data");
                             if client.is_none() {
                                 match DataStreamClient::connect(url.clone()).await {
                                     Ok(c) => {
                                         client = Some(c);
                                     },
                                     Err(err) =>{
                                         error!("cannt connect upstream {err}");
-                                        resp.send(Ok(None)).expect("channel only read once");
                                         continue;
                                     }
                                 }
                             }
 
-                            let client_non = client.as_mut().unwrap();
-                            match client_non.request_media_data(Empty{}).await {
+                            let client_ref = client.as_mut().unwrap();
+                            match client_ref.request_media_data(Empty{}).await {
                                 Ok(record) =>{
                                     let data = record.into_inner();
+                                    let id = data.id.clone();
+                                    let size = data.size;
                                     match db_repo.find_by_node_id(&node_name,&data.id, &Direction::In).await {
                                         Ok(Some(_))=>{
-                                            resp.send(Ok(None)).expect("channel only read once");
+                                            //already exists in this node
                                             continue;
                                         }
                                         Err(err)=>{
                                             error!("query mongo by id {err}");
-                                            resp.send(Err(err)).expect("request alread listen this channel");
                                             continue;
                                         }
                                         _=>{}
                                     }
 
-                                    let res_data = AvaiableDataResponse{
-                                        id: data.id.clone(),
-                                        size: data.size
-                                    };
-
-                                    if let Err(err) = fs_cache.write(data).await {
+                                    if let Err(err) = data_cache.write(data).await {
                                         error!("write cache files {err}");
-                                        resp.send(Err(anyhow!("write cache files {err}"))).expect("channel only read once");
                                         continue;
                                     }
 
                                     //mark data as received
                                     let tm =Utc::now().timestamp();
                                     if let Err(err) = db_repo.insert_new_path(&DataRecord{
                                         node_name:node_name.clone(),
-                                        id :res_data.id.clone(),
-                                        size: res_data.size,
+                                        id : id.clone(),
+                                        size,
+                                        is_metadata: is_metadata(&id),
                                         sent: vec![],
                                         state: DataState::Received,
                                         direction: Direction::In,
                                         created_at:tm,
                                     }).await{
                                         error!("mark data as client receive {err}");
-                                        resp.send(Err(anyhow!("mark data as client receive {err}"))).expect("channel only read once");
                                         continue;
                                     }
 
                                     //insert a new incoming data record
-                                    if let Err(err) = db_repo.update_state(&(node_name.clone() +"-channel"), &res_data.id, &Direction::In, &DataState::EndRecieved, None).await{
+                                    if let Err(err) = db_repo.update_state(&(node_name.clone() +"-channel"), &id, &Direction::In, &DataState::EndRecieved, None).await{
                                         error!("mark data as client receive {err}");
-                                        resp.send(Err(anyhow!("mark data as client receive {err}"))).expect("channel only read once");
                                         continue;
                                     }
-                                    info!("get data batch {} to user", &res_data.id);
-                                    //response this data's position
-                                    resp.send(Ok(Some(res_data))).expect("channel only read once");
+                                    info!("save data batch {}", &id);
                                 },
                                 Err(status)=>{
                                     match status.code() {
                                         Code::NotFound =>{
-                                            resp.send(Ok(None)).expect("channel only read once");
-                                        },
-                                        Code::DeadlineExceeded =>{
-                                            client = None;
-                                            resp.send(Err(anyhow!("network deadline exceeded"))).expect("channel only read once");
-                                        },
-                                        Code::Unavailable =>{
-                                            client = None;
-                                            resp.send(Err(anyhow!("connect break"))).expect("channel only read once");
+                                            continue;
                                         },
                                         _=>{
+                                            client = None;
                                             error!("unable to retrieval data from channel {:?}", status);
-                                            resp.send(Err(anyhow!("unable to retrieval data"))).expect("channel only read once");
                                         }
                                     }
                                 }
                             }
+                        }
+                    }
+                }
+            });
+        }
+
+        //TODO this makes the async process synchronous and performs poorly;
+        //if any bottleneck shows up here, we should refactor it
+        if !self.upstreams.is_empty() {
+            //process user container request
+            let db_repo = self.repo.clone();
+            let node_name = self.name.clone();
+            let data_cache = self.data_cache.clone();
+            let token = token.clone();
+            let local_state = self.local_state.clone();
+            let incoming_data_tx = incoming_data_tx.clone();
+            join_set.spawn(async move {
+                loop {
+                    select! {
+                        _ = token.cancelled() => {
+                            return anyhow::Ok(());
+                        }
+                        Some((req, resp)) = ipc_process_data_req_rx.recv() => {
+                            //check finish state
+                            if matches!(*local_state.read().await, TrackerState::Finish) {
+                                resp.send(Err(anyhow!("node was finished"))).expect("channel only read once");
+                                continue;
+                            }
+
+                            if is_metadata(&req.id) {
+                                let result = match db_repo.find_by_node_id(&node_name,&req.id, &Direction::In).await {
+                                    Ok(Some(record))=>{
+                                        Ok(Some(AvaiableDataResponse {
+                                            id: record.id.clone(),
+                                            size: record.size,
+                                        }))
+                                    },
+                                    Ok(None)=> Ok(None),
+                                    Err(err)=>Err(err),
+                                };
+                                resp.send(result).expect("channel send failed: channel can only be read once");
+                                continue;
+                            }
+
+                            let result = {
+                                let record = db_repo
+                                    .find_data_and_mark_state(&node_name, &Direction::In, &DataState::Assigned)
+                                    .await
+                                    .map_err(|err| anyhow!("query available data failed: {}", err))?;
+                                if record.is_none() {
+                                    let _ = incoming_data_tx.send(());
+                                    Ok(None)
+                                } else {
+                                    let record = record.unwrap();
+                                    if let Err(err) = data_cache.read(&record.id).await {
+                                        error!("read files {} {err}, try to find another data", record.id);
+                                        if let Err(err) = db_repo.update_state(&node_name,&record.id, &Direction::In, &DataState::Error, None).await {
+                                            error!("mark data {} to error {err}", &record.id);
+                                        }
+                                        continue;
+                                    }
+                                    let res_data = AvaiableDataResponse {
+                                        id: record.id.clone(),
+                                        size: record.size,
+                                    };
+                                    Ok(Some(res_data))
+                                }
+                            };
+                            resp.send(result).expect("channel send failed: channel can only be read once");
                         },
                         Some((req, resp)) = ipc_process_completed_data_rx.recv() => {
                             //check finish state
@@ -506,7 +583,7 @@ where
                                 Ok(_) =>{
                                     // respose with nothing
                                     resp.send(Ok(())).expect("channel only read once");
-                                    if let Err(err) = fs_cache.remove(&req.id).await {
+                                    if let Err(err) = data_cache.remove(&req.id).await {
                                         error!("remove tmp fs fail {}", err);
                                         continue;
                                     }
diff --git a/crates/nodes_sdk/src/lib.rs b/crates/nodes_sdk/src/lib.rs
index ff90760..df128e2 100644
--- a/crates/nodes_sdk/src/lib.rs
+++ b/crates/nodes_sdk/src/lib.rs
@@ -1,6 +1,7 @@
 #![feature(duration_constructors)]
 
 pub mod fs_cache;
+pub mod metadata;
 pub mod mprc;
 pub mod multi_sender;
 
diff --git a/crates/nodes_sdk/src/metadata.rs b/crates/nodes_sdk/src/metadata.rs
new file mode 100644
index 0000000..2b96bbc
--- /dev/null
+++ b/crates/nodes_sdk/src/metadata.rs
@@ -0,0 +1,7 @@
+pub fn is_metadata(id: &str) -> bool {
+    id.ends_with("metadata")
+}
+
+pub fn to_metadata(node_name: &str) -> String {
+    node_name.to_owned() + "metadata"
+}
diff --git a/makefile b/makefile
index baeba08..b96ef17 100644
--- a/makefile
+++ b/makefile
@@ -13,8 +13,8 @@ build-cd: $(OUTPUT)
 	cp target/release/compute_unit_runner $(OUTPUT)/compute_unit_runner
 
 build-dp: $(OUTPUT)
-	cargo build -p dp_runner --release
-	cp target/release/dp_runner $(OUTPUT)/dp_runner
+	cargo build -p channel_runner --release
+	cp target/release/channel_runner $(OUTPUT)/channel_runner
 
 build: build-cd build-dp
 	cargo build --release
@@ -23,7 +23,7 @@ docker_cd: build-cd
 	docker build -f ./crates/compute_unit_runner/dockerfile -t gitdatateam/compute_unit_runner:latest .
 
 docker_dp: build-dp
-	docker build -f ./crates/dp_runner/dockerfile -t gitdatateam/dp_runner:latest .
+	docker build -f ./crates/channel_runner/dockerfile -t gitdatateam/channel_runner:latest .
 
 ################## build nodes
 build-nodes: $(OUTPUT)
diff --git a/nodes/make_article/src/main.rs b/nodes/make_article/src/main.rs
index c36392a..b9d86ac 100644
--- a/nodes/make_article/src/main.rs
+++ b/nodes/make_article/src/main.rs
@@ -65,7 +65,7 @@ async fn main() -> Result<()> {
 
     {
         let token = token.clone();
-        join_set.spawn(async move { make_article(token, args).await });
+        join_set.spawn(async move {make_article(token, args).await});
     }
 
     {
diff --git a/src/core/job_db_models.rs b/src/core/job_db_models.rs
index 51b2524..169ce8f 100644
--- a/src/core/job_db_models.rs
+++ b/src/core/job_db_models.rs
@@ -56,6 +56,7 @@ pub enum DataState {
     PartialSent,
     Sent,
     EndRecieved,
+    KeptForMetadata,
     Clean,
     Error,
 }
@@ -72,7 +73,7 @@ pub struct DataRecord {
     pub node_name: String,
     /// data id but not unique, becase the id in channel wiil transfer to compute unit
     pub id: String,
-
+    pub is_metadata: bool,
     pub size: u32,
     pub state: DataState,
     pub direction: Direction,
diff --git a/src/dbrepo/job_db_mongo.rs b/src/dbrepo/job_db_mongo.rs
index 3e95795..a4a9ad5 100644
--- a/src/dbrepo/job_db_mongo.rs
+++ b/src/dbrepo/job_db_mongo.rs
@@ -81,6 +81,27 @@ impl MongoRunDbRepo {
             }
         }
 
+        {
+            //create index for created_at
+            let idx_opts = IndexOptions::builder()
+                .name("idx_created_at".to_owned())
+                .build();
+
+            let index = IndexModel::builder()
+                .keys(doc! { "created_at": 1 })
+                .options(idx_opts)
+                .build();
+
+            if let Err(err) = data_col.create_index(index).await {
+                match *err.kind {
+                    ErrorKind::Command(ref command_error) if command_error.code == 85 => {}
+                    err => {
+                        return Err(anyhow!("create data created_at error {err}"));
+                    }
+                }
+            }
+        }
+
         {
             //create index for data
             let idx_opts = IndexOptions::builder()
@@ -187,13 +208,12 @@ impl DataRepo for MongoRunDbRepo {
                 "updated_at":Utc::now().timestamp(),
             },
         };
-
         self
             .data_col
             .find_one_and_update(
                 doc! {"node_name":node_name, "state": doc! {"$in": ["Received","PartialSent"]}, "direction":to_variant_name(&direction)?},
-                update
-            ).await
+                update,
{ "created_at": 1 }).await .anyhow() } diff --git a/src/driver/kube.rs b/src/driver/kube.rs index 519d2d2..54c1e20 100644 --- a/src/driver/kube.rs +++ b/src/driver/kube.rs @@ -853,14 +853,12 @@ where #[cfg(test)] mod tests { - use std::env; - - use crate::dbrepo::MongoRunDbRepo; - use super::*; - + use crate::dbrepo::MongoRunDbRepo; use mongodb::Client as MongoClient; + use std::env; use tracing_subscriber; + #[tokio::test] async fn test_render() { env::set_var("RUST_LOG", "DEBUG"); @@ -914,7 +912,7 @@ mod tests { "#; let dag = Dag::from_json(json_str).unwrap(); - let db_url = "mongodb://127.0.0.1:27017"; + let db_url = "mongodb://192.168.3.163:27017"; let client = MongoClient::with_uri_str(db_url.to_string() + "/ntest") .await .unwrap(); diff --git a/src/driver/kubetpl/channel_statefulset.tpl b/src/driver/kubetpl/channel_statefulset.tpl index ae52ce5..e31a4ca 100644 --- a/src/driver/kubetpl/channel_statefulset.tpl +++ b/src/driver/kubetpl/channel_statefulset.tpl @@ -26,10 +26,10 @@ "containers": [ { "name": "channel", - "image": {{#if node.channel.image}} "{{{node.channel.image}}}"{{else}}"gitdatateam/dp_runner:latest"{{/if}}, + "image": {{#if node.channel.image}} "{{{node.channel.image}}}"{{else}}"gitdatateam/channel_runner:latest"{{/if}}, "imagePullPolicy": "IfNotPresent", "command": [ - "/dp_runner" + "/channel_runner" ], "args": [ "--node-name={{{node.name}}}-channel", diff --git a/src/network/protos/datatransfer.proto b/src/network/protos/datatransfer.proto index 821c59a..1067094 100644 --- a/src/network/protos/datatransfer.proto +++ b/src/network/protos/datatransfer.proto @@ -7,8 +7,6 @@ import "common.proto"; service DataStream { rpc transferMediaData(MediaDataBatchResponse) returns (common.Empty) {} rpc requestMediaData(common.Empty) returns (MediaDataBatchResponse) {} - - rpc transferTabularData(TabularDataBatchResponse) returns (common.Empty) {} } message MediaDataCell { @@ -20,35 +18,6 @@ message MediaDataCell { message MediaDataBatchResponse { string id = 1; uint32 size = 2; - repeated MediaDataCell cells = 3; -} - - -// Tabular - -enum DataType { - STRING = 0; - INT32 = 1; -} - -message Column { - string name = 1; - DataType data_type = 2; -} - -message DataValue { - oneof value { - int32 int_value = 1; - string string_value = 2; - } -} - -message Row { - repeated DataValue values = 1; -} - -message TabularDataBatchResponse { - int32 size = 1; - repeated Column columns = 2; - repeated Row rows = 3; + string from_node = 3; + repeated MediaDataCell cells = 4; }