diff --git a/crates/compute_unit_runner/src/ipc.rs b/crates/compute_unit_runner/src/ipc.rs index f4586d9..1179f40 100644 --- a/crates/compute_unit_runner/src/ipc.rs +++ b/crates/compute_unit_runner/src/ipc.rs @@ -309,7 +309,8 @@ where } } None => { - return HttpResponse::InternalServerError().json("channel is not ready maybe not start route data"); + return HttpResponse::InternalServerError() + .json("channel is not ready maybe not start route data"); } } @@ -361,7 +362,8 @@ where } } None => { - return HttpResponse::InternalServerError().json("channel is not ready maybe not start route data"); + return HttpResponse::InternalServerError() + .json("channel is not ready maybe not start route data"); } } @@ -412,7 +414,8 @@ where } } None => { - return HttpResponse::InternalServerError().json("channel is not ready maybe not start route data"); + return HttpResponse::InternalServerError() + .json("channel is not ready maybe not start route data"); } } @@ -458,10 +461,11 @@ where } } None => { - return HttpResponse::InternalServerError().json("channel is not ready maybe not start route data"); + return HttpResponse::InternalServerError() + .json("channel is not ready maybe not start route data"); } } - + match rx.await { Ok(Ok(resp)) => HttpResponse::Ok().json(resp), Ok(Err(err)) => HttpResponse::InternalServerError().json(err.to_string()), diff --git a/makefile b/makefile index b96ef17..cbed872 100644 --- a/makefile +++ b/makefile @@ -26,6 +26,10 @@ docker_dp: build-dp docker build -f ./crates/channel_runner/dockerfile -t gitdatateam/channel_runner:latest . 
################## build nodes +jz_writer: $(OUTPUT) + cargo build -p jz_writer --release + cp target/release/jz_writer $(OUTPUT)/jz_writer + build-nodes: $(OUTPUT) cargo build -p jz_reader --release cp target/release/jz_reader $(OUTPUT)/jz_reader diff --git a/src/core/main_db_models.rs b/src/core/main_db_models.rs index bf81a8f..b4ce69d 100644 --- a/src/core/main_db_models.rs +++ b/src/core/main_db_models.rs @@ -9,6 +9,8 @@ use serde::{ pub enum JobState { #[default] Created, + Selected, + Deployed, Running, Error, Finish, diff --git a/src/dbrepo/main_db_mongo.rs b/src/dbrepo/main_db_mongo.rs index 5821bd4..f9ff68c 100644 --- a/src/dbrepo/main_db_mongo.rs +++ b/src/dbrepo/main_db_mongo.rs @@ -115,7 +115,7 @@ impl JobRepo for MongoMainDbRepo { async fn get_job_for_running(&self) -> Result> { let update = doc! { "$set": { - "state": to_variant_name(&JobState::Running)?, + "state": to_variant_name(&JobState::Selected)?, "updated_at":Utc::now().timestamp(), }, "$inc":{ diff --git a/src/job/job_mgr.rs b/src/job/job_mgr.rs index afd3a3a..d4b8304 100644 --- a/src/job/job_mgr.rs +++ b/src/job/job_mgr.rs @@ -21,7 +21,7 @@ use crate::{ StdIntoAnyhowResult, }, }; -use anyhow::Result; +use anyhow::{anyhow, Result}; use futures::future::try_join_all; use kube::Client; use mongodb::bson::oid::ObjectId; @@ -101,21 +101,37 @@ where while let Some(job) = db.get_job_for_running().await? 
{ let dag = Dag::from_json(job.graph_json.as_str())?; let namespace = format!("{}-{}", job.name, job.retry_number); - if let Err(err) = driver.deploy(namespace.as_str(), &dag).await { - error!("run job {} {err}, start cleaning", job.name); - if let Err(err) = driver.clean(namespace.as_str()).await { - error!("clean job resource {err}"); + + match driver.deploy(namespace.as_str(), &dag).await { + Ok(_) => { + if let Err(err) = db + .update( + &job.id, + &JobUpdateInfo { + state: Some(JobState::Deployed), + }, + ) + .await + { + error!("set job to deploy state {err}"); + } } - if let Err(err) = db - .update( - &job.id, - &JobUpdateInfo { - state: Some(JobState::Error), - }, - ) - .await - { - error!("set job to error state {err}"); + Err(err) => { + error!("run job {} {err}, start cleaning", job.name); + if let Err(err) = driver.clean(namespace.as_str()).await { + error!("clean job resource {err}"); + } + if let Err(err) = db + .update( + &job.id, + &JobUpdateInfo { + state: Some(JobState::Error), + }, + ) + .await + { + error!("set job to error state {err}"); + } } }; } @@ -175,10 +191,25 @@ where pub async fn start_job(&self, id: &ObjectId) -> Result<()> { let job = self.db.get(id).await?.anyhow("job not found")?; + if job.state != JobState::Deployed { + return Err(anyhow!("only can run a deployed job")); + } + let dag = Dag::from_json(job.graph_json.as_str())?; let namespace = format!("{}-{}", job.name, job.retry_number - 1); let controller = self.driver.attach(&namespace, &dag).await?; - controller.start().await + controller.start().await?; + self.db + .update( + &job.id, + &JobUpdateInfo { + state: Some(JobState::Running), + }, + ).await + .map_err(|err|{ + error!("set job to running state {err}"); + err + }) } pub async fn clean_job(&self, id: &ObjectId) -> Result<()> {