Skip to content

Commit

Permalink
Move module sync to dal, fix its tracing and add integration test
Browse files Browse the repository at this point in the history
This commit moves module syncing to the dal. In addition, it fixes
tracing for module syncing and includes more logging. As a result of
module syncing moving to the dal, there's a new integration test too.

Signed-off-by: Nick Gerace <[email protected]>
  • Loading branch information
nickgerace committed Jul 16, 2024
1 parent 8571d97 commit c4d2c47
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 128 deletions.
128 changes: 119 additions & 9 deletions lib/dal/src/module.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use si_events::ulid::Ulid;
use si_events::ContentHash;
use thiserror::Error;
use tokio::sync::TryLockError;

use si_frontend_types as frontend_types;
use si_layer_cache::LayerDbError;
use telemetry::prelude::*;
use thiserror::Error;
use tokio::sync::TryLockError;
use tokio::time::Instant;

use crate::layer_db_types::{ModuleContent, ModuleContentV2};
use crate::workspace_snapshot::content_address::{ContentAddress, ContentAddressDiscriminants};
Expand All @@ -20,7 +22,7 @@ use crate::workspace_snapshot::node_weight::category_node_weight::CategoryNodeKi
use crate::workspace_snapshot::node_weight::{NodeWeight, NodeWeightError};
use crate::workspace_snapshot::WorkspaceSnapshotError;
use crate::{
pk, ChangeSetError, DalContext, Func, FuncError, Schema, SchemaError, SchemaVariant,
pk, ChangeSetError, DalContext, Func, FuncError, Schema, SchemaError, SchemaId, SchemaVariant,
SchemaVariantError, Timestamp, TransactionsError,
};

