From 3451362e0de971c4226205529d6a09f30f5ae636 Mon Sep 17 00:00:00 2001 From: Antheas Kapenekakis Date: Wed, 27 Nov 2024 19:19:33 +0100 Subject: [PATCH] make progress writer multi-thread safe, add deploy stages Signed-off-by: Antheas Kapenekakis --- lib/src/cli.rs | 21 +++++++----------- lib/src/deploy.rs | 30 ++++++++++++++++++++------ lib/src/progress_jsonl.rs | 45 ++++++++++++++++++++++++++++----------- 3 files changed, 64 insertions(+), 32 deletions(-) diff --git a/lib/src/cli.rs b/lib/src/cli.rs index b4fa3a330..e1afae139 100644 --- a/lib/src/cli.rs +++ b/lib/src/cli.rs @@ -686,7 +686,7 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> { } } } else { - let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet, prog).await?; + let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet, prog.clone()).await?; let staged_digest = staged_image.map(|s| s.digest().expect("valid digest in status")); let fetched_digest = &fetched.manifest_digest; tracing::debug!("staged: {staged_digest:?}"); @@ -709,7 +709,7 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> { println!("No update available.") } else { let osname = booted_deployment.osname(); - crate::deploy::stage(sysroot, &osname, &fetched, &spec).await?; + crate::deploy::stage(sysroot, &osname, &fetched, &spec, prog.clone()).await?; changed = true; if let Some(prev) = booted_image.as_ref() { if let Some(fetched_manifest) = fetched.get_manifest(repo)? { @@ -784,7 +784,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> { } let new_spec = RequiredHostSpec::from_spec(&new_spec)?; - let fetched = crate::deploy::pull(repo, &target, None, opts.quiet, prog).await?; + let fetched = crate::deploy::pull(repo, &target, None, opts.quiet, prog.clone()).await?; if !opts.retain { // By default, we prune the previous ostree ref so it will go away after later upgrades @@ -798,7 +798,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> { } let stateroot = booted_deployment.osname(); - crate::deploy::stage(sysroot, &stateroot, &fetched, &new_spec).await?; + crate::deploy::stage(sysroot, &stateroot, &fetched, &new_spec, prog).await?; if opts.apply { crate::reboot::reboot()?; @@ -840,25 +840,20 @@ async fn edit(opts: EditOpts) -> Result<()> { host.spec.verify_transition(&new_host.spec)?; let new_spec = RequiredHostSpec::from_spec(&new_host.spec)?; + let prog = ProgressWriter::from_empty(); + // We only support two state transitions right now; switching the image, // or flipping the bootloader ordering. if host.spec.boot_order != new_host.spec.boot_order { return crate::deploy::rollback(sysroot).await; } - let fetched = crate::deploy::pull( - repo, - new_spec.image, - None, - opts.quiet, - ProgressWriter::from_empty(), - ) - .await?; + let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet, prog.clone()).await?; // TODO gc old layers here let stateroot = booted_deployment.osname(); - crate::deploy::stage(sysroot, &stateroot, &fetched, &new_spec).await?; + crate::deploy::stage(sysroot, &stateroot, &fetched, &new_spec, prog).await?; Ok(()) } diff --git a/lib/src/deploy.rs b/lib/src/deploy.rs index b36127dc6..0e4f5cd2f 100644 --- a/lib/src/deploy.rs +++ b/lib/src/deploy.rs @@ -146,7 +146,7 @@ async fn handle_layer_progress_print( layers_total: usize, bytes_download: u64, bytes_total: u64, - mut prog: ProgressWriter, + prog: ProgressWriter, ) { let start = std::time::Instant::now(); let mut total_read = 0u64; @@ -219,7 +219,7 @@ async fn handle_layer_progress_print( // They are common enough, anyhow. Debounce on time. let curr = std::time::Instant::now(); if curr.duration_since(last_json_written).as_secs_f64() > 0.2 { - prog.send(ProgressStage::Fetching { + prog.send(ProgressStage::Fetch { bytes_done, bytes_download, bytes_total, @@ -243,14 +243,12 @@ async fn handle_layer_progress_print( let elapsed = end.duration_since(start); let persec = total_read as f64 / elapsed.as_secs_f64(); let persec = indicatif::HumanBytes(persec as u64); - if let Err(e) = bar.println(&format!( + println!( "Fetched layers: {} in {} ({}/s)", indicatif::HumanBytes(total_read), indicatif::HumanDuration(elapsed), persec, - )) { - tracing::warn!("writing to stdout: {e}"); - } + ); } /// Wrapper for pulling a container image, wiring up status output. @@ -296,7 +294,7 @@ pub(crate) async fn pull( layers_total, bytes_download, bytes_total, - prog, + prog.clone(), ) .await }) @@ -493,7 +491,15 @@ pub(crate) async fn stage( stateroot: &str, image: &ImageState, spec: &RequiredHostSpec<'_>, + prog: ProgressWriter, ) -> Result<()> { + let n_steps = 3; + + prog.send(ProgressStage::Deploy { + n_steps, + step: 0, + name: "deploying".to_string(), + }); let merge_deployment = sysroot.merge_deployment(Some(stateroot)); let origin = origin_from_imageref(spec.image)?; let deployment = crate::deploy::deploy( @@ -505,8 +511,18 @@ pub(crate) async fn stage( ) .await?; + prog.send(ProgressStage::Deploy { + n_steps, + step: 1, + name: "pulling_bound_images".to_string(), + }); crate::boundimage::pull_bound_images(sysroot, &deployment).await?; + prog.send(ProgressStage::Deploy { + n_steps, + step: 2, + name: "cleaning_up".to_string(), + }); crate::deploy::cleanup(sysroot).await?; println!("Queued for next boot: {:#}", spec.image); if let Some(version) = image.version.as_deref() { diff --git a/lib/src/progress_jsonl.rs b/lib/src/progress_jsonl.rs index 6fecb9bec..8c200cc5f 100644 --- a/lib/src/progress_jsonl.rs +++ b/lib/src/progress_jsonl.rs @@ -3,7 +3,9 @@ use std::fs; use std::io::{BufWriter, Write}; +use std::mem; use std::os::fd::{FromRawFd, RawFd}; +use std::sync::{Arc, Mutex}; use anyhow::Result; use serde::Serialize; @@ -20,8 +22,8 @@ pub struct LayerState { #[derive(Debug, serde::Serialize, serde::Deserialize)] #[serde(tag = "stage")] pub enum ProgressStage { - #[serde(rename = "fetching")] - Fetching { + #[serde(rename = "fetch")] + Fetch { bytes_done: u64, bytes_download: u64, bytes_total: u64, @@ -30,16 +32,23 @@ pub enum ProgressStage { layers_total: usize, layers: Option>, }, + #[serde(rename = "deploy")] + Deploy { + n_steps: usize, + step: usize, + name: String, + }, } +#[derive(Clone)] pub(crate) struct ProgressWriter { - fd: Option>, + fd: Arc>>>, } impl From for ProgressWriter { fn from(value: fs::File) -> Self { Self { - fd: Some(BufWriter::new(value)), + fd: Arc::new(Mutex::new(Some(BufWriter::new(value)))), } } } @@ -52,15 +61,21 @@ impl ProgressWriter { } pub(crate) fn from_empty() -> Self { - Self { fd: None } + Self { + fd: Arc::new(Mutex::new(None)), + } } /// Serialize the target object to JSON as a single line - pub(crate) fn send_unchecked(&mut self, v: T) -> Result<()> { - if self.fd.is_none() { + pub(crate) fn send_unchecked(&self, v: T) -> Result<()> { + let arc = self.fd.clone(); + let mut mutex = arc.lock().expect("Could not lock mutex"); + let fd_opt = mutex.as_mut(); + + if fd_opt.is_none() { return Ok(()); } - let mut fd = self.fd.as_mut().unwrap(); + let mut fd = fd_opt.unwrap(); // serde is guaranteed not to output newlines here serde_json::to_writer(&mut fd, &v)?; @@ -71,18 +86,24 @@ impl ProgressWriter { Ok(()) } - pub(crate) fn send(&mut self, v: T) { + pub(crate) fn send(&self, v: T) { if let Err(e) = self.send_unchecked(v) { eprintln!("Failed to write to jsonl: {}", e); // Stop writing to fd but let process continue - self.fd = None; + let arc = self.fd.clone(); + let mut mutex = arc.lock().expect("Could not lock mutex"); + *mutex = None.into(); } } /// Flush remaining data and return the underlying file. #[allow(dead_code)] pub(crate) fn into_inner(self) -> Result { - if let Some(fd) = self.fd { + let arc = self.fd.clone(); + let mut mutex = arc.lock().expect("Could not lock mutex"); + let fd_opt = mem::replace(&mut *mutex, None); + + if let Some(fd) = fd_opt { return fd.into_inner().map_err(Into::into); } else { return Err(anyhow::anyhow!("File descriptor closed/never existed.")); @@ -107,7 +128,7 @@ mod test { #[test] fn test_jsonl() -> Result<()> { let tf = tempfile::tempfile()?; - let mut w = ProgressWriter::from(tf); + let w = ProgressWriter::from(tf); let testvalues = [ S { s: "foo".into(),