Skip to content

Commit

Permalink
Merge pull request #4425 from systeminit/zack/remove-replace-refs-cal…
Browse files Browse the repository at this point in the history
…cualte-merkle-tree-speedups

feat(dal): graph goes vroom
  • Loading branch information
zacharyhamm authored Aug 28, 2024
2 parents fe64ba9 + 63d0249 commit 1d16073
Show file tree
Hide file tree
Showing 25 changed files with 419 additions and 406 deletions.
4 changes: 4 additions & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,8 @@ disallowed-methods = [
{ path = "std::env::vars", reason = "should not directly access environment variables within library crates, favor configuration injection, passing parameters, etc." },
{ path = "std::env::var_os", reason = "should not directly access environment variables within library crates, favor configuration injection, passing parameters, etc." },
{ path = "std::env::vars_os", reason = "should not directly access environment variables within library crates, favor configuration injection, passing parameters, etc." },
{ path = "dal::workspace_snapshot::WorkspaceSnapshot::write_working_copy_to_disk", reason = "The snapshot should only be written to disk when debugging"},
{ path = "dal::workspace_snapshot::WorkspaceSnapshot::write_readonly_graph_to_disk", reason = "The snapshot should only be written to disk when debugging"},
{ path = "dal::workspace_snapshot::WorkspaceSnapshot::tiny_dot_to_file", reason = "The snapshot should only be written to disk when debugging"},
{ path = "dal::workspace_snapshot::graph::RebaseBatch::write_to_disk", reason = "Rebase batches should only be written to disk when debugging"},
]
25 changes: 15 additions & 10 deletions lib/dal/examples/rebase/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
use std::{env, fs::File, io::prelude::*};

use serde::de::DeserializeOwned;
use si_layer_cache::db::serialize;

use dal::WorkspaceSnapshotGraphV2;
use dal::{
workspace_snapshot::graph::{correct_transforms::correct_transforms, RebaseBatch},
WorkspaceSnapshotGraphV2,
};

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + 'static>>;

const USAGE: &str = "usage: cargo run --example rebase <TO_REBASE_FILE_PATH> <ONTO_FILE_PATH>";
const USAGE: &str = "usage: cargo run --example rebase <TO_REBASE_FILE_PATH> <REBASE_BATCH_PATH>";