Expand All @@ -31,16 +33,22 @@ pub enum ModuleError {
ChangeSet(#[from] ChangeSetError),
#[error("edge weight error: {0}")]
EdgeWeight(#[from] EdgeWeightError),
#[error(transparent)]
#[error("func error: {0}")]
Func(#[from] FuncError),
#[error("layer db error: {0}")]
LayerDb(#[from] LayerDbError),
#[error("module missing schema id (module id: {0}) (module hash: {1})")]
MissingSchemaId(String, String),
#[error("node weight error: {0}")]
NodeWeight(#[from] NodeWeightError),
#[error(transparent)]
#[error("module not found for schema: {0}")]
NotFoundForSchema(SchemaId),
#[error("schema error: {0}")]
Schema(#[from] SchemaError),
#[error(transparent)]
#[error("schema variant error: {0}")]
SchemaVariant(#[from] SchemaVariantError),
#[error("too many latest modules for schema: {0} (at least two hashes found: {1} and {2})")]
TooManyLatestModulesForSchema(SchemaId, String, String),
#[error("transactions error: {0}")]
Transactions(#[from] TransactionsError),
#[error("try lock error: {0}")]
Expand Down Expand Up @@ -402,4 +410,106 @@ impl Module {

Ok(modules)
}

/// Takes in a list of [`LatestModules`](si_frontend_types::LatestModule) and creates a
/// [`SyncedModules`](si_frontend_types::SyncedModules) object with them. The object enables callers to know what
/// [`Modules`](Module) can be upgraded and installed.
#[instrument(
name = "module.sync"
skip_all,
level = "debug",
)]
pub async fn sync(
ctx: &DalContext,
latest_modules: Vec<frontend_types::LatestModule>,
) -> ModuleResult<frontend_types::SyncedModules> {
debug!("working with {} latest modules", latest_modules.len());

let start = Instant::now();

// Collect all user facing schema variants. We need to see what can be upgraded.
let schema_variants = SchemaVariant::list_user_facing(ctx).await?;

// Find all local hashes and mark all seen schemas.
let mut local_hashes = HashMap::new();
let mut seen_schema_ids = HashSet::new();
for schema_variant in &schema_variants {
let schema_id: SchemaId = schema_variant.schema_id.into();
seen_schema_ids.insert(schema_id);

if let Entry::Vacant(entry) = local_hashes.entry(schema_id) {
let local_module = Self::find_for_member_id(ctx, schema_id)
.await?
.ok_or(ModuleError::NotFoundForSchema(schema_id))?;
entry.insert(local_module.root_hash().to_owned());
}
}

// Begin populating synced modules.
let mut synced_modules = frontend_types::SyncedModules::new();

// Group the latest hashes by schema. Populate installable modules along the way.
let mut latest_modules_by_schema: HashMap<SchemaId, frontend_types::LatestModule> =
HashMap::new();
for latest_module in latest_modules {
let schema_id: SchemaId = latest_module
.schema_id()
.ok_or(ModuleError::MissingSchemaId(
latest_module.id.to_owned(),
latest_module.latest_hash.to_owned(),
))?
.into();
match latest_modules_by_schema.entry(schema_id) {
Entry::Occupied(entry) => {
let existing: frontend_types::LatestModule = entry.get().to_owned();
return Err(ModuleError::TooManyLatestModulesForSchema(
schema_id,
existing.latest_hash,
latest_module.latest_hash,
));
}
Entry::Vacant(entry) => {
entry.insert(latest_module.to_owned());
}
}

if !seen_schema_ids.contains(&schema_id) {
synced_modules.installable.push(latest_module.to_owned());
}
}
trace!(?synced_modules.installable, "collected installable modules");

// Populate upgradeable modules.
for schema_variant in schema_variants {
let schema_id: SchemaId = schema_variant.schema_id.into();
match (
latest_modules_by_schema.get(&schema_id),
local_hashes.get(&schema_id),
) {
(Some(latest_module), Some(local_hash)) => {
trace!(?latest_module, %local_hash, "comparing hashes");
if &latest_module.latest_hash != local_hash {
synced_modules
.upgradeable
.insert(schema_variant.schema_variant_id, latest_module.to_owned());
}
}
(maybe_latest, maybe_local) => {
trace!(
%schema_id,
%schema_variant.schema_variant_id,
%schema_variant.schema_name,
?maybe_latest,
?maybe_local,
"skipping since there's incomplete data for determining if schema variant can be updated (perhaps module was not prompted to builtin?)"
);
}
}
}
trace!(?synced_modules.upgradeable, "collected upgradeable modules");

debug!("syncing modules took: {:?}", start.elapsed());

Ok(synced_modules)
}
}
71 changes: 71 additions & 0 deletions lib/dal/tests/integration_test/module.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use chrono::Utc;
use dal::module::Module;
use dal::pkg::export::PkgExporter;
use dal::{DalContext, Schema};
use dal_test::test;
use si_pkg::{SocketSpecArity, SocketSpecKind};
use ulid::Ulid;

#[test]
async fn list_modules(ctx: &DalContext) {
Expand Down Expand Up @@ -169,3 +171,72 @@ async fn module_export_simple(ctx: &mut DalContext) {

assert_eq!(exported_pkg_func_names, expected_func_names);
}

#[test]
async fn dummy_sync(ctx: &DalContext) {
    // Resolve the "starfield" schema, its default variant and the module it is a member of.
    let starfield = Schema::find_by_name(ctx, "starfield")
        .await
        .expect("could not perform find by name")
        .expect("schema not found");
    let default_variant_id = starfield
        .get_default_schema_variant_id(ctx)
        .await
        .expect("could not perform get default schema variant id")
        .expect("no schema variant id found");
    let local_module = Module::find_for_member_id(ctx, starfield.id())
        .await
        .expect("could not perform find for module schema id")
        .expect("module not found");

    let timestamp = Utc::now();

    // Shares the local schema id but carries a freshly generated hash, so the sync is expected to
    // treat it as an upgrade for the default variant.
    let upgradeable = si_frontend_types::LatestModule {
        id: Ulid::new().to_string(),
        name: local_module.name().to_owned(),
        description: Some(local_module.description().to_owned()),
        owner_user_id: "this is BS!".to_string(),
        owner_display_name: None,
        metadata: serde_json::Value::Null,
        latest_hash: Ulid::new().to_string(),
        latest_hash_created_at: timestamp,
        created_at: timestamp,
        schema_id: Some(starfield.id().into()),
    };

    // Points at a freshly generated schema id, so the sync is expected to treat it as installable.
    let installable = si_frontend_types::LatestModule {
        id: Ulid::new().to_string(),
        name: "threadripper 7980X".to_string(),
        description: None,
        owner_user_id: "this is also BS!".to_string(),
        owner_display_name: None,
        metadata: serde_json::Value::Null,
        latest_hash: Ulid::new().to_string(),
        latest_hash_created_at: timestamp,
        created_at: timestamp,
        schema_id: Some(Ulid::new().to_string()),
    };

    // Copies handed to the sync; the originals are moved into the expected result below.
    let latest_modules = vec![upgradeable.clone(), installable.clone()];

    let mut expected = si_frontend_types::SyncedModules::new();
    expected
        .upgradeable
        .insert(default_variant_id.into(), upgradeable);
    expected.installable.push(installable);

    // Run the sync and compare against the hand-assembled expectation.
    let actual = Module::sync(ctx, latest_modules)
        .await
        .expect("could not sync");
    assert_eq!(
        expected, // expected
        actual    // actual
    );
}
10 changes: 1 addition & 9 deletions lib/sdf-server/src/server/service/v2/module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,19 @@ use axum::{
use telemetry::prelude::*;
use thiserror::Error;

use dal::SchemaId;

use crate::{server::state::AppState, service::ApiError};

mod sync;

#[remain::sorted]
#[derive(Debug, Error)]
pub enum ModulesAPIError {
#[error("too many latest modules for schema: {0} (at least two hashes found: {1} and {2})")]
LatestModuleTooManyForSchema(SchemaId, String, String),
#[error("module error: {0}")]
Module(#[from] dal::module::ModuleError),
#[error("module index client error: {0}")]
ModuleIndexClient(#[from] module_index_client::ModuleIndexClientError),
#[error("module index not configured")]
ModuleIndexNotConfigured,
#[error("module missing schema id (module id: {0}) (module hash: {1})")]
ModuleMissingSchemaId(String, String),
#[error("module not found for schema: {0}")]
ModuleNotFoundForSchema(SchemaId),
#[error("schema error: {0}")]
SchemaVariant(#[from] dal::SchemaVariantError),
#[error("transactions error: {0}")]
Expand All @@ -45,7 +37,7 @@ impl IntoResponse for ModulesAPIError {
Self::Transactions(dal::TransactionsError::ConflictsOccurred(_)) => {
StatusCode::CONFLICT
}
Self::ModuleNotFoundForSchema(_)
Self::Module(dal::module::ModuleError::NotFoundForSchema(_))
| Self::SchemaVariant(dal::SchemaVariantError::NotFound(_)) => StatusCode::NOT_FOUND,
_ => ApiError::DEFAULT_ERROR_STATUS_CODE,
};
Expand Down
Loading

0 comments on commit c4d2c47

Please sign in to comment.