Skip to content

Commit

Permalink
feat: support metadata databatch
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Aug 10, 2024
1 parent 05efc9a commit 2ba0032
Show file tree
Hide file tree
Showing 18 changed files with 256 additions and 160 deletions.
12 changes: 9 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
[workspace]
members = [
"crates/dp_runner",
"crates/channel_runner",
"crates/compute_unit_runner",
"crates/jiaozifs_client_rs", "nodes/jz_reader", "nodes/jz_writer", "nodes/make_article", "nodes/list_files"
, "nodes/copy_in_place", "crates/nodes_sdk"]
"crates/jiaozifs_client_rs",
"nodes/jz_reader",
"nodes/jz_writer",
"nodes/make_article",
"nodes/list_files",
"nodes/copy_in_place",
"crates/nodes_sdk"
]

[workspace.package]
repository = "https://github.com/GitDataAI/jz-flow"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[package]
name = "dp_runner"
name = "channel_runner"
version = "0.1.0"
edition = "2021"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ WORKDIR /app

RUN mkdir -p /app

ADD dist/dp_runner /dp_runner
ADD dist/channel_runner /channel_runner
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use jz_flow::{
};
use nodes_sdk::{
fs_cache::FileCache,
metadata::is_metadata,
MessageSender,
};
use std::{
Expand Down Expand Up @@ -51,7 +52,7 @@ where

pub(crate) buf_size: usize,

pub(crate) fs_cache: Arc<dyn FileCache>,
pub(crate) data_cache: Arc<dyn FileCache>,

pub(crate) local_state: TrackerState,

Expand All @@ -70,7 +71,7 @@ where
{
pub(crate) fn new(
repo: R,
fs_cache: Arc<dyn FileCache>,
data_cache: Arc<dyn FileCache>,
name: &str,
buf_size: usize,
up_nodes: Vec<String>,
Expand All @@ -79,7 +80,7 @@ where
ChannelTracker {
name: name.to_string(),
repo,
fs_cache,
data_cache,
buf_size,
local_state: TrackerState::Init,
up_nodes,
Expand All @@ -97,7 +98,7 @@ where
//process backent task to do revert clean data etc
let db_repo = self.repo.clone();
let node_name = self.name.clone();
let fs_cache = self.fs_cache.clone();
let data_cache = self.data_cache.clone();
let token = token.clone();
let up_nodes = self.up_nodes.clone();

Expand All @@ -106,7 +107,7 @@ where
loop {
let now = Instant::now();
info!("backend thread start");
tokio::select! {
select! {
_ = token.cancelled() => {
return Ok(());
}
Expand All @@ -116,15 +117,23 @@ where
match db_repo.list_by_node_name_and_state(&node_name, &DataState::EndRecieved).await {
Ok(datas) => {
for data in datas {
match fs_cache.remove(&data.id).await {
Ok(_)=>{
if let Err(err) = db_repo.update_state(&node_name, &data.id, &Direction::In, &DataState::Clean, None).await{
error!("mark data as client receive {err}");
continue;
}
debug!("remove data {}", &data.id);
},
Err(err)=> error!("remove data {err}")
if data.is_metadata {
if let Err(err) = db_repo.update_state(&node_name, &data.id, &Direction::In, &DataState::KeeptForMetadata, None).await{
error!("mark metadata data as client receive {err}");
continue;
}
info!("mark metadata as cleint received")
}else {
match data_cache.remove(&data.id).await {
Ok(_)=>{
if let Err(err) = db_repo.update_state(&node_name, &data.id, &Direction::In, &DataState::Clean, None).await{
error!("mark data as client receive {err}");
continue;
}
debug!("remove data {}", &data.id);
},
Err(err)=> error!("remove data {err}")
}
}
}
},
Expand Down Expand Up @@ -186,7 +195,7 @@ where
//process request data
let db_repo = self.repo.clone();
let node_name = self.name.clone();
let fs_cache = self.fs_cache.clone();
let data_cache = self.data_cache.clone();
let token = token.clone();

join_set.spawn(async move {
Expand All @@ -200,7 +209,7 @@ where
match db_repo.find_data_and_mark_state(&node_name, &Direction::In, &DataState::SelectForSend).await {
std::result::Result::Ok(Some(record)) =>{
info!("return downstream's datarequest and start response data {}", &record.id);
match fs_cache.read(&record.id).await {
match data_cache.read(&record.id).await {
Ok(databatch)=>{
//response this data's position
resp.send(Ok(Some(databatch))).expect("channel only read once");
Expand All @@ -214,7 +223,6 @@ where
continue;
}
}

},
std::result::Result::Ok(None)=>{
resp.send(Ok(None)).expect("channel only read once");
Expand All @@ -239,7 +247,7 @@ where
let db_repo = self.repo.clone();
let node_name = self.name.clone();
let buf_size = self.buf_size;
let fs_cache = self.fs_cache.clone();
let data_cache = self.data_cache.clone();
let token = token.clone();

join_set.spawn(async move {
Expand All @@ -254,17 +262,6 @@ where
let now = Instant::now();
let id = data_batch.id.clone();
let size = data_batch.size;
//check limit
if let Err(err) = db_repo.count(&node_name,&[&DataState::Received], Some(&Direction::In)).await.and_then(|count|{
if count > buf_size {
Err(anyhow!("has reach limit current:{count} limit:{buf_size}"))
} else {
Ok(())
}
}){
resp.send(Err(anyhow!("cannt query limit from mongo {err}"))).expect("request alread listen this channel");
continue;
}

// processed before
match db_repo.find_by_node_id(&node_name,&id, &Direction::In).await {
Expand All @@ -278,13 +275,27 @@ where
resp.send(Err(err)).expect("request alread listen this channel");
continue;
}
_=>{}
_=>{}
}

// code below this can be move another coroutine
let is_data_metadata = if is_metadata(&id) {
//check limit for plain data
if let Err(err) = db_repo.count(&node_name,&[&DataState::Received], Some(&Direction::In)).await.and_then(|count|{
if count > buf_size {
Err(anyhow!("has reach limit current:{count} limit:{buf_size}"))
} else {
Ok(())
}
}){
resp.send(Err(anyhow!("cannt query limit from mongo {err}"))).expect("request alread listen this channel");
continue;
}
true
}else {
false
};

//write batch files
if let Err(err) = fs_cache.write(data_batch).await {
if let Err(err) = data_cache.write(data_batch).await {
error!("write files to disk fail {}", err);
resp.send(Err(err)).expect("request alread listen this channel");
continue;
Expand All @@ -296,6 +307,7 @@ where
node_name: node_name.clone(),
id:id.clone(),
size,
is_metadata:is_data_metadata,
state: DataState::Received,
sent: vec![],
direction:Direction::In,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use tracing::{

#[derive(Debug, Parser)]
#[command(
name = "dp_runner",
name = "channel_runner",
version = "0.0.1",
author = "Author Name <github.com/GitDataAI/jz-flow>",
about = "embed in k8s images. make process data input output"
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use jz_flow::{
datatransfer::{
data_stream_server::DataStream,
MediaDataBatchResponse,
TabularDataBatchResponse,
},
},
};
Expand Down Expand Up @@ -84,11 +83,4 @@ where
Err(err) => Err(Status::from_error(Box::new(err))),
}
}

// Tabular-data transfer endpoint of the DataStream gRPC service —
// currently an unimplemented stub: any call panics via `todo!()`.
// NOTE(review): the surrounding diff removes this method and drops the
// `TabularDataBatchResponse` import, so tabular transfer is presumably
// no longer part of the service contract — confirm against the proto
// definition before relying on it.
async fn transfer_tabular_data(
&self,
_req: Request<TabularDataBatchResponse>,
) -> Result<Response<Empty>, tonic::Status> {
todo!()
}
}
15 changes: 14 additions & 1 deletion crates/compute_unit_runner/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,17 @@ impl SubmitOuputDataReq {
}
}

#[derive(Deserialize)]
pub struct RequetDataReq {
pub id: String,
}

impl RequetDataReq {
pub fn new(id: &str) -> Self {
RequetDataReq { id: id.to_string() }
}
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Status {
pub state: TrackerState,
Expand All @@ -254,6 +265,7 @@ where

async fn process_data_request<R>(
program_mutex: web::Data<Arc<RwLock<MediaDataTracker<R>>>>,
req: web::Query<RequetDataReq>,
) -> HttpResponse
where
R: JobDbRepo + Clone,
Expand All @@ -276,11 +288,12 @@ where
drop(program);
sleep(Duration::from_secs(5)).await;
};

//read request
let (tx, rx) = oneshot::channel::<Result<Option<AvaiableDataResponse>>>();
match sender {
Some(sender) => {
if let Err(err) = sender.send(((), tx)).await {
if let Err(err) = sender.send((req.into_inner(), tx)).await {
return HttpResponse::InternalServerError()
.json(format!("send to avaiable data channel {err}"));
}
Expand Down
Loading

0 comments on commit 2ba0032

Please sign in to comment.