diff --git a/lib/dal/src/action/dependency_graph.rs b/lib/dal/src/action/dependency_graph.rs index 15b27b9294..3cb1f5dbcd 100644 --- a/lib/dal/src/action/dependency_graph.rs +++ b/lib/dal/src/action/dependency_graph.rs @@ -74,7 +74,7 @@ impl ActionDependencyGraph { // Get all inferred connections up front so we don't build this tree each time let components_to_find = actions_by_component_id.keys().copied().collect_vec(); let component_tree = - InferredConnectionGraph::assemble_for_components(ctx, components_to_find).await?; + InferredConnectionGraph::assemble_for_components(ctx, components_to_find, None).await?; // Action dependencies are primarily based on the data flow between Components. Things that // feed data into other things must have their actions run before the actions for the // things they are feeding data into. diff --git a/lib/dal/src/attribute/prototype/debug.rs b/lib/dal/src/attribute/prototype/debug.rs index 7d7edd1ee7..9828a2aed7 100644 --- a/lib/dal/src/attribute/prototype/debug.rs +++ b/lib/dal/src/attribute/prototype/debug.rs @@ -129,7 +129,7 @@ impl AttributePrototypeDebugView { targets.destination_component_id == destination_component_id }) { - let arg_used = Component::should_data_flow_between_components( + let arg_used = Component::should_data_flow_between_components_by_id( ctx, destination_component_id, expected_source_component_id, @@ -289,7 +289,7 @@ impl AttributePrototypeDebugView { .await? { info!("output socket match: {:?}", output_match); - let arg_used = Component::should_data_flow_between_components( + let arg_used = Component::should_data_flow_between_components_by_id( ctx, component_input_socket.component_id, output_match.component_id, diff --git a/lib/dal/src/attribute/value.rs b/lib/dal/src/attribute/value.rs index d40acec6ba..523586279b 100644 --- a/lib/dal/src/attribute/value.rs +++ b/lib/dal/src/attribute/value.rs @@ -766,7 +766,7 @@ impl AttributeValue { for component_output_socket in connections { // Both deleted and non deleted components can feed data into deleted components. // ** ONLY ** non-deleted components can feed data into non-deleted components - if Component::should_data_flow_between_components( + if Component::should_data_flow_between_components_by_id( ctx, component_input_socket.component_id, component_output_socket.component_id, diff --git a/lib/dal/src/attribute/value/dependent_value_graph.rs b/lib/dal/src/attribute/value/dependent_value_graph.rs index f593f9a633..9b006d83f4 100644 --- a/lib/dal/src/attribute/value/dependent_value_graph.rs +++ b/lib/dal/src/attribute/value/dependent_value_graph.rs @@ -87,7 +87,7 @@ impl DependentValueGraph { // job always operates on the current state of the change set's snapshot, not the state // at the time the job was created. if workspace_snapshot - .try_get_node_index_by_id(initial_id) + .get_node_index_by_id_opt(initial_id) .await .is_none() { @@ -207,7 +207,7 @@ impl DependentValueGraph { // Both "deleted" and not deleted Components can feed data into // "deleted" Components. **ONLY** not deleted Components can feed // data into not deleted Components. - if Component::should_data_flow_between_components( + if Component::should_data_flow_between_components_by_id( ctx, targets.destination_component_id, targets.source_component_id, @@ -273,7 +273,7 @@ impl DependentValueGraph { // data into not deleted Components. let destination_component_id = AttributeValue::component_id(ctx, current_attribute_value.id()).await?; - if Component::should_data_flow_between_components( + if Component::should_data_flow_between_components_by_id( ctx, destination_component_id, component_input_socket.component_id, diff --git a/lib/dal/src/component.rs b/lib/dal/src/component.rs index bffd0d7d84..368c9689bb 100644 --- a/lib/dal/src/component.rs +++ b/lib/dal/src/component.rs @@ -1377,7 +1377,7 @@ impl Component { component_id: ComponentId, ) -> ComponentResult> { let id: Ulid = component_id.into(); - if let Some(node_index) = ctx.workspace_snapshot()?.try_get_node_index_by_id(id).await { + if let Some(node_index) = ctx.workspace_snapshot()?.get_node_index_by_id_opt(id).await { let node_weight = ctx .workspace_snapshot()? .get_node_weight(node_index) @@ -2303,7 +2303,7 @@ impl Component { /// Both "deleted" and not deleted Components can feed data into /// "deleted" Components. **ONLY** not deleted Components can feed /// data into not deleted Components. - pub async fn should_data_flow_between_components( + pub async fn should_data_flow_between_components_by_id( ctx: &DalContext, destination_component_id: ComponentId, source_component_id: ComponentId, @@ -2328,7 +2328,7 @@ impl Component { ) -> ComponentResult> { match ctx .workspace_snapshot()? - .try_get_node_index_by_id(component_id) + .get_node_index_by_id_opt(component_id) .await { Some(component_idx) => { @@ -2690,7 +2690,7 @@ impl Component { let destination_component = Self::get_by_id(ctx, to_component_id).await?; let source_component = Self::get_by_id(ctx, incoming_connection.output_socket.component_id).await?; - let to_delete = !Self::should_data_flow_between_components( + let to_delete = !Self::should_data_flow_between_components_by_id( ctx, destination_component.id, source_component.id, @@ -2742,7 +2742,7 @@ impl Component { let destination_component = outgoing_connection.input_socket.component_id; let source_component = self.id(); - let to_delete = !Self::should_data_flow_between_components( + let to_delete = !Self::should_data_flow_between_components_by_id( ctx, destination_component, source_component, diff --git a/lib/dal/src/component/frame.rs b/lib/dal/src/component/frame.rs index 331542b7d5..b5d0206c78 100644 --- a/lib/dal/src/component/frame.rs +++ b/lib/dal/src/component/frame.rs @@ -235,13 +235,13 @@ impl Frame { ctx: &DalContext, parent_id: ComponentId, child_id: ComponentId, + // get the new state of the tree (from the perspective of both components, now in disjoint trees because they were detached!) ) -> FrameResult<()> { // cache current state of the tree let before_change_impacted_input_sockets: HashSet = Self::get_all_inferred_connections_for_component_tree(ctx, parent_id, child_id).await?; // remove the edge Component::remove_edge_from_frame(ctx, parent_id, child_id).await?; - // get the new state of the tree (from the perspective of both components, now in disjoint trees because they were detached!) let current_impacted_sockets = Self::get_all_inferred_connections_for_component_tree(ctx, parent_id, child_id).await?; // find the edges that have been removed due to the detachment @@ -296,9 +296,12 @@ impl Frame { child_id: ComponentId, ) -> FrameResult> { let mut impacted_connections = HashSet::new(); - let tree = - InferredConnectionGraph::assemble_for_components(ctx, [child_id, parent_id].to_vec()) - .await?; + let tree = InferredConnectionGraph::assemble_for_components( + ctx, + [child_id, parent_id].to_vec(), + None, + ) + .await?; let incoming_connections = tree.get_all_inferred_connections(); for incoming_connection in incoming_connections { impacted_connections.insert(SocketAttributeValuePair { diff --git a/lib/dal/src/component/inferred_connection_graph.rs b/lib/dal/src/component/inferred_connection_graph.rs index ebf0c95660..16f3a7a14f 100644 --- a/lib/dal/src/component/inferred_connection_graph.rs +++ b/lib/dal/src/component/inferred_connection_graph.rs @@ -18,7 +18,7 @@ use telemetry::prelude::*; use super::{ socket::{ComponentInputSocket, ComponentOutputSocket}, - ComponentResult, + ComponentResult, IncomingConnection, }; use crate::{ Component, ComponentId, ComponentType, DalContext, InputSocket, OutputSocket, OutputSocketId, @@ -56,6 +56,7 @@ impl InferredConnectionGraph { pub async fn assemble_for_components( ctx: &DalContext, component_ids: Vec, + precomputed_incoming_connections: Option<&HashMap>>, ) -> ComponentResult { let mut component_incoming_connections: HashMap< ComponentId, @@ -64,6 +65,14 @@ impl InferredConnectionGraph { let mut top_parents: HashSet = HashSet::new(); let mut components: HashMap> = HashMap::new(); + let component_ids = if let Some(precomputed_ids) = + precomputed_incoming_connections.map(|precomp| precomp.keys().map(ToOwned::to_owned)) + { + precomputed_ids.collect() + } else { + component_ids + }; + for component_id in component_ids { // Check if the component_id is either a key or a value in the components HashMap let is_in_tree = components.contains_key(&component_id) @@ -86,7 +95,19 @@ impl InferredConnectionGraph { work_queue.push_back(*parent); while let Some(component) = work_queue.pop_front() { - let (input_sockets, duplicates) = Self::process_component(ctx, component).await?; + let (input_sockets, duplicates) = if let Some(precomputed) = + precomputed_incoming_connections.and_then(|precomp| precomp.get(&component)) + { + Self::process_component(ctx, component, precomputed.as_slice()).await? + } else { + Self::process_component( + ctx, + component, + &Component::incoming_connections_for_id(ctx, component).await?, + ) + .await? + }; + Self::update_incoming_connections( &mut component_incoming_connections, component, @@ -123,7 +144,19 @@ impl InferredConnectionGraph { .into_iter() .map(|component| component.id()) .collect_vec(); - Self::assemble_for_components(ctx, components).await + Self::assemble_for_components(ctx, components, None).await + } + + #[instrument( + name = "component.inferred_connection_graph.with_precomputed_incoming_connections", + level = "info", + skip(ctx, precomputed_incoming_connections) + )] + pub async fn assemble_with_precomputed_incoming_connections( + ctx: &DalContext, + precomputed_incoming_connections: &HashMap>, + ) -> ComponentResult { + Self::assemble_for_components(ctx, vec![], Some(precomputed_incoming_connections)).await } /// Assembles the [`InferredConnectionGraph`] from the perspective of this single [`ComponentId`] by first creating a @@ -138,7 +171,7 @@ impl InferredConnectionGraph { ctx: &DalContext, with_component_id: ComponentId, ) -> ComponentResult { - Self::assemble_for_components(ctx, vec![with_component_id]).await + Self::assemble_for_components(ctx, vec![with_component_id], None).await } /// Assembles the a map of Incoming Connections from the perspective of this single [`ComponentId`] @@ -155,7 +188,12 @@ impl InferredConnectionGraph { ComponentId, HashMap>, > = HashMap::new(); - let (input_sockets, duplicates) = Self::process_component(ctx, for_component_id).await?; + let (input_sockets, duplicates) = Self::process_component( + ctx, + for_component_id, + &Component::incoming_connections_for_id(ctx, for_component_id).await?, + ) + .await?; Self::update_incoming_connections( &mut component_incoming_connections, for_component_id, @@ -194,6 +232,7 @@ impl InferredConnectionGraph { async fn process_component( ctx: &DalContext, component_id: ComponentId, + existing_incoming_connections: &[IncomingConnection], ) -> ComponentResult<( HashMap>, HashSet, @@ -208,9 +247,8 @@ impl InferredConnectionGraph { for input_socket in current_component_input_sockets { let incoming_connections = Self::find_available_connections(ctx, input_socket).await?; - // Get all existing explicit outgoing connections so that we don't create an inferred connection to the same output socket - let existing_incoming_connections = - Component::incoming_connections_for_id(ctx, component_id).await?; + + // Check all existing explicit outgoing connections so that we don't create an inferred connection to the same output socket for incoming_connection in existing_incoming_connections { let component_output_socket = ComponentOutputSocket::get_by_ids_or_error( ctx, diff --git a/lib/dal/src/diagram.rs b/lib/dal/src/diagram.rs index 3a36f88369..a2ffb7d593 100644 --- a/lib/dal/src/diagram.rs +++ b/lib/dal/src/diagram.rs @@ -1,14 +1,15 @@ use serde::{Deserialize, Serialize}; use si_data_pg::PgError; -use std::collections::{hash_map, HashMap, HashSet}; +use std::collections::{hash_map, HashMap}; use std::num::{ParseFloatError, ParseIntError}; +use std::sync::Arc; use strum::{AsRefStr, Display, EnumIter, EnumString}; use telemetry::prelude::*; use thiserror::Error; use crate::actor_view::ActorView; use crate::attribute::prototype::argument::{ - AttributePrototypeArgument, AttributePrototypeArgumentError, AttributePrototypeArgumentId, + AttributePrototypeArgumentError, AttributePrototypeArgumentId, }; use crate::attribute::value::AttributeValueError; use crate::change_status::ChangeStatus; @@ -19,11 +20,12 @@ use crate::schema::variant::SchemaVariantError; use crate::socket::connection_annotation::ConnectionAnnotation; use crate::socket::input::InputSocketError; use crate::socket::output::OutputSocketError; +use crate::workspace_snapshot::node_weight::category_node_weight::CategoryNodeKind; use crate::workspace_snapshot::WorkspaceSnapshotError; use crate::{ AttributePrototypeId, ChangeSetError, Component, ComponentId, DalContext, HistoryEventError, InputSocketId, OutputSocketId, SchemaId, SchemaVariant, SchemaVariantId, SocketArity, - StandardModelError, TransactionsError, Workspace, WorkspaceError, + StandardModelError, TransactionsError, Workspace, WorkspaceError, WorkspaceSnapshot, }; #[remain::sorted] @@ -120,7 +122,7 @@ pub struct SummaryDiagramComponent { pub schema_variant_id: SchemaVariantId, pub schema_variant_name: String, pub schema_category: String, - pub sockets: serde_json::Value, + pub sockets: Vec, pub display_name: String, pub position: GridPoint, pub size: Size2D, @@ -142,8 +144,8 @@ impl SummaryDiagramComponent { ctx: &DalContext, component: &Component, change_status: ChangeStatus, + diagram_sockets: &mut HashMap>, ) -> DiagramResult { - let mut diagram_sockets: HashMap = HashMap::new(); let schema_variant = component.schema_variant(ctx).await?; let sockets = match diagram_sockets.entry(schema_variant.id()) { @@ -183,18 +185,16 @@ impl SummaryDiagramComponent { }); } - let socket_value = serde_json::to_value(sockets)?; + entry.insert(sockets.to_owned()); - entry.insert(socket_value.to_owned()); - - socket_value + sockets } hash_map::Entry::Occupied(entry) => entry.get().to_owned(), }; let schema = SchemaVariant::schema_for_schema_variant_id(ctx, schema_variant.id()).await?; + let schema_id = schema.id(); - let default_schema_variant = - SchemaVariant::get_default_for_schema(ctx, schema.id()).await?; + let default_schema_variant = SchemaVariant::get_default_for_schema(ctx, schema_id).await?; let position = GridPoint { x: component.x().parse::()?.round() as isize, @@ -230,7 +230,7 @@ impl SummaryDiagramComponent { }; let can_be_upgraded = if let Some(unlocked_schema_variant) = - SchemaVariant::get_unlocked_for_schema(ctx, schema.id()).await? + SchemaVariant::get_unlocked_for_schema(ctx, schema_id).await? { unlocked_schema_variant.id() != schema_variant.id() } else { @@ -241,7 +241,7 @@ impl SummaryDiagramComponent { id: component.id(), component_id: component.id(), schema_name: schema.name().to_owned(), - schema_id: schema.id(), + schema_id, schema_variant_id: schema_variant.id(), schema_variant_name: schema_variant.version().to_owned(), schema_category: schema_variant.category().to_owned(), @@ -385,81 +385,101 @@ pub struct Diagram { } impl Diagram { - /// Assemble a [`Diagram`](Self) based on existing [`Nodes`](crate::Node) and - /// [`Connections`](crate::Connection). - #[instrument(level = "info", skip(ctx))] - pub async fn assemble(ctx: &DalContext) -> DiagramResult { - let mut diagram_edges: Vec = vec![]; - let mut diagram_inferred_edges: Vec = vec![]; - - let components = Component::list(ctx).await?; - - let mut virtual_and_real_components_by_id: HashMap = - components.iter().cloned().map(|c| (c.id(), c)).collect(); + async fn assemble_component_views( + ctx: &DalContext, + base_snapshot: &Arc, + components: &HashMap, + diagram_sockets: &mut HashMap>, + ) -> DiagramResult<( + Vec, + Vec, + HashMap>, + )> { let mut component_views = Vec::with_capacity(components.len()); - let new_component_ids: HashSet = ctx - .workspace_snapshot()? - .components_added_relative_to_base(ctx) - .await? - .iter() - .copied() - .collect(); - - // let new_attribute_prototype_argument_ids: HashSet = ctx - // .workspace_snapshot()? - // .socket_edges_added_relative_to_base(ctx) - // .await? - // .iter() - // .copied() - // .collect(); - - let new_attribute_prototype_argument_ids: HashSet = - HashSet::new(); - - for component in &components { - let component_change_status = if new_component_ids.contains(&component.id()) { - ChangeStatus::Added - } else if component.to_delete() { - ChangeStatus::Deleted - } else { - // TODO: Eventually, we'll want to also handle detecting ChangeStatus::Modified - ChangeStatus::Unmodified - }; - - for incoming_connection in component.incoming_connections(ctx).await? { - let from_component = - Component::get_by_id(ctx, incoming_connection.from_component_id).await?; - let edge_change_status = if component_change_status == ChangeStatus::Added - || new_component_ids.contains(&from_component.id()) - || new_attribute_prototype_argument_ids - .contains(&incoming_connection.attribute_prototype_argument_id) - { - ChangeStatus::Added + let mut diagram_edges = Vec::with_capacity(components.len()); + let mut precomputed_incoming_connections = HashMap::with_capacity(components.len()); + + for component in components.values() { + let exists_in_base = base_snapshot + .get_node_index_by_id_opt(component.id()) + .await + .is_some(); + + let change_status = if exists_in_base { + if component.to_delete() { + ChangeStatus::Deleted } else { ChangeStatus::Unmodified - }; - - diagram_edges.push(SummaryDiagramEdge::assemble( - incoming_connection, - from_component, - component, - edge_change_status, - )?); - } + } + } else { + ChangeStatus::Added + }; component_views.push( - SummaryDiagramComponent::assemble(ctx, component, component_change_status).await?, + SummaryDiagramComponent::assemble(ctx, component, change_status, diagram_sockets) + .await?, ); } - let component_tree = InferredConnectionGraph::for_workspace(ctx).await?; - for incoming_connection in component_tree.get_all_inferred_connections() { - let to_delete = Component::should_data_flow_between_components( + for component in components.values() { + let incoming_connections = component.incoming_connections(ctx).await?; + precomputed_incoming_connections.insert(component.id(), incoming_connections.clone()); + for incoming_connection in incoming_connections { + if let Some(from_component) = components.get(&incoming_connection.from_component_id) + { + let edge_status = if base_snapshot + .get_node_index_by_id_opt( + incoming_connection.attribute_prototype_argument_id, + ) + .await + .is_none() + { + ChangeStatus::Added + } else { + ChangeStatus::Unmodified + }; + + diagram_edges.push(SummaryDiagramEdge::assemble( + incoming_connection, + from_component.to_owned(), + component, + edge_status, + )?); + } + } + } + + Ok(( + component_views, + diagram_edges, + precomputed_incoming_connections, + )) + } + + async fn assemble_inferred_connection_views( + ctx: &DalContext, + components: &HashMap, + precomputed_incoming_connections: &HashMap>, + ) -> DiagramResult> { + let mut diagram_inferred_edges = vec![]; + + let component_tree = + InferredConnectionGraph::assemble_with_precomputed_incoming_connections( ctx, - incoming_connection.input_socket.component_id, - incoming_connection.output_socket.component_id, + precomputed_incoming_connections, ) .await?; + + for incoming_connection in component_tree.get_all_inferred_connections() { + let to_delete = if let (Some(source_component), Some(destination_component)) = ( + components.get(&incoming_connection.output_socket.component_id), + components.get(&incoming_connection.input_socket.component_id), + ) { + source_component.to_delete() || destination_component.to_delete() + } else { + false + }; + diagram_inferred_edges.push(SummaryDiagramInferredEdge { from_component_id: incoming_connection.output_socket.component_id, from_socket_id: incoming_connection.output_socket.output_socket_id, @@ -469,9 +489,16 @@ impl Diagram { }); } - // Even though the default change set for a workspace can have a base change set, we don't - // want to consider anything as new/modified/removed when looking at the default change - // set. + Ok(diagram_inferred_edges) + } + + async fn get_base_snapshot(ctx: &DalContext) -> DiagramResult<(Arc, bool)> { + let base_change_set_id = if let Some(change_set_id) = ctx.change_set()?.base_change_set_id { + change_set_id + } else { + return Ok((ctx.workspace_snapshot()?.clone(), false)); + }; + let workspace = Workspace::get_by_pk_or_error( ctx, &ctx.tenancy() @@ -479,113 +506,171 @@ impl Diagram { .ok_or(WorkspaceSnapshotError::WorkspaceMissing)?, ) .await?; - if workspace.default_change_set_id() != ctx.change_set_id() - && ctx.change_set()?.base_change_set_id.is_some() + + if workspace.default_change_set_id() == ctx.change_set_id() { + return Ok((ctx.workspace_snapshot()?.clone(), false)); + } + + Ok(( + Arc::new(WorkspaceSnapshot::find_for_change_set(ctx, base_change_set_id).await?), + true, + )) + } + + async fn assemble_removed_components( + ctx: &DalContext, + base_snapshot: Arc, + components: &HashMap, + diagram_sockets: &mut HashMap>, + ) -> DiagramResult> { + let mut removed_component_summaries = vec![]; + + let base_change_set_ctx = ctx.clone_with_base().await?; + let base_change_set_ctx = &base_change_set_ctx; + + if let Some(components_cat_id) = base_snapshot + .get_category_node(None, CategoryNodeKind::Component) + .await? { + for component_id in base_snapshot + .all_outgoing_targets(components_cat_id) + .await? + .iter() + .map(|weight| weight.id()) + { + let component_id: ComponentId = component_id.into(); + if !components.contains_key(&component_id) { + let deleted_component = + Component::get_by_id(base_change_set_ctx, component_id).await?; + + let mut summary_diagram_component = SummaryDiagramComponent::assemble( + base_change_set_ctx, + &deleted_component, + ChangeStatus::Deleted, + diagram_sockets, + ) + .await?; + summary_diagram_component.from_base_change_set = true; + + removed_component_summaries.push(summary_diagram_component); + } + } + } + + Ok(removed_component_summaries) + } + + /// Assemble a [`Diagram`](Self) based on existing [`Nodes`](crate::Node) and + /// [`Connections`](crate::Connection). + #[instrument(level = "info", skip(ctx))] + pub async fn assemble(ctx: &DalContext) -> DiagramResult { + let (base_snapshot, not_on_head) = Self::get_base_snapshot(ctx).await?; + + let components = Component::list(ctx).await?; + let virtual_and_real_components_by_id: HashMap = + components.iter().cloned().map(|c| (c.id(), c)).collect(); + + let mut diagram_sockets = HashMap::new(); + let (mut component_views, diagram_edges, precomputed_incoming_connections) = + Self::assemble_component_views( + ctx, + &base_snapshot, + &virtual_and_real_components_by_id, + &mut diagram_sockets, + ) + .await?; + + let diagram_inferred_edges = Self::assemble_inferred_connection_views( + ctx, + &virtual_and_real_components_by_id, + &precomputed_incoming_connections, + ) + .await?; + + // If we're on head, we don't want to calculate deletions + if not_on_head { // We also want to display the edges & components that would be actively removed when we // merge this change set into its base change set. - let removed_component_ids = ctx - .workspace_snapshot()? - .components_removed_relative_to_base(ctx) - .await?; - - // We now need to retrieve these Components from the base change set, and build - // SummaryDiagramComponents for them with from_base_change_set true, and change_status - // ChangeStatus::Removed - let base_change_set_ctx = ctx.clone_with_base().await?; - let base_change_set_ctx = &base_change_set_ctx; - - for removed_component_id in removed_component_ids { - // We don't need to worry about these duplicating SummaryDiagramComponents that - // have already been generated as they're generated from the list of Components - // still in the change set's snapshot, while the list of Component IDs we're - // working on here are explicitly ones that do not exist in that snapshot anymore. - let base_change_set_component = - Component::get_by_id(base_change_set_ctx, removed_component_id).await?; - let mut summary_diagram_component = SummaryDiagramComponent::assemble( - base_change_set_ctx, - &base_change_set_component, - ChangeStatus::Deleted, - ) - .await?; - summary_diagram_component.from_base_change_set = true; - virtual_and_real_components_by_id - .insert(removed_component_id, base_change_set_component); - - component_views.push(summary_diagram_component); - } + let removed_component_summaries = Self::assemble_removed_components( + ctx, + base_snapshot, + &virtual_and_real_components_by_id, + &mut diagram_sockets, + ) + .await?; + component_views.extend(removed_component_summaries); // We need to bring in any AttributePrototypeArguments for incoming & outgoing // connections that have been removed. - let removed_attribute_prototype_argument_ids: Vec = - vec![]; + // let removed_attribute_prototype_argument_ids: Vec = + // vec![]; // = ctx // .workspace_snapshot()? // .socket_edges_removed_relative_to_base(ctx) // .await?; - let mut incoming_connections_by_component_id: HashMap< - ComponentId, - Vec, - > = HashMap::new(); - - for removed_attribute_prototype_argument_id in &removed_attribute_prototype_argument_ids - { - let attribute_prototype_argument = AttributePrototypeArgument::get_by_id( - base_change_set_ctx, - *removed_attribute_prototype_argument_id, - ) - .await?; - - // This should always be Some as - // `WorkspaceSnapshot::socket_edges_removed_relative_to_base` only returns the - // IDs of arguments that have targets. - if let Some(targets) = attribute_prototype_argument.targets() { - if let hash_map::Entry::Vacant(vacant_entry) = - incoming_connections_by_component_id.entry(targets.destination_component_id) - { - let removed_component = Component::get_by_id( - base_change_set_ctx, - targets.destination_component_id, - ) - .await?; - let mut incoming_connections = removed_component - .incoming_connections(base_change_set_ctx) - .await?; - // We only care about connections going to the Component in the base that - // are *ALSO* ones that we are considered to have removed. - incoming_connections.retain(|connection| { - removed_attribute_prototype_argument_ids - .contains(&connection.attribute_prototype_argument_id) - }); - vacant_entry.insert(incoming_connections); - } - } - } - - for incoming_connections in incoming_connections_by_component_id.values() { - for incoming_connection in incoming_connections { - let from_component = virtual_and_real_components_by_id - .get(&incoming_connection.from_component_id) - .cloned() - .ok_or(ComponentError::NotFound( - incoming_connection.from_component_id, - ))?; - let to_component = virtual_and_real_components_by_id - .get(&incoming_connection.to_component_id) - .ok_or(ComponentError::NotFound( - incoming_connection.to_component_id, - ))?; - let mut summary_diagram_edge = SummaryDiagramEdge::assemble( - incoming_connection.clone(), - from_component, - to_component, - ChangeStatus::Deleted, - )?; - summary_diagram_edge.from_base_change_set = true; + // let mut incoming_connections_by_component_id: HashMap< + // ComponentId, + // Vec, + // > = HashMap::new(); + + // for removed_attribute_prototype_argument_id in &removed_attribute_prototype_argument_ids + // { + // let attribute_prototype_argument = AttributePrototypeArgument::get_by_id( + // base_change_set_ctx, + // *removed_attribute_prototype_argument_id, + // ) + // .await?; - diagram_edges.push(summary_diagram_edge); - } - } + // // This should always be Some as + // // `WorkspaceSnapshot::socket_edges_removed_relative_to_base` only returns the + // // IDs of arguments that have targets. + // if let Some(targets) = attribute_prototype_argument.targets() { + // if let hash_map::Entry::Vacant(vacant_entry) = + // incoming_connections_by_component_id.entry(targets.destination_component_id) + // { + // let removed_component = Component::get_by_id( + // base_change_set_ctx, + // targets.destination_component_id, + // ) + // .await?; + // let mut incoming_connections = removed_component + // .incoming_connections(base_change_set_ctx) + // .await?; + // // We only care about connections going to the Component in the base that + // // are *ALSO* ones that we are considered to have removed. + // incoming_connections.retain(|connection| { + // removed_attribute_prototype_argument_ids + // .contains(&connection.attribute_prototype_argument_id) + // }); + // vacant_entry.insert(incoming_connections); + // } + // } + // } + + // for incoming_connections in incoming_connections_by_component_id.values() { + // for incoming_connection in incoming_connections { + // let from_component = virtual_and_real_components_by_id + // .get(&incoming_connection.from_component_id) + // .cloned() + // .ok_or(ComponentError::NotFound( + // incoming_connection.from_component_id, + // ))?; + // let to_component = virtual_and_real_components_by_id + // .get(&incoming_connection.to_component_id) + // .ok_or(ComponentError::NotFound( + // incoming_connection.to_component_id, + // ))?; + // let mut summary_diagram_edge = SummaryDiagramEdge::assemble( + // incoming_connection.clone(), + // from_component.to_owned(), + // to_component, + // ChangeStatus::Deleted, + // )?; + // summary_diagram_edge.from_base_change_set = true; + + // diagram_edges.push(summary_diagram_edge); + // } + // } } Ok(Self { diff --git a/lib/dal/src/job/definition/action.rs b/lib/dal/src/job/definition/action.rs index 2ce3488b76..897ff0ce73 100644 --- a/lib/dal/src/job/definition/action.rs +++ b/lib/dal/src/job/definition/action.rs @@ -1,4 +1,4 @@ -use std::convert::TryFrom; +use std::{collections::HashMap, convert::TryFrom}; use async_trait::async_trait; use serde::{Deserialize, Serialize}; @@ -266,10 +266,12 @@ async fn process_and_record_execution( Component::remove(&ctx, component.id()).await?; to_remove_nodes.push(component.id().into()); } else { + let mut diagram_sockets = HashMap::new(); let summary = SummaryDiagramComponent::assemble( &ctx, &component, ChangeStatus::Unmodified, + &mut diagram_sockets, ) .await?; WsEvent::resource_refreshed(&ctx, summary) @@ -278,9 +280,14 @@ async fn process_and_record_execution( .await?; } } else { - let summary = - SummaryDiagramComponent::assemble(&ctx, &component, ChangeStatus::Unmodified) - .await?; + let mut diagram_sockets = HashMap::new(); + let summary = SummaryDiagramComponent::assemble( + &ctx, + &component, + ChangeStatus::Unmodified, + &mut diagram_sockets, + ) + .await?; WsEvent::resource_refreshed(&ctx, summary) .await? .publish_on_commit(&ctx) diff --git a/lib/dal/src/job/definition/compute_validation.rs b/lib/dal/src/job/definition/compute_validation.rs index 9149d7531f..7b82844147 100644 --- a/lib/dal/src/job/definition/compute_validation.rs +++ b/lib/dal/src/job/definition/compute_validation.rs @@ -93,7 +93,7 @@ impl JobConsumer for ComputeValidation { // executing, as the job always operates on the current state of the change set's snapshot, not the state at the time // the job was created. if workspace_snapshot - .try_get_node_index_by_id(av_id) + .get_node_index_by_id_opt(av_id) .await .is_none() { diff --git a/lib/dal/src/workspace.rs b/lib/dal/src/workspace.rs index f1b40a2fa9..47a8d00483 100644 --- a/lib/dal/src/workspace.rs +++ b/lib/dal/src/workspace.rs @@ -531,8 +531,7 @@ impl Workspace { for change_set_data in change_sets { let imported_snapshot = WorkspaceSnapshot::from_bytes( &change_set_data.workspace_snapshot_serialized_data, - ) - .await?; + )?; // If base_change_set is default_change_set_base, it pointed to the builtin workspace // originally, so this change set needs to be the new default for the workspace - HEAD diff --git a/lib/dal/src/workspace_snapshot.rs b/lib/dal/src/workspace_snapshot.rs index 99f2f647f2..644b04c7a7 100644 --- a/lib/dal/src/workspace_snapshot.rs +++ b/lib/dal/src/workspace_snapshot.rs @@ -52,9 +52,7 @@ use tokio::task::JoinError; use self::node_weight::{NodeWeightDiscriminants, OrderingNodeWeight}; use crate::action::{Action, ActionError}; -use crate::attribute::prototype::argument::{ - AttributePrototypeArgument, AttributePrototypeArgumentError, AttributePrototypeArgumentId, -}; +use crate::attribute::prototype::argument::AttributePrototypeArgumentError; use crate::change_set::{ChangeSetError, ChangeSetId}; use crate::component::inferred_connection_graph::InferredConnectionGraph; use crate::slow_rt::{self, SlowRuntimeError}; @@ -65,9 +63,7 @@ use crate::workspace_snapshot::edge_weight::{ use crate::workspace_snapshot::graph::LineageId; use crate::workspace_snapshot::node_weight::category_node_weight::CategoryNodeKind; use crate::workspace_snapshot::node_weight::NodeWeight; -use crate::{ - pk, AttributeValueId, Component, ComponentError, ComponentId, Workspace, WorkspaceError, -}; +use crate::{pk, AttributeValueId, ComponentError, ComponentId, WorkspaceError}; use crate::{ workspace_snapshot::{graph::WorkspaceSnapshotGraphError, node_weight::NodeWeightError}, DalContext, TransactionsError, WorkspaceSnapshotGraphV2, @@ -505,7 +501,7 @@ impl WorkspaceSnapshot { )?) } - pub async fn from_bytes(bytes: &[u8]) -> WorkspaceSnapshotResult { + pub fn from_bytes(bytes: &[u8]) -> WorkspaceSnapshotResult { let graph: Arc = si_layer_cache::db::serialize::from_bytes(bytes)?; Ok(Self { @@ -529,23 +525,11 @@ impl WorkspaceSnapshot { Ok(new_node_index) } - #[instrument( - name = "workspace_snapshot.add_ordered_node", - level = "debug", - skip_all, - fields() - )] pub async fn add_ordered_node(&self, node: NodeWeight) -> WorkspaceSnapshotResult { let new_node_index = self.working_copy_mut().await.add_ordered_node(node)?; Ok(new_node_index) } - #[instrument( - name = "workspace_snapshot.update_content", - level = "debug", - skip_all, - fields() - )] pub async fn update_content( &self, id: Ulid, @@ -591,12 +575,6 @@ impl WorkspaceSnapshot { /// Add an edge to the graph, bypassing any cycle checks and using node /// indices directly. Use with care, since node indices are only reliably /// if the graph has not yet been modified. - #[instrument( - name = "workspace_snapshot.add_edge_unchecked", - level = "debug", - skip_all, - fields() - )] pub async fn add_edge_unchecked( &self, from_node_index: NodeIndex, @@ -609,12 +587,6 @@ impl WorkspaceSnapshot { .add_edge(from_node_index, edge_weight, to_node_index)?) } - #[instrument( - name = "workspace_snapshot.add_ordered_edge", - level = "debug", - skip_all, - fields() - )] pub async fn add_ordered_edge( &self, from_node_id: impl Into, @@ -658,12 +630,6 @@ impl WorkspaceSnapshot { /// Gives the exact node index endpoints of an edge. Use with care, since /// node indexes can't be relied on after modifications to the graph. - #[instrument( - name = "workspace_snapshot.edge_endpoints", - level = "debug", - skip_all, - fields() - )] pub async fn edge_endpoints( &self, edge_index: EdgeIndex, @@ -688,13 +654,6 @@ impl WorkspaceSnapshot { .import_component_subgraph(&other.read_only_graph, component_node_index)?) } - /// Calls [`WorkspaceSnapshotGraph::replace_references()`] - #[instrument( - name = "workspace_snapshot.replace_references", - level = "debug", - skip_all, - fields() - )] pub async fn replace_references( &self, original_node_index: NodeIndex, @@ -705,12 +664,6 @@ impl WorkspaceSnapshot { .replace_references(original_node_index)?) } - #[instrument( - name = "workspace_snapshot.get_node_weight_by_id", - level = "debug", - skip_all, - fields() - )] pub async fn get_node_weight_by_id( &self, id: impl Into, @@ -723,12 +676,6 @@ impl WorkspaceSnapshot { .to_owned()) } - #[instrument( - name = "workspace_snapshot.get_node_weight", - level = "debug", - skip_all, - fields() - )] pub async fn get_node_weight( &self, node_index: NodeIndex, @@ -757,12 +704,6 @@ impl WorkspaceSnapshot { .find_equivalent_node(id, lineage_id)?) } - #[instrument( - name = "workspace_snapshot.cleanup", - level = "debug", - skip_all, - fields() - )] pub async fn cleanup(&self) -> WorkspaceSnapshotResult<()> { self.working_copy_mut().await.cleanup(); Ok(()) @@ -816,16 +757,11 @@ impl WorkspaceSnapshot { self.working_copy().await.write_to_disk(file_suffix); } + /// Write the read only snapshot to disk. *WARNING* can panic! Use only for debugging pub fn write_readonly_graph_to_disk(&self, file_suffix: &str) { self.read_only_graph.write_to_disk(file_suffix); } - #[instrument( - name = "workspace_snapshot.get_node_index_by_id", - level = "debug", - skip_all, - fields() - )] pub async fn get_node_index_by_id( &self, id: impl Into, @@ -833,22 +769,10 @@ impl WorkspaceSnapshot { Ok(self.working_copy().await.get_node_index_by_id(id)?) } - #[instrument( - name = "workspace_snapshot.try_get_node_index_by_id", - level = "debug", - skip_all, - fields() - )] - pub async fn try_get_node_index_by_id(&self, id: impl Into) -> Option { + pub async fn get_node_index_by_id_opt(&self, id: impl Into) -> Option { self.working_copy().await.get_node_index_by_id_opt(id) } - #[instrument( - name = "workspace_snapshot.get_latest_node_index", - level = "debug", - skip_all, - fields() - )] pub async fn get_latest_node_index( &self, node_index: NodeIndex, @@ -939,12 +863,6 @@ impl WorkspaceSnapshot { Err(WorkspaceSnapshotError::WorkspaceSnapshotNotFetched) } - #[instrument( - name = "workspace_snapshot.get_category_node", - level = "debug", - skip_all, - fields() - )] pub async fn get_category_node_or_err( &self, source: Option, @@ -955,12 +873,6 @@ impl WorkspaceSnapshot { .ok_or(WorkspaceSnapshotError::CategoryNodeNotFound(kind)) } - #[instrument( - name = "workspace_snapshot.get_category_node_opt", - level = "debug", - skip_all, - fields() - )] pub async fn get_category_node( &self, source: Option, @@ -973,12 +885,6 @@ impl WorkspaceSnapshot { .map(|(category_node_id, _)| category_node_id)) } - #[instrument( - name = "workspace_snapshot.edges_directed", - level = "debug", - skip_all, - fields() - )] pub async fn edges_directed( &self, id: impl Into, @@ -999,12 +905,6 @@ impl WorkspaceSnapshot { .collect()) } - #[instrument( - name = "workspace_snapshot.edges_directed_for_edge_weight_kind", - level = "debug", - skip_all, - fields() - )] pub async fn edges_directed_for_edge_weight_kind( &self, id: impl Into, @@ -1019,12 +919,6 @@ impl WorkspaceSnapshot { .edges_directed_for_edge_weight_kind(node_index, direction, edge_kind)) } - #[instrument( - name = "workspace_snapshot.edges_directed_by_index", - level = "debug", - skip_all, - fields() - )] pub async fn edges_directed_by_index( &self, node_index: NodeIndex, @@ -1044,12 +938,6 @@ impl WorkspaceSnapshot { .collect()) } - #[instrument( - name = "workspace_snapshot.remove_all_edges", - level = "debug", - skip_all, - fields() - )] pub async fn remove_all_edges(&self, id: impl Into) -> WorkspaceSnapshotResult<()> { let id = id.into(); for (edge_weight, source, target) in self.edges_directed(id, Direction::Outgoing).await? { @@ -1063,12 +951,6 @@ impl WorkspaceSnapshot { Ok(()) } - #[instrument( - name = "workspace_snapshot.incoming_sources_for_edge_weight_kind", - level = "debug", - skip_all, - fields() - )] pub async fn incoming_sources_for_edge_weight_kind( &self, id: impl Into, @@ -1088,12 +970,6 @@ impl WorkspaceSnapshot { .collect()) } - #[instrument( - name = "workspace_snapshot.outgoing_targets_for_edge_weight_kind", - level = "debug", - skip_all, - fields() - )] pub async fn outgoing_targets_for_edge_weight_kind( &self, id: impl Into, @@ -1114,12 +990,6 @@ impl WorkspaceSnapshot { .collect()) } - #[instrument( - name = "workspace_snapshot.outgoing_targets_for_edge_weight_kind_by_index", - level = "debug", - skip_all, - fields() - )] pub async fn outgoing_targets_for_edge_weight_kind_by_index( &self, node_index: NodeIndex, @@ -1139,12 +1009,6 @@ impl WorkspaceSnapshot { .collect()) } - #[instrument( - name = "workspace_snapshot.all_outgoing_targets", - level = "debug", - skip_all, - fields() - )] pub async fn all_outgoing_targets( &self, id: impl Into, @@ -1165,12 +1029,6 @@ impl WorkspaceSnapshot { Ok(result) } - #[instrument( - name = "workspace_snapshot.all_incoming_sources", - level = "debug", - skip_all, - fields() - )] pub async fn all_incoming_sources( &self, id: impl Into, @@ -1191,12 +1049,6 @@ impl WorkspaceSnapshot { Ok(result) } - #[instrument( - name = "workspace_snapshot.remove_incoming_edges_of_kind", - level = "debug", - skip_all, - fields() - )] pub async fn remove_incoming_edges_of_kind( &self, target_id: impl Into, @@ -1255,12 +1107,6 @@ impl WorkspaceSnapshot { Ok(()) } - #[instrument( - name = "workspace_snapshot.remove_edge", - level = "debug", - skip_all, - fields() - )] pub async fn remove_edge( &self, source_node_index: NodeIndex, @@ -1289,12 +1135,6 @@ impl WorkspaceSnapshot { .map(ToOwned::to_owned)) } - #[instrument( - name = "workspace_snapshot.remove_edge_for_ulids", - level = "debug", - skip_all, - fields() - )] pub async fn remove_edge_for_ulids( &self, source_node_id: impl Into, @@ -1336,12 +1176,6 @@ impl WorkspaceSnapshot { /// Mark whether a prop can be used as an input to a function. Props below /// Maps and Arrays are not valid inputs. Must only be used when /// "finalizing" a schema variant! - #[instrument( - name = "workspace_snapshot.mark_prop_as_able_to_be_used_as_prototype_arg", - level = "debug", - skip_all, - fields() - )] pub async fn mark_prop_as_able_to_be_used_as_prototype_arg( &self, node_index: NodeIndex, @@ -1359,12 +1193,6 @@ impl WorkspaceSnapshot { Ok(()) } - #[instrument( - name = "workspace_snapshot.ordering_node_for_container", - level = "debug", - skip_all, - fields() - )] pub async fn ordering_node_for_container( &self, id: impl Into, @@ -1373,12 +1201,6 @@ impl WorkspaceSnapshot { Ok(self.working_copy().await.ordering_node_for_container(idx)?) } - #[instrument( - name = "workspace_snapshot.update_node_id", - level = "debug", - skip_all, - fields() - )] pub async fn update_node_id( &self, current_id: impl Into, @@ -1396,12 +1218,6 @@ impl WorkspaceSnapshot { Ok(()) } - #[instrument( - name = "workspace_snapshot.ordered_children_for_node", - level = "debug", - skip_all, - fields() - )] pub async fn ordered_children_for_node( &self, id: impl Into, @@ -1421,357 +1237,6 @@ impl WorkspaceSnapshot { ) } - #[instrument( - name = "workspace_snapshot.components_added_relative_to_base", - level = "info", - skip_all - )] - pub async fn components_added_relative_to_base( - &self, - ctx: &DalContext, - ) -> WorkspaceSnapshotResult> { - let mut new_component_ids = Vec::new(); - let base_change_set_id = if let Some(change_set_id) = ctx.change_set()?.base_change_set_id { - change_set_id - } else { - return Ok(new_component_ids); - }; - - // Even though the default change set for a workspace can have a base change set, we don't - // want to consider anything as new/modified/removed when looking at the default change - // set. - let workspace = Workspace::get_by_pk_or_error( - ctx, - &ctx.tenancy() - .workspace_pk() - .ok_or(WorkspaceSnapshotError::WorkspaceMissing)?, - ) - .await - .map_err(Box::new)?; - if workspace.default_change_set_id() == ctx.change_set_id() { - return Ok(new_component_ids); - } - - let base_snapshot = WorkspaceSnapshot::find_for_change_set(ctx, base_change_set_id).await?; - - let updates = base_snapshot - .read_only_graph - .detect_updates(&self.read_only_graph); - - for update in &updates { - match update { - Update::RemoveEdge { .. } | Update::ReplaceNode { .. } | Update::NewEdge { .. } => { - } - Update::NewNode { node_weight } => { - if let NodeWeight::Component(inner) = node_weight { - if base_snapshot - .read_only_graph - .get_node_index_by_id_opt(inner.id) - .is_none() - { - new_component_ids.push(inner.id.into()) - } - } - } - } - } - - Ok(new_component_ids) - } - - #[instrument( - name = "workspace_snapshot.components_removed_relative_to_base", - level = "debug", - skip_all - )] - pub async fn components_removed_relative_to_base( - &self, - ctx: &DalContext, - ) -> WorkspaceSnapshotResult> { - let mut removed_component_ids = Vec::new(); - let base_change_set_id = if let Some(change_set_id) = ctx.change_set()?.base_change_set_id { - change_set_id - } else { - return Ok(removed_component_ids); - }; - - // Even though the default change set for a workspace can have a base change set, we don't - // want to consider anything as new/modified/removed when looking at the default change - // set. - let workspace = Workspace::get_by_pk_or_error( - ctx, - &ctx.tenancy() - .workspace_pk() - .ok_or(WorkspaceSnapshotError::WorkspaceMissing)?, - ) - .await - .map_err(Box::new)?; - if workspace.default_change_set_id() == ctx.change_set_id() { - return Ok(removed_component_ids); - } - - let base_snapshot = WorkspaceSnapshot::find_for_change_set(ctx, base_change_set_id).await?; - let component_category_id = if let Some((category_id, _)) = base_snapshot - .read_only_graph - .get_category_node(None, CategoryNodeKind::Component)? - { - category_id - } else { - // Can't have any new Components if there's no Component category node to put them - // under. - return Ok(removed_component_ids); - }; - - let updates = base_snapshot - .read_only_graph - .detect_updates(&self.read_only_graph); - - let mut added_component_ids = HashSet::new(); - - for update in &updates { - match update { - Update::ReplaceNode { .. } | Update::NewNode { .. } => { - /* Updates unused for determining if a Component is removed in regard to the updates */ - } - Update::NewEdge { - source, - destination, - edge_weight: _, - } => { - // get updates that add an edge from the Components category to a component, which implies component creation - if source.id != component_category_id.into() - || destination.node_weight_kind != NodeWeightDiscriminants::Component - { - continue; - } - - added_component_ids.insert(ComponentId::from(Ulid::from(destination.id))); - } - Update::RemoveEdge { - source, - destination, - edge_kind: _, - } => { - // get updates that remove an edge from the Components category to a component, which implies component deletion - if source.id != component_category_id.into() - || destination.node_weight_kind != NodeWeightDiscriminants::Component - { - continue; - } - - removed_component_ids.push(ComponentId::from(Ulid::from(destination.id))); - } - } - } - - // Filter out ComponentIds that have both been deleted and created, since that implies an upgrade and not a real deletion - Ok(removed_component_ids - .into_iter() - .filter(|id| !added_component_ids.contains(id)) - .collect()) - } - - #[instrument( - name = "workspace_snapshot.socket_edges_added_relative_to_base", - level = "debug", - skip_all - )] - pub async fn socket_edges_added_relative_to_base( - &self, - ctx: &DalContext, - ) -> WorkspaceSnapshotResult> { - let mut new_attribute_prototype_argument_ids = Vec::new(); - let base_change_set_id = if let Some(change_set_id) = ctx.change_set()?.base_change_set_id { - change_set_id - } else { - return Ok(new_attribute_prototype_argument_ids); - }; - - // Even though the default change set for a workspace can have a base change set, we don't - // want to consider anything as new/modified/removed when looking at the default change - // set. - let workspace = Workspace::get_by_pk_or_error( - ctx, - &ctx.tenancy() - .workspace_pk() - .ok_or(WorkspaceSnapshotError::WorkspaceMissing)?, - ) - .await - .map_err(Box::new)?; - if workspace.default_change_set_id() == ctx.change_set_id() { - return Ok(new_attribute_prototype_argument_ids); - } - - let base_snapshot = WorkspaceSnapshot::find_for_change_set(ctx, base_change_set_id).await?; - - let updates = base_snapshot - .read_only_graph - .detect_updates(&self.read_only_graph); - - for update in &updates { - match update { - Update::NewNode { .. } | Update::RemoveEdge { .. } | Update::ReplaceNode { .. } => { - // Updates unused for determining if a socket to socket connection (in frontend - // terms) is new. - } - Update::NewEdge { - source: _source, - destination, - edge_weight: _, - } => { - if destination.node_weight_kind - != NodeWeightDiscriminants::AttributePrototypeArgument - { - // We're interested in new AttributePrototypeArguments as they represent - // the connection between sockets. (The input socket has the output - // socket as one of its function arguments.) - continue; - } - - let prototype_argument = AttributePrototypeArgument::get_by_id( - ctx, - AttributePrototypeArgumentId::from(Ulid::from(destination.id)), - ) - .await - .map_err(Box::new)?; - if prototype_argument.targets().is_some() { - new_attribute_prototype_argument_ids.push(prototype_argument.id()); - } else { - // If the AttributePrototypeArgument doesn't have targets, then it's - // not for a socket to socket connection. - continue; - } - } - } - } - - Ok(new_attribute_prototype_argument_ids) - } - - #[instrument( - name = "workspace_snapshot.socket_edges_removed_relative_to_base", - level = "debug", - skip_all - )] - pub async fn socket_edges_removed_relative_to_base( - &self, - ctx: &DalContext, - ) -> WorkspaceSnapshotResult> { - let mut removed_attribute_prototype_argument_ids = Vec::new(); - - // Even though the default change set for a workspace can have a base change set, we don't - // want to consider anything as new/modified/removed when looking at the default change - // set. - let workspace = Workspace::get_by_pk_or_error( - ctx, - &ctx.tenancy() - .workspace_pk() - .ok_or(WorkspaceSnapshotError::WorkspaceMissing)?, - ) - .await - .map_err(Box::new)?; - if workspace.default_change_set_id() == ctx.change_set_id() { - return Ok(removed_attribute_prototype_argument_ids); - } - - let base_change_set_ctx = ctx.clone_with_base().await?; - let base_change_set_ctx = &base_change_set_ctx; - - // * For each Component being removed (all edges to/from removed components should also - // show as removed): - let removed_component_ids: HashSet = self - .components_removed_relative_to_base(ctx) - .await? - .iter() - .copied() - .collect(); - let remaining_component_ids: HashSet = Component::list(ctx) - .await - .map_err(Box::new)? - .iter() - .map(Component::id) - .collect(); - for removed_component_id in &removed_component_ids { - let base_change_set_component = - Component::get_by_id(base_change_set_ctx, *removed_component_id) - .await - .map_err(Box::new)?; - - // * Get incoming edges - for incoming_connection in base_change_set_component - .incoming_connections(base_change_set_ctx) - .await - .map_err(Box::new)? - { - //* Interested in: - // * Edge is coming from a Component being removed - // * Edge is coming from a Component that exists in current change set - if removed_component_ids.contains(&incoming_connection.from_component_id) - || remaining_component_ids.contains(&incoming_connection.from_component_id) - { - removed_attribute_prototype_argument_ids - .push(incoming_connection.attribute_prototype_argument_id); - } - } - - //* Get outgoing edges - for outgoing_connection in base_change_set_component - .outgoing_connections(base_change_set_ctx) - .await - .map_err(Box::new)? - { - // * Interested in: - // * Edge is going to a Component being removed - // * Edge is going to a Component that exists in current change set - if removed_component_ids.contains(&outgoing_connection.to_component_id) - || remaining_component_ids.contains(&outgoing_connection.to_component_id) - { - removed_attribute_prototype_argument_ids - .push(outgoing_connection.attribute_prototype_argument_id); - } - } - } - - // * For each removed AttributePrototypeArgument (removed edge connects two Components - // that have not been removed): - let base_snapshot = base_change_set_ctx.workspace_snapshot()?; - let updates = base_snapshot - .read_only_graph - .detect_updates(&self.read_only_graph); - - for update in updates { - match update { - Update::ReplaceNode { .. } | Update::NewEdge { .. } | Update::NewNode { .. } => { - /* Updates unused for determining if a connection between sockets has been removed */ - } - Update::RemoveEdge { - source: _, - destination, - edge_kind, - } => { - if edge_kind != EdgeWeightKindDiscriminants::PrototypeArgument { - continue; - } - let attribute_prototype_argument = AttributePrototypeArgument::get_by_id( - base_change_set_ctx, - AttributePrototypeArgumentId::from(Ulid::from(destination.id)), - ) - .await - .map_err(Box::new)?; - - // * Interested in all of them that have targets (connecting two Components - // via sockets). - if attribute_prototype_argument.targets().is_some() { - removed_attribute_prototype_argument_ids - .push(attribute_prototype_argument.id()); - } - } - } - } - - Ok(removed_attribute_prototype_argument_ids) - } - /// Returns whether or not any Actions were dispatched. pub async fn dispatch_actions(ctx: &DalContext) -> WorkspaceSnapshotResult { let mut did_dispatch = false; diff --git a/lib/sdf-server/src/server/service/component/delete_property_editor_value.rs b/lib/sdf-server/src/server/service/component/delete_property_editor_value.rs index 6afdff0295..360f5d5200 100644 --- a/lib/sdf-server/src/server/service/component/delete_property_editor_value.rs +++ b/lib/sdf-server/src/server/service/component/delete_property_editor_value.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use crate::server::extract::{AccessBuilder, HandlerContext}; use crate::service::component::ComponentResult; use axum::response::IntoResponse; @@ -33,8 +35,14 @@ pub async fn delete_property_editor_value( AttributeValue::remove_by_id(&ctx, request.attribute_value_id).await?; let component = Component::get_by_id(&ctx, request.component_id).await?; - let payload: SummaryDiagramComponent = - SummaryDiagramComponent::assemble(&ctx, &component, ChangeStatus::Unmodified).await?; + let mut socket_map = HashMap::new(); + let payload: SummaryDiagramComponent = SummaryDiagramComponent::assemble( + &ctx, + &component, + ChangeStatus::Unmodified, + &mut socket_map, + ) + .await?; WsEvent::component_updated(&ctx, payload) .await? .publish_on_commit(&ctx) diff --git a/lib/sdf-server/src/server/service/component/insert_property_editor_value.rs b/lib/sdf-server/src/server/service/component/insert_property_editor_value.rs index 0114d17a50..58726fabbf 100644 --- a/lib/sdf-server/src/server/service/component/insert_property_editor_value.rs +++ b/lib/sdf-server/src/server/service/component/insert_property_editor_value.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use axum::{response::IntoResponse, Json}; use serde::{Deserialize, Serialize}; @@ -40,8 +42,14 @@ pub async fn insert_property_editor_value( .await?; let component: Component = Component::get_by_id(&ctx, request.component_id).await?; - let payload: SummaryDiagramComponent = - SummaryDiagramComponent::assemble(&ctx, &component, ChangeStatus::Unmodified).await?; + let mut socket_map = HashMap::new(); + let payload: SummaryDiagramComponent = SummaryDiagramComponent::assemble( + &ctx, + &component, + ChangeStatus::Unmodified, + &mut socket_map, + ) + .await?; WsEvent::component_updated(&ctx, payload) .await? .publish_on_commit(&ctx) diff --git a/lib/sdf-server/src/server/service/component/set_type.rs b/lib/sdf-server/src/server/service/component/set_type.rs index f5156835c3..fbe5b85d83 100644 --- a/lib/sdf-server/src/server/service/component/set_type.rs +++ b/lib/sdf-server/src/server/service/component/set_type.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use axum::extract::{Host, OriginalUri}; use axum::{response::IntoResponse, Json}; @@ -56,8 +58,14 @@ pub async fn set_type( let component = Component::get_by_id(&ctx, component_id).await?; // TODO: We'll want to figure out whether this component is Added/Modified, depending on // whether it existed in the base change set already or not. - let payload: SummaryDiagramComponent = - SummaryDiagramComponent::assemble(&ctx, &component, ChangeStatus::Unmodified).await?; + let mut socket_map = HashMap::new(); + let payload: SummaryDiagramComponent = SummaryDiagramComponent::assemble( + &ctx, + &component, + ChangeStatus::Unmodified, + &mut socket_map, + ) + .await?; WsEvent::component_updated(&ctx, payload) .await? .publish_on_commit(&ctx) diff --git a/lib/sdf-server/src/server/service/component/update_property_editor_value.rs b/lib/sdf-server/src/server/service/component/update_property_editor_value.rs index 3c3cbcb2e6..4376cd731f 100644 --- a/lib/sdf-server/src/server/service/component/update_property_editor_value.rs +++ b/lib/sdf-server/src/server/service/component/update_property_editor_value.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use axum::extract::{Host, OriginalUri}; use axum::{response::IntoResponse, Json}; use dal::change_status::ChangeStatus; @@ -88,8 +90,14 @@ pub async fn update_property_editor_value( ); } - let payload: SummaryDiagramComponent = - SummaryDiagramComponent::assemble(&ctx, &component, ChangeStatus::Unmodified).await?; + let mut socket_map = HashMap::new(); + let payload: SummaryDiagramComponent = SummaryDiagramComponent::assemble( + &ctx, + &component, + ChangeStatus::Unmodified, + &mut socket_map, + ) + .await?; WsEvent::component_updated(&ctx, payload) .await? .publish_on_commit(&ctx) diff --git a/lib/sdf-server/src/server/service/component/upgrade.rs b/lib/sdf-server/src/server/service/component/upgrade.rs index 125e5c56fe..7dd844e33a 100644 --- a/lib/sdf-server/src/server/service/component/upgrade.rs +++ b/lib/sdf-server/src/server/service/component/upgrade.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use crate::server::extract::{AccessBuilder, HandlerContext, PosthogClient}; use crate::server::tracking::track; use crate::service::component::{ComponentError, ComponentResult}; @@ -75,9 +77,14 @@ pub async fn upgrade( }), ); - let payload: SummaryDiagramComponent = - SummaryDiagramComponent::assemble(&ctx, &upgraded_component, ChangeStatus::Unmodified) - .await?; + let mut socket_map = HashMap::new(); + let payload: SummaryDiagramComponent = SummaryDiagramComponent::assemble( + &ctx, + &upgraded_component, + ChangeStatus::Unmodified, + &mut socket_map, + ) + .await?; WsEvent::component_upgraded(&ctx, payload, request.component_id) .await? .publish_on_commit(&ctx) diff --git a/lib/sdf-server/src/server/service/diagram.rs b/lib/sdf-server/src/server/service/diagram.rs index 204f06f88d..49af57a88a 100644 --- a/lib/sdf-server/src/server/service/diagram.rs +++ b/lib/sdf-server/src/server/service/diagram.rs @@ -7,6 +7,7 @@ use dal::attribute::prototype::argument::AttributePrototypeArgumentError; use dal::attribute::prototype::AttributePrototypeError; use dal::attribute::value::AttributeValueError; use dal::component::ComponentError; +use dal::slow_rt::SlowRuntimeError; use dal::socket::input::InputSocketError; use dal::socket::output::OutputSocketError; use dal::workspace_snapshot::WorkspaceSnapshotError; @@ -15,6 +16,7 @@ use dal::{ChangeSetError, SchemaVariantId, StandardModelError, TransactionsError use std::num::ParseFloatError; use telemetry::prelude::*; use thiserror::Error; +use tokio::task::JoinError; use crate::server::state::AppState; @@ -70,6 +72,8 @@ pub enum DiagramError { InvalidRequest, #[error("invalid system")] InvalidSystem, + #[error("tokio join error: {0}")] + Join(#[from] JoinError), #[error(transparent)] Nats(#[from] si_data_nats::NatsError), #[error("not authorized")] @@ -88,6 +92,8 @@ pub enum DiagramError { SchemaNotFound, #[error("serde error: {0}")] Serde(#[from] serde_json::Error), + #[error("slow runtime error: {0}")] + SlowRuntime(#[from] SlowRuntimeError), #[error("socket not found")] SocketNotFound, #[error(transparent)] diff --git a/lib/sdf-server/src/server/service/diagram/delete_component.rs b/lib/sdf-server/src/server/service/diagram/delete_component.rs index 4305e353ec..f017cabed6 100644 --- a/lib/sdf-server/src/server/service/diagram/delete_component.rs +++ b/lib/sdf-server/src/server/service/diagram/delete_component.rs @@ -35,6 +35,7 @@ pub async fn delete_components( let force_change_set_id = ChangeSet::force_new(&mut ctx).await?; let mut components = HashMap::new(); + let mut socket_map = HashMap::new(); for component_id in request.component_ids { let component_still_exists = delete_single_component( &ctx, @@ -50,8 +51,13 @@ pub async fn delete_components( if component_still_exists { // to_delete=True let component: Component = Component::get_by_id(&ctx, component_id).await?; - let payload: SummaryDiagramComponent = - SummaryDiagramComponent::assemble(&ctx, &component, ChangeStatus::Deleted).await?; + let payload: SummaryDiagramComponent = SummaryDiagramComponent::assemble( + &ctx, + &component, + ChangeStatus::Deleted, + &mut socket_map, + ) + .await?; WsEvent::component_updated(&ctx, payload) .await? .publish_on_commit(&ctx) diff --git a/lib/sdf-server/src/server/service/diagram/get_diagram.rs b/lib/sdf-server/src/server/service/diagram/get_diagram.rs index 0b47ae2dfb..7be2b3414a 100644 --- a/lib/sdf-server/src/server/service/diagram/get_diagram.rs +++ b/lib/sdf-server/src/server/service/diagram/get_diagram.rs @@ -1,10 +1,10 @@ use axum::extract::{Host, OriginalUri}; use axum::{extract::Query, Json}; use dal::diagram::Diagram; -use dal::Visibility; +use dal::{slow_rt, Visibility}; use serde::{Deserialize, Serialize}; -use super::DiagramResult; +use super::{DiagramError, DiagramResult}; use crate::server::extract::{AccessBuilder, HandlerContext, PosthogClient}; use crate::server::tracking::track; @@ -26,7 +26,13 @@ pub async fn get_diagram( Query(request): Query, ) -> DiagramResult> { let ctx = builder.build(request_ctx.build(request.visibility)).await?; - let response = Diagram::assemble(&ctx).await?; + let ctx_clone = ctx.clone(); + + let response = slow_rt::spawn(async move { + let ctx = &ctx_clone; + Ok::(Diagram::assemble(ctx).await?) + })? + .await??; track( &posthog_client, diff --git a/lib/sdf-server/src/server/service/diagram/set_component_position.rs b/lib/sdf-server/src/server/service/diagram/set_component_position.rs index 072cd850aa..5c1abea46a 100644 --- a/lib/sdf-server/src/server/service/diagram/set_component_position.rs +++ b/lib/sdf-server/src/server/service/diagram/set_component_position.rs @@ -48,23 +48,32 @@ pub async fn set_component_position( let mut components: Vec = vec![]; let mut diagram_inferred_edges: Vec = vec![]; + let mut socket_map = HashMap::new(); for (id, update) in request.data_by_component_id { let mut component = Component::get_by_id(&ctx, id).await?; if update.detach { Frame::orphan_child(&ctx, component.id()).await?; - let payload: SummaryDiagramComponent = - SummaryDiagramComponent::assemble(&ctx, &component, ChangeStatus::Unmodified) - .await?; + let payload: SummaryDiagramComponent = SummaryDiagramComponent::assemble( + &ctx, + &component, + ChangeStatus::Unmodified, + &mut socket_map, + ) + .await?; WsEvent::component_updated(&ctx, payload) .await? .publish_on_commit(&ctx) .await?; } else if let Some(new_parent) = update.new_parent { Frame::upsert_parent(&ctx, component.id(), new_parent).await?; - let payload: SummaryDiagramComponent = - SummaryDiagramComponent::assemble(&ctx, &component, ChangeStatus::Unmodified) - .await?; + let payload: SummaryDiagramComponent = SummaryDiagramComponent::assemble( + &ctx, + &component, + ChangeStatus::Unmodified, + &mut socket_map, + ) + .await?; WsEvent::component_updated(&ctx, payload) .await? .publish_on_commit(&ctx)