Skip to content

Commit

Permalink
Show upgradeable modules based on past hashes too
Browse files Browse the repository at this point in the history
Signed-off-by: Victor Bustamante <[email protected]>
  • Loading branch information
vbustamante committed Jul 23, 2024
1 parent f4d339d commit a28cbd1
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 59 deletions.
92 changes: 44 additions & 48 deletions lib/dal/src/module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::workspace_snapshot::node_weight::{NodeWeight, NodeWeightError};
use crate::workspace_snapshot::WorkspaceSnapshotError;
use crate::{
pk, ChangeSetError, DalContext, Func, FuncError, Schema, SchemaError, SchemaId, SchemaVariant,
SchemaVariantError, Timestamp, TransactionsError,
SchemaVariantError, SchemaVariantId, Timestamp, TransactionsError,
};

#[remain::sorted]
Expand Down Expand Up @@ -423,37 +423,23 @@ impl Module {
pub async fn sync(
ctx: &DalContext,
latest_modules: Vec<frontend_types::LatestModule>,
past_hashes_by_module_id: HashMap<String, HashSet<String>>,
) -> ModuleResult<frontend_types::SyncedModules> {
let start = Instant::now();

// Collect all user facing schema variants. We need to see what can be upgraded.
let schema_variants = SchemaVariant::list_user_facing(ctx).await?;

// Find all local hashes and mark all seen schemas.
let mut local_hashes = HashMap::new();
let mut seen_schema_ids = HashSet::new();
for schema_variant in &schema_variants {
let schema_id: SchemaId = schema_variant.schema_id.into();
seen_schema_ids.insert(schema_id);

if let Entry::Vacant(entry) = local_hashes.entry(schema_id) {
match Self::find_for_member_id(ctx, schema_id).await? {
Some(found_local_module) => {
entry.insert(found_local_module.root_hash().to_owned());
}
None => {
error!(%schema_id, %schema_variant.schema_variant_id, "found orphaned schema (has no corresponding module)");
}
}
}
}

// Begin populating synced modules.
// Initialize result struct
let mut synced_modules = frontend_types::SyncedModules::new();

// Group the latest hashes by schema. Populate installable modules along the way.
// For each latest module
// if not for existing schema, is installable
// if for existing locked schema variant, and hash != hash for module of variant, mark variant as upgradeable
// if for existing schema variants, but they are not upgradeable, do nothing
let mut latest_modules_by_schema: HashMap<SchemaId, frontend_types::LatestModule> =
HashMap::new();

for latest_module in latest_modules {
let schema_id: SchemaId = latest_module
.schema_id()
Expand All @@ -476,39 +462,49 @@ impl Module {
}
}

if !seen_schema_ids.contains(&schema_id) {
// Compute upgradeable variants

// Due to rust lifetimes, we need to actually create a set here to reference in unwrap.
// The Alternative would be to clone the existing hashset every loop, which is way worse.
let empty_set = HashSet::new();
let past_hashes = past_hashes_by_module_id
.get(&latest_module.id)
.unwrap_or(&empty_set);

// A module is for a schema if its schema id matches the other schema
// or if the hash of the package of that schema is contained in the latest hashes list
let mut possible_upgrade_targets = vec![];
for schema_variant in &schema_variants {
let this_schema_id: SchemaId = schema_variant.schema_id.into();
let variant_id: SchemaVariantId = schema_variant.schema_variant_id.into();

let Some(variant_module) = Self::find_for_member_id(ctx, variant_id).await? else {
continue;
};

if this_schema_id == schema_id || past_hashes.contains(&variant_module.root_hash) {
possible_upgrade_targets.push((schema_variant, variant_module));
}
}

if possible_upgrade_targets.is_empty() {
synced_modules.installable.push(latest_module.to_owned());
continue;
}
}
debug!(?synced_modules.installable, "collected installable modules");

// Populate upgradeable modules.
for schema_variant in schema_variants {
let schema_id: SchemaId = schema_variant.schema_id.into();
match (
latest_modules_by_schema.get(&schema_id),
local_hashes.get(&schema_id),
) {
(Some(latest_module), Some(local_hash)) => {
debug!(?latest_module, %local_hash, schema_variant.is_locked, "comparing hashes");
if &latest_module.latest_hash != local_hash && schema_variant.is_locked {
synced_modules
.upgradeable
.insert(schema_variant.schema_variant_id, latest_module.to_owned());
}
}
(maybe_latest, maybe_local) => {
trace!(
%schema_id,
%schema_variant.schema_variant_id,
%schema_variant.schema_name,
?maybe_latest,
?maybe_local,
"skipping since there's incomplete data for determining if schema variant can be updated (perhaps module was not prompted to builtin?)"
for (upgradeable_variant, variant_module) in possible_upgrade_targets {
if latest_module.latest_hash != variant_module.root_hash
&& upgradeable_variant.is_locked
{
synced_modules.upgradeable.insert(
upgradeable_variant.schema_variant_id,
latest_module.to_owned(),
);
}
}
}

debug!(?synced_modules.installable, "collected installable modules");
debug!(?synced_modules.upgradeable, "collected upgradeable modules");

info!("syncing modules took: {:?}", start.elapsed());
Expand Down
2 changes: 2 additions & 0 deletions lib/dal/tests/integration_test/module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use dal::pkg::export::PkgExporter;
use dal::{DalContext, Schema};
use dal_test::test;
use si_pkg::{SocketSpecArity, SocketSpecKind};
use std::collections::HashMap;
use ulid::Ulid;

#[test]
Expand Down Expand Up @@ -232,6 +233,7 @@ async fn dummy_sync(ctx: &DalContext) {
dummy_latest_module_upgradeable,
dummy_latest_module_installable,
],
HashMap::new(),
)
.await
.expect("could not sync");
Expand Down
13 changes: 13 additions & 0 deletions lib/module-index-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,4 +289,17 @@ impl ModuleIndexClient {
.json()
.await?)
}

pub async fn list_module_details(&self) -> ModuleIndexClientResult<ListModulesResponse> {
let url = self.base_url.join("modules")?;

Ok(reqwest::Client::new()
.get(url)
.bearer_auth(&self.auth_token)
.send()
.await?
.error_for_status()?
.json()
.await?)
}
}
8 changes: 1 addition & 7 deletions lib/module-index-server/src/routes/list_modules_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use axum::{
Json,
};
use hyper::StatusCode;
use module_index_types::ModuleDetailsResponse;
use module_index_types::{ListModulesResponse, ModuleDetailsResponse};
use sea_orm::{ColumnTrait, DbErr, EntityTrait, QueryFilter, QueryOrder};
use serde::{Deserialize, Serialize};
use thiserror::Error;
Expand Down Expand Up @@ -46,12 +46,6 @@ pub struct ListModulesRequest {
pub su: Option<bool>,
}

#[derive(Deserialize, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ListModulesResponse {
modules: Vec<ModuleDetailsResponse>,
}

pub async fn list_module_route(
Authorization {
user_claim,
Expand Down
6 changes: 6 additions & 0 deletions lib/module-index-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ pub struct BuiltinsDetailsResponse {
pub modules: Vec<ModuleDetailsResponse>,
}

#[derive(Deserialize, Serialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ListModulesResponse {
pub modules: Vec<ModuleDetailsResponse>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ModuleDetailsResponse {
Expand Down
24 changes: 20 additions & 4 deletions lib/sdf-server/src/server/service/v2/module/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use axum::{
extract::{OriginalUri, Path},
Json,
};

use dal::{module::Module, ChangeSetId, WorkspacePk};
use module_index_client::ModuleIndexClient;
use si_frontend_types as frontend_types;
use std::collections::{HashMap, HashSet};

use crate::server::{
extract::{AccessBuilder, HandlerContext, PosthogClient, RawAccessToken},
Expand All @@ -26,16 +26,32 @@ pub async fn sync(
.build(access_builder.build(change_set_id.into()))
.await?;

let latest_modules = {
let (latest_modules, module_details) = {
let module_index_url = ctx
.module_index_url()
.ok_or(ModulesAPIError::ModuleIndexNotConfigured)?;
let module_index_client =
ModuleIndexClient::new(module_index_url.try_into()?, &raw_access_token);
module_index_client.list_latest_modules().await?
(
module_index_client.list_latest_modules().await?,
module_index_client.list_module_details().await?,
)
};

let synced_modules = Module::sync(&ctx, latest_modules.modules).await?;
let past_hashes_for_module_id = module_details
.modules
.into_iter()
.filter_map(|m| {
if let Some(past_hashes) = m.past_hashes {
Some((m.id, HashSet::from_iter(past_hashes.into_iter())))
} else {
None
}
})
.collect::<HashMap<_, _>>();

let synced_modules =
Module::sync(&ctx, latest_modules.modules, past_hashes_for_module_id).await?;

track(
&posthog_client,
Expand Down

0 comments on commit a28cbd1

Please sign in to comment.