From 63d02498d557b81b2abf410903e6038b8608fa96 Mon Sep 17 00:00:00 2001 From: Zachary Hamm Date: Fri, 23 Aug 2024 12:21:14 -0500 Subject: [PATCH] feat(dal): graph goes vroom Profiling of slow rebases with lots of updates revealed that most of the time was being spent in replace_references, both calculating the merkle tree hash and cloning node weights in copy_node_by_index. This "copy on write" logic no longer makes sense to me with the operational transforms. And in any case, we are still copy on write (from the read only graph to the working copy). This PR removes the "copy on write" logic which no longer makes sense. Instead we mutate the graph in place, and keep a list of touched nodes. Using that list of touched nodes, we recalculate the merkle tree hash for the graph once, in `cleanup_and_merkle_tree_hash`. The result is a massive speed up in graph operations. Even huge rebases come in under a second, and usually under 300ms. Small rebase operations stay very fast, typically under 50ms. 
--- clippy.toml | 4 + lib/dal/examples/rebase/main.rs | 25 +- lib/dal/src/action.rs | 7 +- lib/dal/src/action/prototype.rs | 4 +- lib/dal/src/attribute/prototype.rs | 2 +- lib/dal/src/attribute/prototype/argument.rs | 8 +- .../prototype/argument/static_value.rs | 4 +- lib/dal/src/attribute/value.rs | 22 +- lib/dal/src/component.rs | 9 +- lib/dal/src/func.rs | 13 +- lib/dal/src/func/argument.rs | 14 +- lib/dal/src/module.rs | 2 +- lib/dal/src/prop.rs | 2 +- lib/dal/src/schema.rs | 2 +- lib/dal/src/schema/variant.rs | 2 +- lib/dal/src/secret.rs | 10 +- lib/dal/src/socket/input.rs | 2 +- lib/dal/src/socket/output.rs | 2 +- lib/dal/src/validation.rs | 5 +- lib/dal/src/workspace_snapshot.rs | 69 ++-- lib/dal/src/workspace_snapshot/graph.rs | 387 ++++++++---------- lib/dal/src/workspace_snapshot/graph/tests.rs | 121 +++--- .../graph/tests/detect_updates.rs | 82 ++-- .../graph/tests/exclusive_outgoing_edges.rs | 8 +- .../workspace_snapshot/graph/tests/rebase.rs | 19 +- 25 files changed, 419 insertions(+), 406 deletions(-) diff --git a/clippy.toml b/clippy.toml index bdb2e97c71..9f450d28ce 100644 --- a/clippy.toml +++ b/clippy.toml @@ -3,4 +3,8 @@ disallowed-methods = [ { path = "std::env::vars", reason = "should not directly access environment variables within library crates, favor configuration injection, passing parameters, etc." }, { path = "std::env::var_os", reason = "should not directly access environment variables within library crates, favor configuration injection, passing parameters, etc." }, { path = "std::env::vars_os", reason = "should not directly access environment variables within library crates, favor configuration injection, passing parameters, etc." 
}, + { path = "dal::workspace_snapshot::WorkspaceSnapshot::write_working_copy_to_disk", reason = "The snapshot should only be written to disk when debugging"}, + { path = "dal::workspace_snapshot::WorkspaceSnapshot::write_readonly_graph_to_disk", reason = "The snapshot should only be written to disk when debugging"}, + { path = "dal::workspace_snapshot::WorkspaceSnapshot::tiny_dot_to_file", reason = "The snapshot should only be written to disk when debugging"}, + { path = "dal::workspace_snapshot::graph::RebaseBatch::write_to_disk", reason = "Rebase batches should only be written to disk when debugging"}, ] diff --git a/lib/dal/examples/rebase/main.rs b/lib/dal/examples/rebase/main.rs index b6c19a9966..28243f17b8 100644 --- a/lib/dal/examples/rebase/main.rs +++ b/lib/dal/examples/rebase/main.rs @@ -1,14 +1,18 @@ use std::{env, fs::File, io::prelude::*}; +use serde::de::DeserializeOwned; use si_layer_cache::db::serialize; -use dal::WorkspaceSnapshotGraphV2; +use dal::{ + workspace_snapshot::graph::{correct_transforms::correct_transforms, RebaseBatch}, + WorkspaceSnapshotGraphV2, +}; type Result = std::result::Result>; -const USAGE: &str = "usage: cargo run --example rebase "; +const USAGE: &str = "usage: cargo run --example rebase "; -fn load_snapshot_graph(path: &str) -> Result { +fn load_snapshot_graph(path: &str) -> Result { let mut file = File::open(path)?; let mut bytes = vec![]; file.read_to_end(&mut bytes)?; @@ -19,16 +23,17 @@ fn load_snapshot_graph(path: &str) -> Result { fn main() -> Result<()> { let args: Vec = env::args().take(3).map(Into::into).collect(); let to_rebase_path = args.get(1).expect(USAGE); - let onto_path = args.get(2).expect(USAGE); + let rebase_batch_path = args.get(2).expect(USAGE); - let to_rebase_graph = load_snapshot_graph(to_rebase_path)?; - let onto_graph = load_snapshot_graph(onto_path)?; + let mut to_rebase_graph: WorkspaceSnapshotGraphV2 = load_snapshot_graph(to_rebase_path)?; + let rebase_batch: RebaseBatch = 
load_snapshot_graph(rebase_batch_path)?; - let updates = to_rebase_graph.detect_updates(&onto_graph); + let corrected_transforms = + correct_transforms(&to_rebase_graph, rebase_batch.updates().to_vec(), false)?; - for update in &updates { - dbg!(update); - } + to_rebase_graph.perform_updates(&corrected_transforms)?; + + dbg!(to_rebase_graph.node_count()); Ok(()) } diff --git a/lib/dal/src/action.rs b/lib/dal/src/action.rs index 193fb3a6b7..d58399344d 100644 --- a/lib/dal/src/action.rs +++ b/lib/dal/src/action.rs @@ -315,9 +315,8 @@ impl Action { let mut new_node_weight = node_weight.clone(); new_node_weight.set_state(state); ctx.workspace_snapshot()? - .add_node(NodeWeight::Action(new_node_weight)) + .add_or_replace_node(NodeWeight::Action(new_node_weight)) .await?; - ctx.workspace_snapshot()?.replace_references(idx).await?; Ok(()) } @@ -342,7 +341,9 @@ impl Action { let originating_change_set_id = ctx.change_set_id(); let node_weight = NodeWeight::new_action(originating_change_set_id, new_id.into(), lineage_id); - ctx.workspace_snapshot()?.add_node(node_weight).await?; + ctx.workspace_snapshot()? + .add_or_replace_node(node_weight) + .await?; let action_category_id = ctx .workspace_snapshot()? diff --git a/lib/dal/src/action/prototype.rs b/lib/dal/src/action/prototype.rs index fe6912a62c..9d91ebcbcd 100644 --- a/lib/dal/src/action/prototype.rs +++ b/lib/dal/src/action/prototype.rs @@ -164,7 +164,9 @@ impl ActionPrototype { let lineage_id = ctx.workspace_snapshot()?.generate_ulid().await?; let node_weight = NodeWeight::new_action_prototype(new_id.into(), lineage_id, kind, name, description); - ctx.workspace_snapshot()?.add_node(node_weight).await?; + ctx.workspace_snapshot()? 
+ .add_or_replace_node(node_weight) + .await?; Self::add_edge_to_func(ctx, new_id, func_id, EdgeWeightKind::new_use()).await?; diff --git a/lib/dal/src/attribute/prototype.rs b/lib/dal/src/attribute/prototype.rs index 025307c08f..3a0b550f0b 100644 --- a/lib/dal/src/attribute/prototype.rs +++ b/lib/dal/src/attribute/prototype.rs @@ -159,7 +159,7 @@ impl AttributePrototype { let lineage_id = workspace_snapshot.generate_ulid().await?; let node_weight = NodeWeight::new_content(id, lineage_id, ContentAddress::AttributePrototype(hash)); - let _node_index = workspace_snapshot.add_node(node_weight).await?; + let _node_index = workspace_snapshot.add_or_replace_node(node_weight).await?; let prototype = AttributePrototype::assemble(id.into(), &content); diff --git a/lib/dal/src/attribute/prototype/argument.rs b/lib/dal/src/attribute/prototype/argument.rs index 7d0df84d96..8e65dee761 100644 --- a/lib/dal/src/attribute/prototype/argument.rs +++ b/lib/dal/src/attribute/prototype/argument.rs @@ -213,7 +213,9 @@ impl AttributePrototypeArgument { let workspace_snapshot = ctx.workspace_snapshot()?; - workspace_snapshot.add_node(node_weight.clone()).await?; + workspace_snapshot + .add_or_replace_node(node_weight.clone()) + .await?; AttributePrototype::add_edge_to_argument( ctx, @@ -268,7 +270,9 @@ impl AttributePrototypeArgument { let prototype_arg: Self = { let workspace_snapshot = ctx.workspace_snapshot()?; - workspace_snapshot.add_node(node_weight.clone()).await?; + workspace_snapshot + .add_or_replace_node(node_weight.clone()) + .await?; AttributePrototype::add_edge_to_argument( ctx, diff --git a/lib/dal/src/attribute/prototype/argument/static_value.rs b/lib/dal/src/attribute/prototype/argument/static_value.rs index 3fe56f1970..fe3ad62575 100644 --- a/lib/dal/src/attribute/prototype/argument/static_value.rs +++ b/lib/dal/src/attribute/prototype/argument/static_value.rs @@ -61,7 +61,9 @@ impl StaticArgumentValue { let node_weight = NodeWeight::new_content(id, lineage_id, 
ContentAddress::StaticArgumentValue(hash)); - ctx.workspace_snapshot()?.add_node(node_weight).await?; + ctx.workspace_snapshot()? + .add_or_replace_node(node_weight) + .await?; Ok(StaticArgumentValue::assemble(id.into(), content)) } diff --git a/lib/dal/src/attribute/value.rs b/lib/dal/src/attribute/value.rs index e7100b65cd..9594270839 100644 --- a/lib/dal/src/attribute/value.rs +++ b/lib/dal/src/attribute/value.rs @@ -350,7 +350,7 @@ impl AttributeValue { .await?; } else { ctx.workspace_snapshot()? - .add_node(node_weight.clone()) + .add_or_replace_node(node_weight.clone()) .await?; }; @@ -1943,19 +1943,10 @@ impl AttributeValue { func_run_value: FuncRunValue, ) -> AttributeValueResult<()> { let workspace_snapshot = ctx.workspace_snapshot()?; - let (av_idx, av_node_weight) = { - let av_idx = workspace_snapshot - .get_node_index_by_id(attribute_value_id) - .await?; - - ( - av_idx, - workspace_snapshot - .get_node_weight(av_idx) - .await? - .get_attribute_value_node_weight()?, - ) - }; + let av_node_weight = workspace_snapshot + .get_node_weight_by_id(attribute_value_id) + .await? + .get_attribute_value_node_weight()?; let content_value: Option = func_run_value.value().cloned().map(Into::into); @@ -2012,9 +2003,8 @@ impl AttributeValue { .set_unprocessed_value(unprocessed_value_address.map(ContentAddress::JsonValue)); workspace_snapshot - .add_node(NodeWeight::AttributeValue(new_av_node_weight)) + .add_or_replace_node(NodeWeight::AttributeValue(new_av_node_weight)) .await?; - workspace_snapshot.replace_references(av_idx).await?; if ValidationOutput::get_format_for_attribute_value_id(ctx, attribute_value_id) .await? 
diff --git a/lib/dal/src/component.rs b/lib/dal/src/component.rs index e262ebfc29..c5b8f861a7 100644 --- a/lib/dal/src/component.rs +++ b/lib/dal/src/component.rs @@ -442,7 +442,7 @@ impl Component { let node_weight = NodeWeight::new_component(id, lineage_id, content_address); // Attach component to category and add use edge to schema variant - workspace_snapshot.add_node(node_weight).await?; + workspace_snapshot.add_or_replace_node(node_weight).await?; // Root --> Component Category --> Component (this) let component_category_id = workspace_snapshot @@ -2100,7 +2100,7 @@ impl Component { // We only need to import the AttributePrototypeArgument node, as all of the other relevant // nodes should already exist. ctx.workspace_snapshot()? - .add_node(base_attribute_prototype_argument_node_weight.clone()) + .add_or_replace_node(base_attribute_prototype_argument_node_weight.clone()) .await?; ctx.workspace_snapshot()? .add_edge( @@ -2296,10 +2296,7 @@ impl Component { let mut new_component_node_weight = component_node_weight.clone(); new_component_node_weight.set_to_delete(component.to_delete); ctx.workspace_snapshot()? - .add_node(NodeWeight::Component(new_component_node_weight)) - .await?; - ctx.workspace_snapshot()? - .replace_references(component_idx) + .add_or_replace_node(NodeWeight::Component(new_component_node_weight)) .await?; } diff --git a/lib/dal/src/func.rs b/lib/dal/src/func.rs index c6b4a56225..690ea021da 100644 --- a/lib/dal/src/func.rs +++ b/lib/dal/src/func.rs @@ -268,7 +268,9 @@ impl Func { NodeWeight::new_func(id, lineage_id, name.clone().into(), func_kind, hash); let workspace_snapshot = ctx.workspace_snapshot()?; - workspace_snapshot.add_node(node_weight.clone()).await?; + workspace_snapshot + .add_or_replace_node(node_weight.clone()) + .await?; let func_category_id = workspace_snapshot .get_category_node_or_err(None, CategoryNodeKind::Func) @@ -462,16 +464,9 @@ impl Func { // have changed, this ends up updating the node for the function twice. 
This could be // optimized to do it only once. if func.name.as_str() != node_weight.name() { - let original_node_index = workspace_snapshot.get_node_index_by_id(func.id).await?; - node_weight.set_name(func.name.as_str()); - - workspace_snapshot - .add_node(NodeWeight::Func(node_weight.clone())) - .await?; - workspace_snapshot - .replace_references(original_node_index) + .add_or_replace_node(NodeWeight::Func(node_weight.clone())) .await?; } let updated = FuncContent::from(func.clone()); diff --git a/lib/dal/src/func/argument.rs b/lib/dal/src/func/argument.rs index 7fe195d955..b7f27f1c5c 100644 --- a/lib/dal/src/func/argument.rs +++ b/lib/dal/src/func/argument.rs @@ -247,7 +247,9 @@ impl FuncArgument { let lineage_id = workspace_snapshot.generate_ulid().await?; let node_weight = NodeWeight::new_func_argument(id, lineage_id, name, hash); - workspace_snapshot.add_node(node_weight.clone()).await?; + workspace_snapshot + .add_or_replace_node(node_weight.clone()) + .await?; Func::add_edge_to_argument(ctx, func_id, id.into(), EdgeWeightKind::new_use()).await?; let func_argument_node_weight = node_weight.get_func_argument_node_weight()?; @@ -441,18 +443,10 @@ impl FuncArgument { // have changed, this ends up updating the node for the function twice. This could be // optimized to do it only once. 
if func_argument.name.as_str() != node_weight.name() { - let original_node_index = workspace_snapshot - .get_node_index_by_id(func_argument.id) - .await?; - node_weight.set_name(func_argument.name.as_str()); workspace_snapshot - .add_node(NodeWeight::FuncArgument(node_weight.clone())) - .await?; - - workspace_snapshot - .replace_references(original_node_index) + .add_or_replace_node(NodeWeight::FuncArgument(node_weight.clone())) .await?; } let updated = FuncArgumentContentV1::from(func_argument.clone()); diff --git a/lib/dal/src/module.rs b/lib/dal/src/module.rs index cda09c8f25..54852df975 100644 --- a/lib/dal/src/module.rs +++ b/lib/dal/src/module.rs @@ -166,7 +166,7 @@ impl Module { let lineage_id = workspace_snapshot.generate_ulid().await?; let node_weight = NodeWeight::new_content(id, lineage_id, ContentAddress::Module(hash)); - workspace_snapshot.add_node(node_weight).await?; + workspace_snapshot.add_or_replace_node(node_weight).await?; let schema_module_index_id = workspace_snapshot .get_category_node_or_err(None, CategoryNodeKind::Module) diff --git a/lib/dal/src/prop.rs b/lib/dal/src/prop.rs index 279456645b..f9dcf3d8d2 100644 --- a/lib/dal/src/prop.rs +++ b/lib/dal/src/prop.rs @@ -551,7 +551,7 @@ impl Prop { if ordered { workspace_snapshot.add_ordered_node(node_weight).await?; } else { - workspace_snapshot.add_node(node_weight).await?; + workspace_snapshot.add_or_replace_node(node_weight).await?; } Ok(Self::assemble(prop_node_weight, content)) diff --git a/lib/dal/src/schema.rs b/lib/dal/src/schema.rs index 32ad9a5a63..a50a6f6312 100644 --- a/lib/dal/src/schema.rs +++ b/lib/dal/src/schema.rs @@ -157,7 +157,7 @@ impl Schema { let node_weight = NodeWeight::new_content(id.into(), lineage_id, ContentAddress::Schema(hash)); - workspace_snapshot.add_node(node_weight).await?; + workspace_snapshot.add_or_replace_node(node_weight).await?; let schema_category_index_id = workspace_snapshot .get_category_node_or_err(None, CategoryNodeKind::Schema) diff --git 
a/lib/dal/src/schema/variant.rs b/lib/dal/src/schema/variant.rs index e174615732..b25fdc53d9 100644 --- a/lib/dal/src/schema/variant.rs +++ b/lib/dal/src/schema/variant.rs @@ -493,7 +493,7 @@ impl SchemaVariant { let lineage_id = workspace_snapshot.generate_ulid().await?; let node_weight = NodeWeight::new_content(id, lineage_id, ContentAddress::SchemaVariant(hash)); - workspace_snapshot.add_node(node_weight).await?; + workspace_snapshot.add_or_replace_node(node_weight).await?; // Schema --Use--> SchemaVariant (this) Schema::add_edge_to_variant(ctx, schema_id, id.into(), EdgeWeightKind::new_use()).await?; diff --git a/lib/dal/src/secret.rs b/lib/dal/src/secret.rs index e922127713..d593321cb3 100644 --- a/lib/dal/src/secret.rs +++ b/lib/dal/src/secret.rs @@ -249,7 +249,7 @@ impl Secret { let secret_node_weight = node_weight.get_secret_node_weight()?; let workspace_snapshot = ctx.workspace_snapshot()?; - workspace_snapshot.add_node(node_weight).await?; + workspace_snapshot.add_or_replace_node(node_weight).await?; // Root --> Secret Category --> Secret (this) let secret_category_id = workspace_snapshot @@ -641,16 +641,10 @@ impl Secret { // we always update the actor and timestamp data if anything has changed. This could be // optimized to do it only once. 
if secret.encrypted_secret_key() != secret_node_weight.encrypted_secret_key() { - let original_node_index = workspace_snapshot.get_node_index_by_id(secret.id).await?; - secret_node_weight.set_encrypted_secret_key(secret.encrypted_secret_key); workspace_snapshot - .add_node(NodeWeight::Secret(secret_node_weight.clone())) - .await?; - - workspace_snapshot - .replace_references(original_node_index) + .add_or_replace_node(NodeWeight::Secret(secret_node_weight.clone())) .await?; } let updated = SecretContentV1::from(secret.clone()); diff --git a/lib/dal/src/socket/input.rs b/lib/dal/src/socket/input.rs index 335ed855e8..d38771cb28 100644 --- a/lib/dal/src/socket/input.rs +++ b/lib/dal/src/socket/input.rs @@ -273,7 +273,7 @@ impl InputSocket { let node_weight = NodeWeight::new_content(id, lineage_id, ContentAddress::InputSocket(hash)); - workspace_snapshot.add_node(node_weight).await?; + workspace_snapshot.add_or_replace_node(node_weight).await?; SchemaVariant::add_edge_to_input_socket( ctx, schema_variant_id, diff --git a/lib/dal/src/socket/output.rs b/lib/dal/src/socket/output.rs index bb60eec8b4..c388ee9317 100644 --- a/lib/dal/src/socket/output.rs +++ b/lib/dal/src/socket/output.rs @@ -199,7 +199,7 @@ impl OutputSocket { let node_weight = NodeWeight::new_content(id, lineage_id, ContentAddress::OutputSocket(hash)); - workspace_snapshot.add_node(node_weight).await?; + workspace_snapshot.add_or_replace_node(node_weight).await?; SchemaVariant::add_edge_to_output_socket( ctx, diff --git a/lib/dal/src/validation.rs b/lib/dal/src/validation.rs index d03f20f62a..80a547b7b6 100644 --- a/lib/dal/src/validation.rs +++ b/lib/dal/src/validation.rs @@ -161,9 +161,8 @@ impl ValidationOutputNode { new_node_weight.new_content_hash(hash)?; workspace_snapshot - .add_node(NodeWeight::Content(new_node_weight)) + .add_or_replace_node(NodeWeight::Content(new_node_weight)) .await?; - workspace_snapshot.replace_references(idx).await?; id } else { @@ -171,7 +170,7 @@ impl ValidationOutputNode 
{ let lineage_id = workspace_snapshot.generate_ulid().await?; let node_weight = NodeWeight::new_content(id, lineage_id, ContentAddress::ValidationOutput(hash)); - workspace_snapshot.add_node(node_weight).await?; + workspace_snapshot.add_or_replace_node(node_weight).await?; workspace_snapshot .add_edge( diff --git a/lib/dal/src/workspace_snapshot.rs b/lib/dal/src/workspace_snapshot.rs index b076edc6c7..6c7e30e268 100644 --- a/lib/dal/src/workspace_snapshot.rs +++ b/lib/dal/src/workspace_snapshot.rs @@ -355,18 +355,22 @@ impl WorkspaceSnapshot { let self_clone = self.clone(); let updates = slow_rt::spawn(async move { let mut working_copy = self_clone.working_copy_mut().await; - working_copy.cleanup(); + working_copy.cleanup_and_merkle_tree_hash()?; - self_clone.read_only_graph.detect_updates(&working_copy) + Ok::, WorkspaceSnapshotGraphError>( + self_clone.read_only_graph.detect_updates(&working_copy), + ) })? - .await?; + .await??; Ok((!updates.is_empty()).then_some(RebaseBatch::new(updates))) } /// Calculates the set of updates made to `updated_snapshot` against - /// `base_snapshot`. For these updates to be correct, `updated_snapshot` must - /// have already seen all the changes made to `base_snapshot` + /// `base_snapshot`. For these updates to be correct, `updated_snapshot` + /// must have already seen all the changes made to `base_snapshot`, and both + /// snapshots should have had their merkle tree hashes calculated with + /// `Self::cleanup_and_merkle_tree_hash`. 
pub async fn calculate_rebase_batch( base_snapshot: Arc, updated_snapshot: Arc, @@ -430,7 +434,7 @@ impl WorkspaceSnapshot { // listening for requests/processing a nats queue let new_address = slow_rt::spawn(async move { let mut working_copy = self_clone.working_copy_mut().await; - working_copy.cleanup(); + working_copy.cleanup_and_merkle_tree_hash()?; let (new_address, _) = layer_db .workspace_snapshot() @@ -527,8 +531,13 @@ impl WorkspaceSnapshot { self.working_copy().await.is_acyclic_directed() } - pub async fn add_node(&self, node: NodeWeight) -> WorkspaceSnapshotResult { - let new_node_index = self.working_copy_mut().await.add_node(node)?; + /// Adds this node to the graph, or replaces it if a node with the same id + /// already exists. + pub async fn add_or_replace_node( + &self, + node: NodeWeight, + ) -> WorkspaceSnapshotResult { + let new_node_index = self.working_copy_mut().await.add_or_replace_node(node)?; Ok(new_node_index) } @@ -580,8 +589,7 @@ impl WorkspaceSnapshot { } /// Add an edge to the graph, bypassing any cycle checks and using node - /// indices directly. Use with care, since node indices are only reliably - /// if the graph has not yet been modified. + /// indices directly. pub async fn add_edge_unchecked( &self, from_node_index: NodeIndex, @@ -610,6 +618,7 @@ impl WorkspaceSnapshot { edge_weight, to_node_index, )?; + Ok(edge_index) } @@ -635,8 +644,7 @@ impl WorkspaceSnapshot { .await?) } - /// Gives the exact node index endpoints of an edge. Use with care, since - /// node indexes can't be relied on after modifications to the graph. + /// Gives the exact node index endpoints of an edge. pub async fn edge_endpoints( &self, edge_index: EdgeIndex, @@ -661,16 +669,6 @@ impl WorkspaceSnapshot { .import_component_subgraph(&other.read_only_graph, component_node_index)?) 
} - pub async fn replace_references( - &self, - original_node_index: NodeIndex, - ) -> WorkspaceSnapshotResult<()> { - Ok(self - .working_copy_mut() - .await - .replace_references(original_node_index)?) - } - pub async fn get_node_weight_by_id( &self, id: impl Into, @@ -711,11 +709,27 @@ impl WorkspaceSnapshot { .find_equivalent_node(id, lineage_id)?) } + /// Remove any nodes without incoming edges from the graph, and update the + /// index tables. If you are about to persist the graph, or calculate + /// updates based on this graph and another one, then you want to call + /// `Self::cleanup_and_merkle_tree_hash` instead. pub async fn cleanup(&self) -> WorkspaceSnapshotResult<()> { self.working_copy_mut().await.cleanup(); Ok(()) } + /// Remove any orphaned nodes from the graph, update indexes then + /// recalculate the merkle tree hash based on the nodes touched. *ALWAYS* + /// call this before persisting a snapshot, or calculating updates (it is + /// called already in `Self::write` and `Self::calculate_rebase_batch`) + pub async fn cleanup_and_merkle_tree_hash(&self) -> WorkspaceSnapshotResult<()> { + let mut working_copy = self.working_copy_mut().await; + + working_copy.cleanup_and_merkle_tree_hash()?; + + Ok(()) + } + #[instrument(name = "workspace_snapshot.nodes", level = "debug", skip_all, fields())] pub async fn nodes(&self) -> WorkspaceSnapshotResult> { Ok(self @@ -780,13 +794,6 @@ impl WorkspaceSnapshot { self.working_copy().await.get_node_index_by_id_opt(id) } - pub async fn get_latest_node_index( - &self, - node_index: NodeIndex, - ) -> WorkspaceSnapshotResult { - Ok(self.working_copy().await.get_latest_node_idx(node_index)?) 
- } - #[instrument(name = "workspace_snapshot.find", level = "debug", skip_all, fields())] pub async fn find( ctx: &DalContext, @@ -1220,8 +1227,6 @@ impl WorkspaceSnapshot { .update_node_id(idx, new_id, new_lineage_id) .await?; - self.replace_references(idx).await?; - Ok(()) } @@ -1412,7 +1417,7 @@ impl WorkspaceSnapshot { let new_dependent_value_node = NodeWeight::new_dependent_value_root(id, lineage_id, value_id); let new_dv_node_id = new_dependent_value_node.id(); - self.add_node(new_dependent_value_node).await?; + self.add_or_replace_node(new_dependent_value_node).await?; self.add_edge( dv_category_id, diff --git a/lib/dal/src/workspace_snapshot/graph.rs b/lib/dal/src/workspace_snapshot/graph.rs index df7a91c3d5..570938feae 100644 --- a/lib/dal/src/workspace_snapshot/graph.rs +++ b/lib/dal/src/workspace_snapshot/graph.rs @@ -126,6 +126,8 @@ pub struct WorkspaceSnapshotGraphV2 { #[serde(skip)] ulid_generator: Arc>, + #[serde(skip)] + touched_node_indices: HashSet, } #[derive(Debug, Clone, Default, Serialize, Deserialize)] @@ -141,6 +143,21 @@ impl RebaseBatch { pub fn updates(&self) -> &[Update] { &self.updates } + + /// Write the rebase batch to disk. This *MAY PANIC*. Use only for + /// debugging. 
+ #[allow(clippy::disallowed_methods)] + pub fn write_to_disk(&self, file_suffix: &str) { + let serialized = serialize::to_vec(self).expect("unable to serialize"); + let filename = format!("{}-{}", Ulid::new(), file_suffix); + + let home_env = std::env::var("HOME").expect("No HOME environment variable set"); + let home = std::path::Path::new(&home_env); + let mut file = File::create(home.join(&filename)).expect("could not create file"); + file.write_all(&serialized).expect("could not write file"); + + println!("Wrote rebase batch to {}", home.join(&filename).display()); + } } impl std::fmt::Debug for WorkspaceSnapshotGraphV2 { @@ -181,6 +198,15 @@ impl WorkspaceSnapshotGraphV2 { Ok(result) } + /// Add a node to the list of touched nodes, so that it is taken into + account when recalculating the merkle tree hash for this graph. If a + node weight is modified, or if an outgoing edge is added or removed + to/from this node, you must touch the node, or the merkle tree hash will + not be updated correctly. 
+ pub fn touch_node(&mut self, node_index: NodeIndex) { + self.touched_node_indices.insert(node_index); + } + pub fn new_from_parts( graph: StableDiGraph, node_index_by_id: HashMap, @@ -193,6 +219,7 @@ impl WorkspaceSnapshotGraphV2 { node_indices_by_lineage_id, root_index, ulid_generator: Arc::new(Mutex::new(Generator::new())), + touched_node_indices: HashSet::new(), } } @@ -232,17 +259,6 @@ impl WorkspaceSnapshotGraphV2 { Ok(()) } - pub fn get_latest_node_idx_opt( - &self, - node_idx: NodeIndex, - ) -> WorkspaceSnapshotGraphResult> { - if !self.graph.contains_node(node_idx) { - return Ok(None); - } - - Ok(Some(self.get_latest_node_idx(node_idx)?)) - } - #[inline(always)] pub fn get_latest_node_idx( &self, @@ -262,21 +278,15 @@ impl WorkspaceSnapshotGraphV2 { if cycle_check { self.add_temp_edge_cycle_check(from_node_index, edge_weight.clone(), to_node_index)?; } - // Because outgoing edges are part of a node's identity, we create a new "from" node - // as we are effectively writing to that node (we'll need to update the merkle tree - // hash), and everything in the graph should be treated as copy-on-write. - let new_from_node_index = self.copy_node_by_index(from_node_index)?; - // Add the new edge to the new version of the "from" node. - let new_edge_index = - self.graph - .update_edge(new_from_node_index, to_node_index, edge_weight); - self.update_merkle_tree_hash(new_from_node_index)?; + self.touch_node(from_node_index); - // Update the rest of the graph to reflect the new node/edge. - self.replace_references(from_node_index)?; + // Add the new edge to the new version of the "from" node. 
+ let edge_index = self + .graph + .update_edge(from_node_index, to_node_index, edge_weight); - Ok(new_edge_index) + Ok(edge_index) } fn add_temp_edge_cycle_check( @@ -343,7 +353,7 @@ impl WorkspaceSnapshotGraphV2 { self.add_edge_inner(from_node_index, edge_weight, to_node_index, false) } - pub(crate) fn remove_node_id(&mut self, id: impl Into) { + pub fn remove_node_id(&mut self, id: impl Into) { self.node_index_by_id.remove(&id.into()); } @@ -362,18 +372,37 @@ impl WorkspaceSnapshotGraphV2 { }) .or_insert_with(|| HashSet::from([node_idx])); self.update_merkle_tree_hash(node_idx)?; + self.touch_node(node_idx); Ok(()) } - pub fn add_node(&mut self, node: NodeWeight) -> WorkspaceSnapshotGraphResult { + /// Adds this node to the graph, or replaces it if a node with the same id + /// already exists. Then, adds it to the list of touched nodes so that the + /// merkle tree hash for it, and any parents, is recalculated. + pub fn add_or_replace_node( + &mut self, + node: NodeWeight, + ) -> WorkspaceSnapshotGraphResult { let node_id = node.id(); let lineage_id = node.lineage_id(); - let new_node_index = self.graph.add_node(node); + let node_idx = self + .get_node_index_by_id_opt(node_id) + .and_then(|current_index| { + self.graph.node_weight_mut(current_index).map(|weight_mut| { + node.clone_into(weight_mut); + current_index + }) + }); - self.add_node_finalize(node_id, lineage_id, new_node_index)?; + let node_idx = match node_idx { + Some(swapped_node_idx) => swapped_node_idx, + None => self.graph.add_node(node), + }; - Ok(new_node_index) + self.add_node_finalize(node_id, lineage_id, node_idx)?; + + Ok(node_idx) } pub fn add_category_node( @@ -383,7 +412,7 @@ impl WorkspaceSnapshotGraphV2 { kind: CategoryNodeKind, ) -> WorkspaceSnapshotGraphResult { let inner_weight = CategoryNodeWeight::new(id, lineage_id, kind); - let new_node_index = self.add_node(NodeWeight::Category(inner_weight))?; + let new_node_index = self.add_or_replace_node(NodeWeight::Category(inner_weight))?; 
Ok(new_node_index) } @@ -469,25 +498,19 @@ impl WorkspaceSnapshotGraphV2 { }) } - // TODO(nick): fix this clippy error. - #[allow(clippy::type_complexity)] pub fn add_ordered_edge( &mut self, from_node_index: NodeIndex, edge_weight: EdgeWeight, to_node_index: NodeIndex, - ) -> WorkspaceSnapshotGraphResult<(EdgeIndex, Option<(EdgeIndex, NodeIndex, NodeIndex)>)> { - let _start = std::time::Instant::now(); + ) -> WorkspaceSnapshotGraphResult<(EdgeIndex, Option)> { let new_edge_index = self.add_edge(from_node_index, edge_weight, to_node_index)?; - let from_node_index = self.get_latest_node_idx(from_node_index)?; - let to_node_index = self.get_latest_node_idx(to_node_index)?; - // Find the ordering node of the "container" if there is one, and add the thing pointed to // by the `to_node_index` to the ordering. Also point the ordering node at the thing with // an `Ordinal` edge, so that Ordering nodes must be touched *after* the things they order // in a depth first search - let maybe_ordinal_edge_information = if let Some(container_ordering_node_index) = + let maybe_ordinal_edge_index = if let Some(container_ordering_node_index) = self.ordering_node_index_for_container(from_node_index)? { let ordinal_edge_index = self.add_edge( @@ -496,47 +519,36 @@ impl WorkspaceSnapshotGraphV2 { to_node_index, )?; - let container_ordering_node_index = - self.get_latest_node_idx(container_ordering_node_index)?; + let element_id = self + .node_index_to_id(to_node_index) + .ok_or(WorkspaceSnapshotGraphError::NodeWeightNotFound)?; - if let NodeWeight::Ordering(previous_container_ordering_node_weight) = - self.get_node_weight(container_ordering_node_index)? + if let NodeWeight::Ordering(ordering_node_weight) = + self.get_node_weight_mut(container_ordering_node_index)? 
{ - let element_id = self - .node_index_to_id(to_node_index) - .ok_or(WorkspaceSnapshotGraphError::NodeWeightNotFound)?; - - let mut new_container_ordering_node_weight = - previous_container_ordering_node_weight.clone(); - new_container_ordering_node_weight.push_to_order(element_id); - self.add_node(NodeWeight::Ordering(new_container_ordering_node_weight))?; - self.replace_references(container_ordering_node_index)?; + ordering_node_weight.push_to_order(element_id); + self.touch_node(container_ordering_node_index); } - Some(( - ordinal_edge_index, - container_ordering_node_index, - to_node_index, - )) + Some(ordinal_edge_index) } else { None }; - Ok((new_edge_index, maybe_ordinal_edge_information)) + Ok((new_edge_index, maybe_ordinal_edge_index)) } pub fn add_ordered_node( &mut self, node: NodeWeight, ) -> WorkspaceSnapshotGraphResult { - let new_node_index = self.add_node(node)?; + let new_node_index = self.add_or_replace_node(node)?; let ordering_node_id = self.generate_ulid()?; let ordering_node_lineage_id = self.generate_ulid()?; - let ordering_node_index = self.add_node(NodeWeight::Ordering(OrderingNodeWeight::new( - ordering_node_id, - ordering_node_lineage_id, - )))?; + let ordering_node_index = self.add_or_replace_node(NodeWeight::Ordering( + OrderingNodeWeight::new(ordering_node_id, ordering_node_lineage_id), + ))?; let edge_index = self.add_edge( new_node_index, @@ -547,6 +559,19 @@ impl WorkspaceSnapshotGraphV2 { Ok(source) } + /// Remove any orphaned nodes from the graph, then recalculate the merkle + /// tree hash based on the nodes touched. *ALWAYS* call this before + /// persisting a snapshot + pub fn cleanup_and_merkle_tree_hash(&mut self) -> WorkspaceSnapshotGraphResult<()> { + self.cleanup(); + self.recalculate_entire_merkle_tree_hash_based_on_touched_nodes()?; + + Ok(()) + } + + /// Remove any orphaned nodes from the graph. 
If you are about to persist + /// the graph, or calculate updates based on this graph and another one, then + /// you want to call `Self::cleanup_and_merkle_tree_hash` instead. pub fn cleanup(&mut self) { let start = tokio::time::Instant::now(); @@ -630,13 +655,6 @@ impl WorkspaceSnapshotGraphV2 { Ok(maybe_equivalent_node) } - fn copy_node_by_index( - &mut self, - node_index_to_copy: NodeIndex, - ) -> WorkspaceSnapshotGraphResult { - self.add_node(self.get_node_weight(node_index_to_copy)?.clone()) - } - pub fn detect_updates(&self, updated_graph: &Self) -> Vec { Detector::new(self, updated_graph).detect_updates() } @@ -999,6 +1017,10 @@ impl WorkspaceSnapshotGraphV2 { .and_then(|index| self.get_node_weight_opt(index)) } + /// Gets a mutable reference to the node weight at `node_index`. If you + /// modify this node, you must also touch it by calling `Self::touch_node` + /// so that its merkle tree hash and the merkle tree hash of its parents are + /// both updated. fn get_node_weight_mut( &mut self, node_index: NodeIndex, @@ -1103,7 +1125,7 @@ impl WorkspaceSnapshotGraphV2 { } // Import the node. - self.add_node(other_node_weight.clone())?; + self.add_or_replace_node(other_node_weight.clone())?; // Create all edges with this node as the tail. if let Entry::Occupied(edges) = edges_by_tail.entry(other_node_index) { @@ -1263,86 +1285,60 @@ impl WorkspaceSnapshotGraphV2 { Ok(prop_node_indexes.first().copied()) } + /// Removes the node from the graph. Edges to this node will be + /// automatically removed by petgraph. 
Be sure to remove the node id from + /// the mappings with `Self::remove_node_id` pub fn remove_node(&mut self, node_index: NodeIndex) { + let incoming_sources: Vec<_> = self + .graph + .neighbors_directed(node_index, Incoming) + .collect(); + + // We have to be sure that we recalculate the merkle tree hash for every + // node that had an outgoing edge to this node + for incoming in incoming_sources { + self.touch_node(incoming); + } + self.graph.remove_node(node_index); } - /// [`StableGraph`] guarantees the stability of [`NodeIndex`] across removals, however there - /// are **NO** guarantees around the stability of [`EdgeIndex`] across removals. If - /// [`Self::cleanup()`] has been called, then any [`EdgeIndex`] found before - /// [`Self::cleanup()`] has run should be considered invalid. + /// Removes an edge of the specified kind between `source_node_index` and + /// `target_node_index`. + /// + /// If the source node has an associated ordering node, the function also + /// removes the edge from the ordering node to the target node, updating the + /// ordering node's order pub fn remove_edge( &mut self, source_node_index: NodeIndex, target_node_index: NodeIndex, edge_kind: EdgeWeightKindDiscriminants, ) -> WorkspaceSnapshotGraphResult<()> { - self.remove_edge_inner(source_node_index, target_node_index, edge_kind) - } - - /// Removes an edge from `source_node_index` to `target_node_index`, and - /// also handles removing an edge from the Ordering node if one exists for - /// the node at `source_node_index`. 
- fn remove_edge_inner( - &mut self, - source_node_index: NodeIndex, - target_node_index: NodeIndex, - edge_kind: EdgeWeightKindDiscriminants, - ) -> WorkspaceSnapshotGraphResult<()> { - let source_node_index = self.get_latest_node_idx(source_node_index)?; - let target_node_index = self.get_latest_node_idx(target_node_index)?; - - self.copy_node_by_index(source_node_index)?; - self.replace_references(source_node_index)?; - // replace references may copy the node again to a new index - let source_node_index = self.get_latest_node_idx(source_node_index)?; - self.remove_edge_of_kind(source_node_index, target_node_index, edge_kind); + self.touch_node(source_node_index); - if let Some(previous_container_ordering_node_index) = + if let Some(container_ordering_node_idx) = self.ordering_node_index_for_container(source_node_index)? { let element_id = self .node_index_to_id(target_node_index) .ok_or(WorkspaceSnapshotGraphError::NodeWeightNotFound)?; - if let NodeWeight::Ordering(previous_container_ordering_node_weight) = - self.get_node_weight(previous_container_ordering_node_index)? + if let NodeWeight::Ordering(container_ordering_node_weight) = + self.get_node_weight_mut(container_ordering_node_idx)? { - let mut new_container_ordering_node_weight = - previous_container_ordering_node_weight.clone(); - - // We only want to update the ordering of the container if we removed an edge to - // one of the ordered relationships. 
- if new_container_ordering_node_weight.remove_from_order(element_id) { + if container_ordering_node_weight.remove_from_order(element_id) { self.remove_edge_of_kind( - previous_container_ordering_node_index, + container_ordering_node_idx, target_node_index, EdgeWeightKindDiscriminants::Ordinal, ); - - self.add_node(NodeWeight::Ordering(new_container_ordering_node_weight))?; - self.replace_references(previous_container_ordering_node_index)?; + self.touch_node(container_ordering_node_idx); } } } - let source_node_index = self.get_latest_node_idx(source_node_index)?; - let mut work_queue = VecDeque::from([source_node_index]); - - while let Some(node_index) = work_queue.pop_front() { - self.update_merkle_tree_hash( - // If we updated the ordering node, that means we've invalidated the container's - // NodeIndex (new_source_node_index), so we need to find the new NodeIndex to be able - // to update the container's merkle tree hash. - node_index, - )?; - - for edge_ref in self.graph.edges_directed(node_index, Incoming) { - work_queue.push_back(edge_ref.source()); - } - } - Ok(()) } @@ -1377,83 +1373,15 @@ impl WorkspaceSnapshotGraphV2 { Ok((source, destination)) } - /// Replace references should be called when a node has been changed and copied into the graph. - /// It will use the original_node_index to find the most up to date version of the new node, - /// and replace all edges that point to that old node with edges pointing to the new node. - /// Because the graph is treated as an immutable, copy-on-write structure, this means walking - /// up the graph to the root and copying all nodes that have edges that point to the - /// original_node_index, and all nodes that have edges that point to *those* parent nodes, - /// etc, until we've processed the entire parent tree of the original node. 
- pub fn replace_references( - &mut self, - original_node_index: NodeIndex, - ) -> WorkspaceSnapshotGraphResult<()> { - let mut work_q = VecDeque::from([original_node_index]); - let mut seen_list = HashSet::new(); - - while let Some(old_node_index) = work_q.pop_front() { - if seen_list.contains(&old_node_index) { - continue; - } - seen_list.insert(old_node_index); - - for edge_ref in self.edges_directed(old_node_index, Direction::Incoming) { - work_q.push_back(edge_ref.source()) - } - - let latest_node_idx = self.get_latest_node_idx(old_node_index)?; - let new_node_index = if latest_node_idx != old_node_index { - latest_node_idx - } else { - self.copy_node_by_index(old_node_index)? - }; - - // Find all outgoing edges weights and find the edge targets. - let mut edges_to_create = Vec::new(); - for edge_ref in self.graph.edges_directed(old_node_index, Outgoing) { - edges_to_create.push((edge_ref.weight().clone(), edge_ref.target(), edge_ref.id())); - } - - // Make copies of these edges where the source is the new node index and the - // destination is one of the following... - // - If an entry exists in `old_to_new_node_indices` for the destination node index, - // use the value of the entry (the destination was affected by the replacement, - // and needs to use the new node index to reflect this). - // - There is no entry in `old_to_new_node_indices`; use the same destination node - // index as the old edge (the destination was *NOT* affected by the replacement, - // and does not have any new information to reflect). - for (edge_weight, destination_node_index, edge_idx) in edges_to_create { - // Need to directly add the edge, without going through `self.add_edge` to avoid - // infinite recursion, and because we're the place doing all the book keeping - // that we'd be interested in happening from `self.add_edge`. 
- let destination_node_index = self.get_latest_node_idx(destination_node_index)?; - - self.graph.remove_edge(edge_idx); - - self.graph - .update_edge(new_node_index, destination_node_index, edge_weight); - } - - self.update_merkle_tree_hash(new_node_index)?; - } - - // Use the new version of the old root node as our root node. - self.root_index = self.get_latest_node_idx(self.root_index)?; - - Ok(()) - } - pub fn update_content( &mut self, id: Ulid, new_content_hash: ContentHash, ) -> WorkspaceSnapshotGraphResult<()> { - let original_node_index = self.get_node_index_by_id(id)?; - let new_node_index = self.copy_node_by_index(original_node_index)?; - let node_weight = self.get_node_weight_mut(new_node_index)?; + let node_index = self.get_node_index_by_id(id)?; + let node_weight = self.get_node_weight_mut(node_index)?; node_weight.new_content_hash(new_content_hash)?; - - self.replace_references(original_node_index)?; + self.touch_node(node_index); Ok(()) } @@ -1462,14 +1390,13 @@ impl WorkspaceSnapshotGraphV2 { container_id: Ulid, new_order: Vec, ) -> WorkspaceSnapshotGraphResult<()> { - let original_node_index = self + let node_index = self .ordering_node_index_for_container(self.get_node_index_by_id(container_id)?)? .ok_or(WorkspaceSnapshotGraphError::NodeWeightNotFound)?; - let new_node_index = self.copy_node_by_index(original_node_index)?; - let node_weight = self.get_node_weight_mut(new_node_index)?; + let node_weight = self.get_node_weight_mut(node_index)?; node_weight.set_order(new_order)?; + self.touch_node(node_index); - self.replace_references(original_node_index)?; Ok(()) } @@ -1500,8 +1427,9 @@ impl WorkspaceSnapshotGraphV2 { { // Only add the neighbor if it's not one of the ones with an explicit ordering. 
if !explicitly_ordered_children.contains(&neighbor_node) { - let neighbor_id = self.get_node_weight(neighbor_node)?.id(); - unordered_neighbors.push((neighbor_id, neighbor_node)); + if let Some(neighbor_id) = self.node_index_to_id(neighbor_node) { + unordered_neighbors.push((neighbor_id, neighbor_node)); + } } } // We'll sort the neighbors by the ID in the NodeWeight, as that will result in more stable @@ -1590,6 +1518,36 @@ impl WorkspaceSnapshotGraphV2 { Ok(()) } + /// Does a dfs post-order walk of the DAG, recalculating the merkle tree + /// hash for any nodes we have touched while working on the graph. Should be + /// more efficient than recalculating the entire merkle tree hash, since we + /// will only update the hash for the branches of the graph that have been + /// touched and thus need to be recalculated. + pub fn recalculate_entire_merkle_tree_hash_based_on_touched_nodes( + &mut self, + ) -> WorkspaceSnapshotGraphResult<()> { + let mut dfs = petgraph::visit::DfsPostOrder::new(&self.graph, self.root_index); + + let mut discovered_nodes = HashSet::new(); + + while let Some(node_index) = dfs.next(&self.graph) { + if self.touched_node_indices.contains(&node_index) + || discovered_nodes.contains(&node_index) + { + self.update_merkle_tree_hash(node_index)?; + self.graph + .neighbors_directed(node_index, Incoming) + .for_each(|node_idx| { + discovered_nodes.insert(node_idx); + }); + } + } + + self.touched_node_indices.clear(); + + Ok(()) + } + /// Perform [`Updates`](Update) using [`self`](WorkspaceSnapshotGraph) as the "to rebase" graph /// and a provided graph as the "onto" graph. 
pub fn perform_updates(&mut self, updates: &[Update]) -> WorkspaceSnapshotGraphResult<()> { @@ -1600,12 +1558,17 @@ impl WorkspaceSnapshotGraphV2 { destination, edge_weight, } => { - let updated_source = self.get_node_index_by_id_opt(source.id); - let destination = self.get_node_index_by_id_opt(destination.id); + let source_idx = self.get_node_index_by_id_opt(source.id); + let destination_idx = self.get_node_index_by_id_opt(destination.id); - if let (Some(updated_source), Some(destination)) = (updated_source, destination) + if let (Some(source_idx), Some(destination_idx)) = (source_idx, destination_idx) { - self.add_edge(updated_source, edge_weight.clone(), destination)?; + self.add_edge_inner( + source_idx, + edge_weight.clone(), + destination_idx, + false, + )?; } } Update::RemoveEdge { @@ -1613,28 +1576,27 @@ impl WorkspaceSnapshotGraphV2 { destination, edge_kind, } => { - let updated_source = self.get_node_index_by_id_opt(source.id); - let destination = self.get_node_index_by_id_opt(destination.id); + let source_idx = self.get_node_index_by_id_opt(source.id); + let destination_idx = self.get_node_index_by_id_opt(destination.id); - if let (Some(updated_source), Some(destination)) = (updated_source, destination) + if let (Some(source_idx), Some(destination_idx)) = (source_idx, destination_idx) { - self.remove_edge_inner(updated_source, destination, *edge_kind)?; + self.remove_edge(source_idx, destination_idx, *edge_kind)?; } } Update::NewNode { node_weight } => { if self.get_node_index_by_id_opt(node_weight.id()).is_none() { - self.add_node(node_weight.to_owned())?; + self.add_or_replace_node(node_weight.to_owned())?; } } Update::ReplaceNode { node_weight } => { - let updated_to_rebase = self.get_node_index_by_id_opt(node_weight.id()); - if let Some(updated_to_rebase) = updated_to_rebase { - self.add_node(node_weight.to_owned())?; - self.replace_references(updated_to_rebase)?; + if self.get_node_index_by_id_opt(node_weight.id()).is_some() { + 
self.add_or_replace_node(node_weight.to_owned())?; } } } } + Ok(()) } @@ -1656,6 +1618,7 @@ impl WorkspaceSnapshotGraphV2 { .ok_or(WorkspaceSnapshotGraphError::NodeWeightNotFound)?; lambda(node_weight)?; + self.touch_node(node_idx); Ok(()) } diff --git a/lib/dal/src/workspace_snapshot/graph/tests.rs b/lib/dal/src/workspace_snapshot/graph/tests.rs index 9301e0168f..fd1ceab538 100644 --- a/lib/dal/src/workspace_snapshot/graph/tests.rs +++ b/lib/dal/src/workspace_snapshot/graph/tests.rs @@ -36,7 +36,7 @@ fn add_prop_nodes_to_graph<'a, 'b>( .expect("Unable to add prop"); } else { graph - .add_node(prop_node_weight) + .add_or_replace_node(prop_node_weight) .expect("Unable to add prop"); } @@ -181,7 +181,9 @@ mod test { let node_id_map = add_prop_nodes_to_graph(&mut graph, &nodes, false); add_edges(&mut graph, &node_id_map, &edges); - graph.cleanup(); + graph + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); for (source, target) in edges { let source_idx = match source { @@ -236,7 +238,7 @@ mod test { let schema_id = graph.generate_ulid().expect("Unable to generate Ulid"); let schema_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( schema_id, Ulid::new(), ContentAddress::Schema(ContentHash::new( @@ -246,7 +248,7 @@ mod test { .expect("Unable to add schema"); let schema_variant_id = graph.generate_ulid().expect("Unable to generate Ulid"); let schema_variant_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( schema_variant_id, Ulid::new(), ContentAddress::SchemaVariant(ContentHash::new( @@ -256,7 +258,7 @@ mod test { .expect("Unable to add schema variant"); let component_id = graph.generate_ulid().expect("Unable to generate Ulid"); let component_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( component_id, Ulid::new(), ContentAddress::Component(ContentHash::new( @@ -302,7 +304,7 @@ mod test { let func_id = 
graph.generate_ulid().expect("Unable to generate Ulid"); let func_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( func_id, Ulid::new(), ContentAddress::Func(ContentHash::new(FuncId::generate().to_string().as_bytes())), @@ -310,7 +312,7 @@ mod test { .expect("Unable to add func"); let prop_id = graph.generate_ulid().expect("Unable to generate Ulid"); let prop_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( prop_id, Ulid::new(), ContentAddress::Prop(ContentHash::new(PropId::generate().to_string().as_bytes())), @@ -355,7 +357,7 @@ mod test { let schema_id = graph.generate_ulid().expect("Unable to generate Ulid"); let initial_schema_node_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( schema_id, Ulid::new(), ContentAddress::Schema(ContentHash::new( @@ -365,7 +367,7 @@ mod test { .expect("Unable to add schema"); let schema_variant_id = graph.generate_ulid().expect("Unable to generate Ulid"); let initial_schema_variant_node_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( schema_variant_id, Ulid::new(), ContentAddress::SchemaVariant(ContentHash::new( @@ -375,7 +377,7 @@ mod test { .expect("Unable to add schema variant"); let component_id = graph.generate_ulid().expect("Unable to generate Ulid"); let initial_component_node_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( component_id, Ulid::new(), ContentAddress::Component(ContentHash::new( @@ -444,7 +446,7 @@ mod test { let schema_id = graph.generate_ulid().expect("Unable to generate Ulid"); let schema_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( schema_id, Ulid::new(), ContentAddress::Schema(ContentHash::from("Constellation")), @@ -452,7 +454,7 @@ mod test { .expect("Unable to add schema"); let schema_variant_id = 
graph.generate_ulid().expect("Unable to generate Ulid"); let schema_variant_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( schema_variant_id, Ulid::new(), ContentAddress::SchemaVariant(ContentHash::new("Freestar Collective".as_bytes())), @@ -460,7 +462,7 @@ mod test { .expect("Unable to add schema variant"); let component_id = graph.generate_ulid().expect("Unable to generate Ulid"); let component_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( component_id, Ulid::new(), ContentAddress::Component(ContentHash::from("Crimson Fleet")), @@ -503,6 +505,9 @@ mod test { .expect("Unable to add component -> schema variant edge"); // Ensure that the root node merkle tree hash looks as we expect before the update. + graph + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); let pre_update_root_node_merkle_tree_hash: MerkleTreeHash = MerkleTreeHash::from_str("49a6baef5d1c29f43653e0b7c02dfb73") .expect("able to create hash from hex string"); @@ -518,6 +523,9 @@ mod test { graph .update_content(component_id, updated_content_hash) .expect("Unable to update Component content hash"); + graph + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); let post_update_root_node_merkle_tree_hash: MerkleTreeHash = MerkleTreeHash::from_str("75febafba241026c63e27ab5b129cb26") @@ -541,7 +549,9 @@ mod test { .content_hash(), // actual ); - graph.cleanup(); + graph + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); // Ensure that there are not more nodes than the ones that should be in use. 
assert_eq!(4, graph.node_count()); @@ -574,7 +584,7 @@ mod test { let schema_id = graph.generate_ulid().expect("Unable to generate Ulid"); let schema_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( schema_id, Ulid::new(), ContentAddress::Schema(ContentHash::new( @@ -584,7 +594,7 @@ mod test { .expect("Unable to add schema"); let schema_variant_id = graph.generate_ulid().expect("Unable to generate Ulid"); let schema_variant_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( schema_variant_id, Ulid::new(), ContentAddress::SchemaVariant(ContentHash::new( @@ -612,7 +622,7 @@ mod test { let func_id = graph.generate_ulid().expect("Unable to generate Ulid"); let func_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( func_id, Ulid::new(), ContentAddress::Func(ContentHash::new(FuncId::generate().to_string().as_bytes())), @@ -654,12 +664,14 @@ mod test { .expect("Unable to get NodeIndex"), ) .expect("Unable to add prop -> func edge"); - graph.cleanup(); + graph + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); graph.dot(); let ordered_prop_1_id = graph.generate_ulid().expect("Unable to generate Ulid"); let ordered_prop_1_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( ordered_prop_1_id, Ulid::new(), ContentAddress::Prop(ContentHash::new(ordered_prop_1_id.to_string().as_bytes())), @@ -677,7 +689,7 @@ mod test { let ordered_prop_2_id = graph.generate_ulid().expect("Unable to generate Ulid"); let ordered_prop_2_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( ordered_prop_2_id, Ulid::new(), ContentAddress::Prop(ContentHash::new(ordered_prop_2_id.to_string().as_bytes())), @@ -695,7 +707,7 @@ mod test { let ordered_prop_3_id = graph.generate_ulid().expect("Unable to generate Ulid"); let ordered_prop_3_index = graph - 
.add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( ordered_prop_3_id, Ulid::new(), ContentAddress::Prop(ContentHash::new(ordered_prop_3_id.to_string().as_bytes())), @@ -710,7 +722,9 @@ mod test { ordered_prop_3_index, ) .expect("Unable to add prop -> ordered_prop_3 edge"); - graph.cleanup(); + graph + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); graph.dot(); assert_eq!( @@ -752,7 +766,9 @@ mod test { ) .expect("Unable to add root -> prop edge"); - graph.cleanup(); + graph + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); assert_eq!( Vec::::new(), graph @@ -773,7 +789,7 @@ mod test { let schema_id = graph.generate_ulid().expect("Unable to generate Ulid"); let schema_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( schema_id, Ulid::new(), ContentAddress::Schema(ContentHash::new( @@ -783,7 +799,7 @@ mod test { .expect("Unable to add schema"); let schema_variant_id = graph.generate_ulid().expect("Unable to generate Ulid"); let schema_variant_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( schema_variant_id, Ulid::new(), ContentAddress::SchemaVariant(ContentHash::new( @@ -811,7 +827,7 @@ mod test { let func_id = graph.generate_ulid().expect("Unable to generate Ulid"); let func_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( func_id, Ulid::new(), ContentAddress::Func(ContentHash::new(FuncId::generate().to_string().as_bytes())), @@ -853,12 +869,14 @@ mod test { .expect("Unable to get NodeIndex"), ) .expect("Unable to add prop -> func edge"); - graph.cleanup(); + graph + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); graph.dot(); let ordered_prop_1_id = graph.generate_ulid().expect("Unable to generate Ulid"); let ordered_prop_1_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( 
ordered_prop_1_id, Ulid::new(), ContentAddress::Prop(ContentHash::new(ordered_prop_1_id.to_string().as_bytes())), @@ -876,7 +894,7 @@ mod test { let ordered_prop_2_id = graph.generate_ulid().expect("Unable to generate Ulid"); let ordered_prop_2_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( ordered_prop_2_id, Ulid::new(), ContentAddress::Prop(ContentHash::new(ordered_prop_2_id.to_string().as_bytes())), @@ -894,7 +912,7 @@ mod test { let ordered_prop_3_id = graph.generate_ulid().expect("Unable to generate Ulid"); let ordered_prop_3_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( ordered_prop_3_id, Ulid::new(), ContentAddress::Prop(ContentHash::new(ordered_prop_3_id.to_string().as_bytes())), @@ -912,7 +930,7 @@ mod test { let ordered_prop_4_id = graph.generate_ulid().expect("Unable to generate Ulid"); let ordered_prop_4_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( ordered_prop_4_id, Ulid::new(), ContentAddress::Prop(ContentHash::new(ordered_prop_4_id.to_string().as_bytes())), @@ -928,7 +946,9 @@ mod test { ) .expect("Unable to add prop -> ordered_prop_4 edge"); - graph.cleanup(); + graph + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); graph.dot(); assert_eq!( @@ -984,7 +1004,7 @@ mod test { let schema_id = graph.generate_ulid().expect("Unable to generate Ulid"); let schema_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( schema_id, Ulid::new(), ContentAddress::Schema(ContentHash::new( @@ -994,7 +1014,7 @@ mod test { .expect("Unable to add schema"); let schema_variant_id = graph.generate_ulid().expect("Unable to generate Ulid"); let schema_variant_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( schema_variant_id, Ulid::new(), ContentAddress::SchemaVariant(ContentHash::new( @@ -1022,7 +1042,7 @@ 
mod test { let schema_variant_2_id = graph.generate_ulid().expect("Unable to generate Ulid"); let schema_variant_2_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( schema_variant_2_id, Ulid::new(), ContentAddress::SchemaVariant(ContentHash::new( @@ -1090,6 +1110,9 @@ mod test { "confirm edges after deletion" ); + graph + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); let updates = graph.detect_updates(&graph_with_deleted_edge); assert_eq!(1, updates.len()); @@ -1107,7 +1130,7 @@ mod test { let schema_id = graph.generate_ulid().expect("Unable to generate Ulid"); let schema_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( schema_id, Ulid::new(), ContentAddress::Schema(ContentHash::new( @@ -1117,7 +1140,7 @@ mod test { .expect("Unable to add schema"); let schema_variant_id = graph.generate_ulid().expect("Unable to generate Ulid"); let schema_variant_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( Ulid::new(), schema_variant_id, ContentAddress::SchemaVariant(ContentHash::new( @@ -1145,7 +1168,7 @@ mod test { let schema_variant_2_id = graph.generate_ulid().expect("Unable to generate Ulid"); let schema_variant_2_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( schema_variant_2_id, Ulid::new(), ContentAddress::SchemaVariant(ContentHash::new( @@ -1215,7 +1238,7 @@ mod test { let schema_id = graph.generate_ulid().expect("Unable to generate Ulid"); let schema_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( schema_id, Ulid::new(), ContentAddress::Schema(ContentHash::new( @@ -1225,7 +1248,7 @@ mod test { .expect("Unable to add schema"); let schema_variant_id = graph.generate_ulid().expect("Unable to generate Ulid"); let schema_variant_index = graph - .add_node(NodeWeight::new_content( + 
.add_or_replace_node(NodeWeight::new_content( schema_variant_id, Ulid::new(), ContentAddress::SchemaVariant(ContentHash::new( @@ -1253,7 +1276,7 @@ mod test { let func_id = graph.generate_ulid().expect("Unable to generate Ulid"); let func_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( func_id, Ulid::new(), ContentAddress::Func(ContentHash::new(FuncId::generate().to_string().as_bytes())), @@ -1295,12 +1318,14 @@ mod test { .expect("Unable to get NodeIndex"), ) .expect("Unable to add prop -> func edge"); - graph.cleanup(); + graph + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); graph.dot(); let ordered_prop_1_id = graph.generate_ulid().expect("Unable to generate Ulid"); let ordered_prop_1_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( ordered_prop_1_id, Ulid::new(), ContentAddress::Prop(ContentHash::new(ordered_prop_1_id.to_string().as_bytes())), @@ -1318,7 +1343,7 @@ mod test { let ordered_prop_2_id = graph.generate_ulid().expect("Unable to generate Ulid"); let ordered_prop_2_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( ordered_prop_2_id, Ulid::new(), ContentAddress::Prop(ContentHash::new(ordered_prop_2_id.to_string().as_bytes())), @@ -1336,7 +1361,7 @@ mod test { let ordered_prop_3_id = graph.generate_ulid().expect("Unable to generate Ulid"); let ordered_prop_3_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( ordered_prop_3_id, Ulid::new(), ContentAddress::Prop(ContentHash::new(ordered_prop_3_id.to_string().as_bytes())), @@ -1354,7 +1379,7 @@ mod test { let ordered_prop_4_id = graph.generate_ulid().expect("Unable to generate Ulid"); let ordered_prop_4_index = graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( ordered_prop_4_id, Ulid::new(), 
ContentAddress::Prop(ContentHash::new(ordered_prop_4_id.to_string().as_bytes())), @@ -1370,7 +1395,9 @@ mod test { ) .expect("Unable to add prop -> ordered_prop_4 edge"); - graph.cleanup(); + graph + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); graph.dot(); assert_eq!( diff --git a/lib/dal/src/workspace_snapshot/graph/tests/detect_updates.rs b/lib/dal/src/workspace_snapshot/graph/tests/detect_updates.rs index 2cdfc768a8..109e34cfad 100644 --- a/lib/dal/src/workspace_snapshot/graph/tests/detect_updates.rs +++ b/lib/dal/src/workspace_snapshot/graph/tests/detect_updates.rs @@ -25,7 +25,7 @@ mod test { let schema_id = base_graph.generate_ulid().expect("Unable to generate Ulid"); let schema_index = base_graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( schema_id, Ulid::new(), ContentAddress::Schema(ContentHash::from("Schema A")), @@ -33,7 +33,7 @@ mod test { .expect("Unable to add Schema A"); let schema_variant_id = base_graph.generate_ulid().expect("Unable to generate Ulid"); let schema_variant_index = base_graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( schema_variant_id, Ulid::new(), ContentAddress::SchemaVariant(ContentHash::from("Schema Variant A")), @@ -59,11 +59,14 @@ mod test { base_graph.dot(); + base_graph + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); let new_graph = base_graph.clone(); let new_onto_component_id = new_graph.generate_ulid().expect("Unable to generate Ulid"); let new_onto_component_index = base_graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( new_onto_component_id, Ulid::new(), ContentAddress::Component(ContentHash::from("Component B")), @@ -90,6 +93,9 @@ mod test { base_graph.dot(); + base_graph + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); let updates = new_graph.detect_updates(&base_graph); let _new_onto_component_index = base_graph @@ -111,7 +117,7 @@ mod 
test { let component_id = base_graph.generate_ulid().expect("Unable to generate Ulid"); let component_index = base_graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( component_id, Ulid::new(), ContentAddress::Component(ContentHash::from("Component A")), @@ -125,14 +131,16 @@ mod test { ) .expect("Unable to add root -> component edge"); - base_graph.cleanup(); + base_graph + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); base_graph.dot(); let mut new_graph = base_graph.clone(); let new_component_id = new_graph.generate_ulid().expect("Unable to generate Ulid"); let new_component_index = new_graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( new_component_id, Ulid::new(), ContentAddress::Component(ContentHash::from("Component B")), @@ -146,7 +154,9 @@ mod test { ) .expect("Unable to add root -> component edge"); - new_graph.cleanup(); + new_graph + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); new_graph.dot(); let updates = base_graph.detect_updates(&new_graph); @@ -194,7 +204,7 @@ mod test { .generate_ulid() .expect("Unable to generate Ulid"); let ordered_prop_1_index = active_graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( ordered_prop_1_id, Ulid::new(), ContentAddress::Prop(ContentHash::new(ordered_prop_1_id.to_string().as_bytes())), @@ -210,7 +220,9 @@ mod test { ) .expect("Unable to add prop -> ordered_prop_1 edge"); - active_graph.cleanup(); + active_graph + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); // Get new graph let mut new_graph = base_graph.clone(); @@ -224,7 +236,7 @@ mod test { ); let ordered_prop_2_index = new_graph - .add_node(ordered_prop_node_weight.clone()) + .add_or_replace_node(ordered_prop_node_weight.clone()) .expect("Unable to add ordered prop"); new_graph .add_ordered_edge( @@ -248,6 +260,9 @@ mod test { .expect("Node is not an ordered node") ); + new_graph + 
.cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); let updates = base_graph.detect_updates(new_graph); let update_1 = updates.first().expect("update exists").to_owned(); @@ -289,6 +304,9 @@ mod test { new_graph_2.remove_node(ordered_prop_2_index); new_graph_2.remove_node_id(ordered_prop_2_id); + new_graph_2 + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); let updates = new_graph.detect_updates(&new_graph_2); let update_1 = updates.first().expect("update exists").to_owned(); @@ -340,7 +358,9 @@ mod test { prop_id }; - active_graph.cleanup(); + active_graph + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); // Create two prop nodes children of base prop let ordered_prop_1_index = { @@ -348,7 +368,7 @@ mod test { .generate_ulid() .expect("Unable to generate Ulid"); let ordered_prop_index = active_graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( ordered_prop_id, Ulid::new(), ContentAddress::Prop(ContentHash::new(ordered_prop_id.to_string().as_bytes())), @@ -367,14 +387,16 @@ mod test { ordered_prop_index }; - active_graph.cleanup(); + active_graph + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); let attribute_prototype_id = { let node_id = active_graph .generate_ulid() .expect("Unable to generate Ulid"); let node_index = active_graph - .add_node(NodeWeight::new_content( + .add_or_replace_node(NodeWeight::new_content( node_id, Ulid::new(), ContentAddress::AttributePrototype(ContentHash::new( @@ -394,7 +416,9 @@ mod test { node_id }; - active_graph.cleanup(); + active_graph + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); // Get new graph let mut new_graph = base_graph.clone(); @@ -412,7 +436,9 @@ mod test { .expect("Unable to get prop NodeIndex"), ) .expect("Unable to add sv -> prop edge"); - new_graph.cleanup(); + new_graph + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); let base_prop_node_index = new_graph 
.get_node_index_by_id(base_prop_id) .expect("Unable to get base prop NodeIndex"); @@ -464,7 +490,7 @@ mod test { ContentHash::new(node.as_bytes()), ); base_graph - .add_node(prop_node_weight) + .add_or_replace_node(prop_node_weight) .expect("Unable to add prop"); node_id_map.insert(node, node_id); @@ -499,7 +525,9 @@ mod test { } // Clean up the graph before ensuring that it was constructed properly. - base_graph.cleanup(); + base_graph + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); // Ensure the graph construction worked. for (source, target) in edges { @@ -576,11 +604,16 @@ mod test { new_graph.remove_node(c_idx); new_graph.remove_node_id(c_id); - new_graph.cleanup(); + new_graph + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); // base_graph.tiny_dot_to_file(Some("to_rebase")); // new_graph.tiny_dot_to_file(Some("onto")); + base_graph + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); let updates = base_graph.detect_updates(&new_graph); assert_eq!( @@ -612,7 +645,7 @@ mod test { ); to_rebase_graph - .add_node(prototype_node) + .add_or_replace_node(prototype_node) .expect("unable to add node"); to_rebase_graph .add_edge( @@ -625,7 +658,9 @@ mod test { .expect("unable to add edge"); // "write" the graph - to_rebase_graph.cleanup(); + to_rebase_graph + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); // "fork" a working changeset from the current one let mut onto_graph = to_rebase_graph.clone(); @@ -640,8 +675,9 @@ mod test { ) .expect("remove_edge"); - onto_graph.cleanup(); - + onto_graph + .cleanup_and_merkle_tree_hash() + .expect("cleanup and merkle"); let updates = to_rebase_graph.detect_updates(&onto_graph); assert_eq!(1, updates.len()); diff --git a/lib/dal/src/workspace_snapshot/graph/tests/exclusive_outgoing_edges.rs b/lib/dal/src/workspace_snapshot/graph/tests/exclusive_outgoing_edges.rs index 4bd3ef5f94..c8189862f7 100644 --- 
a/lib/dal/src/workspace_snapshot/graph/tests/exclusive_outgoing_edges.rs +++ b/lib/dal/src/workspace_snapshot/graph/tests/exclusive_outgoing_edges.rs @@ -48,7 +48,7 @@ mod test { ContentHash::new(&component_id.inner().to_bytes()), ); - let sv_1_idx = graph.add_node(sv_1.clone())?; + let sv_1_idx = graph.add_or_replace_node(sv_1.clone())?; graph.add_edge( graph.root(), @@ -56,7 +56,7 @@ mod test { sv_1_idx, )?; - let component_idx = graph.add_node(component.clone())?; + let component_idx = graph.add_or_replace_node(component.clone())?; graph.add_edge( graph.root_index, @@ -181,7 +181,7 @@ mod test { let component_2_id = graph.generate_ulid()?; let action = NodeWeight::new_action(Ulid::new().into(), action_id, action_id); - graph.add_node(action.clone())?; + graph.add_or_replace_node(action.clone())?; let prototype_1 = NodeWeight::new_action_prototype( prototype_1_id, @@ -211,7 +211,7 @@ mod test { ContentHash::new(&component_2_id.inner().to_bytes()), ); - let prototype_1_idx = graph.add_node(prototype_1.clone())?; + let prototype_1_idx = graph.add_or_replace_node(prototype_1.clone())?; graph.add_edge( graph.root(), diff --git a/lib/dal/src/workspace_snapshot/graph/tests/rebase.rs b/lib/dal/src/workspace_snapshot/graph/tests/rebase.rs index 10a1517b64..37af54ee92 100644 --- a/lib/dal/src/workspace_snapshot/graph/tests/rebase.rs +++ b/lib/dal/src/workspace_snapshot/graph/tests/rebase.rs @@ -54,7 +54,7 @@ mod test { ); let func_node_index = onto - .add_node(NodeWeight::Func(func_node_weight)) + .add_or_replace_node(NodeWeight::Func(func_node_weight)) .expect("could not add node"); onto.add_edge( func_category_node_index, @@ -71,7 +71,7 @@ mod test { ); let schema_node_index = onto - .add_node(NodeWeight::Content(schema_node_weight)) + .add_or_replace_node(NodeWeight::Content(schema_node_weight)) .expect("could not add node"); onto.add_edge( schema_category_node_index, @@ -88,7 +88,7 @@ mod test { ); let schema_variant_node_index = onto - 
.add_node(NodeWeight::Content(schema_variant_node_weight)) + .add_or_replace_node(NodeWeight::Content(schema_variant_node_weight)) .expect("could not add node"); onto.add_edge( schema_node_index, @@ -108,12 +108,11 @@ mod test { ) .expect("could not add edge"); - // Before cleanup, detect conflicts and updates. - let before_cleanup_updates = to_rebase.detect_updates(&onto); - // Cleanup and check node count. - onto.cleanup(); - to_rebase.cleanup(); + onto.cleanup_and_merkle_tree_hash().expect("merkle it!"); + to_rebase + .cleanup_and_merkle_tree_hash() + .expect("merkle it!"); assert_eq!( 6, // expected onto.node_count() // actual @@ -125,10 +124,6 @@ mod test { 7, // expected updates.len() // actual ); - assert_eq!( - before_cleanup_updates, // expected - updates // actual - ); // Ensure that we do not have duplicate updates. let mut deduped_updates = updates.clone();