diff --git a/lib/dal/src/module.rs b/lib/dal/src/module.rs index fa640b7a1d..a8b107a43e 100644 --- a/lib/dal/src/module.rs +++ b/lib/dal/src/module.rs @@ -23,7 +23,7 @@ use crate::workspace_snapshot::node_weight::{NodeWeight, NodeWeightError}; use crate::workspace_snapshot::WorkspaceSnapshotError; use crate::{ pk, ChangeSetError, DalContext, Func, FuncError, Schema, SchemaError, SchemaId, SchemaVariant, - SchemaVariantError, Timestamp, TransactionsError, + SchemaVariantError, SchemaVariantId, Timestamp, TransactionsError, }; #[remain::sorted] @@ -423,37 +423,23 @@ impl Module { pub async fn sync( ctx: &DalContext, latest_modules: Vec, + past_hashes_by_module_id: HashMap>, ) -> ModuleResult { let start = Instant::now(); // Collect all user facing schema variants. We need to see what can be upgraded. let schema_variants = SchemaVariant::list_user_facing(ctx).await?; - // Find all local hashes and mark all seen schemas. - let mut local_hashes = HashMap::new(); - let mut seen_schema_ids = HashSet::new(); - for schema_variant in &schema_variants { - let schema_id: SchemaId = schema_variant.schema_id.into(); - seen_schema_ids.insert(schema_id); - - if let Entry::Vacant(entry) = local_hashes.entry(schema_id) { - match Self::find_for_member_id(ctx, schema_id).await? { - Some(found_local_module) => { - entry.insert(found_local_module.root_hash().to_owned()); - } - None => { - error!(%schema_id, %schema_variant.schema_variant_id, "found orphaned schema (has no corresponding module)"); - } - } - } - } - - // Begin populating synced modules. + // Initialize result struct let mut synced_modules = frontend_types::SyncedModules::new(); - // Group the latest hashes by schema. Populate installable modules along the way. + // For each latest module + // if not for existing schema, is installable + // if for existing locked schema variant, and hash != hash for module of variant, mark variant as upgradeable + // if for existing schema variants, but they are not upgradeable, do nothing let mut latest_modules_by_schema: HashMap = HashMap::new(); + for latest_module in latest_modules { let schema_id: SchemaId = latest_module .schema_id() @@ -476,39 +462,49 @@ impl Module { } } - if !seen_schema_ids.contains(&schema_id) { + // Compute upgradeable variants + + // Due to rust lifetimes, we need to actually create a set here to reference in unwrap. + // The Alternative would be to clone the existing hashset every loop, which is way worse. + let empty_set = HashSet::new(); + let past_hashes = past_hashes_by_module_id + .get(&latest_module.id) + .unwrap_or(&empty_set); + + // A module is for a schema if its schema id matches the other schema + // or if the hash of the package of that schema is contained in the latest hashes list + let mut possible_upgrade_targets = vec![]; + for schema_variant in &schema_variants { + let this_schema_id: SchemaId = schema_variant.schema_id.into(); + let variant_id: SchemaVariantId = schema_variant.schema_variant_id.into(); + + let Some(variant_module) = Self::find_for_member_id(ctx, variant_id).await? else { + continue; + }; + + if this_schema_id == schema_id || past_hashes.contains(&variant_module.root_hash) { + possible_upgrade_targets.push((schema_variant, variant_module)); + } + } + + if possible_upgrade_targets.is_empty() { synced_modules.installable.push(latest_module.to_owned()); + continue; } - } - debug!(?synced_modules.installable, "collected installable modules"); - // Populate upgradeable modules. - for schema_variant in schema_variants { - let schema_id: SchemaId = schema_variant.schema_id.into(); - match ( - latest_modules_by_schema.get(&schema_id), - local_hashes.get(&schema_id), - ) { - (Some(latest_module), Some(local_hash)) => { - debug!(?latest_module, %local_hash, schema_variant.is_locked, "comparing hashes"); - if &latest_module.latest_hash != local_hash && schema_variant.is_locked { - synced_modules - .upgradeable - .insert(schema_variant.schema_variant_id, latest_module.to_owned()); - } - } - (maybe_latest, maybe_local) => { - trace!( - %schema_id, - %schema_variant.schema_variant_id, - %schema_variant.schema_name, - ?maybe_latest, - ?maybe_local, - "skipping since there's incomplete data for determining if schema variant can be updated (perhaps module was not prompted to builtin?)" + for (upgradeable_variant, variant_module) in possible_upgrade_targets { + if latest_module.latest_hash != variant_module.root_hash + && upgradeable_variant.is_locked + { + synced_modules.upgradeable.insert( + upgradeable_variant.schema_variant_id, + latest_module.to_owned(), ); } } } + + debug!(?synced_modules.installable, "collected installable modules"); debug!(?synced_modules.upgradeable, "collected upgradeable modules"); info!("syncing modules took: {:?}", start.elapsed()); diff --git a/lib/dal/tests/integration_test/module.rs b/lib/dal/tests/integration_test/module.rs index 180e7070b0..37b56c34a2 100644 --- a/lib/dal/tests/integration_test/module.rs +++ b/lib/dal/tests/integration_test/module.rs @@ -4,6 +4,7 @@ use dal::pkg::export::PkgExporter; use dal::{DalContext, Schema}; use dal_test::test; use si_pkg::{SocketSpecArity, SocketSpecKind}; +use std::collections::HashMap; use ulid::Ulid; #[test] @@ -232,6 +233,7 @@ async fn dummy_sync(ctx: &DalContext) { dummy_latest_module_upgradeable, dummy_latest_module_installable, ], + HashMap::new(), ) .await .expect("could not sync"); diff --git a/lib/module-index-client/src/lib.rs b/lib/module-index-client/src/lib.rs index 32f35ffaa0..b1359b3d5b 100644 --- a/lib/module-index-client/src/lib.rs +++ b/lib/module-index-client/src/lib.rs @@ -289,4 +289,17 @@ impl ModuleIndexClient { .json() .await?) } + + pub async fn list_module_details(&self) -> ModuleIndexClientResult { + let url = self.base_url.join("modules")?; + + Ok(reqwest::Client::new() + .get(url) + .bearer_auth(&self.auth_token) + .send() + .await? + .error_for_status()? + .json() + .await?) + } } diff --git a/lib/module-index-server/src/routes/list_modules_route.rs b/lib/module-index-server/src/routes/list_modules_route.rs index 2f0cfac65e..6b66775a7f 100644 --- a/lib/module-index-server/src/routes/list_modules_route.rs +++ b/lib/module-index-server/src/routes/list_modules_route.rs @@ -4,7 +4,7 @@ use axum::{ Json, }; use hyper::StatusCode; -use module_index_types::ModuleDetailsResponse; +use module_index_types::{ListModulesResponse, ModuleDetailsResponse}; use sea_orm::{ColumnTrait, DbErr, EntityTrait, QueryFilter, QueryOrder}; use serde::{Deserialize, Serialize}; use thiserror::Error; @@ -46,12 +46,6 @@ pub struct ListModulesRequest { pub su: Option, } -#[derive(Deserialize, Serialize, Debug)] -#[serde(rename_all = "camelCase")] -pub struct ListModulesResponse { - modules: Vec, -} - pub async fn list_module_route( Authorization { user_claim, diff --git a/lib/module-index-types/src/lib.rs b/lib/module-index-types/src/lib.rs index dc9a46440f..636fd05808 100644 --- a/lib/module-index-types/src/lib.rs +++ b/lib/module-index-types/src/lib.rs @@ -20,6 +20,12 @@ pub struct BuiltinsDetailsResponse { pub modules: Vec, } +#[derive(Deserialize, Serialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct ListModulesResponse { + pub modules: Vec, +} + #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct ModuleDetailsResponse { diff --git a/lib/sdf-server/src/server/service/v2/module/sync.rs b/lib/sdf-server/src/server/service/v2/module/sync.rs index 9f4741a187..ece9c0ccfa 100644 --- a/lib/sdf-server/src/server/service/v2/module/sync.rs +++ b/lib/sdf-server/src/server/service/v2/module/sync.rs @@ -2,10 +2,10 @@ use axum::{ extract::{OriginalUri, Path}, Json, }; - use dal::{module::Module, ChangeSetId, WorkspacePk}; use module_index_client::ModuleIndexClient; use si_frontend_types as frontend_types; +use std::collections::{HashMap, HashSet}; use crate::server::{ extract::{AccessBuilder, HandlerContext, PosthogClient, RawAccessToken}, @@ -26,16 +26,32 @@ pub async fn sync( .build(access_builder.build(change_set_id.into())) .await?; - let latest_modules = { + let (latest_modules, module_details) = { let module_index_url = ctx .module_index_url() .ok_or(ModulesAPIError::ModuleIndexNotConfigured)?; let module_index_client = ModuleIndexClient::new(module_index_url.try_into()?, &raw_access_token); - module_index_client.list_latest_modules().await? + ( + module_index_client.list_latest_modules().await?, + module_index_client.list_module_details().await?, + ) }; - let synced_modules = Module::sync(&ctx, latest_modules.modules).await?; + let past_hashes_for_module_id = module_details + .modules + .into_iter() + .filter_map(|m| { + if let Some(past_hashes) = m.past_hashes { + Some((m.id, HashSet::from_iter(past_hashes.into_iter()))) + } else { + None + } + }) + .collect::>(); + + let synced_modules = + Module::sync(&ctx, latest_modules.modules, past_hashes_for_module_id).await?; track( &posthog_client,