Skip to content
This repository has been archived by the owner on Jan 15, 2025. It is now read-only.

Commit

Permalink
container: Drop async_compression
Browse files Browse the repository at this point in the history
This is basically just a workaround for Nullus157/async-compression#271

However, in practice I think we may as well just use
a native blocking tokio thread here.

There's a lot of shenanigans going on though because
we're wrapping sync I/O with async and then back to sync
because the tar code we're using is still sync.

What would be a lot better is to move the compression to be
inline with the sync tar parsing, but that would require some
API changes and more code motion.
  • Loading branch information
cgwalters committed Apr 29, 2024
1 parent 3014069 commit e30d232
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 21 deletions.
2 changes: 1 addition & 1 deletion lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ rust-version = "1.74.0"
[dependencies]
anyhow = "1.0"
containers-image-proxy = "0.5.5"
async-compression = { version = "0.4", features = ["gzip", "tokio", "zstd"] }
camino = "1.0.4"
chrono = "0.4.19"
olpc-cjson = "0.1.1"
Expand Down Expand Up @@ -43,6 +42,7 @@ tokio = { features = ["io-std", "time", "process", "rt", "net"], version = ">= 1
tokio-util = { features = ["io-util"], version = "0.7" }
tokio-stream = { features = ["sync"], version = "0.1.8" }
tracing = "0.1"
zstd = "0.13.1"

indoc = { version = "2", optional = true }
xshell = { version = "0.2", optional = true }
Expand Down
88 changes: 72 additions & 16 deletions lib/src/container/unencapsulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::container::store::LayerProgress;
use super::*;
use containers_image_proxy::{ImageProxy, OpenedImage};
use fn_error_context::context;
use futures_util::{Future, FutureExt};
use futures_util::{Future, FutureExt, TryFutureExt as _};
use oci_spec::image as oci_image;
use std::sync::{Arc, Mutex};
use tokio::{
Expand Down Expand Up @@ -189,22 +189,76 @@ pub async fn unencapsulate(repo: &ostree::Repo, imgref: &OstreeImageReference) -
importer.unencapsulate().await
}

/// Take an async AsyncBufRead and handle decompression for it, returning
/// a wrapped AsyncBufRead implementation.
/// This is implemented with a background thread using a pipe-to-self,
/// and so there is an additional Future object returned that is a "driver"
/// task and must also be checked for errors.
pub(crate) fn decompress_bridge<'a>(
src: impl tokio::io::AsyncBufRead + Send + Unpin + 'static,
is_zstd: bool,
) -> Result<(
// This one is the input reader
impl tokio::io::AsyncBufRead + Send + Unpin + 'static,
// And this represents the worker thread doing copying
impl Future<Output = Result<()>> + Send + Unpin + 'static,
)> {
// We use a plain unix pipe() because it's just a very convenient
// way to bridge arbitrarily between sync and async with a worker
// thread. Yes, it involves going through the kernel, but
// eventually we'll replace all this logic with podman anyways.
let (tx, rx) = tokio::net::unix::pipe::pipe()?;
let task = tokio::task::spawn_blocking(move || -> Result<()> {
// Convert the write half of the pipe() into a regular blocking file descriptor
let tx = tx.into_blocking_fd()?;
let mut tx = std::fs::File::from(tx);
// Convert the async input back to synchronous.
let src = tokio_util::io::SyncIoBridge::new(src);
let bufr = std::io::BufReader::new(src);
// Wrap the input in a decompressor; I originally tried to make
// this function take a function pointer, but yeah that was painful
// with the type system.
let mut src: Box<dyn std::io::Read> = if is_zstd {
Box::new(zstd::stream::read::Decoder::new(bufr)?)
} else {
Box::new(flate2::bufread::GzDecoder::new(bufr))
};
// We don't care about the number of bytes copied
let _n: u64 = std::io::copy(&mut src, &mut tx)?;
Ok(())
})
// Flatten the nested Result<Result<>>
.map(crate::tokio_util::flatten_anyhow);
// And return the pair of futures
Ok((tokio::io::BufReader::new(rx), task))
}

