Skip to content

Commit

Permalink
feat(dal): VectorClockIds are a tuple, no editing changesets
Browse files Browse the repository at this point in the history
Previously, vector clock ids were the same as change set ids. And, we
generated a "editing change set" anytime we mutated the graph. This
changeset was ephemeral, and not connected to a real "change set" in the
system. In addition, for conflict detection to work correctly, the node
write clocks have to store every vector clock write that has *ever*
happened to them. This meant these clocks would grow indefinitely, since
they have to store every ephemeral "editing change set" in the node,
forever.

This change transforms the vector clock id into a tuple of the real
ChangeSetId and the UserPk/ActorId of the current user. In the context
of system actors, like Pinga and the Rebaser, the WorkspacePk is used in
place of the UserPk and removes editing change sets entirely. Now the
bound on the vector clock write clocks in node weights is the number of
change sets and users in the system, which will grow much more slowly
than the editing change sets.

This is a breaking change, since it changes both Node and Edge weight
data strucutres. Migration must be in place before this can be deployed.
  • Loading branch information
zacharyhamm committed Jun 20, 2024
1 parent 9aa591d commit 055406c
Show file tree
Hide file tree
Showing 53 changed files with 2,515 additions and 1,656 deletions.
18 changes: 17 additions & 1 deletion lib/dal-test/src/expand_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,23 @@ use jwt_simple::claims::Claims;
use jwt_simple::prelude::Duration;
use tracing_subscriber::{fmt, util::SubscriberInitExt, EnvFilter, Registry};

use crate::{helpers::generate_fake_name, jwt_private_signing_key, WorkspaceSignup};
use crate::{
helpers::create_user, helpers::generate_fake_name, jwt_private_signing_key, WorkspaceSignup,
};

/// Creates a user for each test to run as
pub async fn setup_history_actor_ctx(ctx: &mut DalContext) {
let user = create_user(ctx).await.expect("unable to create user");
user.associate_workspace(
ctx,
ctx.tenancy()
.workspace_pk()
.expect("no workspace pk set on context"),
)
.await
.expect("unable to associate user with workspace");
ctx.update_history_actor(dal::HistoryActor::User(user.pk()));
}

