From e715c78f192b39fe4eb1edc9669ca1cc615b66bc Mon Sep 17 00:00:00 2001 From: Nick Gerace Date: Wed, 18 Dec 2024 16:53:43 -0500 Subject: [PATCH] WIP: calculate changes hash in an integration test Signed-off-by: Nick Gerace --- lib/dal/src/change_set/approval.rs | 6 +++- .../src/change_set/approval/changes_hash.rs | 27 ++++++++++++++++ lib/dal/src/workspace_snapshot.rs | 26 +++++++++++---- .../src/workspace_snapshot/graph/detector.rs | 25 ++++++++++----- .../graph/tests/detect_changes.rs | 2 +- lib/dal/src/workspace_snapshot/graph/v4.rs | 4 ++- .../integration_test/change_set/approval.rs | 32 ++++++++++++++++--- .../service/v2/change_set/approval_status.rs | 15 +++------ lib/si-events-rs/src/change_set_approval.rs | 4 +++ lib/si-events-rs/src/lib.rs | 1 + 10 files changed, 110 insertions(+), 32 deletions(-) create mode 100644 lib/dal/src/change_set/approval/changes_hash.rs diff --git a/lib/dal/src/change_set/approval.rs b/lib/dal/src/change_set/approval.rs index ee90c7bc4f..bea6796c59 100644 --- a/lib/dal/src/change_set/approval.rs +++ b/lib/dal/src/change_set/approval.rs @@ -7,7 +7,9 @@ use thiserror::Error; pub use si_events::ChangeSetApprovalStatus; -use crate::{DalContext, HistoryActor, TransactionsError}; +use crate::{DalContext, HistoryActor, TransactionsError, WorkspaceSnapshotError}; + +pub mod changes_hash; #[derive(Debug, Error)] pub enum ChangeSetApprovalError { @@ -19,6 +21,8 @@ pub enum ChangeSetApprovalError { StrumParse(#[from] strum::ParseError), #[error("transactions error: {0}")] Transactions(#[from] TransactionsError), + #[error("workspace snapshot error: {0}")] + WorkspaceSnapshot(#[from] WorkspaceSnapshotError), } type Result = std::result::Result; diff --git a/lib/dal/src/change_set/approval/changes_hash.rs b/lib/dal/src/change_set/approval/changes_hash.rs new file mode 100644 index 0000000000..9cc1e2de9a --- /dev/null +++ b/lib/dal/src/change_set/approval/changes_hash.rs @@ -0,0 +1,27 @@ +use si_events::ChangesHash; + +use crate::{workspace_snapshot::graph::detector::Change, DalContext, WorkspaceSnapshot}; + +use super::ChangeSetApprovalError; + +type Result = std::result::Result; + +pub async fn changes_hash(ctx: &DalContext) -> Result { + let changes = detect_changes_from_head(ctx).await?; + let mut hasher = ChangesHash::hasher(); + for change in changes { + hasher.update(change.merkle_tree_hash.as_bytes()); + } + Ok(hasher.finalize()) +} + +pub async fn detect_changes_from_head(ctx: &DalContext) -> Result> { + let head_change_set_id = ctx.get_workspace_default_change_set_id().await?; + let head_snapshot = WorkspaceSnapshot::find_for_change_set(&ctx, head_change_set_id).await?; + let mut changes = head_snapshot + .detect_changes(&ctx.workspace_snapshot()?.clone()) + .await?; + + changes.sort_by_key(|c| c.id); + Ok(changes) +} diff --git a/lib/dal/src/workspace_snapshot.rs b/lib/dal/src/workspace_snapshot.rs index 591e151982..5ff8a54527 100644 --- a/lib/dal/src/workspace_snapshot.rs +++ b/lib/dal/src/workspace_snapshot.rs @@ -37,7 +37,7 @@ pub mod vector_clock; pub use traits::{schema::variant::SchemaVariantExt, socket::input::InputSocketExt}; use graph::correct_transforms::correct_transforms; -use graph::detector::Update; +use graph::detector::{Change, Update}; use graph::{RebaseBatch, WorkspaceSnapshotGraph}; use node_weight::traits::CorrectTransformsError; use std::collections::{HashMap, HashSet}; @@ -713,12 +713,7 @@ impl WorkspaceSnapshot { Ok(()) } - #[instrument( - name = "workspace_snapshot.detect_updates", - level = "debug", - skip_all, - fields() - )] + #[instrument(name = "workspace_snapshot.detect_updates", level = "debug", skip_all)] pub async fn detect_updates( &self, onto_workspace_snapshot: &WorkspaceSnapshot, @@ -735,6 +730,23 @@ impl WorkspaceSnapshot { .await?) } + #[instrument(name = "workspace_snapshot.detect_changes", level = "debug", skip_all)] + pub async fn detect_changes( + &self, + onto_workspace_snapshot: &WorkspaceSnapshot, + ) -> WorkspaceSnapshotResult> { + let self_clone = self.clone(); + let onto_clone = onto_workspace_snapshot.clone(); + + Ok(slow_rt::spawn(async move { + self_clone + .working_copy() + .await + .detect_changes(&*onto_clone.working_copy().await) + })? + .await?) + } + /// Gives the exact node index endpoints of an edge. pub async fn edge_endpoints( &self, diff --git a/lib/dal/src/workspace_snapshot/graph/detector.rs b/lib/dal/src/workspace_snapshot/graph/detector.rs index fa77c3ab94..255096c494 100644 --- a/lib/dal/src/workspace_snapshot/graph/detector.rs +++ b/lib/dal/src/workspace_snapshot/graph/detector.rs @@ -5,7 +5,7 @@ use petgraph::{ visit::{Control, DfsEvent}, }; use serde::{Deserialize, Serialize}; -use si_events::ulid::Ulid; +use si_events::{merkle_tree_hash::MerkleTreeHash, ulid::Ulid}; use strum::EnumDiscriminants; use telemetry::prelude::*; @@ -36,6 +36,12 @@ pub enum Update { }, } +#[derive(Debug)] +pub struct Change { + pub id: Ulid, + pub merkle_tree_hash: MerkleTreeHash, +} + #[derive(Clone, Debug)] enum NodeDifference { NewNode, @@ -82,16 +88,16 @@ impl<'a, 'b> Detector<'a, 'b> { /// /// This assumes that all graphs involved to not have any "garbage" laying around. If in doubt, perform "cleanup" /// on both graphs before creating the [`Detector`]. - pub fn detect_changes(&self) -> Vec { - let mut ids = Vec::new(); + pub fn detect_changes(&self) -> Vec { + let mut changes = Vec::new(); petgraph::visit::depth_first_search( self.updated_graph.graph(), Some(self.updated_graph.root()), - |event| self.calculate_changes_dfs_event(event, &mut ids), + |event| self.calculate_changes_dfs_event(event, &mut changes), ); - ids + changes } fn node_diff_from_base_graph( @@ -368,7 +374,7 @@ impl<'a, 'b> Detector<'a, 'b> { fn calculate_changes_dfs_event( &self, event: DfsEvent, - ids: &mut Vec, + changes: &mut Vec, ) -> Control<()> { if let DfsEvent::Discover(updated_graph_index, _) = event { match self.updated_graph.get_node_weight(updated_graph_index) { @@ -380,8 +386,11 @@ impl<'a, 'b> Detector<'a, 'b> { } // If either the original node weight was not found or it was found the merkle tree hashes differ, - // then we have a node ID that needs to be collected! - ids.push(updated_node_weight.id()); + // then we have information that needs to be collected! + changes.push(Change { + id: updated_node_weight.id(), + merkle_tree_hash: updated_node_weight.merkle_tree_hash(), + }); } Err(err) => error!(?err, "heat death of the universe error: updated node weight not found by updated node index from the same graph"), } diff --git a/lib/dal/src/workspace_snapshot/graph/tests/detect_changes.rs b/lib/dal/src/workspace_snapshot/graph/tests/detect_changes.rs index a246bbb571..801c3d19ab 100644 --- a/lib/dal/src/workspace_snapshot/graph/tests/detect_changes.rs +++ b/lib/dal/src/workspace_snapshot/graph/tests/detect_changes.rs @@ -5,7 +5,7 @@ mod test { type Result = std::result::Result>; #[test] - fn new() -> Result<()> { + fn identical_graphs() -> Result<()> { let base_graph = WorkspaceSnapshotGraphVCurrent::new_for_unit_tests()?; let updated_graph = base_graph.clone(); assert!(base_graph.is_acyclic_directed()); diff --git a/lib/dal/src/workspace_snapshot/graph/v4.rs b/lib/dal/src/workspace_snapshot/graph/v4.rs index 0054cc1543..c9ed66adaf 100644 --- a/lib/dal/src/workspace_snapshot/graph/v4.rs +++ b/lib/dal/src/workspace_snapshot/graph/v4.rs @@ -33,6 +33,8 @@ use crate::{ Timestamp, }; +use super::detector::Change; + pub mod component; pub mod diagram; pub mod schema; @@ -689,7 +691,7 @@ impl WorkspaceSnapshotGraphV4 { Detector::new(self, updated_graph).detect_updates() } - pub fn detect_changes(&self, updated_graph: &Self) -> Vec { + pub fn detect_changes(&self, updated_graph: &Self) -> Vec { Detector::new(self, updated_graph).detect_changes() } diff --git a/lib/dal/tests/integration_test/change_set/approval.rs b/lib/dal/tests/integration_test/change_set/approval.rs index ea91ffd710..29831a322a 100644 --- a/lib/dal/tests/integration_test/change_set/approval.rs +++ b/lib/dal/tests/integration_test/change_set/approval.rs @@ -1,14 +1,22 @@ -use dal::change_set::approval::{ChangeSetApproval, ChangeSetApprovalStatus}; -use dal::DalContext; +use std::collections::HashSet; + +use dal::change_set::approval::{self, ChangeSetApproval, ChangeSetApprovalStatus}; +use dal::{DalContext, Ulid}; use dal_test::color_eyre::eyre::OptionExt; +use dal_test::helpers::{ + create_component_for_default_schema_name_in_default_view, ChangeSetTestHelpers, +}; use dal_test::{test, Result}; use pretty_assertions_sorted::assert_eq; #[test] async fn new(ctx: &mut DalContext) -> Result<()> { + create_component_for_default_schema_name_in_default_view(ctx, "fallout", "soken").await?; + ChangeSetTestHelpers::commit_and_update_snapshot_to_visibility(ctx).await?; + let status = ChangeSetApprovalStatus::Approved; - // FIXME(nick): use a real checksum here. - let checksum = "FIXME".to_string(); + let hash = approval::changes_hash::changes_hash(ctx).await?; + let checksum = hash.to_string(); let new_approval = ChangeSetApproval::new(ctx, status, checksum).await?; assert_eq!( @@ -30,3 +38,19 @@ async fn new(ctx: &mut DalContext) -> Result<()> { Ok(()) } + +#[test] +async fn status(ctx: &mut DalContext) -> Result<()> { + create_component_for_default_schema_name_in_default_view(ctx, "fallout", "find the flame") + .await?; + ChangeSetTestHelpers::commit_and_update_snapshot_to_visibility(ctx).await?; + + let changes = approval::changes_hash::detect_changes_from_head(ctx).await?; + let seen: HashSet = HashSet::from_iter(changes.iter().map(|c| c.id)); + assert_eq!( + changes.len(), // expected + seen.len() // actual + ); + + Ok(()) +} diff --git a/lib/sdf-server/src/service/v2/change_set/approval_status.rs b/lib/sdf-server/src/service/v2/change_set/approval_status.rs index b0ccfdea96..0969936a7a 100644 --- a/lib/sdf-server/src/service/v2/change_set/approval_status.rs +++ b/lib/sdf-server/src/service/v2/change_set/approval_status.rs @@ -1,14 +1,9 @@ -use axum::{ - extract::{Host, OriginalUri, Path, State}, - Json, -}; -use dal::{change_set::approval::ChangeSetApproval, ChangeSet, ChangeSetId, WorkspacePk}; +use axum::{extract::Path, Json}; +use dal::{change_set::approval::ChangeSetApproval, ChangeSetId, WorkspacePk, WorkspaceSnapshot}; -use super::{AppState, Error, Result}; -use crate::{ - extract::{AccessBuilder, HandlerContext, PosthogClient}, - track, -}; +use crate::extract::{AccessBuilder, HandlerContext}; + +use super::Result; pub async fn approval_status( HandlerContext(builder): HandlerContext, diff --git a/lib/si-events-rs/src/change_set_approval.rs b/lib/si-events-rs/src/change_set_approval.rs index 58b88714f4..918f4e3975 100644 --- a/lib/si-events-rs/src/change_set_approval.rs +++ b/lib/si-events-rs/src/change_set_approval.rs @@ -2,6 +2,10 @@ use postgres_types::ToSql; use serde::{Deserialize, Serialize}; use strum::{AsRefStr, Display, EnumString}; +use crate::create_xxhash_type; + +create_xxhash_type!(ChangesHash); + #[remain::sorted] #[derive( AsRefStr, Deserialize, Serialize, Debug, Display, EnumString, PartialEq, Eq, Copy, Clone, ToSql, diff --git a/lib/si-events-rs/src/lib.rs b/lib/si-events-rs/src/lib.rs index 421f4861e2..f39bbc6981 100644 --- a/lib/si-events-rs/src/lib.rs +++ b/lib/si-events-rs/src/lib.rs @@ -31,6 +31,7 @@ pub use crate::{ actor::Actor, actor::UserPk, cas::CasValue, + change_set_approval::ChangesHash, change_set_approval::{ChangeSetApprovalKind, ChangeSetApprovalStatus}, change_set_status::ChangeSetStatus, content_hash::ContentHash,