Skip to content

Commit

Permalink
Merge pull request #4295 from systeminit/zack/remove-vector-clocks
Browse files Browse the repository at this point in the history
feat(*): remove vector clocks
  • Loading branch information
zacharyhamm authored Aug 7, 2024
2 parents cf23f48 + 6731097 commit 0b58dd7
Show file tree
Hide file tree
Showing 67 changed files with 1,296 additions and 2,628 deletions.
4 changes: 2 additions & 2 deletions lib/dal/examples/rebase/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ use std::{env, fs::File, io::prelude::*};

use si_layer_cache::db::serialize;

use dal::WorkspaceSnapshotGraphV1;
use dal::WorkspaceSnapshotGraphV2;

type Result<T> = std::result::Result<T, Box<dyn std::error::Error + 'static>>;

const USAGE: &str = "usage: cargo run --example rebase <TO_REBASE_FILE_PATH> <ONTO_FILE_PATH>";

fn load_snapshot_graph(path: &str) -> Result<WorkspaceSnapshotGraphV1> {
fn load_snapshot_graph(path: &str) -> Result<WorkspaceSnapshotGraphV2> {
let mut file = File::open(path)?;
let mut bytes = vec![];
file.read_to_end(&mut bytes)?;
Expand Down
28 changes: 8 additions & 20 deletions lib/dal/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,11 @@ use crate::{
func::FuncExecutionPk,
id, implement_add_edge_to,
job::definition::ActionJob,
workspace_snapshot::{
node_weight::{
category_node_weight::CategoryNodeKind, ActionNodeWeight, NodeWeight, NodeWeightError,
},
vector_clock::HasVectorClocks,
workspace_snapshot::node_weight::{
category_node_weight::CategoryNodeKind, ActionNodeWeight, NodeWeight, NodeWeightError,
},
AttributeValue, ChangeSetError, ChangeSetId, ComponentError, ComponentId, DalContext,
EdgeWeightError, EdgeWeightKind, EdgeWeightKindDiscriminants, HelperError, TransactionsError,
EdgeWeightKind, EdgeWeightKindDiscriminants, HelperError, TransactionsError,
WorkspaceSnapshotError, WsEvent, WsEventError, WsEventResult, WsPayload,
};

Expand All @@ -45,8 +42,6 @@ pub enum ActionError {
Component(#[from] ComponentError),
#[error("component not found for action: {0}")]
ComponentNotFoundForAction(ActionId),
#[error("Edge Weight error: {0}")]
EdgeWeight(#[from] EdgeWeightError),
#[error("Helper error: {0}")]
Helper(#[from] HelperError),
#[error("Layer DB error: {0}")]
Expand Down Expand Up @@ -317,8 +312,7 @@ impl Action {
.get_node_weight(idx)
.await?
.get_action_node_weight()?;
let mut new_node_weight =
node_weight.new_with_incremented_vector_clock(ctx.vector_clock_id()?);
let mut new_node_weight = node_weight.clone();
new_node_weight.set_state(state);
ctx.workspace_snapshot()?
.add_node(NodeWeight::Action(new_node_weight))
Expand All @@ -342,17 +336,12 @@ impl Action {
action_prototype_id: ActionPrototypeId,
maybe_component_id: Option<ComponentId>,
) -> ActionResult<Self> {
let vector_clock_id = ctx.vector_clock_id()?;
let new_id: ActionId = ctx.workspace_snapshot()?.generate_ulid().await?.into();
let lineage_id = ctx.workspace_snapshot()?.generate_ulid().await?;

let originating_change_set_id = ctx.change_set_id();
let node_weight = NodeWeight::new_action(
vector_clock_id,
originating_change_set_id,
new_id.into(),
lineage_id,
)?;
let node_weight =
NodeWeight::new_action(originating_change_set_id, new_id.into(), lineage_id);
ctx.workspace_snapshot()?.add_node(node_weight).await?;

let action_category_id = ctx
Expand Down Expand Up @@ -395,7 +384,7 @@ impl Action {

pub async fn remove_by_id(ctx: &DalContext, action_id: ActionId) -> ActionResult<()> {
ctx.workspace_snapshot()?
.remove_node_by_id(ctx.vector_clock_id()?, action_id)
.remove_node_by_id(action_id)
.await?;
Ok(())
}
Expand All @@ -410,8 +399,7 @@ impl Action {
if let Some(action_id) =
Self::find_equivalent(ctx, action_prototype_id, maybe_component_id).await?
{
snap.remove_node_by_id(ctx.vector_clock_id()?, action_id)
.await?;
snap.remove_node_by_id(action_id).await?;
}
Ok(())
}
Expand Down
28 changes: 7 additions & 21 deletions lib/dal/src/action/prototype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ use crate::{
implement_add_edge_to,
workspace_snapshot::node_weight::{ActionPrototypeNodeWeight, NodeWeight, NodeWeightError},
ActionPrototypeId, ChangeSetError, Component, ComponentError, ComponentId, DalContext,
EdgeWeightError, EdgeWeightKind, EdgeWeightKindDiscriminants, HelperError, SchemaVariant,
SchemaVariantError, SchemaVariantId, TransactionsError, WorkspaceSnapshotError, WsEvent,
WsEventError, WsEventResult, WsPayload,
EdgeWeightKind, EdgeWeightKindDiscriminants, HelperError, SchemaVariant, SchemaVariantError,
SchemaVariantId, TransactionsError, WorkspaceSnapshotError, WsEvent, WsEventError,
WsEventResult, WsPayload,
};

#[remain::sorted]
Expand All @@ -34,8 +34,6 @@ pub enum ActionPrototypeError {
Component(#[from] ComponentError),
#[error("diagram error: {0}")]
Diagram(#[from] DiagramError),
#[error("Edge Weight error: {0}")]
EdgeWeight(#[from] EdgeWeightError),
#[error("func not found for prototype: {0}")]
FuncNotFoundForPrototype(ActionPrototypeId),
#[error("func runner error: {0}")]
Expand Down Expand Up @@ -161,17 +159,10 @@ impl ActionPrototype {
schema_variant_id: SchemaVariantId,
func_id: FuncId,
) -> ActionPrototypeResult<Self> {
let vector_clock_id = ctx.vector_clock_id()?;
let new_id: ActionPrototypeId = ctx.workspace_snapshot()?.generate_ulid().await?.into();
let lineage_id = ctx.workspace_snapshot()?.generate_ulid().await?;
let node_weight = NodeWeight::new_action_prototype(
vector_clock_id,
new_id.into(),
lineage_id,
kind,
name,
description,
)?;
let node_weight =
NodeWeight::new_action_prototype(new_id.into(), lineage_id, kind, name, description);
ctx.workspace_snapshot()?.add_node(node_weight).await?;

Self::add_edge_to_func(ctx, new_id, func_id, EdgeWeightKind::new_use()).await?;
Expand Down Expand Up @@ -465,22 +456,17 @@ impl ActionPrototype {
}

pub async fn remove(ctx: &DalContext, id: ActionPrototypeId) -> ActionPrototypeResult<()> {
let vector_clock_id = ctx.vector_clock_id()?;
// check if there are existing actions queued for this prototype and remove them
let enqueued_actions = Self::find_enqueued_actions(ctx, id).await?;

for action in enqueued_actions {
ctx.workspace_snapshot()?
.remove_node_by_id(vector_clock_id, action)
.await?;
ctx.workspace_snapshot()?.remove_node_by_id(action).await?;
WsEvent::action_list_updated(ctx)
.await?
.publish_on_commit(ctx)
.await?;
}
ctx.workspace_snapshot()?
.remove_node_by_id(vector_clock_id, id)
.await?;
ctx.workspace_snapshot()?.remove_node_by_id(id).await?;

Ok(())
}
Expand Down
16 changes: 4 additions & 12 deletions lib/dal/src/attribute/prototype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ use crate::attribute::value::AttributeValueError;
use crate::change_set::ChangeSetError;
use crate::layer_db_types::{AttributePrototypeContent, AttributePrototypeContentV1};
use crate::workspace_snapshot::content_address::{ContentAddress, ContentAddressDiscriminants};
use crate::workspace_snapshot::edge_weight::{
EdgeWeightError, EdgeWeightKind, EdgeWeightKindDiscriminants,
};
use crate::workspace_snapshot::edge_weight::{EdgeWeightKind, EdgeWeightKindDiscriminants};
use crate::workspace_snapshot::node_weight::{
content_node_weight, NodeWeight, NodeWeightDiscriminants, NodeWeightError,
};
Expand All @@ -53,8 +51,6 @@ pub enum AttributePrototypeError {
AttributeValue(#[from] Box<AttributeValueError>),
#[error("change set error: {0}")]
ChangeSet(#[from] ChangeSetError),
#[error("edge weight error: {0}")]
EdgeWeight(#[from] EdgeWeightError),
#[error("func error: {0}")]
Func(#[from] FuncError),
#[error("helper error: {0}")]
Expand Down Expand Up @@ -163,12 +159,8 @@ impl AttributePrototype {
let workspace_snapshot = ctx.workspace_snapshot()?;
let id = workspace_snapshot.generate_ulid().await?;
let lineage_id = workspace_snapshot.generate_ulid().await?;
let node_weight = NodeWeight::new_content(
ctx.vector_clock_id()?,
id,
lineage_id,
ContentAddress::AttributePrototype(hash),
)?;
let node_weight =
NodeWeight::new_content(id, lineage_id, ContentAddress::AttributePrototype(hash));
let _node_index = workspace_snapshot.add_node(node_weight).await?;

let prototype = AttributePrototype::assemble(id.into(), &content);
Expand Down Expand Up @@ -481,7 +473,7 @@ impl AttributePrototype {
prototype_id: AttributePrototypeId,
) -> AttributePrototypeResult<()> {
ctx.workspace_snapshot()?
.remove_node_by_id(ctx.vector_clock_id()?, prototype_id)
.remove_node_by_id(prototype_id)
.await?;

Ok(())
Expand Down
16 changes: 4 additions & 12 deletions lib/dal/src/attribute/prototype/argument.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
socket::input::InputSocketId,
workspace_snapshot::{
content_address::ContentAddressDiscriminants,
edge_weight::{EdgeWeightError, EdgeWeightKind, EdgeWeightKindDiscriminants},
edge_weight::{EdgeWeightKind, EdgeWeightKindDiscriminants},
node_weight::{
AttributePrototypeArgumentNodeWeight, NodeWeight, NodeWeightDiscriminants,
NodeWeightError,
Expand Down Expand Up @@ -65,8 +65,6 @@ pub enum AttributePrototypeArgumentError {
AttributeValue(String),
#[error("change set error: {0}")]
ChangeSet(#[from] ChangeSetError),
#[error("edge weight error: {0}")]
EdgeWeight(#[from] EdgeWeightError),
#[error("func argument error: {0}")]
FuncArgument(#[from] FuncArgumentError),
#[error("helper error: {0}")]
Expand Down Expand Up @@ -210,12 +208,10 @@ impl AttributePrototypeArgument {
prototype_id: AttributePrototypeId,
arg_id: FuncArgumentId,
) -> AttributePrototypeArgumentResult<Self> {
let vector_clock_id = ctx.vector_clock_id()?;
let id = ctx.workspace_snapshot()?.generate_ulid().await?;
let lineage_id = ctx.workspace_snapshot()?.generate_ulid().await?;

let node_weight =
NodeWeight::new_attribute_prototype_argument(vector_clock_id, id, lineage_id, None)?;
let node_weight = NodeWeight::new_attribute_prototype_argument(id, lineage_id, None);

let workspace_snapshot = ctx.workspace_snapshot()?;

Expand Down Expand Up @@ -246,18 +242,16 @@ impl AttributePrototypeArgument {
destination_component_id: ComponentId,
destination_attribute_prototype_id: AttributePrototypeId,
) -> AttributePrototypeArgumentResult<Self> {
let vector_clock_id = ctx.vector_clock_id()?;
let id = ctx.workspace_snapshot()?.generate_ulid().await?;
let lineage_id = ctx.workspace_snapshot()?.generate_ulid().await?;
let node_weight = NodeWeight::new_attribute_prototype_argument(
vector_clock_id,
id,
lineage_id,
Some(ArgumentTargets {
source_component_id,
destination_component_id,
}),
)?;
);

let prototype_func_id =
AttributePrototype::func_id(ctx, destination_attribute_prototype_id).await?;
Expand Down Expand Up @@ -690,9 +684,7 @@ impl AttributePrototypeArgument {
}

// Remove the argument
ctx.workspace_snapshot()?
.remove_node_by_id(ctx.vector_clock_id()?, self.id)
.await?;
ctx.workspace_snapshot()?.remove_node_by_id(self.id).await?;

// Enqueue a dependent values update with the destination attribute values
ctx.add_dependent_values_and_enqueue(avs_to_update).await?;
Expand Down
8 changes: 2 additions & 6 deletions lib/dal/src/attribute/prototype/argument/static_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,8 @@ impl StaticArgumentValue {

let id = ctx.workspace_snapshot()?.generate_ulid().await?;
let lineage_id = ctx.workspace_snapshot()?.generate_ulid().await?;
let node_weight = NodeWeight::new_content(
ctx.vector_clock_id()?,
id,
lineage_id,
ContentAddress::StaticArgumentValue(hash),
)?;
let node_weight =
NodeWeight::new_content(id, lineage_id, ContentAddress::StaticArgumentValue(hash));

ctx.workspace_snapshot()?.add_node(node_weight).await?;

Expand Down
28 changes: 8 additions & 20 deletions lib/dal/src/attribute/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,11 @@ use crate::socket::input::InputSocketError;
use crate::socket::output::OutputSocketError;
use crate::validation::{ValidationError, ValidationOutput};
use crate::workspace_snapshot::content_address::{ContentAddress, ContentAddressDiscriminants};
use crate::workspace_snapshot::edge_weight::{
EdgeWeightError, EdgeWeightKind, EdgeWeightKindDiscriminants,
};
use crate::workspace_snapshot::edge_weight::{EdgeWeightKind, EdgeWeightKindDiscriminants};
use crate::workspace_snapshot::node_weight::{
AttributeValueNodeWeight, NodeWeight, NodeWeightDiscriminants, NodeWeightError,
};
use crate::workspace_snapshot::{
serde_value_to_string_type, vector_clock::HasVectorClocks, WorkspaceSnapshotError,
};
use crate::workspace_snapshot::{serde_value_to_string_type, WorkspaceSnapshotError};
use crate::{
implement_add_edge_to, pk, AttributePrototype, AttributePrototypeId, Component, ComponentError,
ComponentId, DalContext, Func, FuncError, FuncId, HelperError, InputSocket, InputSocketId,
Expand Down Expand Up @@ -130,8 +126,6 @@ pub enum AttributeValueError {
ChangeSet(#[from] ChangeSetError),
#[error("component error: {0}")]
Component(#[from] Box<ComponentError>),
#[error("edge weight error: {0}")]
EdgeWeight(#[from] EdgeWeightError),
#[error("empty attribute prototype arguments for group name: {0}")]
EmptyAttributePrototypeArgumentsForGroup(String),
#[error("func error: {0}")]
Expand Down Expand Up @@ -295,11 +289,9 @@ impl AttributeValue {
maybe_parent_attribute_value: Option<AttributeValueId>,
key: Option<String>,
) -> AttributeValueResult<Self> {
let vector_clock_id = ctx.vector_clock_id()?;
let id = ctx.workspace_snapshot()?.generate_ulid().await?;
let lineage_id = ctx.workspace_snapshot()?.generate_ulid().await?;
let node_weight =
NodeWeight::new_attribute_value(vector_clock_id, id, lineage_id, None, None)?;
let node_weight = NodeWeight::new_attribute_value(id, lineage_id, None, None);
let is_for = is_for.into();

let ordered = if let Some(prop_id) = is_for.prop_id() {
Expand All @@ -315,7 +307,7 @@ impl AttributeValue {

if ordered {
ctx.workspace_snapshot()?
.add_ordered_node(vector_clock_id, node_weight.clone())
.add_ordered_node(node_weight.clone())
.await?;
} else {
ctx.workspace_snapshot()?
Expand Down Expand Up @@ -1135,7 +1127,7 @@ impl AttributeValue {
.id();

workspace_snapshot
.remove_node_by_id(ctx.vector_clock_id()?, current_target_id)
.remove_node_by_id(current_target_id)
.await?;
}

Expand Down Expand Up @@ -1779,7 +1771,6 @@ impl AttributeValue {

ctx.workspace_snapshot()?
.remove_edge_for_ulids(
ctx.vector_clock_id()?,
attribute_value_id,
prototype_id,
EdgeWeightKindDiscriminants::Prototype,
Expand Down Expand Up @@ -1952,8 +1943,7 @@ impl AttributeValue {
)
.await?;

let mut new_av_node_weight =
av_node_weight.new_with_incremented_vector_clock(ctx.vector_clock_id()?);
let mut new_av_node_weight = av_node_weight.clone();

new_av_node_weight.set_value(value_address.map(ContentAddress::JsonValue));
new_av_node_weight
Expand Down Expand Up @@ -2201,11 +2191,9 @@ impl AttributeValue {
.await?
.ok_or(AttributeValueError::RemovingWhenNotChildOrMapOrArray(id))?;

ctx.workspace_snapshot()?.remove_node_by_id(id).await?;
ctx.workspace_snapshot()?
.remove_node_by_id(ctx.vector_clock_id()?, id)
.await?;
ctx.workspace_snapshot()?
.remove_dependent_value_root(ctx.vector_clock_id()?, id)
.remove_dependent_value_root(id)
.await?;

ctx.add_dependent_values_and_enqueue(vec![parent_av_id])
Expand Down
Loading

0 comments on commit 0b58dd7

Please sign in to comment.