Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dal): speed up get diagram #4380

Merged
merged 1 commit into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/dal/src/action/dependency_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl ActionDependencyGraph {
// Get all inferred connections up front so we don't build this tree each time
let components_to_find = actions_by_component_id.keys().copied().collect_vec();
let component_tree =
InferredConnectionGraph::assemble_for_components(ctx, components_to_find).await?;
InferredConnectionGraph::assemble_for_components(ctx, components_to_find, None).await?;
// Action dependencies are primarily based on the data flow between Components. Things that
// feed data into other things must have their actions run before the actions for the
// things they are feeding data into.
Expand Down
2 changes: 1 addition & 1 deletion lib/dal/src/attribute/value/dependent_value_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl DependentValueGraph {
// job always operates on the current state of the change set's snapshot, not the state
// at the time the job was created.
if workspace_snapshot
.try_get_node_index_by_id(initial_id)
.get_node_index_by_id_opt(initial_id)
.await
.is_none()
{
Expand Down
4 changes: 2 additions & 2 deletions lib/dal/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1377,7 +1377,7 @@ impl Component {
component_id: ComponentId,
) -> ComponentResult<Option<(ComponentNodeWeight, ContentHash)>> {
let id: Ulid = component_id.into();
if let Some(node_index) = ctx.workspace_snapshot()?.try_get_node_index_by_id(id).await {
if let Some(node_index) = ctx.workspace_snapshot()?.get_node_index_by_id_opt(id).await {
let node_weight = ctx
.workspace_snapshot()?
.get_node_weight(node_index)
Expand Down Expand Up @@ -2328,7 +2328,7 @@ impl Component {
) -> ComponentResult<Option<bool>> {
match ctx
.workspace_snapshot()?
.try_get_node_index_by_id(component_id)
.get_node_index_by_id_opt(component_id)
.await
{
Some(component_idx) => {
Expand Down
11 changes: 7 additions & 4 deletions lib/dal/src/component/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,13 +235,13 @@ impl Frame {
ctx: &DalContext,
parent_id: ComponentId,
child_id: ComponentId,
// get the new state of the tree (from the perspective of both components, now in disjoint trees because they were detached!)
) -> FrameResult<()> {
// cache current state of the tree
let before_change_impacted_input_sockets: HashSet<SocketAttributeValuePair> =
Self::get_all_inferred_connections_for_component_tree(ctx, parent_id, child_id).await?;
// remove the edge
Component::remove_edge_from_frame(ctx, parent_id, child_id).await?;
// get the new state of the tree (from the perspective of both components, now in disjoint trees because they were detached!)
let current_impacted_sockets =
Self::get_all_inferred_connections_for_component_tree(ctx, parent_id, child_id).await?;
// find the edges that have been removed due to the detachment
Expand Down Expand Up @@ -296,9 +296,12 @@ impl Frame {
child_id: ComponentId,
) -> FrameResult<HashSet<SocketAttributeValuePair>> {
let mut impacted_connections = HashSet::new();
let tree =
InferredConnectionGraph::assemble_for_components(ctx, [child_id, parent_id].to_vec())
.await?;
let tree = InferredConnectionGraph::assemble_for_components(
ctx,
[child_id, parent_id].to_vec(),
None,
)
.await?;
let incoming_connections = tree.get_all_inferred_connections();
for incoming_connection in incoming_connections {
impacted_connections.insert(SocketAttributeValuePair {
Expand Down
54 changes: 46 additions & 8 deletions lib/dal/src/component/inferred_connection_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use telemetry::prelude::*;

use super::{
socket::{ComponentInputSocket, ComponentOutputSocket},
ComponentResult,
ComponentResult, IncomingConnection,
};
use crate::{
Component, ComponentId, ComponentType, DalContext, InputSocket, OutputSocket, OutputSocketId,
Expand Down Expand Up @@ -56,6 +56,7 @@ impl InferredConnectionGraph {
pub async fn assemble_for_components(
ctx: &DalContext,
component_ids: Vec<ComponentId>,
precomputed_incoming_connections: Option<&HashMap<ComponentId, Vec<IncomingConnection>>>,
) -> ComponentResult<Self> {
let mut component_incoming_connections: HashMap<
ComponentId,
Expand All @@ -64,6 +65,14 @@ impl InferredConnectionGraph {
let mut top_parents: HashSet<ComponentId> = HashSet::new();
let mut components: HashMap<ComponentId, HashSet<ComponentId>> = HashMap::new();

let component_ids = if let Some(precomputed_ids) =
precomputed_incoming_connections.map(|precomp| precomp.keys().map(ToOwned::to_owned))
{
precomputed_ids.collect()
} else {
component_ids
};

for component_id in component_ids {
// Check if the component_id is either a key or a value in the components HashMap
let is_in_tree = components.contains_key(&component_id)
Expand All @@ -86,7 +95,19 @@ impl InferredConnectionGraph {
work_queue.push_back(*parent);

while let Some(component) = work_queue.pop_front() {
let (input_sockets, duplicates) = Self::process_component(ctx, component).await?;
let (input_sockets, duplicates) = if let Some(precomputed) =
precomputed_incoming_connections.and_then(|precomp| precomp.get(&component))
{
Self::process_component(ctx, component, precomputed.as_slice()).await?
} else {
Self::process_component(
ctx,
component,
&Component::incoming_connections_for_id(ctx, component).await?,
)
.await?
};

Self::update_incoming_connections(
&mut component_incoming_connections,
component,
Expand Down Expand Up @@ -123,7 +144,19 @@ impl InferredConnectionGraph {
.into_iter()
.map(|component| component.id())
.collect_vec();
Self::assemble_for_components(ctx, components).await
Self::assemble_for_components(ctx, components, None).await
}

#[instrument(
name = "component.inferred_connection_graph.with_precomputed_incoming_connections",
level = "info",
skip(ctx, precomputed_incoming_connections)
)]
pub async fn assemble_with_precomputed_incoming_connections(
ctx: &DalContext,
precomputed_incoming_connections: &HashMap<ComponentId, Vec<IncomingConnection>>,
) -> ComponentResult<Self> {
Self::assemble_for_components(ctx, vec![], Some(precomputed_incoming_connections)).await
}

/// Assembles the [`InferredConnectionGraph`] from the perspective of this single [`ComponentId`] by first creating a
Expand All @@ -138,7 +171,7 @@ impl InferredConnectionGraph {
ctx: &DalContext,
with_component_id: ComponentId,
) -> ComponentResult<Self> {
Self::assemble_for_components(ctx, vec![with_component_id]).await
Self::assemble_for_components(ctx, vec![with_component_id], None).await
}

/// Assembles the a map of Incoming Connections from the perspective of this single [`ComponentId`]
Expand All @@ -155,7 +188,12 @@ impl InferredConnectionGraph {
ComponentId,
HashMap<ComponentInputSocket, HashSet<ComponentOutputSocket>>,
> = HashMap::new();
let (input_sockets, duplicates) = Self::process_component(ctx, for_component_id).await?;
let (input_sockets, duplicates) = Self::process_component(
ctx,
for_component_id,
&Component::incoming_connections_for_id(ctx, for_component_id).await?,
)
.await?;
Self::update_incoming_connections(
&mut component_incoming_connections,
for_component_id,
Expand Down Expand Up @@ -194,6 +232,7 @@ impl InferredConnectionGraph {
async fn process_component(
ctx: &DalContext,
component_id: ComponentId,
existing_incoming_connections: &[IncomingConnection],
) -> ComponentResult<(
HashMap<ComponentInputSocket, HashSet<ComponentOutputSocket>>,
HashSet<ComponentOutputSocket>,
Expand All @@ -208,9 +247,8 @@ impl InferredConnectionGraph {

for input_socket in current_component_input_sockets {
let incoming_connections = Self::find_available_connections(ctx, input_socket).await?;
// Get all existing explicit outgoing connections so that we don't create an inferred connection to the same output socket
let existing_incoming_connections =
Component::incoming_connections_for_id(ctx, component_id).await?;

// Check all existing explicit outgoing connections so that we don't create an inferred connection to the same output socket
for incoming_connection in existing_incoming_connections {
let component_output_socket = ComponentOutputSocket::get_by_ids_or_error(
ctx,
Expand Down
Loading