From c4d2c475f41e2e86e5809a4d62e102682d401b9b Mon Sep 17 00:00:00 2001 From: Nick Gerace Date: Tue, 16 Jul 2024 13:26:42 -0400 Subject: [PATCH] Move module sync to dal, fix its tracing and add integration test This commit moves module syncing to the dal. In addition, it fixes tracing for module syncing and includes more logging. As a result of module syncing moving to the dal, there's a new integration test too. Signed-off-by: Nick Gerace --- lib/dal/src/module.rs | 128 ++++++++++++++++-- lib/dal/tests/integration_test/module.rs | 71 ++++++++++ .../src/server/service/v2/module.rs | 10 +- .../src/server/service/v2/module/sync.rs | 121 ++--------------- 4 files changed, 202 insertions(+), 128 deletions(-) diff --git a/lib/dal/src/module.rs b/lib/dal/src/module.rs index d859ae5b70..a2b941c576 100644 --- a/lib/dal/src/module.rs +++ b/lib/dal/src/module.rs @@ -1,15 +1,17 @@ -use chrono::{DateTime, Utc}; -use std::collections::HashMap; +use std::collections::hash_map::Entry; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use si_events::ulid::Ulid; use si_events::ContentHash; -use thiserror::Error; -use tokio::sync::TryLockError; - +use si_frontend_types as frontend_types; use si_layer_cache::LayerDbError; use telemetry::prelude::*; +use thiserror::Error; +use tokio::sync::TryLockError; +use tokio::time::Instant; use crate::layer_db_types::{ModuleContent, ModuleContentV2}; use crate::workspace_snapshot::content_address::{ContentAddress, ContentAddressDiscriminants}; @@ -20,7 +22,7 @@ use crate::workspace_snapshot::node_weight::category_node_weight::CategoryNodeKi use crate::workspace_snapshot::node_weight::{NodeWeight, NodeWeightError}; use crate::workspace_snapshot::WorkspaceSnapshotError; use crate::{ - pk, ChangeSetError, DalContext, Func, FuncError, Schema, SchemaError, SchemaVariant, + pk, ChangeSetError, DalContext, Func, FuncError, Schema, SchemaError, SchemaId, SchemaVariant, SchemaVariantError, Timestamp, TransactionsError, }; @@ -31,16 +33,22 @@ pub enum ModuleError { ChangeSet(#[from] ChangeSetError), #[error("edge weight error: {0}")] EdgeWeight(#[from] EdgeWeightError), - #[error(transparent)] + #[error("func error: {0}")] Func(#[from] FuncError), #[error("layer db error: {0}")] LayerDb(#[from] LayerDbError), + #[error("module missing schema id (module id: {0}) (module hash: {1})")] + MissingSchemaId(String, String), #[error("node weight error: {0}")] NodeWeight(#[from] NodeWeightError), - #[error(transparent)] + #[error("module not found for schema: {0}")] + NotFoundForSchema(SchemaId), + #[error("schema error: {0}")] Schema(#[from] SchemaError), - #[error(transparent)] + #[error("schema variant error: {0}")] SchemaVariant(#[from] SchemaVariantError), + #[error("too many latest modules for schema: {0} (at least two hashes found: {1} and {2})")] + TooManyLatestModulesForSchema(SchemaId, String, String), #[error("transactions error: {0}")] Transactions(#[from] TransactionsError), #[error("try lock error: {0}")] @@ -402,4 +410,106 @@ impl Module { Ok(modules) } + + /// Takes in a list of [`LatestModules`](si_frontend_types::LatestModule) and creates a + /// [`SyncedModules`](si_frontend_types::SyncedModules) object with them. The object enables callers to know what + /// [`Modules`](Module) can be upgraded and installed. + #[instrument( + name = "module.sync" + skip_all, + level = "debug", + )] + pub async fn sync( + ctx: &DalContext, + latest_modules: Vec, + ) -> ModuleResult { + debug!("working with {} latest modules", latest_modules.len()); + + let start = Instant::now(); + + // Collect all user facing schema variants. We need to see what can be upgraded. + let schema_variants = SchemaVariant::list_user_facing(ctx).await?; + + // Find all local hashes and mark all seen schemas. + let mut local_hashes = HashMap::new(); + let mut seen_schema_ids = HashSet::new(); + for schema_variant in &schema_variants { + let schema_id: SchemaId = schema_variant.schema_id.into(); + seen_schema_ids.insert(schema_id); + + if let Entry::Vacant(entry) = local_hashes.entry(schema_id) { + let local_module = Self::find_for_member_id(ctx, schema_id) + .await? + .ok_or(ModuleError::NotFoundForSchema(schema_id))?; + entry.insert(local_module.root_hash().to_owned()); + } + } + + // Begin populating synced modules. + let mut synced_modules = frontend_types::SyncedModules::new(); + + // Group the latest hashes by schema. Populate installable modules along the way. + let mut latest_modules_by_schema: HashMap = + HashMap::new(); + for latest_module in latest_modules { + let schema_id: SchemaId = latest_module + .schema_id() + .ok_or(ModuleError::MissingSchemaId( + latest_module.id.to_owned(), + latest_module.latest_hash.to_owned(), + ))? + .into(); + match latest_modules_by_schema.entry(schema_id) { + Entry::Occupied(entry) => { + let existing: frontend_types::LatestModule = entry.get().to_owned(); + return Err(ModuleError::TooManyLatestModulesForSchema( + schema_id, + existing.latest_hash, + latest_module.latest_hash, + )); + } + Entry::Vacant(entry) => { + entry.insert(latest_module.to_owned()); + } + } + + if !seen_schema_ids.contains(&schema_id) { + synced_modules.installable.push(latest_module.to_owned()); + } + } + trace!(?synced_modules.installable, "collected installable modules"); + + // Populate upgradeable modules. + for schema_variant in schema_variants { + let schema_id: SchemaId = schema_variant.schema_id.into(); + match ( + latest_modules_by_schema.get(&schema_id), + local_hashes.get(&schema_id), + ) { + (Some(latest_module), Some(local_hash)) => { + trace!(?latest_module, %local_hash, "comparing hashes"); + if &latest_module.latest_hash != local_hash { + synced_modules + .upgradeable + .insert(schema_variant.schema_variant_id, latest_module.to_owned()); + } + } + (maybe_latest, maybe_local) => { + trace!( + %schema_id, + %schema_variant.schema_variant_id, + %schema_variant.schema_name, + ?maybe_latest, + ?maybe_local, + "skipping since there's incomplete data for determining if schema variant can be updated (perhaps module was not prompted to builtin?)" + ); + } + } + } + trace!(?synced_modules.upgradeable, "collected upgradeable modules"); + + debug!("syncing modules took: {:?}", start.elapsed()); + + Ok(synced_modules) + } } diff --git a/lib/dal/tests/integration_test/module.rs b/lib/dal/tests/integration_test/module.rs index d023ca5582..180e7070b0 100644 --- a/lib/dal/tests/integration_test/module.rs +++ b/lib/dal/tests/integration_test/module.rs @@ -1,8 +1,10 @@ +use chrono::Utc; use dal::module::Module; use dal::pkg::export::PkgExporter; use dal::{DalContext, Schema}; use dal_test::test; use si_pkg::{SocketSpecArity, SocketSpecKind}; +use ulid::Ulid; #[test] async fn list_modules(ctx: &DalContext) { @@ -169,3 +171,72 @@ async fn module_export_simple(ctx: &mut DalContext) { assert_eq!(exported_pkg_func_names, expected_func_names); } + +#[test] +async fn dummy_sync(ctx: &DalContext) { + let schema = Schema::find_by_name(ctx, "starfield") + .await + .expect("could not perform find by name") + .expect("schema not found"); + let schema_variant_id = schema + .get_default_schema_variant_id(ctx) + .await + .expect("could not perform get default schema variant id") + .expect("no schema variant id found"); + let module = Module::find_for_member_id(ctx, schema.id()) + .await + .expect("could not perform find for module schema id") + .expect("module not found"); + + // Create dummy latest modules. + let now = Utc::now(); + let dummy_latest_module_upgradeable = si_frontend_types::LatestModule { + id: Ulid::new().to_string(), + name: module.name().to_owned(), + description: Some(module.description().to_owned()), + owner_user_id: "this is BS!".to_string(), + owner_display_name: None, + metadata: serde_json::Value::Null, + latest_hash: Ulid::new().to_string(), + latest_hash_created_at: now, + created_at: now, + schema_id: Some(schema.id().into()), + }; + let dummy_latest_module_installable = si_frontend_types::LatestModule { + id: Ulid::new().to_string(), + name: "threadripper 7980X".to_string(), + description: None, + owner_user_id: "this is also BS!".to_string(), + owner_display_name: None, + metadata: serde_json::Value::Null, + latest_hash: Ulid::new().to_string(), + latest_hash_created_at: now, + created_at: now, + schema_id: Some(Ulid::new().to_string()), + }; + + // Assemble our expected result. + let mut expected = si_frontend_types::SyncedModules::new(); + expected.upgradeable.insert( + schema_variant_id.into(), + dummy_latest_module_upgradeable.clone(), + ); + expected + .installable + .push(dummy_latest_module_installable.clone()); + + // Perform the sync and check that the result is what we expect. + let actual = Module::sync( + ctx, + vec![ + dummy_latest_module_upgradeable, + dummy_latest_module_installable, + ], + ) + .await + .expect("could not sync"); + assert_eq!( + expected, // expected + actual // actual + ); +} diff --git a/lib/sdf-server/src/server/service/v2/module.rs b/lib/sdf-server/src/server/service/v2/module.rs index 5505dd45d6..29bb6c2062 100644 --- a/lib/sdf-server/src/server/service/v2/module.rs +++ b/lib/sdf-server/src/server/service/v2/module.rs @@ -7,8 +7,6 @@ use axum::{ use telemetry::prelude::*; use thiserror::Error; -use dal::SchemaId; - use crate::{server::state::AppState, service::ApiError}; mod sync; @@ -16,18 +14,12 @@ mod sync; #[remain::sorted] #[derive(Debug, Error)] pub enum ModulesAPIError { - #[error("too many latest modules for schema: {0} (at least two hashes found: {1} and {2})")] - LatestModuleTooManyForSchema(SchemaId, String, String), #[error("module error: {0}")] Module(#[from] dal::module::ModuleError), #[error("module index client error: {0}")] ModuleIndexClient(#[from] module_index_client::ModuleIndexClientError), #[error("module index not configured")] ModuleIndexNotConfigured, - #[error("module missing schema id (module id: {0}) (module hash: {1})")] - ModuleMissingSchemaId(String, String), - #[error("module not found for schema: {0}")] - ModuleNotFoundForSchema(SchemaId), #[error("schema error: {0}")] SchemaVariant(#[from] dal::SchemaVariantError), #[error("transactions error: {0}")] @@ -45,7 +37,7 @@ impl IntoResponse for ModulesAPIError { Self::Transactions(dal::TransactionsError::ConflictsOccurred(_)) => { StatusCode::CONFLICT } - Self::ModuleNotFoundForSchema(_) + Self::Module(dal::module::ModuleError::NotFoundForSchema(_)) | Self::SchemaVariant(dal::SchemaVariantError::NotFound(_)) => StatusCode::NOT_FOUND, _ => ApiError::DEFAULT_ERROR_STATUS_CODE, }; diff --git a/lib/sdf-server/src/server/service/v2/module/sync.rs b/lib/sdf-server/src/server/service/v2/module/sync.rs index 9ddda739ab..9f4741a187 100644 --- a/lib/sdf-server/src/server/service/v2/module/sync.rs +++ b/lib/sdf-server/src/server/service/v2/module/sync.rs @@ -1,13 +1,9 @@ -use std::collections::{hash_map::Entry, HashMap, HashSet}; - use axum::{ extract::{OriginalUri, Path}, Json, }; -use telemetry::prelude::*; -use tokio::time::Instant; -use dal::{module::Module, ChangeSetId, DalContext, SchemaId, SchemaVariant, WorkspacePk}; +use dal::{module::Module, ChangeSetId, WorkspacePk}; use module_index_client::ModuleIndexClient; use si_frontend_types as frontend_types; @@ -30,119 +26,24 @@ pub async fn sync( .build(access_builder.build(change_set_id.into())) .await?; - let synced_modules = sync_inner(&ctx, &raw_access_token).await?; - - track( - &posthog_client, - &ctx, - &original_uri, - "sync", - serde_json::json!({}), - ); - - Ok(Json(synced_modules)) -} - -#[instrument( - name = "sdf.v2.module.sync.sync_inner" - skip_all, - level = "debug", -)] -async fn sync_inner( - ctx: &DalContext, - raw_access_token: &str, -) -> Result { - let start = Instant::now(); - - // Collect all user facing schema variants. We need to see what can be upgraded. - let schema_variants = SchemaVariant::list_user_facing(ctx).await?; - - // Find all local hashes and mark all seen schemas. - let mut local_hashes = HashMap::new(); - let mut seen_schema_ids = HashSet::new(); - for schema_variant in &schema_variants { - let schema_id: SchemaId = schema_variant.schema_id.into(); - seen_schema_ids.insert(schema_id); - - if let Entry::Vacant(entry) = local_hashes.entry(schema_id) { - let local_module = Module::find_for_member_id(ctx, schema_id) - .await? - .ok_or(ModulesAPIError::ModuleNotFoundForSchema(schema_id))?; - entry.insert(local_module.root_hash().to_owned()); - } - } - - // Collect the latest modules. let latest_modules = { let module_index_url = ctx .module_index_url() .ok_or(ModulesAPIError::ModuleIndexNotConfigured)?; let module_index_client = - ModuleIndexClient::new(module_index_url.try_into()?, raw_access_token); + ModuleIndexClient::new(module_index_url.try_into()?, &raw_access_token); module_index_client.list_latest_modules().await? }; - // Begin populating synced modules. - let mut synced_modules = frontend_types::SyncedModules::new(); - - // Group the latest hashes by schema. Populate installable modules along the way. - let mut latest_modules_by_schema: HashMap = - HashMap::new(); - for latest_module in latest_modules.modules { - let schema_id: SchemaId = latest_module - .schema_id() - .ok_or(ModulesAPIError::ModuleMissingSchemaId( - latest_module.id.to_owned(), - latest_module.latest_hash.to_owned(), - ))? - .into(); - match latest_modules_by_schema.entry(schema_id) { - Entry::Occupied(entry) => { - let existing: frontend_types::LatestModule = entry.get().to_owned(); - return Err(ModulesAPIError::LatestModuleTooManyForSchema( - schema_id, - existing.latest_hash, - latest_module.latest_hash, - )); - } - Entry::Vacant(entry) => { - entry.insert(latest_module.to_owned()); - } - } + let synced_modules = Module::sync(&ctx, latest_modules.modules).await?; - if !seen_schema_ids.contains(&schema_id) { - synced_modules.installable.push(latest_module.to_owned()); - } - } - - // Populate upgradeable modules. - for schema_variant in schema_variants { - let schema_id: SchemaId = schema_variant.schema_id.into(); - match ( - latest_modules_by_schema.get(&schema_id), - local_hashes.get(&schema_id), - ) { - (Some(latest_module), Some(local_hash)) => { - if &latest_module.latest_hash != local_hash { - synced_modules - .upgradeable - .insert(schema_variant.schema_variant_id, latest_module.to_owned()); - } - } - (maybe_latest, maybe_local) => { - trace!( - %schema_id, - %schema_variant.schema_variant_id, - %schema_variant.schema_name, - ?maybe_latest, - ?maybe_local, - "skipping since there's incomplete data for determining if schema variant can be updated (perhaps module was not prompted to builtin?)" - ); - } - } - } - - debug!("syncing modules took: {:?}", start.elapsed()); + track( + &posthog_client, + &ctx, + &original_uri, + "sync", + serde_json::json!({}), + ); - Ok(synced_modules) + Ok(Json(synced_modules)) }