From 9a898917759a55242c9674ed2ffe3d7f1b722634 Mon Sep 17 00:00:00 2001 From: MasterPtato <23087326+MasterPtato@users.noreply.github.com> Date: Sat, 23 Nov 2024 15:09:17 +0000 Subject: [PATCH] fix: js bundle unarchiving (#1456) ## Changes --- .../infra/client/manager/src/actor/setup.rs | 176 +++++++++++------- 1 file changed, 109 insertions(+), 67 deletions(-) diff --git a/packages/infra/client/manager/src/actor/setup.rs b/packages/infra/client/manager/src/actor/setup.rs index 7d9d96c73..1bc82ed3a 100644 --- a/packages/infra/client/manager/src/actor/setup.rs +++ b/packages/infra/client/manager/src/actor/setup.rs @@ -88,8 +88,6 @@ impl Actor { } } protocol::ImageKind::OciBundle | protocol::ImageKind::JavaScript => { - tracing::info!(actor_id=?self.actor_id, "decompressing and unarchiving artifact"); - let bundle_path = match self.config.image.kind { protocol::ImageKind::OciBundle => actor_path.join("oci-bundle"), protocol::ImageKind::JavaScript => actor_path.join("js-bundle"), @@ -98,73 +96,117 @@ impl Actor { fs::create_dir(&bundle_path).await?; - // Spawn the lz4 process - let mut lz4_child = Command::new("lz4") - .arg("-d") - .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .spawn()?; - - // Spawn the tar process - let mut tar_child = Command::new("tar") - .arg("-x") - .arg("-C") - .arg(&bundle_path) - .stdin(Stdio::piped()) - .spawn()?; - - // Take the stdin of lz4 and tar processes - let mut lz4_stdin = lz4_child.stdin.take().context("lz4 stdin")?; - let mut lz4_stdout = lz4_child.stdout.take().context("lz4 stdout")?; - let mut tar_stdin = tar_child.stdin.take().context("tar stdin")?; - - tokio::try_join!( - // Pipe the response body to lz4 stdin - async move { - while let Some(chunk) = stream.next().await { - let data = chunk?; - lz4_stdin.write_all(&data).await?; - } - lz4_stdin.shutdown().await?; + match self.config.image.compression { + protocol::ImageCompression::None => { + tracing::info!(actor_id=?self.actor_id, "unarchiving artifact"); - anyhow::Ok(()) - }, - // Pipe lz4 stdout to tar stdin - async move { - let mut buffer = [0; 8192]; - loop { - let n = lz4_stdout.read(&mut buffer).await?; - if n == 0 { - break; - } - tar_stdin.write_all(&buffer[..n]).await?; - } - tar_stdin.shutdown().await?; + // Spawn the tar process + let mut tar_child = Command::new("tar") + .arg("-x") + .arg("-C") + .arg(&bundle_path) + .stdin(Stdio::piped()) + .spawn()?; - anyhow::Ok(()) - }, - // Wait for child processes - async { - let cmd_out = lz4_child.wait_with_output().await?; - ensure!( - cmd_out.status.success(), - "failed `lz4` command\n{}", - std::str::from_utf8(&cmd_out.stderr)? - ); - - Ok(()) - }, - async { - let cmd_out = tar_child.wait_with_output().await?; - ensure!( - cmd_out.status.success(), - "failed `tar` command\n{}", - std::str::from_utf8(&cmd_out.stderr)? - ); - - Ok(()) - }, - )?; + // Take the stdin of tar process + let mut tar_stdin = tar_child.stdin.take().context("tar stdin")?; + + tokio::try_join!( + // Pipe the response body to lz4 stdin + async move { + while let Some(chunk) = stream.next().await { + let data = chunk?; + tar_stdin.write_all(&data).await?; + } + tar_stdin.shutdown().await?; + + anyhow::Ok(()) + }, + // Wait for child process + async { + let cmd_out = tar_child.wait_with_output().await?; + ensure!( + cmd_out.status.success(), + "failed `tar` command\n{}", + std::str::from_utf8(&cmd_out.stderr)? + ); + + Ok(()) + }, + )?; + } + protocol::ImageCompression::Lz4 => { + tracing::info!(actor_id=?self.actor_id, "decompressing and unarchiving artifact"); + + // Spawn the lz4 process + let mut lz4_child = Command::new("lz4") + .arg("-d") + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn()?; + + // Spawn the tar process + let mut tar_child = Command::new("tar") + .arg("-x") + .arg("-C") + .arg(&bundle_path) + .stdin(Stdio::piped()) + .spawn()?; + + // Take the stdin of lz4 and tar processes + let mut lz4_stdin = lz4_child.stdin.take().context("lz4 stdin")?; + let mut lz4_stdout = lz4_child.stdout.take().context("lz4 stdout")?; + let mut tar_stdin = tar_child.stdin.take().context("tar stdin")?; + + tokio::try_join!( + // Pipe the response body to lz4 stdin + async move { + while let Some(chunk) = stream.next().await { + let data = chunk?; + lz4_stdin.write_all(&data).await?; + } + lz4_stdin.shutdown().await?; + + anyhow::Ok(()) + }, + // Pipe lz4 stdout to tar stdin + async move { + let mut buffer = [0; 8192]; + loop { + let n = lz4_stdout.read(&mut buffer).await?; + if n == 0 { + break; + } + tar_stdin.write_all(&buffer[..n]).await?; + } + tar_stdin.shutdown().await?; + + anyhow::Ok(()) + }, + // Wait for child processes + async { + let cmd_out = lz4_child.wait_with_output().await?; + ensure!( + cmd_out.status.success(), + "failed `lz4` command\n{}", + std::str::from_utf8(&cmd_out.stderr)? + ); + + Ok(()) + }, + async { + let cmd_out = tar_child.wait_with_output().await?; + ensure!( + cmd_out.status.success(), + "failed `tar` command\n{}", + std::str::from_utf8(&cmd_out.stderr)? + ); + + Ok(()) + }, + )?; + } + } } }