Skip to content

Commit

Permalink
Merge pull request #4162 from systeminit/nick/eng-2601
Browse files Browse the repository at this point in the history
Move module sync to dal, fix its tracing and add integration test
  • Loading branch information
nickgerace authored Jul 16, 2024
2 parents 8571d97 + c4d2c47 commit 27055c7
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 128 deletions.
128 changes: 119 additions & 9 deletions lib/dal/src/module.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use si_events::ulid::Ulid;
use si_events::ContentHash;
use thiserror::Error;
use tokio::sync::TryLockError;

use si_frontend_types as frontend_types;
use si_layer_cache::LayerDbError;
use telemetry::prelude::*;
use thiserror::Error;
use tokio::sync::TryLockError;
use tokio::time::Instant;

use crate::layer_db_types::{ModuleContent, ModuleContentV2};
use crate::workspace_snapshot::content_address::{ContentAddress, ContentAddressDiscriminants};
Expand All @@ -20,7 +22,7 @@ use crate::workspace_snapshot::node_weight::category_node_weight::CategoryNodeKi
use crate::workspace_snapshot::node_weight::{NodeWeight, NodeWeightError};
use crate::workspace_snapshot::WorkspaceSnapshotError;
use crate::{
pk, ChangeSetError, DalContext, Func, FuncError, Schema, SchemaError, SchemaVariant,
pk, ChangeSetError, DalContext, Func, FuncError, Schema, SchemaError, SchemaId, SchemaVariant,
SchemaVariantError, Timestamp, TransactionsError,
};

