Skip to content

Commit

Permalink
refactor: collect the creation of all product status to speed things up
Browse files Browse the repository at this point in the history
  • Loading branch information
ctron committed Jun 26, 2024
1 parent 7f57a81 commit 47be914
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 67 deletions.
2 changes: 0 additions & 2 deletions integration-tests/src/csaf/perf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use test_context::test_context;
use test_log::test;
use tracing::instrument;
use trustify_common::{db::test::TrustifyContext, hashing::Digests};
use trustify_module_fundamental::advisory::service::AdvisoryService;
use trustify_module_ingestor::{graph::Graph, service::advisory::csaf::loader::CsafLoader};

#[test_context(TrustifyContext, skip_teardown)]
Expand All @@ -14,7 +13,6 @@ use trustify_module_ingestor::{graph::Graph, service::advisory::csaf::loader::Cs
async fn ingest(ctx: TrustifyContext) -> anyhow::Result<()> {
let db = ctx.db;
let graph = Graph::new(db.clone());
let advisory = AdvisoryService::new(db.clone());

let start = Instant::now();

Expand Down
2 changes: 1 addition & 1 deletion integration-tests/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ pub fn xz_stream(data: &[u8]) -> impl Stream<Item = Result<Bytes, LzmaError>> {
}

/// Create a stream from a static BLOB.
pub fn stream<'a>(data: &'static [u8]) -> impl Stream<Item = Result<Bytes, Infallible>> {
pub fn stream(data: &'static [u8]) -> impl Stream<Item = Result<Bytes, Infallible>> {
stream::once(async move { Ok(Bytes::from_static(data)) })
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub enum Version {
}

impl VersionInfo {
fn into_active_model(self) -> version_range::ActiveModel {
pub fn into_active_model(self) -> version_range::ActiveModel {
version_range::ActiveModel {
id: Default::default(),
version_scheme_id: Set(self.scheme),
Expand Down
1 change: 1 addition & 0 deletions modules/ingestor/src/graph/vulnerability/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl Graph {
information: I,
tx: TX,
) -> Result<VulnerabilityContext, Error> {
// TODO: consider transforming into upsert
let information = information.into();
if let Some(found) = self.get_vulnerability(identifier, &tx).await? {
if information.has_data() {
Expand Down
147 changes: 147 additions & 0 deletions modules/ingestor/src/service/advisory/csaf/creator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
use crate::{
graph::{
advisory::advisory_vulnerability::{VersionInfo, VersionSpec},
purl::creator::PurlCreator,
},
service::{advisory::csaf::util::resolve_purls, Error},
};
use csaf::{definitions::ProductIdT, Csaf};
use sea_orm::{ActiveValue::Set, ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter};
use sea_query::IntoCondition;
use std::{collections::hash_map::Entry, collections::HashMap};
use trustify_common::{db::chunk::EntityChunkedIter, purl::Purl};
use trustify_entity::{package_status, status, version_range};
use uuid::Uuid;

struct PackageStatus {
package: Purl,
status: &'static str,
info: VersionInfo,
}

pub struct PackageStatusCreator {
advisory_id: Uuid,
vulnerability_id: i32,
entries: Vec<PackageStatus>,
}

impl PackageStatusCreator {
pub fn new(advisory_id: Uuid, vulnerability_id: i32) -> Self {
Self {
advisory_id,
vulnerability_id,
entries: Vec::new(),
}
}

pub fn add_all(&mut self, csaf: &Csaf, ps: &Option<Vec<ProductIdT>>, status: &'static str) {
for r in ps.iter().flatten() {
for purl in resolve_purls(csaf, r) {
let package = Purl::from(purl.clone());

if let Some(version) = package.version.clone() {
self.entries.push(PackageStatus {
package,
status,
info: VersionInfo {
scheme: "generic".to_string(),
spec: VersionSpec::Exact(version),
},
});
}
}
}
}

async fn check_status(
status: &str,
connection: &impl ConnectionTrait,
) -> Result<status::Model, Error> {
Ok(status::Entity::find()
.filter(status::Column::Slug.eq(status))
.one(connection)
.await?
.ok_or_else(|| crate::graph::error::Error::InvalidStatus(status.to_string()))?)
}

pub async fn create(self, connection: &impl ConnectionTrait) -> Result<(), Error> {
let mut checked = HashMap::new();

let mut purls = PurlCreator::new();

for ps in &self.entries {
// ensure a correct status, and get id
if let Entry::Vacant(entry) = checked.entry(ps.status) {
entry.insert(Self::check_status(ps.status, connection).await?);
}
// add to PURL creator
purls.add(ps.package.clone());
}

purls.create(connection).await?;

// round two, status is checked, purls exist

let mut version_ranges = Vec::new();
let mut package_statuses = Vec::new();

for ps in self.entries {
let status = checked.get(&ps.status).ok_or_else(|| {
Error::Graph(crate::graph::error::Error::InvalidStatus(
ps.status.to_string(),
))
})?;

// TODO: we could try to batch process this too

let package_id = ps.package.package_uuid();

let package_status = package_status::Entity::find()
.filter(package_status::Column::PackageId.eq(package_id))
.filter(package_status::Column::AdvisoryId.eq(self.advisory_id))
.filter(package_status::Column::StatusId.eq(status.id))
.left_join(version_range::Entity)
.filter(ps.info.clone().into_condition())
.one(connection)
.await?;

if package_status.is_some() {
continue;
}

let mut version_range = ps.info.into_active_model();
let version_range_id = Uuid::now_v7();
version_range.id = Set(version_range_id);
version_ranges.push(version_range);

let package_status = package_status::ActiveModel {
id: Default::default(),
advisory_id: Set(self.advisory_id),
vulnerability_id: Set(self.vulnerability_id),
status_id: Set(status.id),
package_id: Set(package_id),
version_range_id: Set(version_range_id),
};

package_statuses.push(package_status);
}

// batch insert

for batch in &version_ranges.chunked() {
version_range::Entity::insert_many(batch)
.exec(connection)
.await?;
}

for batch in &package_statuses.chunked() {
package_status::Entity::insert_many(batch)
.exec(connection)
.await?;
}

// done

Ok(())
}
}
79 changes: 16 additions & 63 deletions modules/ingestor/src/service/advisory/csaf/loader.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use crate::graph::advisory::advisory_vulnerability::{VersionInfo, VersionSpec};
use crate::model::IngestResult;
use crate::{
graph::{
advisory::{
Expand All @@ -8,17 +6,17 @@ use crate::{
},
Graph,
},
service::{advisory::csaf::util::resolve_purls, Error},
model::IngestResult,
service::{advisory::csaf::PackageStatusCreator, Error},
};
use csaf::{
vulnerability::{ProductStatus, Vulnerability},
Csaf,
};
use std::io::Read;
use std::str::FromStr;
use std::{io::Read, str::FromStr};
use time::OffsetDateTime;
use tracing::{info_span, instrument};
use trustify_common::{db::Transactional, hashing::Digests, id::Id, purl::Purl};
use trustify_common::{db::Transactional, hashing::Digests, id::Id};
use trustify_cvss::cvss3::Cvss3Base;
use trustify_entity::labels::Labels;

Expand Down Expand Up @@ -153,63 +151,18 @@ impl<'g> CsafLoader<'g> {
product_status: &ProductStatus,
tx: TX,
) -> Result<(), Error> {
for r in product_status.fixed.iter().flatten() {
for purl in resolve_purls(csaf, r) {
let package = Purl::from(purl.clone());

if let Some(version) = &package.version {
advisory_vulnerability
.ingest_package_status(
&package,
"fixed",
VersionInfo {
scheme: "generic".to_string(),
spec: VersionSpec::Exact(version.clone()),
},
&tx,
)
.await?
}
}
}
for r in product_status.known_not_affected.iter().flatten() {
for purl in resolve_purls(csaf, r) {
let package = Purl::from(purl.clone());

if let Some(version) = &package.version {
advisory_vulnerability
.ingest_package_status(
&package,
"not_affected",
VersionInfo {
scheme: "generic".to_string(),
spec: VersionSpec::Exact(version.clone()),
},
&tx,
)
.await?
}
}
}
for r in product_status.known_affected.iter().flatten() {
for purl in resolve_purls(csaf, r) {
let package = Purl::from(purl.clone());

if let Some(version) = &package.version {
advisory_vulnerability
.ingest_package_status(
&package,
"affected",
VersionInfo {
scheme: "generic".to_string(),
spec: VersionSpec::Exact(version.clone()),
},
&tx,
)
.await?
}
}
}
let mut creator = PackageStatusCreator::new(
advisory_vulnerability.advisory_vulnerability.advisory_id,
advisory_vulnerability
.advisory_vulnerability
.vulnerability_id,
);

creator.add_all(csaf, &product_status.fixed, "fixed");
creator.add_all(csaf, &product_status.known_not_affected, "not_affected");
creator.add_all(csaf, &product_status.known_affected, "affected");

creator.create(&self.graph.connection(&tx)).await?;

Ok(())
}
Expand Down
3 changes: 3 additions & 0 deletions modules/ingestor/src/service/advisory/csaf/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
pub mod loader;
mod util;

mod creator;
pub use creator::*;
1 change: 1 addition & 0 deletions server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#![allow(unused)]
#![recursion_limit = "256"]

#[cfg(feature = "garage-door")]
mod embedded_oidc;
Expand Down

0 comments on commit 47be914

Please sign in to comment.