diff --git a/Cargo.lock b/Cargo.lock index 7fa88d253..8d0b90436 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -213,9 +213,9 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.1" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62b74f44609f0f91493e3082d3734d98497e094777144380ea4db9f9905dd5b6" +checksum = "bb42b2197bf15ccb092b62c74515dbd8b86d0effd934795f6687c93b6e679a2c" dependencies = [ "flate2", "futures-core", @@ -224,6 +224,64 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-executor" +version = "1.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c1da3ae8dabd9c00f453a329dfe1fb28da3c0a72e2478cdcd93171740c20499" +dependencies = [ + "async-lock", + "async-task", + "concurrent-queue", + "fastrand 2.0.0", + "futures-lite", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1b6f5d7df27bd294849f8eec66ecfc63d11814df7a4f5d74168a2394467b776" +dependencies = [ + "async-channel", + "async-executor", + "async-io", + "async-lock", + "blocking", + "futures-lite", + "once_cell", +] + +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite", + "log", + "parking", + "polling", + "rustix 0.37.20", + "slab", + "socket2 0.4.9", + "waker-fn", +] + +[[package]] +name = "async-lock" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" +dependencies = [ + "event-listener", +] + [[package]] name = "async-priority-channel" version = "0.1.0" @@ -233,6 +291,24 @@ dependencies = [ "event-listener", ] +[[package]] +name = "async-process" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a9d28b1d97e08915212e2e45310d47854eafa69600756fc735fb788f75199c9" +dependencies = [ + "async-io", + "async-lock", + "autocfg", + "blocking", + "cfg-if", + "event-listener", + "futures-lite", + "rustix 0.37.20", + "signal-hook", + "windows-sys 0.48.0", +] + [[package]] name = "async-recursion" version = "1.0.4" @@ -244,6 +320,33 @@ dependencies = [ "syn 2.0.29", ] +[[package]] +name = "async-std" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" +dependencies = [ + "async-channel", + "async-global-executor", + "async-io", + "async-lock", + "async-process", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -266,6 +369,25 @@ dependencies = [ "syn 2.0.29", ] +[[package]] +name = "async-tar" +version = "0.4.2" +source = "git+https://github.com/vdice/async-tar?rev=71e037f9652971e7a55b412a8e47a37b06f9c29d#71e037f9652971e7a55b412a8e47a37b06f9c29d" +dependencies = [ + "async-std", + "filetime", + "libc", + "pin-project", + "redox_syscall 0.2.16", + "xattr", +] + +[[package]] +name = "async-task" +version = "4.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9441c6b2fe128a7c2bf680a44c34d0df31ce09e5b7e401fcca3faa483dbc921" + [[package]] name = "async-trait" version = "0.1.73" @@ -283,6 +405,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8ab6b55fe97976e46f91ddbed8d147d966475dc29b2032757ba47e02376fbc3" +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "atty" version = "0.2.14" @@ -450,6 +578,22 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94c4ef1f913d78636d78d538eec1f18de81e481f44b1be0a81060090530846e1" +dependencies = [ + "async-channel", + "async-lock", + "async-task", + "fastrand 2.0.0", + "futures-io", + "futures-lite", + "piper", + "tracing", +] + [[package]] name = "bstr" version = "1.4.0" @@ -986,7 +1130,7 @@ dependencies = [ "memchr", "pin-project-lite", "tokio", - "tokio-util 0.7.7", + "tokio-util 0.7.9", ] [[package]] @@ -1350,6 +1494,16 @@ dependencies = [ "memchr", ] +[[package]] +name = "ctor" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096" +dependencies = [ + "quote", + "syn 1.0.109", +] + [[package]] name = "ctrlc" version = "3.2.5" @@ -2296,6 +2450,18 @@ dependencies = [ "regex", ] +[[package]] +name = "gloo-timers" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "h2" version = "0.3.17" @@ -2311,7 +2477,7 @@ dependencies = [ "indexmap 1.9.2", "slab", "tokio", - "tokio-util 0.7.7", + "tokio-util 0.7.9", "tracing", ] @@ -2939,6 +3105,15 @@ dependencies = [ "serde", ] +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -3318,6 +3493,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" dependencies = [ "cfg-if", + "value-bag", ] [[package]] @@ -3580,7 +3756,7 @@ dependencies = [ "thiserror", "tokio", "tokio-native-tls", - "tokio-util 0.7.7", + "tokio-util 0.7.9", "twox-hash", "url", ] @@ -3833,7 +4009,7 @@ dependencies = [ "sha2", "thiserror", "tokio", - "tokio-util 0.7.7", + "tokio-util 0.7.9", "tracing", "unicase", ] @@ -4244,6 +4420,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "668d31b1c4eba19242f2088b2bf3316b82ca31082a8335764db4e083db7485d4" +dependencies = [ + "atomic-waker", + "fastrand 2.0.0", + "futures-io", +] + [[package]] name = "pkg-config" version = "0.3.26" @@ -4278,6 +4465,22 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg", + "bitflags 1.3.2", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys 0.48.0", +] + [[package]] name = "portable-atomic" version = "1.3.2" @@ -4597,7 +4800,7 @@ dependencies = [ "sha1 0.6.1", "tokio", "tokio-native-tls", - "tokio-util 0.7.7", + "tokio-util 0.7.9", "url", ] @@ -4721,7 +4924,7 @@ dependencies = [ "tokio", "tokio-native-tls", "tokio-rustls 0.24.1", - "tokio-util 0.7.7", + "tokio-util 0.7.9", "tower-service", "url", "wasm-bindgen", @@ -5736,6 +5939,8 @@ name = "spin-oci" version = "2.0.0-pre0" dependencies = [ "anyhow", + "async-compression", + "async-tar", "base64 0.21.3", "dirs 4.0.0", "dkregistry", @@ -5746,11 +5951,14 @@ dependencies = [ "serde", "serde_json", "spin-app", + "spin-common", "spin-loader", "spin-manifest", + "spin-testing", "spin-trigger", "tempfile", "tokio", + "tokio-util 0.7.9", "tracing", "walkdir", ] @@ -6438,7 +6646,7 @@ dependencies = [ "postgres-types", "socket2 0.4.9", "tokio", - "tokio-util 0.7.7", + "tokio-util 0.7.9", ] [[package]] @@ -6489,9 +6697,9 @@ dependencies = [ [[package]] name = "tokio-util" -version = "0.7.7" +version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5427d89453009325de0d8f342c9490009f76e999cb7672d77e46267448f7e6b2" +checksum = "1d68074620f57a0b21594d9735eb2e98ab38b17f80d3fcb189fca266771ca60d" dependencies = [ "bytes", "futures-core", @@ -6796,6 +7004,16 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "value-bag" +version = "1.0.0-alpha.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2209b78d1249f7e6f3293657c9779fe31ced465df091bbd433a1cf88e916ec55" +dependencies = [ + "ctor", + "version_check", +] + [[package]] name = "vaultrs" version = "0.6.2" diff --git a/crates/oci/Cargo.toml b/crates/oci/Cargo.toml index 7843b612b..ac5a49d07 100644 --- a/crates/oci/Cargo.toml +++ b/crates/oci/Cargo.toml @@ -6,6 +6,9 @@ edition = { workspace = true } [dependencies] anyhow = "1.0" +async-compression = "0.4.3" +# Fork with nested async-std dependency bumped to satisfy Windows build; branch/revision is protected +async-tar = { git = "https://github.com/vdice/async-tar", rev = "71e037f9652971e7a55b412a8e47a37b06f9c29d" } base64 = "0.21" dkregistry = { git = "https://github.com/camallo/dkregistry-rs", rev = "37acecb4b8139dd1b1cc83795442f94f90e1ffc5" } docker_credential = "1.0" @@ -16,11 +19,15 @@ reqwest = "0.11" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" spin-app = { path = "../app" } +spin-common = { path = "../common" } spin-loader = { path = "../loader" } spin-manifest = { path = "../manifest" } spin-trigger = { path = "../trigger" } tempfile = "3.3" tokio = { version = "1", features = ["fs"] } +tokio-util = "0.7.9" tracing = { workspace = true } walkdir = "2.3" +[dev-dependencies] +spin-testing = { path = "../testing" } diff --git a/crates/oci/src/client.rs b/crates/oci/src/client.rs index 666c6e2d8..5af2cc054 100644 --- a/crates/oci/src/client.rs +++ b/crates/oci/src/client.rs @@ -4,6 +4,7 @@ use anyhow::{bail, Context, Result}; use docker_credential::DockerCredential; use futures_util::future; use futures_util::stream::{self, StreamExt, TryStreamExt}; +use oci_distribution::errors::OciDistributionError; use oci_distribution::token_cache::RegistryTokenType; use oci_distribution::RegistryOperation; use oci_distribution::{ @@ -14,6 +15,7 @@ use oci_distribution::{ }; use reqwest::Url; use spin_app::locked::{ContentPath, ContentRef, LockedApp}; +use spin_common::sha256; use spin_loader::cache::Cache; use spin_manifest::Application; use tokio::fs; @@ -21,16 +23,21 @@ use walkdir::WalkDir; use crate::auth::AuthConfig; -// TODO: the media types for application, wasm module, and data layer are not final. +// TODO: the media types for application, wasm module, data and archive layer are not final. const SPIN_APPLICATION_MEDIA_TYPE: &str = "application/vnd.fermyon.spin.application.v1+config"; const WASM_LAYER_MEDIA_TYPE: &str = "application/vnd.wasm.content.layer.v1+wasm"; const DATA_MEDIATYPE: &str = "application/vnd.wasm.content.layer.v1+data"; +const ARCHIVE_MEDIATYPE: &str = "application/vnd.wasm.content.bundle.v1.tar+gzip"; const CONFIG_FILE: &str = "config.json"; const LATEST_TAG: &str = "latest"; const MANIFEST_FILE: &str = "manifest.json"; const MAX_PARALLEL_PULL: usize = 16; +/// Maximum layer count allowed per app, set in accordance to the lowest +/// known maximum per image in well-known OCI registry implementations. +/// (500 appears to be the limit for Elastic Container Registry) +const MAX_LAYER_COUNT: usize = 500; // Inline content into ContentRef iff < this size. const CONTENT_REF_INLINE_MAX_SIZE: usize = 128; @@ -100,10 +107,13 @@ impl Client { auth: RegistryAuth, reference: Reference, ) -> Result> { - // For each component in the application, add layers for the wasm module and - // all static assets and update the locked application with the file digests. + // For each component in the application, add a layer for the wasm module and + // separate layers for all static assets if application total will be under MAX_LAYER_COUNT, + // else an archive layer for all static assets per file entry if not. + // Finally, update the locked application with the layer digests. let mut layers = Vec::new(); let mut components = Vec::new(); + let archive_layers: bool = layer_count(locked.clone()).await? > MAX_LAYER_COUNT; for mut c in locked.components { // Add the wasm module for the component as layers. @@ -122,9 +132,6 @@ impl Client { layers.push(layer); - // Add a layer for each file referenced in the mount directory. - // Note that this is in fact a directory, and not a single file, so we need to - // recursively traverse it and add layers for each file. let mut files = Vec::new(); for f in c.files { let source = f @@ -132,30 +139,15 @@ impl Client { .source .context("file mount loaded from disk should contain a file source")?; let source = spin_trigger::parse_file_url(source.as_str())?; - // Traverse each mount directory, add all static assets as layers, then update the - // locked application file with the file digest. - for entry in WalkDir::new(&source) { - let entry = entry?; - if entry.file_type().is_file() && !entry.file_type().is_dir() { - tracing::trace!( - "Adding new layer for asset {:?}", - spin_loader::to_relative(entry.path(), &source)? - ); - let layer = Self::data_layer(entry.path()).await?; - let content = Self::content_ref_for_layer(&layer); - let content_inline = content.inline.is_some(); - files.push(ContentPath { - content, - path: PathBuf::from(spin_loader::to_relative(entry.path(), &source)?), - }); - // As a workaround for OCI implementations that don't support very small blobs, - // don't push very small content that has been inlined into the manifest: - // https://github.com/distribution/distribution/discussions/4029 - let skip_layer = content_inline; - if !skip_layer { - layers.push(layer); - } - } + + if archive_layers { + self.push_archive_layer(&source, &mut files, &mut layers) + .await + .context(format!("cannot push archive layer for source {:?}", source))?; + } else { + self.push_file_layers(&source, &mut files, &mut layers) + .await + .context(format!("cannot push file layers for source {:?}", source))?; } } c.files = files; @@ -183,6 +175,83 @@ impl Client { Ok(digest) } + /// Archive all of the files recursively under the source directory + /// and push as a compressed archive layer + async fn push_archive_layer( + &mut self, + source: &PathBuf, + files: &mut Vec, + layers: &mut Vec, + ) -> Result<()> { + // Add all archived file entries to the locked app manifest + for entry in WalkDir::new(source) { + let entry = entry?; + if entry.file_type().is_file() && !entry.file_type().is_dir() { + tracing::trace!( + "Adding asset {:?} to component files list", + spin_loader::to_relative(entry.path(), source)? + ); + // Add content/path to the locked component files list + let layer = Self::data_layer(entry.path(), DATA_MEDIATYPE.to_string()).await?; + let content = Self::content_ref_for_layer(&layer); + files.push(ContentPath { + content, + path: PathBuf::from(spin_loader::to_relative(entry.path(), source)?), + }); + } + } + + // Only add the archive layer to the OCI manifest + tracing::trace!("Adding archive layer for all files in source {:?}", &source); + let working_dir = tempfile::tempdir()?; + let archive_path = crate::utils::archive(source, &working_dir.into_path()) + .await + .context(format!( + "Unable to create compressed archive for source {:?}", + source + ))?; + let layer = Self::data_layer(archive_path.as_path(), ARCHIVE_MEDIATYPE.to_string()).await?; + layers.push(layer); + Ok(()) + } + + /// Recursively traverse the source directory and add layers for each file. + async fn push_file_layers( + &mut self, + source: &PathBuf, + files: &mut Vec, + layers: &mut Vec, + ) -> Result<()> { + // Traverse each mount directory, add all static assets as layers, then update the + // locked application file with the file digest. + tracing::trace!("Adding new layer per file under source {:?}", source); + for entry in WalkDir::new(source) { + let entry = entry?; + if entry.file_type().is_file() && !entry.file_type().is_dir() { + tracing::trace!( + "Adding new layer for asset {:?}", + spin_loader::to_relative(entry.path(), source)? + ); + // Construct and push layer, adding its digest to the locked component files Vec + let layer = Self::data_layer(entry.path(), DATA_MEDIATYPE.to_string()).await?; + let content = Self::content_ref_for_layer(&layer); + let content_inline = content.inline.is_some(); + files.push(ContentPath { + content, + path: PathBuf::from(spin_loader::to_relative(entry.path(), source)?), + }); + // As a workaround for OCI implementations that don't support very small blobs, + // don't push very small content that has been inlined into the manifest: + // https://github.com/distribution/distribution/discussions/4029 + let skip_layer = content_inline; + if !skip_layer { + layers.push(layer); + } + } + } + Ok(()) + } + /// Pull a Spin application from an OCI registry. pub async fn pull(&mut self, reference: &str) -> Result<()> { let reference: Reference = reference.parse().context("cannot parse reference")?; @@ -210,7 +279,7 @@ impl Client { fs::write(&c, &cfg).await?; // If a layer is a Wasm module, write it in the Wasm directory. - // Otherwise, write it in the data directory. + // Otherwise, write it in the data directory (after unpacking if archive layer) stream::iter(manifest.layers) .map(|layer| { let this = &self; @@ -234,6 +303,15 @@ impl Client { WASM_LAYER_MEDIA_TYPE => { let _ = this.cache.write_wasm(&bytes, &layer.digest).await; } + ARCHIVE_MEDIATYPE => { + if let Err(e) = + this.unpack_archive_layer(&bytes, &layer.digest).await + { + return Err(OciDistributionError::GenericError(Some( + e.to_string(), + ))); + } + } _ => { let _ = this.cache.write_data(&bytes, &layer.digest).await; } @@ -307,13 +385,9 @@ impl Client { } /// Create a new data layer based on a file. - async fn data_layer(file: &Path) -> Result { + async fn data_layer(file: &Path, media_type: String) -> Result { tracing::log::trace!("Reading data file from {:?}", file); - Ok(ImageLayer::new( - fs::read(&file).await?, - DATA_MEDIATYPE.to_string(), - None, - )) + Ok(ImageLayer::new(fs::read(&file).await?, media_type, None)) } fn content_ref_for_layer(layer: &ImageLayer) -> ContentRef { @@ -326,6 +400,44 @@ impl Client { } } + /// Unpack archive layer into self.cache + async fn unpack_archive_layer( + &self, + bytes: impl AsRef<[u8]>, + digest: impl AsRef, + ) -> Result<()> { + // Write archive layer to cache as usual + self.cache.write_data(&bytes, &digest).await?; + + // Unpack archive into a staging dir + let path = self + .cache + .data_file(&digest) + .context("unable to read archive layer from cache")?; + let staging_dir = tempfile::tempdir()?; + crate::utils::unarchive(path.as_ref(), staging_dir.path()).await?; + + // Traverse unpacked contents and if a file, write to cache by digest + // (if it doesn't already exist) + for entry in WalkDir::new(staging_dir.path()) { + let entry = entry?; + if entry.file_type().is_file() && !entry.file_type().is_dir() { + let bytes = tokio::fs::read(entry.path()).await?; + let digest = format!("sha256:{}", sha256::hex_digest_from_bytes(&bytes)); + if self.cache.data_file(&digest).is_ok() { + tracing::debug!( + "Skipping unpacked asset {:?}; file already exists", + entry.path() + ); + } else { + tracing::debug!("Adding unpacked asset {:?} to cache", entry.path()); + self.cache.write_data(bytes, &digest).await?; + } + } + } + Ok(()) + } + /// Save a credential set containing the registry username and password. pub async fn login( server: impl AsRef, @@ -444,6 +556,27 @@ fn digest_from_url(manifest_url: &str) -> Option { } } +async fn layer_count(locked: LockedApp) -> Result { + let mut layer_count = 0; + for c in locked.components { + layer_count += 1; + for f in c.files { + let source = f + .content + .source + .context("file mount loaded from disk should contain a file source")?; + let source = spin_trigger::parse_file_url(source.as_str())?; + for entry in WalkDir::new(&source) { + let entry = entry?; + if entry.file_type().is_file() && !entry.file_type().is_dir() { + layer_count += 1; + } + } + } + } + Ok(layer_count) +} + #[cfg(test)] mod test { use super::*; @@ -457,4 +590,59 @@ mod test { digest ); } + + #[tokio::test] + async fn can_get_layer_count() { + use spin_app::locked::LockedComponent; + + let working_dir = tempfile::tempdir().unwrap(); + let source_dir = working_dir.path().join("foo"); + let _ = tokio::fs::create_dir(source_dir.as_path()).await; + let file_path = source_dir.join("bar"); + let _ = tokio::fs::File::create(file_path.as_path()).await; + + let tests: Vec<(Vec, usize)> = [ + ( + spin_testing::from_json!([{ + "id": "test-component", + "source": { + "content_type": "application/wasm", + "digest": "test-source", + }, + }]), + 1, + ), + ( + spin_testing::from_json!([{ + "id": "test-component", + "source": { + "content_type": "application/wasm", + "digest": "test-source", + }, + "files": [ + { + "source": format!("file://{}", file_path.to_str().unwrap()), + "path": "" + } + ] + }]), + 2, + ), + ] + .to_vec(); + + for (components, expected) in tests { + let triggers = Default::default(); + let metadata = Default::default(); + let variables = Default::default(); + let locked = LockedApp { + spin_lock_version: spin_app::locked::FixedVersion, + components, + triggers, + metadata, + variables, + }; + assert_eq!(expected, layer_count(locked).await.unwrap()); + } + } } diff --git a/crates/oci/src/lib.rs b/crates/oci/src/lib.rs index 02fbd232a..944457c19 100644 --- a/crates/oci/src/lib.rs +++ b/crates/oci/src/lib.rs @@ -4,6 +4,7 @@ mod auth; mod client; mod loader; +mod utils; pub use client::Client; pub use loader::OciLoader; diff --git a/crates/oci/src/utils.rs b/crates/oci/src/utils.rs new file mode 100644 index 000000000..709104c8e --- /dev/null +++ b/crates/oci/src/utils.rs @@ -0,0 +1,58 @@ +use anyhow::{Context, Result}; +use async_compression::tokio::bufread::GzipDecoder; +use async_compression::tokio::write::GzipEncoder; +use async_tar::Archive; +use std::path::{Path, PathBuf}; + +/// Create a compressed archive of source, returning its path in working_dir +pub async fn archive(source: &Path, working_dir: &Path) -> Result { + // Create tar archive file + let tar_gz_path = working_dir + .join(source.file_name().unwrap()) + .with_extension("tar.gz"); + let tar_gz = tokio::fs::File::create(tar_gz_path.as_path()) + .await + .context(format!( + "Unable to create tar archive for source {:?}", + source + ))?; + + // Create encoder + // TODO: use zstd? May be more performant + let tar_gz_enc = GzipEncoder::new(tar_gz); + + // Build tar archive + let mut tar_builder = async_tar::Builder::new( + tokio_util::compat::TokioAsyncWriteCompatExt::compat_write(tar_gz_enc), + ); + tar_builder + .append_dir_all(".", source) + .await + .context(format!( + "Unable to create tar archive for source {:?}", + source + ))?; + // Finish writing the archive + tar_builder.finish().await?; + // Shutdown the encoder + use tokio::io::AsyncWriteExt; + tar_builder + .into_inner() + .await? + .into_inner() + .shutdown() + .await?; + Ok(tar_gz_path) +} + +/// Unpack a compressed archive existing at source into dest +pub async fn unarchive(source: &Path, dest: &Path) -> Result<()> { + let decoder = GzipDecoder::new(tokio::io::BufReader::new( + tokio::fs::File::open(source).await?, + )); + let archive = Archive::new(tokio_util::compat::TokioAsyncReadCompatExt::compat(decoder)); + if let Err(e) = archive.unpack(dest).await { + return Err(e.into()); + }; + Ok(()) +} diff --git a/crates/testing/src/lib.rs b/crates/testing/src/lib.rs index f057f87f6..8275bbe36 100644 --- a/crates/testing/src/lib.rs +++ b/crates/testing/src/lib.rs @@ -38,6 +38,7 @@ pub fn init_tracing() { } // Convenience wrapper for deserializing from literal JSON +#[macro_export] macro_rules! from_json { ($($json:tt)+) => { serde_json::from_value(serde_json::json!($($json)+)).expect("valid json")