Skip to content

Commit

Permalink
Merge pull request #3988 from systeminit/zack/ENG-2572-vector-clock-i…
Browse files Browse the repository at this point in the history
…s-a-tuple

ENG-2572: VectorClockIds are a tuple. Editing changesets are gone
  • Loading branch information
zacharyhamm authored Jul 11, 2024
2 parents a73b07f + 2f82b77 commit 9ac1a8a
Show file tree
Hide file tree
Showing 84 changed files with 4,861 additions and 2,762 deletions.
18 changes: 17 additions & 1 deletion lib/dal-test/src/expand_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,23 @@ use jwt_simple::claims::Claims;
use jwt_simple::prelude::Duration;
use tracing_subscriber::{fmt, util::SubscriberInitExt, EnvFilter, Registry};

use crate::{helpers::generate_fake_name, jwt_private_signing_key, WorkspaceSignup};
use crate::{
helpers::create_user, helpers::generate_fake_name, jwt_private_signing_key, WorkspaceSignup,
};

/// Creates a user for each test to run as
pub async fn setup_history_actor_ctx(ctx: &mut DalContext) {
let user = create_user(ctx).await.expect("unable to create user");
user.associate_workspace(
ctx,
ctx.tenancy()
.workspace_pk()
.expect("no workspace pk set on context"),
)
.await
.expect("unable to associate user with workspace");
ctx.update_history_actor(dal::HistoryActor::User(user.pk()));
}