Expand All @@ -31,16 +33,22 @@ pub enum ModuleError {
ChangeSet(#[from] ChangeSetError),
#[error("edge weight error: {0}")]
EdgeWeight(#[from] EdgeWeightError),
#[error(transparent)]
#[error("func error: {0}")]
Func(#[from] FuncError),
#[error("layer db error: {0}")]
LayerDb(#[from] LayerDbError),
#[error("module missing schema id (module id: {0}) (module hash: {1})")]
MissingSchemaId(String, String),
#[error("node weight error: {0}")]
NodeWeight(#[from] NodeWeightError),
#[error(transparent)]
#[error("module not found for schema: {0}")]
NotFoundForSchema(SchemaId),
#[error("schema error: {0}")]
Schema(#[from] SchemaError),
#[error(transparent)]
#[error("schema variant error: {0}")]
SchemaVariant(#[from] SchemaVariantError),
#[error("too many latest modules for schema: {0} (at least two hashes found: {1} and {2})")]
TooManyLatestModulesForSchema(SchemaId, String, String),
#[error("transactions error: {0}")]
Transactions(#[from] TransactionsError),
#[error("try lock error: {0}")]
Expand Down Expand Up @@ -402,4 +410,106 @@ impl Module {

Ok(modules)
}

/// Takes in a list of [`LatestModules`](si_frontend_types::LatestModule) and creates a
/// [`SyncedModules`](si_frontend_types::SyncedModules) object with them. The object enables callers to know what
/// [`Modules`](Module) can be upgraded and installed.
#[instrument(
name = "module.sync"
skip_all,
level = "debug",
)]
pub async fn sync(
ctx: &DalContext,
latest_modules: Vec<frontend_types::LatestModule>,
) -> ModuleResult<frontend_types::SyncedModules> {
debug!("working with {} latest modules", latest_modules.len());

let start = Instant::now();

// Collect all user facing schema variants. We need to see what can be upgraded.
let schema_variants = SchemaVariant::list_user_facing(ctx).await?;

// Find all local hashes and mark all seen schemas.
let mut local_hashes = HashMap::new();
let mut seen_schema_ids = HashSet::new();
for schema_variant in &schema_variants {
let schema_id: SchemaId = schema_variant.schema_id.into();
seen_schema_ids.insert(schema_id);

if let Entry::Vacant(entry) = local_hashes.entry(schema_id) {
let local_module = Self::find_for_member_id(ctx, schema_id)
.await?
.ok_or(ModuleError::NotFoundForSchema(schema_id))?;
entry.insert(local_module.root_hash().to_owned());
}
}

// Begin populating synced modules.
let mut synced_modules = frontend_types::SyncedModules::new();

// Group the latest hashes by schema. Populate installable modules along the way.
let mut latest_modules_by_schema: HashMap<SchemaId, frontend_types::LatestModule> =
HashMap::new();
for latest_module in latest_modules {
let schema_id: SchemaId = latest_module
.schema_id()
.ok_or(ModuleError::MissingSchemaId(
latest_module.id.to_owned(),
latest_module.latest_hash.to_owned(),
))?
.into();
match latest_modules_by_schema.entry(schema_id) {
Entry::Occupied(entry) => {
let existing: frontend_types::LatestModule = entry.get().to_owned();
return Err(ModuleError::TooManyLatestModulesForSchema(
schema_id,
existing.latest_hash,
latest_module.latest_hash,
));
}
Entry::Vacant(entry) => {
entry.insert(latest_module.to_owned());
}
}

if !seen_schema_ids.contains(&schema_id) {
synced_modules.installable.push(latest_module.to_owned());
}
}
trace!(?synced_modules.installable, "collected installable modules");

// Populate upgradeable modules.
for schema_variant in schema_variants {
let schema_id: SchemaId = schema_variant.schema_id.into();
match (
latest_modules_by_schema.get(&schema_id),
local_hashes.get(&schema_id),
) {
(Some(latest_module), Some(local_hash)) => {
trace!(?latest_module, %local_hash, "comparing hashes");
if &latest_module.latest_hash != local_hash {
synced_modules
.upgradeable
.insert(schema_variant.schema_variant_id, latest_module.to_owned());
}
}
(maybe_latest, maybe_local) => {
trace!(
%schema_id,
%schema_variant.schema_variant_id,
%schema_variant.schema_name,
?maybe_latest,
?maybe_local,
"skipping since there's incomplete data for determining if schema variant can be updated (perhaps module was not prompted to builtin?)"
);
}
}
}
trace!(?synced_modules.upgradeable, "collected upgradeable modules");

debug!("syncing modules took: {:?}", start.elapsed());

Ok(synced_modules)
}
}
71 changes: 71 additions & 0 deletions lib/dal/tests/integration_test/module.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use chrono::Utc;
use dal::module::Module;
use dal::pkg::export::PkgExporter;
use dal::{DalContext, Schema};
use dal_test::test;
use si_pkg::{SocketSpecArity, SocketSpecKind};
use ulid::Ulid;

#[test]
async fn list_modules(ctx: &DalContext) {
Expand Down Expand Up @@ -169,3 +171,72 @@ async fn module_export_simple(ctx: &mut DalContext) {

assert_eq!(exported_pkg_func_names, expected_func_names);
}

/// Runs [`Module::sync`] against two fabricated latest modules and checks the resulting buckets: one
/// reuses the schema id of the "starfield" schema with a freshly generated hash (expected under
/// `upgradeable`), and the other uses a freshly generated schema id (expected under `installable`).
#[test]
async fn dummy_sync(ctx: &DalContext) {
    // Gather the schema, its default variant and the module that the schema is a member of.
    let starfield = Schema::find_by_name(ctx, "starfield")
        .await
        .expect("could not perform find by name")
        .expect("schema not found");
    let default_variant_id = starfield
        .get_default_schema_variant_id(ctx)
        .await
        .expect("could not perform get default schema variant id")
        .expect("no schema variant id found");
    let local_module = Module::find_for_member_id(ctx, starfield.id())
        .await
        .expect("could not perform find for module schema id")
        .expect("module not found");

    // Both fabricated modules share the same timestamp.
    let timestamp = Utc::now();

    // Same schema as the local module, but with a freshly generated latest hash.
    let upgradeable_latest = si_frontend_types::LatestModule {
        id: Ulid::new().to_string(),
        name: local_module.name().to_owned(),
        description: Some(local_module.description().to_owned()),
        owner_user_id: "this is BS!".to_string(),
        owner_display_name: None,
        metadata: serde_json::Value::Null,
        latest_hash: Ulid::new().to_string(),
        latest_hash_created_at: timestamp,
        created_at: timestamp,
        schema_id: Some(starfield.id().into()),
    };

    // Freshly generated schema id, so no local schema variant should correspond to it.
    let installable_latest = si_frontend_types::LatestModule {
        id: Ulid::new().to_string(),
        name: "threadripper 7980X".to_string(),
        description: None,
        owner_user_id: "this is also BS!".to_string(),
        owner_display_name: None,
        metadata: serde_json::Value::Null,
        latest_hash: Ulid::new().to_string(),
        latest_hash_created_at: timestamp,
        created_at: timestamp,
        schema_id: Some(Ulid::new().to_string()),
    };

    // The input keeps copies so that the originals can be moved into the expected result.
    let latest_modules = vec![upgradeable_latest.clone(), installable_latest.clone()];

    // Assemble our expected result.
    let mut expected = si_frontend_types::SyncedModules::new();
    expected
        .upgradeable
        .insert(default_variant_id.into(), upgradeable_latest);
    expected.installable.push(installable_latest);

    // Perform the sync and check that the result is what we expect.
    let actual = Module::sync(ctx, latest_modules)
        .await
        .expect("could not sync");
    assert_eq!(
        expected, // expected
        actual    // actual
    );
}
10 changes: 1 addition & 9 deletions lib/sdf-server/src/server/service/v2/module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,19 @@ use axum::{
use telemetry::prelude::*;
use thiserror::Error;

use dal::SchemaId;

use crate::{server::state::AppState, service::ApiError};

mod sync;

#[remain::sorted]
#[derive(Debug, Error)]
pub enum ModulesAPIError {
#[error("too many latest modules for schema: {0} (at least two hashes found: {1} and {2})")]
LatestModuleTooManyForSchema(SchemaId, String, String),
#[error("module error: {0}")]
Module(#[from] dal::module::ModuleError),
#[error("module index client error: {0}")]
ModuleIndexClient(#[from] module_index_client::ModuleIndexClientError),
#[error("module index not configured")]
ModuleIndexNotConfigured,
#[error("module missing schema id (module id: {0}) (module hash: {1})")]
ModuleMissingSchemaId(String, String),
#[error("module not found for schema: {0}")]
ModuleNotFoundForSchema(SchemaId),
#[error("schema error: {0}")]
SchemaVariant(#[from] dal::SchemaVariantError),
#[error("transactions error: {0}")]
Expand All @@ -45,7 +37,7 @@ impl IntoResponse for ModulesAPIError {
Self::Transactions(dal::TransactionsError::ConflictsOccurred(_)) => {
StatusCode::CONFLICT
}
Self::ModuleNotFoundForSchema(_)
Self::Module(dal::module::ModuleError::NotFoundForSchema(_))
| Self::SchemaVariant(dal::SchemaVariantError::NotFound(_)) => StatusCode::NOT_FOUND,
_ => ApiError::DEFAULT_ERROR_STATUS_CODE,
};
Expand Down
Loading

0 comments on commit 27055c7

Please sign in to comment.