diff --git a/lib/dal/src/module.rs b/lib/dal/src/module.rs index d859ae5b70..a2b941c576 100644 --- a/lib/dal/src/module.rs +++ b/lib/dal/src/module.rs @@ -1,15 +1,17 @@ -use chrono::{DateTime, Utc}; -use std::collections::HashMap; +use std::collections::hash_map::Entry; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use si_events::ulid::Ulid; use si_events::ContentHash; -use thiserror::Error; -use tokio::sync::TryLockError; - +use si_frontend_types as frontend_types; use si_layer_cache::LayerDbError; use telemetry::prelude::*; +use thiserror::Error; +use tokio::sync::TryLockError; +use tokio::time::Instant; use crate::layer_db_types::{ModuleContent, ModuleContentV2}; use crate::workspace_snapshot::content_address::{ContentAddress, ContentAddressDiscriminants}; @@ -20,7 +22,7 @@ use crate::workspace_snapshot::node_weight::category_node_weight::CategoryNodeKi use crate::workspace_snapshot::node_weight::{NodeWeight, NodeWeightError}; use crate::workspace_snapshot::WorkspaceSnapshotError; use crate::{ - pk, ChangeSetError, DalContext, Func, FuncError, Schema, SchemaError, SchemaVariant, + pk, ChangeSetError, DalContext, Func, FuncError, Schema, SchemaError, SchemaId, SchemaVariant, SchemaVariantError, Timestamp, TransactionsError, }; @@ -31,16 +33,22 @@ pub enum ModuleError { ChangeSet(#[from] ChangeSetError), #[error("edge weight error: {0}")] EdgeWeight(#[from] EdgeWeightError), - #[error(transparent)] + #[error("func error: {0}")] Func(#[from] FuncError), #[error("layer db error: {0}")] LayerDb(#[from] LayerDbError), + #[error("module missing schema id (module id: {0}) (module hash: {1})")] + MissingSchemaId(String, String), #[error("node weight error: {0}")] NodeWeight(#[from] NodeWeightError), - #[error(transparent)] + #[error("module not found for schema: {0}")] + NotFoundForSchema(SchemaId), + #[error("schema error: {0}")] Schema(#[from] SchemaError), - #[error(transparent)] + 
#[error("schema variant error: {0}")] SchemaVariant(#[from] SchemaVariantError), + #[error("too many latest modules for schema: {0} (at least two hashes found: {1} and {2})")] + TooManyLatestModulesForSchema(SchemaId, String, String), #[error("transactions error: {0}")] Transactions(#[from] TransactionsError), #[error("try lock error: {0}")] @@ -402,4 +410,106 @@ impl Module { Ok(modules) } + + /// Takes in a list of [`LatestModules`](si_frontend_types::LatestModule) and creates a + /// [`SyncedModules`](si_frontend_types::SyncedModules) object with them. The object enables callers to know what + /// [`Modules`](Module) can be upgraded and installed. + #[instrument( + name = "module.sync" + skip_all, + level = "debug", + )] + pub async fn sync( + ctx: &DalContext, + latest_modules: Vec<frontend_types::LatestModule>, + ) -> ModuleResult<frontend_types::SyncedModules> { + debug!("working with {} latest modules", latest_modules.len()); + + let start = Instant::now(); + + // Collect all user facing schema variants. We need to see what can be upgraded. + let schema_variants = SchemaVariant::list_user_facing(ctx).await?; + + // Find all local hashes and mark all seen schemas. + let mut local_hashes = HashMap::new(); + let mut seen_schema_ids = HashSet::new(); + for schema_variant in &schema_variants { + let schema_id: SchemaId = schema_variant.schema_id.into(); + seen_schema_ids.insert(schema_id); + + if let Entry::Vacant(entry) = local_hashes.entry(schema_id) { + let local_module = Self::find_for_member_id(ctx, schema_id) + .await? + .ok_or(ModuleError::NotFoundForSchema(schema_id))?; + entry.insert(local_module.root_hash().to_owned()); + } + } + + // Begin populating synced modules. + let mut synced_modules = frontend_types::SyncedModules::new(); + + // Group the latest hashes by schema. Populate installable modules along the way. 
+ let mut latest_modules_by_schema: HashMap<SchemaId, frontend_types::LatestModule> = + HashMap::new(); + for latest_module in latest_modules { + let schema_id: SchemaId = latest_module + .schema_id() + .ok_or(ModuleError::MissingSchemaId( + latest_module.id.to_owned(), + latest_module.latest_hash.to_owned(), + ))? + .into(); + match latest_modules_by_schema.entry(schema_id) { + Entry::Occupied(entry) => { + let existing: frontend_types::LatestModule = entry.get().to_owned(); + return Err(ModuleError::TooManyLatestModulesForSchema( + schema_id, + existing.latest_hash, + latest_module.latest_hash, + )); + } + Entry::Vacant(entry) => { + entry.insert(latest_module.to_owned()); + } + } + + if !seen_schema_ids.contains(&schema_id) { + synced_modules.installable.push(latest_module.to_owned()); + } + } + trace!(?synced_modules.installable, "collected installable modules"); + + // Populate upgradeable modules. + for schema_variant in schema_variants { + let schema_id: SchemaId = schema_variant.schema_id.into(); + match ( + latest_modules_by_schema.get(&schema_id), + local_hashes.get(&schema_id), + ) { + (Some(latest_module), Some(local_hash)) => { + trace!(?latest_module, %local_hash, "comparing hashes"); + if &latest_module.latest_hash != local_hash { + synced_modules + .upgradeable + .insert(schema_variant.schema_variant_id, latest_module.to_owned()); + } + } + (maybe_latest, maybe_local) => { + trace!( + %schema_id, + %schema_variant.schema_variant_id, + %schema_variant.schema_name, + ?maybe_latest, + ?maybe_local, + "skipping since there's incomplete data for determining if schema variant can be updated (perhaps module was not prompted to builtin?)" + ); + } + } + } + trace!(?synced_modules.upgradeable, "collected upgradeable modules"); + + debug!("syncing modules took: {:?}", start.elapsed()); + + Ok(synced_modules) + } } diff --git a/lib/dal/tests/integration_test/module.rs b/lib/dal/tests/integration_test/module.rs index d023ca5582..180e7070b0 100644 --- a/lib/dal/tests/integration_test/module.rs +++ 
b/lib/dal/tests/integration_test/module.rs @@ -1,8 +1,10 @@ +use chrono::Utc; use dal::module::Module; use dal::pkg::export::PkgExporter; use dal::{DalContext, Schema}; use dal_test::test; use si_pkg::{SocketSpecArity, SocketSpecKind}; +use ulid::Ulid; #[test] async fn list_modules(ctx: &DalContext) { @@ -169,3 +171,72 @@ async fn module_export_simple(ctx: &mut DalContext) { assert_eq!(exported_pkg_func_names, expected_func_names); } + +#[test] +async fn dummy_sync(ctx: &DalContext) { + let schema = Schema::find_by_name(ctx, "starfield") + .await + .expect("could not perform find by name") + .expect("schema not found"); + let schema_variant_id = schema + .get_default_schema_variant_id(ctx) + .await + .expect("could not perform get default schema variant id") + .expect("no schema variant id found"); + let module = Module::find_for_member_id(ctx, schema.id()) + .await + .expect("could not perform find for module schema id") + .expect("module not found"); + + // Create dummy latest modules. + let now = Utc::now(); + let dummy_latest_module_upgradeable = si_frontend_types::LatestModule { + id: Ulid::new().to_string(), + name: module.name().to_owned(), + description: Some(module.description().to_owned()), + owner_user_id: "this is BS!".to_string(), + owner_display_name: None, + metadata: serde_json::Value::Null, + latest_hash: Ulid::new().to_string(), + latest_hash_created_at: now, + created_at: now, + schema_id: Some(schema.id().into()), + }; + let dummy_latest_module_installable = si_frontend_types::LatestModule { + id: Ulid::new().to_string(), + name: "threadripper 7980X".to_string(), + description: None, + owner_user_id: "this is also BS!".to_string(), + owner_display_name: None, + metadata: serde_json::Value::Null, + latest_hash: Ulid::new().to_string(), + latest_hash_created_at: now, + created_at: now, + schema_id: Some(Ulid::new().to_string()), + }; + + // Assemble our expected result. 
+ let mut expected = si_frontend_types::SyncedModules::new(); + expected.upgradeable.insert( + schema_variant_id.into(), + dummy_latest_module_upgradeable.clone(), + ); + expected + .installable + .push(dummy_latest_module_installable.clone()); + + // Perform the sync and check that the result is what we expect. + let actual = Module::sync( + ctx, + vec![ + dummy_latest_module_upgradeable, + dummy_latest_module_installable, + ], + ) + .await + .expect("could not sync"); + assert_eq!( + expected, // expected + actual // actual + ); +} diff --git a/lib/sdf-server/src/server/service/v2/module.rs b/lib/sdf-server/src/server/service/v2/module.rs index 5505dd45d6..29bb6c2062 100644 --- a/lib/sdf-server/src/server/service/v2/module.rs +++ b/lib/sdf-server/src/server/service/v2/module.rs @@ -7,8 +7,6 @@ use axum::{ use telemetry::prelude::*; use thiserror::Error; -use dal::SchemaId; - use crate::{server::state::AppState, service::ApiError}; mod sync; @@ -16,18 +14,12 @@ mod sync; #[remain::sorted] #[derive(Debug, Error)] pub enum ModulesAPIError { - #[error("too many latest modules for schema: {0} (at least two hashes found: {1} and {2})")] - LatestModuleTooManyForSchema(SchemaId, String, String), #[error("module error: {0}")] Module(#[from] dal::module::ModuleError), #[error("module index client error: {0}")] ModuleIndexClient(#[from] module_index_client::ModuleIndexClientError), #[error("module index not configured")] ModuleIndexNotConfigured, - #[error("module missing schema id (module id: {0}) (module hash: {1})")] - ModuleMissingSchemaId(String, String), - #[error("module not found for schema: {0}")] - ModuleNotFoundForSchema(SchemaId), #[error("schema error: {0}")] SchemaVariant(#[from] dal::SchemaVariantError), #[error("transactions error: {0}")] @@ -45,7 +37,7 @@ impl IntoResponse for ModulesAPIError { Self::Transactions(dal::TransactionsError::ConflictsOccurred(_)) => { StatusCode::CONFLICT } - Self::ModuleNotFoundForSchema(_) + 
Self::Module(dal::module::ModuleError::NotFoundForSchema(_)) | Self::SchemaVariant(dal::SchemaVariantError::NotFound(_)) => StatusCode::NOT_FOUND, _ => ApiError::DEFAULT_ERROR_STATUS_CODE, }; diff --git a/lib/sdf-server/src/server/service/v2/module/sync.rs b/lib/sdf-server/src/server/service/v2/module/sync.rs index 9ddda739ab..9f4741a187 100644 --- a/lib/sdf-server/src/server/service/v2/module/sync.rs +++ b/lib/sdf-server/src/server/service/v2/module/sync.rs @@ -1,13 +1,9 @@ -use std::collections::{hash_map::Entry, HashMap, HashSet}; - use axum::{ extract::{OriginalUri, Path}, Json, }; -use telemetry::prelude::*; -use tokio::time::Instant; -use dal::{module::Module, ChangeSetId, DalContext, SchemaId, SchemaVariant, WorkspacePk}; +use dal::{module::Module, ChangeSetId, WorkspacePk}; use module_index_client::ModuleIndexClient; use si_frontend_types as frontend_types; @@ -30,119 +26,24 @@ pub async fn sync( .build(access_builder.build(change_set_id.into())) .await?; - let synced_modules = sync_inner(&ctx, &raw_access_token).await?; - - track( - &posthog_client, - &ctx, - &original_uri, - "sync", - serde_json::json!({}), - ); - - Ok(Json(synced_modules)) -} - -#[instrument( - name = "sdf.v2.module.sync.sync_inner" - skip_all, - level = "debug", -)] -async fn sync_inner( - ctx: &DalContext, - raw_access_token: &str, -) -> Result<frontend_types::SyncedModules, ModulesAPIError> { - let start = Instant::now(); - - // Collect all user facing schema variants. We need to see what can be upgraded. - let schema_variants = SchemaVariant::list_user_facing(ctx).await?; - - // Find all local hashes and mark all seen schemas. - let mut local_hashes = HashMap::new(); - let mut seen_schema_ids = HashSet::new(); - for schema_variant in &schema_variants { - let schema_id: SchemaId = schema_variant.schema_id.into(); - seen_schema_ids.insert(schema_id); - - if let Entry::Vacant(entry) = local_hashes.entry(schema_id) { - let local_module = Module::find_for_member_id(ctx, schema_id) - .await? 
- .ok_or(ModulesAPIError::ModuleNotFoundForSchema(schema_id))?; - entry.insert(local_module.root_hash().to_owned()); - } - } - - // Collect the latest modules. let latest_modules = { let module_index_url = ctx .module_index_url() .ok_or(ModulesAPIError::ModuleIndexNotConfigured)?; let module_index_client = - ModuleIndexClient::new(module_index_url.try_into()?, raw_access_token); + ModuleIndexClient::new(module_index_url.try_into()?, &raw_access_token); module_index_client.list_latest_modules().await? }; - // Begin populating synced modules. - let mut synced_modules = frontend_types::SyncedModules::new(); - - // Group the latest hashes by schema. Populate installable modules along the way. - let mut latest_modules_by_schema: HashMap<SchemaId, frontend_types::LatestModule> = - HashMap::new(); - for latest_module in latest_modules.modules { - let schema_id: SchemaId = latest_module - .schema_id() - .ok_or(ModulesAPIError::ModuleMissingSchemaId( - latest_module.id.to_owned(), - latest_module.latest_hash.to_owned(), - ))? - .into(); - match latest_modules_by_schema.entry(schema_id) { - Entry::Occupied(entry) => { - let existing: frontend_types::LatestModule = entry.get().to_owned(); - return Err(ModulesAPIError::LatestModuleTooManyForSchema( - schema_id, - existing.latest_hash, - latest_module.latest_hash, - )); - } - Entry::Vacant(entry) => { - entry.insert(latest_module.to_owned()); - } - } + let synced_modules = Module::sync(&ctx, latest_modules.modules).await?; - if !seen_schema_ids.contains(&schema_id) { - synced_modules.installable.push(latest_module.to_owned()); - } - } - - // Populate upgradeable modules. 
- for schema_variant in schema_variants { - let schema_id: SchemaId = schema_variant.schema_id.into(); - match ( - latest_modules_by_schema.get(&schema_id), - local_hashes.get(&schema_id), - ) { - (Some(latest_module), Some(local_hash)) => { - if &latest_module.latest_hash != local_hash { - synced_modules - .upgradeable - .insert(schema_variant.schema_variant_id, latest_module.to_owned()); - } - } - (maybe_latest, maybe_local) => { - trace!( - %schema_id, - %schema_variant.schema_variant_id, - %schema_variant.schema_name, - ?maybe_latest, - ?maybe_local, - "skipping since there's incomplete data for determining if schema variant can be updated (perhaps module was not prompted to builtin?)" - ); - } - } - } - - debug!("syncing modules took: {:?}", start.elapsed()); + track( + &posthog_client, + &ctx, + &original_uri, + "sync", + serde_json::json!({}), + ); - Ok(synced_modules) + Ok(Json(synced_modules)) }