Skip to content

Commit

Permalink
Add precision to which values need to rerun when upserting/orphaning …
Browse files Browse the repository at this point in the history
…components, fix bug with nested propagation through up frames, enable up frames to take inputs from parent down frames
  • Loading branch information
britmyerss committed Jun 15, 2024
1 parent d12feb4 commit 4fa9b9c
Show file tree
Hide file tree
Showing 5 changed files with 489 additions and 128 deletions.
21 changes: 19 additions & 2 deletions lib/dal-test/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use color_eyre::eyre::eyre;
use color_eyre::Result;
use dal::key_pair::KeyPairPk;
use dal::{
AttributeValue, Component, ComponentId, DalContext, InputSocket, KeyPair, OutputSocket, Schema,
SchemaVariant, SchemaVariantId, User, UserPk,
AttributeValue, Component, ComponentId, ComponentType, DalContext, InputSocket, KeyPair,
OutputSocket, Schema, SchemaVariant, SchemaVariantId, User, UserPk,
};
use names::{Generator, Name};

Expand Down Expand Up @@ -70,6 +70,23 @@ pub async fn create_component_for_schema_name(
Ok(Component::new(ctx, name.as_ref().to_string(), schema_variant_id).await?)
}

/// Creates a [`Component`] from the default [`SchemaVariant`] corresponding to a provided
/// [`Schema`] name.
pub async fn create_component_for_schema_name_with_type(
ctx: &DalContext,
schema_name: impl AsRef<str>,
name: impl AsRef<str>,
component_type: ComponentType,
) -> Result<Component> {
let schema = Schema::find_by_name(ctx, schema_name)
.await?
.ok_or(eyre!("schema not found"))?;
let schema_variant_id = SchemaVariant::get_default_id_for_schema(ctx, schema.id()).await?;
let component = Component::new(ctx, name.as_ref().to_string(), schema_variant_id).await?;
component.set_type(ctx, component_type).await?;
Ok(component)
}

/// Creates a [`Component`] for a given [`SchemaVariantId`](SchemaVariant).
pub async fn create_component_for_schema_variant(
ctx: &DalContext,
Expand Down
69 changes: 45 additions & 24 deletions lib/dal/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2332,7 +2332,7 @@ impl Component {
/// if the input socket has arity many and the matches are all siblings
///
/// Note: this does not check for whether data should actually flow between components
#[instrument(level = "debug", skip(ctx))]
#[instrument(level = "info", skip(ctx))]
pub async fn find_available_inferred_connections_to_input_socket(
ctx: &DalContext,
input_socket_match: InputSocketMatch,
Expand Down Expand Up @@ -2362,22 +2362,42 @@ impl Component {
}
}
ComponentType::ConfigurationFrameUp => {
// An up frame's input sockets are sourced from its children's output sockets
// For now, we won't let down frames send outputs to parents and children
// This might need to change, but we can change it when we've got a use case.
let mut matches = Self::find_available_output_socket_match_in_descendants(
ctx,
input_socket_match,
vec![
ComponentType::ConfigurationFrameUp,
ComponentType::Component,
],
)
.await?;
// if there is more than one match, sort by component Ulid so they're
// consistently ordered
matches.sort_by_key(|output_socket| output_socket.component_id);
matches
// An up frame's input sockets are sourced from either its children's output sockets
// or an ancestor. Based on the input socket's arity, we match many (sorted by component ulid)
// or if the arity is single, we return none
let mut matches = vec![];
let descendant_matches =
Self::find_available_output_socket_match_in_descendants(
ctx,
input_socket_match,
vec![
ComponentType::ConfigurationFrameUp,
ComponentType::Component,
],
)
.await?;
matches.extend(descendant_matches);
if let Some(ascendant_match) =
Self::find_first_output_socket_match_in_ancestors(
ctx,
input_socket_match,
vec![ComponentType::ConfigurationFrameDown],
)
.await?
{
matches.push(ascendant_match);
}

let input_socket =
InputSocket::get_by_id(ctx, input_socket_match.input_socket_id).await?;
if input_socket.arity() == SocketArity::One && matches.len() > 1 {
vec![]
} else {
// if there is more than one match, sort by component Ulid so they're
// consistently ordered
matches.sort_by_key(|output_socket| output_socket.component_id);
matches
}
}
ComponentType::AggregationFrame => vec![],
};
Expand Down Expand Up @@ -2425,9 +2445,10 @@ impl Component {
}
}
}
for child in Self::get_children_for_id(ctx, component_id).await? {
work_queue.push_back(child);
}
}
// regardless whether the component type matches, we need to continue to descend
for child in Self::get_children_for_id(ctx, component_id).await? {
work_queue.push_back(child);
}
}

Expand Down Expand Up @@ -2876,7 +2897,7 @@ impl Component {
/// Up Frame.
///
/// Down Frames can drive Input Sockets of their children if the child is a Down Frame
/// or a Component.
/// or a Component or an Up Frame.
#[instrument(level = "debug", skip(ctx))]
pub async fn find_inferred_values_using_this_output_socket(
ctx: &DalContext,
Expand Down Expand Up @@ -2908,21 +2929,21 @@ impl Component {
}
ComponentType::ConfigurationFrameDown => {
// if the type is a down frame, find all descendants
// who have a matching input socket AND are a Down Frame or Component
// who have a matching input socket AND are a Down Frame, Component, or Up Frame
Component::find_all_potential_inferred_input_socket_matches_in_descendants(
ctx,
output_socket_id,
component_id,
vec![
ComponentType::ConfigurationFrameDown,
ComponentType::Component,
ComponentType::ConfigurationFrameUp,
],
)
.await?
}

// we are not supporting aggregation frames right now
_ => vec![],
ComponentType::AggregationFrame => vec![],
};

Ok(maybe_target_sockets)
Expand Down
Loading

0 comments on commit 4fa9b9c

Please sign in to comment.