diff --git a/Cargo.lock b/Cargo.lock
index 869d4ccd..95111c1a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -275,6 +275,7 @@ dependencies = [
  "terminal_size",
  "tokio",
  "tokio-stream",
+ "tokio-util",
  "toml 0.8.19",
  "tracing",
  "tracing-chrome",
diff --git a/Cargo.toml b/Cargo.toml
index 4150a852..82199789 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -68,7 +68,8 @@ shiplift = "0.7"
 syntect = "5"
 tar = "0.4"
 terminal_size = "0.3"
-tokio = { version = "1", features = ["macros", "fs", "process", "io-util", "time"] }
+tokio = { version = "1", features = ["macros", "fs", "process", "io-util", "signal", "time"] }
+tokio-util = "0.7"
 tokio-stream = "0.1"
 toml = "0.8"
 tracing = "0.1"
diff --git a/src/endpoint/configured.rs b/src/endpoint/configured.rs
index 71330f27..6a9ac936 100644
--- a/src/endpoint/configured.rs
+++ b/src/endpoint/configured.rs
@@ -397,6 +397,7 @@ impl From<shiplift::rep::Image> for Image {
     }
 }
 
+#[derive(Clone)]
 pub struct EndpointHandle(Arc<Endpoint>);
 
 impl EndpointHandle {
diff --git a/src/endpoint/scheduler.rs b/src/endpoint/scheduler.rs
index 570eb410..5e194a58 100644
--- a/src/endpoint/scheduler.rs
+++ b/src/endpoint/scheduler.rs
@@ -25,7 +25,7 @@ use itertools::Itertools;
 use tokio::io::AsyncWriteExt;
 use tokio::sync::mpsc::UnboundedReceiver;
 use tokio::sync::RwLock;
-use tracing::trace;
+use tracing::{error, info, trace};
 use uuid::Uuid;
 
 use crate::db::models as dbmodels;
@@ -95,6 +95,7 @@ impl EndpointScheduler {
             log_dir: self.log_dir.clone(),
             bar,
             endpoint,
+            container_id: None,
             max_endpoint_name_length: self.max_endpoint_name_length,
             job,
             staging_store: self.staging_store.clone(),
@@ -136,9 +137,11 @@ impl EndpointScheduler {
     }
 }
 
+#[derive(Clone)]
 pub struct JobHandle {
     log_dir: Option<PathBuf>,
     endpoint: EndpointHandle,
+    container_id: Option<String>,
     max_endpoint_name_length: usize,
     job: RunnableJob,
     bar: ProgressBar,
@@ -155,7 +158,7 @@ impl std::fmt::Debug for JobHandle {
 }
 
 impl JobHandle {
-    pub async fn run(self) -> Result<Vec<dbmodels::Artifact>> {
+    pub async fn run(mut self) -> Result<Vec<dbmodels::Artifact>> {
         let (log_sender, log_receiver) = tokio::sync::mpsc::unbounded_channel::<LogItem>();
         let endpoint_uri = self.endpoint.uri().clone();
         let endpoint_name = self.endpoint.name().clone();
@@ -181,6 +184,7 @@ impl JobHandle {
         )
         .await?;
         let container_id = prepared_container.create_info().id.clone();
+        self.container_id = Some(container_id.clone());
         let running_container = prepared_container
             .start()
             .await
@@ -202,12 +206,12 @@ impl JobHandle {
             package_name: &package.name,
             package_version: &package.version,
             log_dir: self.log_dir.as_ref(),
-            job: self.job,
+            job: self.job.clone(),
             log_receiver,
             bar: self.bar.clone(),
         }
         .join();
-        drop(self.bar);
+        drop(self.bar.clone());
 
         let (run_container, logres) = tokio::join!(running_container, logres);
         let log =
@@ -370,6 +374,34 @@ impl JobHandle {
     }
 }
 
+impl Drop for JobHandle {
+    fn drop(&mut self) {
+        info!("Cleaning up JobHandle");
+        if self.container_id.is_some() {
+            info!("Container already created");
+            let docker = self.endpoint.docker().clone();
+            let container_id = self.container_id.take().unwrap();
+
+            tokio::spawn(async move {
+                let container = docker.containers().get(&container_id);
+                let container_info = container.inspect().await.unwrap();
+
+                if container_info.state.running {
+                    info!("Container is still running, cleaning up...");
+                    match container.kill(None).await {
+                        Ok(_) => info!("Stopped the container {}", container_id),
+                        Err(e) => error!("Failed to stop container {}: {}", container_id, e),
+                    }
+                } else {
+                    info!("Container has already finished");
+                }
+            });
+        } else {
+            info!("No container created");
+        }
+    }
+}
+
 struct LogReceiver<'a> {
     endpoint_name: &'a str,
     max_endpoint_name_length: &'a usize,
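Review note on the scheduler changes: because `JobHandle` now implements `Drop`, `run()` can no longer move fields out of `self`, which is why `job` and the progress bar get cloned; note that `drop(self.bar.clone())` only drops the clone, so the bar is no longer released early but lives until the handle itself is dropped. The cleanup path also unwraps `container.inspect()` and calls `tokio::spawn`, which panics when a handle is dropped outside a Tokio runtime (e.g. during teardown after the runtime has shut down). A minimal hardened sketch of the spawned cleanup task, assuming the same shiplift calls (`inspect`, `kill`) and the `docker`/`container_id` bindings from the `drop()` above; this is an illustration, not part of the patch:

```rust
// Hypothetical variant of the cleanup task: it logs inspect() failures
// instead of panicking, and only spawns when a runtime is still available.
if let Ok(handle) = tokio::runtime::Handle::try_current() {
    handle.spawn(async move {
        let container = docker.containers().get(&container_id);
        match container.inspect().await {
            Ok(info) if info.state.running => match container.kill(None).await {
                Ok(_) => info!("Stopped the container {}", container_id),
                Err(e) => error!("Failed to stop container {}: {}", container_id, e),
            },
            Ok(_) => info!("Container {} has already finished", container_id),
            Err(e) => error!("Could not inspect container {}: {}", container_id, e),
        }
    });
} else {
    error!("No Tokio runtime available; cannot stop container {}", container_id);
}
```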
diff --git a/src/job/runnable.rs b/src/job/runnable.rs
index de06da4c..6adf38b6 100644
--- a/src/job/runnable.rs
+++ b/src/job/runnable.rs
@@ -28,7 +28,7 @@ use crate::util::docker::ImageName;
 use crate::util::EnvironmentVariableName;
 
 /// A job configuration that can be run. All inputs are clear here.
-#[derive(Debug, Getters)]
+#[derive(Clone, Debug, Getters)]
 pub struct RunnableJob {
     #[getset(get = "pub")]
     uuid: Uuid,
diff --git a/src/orchestrator/orchestrator.rs b/src/orchestrator/orchestrator.rs
index 46f66385..8dcd854f 100644
--- a/src/orchestrator/orchestrator.rs
+++ b/src/orchestrator/orchestrator.rs
@@ -13,6 +13,7 @@
 use std::borrow::Borrow;
 use std::collections::HashMap;
 use std::path::PathBuf;
+use std::process::ExitCode;
 use std::sync::Arc;
 use std::sync::Mutex;
 
@@ -32,8 +33,9 @@
 use tokio::sync::mpsc::Receiver;
 use tokio::sync::mpsc::Sender;
 use tokio::sync::RwLock;
 use tokio_stream::StreamExt;
+use tokio_util::sync::CancellationToken;
 use tracing::Instrument;
-use tracing::{debug, error, trace};
+use tracing::{debug, error, info, trace};
 use typed_builder::TypedBuilder;
 use uuid::Uuid;
@@ -265,12 +267,26 @@ impl Borrow<ArtifactPath> for ProducedArtifact {
 
 impl<'a> Orchestrator<'a> {
     pub async fn run(self, output: &mut Vec<ArtifactPath>) -> Result<HashMap<Uuid, Error>> {
-        let (results, errors) = self.run_tree().await?;
+        let token = CancellationToken::new();
+        let cloned_token = token.clone();
+
+        tokio::spawn(async move {
+            tokio::signal::ctrl_c().await.unwrap();
+            info!("Received the ctrl-c signal, stopping...");
+            token.cancel();
+            ExitCode::from(1)
+        });
+
+        let (results, errors) = self.run_tree(cloned_token).await?;
+
         output.extend(results);
         Ok(errors)
     }
 
-    async fn run_tree(self) -> Result<(Vec<ArtifactPath>, HashMap<Uuid, Error>)> {
+    async fn run_tree(
+        self,
+        token: CancellationToken,
+    ) -> Result<(Vec<ArtifactPath>, HashMap<Uuid, Error>)> {
         let prepare_span = tracing::debug_span!("run tree preparation");
 
         // There is no async code until we drop this guard, so this is fine
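Review note on the orchestrator wiring: the spawned listener awaits `tokio::signal::ctrl_c()` before logging and cancelling (the message must not claim receipt of a signal that has not arrived yet), and the trailing `ExitCode::from(1)` is the return value of a detached task that nothing joins, so Tokio discards it and it has no effect on the process exit code. The shutdown pattern itself is the usual `tokio-util` one; a self-contained sketch, assuming a binary with tokio's `rt`, `macros`, `signal`, and `time` features plus `tokio-util` and `anyhow`:

```rust
use tokio_util::sync::CancellationToken;

// One task waits for Ctrl-C and cancels the shared token; the main future
// races its workload against token.cancelled() via tokio::select!.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let token = CancellationToken::new();
    let cloned_token = token.clone();

    tokio::spawn(async move {
        // Await the signal first; only then log and cancel.
        tokio::signal::ctrl_c().await.expect("failed to listen for ctrl-c");
        println!("Received the ctrl-c signal, stopping...");
        token.cancel();
    });

    tokio::select! {
        _ = cloned_token.cancelled() => {
            anyhow::bail!("Received Control-C signal");
        }
        _ = async {
            // Stand-in for the real job-running future.
            tokio::time::sleep(std::time::Duration::from_secs(60)).await;
        } => Ok(()),
    }
}
```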
@@ -452,45 +468,55 @@ impl<'a> Orchestrator<'a> {
         // The JobTask::run implementation handles the rest, we just have to wait for all futures
         // to succeed.
         let run_span = tracing::debug_span!("run");
-        let running_jobs = jobs
-            .into_iter()
-            .map(|prep| {
-                trace!(parent: &run_span, job_uuid = %prep.1.jobdef.job.uuid(), "Creating JobTask");
-                // the sender is set or we need to use the root sender
-                let sender = prep
-                    .3
-                    .into_inner()
-                    .unwrap_or_else(|| vec![root_sender.clone()]);
-                JobTask::new(prep.0, prep.1, sender)
-            })
-            .inspect(
-                |task| trace!(parent: &run_span, job_uuid = %task.jobdef.job.uuid(), "Running job"),
-            )
-            .map(|task| {
-                task.run()
-                    .instrument(tracing::debug_span!(parent: &run_span, "JobTask::run"))
-            })
-            .collect::<futures::stream::FuturesUnordered<_>>();
-        debug!("Built {} jobs", running_jobs.len());
-
-        running_jobs
-            .collect::<Result<()>>()
-            .instrument(run_span.clone())
-            .await?;
-        trace!(parent: &run_span, "All jobs finished");
-        drop(run_span);
-
-        match root_receiver.recv().await {
-            None => Err(anyhow!("No result received...")),
-            Some(Ok(results)) => {
-                let results = results
-                    .into_iter()
-                    .flat_map(|tpl| tpl.1.into_iter())
-                    .map(ProducedArtifact::unpack)
-                    .collect();
-                Ok((results, HashMap::with_capacity(0)))
-            }
-            Some(Err(errors)) => Ok((vec![], errors)),
-        }
+
+        tokio::select! {
+            _ = token.cancelled() => {
+                anyhow::bail!("Received Control-C signal");
+            }
+            r = async {
+                let running_jobs = jobs
+                    .into_iter()
+                    .map(|prep| {
+                        trace!(parent: &run_span, job_uuid = %prep.1.jobdef.job.uuid(), "Creating JobTask");
+                        // the sender is set or we need to use the root sender
+                        let sender = prep
+                            .3
+                            .into_inner()
+                            .unwrap_or_else(|| vec![root_sender.clone()]);
+                        JobTask::new(prep.0, prep.1, sender)
+                    })
+                    .inspect(
+                        |task| trace!(parent: &run_span, job_uuid = %task.jobdef.job.uuid(), "Running job"),
+                    )
+                    .map(|task| {
+                        task.run()
+                            .instrument(tracing::debug_span!(parent: &run_span, "JobTask::run"))
+                    })
+                    .collect::<futures::stream::FuturesUnordered<_>>();
+                debug!("Built {} jobs", running_jobs.len());
+
+                running_jobs
+                    .collect::<Result<()>>()
+                    .instrument(run_span.clone())
+                    .await?;
+                trace!(parent: &run_span, "All jobs finished");
+                drop(run_span);
+
+                match root_receiver.recv().await {
+                    None => Err(anyhow!("No result received...")),
+                    Some(Ok(results)) => {
+                        let results = results
+                            .into_iter()
+                            .flat_map(|tpl| tpl.1.into_iter())
+                            .map(ProducedArtifact::unpack)
+                            .collect();
+                        Ok((results, HashMap::with_capacity(0)))
+                    }
+                    Some(Err(errors)) => Ok((vec![], errors)),
+                }
+            } => {
+                r
+            }
+        }
     }
 }
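Review note on the `select!` restructuring: when the cancellation arm wins, the `r = async { ... }` future is dropped, which drops the `FuturesUnordered` of `JobTask`s and, through them, the cloned `JobHandle`s, so the new `Drop` impl is what actually kills still-running containers. One caveat: the kill runs in a detached `tokio::spawn`ed task, so if the process exits immediately afterwards the cleanup may not get a chance to finish. A self-contained sketch of the drop-on-cancel interaction, with a hypothetical `Job` type standing in for `JobHandle`:

```rust
use tokio_util::sync::CancellationToken;

// Stand-in for JobHandle: owns an external resource and releases it on Drop.
struct Job(&'static str);

impl Drop for Job {
    fn drop(&mut self) {
        // JobHandle::drop would inspect and kill the Docker container here.
        println!("cleaning up {}", self.0);
    }
}

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    token.cancel(); // simulate Ctrl-C arriving right away

    tokio::select! {
        // biased: poll the workload arm first so the Job exists before the
        // cancellation arm is checked; this keeps the demo deterministic.
        biased;
        _ = async {
            let _job = Job("container-1234");
            // Stand-in for waiting on the running container.
            tokio::time::sleep(std::time::Duration::from_secs(60)).await;
        } => {}
        _ = token.cancelled() => {
            // Completing this arm makes select! drop the workload future,
            // which drops `_job` and runs its cleanup.
            println!("cancelled");
        }
    }
}
```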