Skip to content

Commit

Permalink
feat: support compressed upload
Browse files Browse the repository at this point in the history
  • Loading branch information
ctron committed Sep 13, 2024
1 parent ca8a0cd commit 14c356c
Show file tree
Hide file tree
Showing 10 changed files with 231 additions and 27 deletions.
23 changes: 15 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ utoipa-redoc = { version = "4.0.0", features = ["actix-web"] }
utoipa-swagger-ui = "7.1.0"
uuid = "1.7.0"
walkdir = "2.5"
walker-common = "0.9.0"
walker-common = "0.9.2"
walker-extras = "0.9.0"
zip = "2.2.0"

Expand Down
7 changes: 7 additions & 0 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ license.workspace = true
[dependencies]
trustify-migration = { workspace = true }

actix-web = { workspace = true }
anyhow = { workspace = true }
bytes = { workspace = true }
bytesize = { workspace = true, features = ["serde"] }
chrono = { workspace = true }
clap = { workspace = true, features = ["derive", "env"] }
Expand All @@ -34,15 +36,20 @@ serde_json = { workspace = true }
sqlx = { workspace = true }
thiserror = { workspace = true }
time = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
utoipa = { workspace = true, features = ["url"] }
uuid = { workspace = true, features = ["v5", "serde"] }
walker-common = { workspace = true, features = ["bzip2", "liblzma"]}

[dev-dependencies]
chrono = { workspace = true }
rand = { workspace = true }
rstest = { workspace = true }
serde_json = { workspace = true }
test-context = { workspace = true }
test-log = { workspace = true, features = ["log", "trace"] }
time = { workspace = true, features = ["macros"] }
tokio = { workspace = true, features = ["full"] }

trustify-test-context = { workspace = true }
167 changes: 167 additions & 0 deletions common/src/decompress.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
use actix_web::http::header;
use anyhow::anyhow;
use bytes::Bytes;
use tokio::{runtime::Handle, task::JoinError};
use walker_common::compression::{Compression, Detector};

/// Errors which may occur while decompressing an uploaded payload.
#[derive(Debug, thiserror::Error)]
pub enum Error {
    /// The payload declared or used a compression type we cannot handle.
    // NOTE(review): this variant is not constructed anywhere in this file —
    // presumably reserved for callers or future detector results; confirm it is still needed.
    #[error("unknown compression type")]
    UnknownType,
    /// Auto-detection of the compression type (via magic bytes) failed.
    #[error(transparent)]
    Detector(anyhow::Error),
    /// I/O error raised by the actual decompression step.
    #[error(transparent)]
    Io(#[from] std::io::Error),
}

/// Take some bytes, and an optional content-type header and decompress, if required.
///
/// If a content type is present, then it is expected to indicate its compression type by appending
/// it using and extension to the subtype, like `+bz2`. If that's not present, or no content-type
/// is present altogether, then it will try detecting it based on some magic bytes.
///
/// If no magic bytes could be detected, it will assume the content is not compressed.
///
/// **NOTE:** Depending on the size of the payload, this method might take some time. In an async
/// context, it might be necessary to run this as a blocking function, or use [`decompress_async`]
/// instead.
/// Take some bytes, and an optional content-type header and decompress, if required.
///
/// If a content type is present, it is expected to indicate its compression type by appending
/// it as an extension to the subtype — `+bzip2` (also accepted: `+bz2`) or `+xz`. If no such
/// suffix is present, or no content-type is present altogether, then it will try detecting the
/// compression based on some magic bytes.
///
/// If no magic bytes could be detected, it will assume the content is not compressed.
///
/// # Errors
///
/// Returns [`Error::Detector`] if auto-detection fails, or [`Error::Io`] if the actual
/// decompression fails.
///
/// **NOTE:** Depending on the size of the payload, this method might take some time. In an async
/// context, it might be necessary to run this as a blocking function, or use [`decompress_async`]
/// instead.
pub fn decompress(bytes: Bytes, content_type: Option<header::ContentType>) -> Result<Bytes, Error> {
    let content_type = content_type.as_ref().map(|ct| ct.as_ref());

    // check what the user has declared

    let declared = content_type.map(|content_type| {
        // NOTE(review): this matches against the raw header value; a content-type carrying
        // parameters (e.g. `; charset=utf-8`) would not end with the suffix — confirm callers
        // never send parameters here.
        if content_type.ends_with("+bzip2") || content_type.ends_with("+bz2") {
            // accept both the long and the common short suffix for bzip2
            Compression::Bzip2
        } else if content_type.ends_with("+xz") {
            Compression::Xz
        } else {
            // The user provided a type, and it doesn't indicate a supported compression type,
            // So we just accept the payload as-is.
            Compression::None
        }
    });

    // otherwise, try to auto-detect

    let compression = match declared {
        Some(declared) => declared,
        None => {
            let detector = Detector::default();
            detector
                .detect(&bytes)
                .map_err(|err| Error::Detector(anyhow!("{err}")))?
        }
    };

    // decompress (or not)

    Ok(compression.decompress(bytes)?)
}

/// An async version of [`decompress`].
pub async fn decompress_async(
bytes: Bytes,
content_type: Option<header::ContentType>,
) -> Result<Result<Bytes, Error>, JoinError> {
Handle::current()
.spawn_blocking(|| decompress(bytes, content_type))
.await
}

// Round-trip tests for the decompression helpers, driven by fixture documents
// loaded through `trustify_test_context::document_bytes_raw`.
#[cfg(test)]
mod test {
    use crate::decompress::decompress_async;
    use actix_web::http::header::ContentType;
    use test_log::test;
    use trustify_test_context::document_bytes_raw;