/// This function is used during macro expansion for setting up a [`ChangeSet`] in an integration test.
pub async fn create_change_set_and_update_ctx(
Expand Down
2 changes: 1 addition & 1 deletion lib/dal-test/src/helpers/change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl ChangeSetTestHelpers {

Self::blocking_commit(ctx).await?;

ctx.update_visibility_and_snapshot_to_visibility_no_editing_change_set(
ctx.update_visibility_and_snapshot_to_visibility(
applied_change_set.base_change_set_id.ok_or(eyre!(
"base change set not found for change set: {}",
applied_change_set.id
Expand Down
87 changes: 87 additions & 0 deletions lib/dal/examples/rebase/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use std::{env, fs::File, io::prelude::*};

use si_layer_cache::db::serialize;

use dal::{
workspace_snapshot::node_weight::NodeWeight, NodeWeightDiscriminants, WorkspaceSnapshotGraphV1,
};

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + 'static>>;

const USAGE: &str = "usage: cargo run --example rebase <TO_REBASE_FILE_PATH> <ONTO_FILE_PATH>";

fn load_snapshot_graph(path: &str) -> Result<WorkspaceSnapshotGraphV1> {
let mut file = File::open(path)?;
let mut bytes = vec![];
file.read_to_end(&mut bytes)?;

Ok(serialize::from_bytes(&bytes)?)
}

fn main() -> Result<()> {
let args: Vec<String> = env::args().take(3).map(Into::into).collect();
let to_rebase_path = args.get(1).expect(USAGE);
let onto_path = args.get(2).expect(USAGE);

let mut to_rebase_graph = load_snapshot_graph(&to_rebase_path)?;
let onto_graph = load_snapshot_graph(&onto_path)?;

let to_rebase_vector_clock_id = dbg!(to_rebase_graph
.max_recently_seen_clock_id(None)
.expect("Unable to find a vector clock id in to_rebase"));
let onto_vector_clock_id = dbg!(onto_graph
.max_recently_seen_clock_id(None)
.expect("Unable to find a vector clock id in onto"));

let conflicts_and_updates = to_rebase_graph.detect_conflicts_and_updates(
dbg!(to_rebase_vector_clock_id),
&onto_graph,
onto_vector_clock_id,
)?;

let mut last_ordering_node = None;
for update in &conflicts_and_updates.updates {
match update {
dal::workspace_snapshot::update::Update::NewEdge {
source,
destination,
edge_weight,
} => {
if matches!(source.node_weight_kind, NodeWeightDiscriminants::Ordering) {
if let Some(ordering_node) = &last_ordering_node {
if let NodeWeight::Ordering(ordering) = ordering_node {
dbg!(destination, ordering.order());
}
}
}
}
dal::workspace_snapshot::update::Update::RemoveEdge {
source,
destination,
edge_kind,
} => {}
dal::workspace_snapshot::update::Update::ReplaceSubgraph { onto, to_rebase } => {
if matches!(onto.node_weight_kind, NodeWeightDiscriminants::Ordering) {
last_ordering_node = onto_graph
.get_node_weight_opt(onto.index)
.expect("couldn't get node")
.map(ToOwned::to_owned);
}
}
dal::workspace_snapshot::update::Update::MergeCategoryNodes {
to_rebase_category_id,
onto_category_id,
} => {}
}
}

dbg!(to_rebase_graph.perform_updates(
to_rebase_vector_clock_id,
&onto_graph,
&conflicts_and_updates.updates,
))?;

Ok(())
}

// 01J2F7HKZFMTN6GVKXE73044AT
24 changes: 15 additions & 9 deletions lib/dal/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl From<ActionNodeWeight> for Action {
Self {
id: value.id().into(),
state: value.state(),
originating_changeset_id: value.originating_changeset_id(),
originating_changeset_id: value.originating_change_set_id(),
func_execution_pk: None,
}
}
Expand Down Expand Up @@ -317,7 +317,7 @@ impl Action {
.await?
.get_action_node_weight()?;
let mut new_node_weight =
node_weight.new_with_incremented_vector_clock(ctx.change_set()?.vector_clock_id());
node_weight.new_with_incremented_vector_clock(ctx.vector_clock_id()?);
new_node_weight.set_state(state);
ctx.workspace_snapshot()?
.add_node(NodeWeight::Action(new_node_weight))
Expand All @@ -341,11 +341,17 @@ impl Action {
action_prototype_id: ActionPrototypeId,
maybe_component_id: Option<ComponentId>,
) -> ActionResult<Self> {
let change_set = ctx.change_set()?;
let new_id: ActionId = change_set.generate_ulid()?.into();
let vector_clock_id = ctx.vector_clock_id()?;
let new_id: ActionId = ctx.workspace_snapshot()?.generate_ulid().await?.into();
let lineage_id = ctx.workspace_snapshot()?.generate_ulid().await?;

let originating_change_set_id = ctx.change_set_id();
let node_weight =
NodeWeight::new_action(change_set, originating_change_set_id, new_id.into())?;
let node_weight = NodeWeight::new_action(
vector_clock_id,
originating_change_set_id,
new_id.into(),
lineage_id,
)?;
ctx.workspace_snapshot()?.add_node(node_weight).await?;

let action_category_id = ctx
Expand Down Expand Up @@ -388,7 +394,7 @@ impl Action {

pub async fn remove_by_id(ctx: &DalContext, action_id: ActionId) -> ActionResult<()> {
ctx.workspace_snapshot()?
.remove_node_by_id(ctx.change_set()?, action_id)
.remove_node_by_id(ctx.vector_clock_id()?, action_id)
.await?;
Ok(())
}
Expand All @@ -398,13 +404,13 @@ impl Action {
action_prototype_id: ActionPrototypeId,
maybe_component_id: Option<ComponentId>,
) -> ActionResult<()> {
let change_set = ctx.change_set()?;
let snap = ctx.workspace_snapshot()?;

if let Some(action_id) =
Self::find_equivalent(ctx, action_prototype_id, maybe_component_id).await?
{
snap.remove_node_by_id(change_set, action_id).await?;
snap.remove_node_by_id(ctx.vector_clock_id()?, action_id)
.await?;
}
Ok(())
}
Expand Down
21 changes: 14 additions & 7 deletions lib/dal/src/action/prototype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,17 @@ impl ActionPrototype {
schema_variant_id: SchemaVariantId,
func_id: FuncId,
) -> ActionPrototypeResult<Self> {
let change_set = ctx.change_set()?;
let new_id: ActionPrototypeId = change_set.generate_ulid()?.into();
let node_weight =
NodeWeight::new_action_prototype(change_set, new_id.into(), kind, name, description)?;
let vector_clock_id = ctx.vector_clock_id()?;
let new_id: ActionPrototypeId = ctx.workspace_snapshot()?.generate_ulid().await?.into();
let lineage_id = ctx.workspace_snapshot()?.generate_ulid().await?;
let node_weight = NodeWeight::new_action_prototype(
vector_clock_id,
new_id.into(),
lineage_id,
kind,
name,
description,
)?;
ctx.workspace_snapshot()?.add_node(node_weight).await?;

Self::add_edge_to_func(ctx, new_id, func_id, EdgeWeightKind::new_use()).await?;
Expand Down Expand Up @@ -458,21 +465,21 @@ impl ActionPrototype {
}

pub async fn remove(ctx: &DalContext, id: ActionPrototypeId) -> ActionPrototypeResult<()> {
let change_set = ctx.change_set()?;
let vector_clock_id = ctx.vector_clock_id()?;
// check if there are existing actions queued for this prototype and remove them
let enqueued_actions = Self::find_enqueued_actions(ctx, id).await?;

for action in enqueued_actions {
ctx.workspace_snapshot()?
.remove_node_by_id(change_set, action)
.remove_node_by_id(vector_clock_id, action)
.await?;
WsEvent::action_list_updated(ctx)
.await?
.publish_on_commit(ctx)
.await?;
}
ctx.workspace_snapshot()?
.remove_node_by_id(change_set, id)
.remove_node_by_id(vector_clock_id, id)
.await?;

Ok(())
Expand Down
19 changes: 10 additions & 9 deletions lib/dal/src/attribute/prototype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,15 @@ impl AttributePrototype {
)
.await?;

let change_set = ctx.change_set()?;
let id = change_set.generate_ulid()?;
let node_weight =
NodeWeight::new_content(change_set, id, ContentAddress::AttributePrototype(hash))?;
let workspace_snapshot = ctx.workspace_snapshot()?;
let id = workspace_snapshot.generate_ulid().await?;
let lineage_id = workspace_snapshot.generate_ulid().await?;
let node_weight = NodeWeight::new_content(
ctx.vector_clock_id()?,
id,
lineage_id,
ContentAddress::AttributePrototype(hash),
)?;
let _node_index = workspace_snapshot.add_node(node_weight).await?;

let prototype = AttributePrototype::assemble(id.into(), &content);
Expand Down Expand Up @@ -344,10 +348,9 @@ impl AttributePrototype {
attribute_prototype_id,
))?;

let change_set = ctx.change_set()?;
workspace_snapshot
.remove_edge(
change_set,
ctx.vector_clock_id()?,
attribute_prototype_idx,
current_func_node_idx,
EdgeWeightKindDiscriminants::Use,
Expand Down Expand Up @@ -478,10 +481,8 @@ impl AttributePrototype {
ctx: &DalContext,
prototype_id: AttributePrototypeId,
) -> AttributePrototypeResult<()> {
let change_set = ctx.change_set()?;

ctx.workspace_snapshot()?
.remove_node_by_id(change_set, prototype_id)
.remove_node_by_id(ctx.vector_clock_id()?, prototype_id)
.await?;

Ok(())
Expand Down
22 changes: 13 additions & 9 deletions lib/dal/src/attribute/prototype/argument.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,12 @@ impl AttributePrototypeArgument {
prototype_id: AttributePrototypeId,
arg_id: FuncArgumentId,
) -> AttributePrototypeArgumentResult<Self> {
let change_set = ctx.change_set()?;
let id = change_set.generate_ulid()?;
let node_weight = NodeWeight::new_attribute_prototype_argument(change_set, id, None)?;
let vector_clock_id = ctx.vector_clock_id()?;
let id = ctx.workspace_snapshot()?.generate_ulid().await?;
let lineage_id = ctx.workspace_snapshot()?.generate_ulid().await?;

let node_weight =
NodeWeight::new_attribute_prototype_argument(vector_clock_id, id, lineage_id, None)?;

let workspace_snapshot = ctx.workspace_snapshot()?;

Expand Down Expand Up @@ -243,11 +246,13 @@ impl AttributePrototypeArgument {
destination_component_id: ComponentId,
destination_attribute_prototype_id: AttributePrototypeId,
) -> AttributePrototypeArgumentResult<Self> {
let change_set = ctx.change_set()?;
let id = change_set.generate_ulid()?;
let vector_clock_id = ctx.vector_clock_id()?;
let id = ctx.workspace_snapshot()?.generate_ulid().await?;
let lineage_id = ctx.workspace_snapshot()?.generate_ulid().await?;
let node_weight = NodeWeight::new_attribute_prototype_argument(
change_set,
vector_clock_id,
id,
lineage_id,
Some(ArgumentTargets {
source_component_id,
destination_component_id,
Expand Down Expand Up @@ -424,7 +429,6 @@ impl AttributePrototypeArgument {
value_id: Ulid,
) -> AttributePrototypeArgumentResult<Self> {
let workspace_snapshot = ctx.workspace_snapshot()?;
let change_set = ctx.change_set()?;

for existing_value_source in workspace_snapshot
.outgoing_targets_for_edge_weight_kind(
Expand All @@ -436,7 +440,7 @@ impl AttributePrototypeArgument {
let self_node_index = workspace_snapshot.get_node_index_by_id(self.id).await?;
workspace_snapshot
.remove_edge(
change_set,
ctx.vector_clock_id()?,
self_node_index,
existing_value_source,
EdgeWeightKindDiscriminants::PrototypeArgumentValue,
Expand Down Expand Up @@ -688,7 +692,7 @@ impl AttributePrototypeArgument {

// Remove the argument
ctx.workspace_snapshot()?
.remove_node_by_id(ctx.change_set()?, self.id)
.remove_node_by_id(ctx.vector_clock_id()?, self.id)
.await?;

// Enqueue a dependent values update with the destination attribute values
Expand Down
12 changes: 8 additions & 4 deletions lib/dal/src/attribute/prototype/argument/static_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,14 @@ impl StaticArgumentValue {
)
.await?;

let change_set = ctx.change_set()?;
let id = change_set.generate_ulid()?;
let node_weight =
NodeWeight::new_content(change_set, id, ContentAddress::StaticArgumentValue(hash))?;
let id = ctx.workspace_snapshot()?.generate_ulid().await?;
let lineage_id = ctx.workspace_snapshot()?.generate_ulid().await?;
let node_weight = NodeWeight::new_content(
ctx.vector_clock_id()?,
id,
lineage_id,
ContentAddress::StaticArgumentValue(hash),
)?;

ctx.workspace_snapshot()?.add_node(node_weight).await?;

Expand Down
Loading

0 comments on commit 9ac1a8a

Please sign in to comment.