fn load_snapshot_graph(path: &str) -> Result<WorkspaceSnapshotGraphV2> {
fn load_snapshot_graph<T: DeserializeOwned>(path: &str) -> Result<T> {
let mut file = File::open(path)?;
let mut bytes = vec![];
file.read_to_end(&mut bytes)?;
Expand All @@ -19,16 +23,17 @@ fn load_snapshot_graph(path: &str) -> Result<WorkspaceSnapshotGraphV2> {
fn main() -> Result<()> {
let args: Vec<String> = env::args().take(3).map(Into::into).collect();
let to_rebase_path = args.get(1).expect(USAGE);
let onto_path = args.get(2).expect(USAGE);
let rebase_batch_path = args.get(2).expect(USAGE);

let to_rebase_graph = load_snapshot_graph(to_rebase_path)?;
let onto_graph = load_snapshot_graph(onto_path)?;
let mut to_rebase_graph: WorkspaceSnapshotGraphV2 = load_snapshot_graph(to_rebase_path)?;
let rebase_batch: RebaseBatch = load_snapshot_graph(rebase_batch_path)?;

let updates = to_rebase_graph.detect_updates(&onto_graph);
let corrected_transforms =
correct_transforms(&to_rebase_graph, rebase_batch.updates().to_vec(), false)?;

for update in &updates {
dbg!(update);
}
to_rebase_graph.perform_updates(&corrected_transforms)?;

dbg!(to_rebase_graph.node_count());

Ok(())
}
Expand Down
7 changes: 4 additions & 3 deletions lib/dal/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,8 @@ impl Action {
let mut new_node_weight = node_weight.clone();
new_node_weight.set_state(state);
ctx.workspace_snapshot()?
.add_node(NodeWeight::Action(new_node_weight))
.add_or_replace_node(NodeWeight::Action(new_node_weight))
.await?;
ctx.workspace_snapshot()?.replace_references(idx).await?;
Ok(())
}

Expand All @@ -342,7 +341,9 @@ impl Action {
let originating_change_set_id = ctx.change_set_id();
let node_weight =
NodeWeight::new_action(originating_change_set_id, new_id.into(), lineage_id);
ctx.workspace_snapshot()?.add_node(node_weight).await?;
ctx.workspace_snapshot()?
.add_or_replace_node(node_weight)
.await?;

let action_category_id = ctx
.workspace_snapshot()?
Expand Down
4 changes: 3 additions & 1 deletion lib/dal/src/action/prototype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,9 @@ impl ActionPrototype {
let lineage_id = ctx.workspace_snapshot()?.generate_ulid().await?;
let node_weight =
NodeWeight::new_action_prototype(new_id.into(), lineage_id, kind, name, description);
ctx.workspace_snapshot()?.add_node(node_weight).await?;
ctx.workspace_snapshot()?
.add_or_replace_node(node_weight)
.await?;

Self::add_edge_to_func(ctx, new_id, func_id, EdgeWeightKind::new_use()).await?;

Expand Down
2 changes: 1 addition & 1 deletion lib/dal/src/attribute/prototype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ impl AttributePrototype {
let lineage_id = workspace_snapshot.generate_ulid().await?;
let node_weight =
NodeWeight::new_content(id, lineage_id, ContentAddress::AttributePrototype(hash));
let _node_index = workspace_snapshot.add_node(node_weight).await?;
let _node_index = workspace_snapshot.add_or_replace_node(node_weight).await?;

let prototype = AttributePrototype::assemble(id.into(), &content);

Expand Down
8 changes: 6 additions & 2 deletions lib/dal/src/attribute/prototype/argument.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,9 @@ impl AttributePrototypeArgument {

let workspace_snapshot = ctx.workspace_snapshot()?;

workspace_snapshot.add_node(node_weight.clone()).await?;
workspace_snapshot
.add_or_replace_node(node_weight.clone())
.await?;

AttributePrototype::add_edge_to_argument(
ctx,
Expand Down Expand Up @@ -268,7 +270,9 @@ impl AttributePrototypeArgument {
let prototype_arg: Self = {
let workspace_snapshot = ctx.workspace_snapshot()?;

workspace_snapshot.add_node(node_weight.clone()).await?;
workspace_snapshot
.add_or_replace_node(node_weight.clone())
.await?;

AttributePrototype::add_edge_to_argument(
ctx,
Expand Down
4 changes: 3 additions & 1 deletion lib/dal/src/attribute/prototype/argument/static_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ impl StaticArgumentValue {
let node_weight =
NodeWeight::new_content(id, lineage_id, ContentAddress::StaticArgumentValue(hash));

ctx.workspace_snapshot()?.add_node(node_weight).await?;
ctx.workspace_snapshot()?
.add_or_replace_node(node_weight)
.await?;

Ok(StaticArgumentValue::assemble(id.into(), content))
}
Expand Down
22 changes: 6 additions & 16 deletions lib/dal/src/attribute/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ impl AttributeValue {
.await?;
} else {
ctx.workspace_snapshot()?
.add_node(node_weight.clone())
.add_or_replace_node(node_weight.clone())
.await?;
};

Expand Down Expand Up @@ -1943,19 +1943,10 @@ impl AttributeValue {
func_run_value: FuncRunValue,
) -> AttributeValueResult<()> {
let workspace_snapshot = ctx.workspace_snapshot()?;
let (av_idx, av_node_weight) = {
let av_idx = workspace_snapshot
.get_node_index_by_id(attribute_value_id)
.await?;

(
av_idx,
workspace_snapshot
.get_node_weight(av_idx)
.await?
.get_attribute_value_node_weight()?,
)
};
let av_node_weight = workspace_snapshot
.get_node_weight_by_id(attribute_value_id)
.await?
.get_attribute_value_node_weight()?;

let content_value: Option<si_events::CasValue> =
func_run_value.value().cloned().map(Into::into);
Expand Down Expand Up @@ -2012,9 +2003,8 @@ impl AttributeValue {
.set_unprocessed_value(unprocessed_value_address.map(ContentAddress::JsonValue));

workspace_snapshot
.add_node(NodeWeight::AttributeValue(new_av_node_weight))
.add_or_replace_node(NodeWeight::AttributeValue(new_av_node_weight))
.await?;
workspace_snapshot.replace_references(av_idx).await?;

if ValidationOutput::get_format_for_attribute_value_id(ctx, attribute_value_id)
.await?
Expand Down
9 changes: 3 additions & 6 deletions lib/dal/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ impl Component {
let node_weight = NodeWeight::new_component(id, lineage_id, content_address);

// Attach component to category and add use edge to schema variant
workspace_snapshot.add_node(node_weight).await?;
workspace_snapshot.add_or_replace_node(node_weight).await?;

// Root --> Component Category --> Component (this)
let component_category_id = workspace_snapshot
Expand Down Expand Up @@ -2100,7 +2100,7 @@ impl Component {
// We only need to import the AttributePrototypeArgument node, as all of the other relevant
// nodes should already exist.
ctx.workspace_snapshot()?
.add_node(base_attribute_prototype_argument_node_weight.clone())
.add_or_replace_node(base_attribute_prototype_argument_node_weight.clone())
.await?;
ctx.workspace_snapshot()?
.add_edge(
Expand Down Expand Up @@ -2296,10 +2296,7 @@ impl Component {
let mut new_component_node_weight = component_node_weight.clone();
new_component_node_weight.set_to_delete(component.to_delete);
ctx.workspace_snapshot()?
.add_node(NodeWeight::Component(new_component_node_weight))
.await?;
ctx.workspace_snapshot()?
.replace_references(component_idx)
.add_or_replace_node(NodeWeight::Component(new_component_node_weight))
.await?;
}

Expand Down
13 changes: 4 additions & 9 deletions lib/dal/src/func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,9 @@ impl Func {
NodeWeight::new_func(id, lineage_id, name.clone().into(), func_kind, hash);

let workspace_snapshot = ctx.workspace_snapshot()?;
workspace_snapshot.add_node(node_weight.clone()).await?;
workspace_snapshot
.add_or_replace_node(node_weight.clone())
.await?;

let func_category_id = workspace_snapshot
.get_category_node_or_err(None, CategoryNodeKind::Func)
Expand Down Expand Up @@ -462,16 +464,9 @@ impl Func {
// have changed, this ends up updating the node for the function twice. This could be
// optimized to do it only once.
if func.name.as_str() != node_weight.name() {
let original_node_index = workspace_snapshot.get_node_index_by_id(func.id).await?;

node_weight.set_name(func.name.as_str());

workspace_snapshot
.add_node(NodeWeight::Func(node_weight.clone()))
.await?;

workspace_snapshot
.replace_references(original_node_index)
.add_or_replace_node(NodeWeight::Func(node_weight.clone()))
.await?;
}
let updated = FuncContent::from(func.clone());
Expand Down
14 changes: 4 additions & 10 deletions lib/dal/src/func/argument.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,9 @@ impl FuncArgument {
let lineage_id = workspace_snapshot.generate_ulid().await?;
let node_weight = NodeWeight::new_func_argument(id, lineage_id, name, hash);

workspace_snapshot.add_node(node_weight.clone()).await?;
workspace_snapshot
.add_or_replace_node(node_weight.clone())
.await?;
Func::add_edge_to_argument(ctx, func_id, id.into(), EdgeWeightKind::new_use()).await?;

let func_argument_node_weight = node_weight.get_func_argument_node_weight()?;
Expand Down Expand Up @@ -441,18 +443,10 @@ impl FuncArgument {
// have changed, this ends up updating the node for the function twice. This could be
// optimized to do it only once.
if func_argument.name.as_str() != node_weight.name() {
let original_node_index = workspace_snapshot
.get_node_index_by_id(func_argument.id)
.await?;

node_weight.set_name(func_argument.name.as_str());

workspace_snapshot
.add_node(NodeWeight::FuncArgument(node_weight.clone()))
.await?;

workspace_snapshot
.replace_references(original_node_index)
.add_or_replace_node(NodeWeight::FuncArgument(node_weight.clone()))
.await?;
}
let updated = FuncArgumentContentV1::from(func_argument.clone());
Expand Down
2 changes: 1 addition & 1 deletion lib/dal/src/module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl Module {
let lineage_id = workspace_snapshot.generate_ulid().await?;
let node_weight = NodeWeight::new_content(id, lineage_id, ContentAddress::Module(hash));

workspace_snapshot.add_node(node_weight).await?;
workspace_snapshot.add_or_replace_node(node_weight).await?;

let schema_module_index_id = workspace_snapshot
.get_category_node_or_err(None, CategoryNodeKind::Module)
Expand Down
2 changes: 1 addition & 1 deletion lib/dal/src/prop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ impl Prop {
if ordered {
workspace_snapshot.add_ordered_node(node_weight).await?;
} else {
workspace_snapshot.add_node(node_weight).await?;
workspace_snapshot.add_or_replace_node(node_weight).await?;
}

Ok(Self::assemble(prop_node_weight, content))
Expand Down
2 changes: 1 addition & 1 deletion lib/dal/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl Schema {
let node_weight =
NodeWeight::new_content(id.into(), lineage_id, ContentAddress::Schema(hash));

workspace_snapshot.add_node(node_weight).await?;
workspace_snapshot.add_or_replace_node(node_weight).await?;

let schema_category_index_id = workspace_snapshot
.get_category_node_or_err(None, CategoryNodeKind::Schema)
Expand Down
2 changes: 1 addition & 1 deletion lib/dal/src/schema/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ impl SchemaVariant {
let lineage_id = workspace_snapshot.generate_ulid().await?;
let node_weight =
NodeWeight::new_content(id, lineage_id, ContentAddress::SchemaVariant(hash));
workspace_snapshot.add_node(node_weight).await?;
workspace_snapshot.add_or_replace_node(node_weight).await?;

// Schema --Use--> SchemaVariant (this)
Schema::add_edge_to_variant(ctx, schema_id, id.into(), EdgeWeightKind::new_use()).await?;
Expand Down
10 changes: 2 additions & 8 deletions lib/dal/src/secret.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ impl Secret {
let secret_node_weight = node_weight.get_secret_node_weight()?;

let workspace_snapshot = ctx.workspace_snapshot()?;
workspace_snapshot.add_node(node_weight).await?;
workspace_snapshot.add_or_replace_node(node_weight).await?;

// Root --> Secret Category --> Secret (this)
let secret_category_id = workspace_snapshot
Expand Down Expand Up @@ -641,16 +641,10 @@ impl Secret {
// we always update the actor and timestamp data if anything has changed. This could be
// optimized to do it only once.
if secret.encrypted_secret_key() != secret_node_weight.encrypted_secret_key() {
let original_node_index = workspace_snapshot.get_node_index_by_id(secret.id).await?;

secret_node_weight.set_encrypted_secret_key(secret.encrypted_secret_key);

workspace_snapshot
.add_node(NodeWeight::Secret(secret_node_weight.clone()))
.await?;

workspace_snapshot
.replace_references(original_node_index)
.add_or_replace_node(NodeWeight::Secret(secret_node_weight.clone()))
.await?;
}
let updated = SecretContentV1::from(secret.clone());
Expand Down
2 changes: 1 addition & 1 deletion lib/dal/src/socket/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ impl InputSocket {

let node_weight =
NodeWeight::new_content(id, lineage_id, ContentAddress::InputSocket(hash));
workspace_snapshot.add_node(node_weight).await?;
workspace_snapshot.add_or_replace_node(node_weight).await?;
SchemaVariant::add_edge_to_input_socket(
ctx,
schema_variant_id,
Expand Down
2 changes: 1 addition & 1 deletion lib/dal/src/socket/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ impl OutputSocket {
let node_weight =
NodeWeight::new_content(id, lineage_id, ContentAddress::OutputSocket(hash));

workspace_snapshot.add_node(node_weight).await?;
workspace_snapshot.add_or_replace_node(node_weight).await?;

SchemaVariant::add_edge_to_output_socket(
ctx,
Expand Down
5 changes: 2 additions & 3 deletions lib/dal/src/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,17 +161,16 @@ impl ValidationOutputNode {
new_node_weight.new_content_hash(hash)?;

workspace_snapshot
.add_node(NodeWeight::Content(new_node_weight))
.add_or_replace_node(NodeWeight::Content(new_node_weight))
.await?;
workspace_snapshot.replace_references(idx).await?;

id
} else {
let id = workspace_snapshot.generate_ulid().await?;
let lineage_id = workspace_snapshot.generate_ulid().await?;
let node_weight =
NodeWeight::new_content(id, lineage_id, ContentAddress::ValidationOutput(hash));
workspace_snapshot.add_node(node_weight).await?;
workspace_snapshot.add_or_replace_node(node_weight).await?;

workspace_snapshot
.add_edge(
Expand Down
Loading

0 comments on commit 1d16073

Please sign in to comment.