Skip to content

Commit

Permalink
Merge pull request #5224 from systeminit/zack/send-inferred-edge-even…
Browse files Browse the repository at this point in the history
…ts-after-component-events

fix: send inferred edge events after component events in mgmt fns
  • Loading branch information
stack72 authored Jan 14, 2025
2 parents 2e6770c + 72ccfb7 commit 18386f6
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 22 deletions.
69 changes: 54 additions & 15 deletions lib/dal/src/component/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ pub type FrameResult<T> = Result<T, FrameError>;
/// A unit struct containing logic for working with frames.
pub struct Frame;

pub struct InferredEdgeChanges {
pub removed_edges: Vec<SummaryDiagramInferredEdge>,
pub upserted_edges: Vec<SummaryDiagramInferredEdge>,
}

impl Frame {
/// Given a [`ComponentId`] and either the parent or a child of it,
/// calculate what needs to be updated given the change in [`ComponentType`]
Expand Down Expand Up @@ -164,7 +169,28 @@ impl Frame {
ctx: &DalContext,
child_id: ComponentId,
new_parent_id: ComponentId,
) -> FrameResult<Option<Vec<SummaryDiagramInferredEdge>>> {
) -> FrameResult<Option<InferredEdgeChanges>> {
Self::upsert_parent_inner(ctx, child_id, new_parent_id, true).await
}

/// Provides the ability to attach or replace a child [`Component`]'s
/// parent, but does not automatically send WsEvents related to the changes
#[instrument(level = "info", skip(ctx))]
pub async fn upsert_parent_no_events(
ctx: &DalContext,
child_id: ComponentId,
new_parent_id: ComponentId,
) -> FrameResult<Option<InferredEdgeChanges>> {
Self::upsert_parent_inner(ctx, child_id, new_parent_id, false).await
}

#[instrument(level = "info", skip(ctx))]
async fn upsert_parent_inner(
ctx: &DalContext,
child_id: ComponentId,
new_parent_id: ComponentId,
send_events: bool,
) -> FrameResult<Option<InferredEdgeChanges>> {
// let's see if we need to even do anything
if let Some(current_parent_id) = Component::get_parent_by_id(ctx, child_id).await? {
if current_parent_id == new_parent_id {
Expand All @@ -173,9 +199,12 @@ impl Frame {
}

match Component::get_type_by_id(ctx, new_parent_id).await? {
ComponentType::ConfigurationFrameDown | ComponentType::ConfigurationFrameUp => Ok(
Some(Self::attach_child_to_parent_inner(ctx, new_parent_id, child_id).await?),
),
ComponentType::ConfigurationFrameDown | ComponentType::ConfigurationFrameUp => {
Ok(Some(
Self::attach_child_to_parent_inner(ctx, new_parent_id, child_id, send_events)
.await?,
))
}
ComponentType::Component => Err(FrameError::ParentIsNotAFrame(child_id, new_parent_id)),
ComponentType::AggregationFrame => {
Err(FrameError::AggregateFramesUnsupported(new_parent_id))
Expand All @@ -191,7 +220,8 @@ impl Frame {
ctx: &DalContext,
parent_id: ComponentId,
child_id: ComponentId,
) -> FrameResult<Vec<SummaryDiagramInferredEdge>> {
send_events: bool,
) -> FrameResult<InferredEdgeChanges> {
// cache current map of input <-> output sockets based on what the parent knows about right now!!!!
let initial_impacted_values: HashSet<SocketAttributeValuePair> =
Self::get_all_inferred_connections_for_component_tree(ctx, parent_id, child_id).await?;
Expand Down Expand Up @@ -256,14 +286,17 @@ impl Frame {
to_delete: false, // irrelevant
})
}
WsEvent::remove_inferred_edges(ctx, inferred_edges_to_remove.clone())
.await?
.publish_on_commit(ctx)
.await?;

if send_events {
WsEvent::remove_inferred_edges(ctx, inferred_edges_to_remove.clone())
.await?
.publish_on_commit(ctx)
.await?;
}

// After we let the frontend know what edges should be removed, now we should handle upsertion.
let removed_edges_to_skip: HashSet<SummaryDiagramInferredEdge> =
HashSet::from_iter(inferred_edges_to_remove.into_iter());
HashSet::from_iter(inferred_edges_to_remove.clone().into_iter());

let mut inferred_edges_to_upsert = Vec::new();
for pair in &current_impacted_values {
Expand All @@ -278,10 +311,13 @@ impl Frame {
inferred_edges_to_upsert.push(edge);
}
}
WsEvent::upsert_inferred_edges(ctx, inferred_edges_to_upsert.clone())
.await?
.publish_on_commit(ctx)
.await?;

if send_events {
WsEvent::upsert_inferred_edges(ctx, inferred_edges_to_upsert.clone())
.await?
.publish_on_commit(ctx)
.await?;
}

// an input socket needs to rerun if:
// the input socket has a new/different output socket driving it
Expand Down Expand Up @@ -309,7 +345,10 @@ impl Frame {
)
.await?;

Ok(inferred_edges_to_upsert)
Ok(InferredEdgeChanges {
removed_edges: inferred_edges_to_remove,
upserted_edges: inferred_edges_to_upsert,
})
}

#[instrument(
Expand Down
46 changes: 40 additions & 6 deletions lib/dal/src/management/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use thiserror::Error;
use veritech_client::{ManagementFuncStatus, ManagementResultSuccess};

use crate::component::delete::{delete_components, ComponentDeletionStatus};
use crate::component::frame::{Frame, FrameError};
use crate::component::frame::{Frame, FrameError, InferredEdgeChanges};
use crate::dependency_graph::DependencyGraph;
use crate::diagram::geometry::Geometry;
use crate::diagram::view::{View, ViewComponentsUpdateSingle, ViewId, ViewView};
Expand Down Expand Up @@ -480,6 +480,8 @@ struct PendingManage {
#[derive(Clone, Debug)]
enum PendingOperation {
Connect(PendingConnect),
InferredEdgeRemoveEvent(Vec<SummaryDiagramInferredEdge>),
InferredEdgeUpsertEvent(Vec<SummaryDiagramInferredEdge>),
Manage(PendingManage),
Parent(PendingParent),
RemoveConnection(PendingConnect),
Expand Down Expand Up @@ -862,9 +864,10 @@ impl<'a> ManagementOperator<'a> {
&self,
child_id: ComponentId,
parent_placeholder: &String,
) -> ManagementResult<(ComponentId, Option<Vec<SummaryDiagramInferredEdge>>)> {
) -> ManagementResult<(ComponentId, Option<InferredEdgeChanges>)> {
let new_parent_id = self.get_real_component_id(parent_placeholder)?;
let inferred_edges = Frame::upsert_parent(self.ctx, child_id, new_parent_id).await?;
let inferred_edges =
Frame::upsert_parent_no_events(self.ctx, child_id, new_parent_id).await?;

Ok((new_parent_id, inferred_edges))
}
Expand Down Expand Up @@ -1406,6 +1409,8 @@ impl<'a> ManagementOperator<'a> {
let mut inferred_edges_by_component_id = HashMap::new();

// Parents have to be set before component events are sent

let mut new_pending_ops_from_parentage = vec![];
for pending_parent in pending_operations
.iter()
.filter_map(|pending_op| match pending_op {
Expand All @@ -1417,8 +1422,22 @@ impl<'a> ManagementOperator<'a> {
.set_parent(pending_parent.child_component_id, &pending_parent.parent)
.await?;
if let Some(inferred_edges) = inferred_edges {
inferred_edges_by_component_id
.insert(pending_parent.child_component_id, inferred_edges);
inferred_edges_by_component_id.insert(
pending_parent.child_component_id,
inferred_edges.upserted_edges.clone(),
);

// Inferred edge events should also come after all component events
if !inferred_edges.upserted_edges.is_empty() {
new_pending_ops_from_parentage.push(PendingOperation::InferredEdgeUpsertEvent(
inferred_edges.upserted_edges,
));
}
if !inferred_edges.removed_edges.is_empty() {
new_pending_ops_from_parentage.push(PendingOperation::InferredEdgeRemoveEvent(
inferred_edges.removed_edges,
));
}
}

component_graph.id_depends_on(pending_parent.child_component_id, parent_id);
Expand All @@ -1434,7 +1453,10 @@ impl<'a> ManagementOperator<'a> {
// their wsevents sent *after* the component ws events (otherwise some
// will be discarded by the frontend, since it does not know about the
// newly created components until the above events are sent)
for pending_op in pending_operations {
for pending_op in pending_operations
.into_iter()
.chain(new_pending_ops_from_parentage.into_iter())
{
match pending_op {
PendingOperation::Connect(pending_connect) => {
self.create_connection(
Expand All @@ -1454,6 +1476,18 @@ impl<'a> ManagementOperator<'a> {
self.manage(managed_component_id, managed_component_schema_id)
.await?;
}
PendingOperation::InferredEdgeRemoveEvent(removed_edges) => {
WsEvent::remove_inferred_edges(self.ctx, removed_edges)
.await?
.publish_on_commit(self.ctx)
.await?;
}
PendingOperation::InferredEdgeUpsertEvent(upserted_edges) => {
WsEvent::upsert_inferred_edges(self.ctx, upserted_edges)
.await?
.publish_on_commit(self.ctx)
.await?;
}
PendingOperation::Parent(_) => {}
}
}
Expand Down
4 changes: 3 additions & 1 deletion lib/sdf-server/src/service/v2/view/create_component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ pub async fn create_component(

let mut maybe_inferred_edges = None;
if let Some(frame_id) = request.parent_id {
maybe_inferred_edges = Frame::upsert_parent(&ctx, component.id(), frame_id).await?;
maybe_inferred_edges = Frame::upsert_parent(&ctx, component.id(), frame_id)
.await?
.map(|edges| edges.upserted_edges);

track(
&posthog_client,
Expand Down

0 comments on commit 18386f6

Please sign in to comment.