    /// Plain (uncompressed) payload, no content-type: must pass through unchanged.
    #[test(tokio::test)]
    async fn decompress_none() -> anyhow::Result<()> {
        let bytes = decompress_async(
            document_bytes_raw("ubi9-9.2-755.1697625012.json").await?,
            None,
        )
        .await??;

        // should decode as JSON

        let _json: serde_json::Value = serde_json::from_slice(&bytes)?;

        // done

        Ok(())
    }

    /// XZ payload, no content-type: must be detected via magic bytes and decompressed.
    #[test(tokio::test)]
    async fn decompress_xz() -> anyhow::Result<()> {
        let bytes = decompress_async(
            document_bytes_raw("openshift-container-storage-4.8.z.json.xz").await?,
            None,
        )
        .await??;

        // should decode as JSON

        let _json: serde_json::Value = serde_json::from_slice(&bytes)?;

        // done

        Ok(())
    }

    /// XZ payload declared as plain JSON: the declaration wins, so the raw (still
    /// compressed) bytes are returned and JSON parsing must fail downstream.
    #[test(tokio::test)]
    async fn decompress_xz_with_invalid_type() -> anyhow::Result<()> {
        let bytes = decompress_async(
            document_bytes_raw("openshift-container-storage-4.8.z.json.xz").await?,
            Some(ContentType::json()),
        )
        .await??;

        // should decode as JSON

        let result = serde_json::from_slice::<serde_json::Value>(&bytes);

        // must be an error, as we try to decode a xz encoded payload as JSON.

        assert!(result.is_err());

        // done

        Ok(())
    }

    /// XZ payload declared as bzip2: decompression itself must fail with an error.
    #[test(tokio::test)]
    async fn decompress_xz_with_invalid_type_2() -> anyhow::Result<()> {
        let result = decompress_async(
            document_bytes_raw("openshift-container-storage-4.8.z.json.xz").await?,
            Some(ContentType("application/json+bzip2".parse().unwrap())),
        )
        .await?;

        // must be an error, as we indicated bzip2, but provided xz

        assert!(result.is_err());

        // done

        Ok(())
    }

