Skip to content

Commit

Permalink
remove task_status unwrap with more graceful exit.
Browse files Browse the repository at this point in the history
  • Loading branch information
Shourya742 committed Jan 5, 2025
1 parent 9585a3d commit a3efa46
Showing 1 changed file with 52 additions and 47 deletions.
99 changes: 52 additions & 47 deletions roles/jd-client/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl JobDeclaratorClient {
});

let proxy_config = self.config;
loop {
'outer: loop {
let task_collector = task_collector.clone();
let tx_status = tx_status.clone();
let proxy_config = proxy_config.clone();
Expand Down Expand Up @@ -119,53 +119,58 @@ impl JobDeclaratorClient {
std::process::exit(0);
}
};
let task_status: status::Status = task_status.unwrap();

match task_status.state {
// Should only be sent by the downstream listener
status::State::DownstreamShutdown(err) => {
error!("SHUTDOWN from: {}", err);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
task_collector
.safe_lock(|s| {
for handle in s {
handle.abort();
}
})
.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
break;
}
status::State::UpstreamShutdown(err) => {
error!("SHUTDOWN from: {}", err);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
task_collector
.safe_lock(|s| {
for handle in s {
handle.abort();
}
})
.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
break;
}
status::State::UpstreamRogue => {
error!("Changin Pool");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
task_collector
.safe_lock(|s| {
for handle in s {
handle.abort();
}
})
.unwrap();
upstream_index += 1;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
break;
}
status::State::Healthy(msg) => {
info!("HEALTHY message: {}", msg);
// let task_status: status::Status = task_status.unwrap();

if let Ok(task_status) = task_status {
match task_status.state {
// Should only be sent by the downstream listener
status::State::DownstreamShutdown(err) => {
error!("SHUTDOWN from: {}", err);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
task_collector
.safe_lock(|s| {
for handle in s {
handle.abort();
}
})
.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
break;
}
status::State::UpstreamShutdown(err) => {
error!("SHUTDOWN from: {}", err);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
task_collector
.safe_lock(|s| {
for handle in s {
handle.abort();
}
})
.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
break;
}
status::State::UpstreamRogue => {
error!("Changin Pool");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
task_collector
.safe_lock(|s| {
for handle in s {
handle.abort();
}
})
.unwrap();
upstream_index += 1;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
break;
}
status::State::Healthy(msg) => {
info!("HEALTHY message: {}", msg);
}
}
} else {
info!("Closing channels");
break 'outer;
}
}
}
Expand Down

0 comments on commit a3efa46

Please sign in to comment.