/// Create a decompressor for this MIME type, given a stream of input.
fn new_async_decompressor<'a>(
media_type: &oci_image::MediaType,
src: impl AsyncBufRead + Send + Unpin + 'a,
) -> Result<Box<dyn AsyncBufRead + Send + Unpin + 'a>> {
match media_type {
oci_image::MediaType::ImageLayerGzip => Ok(Box::new(tokio::io::BufReader::new(
async_compression::tokio::bufread::GzipDecoder::new(src),
))),
oci_image::MediaType::ImageLayerZstd => Ok(Box::new(tokio::io::BufReader::new(
async_compression::tokio::bufread::ZstdDecoder::new(src),
))),
oci_image::MediaType::ImageLayer => Ok(Box::new(src)),
oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => Ok(Box::new(src)),
o => Err(anyhow::anyhow!("Unhandled layer type: {}", o)),
}
src: impl AsyncBufRead + Send + Unpin + 'static,
) -> Result<(
Box<dyn AsyncBufRead + Send + Unpin + 'static>,
impl Future<Output = Result<()>> + Send + Unpin + 'static,
)> {
let r: (
Box<dyn AsyncBufRead + Send + Unpin + 'static>,
Box<dyn Future<Output = Result<()>> + Send + Unpin + 'static>,
) = match media_type {
m @ (oci_image::MediaType::ImageLayerGzip | oci_image::MediaType::ImageLayerZstd) => {
let is_zstd = matches!(m, oci_image::MediaType::ImageLayerZstd);
let (r, driver) = decompress_bridge(src, is_zstd)?;
(Box::new(r), Box::new(driver) as _)
}
oci_image::MediaType::ImageLayer => {
(Box::new(src), Box::new(futures_util::future::ready(Ok(()))))
}
oci_image::MediaType::Other(t) if t.as_str() == DOCKER_TYPE_LAYER_TAR => {
(Box::new(src), Box::new(futures_util::future::ready(Ok(()))))
}
o => anyhow::bail!("Unhandled layer type: {}", o),
};
Ok(r)
}

/// A wrapper for [`get_blob`] which fetches a layer and decompresses it.
Expand Down Expand Up @@ -262,11 +316,13 @@ pub(crate) async fn fetch_layer_decompress<'a>(
progress.send_replace(Some(status));
}
};
let reader = new_async_decompressor(media_type, readprogress)?;
let (reader, compression_driver) = new_async_decompressor(media_type, readprogress)?;
let driver = driver.and_then(|()| compression_driver);
let driver = futures_util::future::join(readproxy, driver).map(|r| r.1);
Ok((reader, Either::Left(driver)))
} else {
let blob = new_async_decompressor(media_type, blob)?;
let (blob, compression_driver) = new_async_decompressor(media_type, blob)?;
let driver = driver.and_then(|()| compression_driver);
Ok((blob, Either::Right(driver)))
}
}
9 changes: 5 additions & 4 deletions lib/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,10 +401,11 @@ async fn test_tar_write() -> Result<()> {
#[tokio::test]
async fn test_tar_write_tar_layer() -> Result<()> {
let fixture = Fixture::new_v1()?;
let uncompressed_tar = tokio::io::BufReader::new(
async_compression::tokio::bufread::GzipDecoder::new(EXAMPLE_TAR_LAYER),
);
ostree_ext::tar::write_tar(fixture.destrepo(), uncompressed_tar, "test", None).await?;
let mut v = Vec::new();
let mut dec = flate2::bufread::GzDecoder::new(std::io::Cursor::new(EXAMPLE_TAR_LAYER));
let _n = std::io::copy(&mut dec, &mut v)?;
let r = tokio::io::BufReader::new(std::io::Cursor::new(v));
ostree_ext::tar::write_tar(fixture.destrepo(), r, "test", None).await?;
Ok(())
}

Expand Down

0 comments on commit e30d232

Please sign in to comment.