    /// XZ payload with the matching `+xz` suffix: must decompress successfully.
    #[test(tokio::test)]
    async fn decompress_xz_with_correct_type() -> anyhow::Result<()> {
        let bytes = decompress_async(
            document_bytes_raw("openshift-container-storage-4.8.z.json.xz").await?,
            Some(ContentType("application/json+xz".parse().unwrap())),
        )
        .await??;

        // should decode as JSON

        let _json: serde_json::Value = serde_json::from_slice(&bytes)?;

        // done

        Ok(())
    }
}
2 changes: 1 addition & 1 deletion common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ pub mod advisory;
pub mod config;
pub mod cpe;
pub mod db;
pub mod decompress;
pub mod error;
pub mod hashing;
pub mod id;

pub mod memo;
pub mod model;
pub mod package;
Expand Down
1 change: 1 addition & 0 deletions modules/fundamental/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ tokio-util = { workspace = true }
tracing = { workspace = true }
utoipa = { workspace = true, features = ["actix_extras", "uuid"] }
uuid = { workspace = true }
walker-common = { workspace = true, features = ["liblzma"] }

[dev-dependencies]
actix-http = { workspace = true }
Expand Down
17 changes: 12 additions & 5 deletions modules/fundamental/src/advisory/endpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@ mod label;
#[cfg(test)]
mod test;

use crate::purl::service::PurlService;
use crate::Error::Internal;
use crate::{advisory::service::AdvisoryService, Error};
use actix_web::{delete, get, post, web, HttpResponse, Responder};
use crate::{
advisory::service::AdvisoryService, purl::service::PurlService, Error, Error::Internal,
};
use actix_web::{delete, get, http::header, post, web, HttpResponse, Responder};
use futures_util::TryStreamExt;
use std::str::FromStr;
use trustify_common::{db::query::Query, db::Database, id::Id, model::Paginated};
use trustify_common::{
db::{query::Query, Database},
decompress::decompress_async,
id::Id,
model::Paginated,
};
use trustify_entity::labels::Labels;
use trustify_module_ingestor::service::{Format, IngestorService};
use trustify_module_storage::service::StorageBackend;
Expand Down Expand Up @@ -166,8 +171,10 @@ struct UploadParams {
pub async fn upload(
service: web::Data<IngestorService>,
web::Query(UploadParams { issuer, labels }): web::Query<UploadParams>,
content_type: Option<web::Header<header::ContentType>>,
bytes: web::Bytes,
) -> Result<impl Responder, Error> {
let bytes = decompress_async(bytes, content_type.map(|ct| ct.0)).await??;
let result = service
.ingest(&bytes, Format::Advisory, labels, issuer)
.await?;
Expand Down
18 changes: 13 additions & 5 deletions modules/fundamental/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
use actix_http::StatusCode;
use actix_web::body::BoxBody;
use actix_web::{HttpResponse, ResponseError};
use actix_web::{body::BoxBody, HttpResponse, ResponseError};
use sea_orm::DbErr;
use trustify_common::error::ErrorInformation;
use trustify_common::id::IdError;
use trustify_common::purl::PurlErr;
use trustify_common::{decompress, error::ErrorInformation, id::IdError, purl::PurlErr};
use trustify_module_storage::service::StorageKeyError;

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -33,6 +30,10 @@ pub enum Error {
Data(String),
#[error("Internal Server Error: {0}")]
Internal(String),
#[error(transparent)]
Compression(#[from] decompress::Error),
#[error(transparent)]
Join(#[from] tokio::task::JoinError),
}

impl From<DbErr> for Error {
Expand Down Expand Up @@ -60,6 +61,13 @@ impl ResponseError for Error {
Error::StorageKey(err) => {
HttpResponse::BadRequest().json(ErrorInformation::new("Storage Key", err))
}
Error::Compression(decompress::Error::UnknownType) => {
HttpResponse::UnsupportedMediaType()
.json(ErrorInformation::new("UnsupportedCompression", self))
}
Error::Compression(err) => {
HttpResponse::BadRequest().json(ErrorInformation::new("CompressionError", err))
}

// All other cases are internal system errors that are not expected to occur.
// They are logged and a generic error response is returned to avoid leaking
Expand Down
Loading

0 comments on commit 14c356c

Please sign in to comment.