Skip to content
This repository has been archived by the owner on Nov 7, 2024. It is now read-only.

Commit

Permalink
Remove decompress_bridge and move decompression inline
Browse files Browse the repository at this point in the history
This (with cautious optimism) appears to fix #657.

Signed-off-by: John Eckersberg <[email protected]>
  • Loading branch information
jeckersb committed Oct 31, 2024
1 parent b6992cc commit 25894c5
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 87 deletions.
17 changes: 12 additions & 5 deletions lib/src/container/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,7 +716,7 @@ impl ImageImporter {
p.send(ImportProgress::OstreeChunkStarted(layer.layer.clone()))
.await?;
}
let (blob, driver) = fetch_layer_decompress(
let (blob, driver, media_type) = fetch_layer(
&self.proxy,
&self.proxy_img,
&import.manifest,
Expand All @@ -733,6 +733,7 @@ impl ImageImporter {
let txn = repo.auto_transaction(Some(cancellable))?;
let mut importer = crate::tar::Importer::new_for_object_set(&repo);
let blob = tokio_util::io::SyncIoBridge::new(blob);
let blob = super::unencapsulate::decompressor(&media_type, blob)?;
let mut archive = tar::Archive::new(blob);
importer.import_objects(&mut archive, Some(cancellable))?;
let commit = if write_refs {
Expand Down Expand Up @@ -761,7 +762,7 @@ impl ImageImporter {
))
.await?;
}
let (blob, driver) = fetch_layer_decompress(
let (blob, driver, media_type) = fetch_layer(
&self.proxy,
&self.proxy_img,
&import.manifest,
Expand All @@ -778,6 +779,7 @@ impl ImageImporter {
let txn = repo.auto_transaction(Some(cancellable))?;
let mut importer = crate::tar::Importer::new_for_commit(&repo, remote);
let blob = tokio_util::io::SyncIoBridge::new(blob);
let blob = super::unencapsulate::decompressor(&media_type, blob)?;
let mut archive = tar::Archive::new(blob);
importer.import_commit(&mut archive, Some(cancellable))?;
let commit = importer.finish_import_commit();
Expand Down Expand Up @@ -873,7 +875,7 @@ impl ImageImporter {
p.send(ImportProgress::DerivedLayerStarted(layer.layer.clone()))
.await?;
}
let (blob, driver) = super::unencapsulate::fetch_layer_decompress(
let (blob, driver, media_type) = super::unencapsulate::fetch_layer(
&proxy,
&proxy_img,
&import.manifest,
Expand All @@ -891,8 +893,13 @@ impl ImageImporter {
allow_nonusr: root_is_transient,
retain_var: self.ostree_v2024_3,
};
let r =
crate::tar::write_tar(&self.repo, blob, layer.ostree_ref.as_str(), Some(opts));
let r = crate::tar::write_tar(
&self.repo,
blob,
media_type,
layer.ostree_ref.as_str(),
Some(opts),
);
let r = super::unencapsulate::join_fetch(r, driver)
.await
.with_context(|| format!("Parsing layer blob {}", layer.layer.digest()))?;
Expand Down
98 changes: 23 additions & 75 deletions lib/src/container/unencapsulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@
use crate::container::store::LayerProgress;

use super::*;
use anyhow::Context as _;
use containers_image_proxy::{ImageProxy, OpenedImage};
use fn_error_context::context;
use futures_util::{Future, FutureExt, TryFutureExt as _};
use futures_util::{Future, FutureExt};
use oci_spec::image::{self as oci_image, Digest};
use std::io::Read;
use std::sync::{Arc, Mutex};
use tokio::{
io::{AsyncBufRead, AsyncRead},
Expand Down Expand Up @@ -191,80 +191,30 @@ pub async fn unencapsulate(repo: &ostree::Repo, imgref: &OstreeImageReference) -
importer.unencapsulate().await
}

/// Take an async AsyncBufRead and handle decompression for it, returning
/// a wrapped AsyncBufRead implementation.
/// This is implemented with a background thread using a pipe-to-self,
/// and so there is an additional Future object returned that is a "driver"
/// task and must also be checked for errors.
pub(crate) fn decompress_bridge(
src: impl tokio::io::AsyncBufRead + Send + Unpin + 'static,
is_zstd: bool,
) -> Result<(
// This one is the input reader
impl tokio::io::AsyncBufRead + Send + Unpin + 'static,
// And this represents the worker thread doing copying
impl Future<Output = Result<()>> + Send + Unpin + 'static,
)> {
// We use a plain unix pipe() because it's just a very convenient
// way to bridge arbitrarily between sync and async with a worker
// thread. Yes, it involves going through the kernel, but
// eventually we'll replace all this logic with podman anyways.
let (tx, rx) = tokio::net::unix::pipe::pipe()?;
let task = tokio::task::spawn_blocking(move || -> Result<()> {
// Convert the write half of the pipe() into a regular blocking file descriptor
let tx = tx.into_blocking_fd()?;
let mut tx = std::fs::File::from(tx);
// Convert the async input back to synchronous.
let src = tokio_util::io::SyncIoBridge::new(src);
let bufr = std::io::BufReader::new(src);
// Wrap the input in a decompressor; I originally tried to make
// this function take a function pointer, but yeah that was painful
// with the type system.
let mut src: Box<dyn std::io::Read> = if is_zstd {
Box::new(zstd::stream::read::Decoder::new(bufr)?)
} else {
Box::new(flate2::bufread::GzDecoder::new(bufr))
};
// We don't care about the number of bytes copied
let _n: u64 = std::io::copy(&mut src, &mut tx).context("Copying for decompression")?;
Ok(())
})
// Flatten the nested Result<Result<>>
.map(crate::tokio_util::flatten_anyhow);
// And return the pair of futures
Ok((tokio::io::BufReader::new(rx), task))
}

/// Create a decompressor for this MIME type, given a stream of input.
fn new_async_decompressor(
pub(crate) fn decompressor(
media_type: &oci_image::MediaType,
src: impl AsyncBufRead + Send + Unpin + 'static,
) -> Result<(
Box<dyn AsyncBufRead + Send + Unpin + 'static>,
impl Future<Output = Result<()>> + Send + Unpin + 'static,
)> {
let r: (
Box<dyn AsyncBufRead + Send + Unpin + 'static>,
Box<dyn Future<Output = Result<()>> + Send + Unpin + 'static>,
) = match media_type {
src: impl Read + Send + 'static,
) -> Result<Box<dyn Read + Send + 'static>> {
let r: Box<dyn std::io::Read + Send + 'static> = match media_type {
m @ (oci_image::MediaType::ImageLayerGzip | oci_image::MediaType::ImageLayerZstd) => {
let is_zstd = matches!(m, oci_image::MediaType::ImageLayerZstd);
let (r, driver) = decompress_bridge(src, is_zstd)?;
(Box::new(r), Box::new(driver) as _)
}
oci_image::MediaType::ImageLayer => {
(Box::new(src), Box::new(futures_util::future::ready(Ok(()))))
}
oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => {
(Box::new(src), Box::new(futures_util::future::ready(Ok(()))))
if matches!(m, oci_image::MediaType::ImageLayerZstd) {
Box::new(zstd::stream::read::Decoder::new(src)?)
} else {
Box::new(flate2::bufread::GzDecoder::new(std::io::BufReader::new(
src,
)))
}
}
oci_image::MediaType::ImageLayer => Box::new(src),
oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => Box::new(src),
o => anyhow::bail!("Unhandled layer type: {}", o),
};
Ok(r)
}

/// A wrapper for [`get_blob`] which fetches a layer and decompresses it.
pub(crate) async fn fetch_layer_decompress<'a>(
pub(crate) async fn fetch_layer<'a>(
proxy: &'a ImageProxy,
img: &OpenedImage,
manifest: &oci_image::ImageManifest,
Expand All @@ -275,12 +225,13 @@ pub(crate) async fn fetch_layer_decompress<'a>(
) -> Result<(
Box<dyn AsyncBufRead + Send + Unpin>,
impl Future<Output = Result<()>> + 'a,
oci_image::MediaType,
)> {
use futures_util::future::Either;
tracing::debug!("fetching {}", layer.digest());
let layer_index = manifest.layers().iter().position(|x| x == layer).unwrap();
let (blob, driver, size);
let media_type: &oci_image::MediaType;
let media_type: oci_image::MediaType;
match transport_src {
Transport::ContainerStorage => {
let layer_info = layer_info
Expand All @@ -290,12 +241,12 @@ pub(crate) async fn fetch_layer_decompress<'a>(
anyhow!("blobid position {layer_index} exceeds diffid count {n_layers}")
})?;
size = layer_blob.size;
media_type = &layer_blob.media_type;
media_type = layer_blob.media_type.clone();
(blob, driver) = proxy.get_blob(img, &layer_blob.digest, size).await?;
}
_ => {
size = layer.size();
media_type = layer.media_type();
media_type = layer.media_type().clone();
(blob, driver) = proxy.get_blob(img, layer.digest(), size).await?;
}
};
Expand All @@ -316,13 +267,10 @@ pub(crate) async fn fetch_layer_decompress<'a>(
progress.send_replace(Some(status));
}
};
let (reader, compression_driver) = new_async_decompressor(media_type, readprogress)?;
let driver = driver.and_then(|()| compression_driver);
let reader = Box::new(readprogress);
let driver = futures_util::future::join(readproxy, driver).map(|r| r.1);
Ok((reader, Either::Left(driver)))
Ok((reader, Either::Left(driver), media_type))
} else {
let (blob, compression_driver) = new_async_decompressor(media_type, blob)?;
let driver = driver.and_then(|()| compression_driver);
Ok((blob, Either::Right(driver)))
Ok((Box::new(blob), Either::Right(driver), media_type))
}
}
23 changes: 18 additions & 5 deletions lib/src/tar/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use cap_std::io_lifetimes;
use cap_std_ext::cap_std::fs::Dir;
use cap_std_ext::cmdext::CapStdExtCommandExt;
use cap_std_ext::{cap_std, cap_tempfile};
use containers_image_proxy::oci_spec::image as oci_image;
use fn_error_context::context;
use ostree::gio;
use ostree::prelude::FileExt;
Expand Down Expand Up @@ -337,6 +338,7 @@ pub(crate) fn filter_tar(
#[context("Filtering tar stream")]
async fn filter_tar_async(
src: impl AsyncRead + Send + 'static,
media_type: oci_image::MediaType,
mut dest: impl AsyncWrite + Send + Unpin,
config: &TarImportConfig,
repo_tmpdir: Dir,
Expand All @@ -345,12 +347,14 @@ async fn filter_tar_async(
// The source must be moved to the heap so we know it is stable for passing to the worker thread
let src = Box::pin(src);
let config = config.clone();
let tar_transformer = tokio::task::spawn_blocking(move || {
let mut src = tokio_util::io::SyncIoBridge::new(src);
let tar_transformer = crate::tokio_util::spawn_blocking_flatten(move || {
let src = tokio_util::io::SyncIoBridge::new(src);
let mut src = crate::container::decompressor(&media_type, src)?;
let dest = tokio_util::io::SyncIoBridge::new(tx_buf);

let r = filter_tar(&mut src, dest, &config, &repo_tmpdir);
// Pass ownership of the input stream back to the caller - see below.
(r, src)
Ok((r, src))
});
let copier = tokio::io::copy(&mut rx_buf, &mut dest);
let (r, v) = tokio::join!(tar_transformer, copier);
Expand All @@ -373,6 +377,7 @@ async fn filter_tar_async(
pub async fn write_tar(
repo: &ostree::Repo,
src: impl tokio::io::AsyncRead + Send + Unpin + 'static,
media_type: oci_image::MediaType,
refname: &str,
options: Option<WriteTarOptions>,
) -> Result<WriteTarResult> {
Expand Down Expand Up @@ -430,7 +435,8 @@ pub async fn write_tar(
let repo_tmpdir = Dir::reopen_dir(&repo.dfd_borrow())?
.open_dir("tmp")
.context("Getting repo tmpdir")?;
let filtered_result = filter_tar_async(src, child_stdin, &import_config, repo_tmpdir);
let filtered_result =
filter_tar_async(src, media_type, child_stdin, &import_config, repo_tmpdir);
let output_copier = async move {
// Gather stdout/stderr to buffers
let mut child_stdout_buf = String::new();
Expand Down Expand Up @@ -585,7 +591,14 @@ mod tests {
let mut dest = Vec::new();
let src = tokio::io::BufReader::new(tokio::fs::File::open(rootfs_tar_path).await?);
let cap_tmpdir = Dir::open_ambient_dir(&tempd, cap_std::ambient_authority())?;
filter_tar_async(src, &mut dest, &Default::default(), cap_tmpdir).await?;
filter_tar_async(
src,
oci_image::MediaType::ImageLayer,
&mut dest,
&Default::default(),
cap_tmpdir,
)
.await?;
let dest = dest.as_slice();
let mut final_tar = tar::Archive::new(Cursor::new(dest));
let destdir = &tempd.path().join("destdir");
Expand Down
9 changes: 9 additions & 0 deletions lib/src/tokio_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ where
spawn_blocking_cancellable(f).map(flatten_anyhow)
}

/// A wrapper around [`tokio::task::spawn_blocking`] that flattens nested results.
pub fn spawn_blocking_flatten<F, T>(f: F) -> impl Future<Output = Result<T>>
where
F: FnOnce() -> Result<T> + Send + 'static,
T: Send + 'static,
{
tokio::task::spawn_blocking(f).map(flatten_anyhow)
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
18 changes: 16 additions & 2 deletions lib/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,14 @@ async fn test_tar_write() -> Result<()> {
let src = fixture.dir.open(tmptar)?;
fixture.dir.remove_file(tmptar)?;
let src = tokio::fs::File::from_std(src.into_std());
let r = ostree_ext::tar::write_tar(fixture.destrepo(), src, "layer", None).await?;
let r = ostree_ext::tar::write_tar(
fixture.destrepo(),
src,
oci_image::MediaType::ImageLayer,
"layer",
None,
)
.await?;
let layer_commit = r.commit.as_str();
cmd!(
sh,
Expand All @@ -409,7 +416,14 @@ async fn test_tar_write_tar_layer() -> Result<()> {
let mut dec = flate2::bufread::GzDecoder::new(std::io::Cursor::new(EXAMPLE_TAR_LAYER));
let _n = std::io::copy(&mut dec, &mut v)?;
let r = tokio::io::BufReader::new(std::io::Cursor::new(v));
ostree_ext::tar::write_tar(fixture.destrepo(), r, "test", None).await?;
ostree_ext::tar::write_tar(
fixture.destrepo(),
r,
oci_image::MediaType::ImageLayer,
"test",
None,
)
.await?;
Ok(())
}

Expand Down

0 comments on commit 25894c5

Please sign in to comment.