Skip to content

Commit

Permalink
feat: add selected and deployed state
Browse files Browse the repository at this point in the history
  • Loading branch information
hunjixin committed Aug 11, 2024
1 parent 2b2ea70 commit cc55e68
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 22 deletions.
14 changes: 9 additions & 5 deletions crates/compute_unit_runner/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,8 @@ where
}
}
None => {
return HttpResponse::InternalServerError().json("channel is not ready maybe not start route data");
return HttpResponse::InternalServerError()
.json("channel is not ready maybe not start route data");
}
}

Expand Down Expand Up @@ -361,7 +362,8 @@ where
}
}
None => {
return HttpResponse::InternalServerError().json("channel is not ready maybe not start route data");
return HttpResponse::InternalServerError()
.json("channel is not ready maybe not start route data");
}
}

Expand Down Expand Up @@ -412,7 +414,8 @@ where
}
}
None => {
return HttpResponse::InternalServerError().json("channel is not ready maybe not start route data");
return HttpResponse::InternalServerError()
.json("channel is not ready maybe not start route data");
}
}

Expand Down Expand Up @@ -458,10 +461,11 @@ where
}
}
None => {
return HttpResponse::InternalServerError().json("channel is not ready maybe not start route data");
return HttpResponse::InternalServerError()
.json("channel is not ready maybe not start route data");
}
}

match rx.await {
Ok(Ok(resp)) => HttpResponse::Ok().json(resp),
Ok(Err(err)) => HttpResponse::InternalServerError().json(err.to_string()),
Expand Down
4 changes: 4 additions & 0 deletions makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ docker_dp: build-dp
docker build -f ./crates/channel_runner/dockerfile -t gitdatateam/channel_runner:latest .

################## build nodes
jz_reader:
cargo build -p jz_writer --release
cp target/release/jz_writer $(OUTPUT)/jz_writer

build-nodes: $(OUTPUT)
cargo build -p jz_reader --release
cp target/release/jz_reader $(OUTPUT)/jz_reader
Expand Down
2 changes: 2 additions & 0 deletions src/core/main_db_models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use serde::{
pub enum JobState {
#[default]
Created,
Selected,
Deployed,
Running,
Error,
Finish,
Expand Down
2 changes: 1 addition & 1 deletion src/dbrepo/main_db_mongo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl JobRepo for MongoMainDbRepo {
async fn get_job_for_running(&self) -> Result<Option<Job>> {
let update = doc! {
"$set": {
"state": to_variant_name(&JobState::Running)?,
"state": to_variant_name(&JobState::Selected)?,
"updated_at":Utc::now().timestamp(),
},
"$inc":{
Expand Down
63 changes: 47 additions & 16 deletions src/job/job_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
StdIntoAnyhowResult,
},
};
use anyhow::Result;
use anyhow::{anyhow, Result};
use futures::future::try_join_all;
use kube::Client;
use mongodb::bson::oid::ObjectId;
Expand Down Expand Up @@ -101,21 +101,37 @@ where
while let Some(job) = db.get_job_for_running().await? {
let dag = Dag::from_json(job.graph_json.as_str())?;
let namespace = format!("{}-{}", job.name, job.retry_number);
if let Err(err) = driver.deploy(namespace.as_str(), &dag).await {
error!("run job {} {err}, start cleaning", job.name);
if let Err(err) = driver.clean(namespace.as_str()).await {
error!("clean job resource {err}");

match driver.deploy(namespace.as_str(), &dag).await {
Ok(_) => {
if let Err(err) = db
.update(
&job.id,
&JobUpdateInfo {
state: Some(JobState::Deployed),
},
)
.await
{
error!("set job to deploy state {err}");
}
}
if let Err(err) = db
.update(
&job.id,
&JobUpdateInfo {
state: Some(JobState::Error),
},
)
.await
{
error!("set job to error state {err}");
Err(err) => {
error!("run job {} {err}, start cleaning", job.name);
if let Err(err) = driver.clean(namespace.as_str()).await {
error!("clean job resource {err}");
}
if let Err(err) = db
.update(
&job.id,
&JobUpdateInfo {
state: Some(JobState::Error),
},
)
.await
{
error!("set job to error state {err}");
}
}
};
}
Expand Down Expand Up @@ -175,10 +191,25 @@ where

pub async fn start_job(&self, id: &ObjectId) -> Result<()> {
let job = self.db.get(id).await?.anyhow("job not found")?;
if job.state != JobState::Deployed {
return Err(anyhow!("only can run a deployed job"));
}

let dag = Dag::from_json(job.graph_json.as_str())?;
let namespace = format!("{}-{}", job.name, job.retry_number - 1);
let controller = self.driver.attach(&namespace, &dag).await?;
controller.start().await
controller.start().await?;
self.db
.update(
&job.id,
&JobUpdateInfo {
state: Some(JobState::Running),
},
).await
.map_err(|err|{
error!("set job to deploy state {err}");
err
})
}

pub async fn clean_job(&self, id: &ObjectId) -> Result<()> {
Expand Down

0 comments on commit cc55e68

Please sign in to comment.