Skip to content

Commit

Permalink
fix: js bundle unarchiving (#1456)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
MasterPtato committed Nov 23, 2024
1 parent f6708f3 commit 9a89891
Showing 1 changed file with 109 additions and 67 deletions.
176 changes: 109 additions & 67 deletions packages/infra/client/manager/src/actor/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ impl Actor {
}
}
protocol::ImageKind::OciBundle | protocol::ImageKind::JavaScript => {
tracing::info!(actor_id=?self.actor_id, "decompressing and unarchiving artifact");

let bundle_path = match self.config.image.kind {
protocol::ImageKind::OciBundle => actor_path.join("oci-bundle"),
protocol::ImageKind::JavaScript => actor_path.join("js-bundle"),
Expand All @@ -98,73 +96,117 @@ impl Actor {

fs::create_dir(&bundle_path).await?;

// Spawn the lz4 process
let mut lz4_child = Command::new("lz4")
.arg("-d")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()?;

// Spawn the tar process
let mut tar_child = Command::new("tar")
.arg("-x")
.arg("-C")
.arg(&bundle_path)
.stdin(Stdio::piped())
.spawn()?;

// Take the stdin of lz4 and tar processes
let mut lz4_stdin = lz4_child.stdin.take().context("lz4 stdin")?;
let mut lz4_stdout = lz4_child.stdout.take().context("lz4 stdout")?;
let mut tar_stdin = tar_child.stdin.take().context("tar stdin")?;

tokio::try_join!(
// Pipe the response body to lz4 stdin
async move {
while let Some(chunk) = stream.next().await {
let data = chunk?;
lz4_stdin.write_all(&data).await?;
}
lz4_stdin.shutdown().await?;
match self.config.image.compression {
protocol::ImageCompression::None => {
tracing::info!(actor_id=?self.actor_id, "unarchiving artifact");

anyhow::Ok(())
},
// Pipe lz4 stdout to tar stdin
async move {
let mut buffer = [0; 8192];
loop {
let n = lz4_stdout.read(&mut buffer).await?;
if n == 0 {
break;
}
tar_stdin.write_all(&buffer[..n]).await?;
}
tar_stdin.shutdown().await?;
// Spawn the tar process
let mut tar_child = Command::new("tar")
.arg("-x")
.arg("-C")
.arg(&bundle_path)
.stdin(Stdio::piped())
.spawn()?;

anyhow::Ok(())
},
// Wait for child processes
async {
let cmd_out = lz4_child.wait_with_output().await?;
ensure!(
cmd_out.status.success(),
"failed `lz4` command\n{}",
std::str::from_utf8(&cmd_out.stderr)?
);

Ok(())
},
async {
let cmd_out = tar_child.wait_with_output().await?;
ensure!(
cmd_out.status.success(),
"failed `tar` command\n{}",
std::str::from_utf8(&cmd_out.stderr)?
);

Ok(())
},
)?;
// Take the stdin of tar process
let mut tar_stdin = tar_child.stdin.take().context("tar stdin")?;

tokio::try_join!(
// Pipe the response body to lz4 stdin
async move {
while let Some(chunk) = stream.next().await {
let data = chunk?;
tar_stdin.write_all(&data).await?;
}
tar_stdin.shutdown().await?;

anyhow::Ok(())
},
// Wait for child process
async {
let cmd_out = tar_child.wait_with_output().await?;
ensure!(
cmd_out.status.success(),
"failed `tar` command\n{}",
std::str::from_utf8(&cmd_out.stderr)?
);

Ok(())
},
)?;
}
protocol::ImageCompression::Lz4 => {
tracing::info!(actor_id=?self.actor_id, "decompressing and unarchiving artifact");

// Spawn the lz4 process
let mut lz4_child = Command::new("lz4")
.arg("-d")
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()?;

// Spawn the tar process
let mut tar_child = Command::new("tar")
.arg("-x")
.arg("-C")
.arg(&bundle_path)
.stdin(Stdio::piped())
.spawn()?;

// Take the stdin of lz4 and tar processes
let mut lz4_stdin = lz4_child.stdin.take().context("lz4 stdin")?;
let mut lz4_stdout = lz4_child.stdout.take().context("lz4 stdout")?;
let mut tar_stdin = tar_child.stdin.take().context("tar stdin")?;

tokio::try_join!(
// Pipe the response body to lz4 stdin
async move {
while let Some(chunk) = stream.next().await {
let data = chunk?;
lz4_stdin.write_all(&data).await?;
}
lz4_stdin.shutdown().await?;

anyhow::Ok(())
},
// Pipe lz4 stdout to tar stdin
async move {
let mut buffer = [0; 8192];
loop {
let n = lz4_stdout.read(&mut buffer).await?;
if n == 0 {
break;
}
tar_stdin.write_all(&buffer[..n]).await?;
}
tar_stdin.shutdown().await?;

anyhow::Ok(())
},
// Wait for child processes
async {
let cmd_out = lz4_child.wait_with_output().await?;
ensure!(
cmd_out.status.success(),
"failed `lz4` command\n{}",
std::str::from_utf8(&cmd_out.stderr)?
);

Ok(())
},
async {
let cmd_out = tar_child.wait_with_output().await?;
ensure!(
cmd_out.status.success(),
"failed `tar` command\n{}",
std::str::from_utf8(&cmd_out.stderr)?
);

Ok(())
},
)?;
}
}
}
}

Expand Down

0 comments on commit 9a89891

Please sign in to comment.