/// This function is used during macro expansion for setting up a [`ChangeSet`] in an integration test.
pub async fn create_change_set_and_update_ctx(
Expand Down
2 changes: 1 addition & 1 deletion lib/dal-test/src/helpers/change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl ChangeSetTestHelpers {

Self::blocking_commit(ctx).await?;

ctx.update_visibility_and_snapshot_to_visibility_no_editing_change_set(
ctx.update_visibility_and_snapshot_to_visibility(
applied_change_set.base_change_set_id.ok_or(eyre!(
"base change set not found for change set: {}",
applied_change_set.id
Expand Down
22 changes: 14 additions & 8 deletions lib/dal/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ impl Action {
.await?
.get_action_node_weight()?;
let mut new_node_weight =
node_weight.new_with_incremented_vector_clock(ctx.change_set()?.vector_clock_id());
node_weight.new_with_incremented_vector_clock(ctx.vector_clock_id()?);
new_node_weight.set_state(state);
ctx.workspace_snapshot()?
.add_node(NodeWeight::Action(new_node_weight))
Expand All @@ -341,11 +341,17 @@ impl Action {
action_prototype_id: ActionPrototypeId,
maybe_component_id: Option<ComponentId>,
) -> ActionResult<Self> {
let change_set = ctx.change_set()?;
let new_id: ActionId = change_set.generate_ulid()?.into();
let vector_clock_id = ctx.vector_clock_id()?;
let new_id: ActionId = ctx.workspace_snapshot()?.generate_ulid().await?.into();
let lineage_id = ctx.workspace_snapshot()?.generate_ulid().await?;

let originating_change_set_id = ctx.change_set_id();
let node_weight =
NodeWeight::new_action(change_set, originating_change_set_id, new_id.into())?;
let node_weight = NodeWeight::new_action(
vector_clock_id,
originating_change_set_id,
new_id.into(),
lineage_id,
)?;
ctx.workspace_snapshot()?.add_node(node_weight).await?;

let action_category_id = ctx
Expand Down Expand Up @@ -388,7 +394,7 @@ impl Action {

pub async fn remove_by_id(ctx: &DalContext, action_id: ActionId) -> ActionResult<()> {
ctx.workspace_snapshot()?
.remove_node_by_id(ctx.change_set()?, action_id)
.remove_node_by_id(ctx.vector_clock_id()?, action_id)
.await?;
Ok(())
}
Expand All @@ -398,13 +404,13 @@ impl Action {
action_prototype_id: ActionPrototypeId,
maybe_component_id: Option<ComponentId>,
) -> ActionResult<()> {
let change_set = ctx.change_set()?;
let snap = ctx.workspace_snapshot()?;

if let Some(action_id) =
Self::find_equivalent(ctx, action_prototype_id, maybe_component_id).await?
{
snap.remove_node_by_id(change_set, action_id).await?;
snap.remove_node_by_id(ctx.vector_clock_id()?, action_id)
.await?;
}
Ok(())
}
Expand Down
19 changes: 12 additions & 7 deletions lib/dal/src/action/prototype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,17 @@ impl ActionPrototype {
schema_variant_id: SchemaVariantId,
func_id: FuncId,
) -> ActionPrototypeResult<Self> {
let change_set = ctx.change_set()?;
let new_id: ActionPrototypeId = change_set.generate_ulid()?.into();
let node_weight =
NodeWeight::new_action_prototype(change_set, new_id.into(), kind, name, description)?;
let vector_clock_id = ctx.vector_clock_id()?;
let new_id: ActionPrototypeId = ctx.workspace_snapshot()?.generate_ulid().await?.into();
let lineage_id = ctx.workspace_snapshot()?.generate_ulid().await?;
let node_weight = NodeWeight::new_action_prototype(
vector_clock_id,
new_id.into(),
lineage_id,
kind,
name,
description,
)?;
ctx.workspace_snapshot()?.add_node(node_weight).await?;

Self::add_edge_to_func(ctx, new_id, func_id, EdgeWeightKind::new_use()).await?;
Expand Down Expand Up @@ -422,10 +429,8 @@ impl ActionPrototype {
Ok(triggered_actions)
}
pub async fn remove(ctx: &DalContext, id: ActionPrototypeId) -> ActionPrototypeResult<()> {
let change_set = ctx.change_set()?;

ctx.workspace_snapshot()?
.remove_node_by_id(change_set, id)
.remove_node_by_id(ctx.vector_clock_id()?, id)
.await?;

Ok(())
Expand Down
19 changes: 10 additions & 9 deletions lib/dal/src/attribute/prototype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,15 @@ impl AttributePrototype {
)
.await?;

let change_set = ctx.change_set()?;
let id = change_set.generate_ulid()?;
let node_weight =
NodeWeight::new_content(change_set, id, ContentAddress::AttributePrototype(hash))?;
let workspace_snapshot = ctx.workspace_snapshot()?;
let id = workspace_snapshot.generate_ulid().await?;
let lineage_id = workspace_snapshot.generate_ulid().await?;
let node_weight = NodeWeight::new_content(
ctx.vector_clock_id()?,
id,
lineage_id,
ContentAddress::AttributePrototype(hash),
)?;
let _node_index = workspace_snapshot.add_node(node_weight).await?;

let prototype = AttributePrototype::assemble(id.into(), &content);
Expand Down Expand Up @@ -328,10 +332,9 @@ impl AttributePrototype {
attribute_prototype_id,
))?;

let change_set = ctx.change_set()?;
workspace_snapshot
.remove_edge(
change_set,
ctx.vector_clock_id()?,
attribute_prototype_idx,
current_func_node_idx,
EdgeWeightKindDiscriminants::Use,
Expand Down Expand Up @@ -462,10 +465,8 @@ impl AttributePrototype {
ctx: &DalContext,
prototype_id: AttributePrototypeId,
) -> AttributePrototypeResult<()> {
let change_set = ctx.change_set()?;

ctx.workspace_snapshot()?
.remove_node_by_id(change_set, prototype_id)
.remove_node_by_id(ctx.vector_clock_id()?, prototype_id)
.await?;

Ok(())
Expand Down
22 changes: 13 additions & 9 deletions lib/dal/src/attribute/prototype/argument.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,12 @@ impl AttributePrototypeArgument {
prototype_id: AttributePrototypeId,
arg_id: FuncArgumentId,
) -> AttributePrototypeArgumentResult<Self> {
let change_set = ctx.change_set()?;
let id = change_set.generate_ulid()?;
let node_weight = NodeWeight::new_attribute_prototype_argument(change_set, id, None)?;
let vector_clock_id = ctx.vector_clock_id()?;
let id = ctx.workspace_snapshot()?.generate_ulid().await?;
let lineage_id = ctx.workspace_snapshot()?.generate_ulid().await?;

let node_weight =
NodeWeight::new_attribute_prototype_argument(vector_clock_id, id, lineage_id, None)?;

let workspace_snapshot = ctx.workspace_snapshot()?;

Expand Down Expand Up @@ -230,11 +233,13 @@ impl AttributePrototypeArgument {
destination_component_id: ComponentId,
destination_attribute_prototype_id: AttributePrototypeId,
) -> AttributePrototypeArgumentResult<Self> {
let change_set = ctx.change_set()?;
let id = change_set.generate_ulid()?;
let vector_clock_id = ctx.vector_clock_id()?;
let id = ctx.workspace_snapshot()?.generate_ulid().await?;
let lineage_id = ctx.workspace_snapshot()?.generate_ulid().await?;
let node_weight = NodeWeight::new_attribute_prototype_argument(
change_set,
vector_clock_id,
id,
lineage_id,
Some(ArgumentTargets {
source_component_id,
destination_component_id,
Expand Down Expand Up @@ -382,7 +387,6 @@ impl AttributePrototypeArgument {
value_id: Ulid,
) -> AttributePrototypeArgumentResult<Self> {
let workspace_snapshot = ctx.workspace_snapshot()?;
let change_set = ctx.change_set()?;

for existing_value_source in workspace_snapshot
.outgoing_targets_for_edge_weight_kind(
Expand All @@ -394,7 +398,7 @@ impl AttributePrototypeArgument {
let self_node_index = workspace_snapshot.get_node_index_by_id(self.id).await?;
workspace_snapshot
.remove_edge(
change_set,
ctx.vector_clock_id()?,
self_node_index,
existing_value_source,
EdgeWeightKindDiscriminants::PrototypeArgumentValue,
Expand Down Expand Up @@ -646,7 +650,7 @@ impl AttributePrototypeArgument {

// Remove the argument
ctx.workspace_snapshot()?
.remove_node_by_id(ctx.change_set()?, self.id)
.remove_node_by_id(ctx.vector_clock_id()?, self.id)
.await?;

// Enqueue a dependent values update with the destination attribute values
Expand Down
12 changes: 8 additions & 4 deletions lib/dal/src/attribute/prototype/argument/static_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,14 @@ impl StaticArgumentValue {
)
.await?;

let change_set = ctx.change_set()?;
let id = change_set.generate_ulid()?;
let node_weight =
NodeWeight::new_content(change_set, id, ContentAddress::StaticArgumentValue(hash))?;
let id = ctx.workspace_snapshot()?.generate_ulid().await?;
let lineage_id = ctx.workspace_snapshot()?.generate_ulid().await?;
let node_weight = NodeWeight::new_content(
ctx.vector_clock_id()?,
id,
lineage_id,
ContentAddress::StaticArgumentValue(hash),
)?;

ctx.workspace_snapshot()?.add_node(node_weight).await?;

Expand Down
20 changes: 11 additions & 9 deletions lib/dal/src/attribute/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,11 @@ impl AttributeValue {
maybe_parent_attribute_value: Option<AttributeValueId>,
key: Option<String>,
) -> AttributeValueResult<Self> {
let change_set = ctx.change_set()?;
let id = change_set.generate_ulid()?;
let node_weight = NodeWeight::new_attribute_value(change_set, id, None, None)?;
let vector_clock_id = ctx.vector_clock_id()?;
let id = ctx.workspace_snapshot()?.generate_ulid().await?;
let lineage_id = ctx.workspace_snapshot()?.generate_ulid().await?;
let node_weight =
NodeWeight::new_attribute_value(vector_clock_id, id, lineage_id, None, None)?;
let is_for = is_for.into();

let ordered = if let Some(prop_id) = is_for.prop_id() {
Expand All @@ -313,7 +315,7 @@ impl AttributeValue {

if ordered {
ctx.workspace_snapshot()?
.add_ordered_node(change_set, node_weight.clone())
.add_ordered_node(vector_clock_id, node_weight.clone())
.await?;
} else {
ctx.workspace_snapshot()?
Expand Down Expand Up @@ -1131,7 +1133,7 @@ impl AttributeValue {
.id();

workspace_snapshot
.remove_node_by_id(ctx.change_set()?, current_target_id)
.remove_node_by_id(ctx.vector_clock_id()?, current_target_id)
.await?;
}

Expand Down Expand Up @@ -1770,7 +1772,7 @@ impl AttributeValue {

ctx.workspace_snapshot()?
.remove_edge_for_ulids(
ctx.change_set()?,
ctx.vector_clock_id()?,
attribute_value_id,
prototype_id,
EdgeWeightKindDiscriminants::Prototype,
Expand Down Expand Up @@ -1943,7 +1945,7 @@ impl AttributeValue {
.await?;

let mut new_av_node_weight =
av_node_weight.new_with_incremented_vector_clock(ctx.change_set()?.vector_clock_id());
av_node_weight.new_with_incremented_vector_clock(ctx.vector_clock_id()?);

new_av_node_weight.set_value(value_address.map(ContentAddress::JsonValue));
new_av_node_weight
Expand Down Expand Up @@ -2192,10 +2194,10 @@ impl AttributeValue {
.ok_or(AttributeValueError::RemovingWhenNotChildOrMapOrArray(id))?;

ctx.workspace_snapshot()?
.remove_node_by_id(ctx.change_set()?, id)
.remove_node_by_id(ctx.vector_clock_id()?, id)
.await?;
ctx.workspace_snapshot()?
.remove_dependent_value_root(ctx.change_set()?, id)
.remove_dependent_value_root(ctx.vector_clock_id()?, id)
.await?;

ctx.add_dependent_values_and_enqueue(vec![parent_av_id])
Expand Down
Loading

0 comments on commit 055406c

Please sign in to comment.