Skip to content

Commit

Permalink
fix(dal): speed up get diagram
Browse files Browse the repository at this point in the history
Avoid expensive recomputation of various values in get_diagram. Provides
about a 50% speedup locally. Also removes some telemetry on workspace
snapshot that would be mostly noise, and would overwhelm the collector
in debug mode.
  • Loading branch information
zacharyhamm committed Aug 20, 2024
1 parent 9508e32 commit f2ca49d
Show file tree
Hide file tree
Showing 21 changed files with 432 additions and 769 deletions.
2 changes: 1 addition & 1 deletion lib/dal/src/action/dependency_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl ActionDependencyGraph {
// Get all inferred connections up front so we don't build this tree each time
let components_to_find = actions_by_component_id.keys().copied().collect_vec();
let component_tree =
InferredConnectionGraph::assemble_for_components(ctx, components_to_find).await?;
InferredConnectionGraph::assemble_for_components(ctx, components_to_find, None).await?;
// Action dependencies are primarily based on the data flow between Components. Things that
// feed data into other things must have their actions run before the actions for the
// things they are feeding data into.
Expand Down
4 changes: 2 additions & 2 deletions lib/dal/src/attribute/prototype/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl AttributePrototypeDebugView {
targets.destination_component_id == destination_component_id
})
{
let arg_used = Component::should_data_flow_between_components(
let arg_used = Component::should_data_flow_between_components_by_id(
ctx,
destination_component_id,
expected_source_component_id,
Expand Down Expand Up @@ -289,7 +289,7 @@ impl AttributePrototypeDebugView {
.await?
{
info!("output socket match: {:?}", output_match);
let arg_used = Component::should_data_flow_between_components(
let arg_used = Component::should_data_flow_between_components_by_id(
ctx,
component_input_socket.component_id,
output_match.component_id,
Expand Down
2 changes: 1 addition & 1 deletion lib/dal/src/attribute/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ impl AttributeValue {
for component_output_socket in connections {
// Both deleted and non deleted components can feed data into deleted components.
// ** ONLY ** non-deleted components can feed data into non-deleted components
if Component::should_data_flow_between_components(
if Component::should_data_flow_between_components_by_id(
ctx,
component_input_socket.component_id,
component_output_socket.component_id,
Expand Down
6 changes: 3 additions & 3 deletions lib/dal/src/attribute/value/dependent_value_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl DependentValueGraph {
// job always operates on the current state of the change set's snapshot, not the state
// at the time the job was created.
if workspace_snapshot
.try_get_node_index_by_id(initial_id)
.get_node_index_by_id_opt(initial_id)
.await
.is_none()
{
Expand Down Expand Up @@ -207,7 +207,7 @@ impl DependentValueGraph {
// Both "deleted" and not deleted Components can feed data into
// "deleted" Components. **ONLY** not deleted Components can feed
// data into not deleted Components.
if Component::should_data_flow_between_components(
if Component::should_data_flow_between_components_by_id(
ctx,
targets.destination_component_id,
targets.source_component_id,
Expand Down Expand Up @@ -273,7 +273,7 @@ impl DependentValueGraph {
// data into not deleted Components.
let destination_component_id =
AttributeValue::component_id(ctx, current_attribute_value.id()).await?;
if Component::should_data_flow_between_components(
if Component::should_data_flow_between_components_by_id(
ctx,
destination_component_id,
component_input_socket.component_id,
Expand Down
10 changes: 5 additions & 5 deletions lib/dal/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1377,7 +1377,7 @@ impl Component {
component_id: ComponentId,
) -> ComponentResult<Option<(ComponentNodeWeight, ContentHash)>> {
let id: Ulid = component_id.into();
if let Some(node_index) = ctx.workspace_snapshot()?.try_get_node_index_by_id(id).await {
if let Some(node_index) = ctx.workspace_snapshot()?.get_node_index_by_id_opt(id).await {
let node_weight = ctx
.workspace_snapshot()?
.get_node_weight(node_index)
Expand Down Expand Up @@ -2303,7 +2303,7 @@ impl Component {
/// Both "deleted" and not deleted Components can feed data into
/// "deleted" Components. **ONLY** not deleted Components can feed
/// data into not deleted Components.
pub async fn should_data_flow_between_components(
pub async fn should_data_flow_between_components_by_id(
ctx: &DalContext,
destination_component_id: ComponentId,
source_component_id: ComponentId,
Expand All @@ -2328,7 +2328,7 @@ impl Component {
) -> ComponentResult<Option<bool>> {
match ctx
.workspace_snapshot()?
.try_get_node_index_by_id(component_id)
.get_node_index_by_id_opt(component_id)
.await
{
Some(component_idx) => {
Expand Down Expand Up @@ -2690,7 +2690,7 @@ impl Component {
let destination_component = Self::get_by_id(ctx, to_component_id).await?;
let source_component =
Self::get_by_id(ctx, incoming_connection.output_socket.component_id).await?;
let to_delete = !Self::should_data_flow_between_components(
let to_delete = !Self::should_data_flow_between_components_by_id(
ctx,
destination_component.id,
source_component.id,
Expand Down Expand Up @@ -2742,7 +2742,7 @@ impl Component {
let destination_component = outgoing_connection.input_socket.component_id;
let source_component = self.id();

let to_delete = !Self::should_data_flow_between_components(
let to_delete = !Self::should_data_flow_between_components_by_id(
ctx,
destination_component,
source_component,
Expand Down
11 changes: 7 additions & 4 deletions lib/dal/src/component/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,13 +235,13 @@ impl Frame {
ctx: &DalContext,
parent_id: ComponentId,
child_id: ComponentId,
// get the new state of the tree (from the perspective of both components, now in disjoint trees because they were detached!)
) -> FrameResult<()> {
// cache current state of the tree
let before_change_impacted_input_sockets: HashSet<SocketAttributeValuePair> =
Self::get_all_inferred_connections_for_component_tree(ctx, parent_id, child_id).await?;
// remove the edge
Component::remove_edge_from_frame(ctx, parent_id, child_id).await?;
// get the new state of the tree (from the perspective of both components, now in disjoint trees because they were detached!)
let current_impacted_sockets =
Self::get_all_inferred_connections_for_component_tree(ctx, parent_id, child_id).await?;
// find the edges that have been removed due to the detachment
Expand Down Expand Up @@ -296,9 +296,12 @@ impl Frame {
child_id: ComponentId,
) -> FrameResult<HashSet<SocketAttributeValuePair>> {
let mut impacted_connections = HashSet::new();
let tree =
InferredConnectionGraph::assemble_for_components(ctx, [child_id, parent_id].to_vec())
.await?;
let tree = InferredConnectionGraph::assemble_for_components(
ctx,
[child_id, parent_id].to_vec(),
None,
)
.await?;
let incoming_connections = tree.get_all_inferred_connections();
for incoming_connection in incoming_connections {
impacted_connections.insert(SocketAttributeValuePair {
Expand Down
54 changes: 46 additions & 8 deletions lib/dal/src/component/inferred_connection_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use telemetry::prelude::*;

use super::{
socket::{ComponentInputSocket, ComponentOutputSocket},
ComponentResult,
ComponentResult, IncomingConnection,
};
use crate::{
Component, ComponentId, ComponentType, DalContext, InputSocket, OutputSocket, OutputSocketId,
Expand Down Expand Up @@ -56,6 +56,7 @@ impl InferredConnectionGraph {
pub async fn assemble_for_components(
ctx: &DalContext,
component_ids: Vec<ComponentId>,
precomputed_incoming_connections: Option<&HashMap<ComponentId, Vec<IncomingConnection>>>,
) -> ComponentResult<Self> {
let mut component_incoming_connections: HashMap<
ComponentId,
Expand All @@ -64,6 +65,14 @@ impl InferredConnectionGraph {
let mut top_parents: HashSet<ComponentId> = HashSet::new();
let mut components: HashMap<ComponentId, HashSet<ComponentId>> = HashMap::new();

let component_ids = if let Some(precomputed_ids) =
precomputed_incoming_connections.map(|precomp| precomp.keys().map(ToOwned::to_owned))
{
precomputed_ids.collect()
} else {
component_ids
};

for component_id in component_ids {
// Check if the component_id is either a key or a value in the components HashMap
let is_in_tree = components.contains_key(&component_id)
Expand All @@ -86,7 +95,19 @@ impl InferredConnectionGraph {
work_queue.push_back(*parent);

while let Some(component) = work_queue.pop_front() {
let (input_sockets, duplicates) = Self::process_component(ctx, component).await?;
let (input_sockets, duplicates) = if let Some(precomputed) =
precomputed_incoming_connections.and_then(|precomp| precomp.get(&component))
{
Self::process_component(ctx, component, precomputed.as_slice()).await?
} else {
Self::process_component(
ctx,
component,
&Component::incoming_connections_for_id(ctx, component).await?,
)
.await?
};

Self::update_incoming_connections(
&mut component_incoming_connections,
component,
Expand Down Expand Up @@ -123,7 +144,19 @@ impl InferredConnectionGraph {
.into_iter()
.map(|component| component.id())
.collect_vec();
Self::assemble_for_components(ctx, components).await
Self::assemble_for_components(ctx, components, None).await
}

#[instrument(
name = "component.inferred_connection_graph.with_precomputed_incoming_connections",
level = "info",
skip(ctx, precomputed_incoming_connections)
)]
pub async fn assemble_with_precomputed_incoming_connections(
ctx: &DalContext,
precomputed_incoming_connections: &HashMap<ComponentId, Vec<IncomingConnection>>,
) -> ComponentResult<Self> {
Self::assemble_for_components(ctx, vec![], Some(precomputed_incoming_connections)).await
}

/// Assembles the [`InferredConnectionGraph`] from the perspective of this single [`ComponentId`] by first creating a
Expand All @@ -138,7 +171,7 @@ impl InferredConnectionGraph {
ctx: &DalContext,
with_component_id: ComponentId,
) -> ComponentResult<Self> {
Self::assemble_for_components(ctx, vec![with_component_id]).await
Self::assemble_for_components(ctx, vec![with_component_id], None).await
}

/// Assembles the a map of Incoming Connections from the perspective of this single [`ComponentId`]
Expand All @@ -155,7 +188,12 @@ impl InferredConnectionGraph {
ComponentId,
HashMap<ComponentInputSocket, HashSet<ComponentOutputSocket>>,
> = HashMap::new();
let (input_sockets, duplicates) = Self::process_component(ctx, for_component_id).await?;
let (input_sockets, duplicates) = Self::process_component(
ctx,
for_component_id,
&Component::incoming_connections_for_id(ctx, for_component_id).await?,
)
.await?;
Self::update_incoming_connections(
&mut component_incoming_connections,
for_component_id,
Expand Down Expand Up @@ -194,6 +232,7 @@ impl InferredConnectionGraph {
async fn process_component(
ctx: &DalContext,
component_id: ComponentId,
existing_incoming_connections: &[IncomingConnection],
) -> ComponentResult<(
HashMap<ComponentInputSocket, HashSet<ComponentOutputSocket>>,
HashSet<ComponentOutputSocket>,
Expand All @@ -208,9 +247,8 @@ impl InferredConnectionGraph {

for input_socket in current_component_input_sockets {
let incoming_connections = Self::find_available_connections(ctx, input_socket).await?;
// Get all existing explicit outgoing connections so that we don't create an inferred connection to the same output socket
let existing_incoming_connections =
Component::incoming_connections_for_id(ctx, component_id).await?;

// Check all existing explicit outgoing connections so that we don't create an inferred connection to the same output socket
for incoming_connection in existing_incoming_connections {
let component_output_socket = ComponentOutputSocket::get_by_ids_or_error(
ctx,
Expand Down
Loading

0 comments on commit f2ca49d

Please sign in to comment.