From a2c6b45eebf00de2bfa1f104631aef7816486989 Mon Sep 17 00:00:00 2001 From: Zachary Hamm Date: Fri, 15 Nov 2024 15:40:08 -0600 Subject: [PATCH 1/2] feat: allow parallel edges We have been using `update_edge` when adding new edges, which is fast but means we cannot create parallel edges. But parallel edges are very useful, and without them, management edges will have to be made convoluted. Note that you should always be careful to ensure you do not assume there can only be one edge between two nodes in one direction. For example, if using edges_directed always be sure to check the edge weight kind. --- lib/dal/src/attribute/value.rs | 6 +- lib/dal/src/workspace_snapshot.rs | 28 +++++---- .../graph/tests/detect_updates.rs | 2 +- lib/dal/src/workspace_snapshot/graph/v4.rs | 63 +++++++++++-------- 4 files changed, 59 insertions(+), 40 deletions(-) diff --git a/lib/dal/src/attribute/value.rs b/lib/dal/src/attribute/value.rs index 91ceef0aca..741556ea31 100644 --- a/lib/dal/src/attribute/value.rs +++ b/lib/dal/src/attribute/value.rs @@ -2630,7 +2630,11 @@ impl AttributeValue { ) -> AttributeValueResult> { Ok(ctx .workspace_snapshot()? - .find_edge(parent_attribute_value_id, child_attribute_value_id) + .find_edge( + parent_attribute_value_id, + child_attribute_value_id, + EdgeWeightKindDiscriminants::Contain, + ) .await? .and_then(|weight| match weight.kind() { EdgeWeightKind::Contain(key) => key.to_owned(), diff --git a/lib/dal/src/workspace_snapshot.rs b/lib/dal/src/workspace_snapshot.rs index 6919754604..d144de868e 100644 --- a/lib/dal/src/workspace_snapshot.rs +++ b/lib/dal/src/workspace_snapshot.rs @@ -658,13 +658,13 @@ impl WorkspaceSnapshot { from_node_id: impl Into, edge_weight: EdgeWeight, to_node_id: impl Into, - ) -> WorkspaceSnapshotResult { + ) -> WorkspaceSnapshotResult<()> { let from_node_index = self .working_copy() .await .get_node_index_by_id(from_node_id)?; let to_node_index = self.working_copy().await.get_node_index_by_id(to_node_id)?; - Ok(if self.cycle_check().await { + if self.cycle_check().await { let self_clone = self.clone(); slow_rt::spawn(async move { let mut working_copy = self_clone.working_copy_mut().await; @@ -675,7 +675,9 @@ impl WorkspaceSnapshot { self.working_copy_mut() .await .add_edge(from_node_index, edge_weight, to_node_index)? - }) + } + + Ok(()) } /// Add an edge to the graph, bypassing any cycle checks and using node @@ -685,13 +687,12 @@ impl WorkspaceSnapshot { from_node_index: NodeIndex, edge_weight: EdgeWeight, to_node_index: NodeIndex, - ) -> WorkspaceSnapshotResult { - let edge_index = - self.working_copy_mut() - .await - .add_edge(from_node_index, edge_weight, to_node_index)?; + ) -> WorkspaceSnapshotResult<()> { + self.working_copy_mut() + .await + .add_edge(from_node_index, edge_weight, to_node_index)?; - Ok(edge_index) + Ok(()) } pub async fn add_ordered_edge( @@ -699,19 +700,19 @@ impl WorkspaceSnapshot { from_node_id: impl Into, edge_weight: EdgeWeight, to_node_id: impl Into, - ) -> WorkspaceSnapshotResult { + ) -> WorkspaceSnapshotResult<()> { let from_node_index = self .working_copy() .await .get_node_index_by_id(from_node_id)?; let to_node_index = self.working_copy().await.get_node_index_by_id(to_node_id)?; - let (edge_index, _) = self.working_copy_mut().await.add_ordered_edge( + self.working_copy_mut().await.add_ordered_edge( from_node_index, edge_weight, to_node_index, )?; - Ok(edge_index) + Ok(()) } #[instrument( @@ -1232,6 +1233,7 @@ impl WorkspaceSnapshot { &self, from_id: impl Into, to_id: impl Into, + edge_weight_kind: EdgeWeightKindDiscriminants, ) -> WorkspaceSnapshotResult> { let working_copy = self.working_copy().await; @@ -1239,7 +1241,7 @@ impl WorkspaceSnapshot { let to_idx = working_copy.get_node_index_by_id(to_id)?; Ok(working_copy - .find_edge(from_idx, to_idx) + .find_edge(from_idx, to_idx, edge_weight_kind) .map(ToOwned::to_owned)) } diff --git a/lib/dal/src/workspace_snapshot/graph/tests/detect_updates.rs b/lib/dal/src/workspace_snapshot/graph/tests/detect_updates.rs index af42c597c0..705719ff7b 100644 --- a/lib/dal/src/workspace_snapshot/graph/tests/detect_updates.rs +++ b/lib/dal/src/workspace_snapshot/graph/tests/detect_updates.rs @@ -71,7 +71,7 @@ mod test { ContentAddress::Component(ContentHash::from("Component B")), )) .expect("Unable to add Component B"); - let _new_onto_root_component_edge_index = base_graph + base_graph .add_edge( base_graph.root(), EdgeWeight::new(EdgeWeightKind::new_use()), diff --git a/lib/dal/src/workspace_snapshot/graph/v4.rs b/lib/dal/src/workspace_snapshot/graph/v4.rs index a4db65e429..682bc8a899 100644 --- a/lib/dal/src/workspace_snapshot/graph/v4.rs +++ b/lib/dal/src/workspace_snapshot/graph/v4.rs @@ -263,19 +263,30 @@ impl WorkspaceSnapshotGraphV4 { edge_weight: EdgeWeight, to_node_index: NodeIndex, cycle_check: bool, - ) -> WorkspaceSnapshotGraphResult { + ) -> WorkspaceSnapshotGraphResult<()> { if cycle_check { self.add_temp_edge_cycle_check(from_node_index, edge_weight.clone(), to_node_index)?; } self.touch_node(from_node_index); - // Add the new edge to the new version of the "from" node. - let edge_index = self + let discrim: EdgeWeightKindDiscriminants = edge_weight.kind().into(); + + if !self .graph - .update_edge(from_node_index, to_node_index, edge_weight); + .edges_directed(from_node_index, Direction::Outgoing) + // Only allow one edge of each weight kind between two nodes. This + // keeps "add_edge" idempotent, and guards against any places where + // we might add the same edge twice + .any(|edge_ref| { + edge_ref.target() == to_node_index && discrim == edge_ref.weight().kind().into() + }) + { + self.graph + .add_edge(from_node_index, to_node_index, edge_weight); + } - Ok(edge_index) + Ok(()) } fn add_temp_edge_cycle_check( @@ -286,7 +297,7 @@ impl WorkspaceSnapshotGraphV4 { ) -> WorkspaceSnapshotGraphResult<()> { let temp_edge = self .graph - .update_edge(from_node_index, to_node_index, edge_weight.clone()); + .add_edge(from_node_index, to_node_index, edge_weight.clone()); let would_create_a_cycle = !self.is_acyclic_directed(); self.graph.remove_edge(temp_edge); @@ -322,7 +333,7 @@ impl WorkspaceSnapshotGraphV4 { from_node_index: NodeIndex, edge_weight: EdgeWeight, to_node_index: NodeIndex, - ) -> WorkspaceSnapshotGraphResult { + ) -> WorkspaceSnapshotGraphResult<()> { self.add_edge_inner(from_node_index, edge_weight, to_node_index, true) } @@ -331,7 +342,7 @@ impl WorkspaceSnapshotGraphV4 { from_node_id: Ulid, edge_weight: EdgeWeight, to_node_id: Ulid, - ) -> WorkspaceSnapshotGraphResult { + ) -> WorkspaceSnapshotGraphResult<()> { let from_node_index = *self .node_index_by_id .get(&from_node_id) @@ -349,7 +360,7 @@ impl WorkspaceSnapshotGraphV4 { from_node_index: NodeIndex, edge_weight: EdgeWeight, to_node_index: NodeIndex, - ) -> WorkspaceSnapshotGraphResult { + ) -> WorkspaceSnapshotGraphResult<()> { // Temporarily add the edge to the existing tree to see if it would create a cycle. // Configured to run only in tests because it has a major perf impact otherwise #[cfg(test)] @@ -465,10 +476,16 @@ impl WorkspaceSnapshotGraphV4 { self.graph.neighbors_directed(node_index, direction) } - pub fn find_edge(&self, from_idx: NodeIndex, to_idx: NodeIndex) -> Option<&EdgeWeight> { + pub fn find_edge( + &self, + from_idx: NodeIndex, + to_idx: NodeIndex, + edge_kind: EdgeWeightKindDiscriminants, + ) -> Option<&EdgeWeight> { self.graph - .find_edge(from_idx, to_idx) - .and_then(|edge_idx| self.graph.edge_weight(edge_idx)) + .edges_connecting(from_idx, to_idx) + .find(|edge_ref| edge_kind == edge_ref.weight().kind().into()) + .map(|edge_ref| edge_ref.weight()) } pub fn edges_directed_for_edge_weight_kind( @@ -518,17 +535,17 @@ impl WorkspaceSnapshotGraphV4 { from_node_index: NodeIndex, edge_weight: EdgeWeight, to_node_index: NodeIndex, - ) -> WorkspaceSnapshotGraphResult<(EdgeIndex, Option)> { - let new_edge_index = self.add_edge(from_node_index, edge_weight, to_node_index)?; + ) -> WorkspaceSnapshotGraphResult<()> { + self.add_edge(from_node_index, edge_weight, to_node_index)?; // Find the ordering node of the "container" if there is one, and add the thing pointed to // by the `to_node_index` to the ordering. Also point the ordering node at the thing with // an `Ordinal` edge, so that Ordering nodes must be touched *after* the things they order // in a depth first search - let maybe_ordinal_edge_index = if let Some(container_ordering_node_index) = + if let Some(container_ordering_node_index) = self.ordering_node_index_for_container(from_node_index)? { - let ordinal_edge_index = self.add_edge( + self.add_edge( container_ordering_node_index, EdgeWeight::new(EdgeWeightKind::Ordinal), to_node_index, @@ -544,13 +561,9 @@ impl WorkspaceSnapshotGraphV4 { ordering_node_weight.push_to_order(element_id); self.touch_node(container_ordering_node_index); } + } - Some(ordinal_edge_index) - } else { - None - }; - - Ok((new_edge_index, maybe_ordinal_edge_index)) + Ok(()) } pub fn add_ordered_node( @@ -565,13 +578,13 @@ impl WorkspaceSnapshotGraphV4 { OrderingNodeWeight::new(ordering_node_id, ordering_node_lineage_id), ))?; - let edge_index = self.add_edge( + self.add_edge( new_node_index, EdgeWeight::new(EdgeWeightKind::Ordering), ordering_node_index, )?; - let (source, _) = self.edge_endpoints(edge_index)?; - Ok(source) + + Ok(new_node_index) } /// Remove any orphaned nodes from the graph, then recalculate the merkle From 86a1326e30f0c5b08d389e5b4197aada919d6897 Mon Sep 17 00:00:00 2001 From: Zachary Hamm Date: Fri, 15 Nov 2024 15:42:43 -0600 Subject: [PATCH 2/2] fix: frames can manage children, connection events sent after component events In order for frames to manage children, we had to allow parallel edges in our graph. There should be no problem here, nothing we do should break on parallel edges. Also, we need to ensure we send the connection WsEvents *after* component WsEvents so that the frontend knows about all components before getting connection messages. --- lib/dal/src/management/mod.rs | 109 ++++++++++++------- lib/dal/tests/integration_test/management.rs | 55 +++++++++- 2 files changed, 122 insertions(+), 42 deletions(-) diff --git a/lib/dal/src/management/mod.rs b/lib/dal/src/management/mod.rs index fc6f5c1162..808f5db0c5 100644 --- a/lib/dal/src/management/mod.rs +++ b/lib/dal/src/management/mod.rs @@ -9,7 +9,6 @@ use veritech_client::{ManagementFuncStatus, ManagementResultSuccess}; use crate::component::frame::{Frame, FrameError}; use crate::diagram::view::{View, ViewId}; use crate::diagram::DiagramError; -use crate::WorkspaceSnapshotError; use crate::{ action::{ prototype::{ActionKind, ActionPrototype, ActionPrototypeError}, @@ -29,6 +28,7 @@ use crate::{ Func, FuncError, InputSocket, InputSocketId, OutputSocket, OutputSocketId, Prop, PropKind, Schema, SchemaError, SchemaId, SchemaVariantId, StandardModelError, WsEvent, WsEventError, }; +use crate::{EdgeWeightKind, WorkspaceSnapshotError}; pub mod prototype; @@ -396,7 +396,9 @@ struct PendingParent { #[derive(Clone, Debug)] enum PendingOperation { Connect(PendingConnect), + Manage(ComponentId), Parent(PendingParent), + RemoveConnection(PendingConnect), } impl<'a> ManagementOperator<'a> { @@ -649,6 +651,20 @@ impl<'a> ManagementOperator<'a> { Ok(()) } + async fn manage(&self, component_id: ComponentId) -> ManagementResult<()> { + let cycle_check_guard = self.ctx.workspace_snapshot()?.enable_cycle_check().await; + Component::add_manages_edge_to_component( + self.ctx, + self.manager_component_id, + component_id, + EdgeWeightKind::Manages, + ) + .await?; + drop(cycle_check_guard); + + Ok(()) + } + async fn creates(&mut self) -> ManagementResult> { // We take here to avoid holding on to an immutable ref to self throughout the loop let creates = self.operations.create.take(); @@ -712,16 +728,7 @@ impl<'a> ManagementOperator<'a> { parent: parent.to_owned(), })); } - - let cycle_check_guard = self.ctx.workspace_snapshot()?.enable_cycle_check().await; - Component::add_manages_edge_to_component( - self.ctx, - self.manager_component_id, - component_id, - crate::EdgeWeightKind::Manages, - ) - .await?; - drop(cycle_check_guard); + pending_operations.push(PendingOperation::Manage(component_id)); } } @@ -737,10 +744,12 @@ impl<'a> ManagementOperator<'a> { )) } - async fn updates(&mut self) -> ManagementResult<()> { + async fn updates(&mut self) -> ManagementResult> { + let mut pending = vec![]; + let updates = self.operations.update.take(); let Some(updates) = &updates else { - return Ok(()); + return Ok(pending); }; for (placeholder, operation) in updates { @@ -777,25 +786,34 @@ impl<'a> ManagementOperator<'a> { if let Some(update_conns) = &operation.connect { if let Some(remove_conns) = &update_conns.remove { for to_remove in remove_conns { - self.remove_connection(component_id, to_remove).await?; + pending.push(PendingOperation::RemoveConnection(PendingConnect { + from_component_id: component_id, + connection: to_remove.to_owned(), + })); } } if let Some(add_conns) = &update_conns.add { for to_add in add_conns { - self.create_connection(component_id, to_add).await?; + pending.push(PendingOperation::Connect(PendingConnect { + from_component_id: component_id, + connection: to_add.to_owned(), + })); } } } if let Some(new_parent) = &operation.parent { - self.set_parent(component_id, new_parent).await?; + pending.push(PendingOperation::Parent(PendingParent { + child_component_id: component_id, + parent: new_parent.to_owned(), + })); } self.updated_components.push(component_id); } - Ok(()) + Ok(pending) } async fn actions(&self) -> ManagementResult<()> { @@ -816,26 +834,19 @@ impl<'a> ManagementOperator<'a> { } pub async fn operate(&mut self) -> ManagementResult<()> { - let pending_operations = self.creates().await?; - self.updates().await?; - - // We have to execute these after the creation of the component, and - // after updates, so that they can reference other created components - // and so that we can ensure the updates have been applied - for pending_operation in pending_operations { - match pending_operation { - PendingOperation::Connect(pending_connect) => { - self.create_connection( - pending_connect.from_component_id, - &pending_connect.connection, - ) - .await? - } - PendingOperation::Parent(pending_parent) => { - self.set_parent(pending_parent.child_component_id, &pending_parent.parent) - .await? - } - } + let mut pending_operations = self.creates().await?; + pending_operations.extend(self.updates().await?); + + // Parents have to be set before component events are sent + for pending_parent in pending_operations + .iter() + .filter_map(|pending_op| match pending_op { + PendingOperation::Parent(pending_parent) => Some(pending_parent), + _ => None, + }) + { + self.set_parent(pending_parent.child_component_id, &pending_parent.parent) + .await? } for &created_id in &self.created_components { @@ -874,6 +885,30 @@ impl<'a> ManagementOperator<'a> { .await?; } + // Now, the rest of the pending ops can be executed, which need to have + // their wsevents sent *after* the component ws events (otherwise some + // will be discarded by the frontend, since it does not know about the + // newly created components until the above events are sent) + for pending_op in pending_operations { + match pending_op { + PendingOperation::Connect(pending_connect) => { + self.create_connection( + pending_connect.from_component_id, + &pending_connect.connection, + ) + .await?; + } + PendingOperation::RemoveConnection(remove) => { + self.remove_connection(remove.from_component_id, &remove.connection) + .await?; + } + PendingOperation::Manage(managed_id) => { + self.manage(managed_id).await?; + } + PendingOperation::Parent(_) => {} + } + } + self.actions().await?; Ok(()) diff --git a/lib/dal/tests/integration_test/management.rs b/lib/dal/tests/integration_test/management.rs index fddbf9e411..2d959864ba 100644 --- a/lib/dal/tests/integration_test/management.rs +++ b/lib/dal/tests/integration_test/management.rs @@ -1,3 +1,5 @@ +use std::collections::HashSet; + use dal::{ diagram::view::View, management::{ @@ -5,7 +7,7 @@ use dal::{ }, AttributeValue, Component, DalContext, SchemaId, }; -use dal_test::expected::ExpectView; +use dal_test::expected::{apply_change_set_to_base, ExpectView}; use dal_test::{ helpers::create_component_for_default_schema_name_in_default_view, test, SCHEMA_ID_SMALL_EVEN_LEGO, @@ -183,7 +185,7 @@ async fn create_component_of_other_schema(ctx: &DalContext) { } #[test] -async fn create_and_connect_to_self_as_children(ctx: &DalContext) { +async fn create_and_connect_to_self_as_children(ctx: &mut DalContext) { let small_odd_lego = create_component_for_default_schema_name_in_default_view( ctx, "small odd lego", @@ -245,21 +247,64 @@ async fn create_and_connect_to_self_as_children(ctx: &DalContext) { let components = Component::list(ctx).await.expect("get components"); assert_eq!(4, components.len()); - let children = Component::get_children_for_id(ctx, small_odd_lego.id()) + let workspace_snapshot = ctx.workspace_snapshot().expect("get snap"); + let edges = workspace_snapshot + .edges_directed(small_odd_lego.id(), petgraph::Direction::Outgoing) + .await + .expect("get edges"); + for (weight, _, tgt) in edges { + let target_id = workspace_snapshot + .get_node_weight(tgt) + .await + .expect("get target") + .id(); + println!("{:?} -> {}", weight.kind(), target_id); + } + + let children: HashSet<_> = Component::get_children_for_id(ctx, small_odd_lego.id()) .await - .expect("get frame children"); + .expect("get frame children") + .into_iter() + .collect(); assert_eq!(3, children.len()); + let managed: HashSet<_> = small_odd_lego + .get_managed(ctx) + .await + .expect("get managed") + .into_iter() + .collect(); + assert_eq!(children, managed); + let small_even_lego_schema_id: SchemaId = ulid::Ulid::from_string(SCHEMA_ID_SMALL_EVEN_LEGO) .expect("make ulid") .into(); - for child_id in children { + for &child_id in &children { let c = Component::get_by_id(ctx, child_id) .await .expect("get component"); let schema_id = c.schema(ctx).await.expect("get schema").id(); assert_eq!(small_even_lego_schema_id, schema_id); } + + // Ensure parallel edges make it through the rebase + apply_change_set_to_base(ctx).await; + + let children_base: HashSet<_> = Component::get_children_for_id(ctx, small_odd_lego.id()) + .await + .expect("get frame children") + .into_iter() + .collect(); + assert_eq!(3, children_base.len()); + let managed_base: HashSet<_> = small_odd_lego + .get_managed(ctx) + .await + .expect("get managed") + .into_iter() + .collect(); + + assert_eq!(children, children_base); + assert_eq!(children_base, managed_base); } #[test]