From 6d7778e979414e160a797517ee4ca8599b54566a Mon Sep 17 00:00:00 2001 From: Zachary Hamm Date: Thu, 1 Aug 2024 18:13:26 -0500 Subject: [PATCH] fix(dal): automatically resolve prototype edge conflicts If we detect an "exclusive edge mismatch" for an attribute value's prototype edge, we can automatically resolve it if: 1. The vector clock ids are the same. 2. The edge in to_rebase is newer than the edge in onto. In this case, automatic resolution just means removing the NewEdge update and the conflict, leaving the edge as it exists in the to_rebase graph. --- lib/rebaser-server/src/rebase.rs | 162 ++++++++++++++++++++++++++++++- 1 file changed, 158 insertions(+), 4 deletions(-) diff --git a/lib/rebaser-server/src/rebase.rs b/lib/rebaser-server/src/rebase.rs index f665774800..5fbe4c9cbc 100644 --- a/lib/rebaser-server/src/rebase.rs +++ b/lib/rebaser-server/src/rebase.rs @@ -1,7 +1,13 @@ use dal::change_set::{ChangeSet, ChangeSetError, ChangeSetId}; -use dal::workspace_snapshot::vector_clock::VectorClockId; -use dal::workspace_snapshot::WorkspaceSnapshotError; -use dal::{DalContext, TransactionsError, WorkspacePk, WorkspaceSnapshot, WsEventError}; +use dal::workspace_snapshot::conflict::Conflict; +use dal::workspace_snapshot::graph::ConflictsAndUpdates; +use dal::workspace_snapshot::update::Update; +use dal::workspace_snapshot::vector_clock::{HasVectorClocks, VectorClockId}; +use dal::workspace_snapshot::{NodeId, NodeInformation, WorkspaceSnapshotError}; +use dal::{ + DalContext, EdgeWeight, EdgeWeightKindDiscriminants, TransactionsError, WorkspacePk, + WorkspaceSnapshot, WsEventError, +}; use si_events::WorkspaceSnapshotAddress; use si_layer_cache::activities::rebase::RebaseStatus; use si_layer_cache::activities::ActivityRebaseRequest; @@ -84,13 +90,14 @@ pub async fn perform_rebase( to_rebase_change_set.id, ))?; - let conflicts_and_updates = to_rebase_workspace_snapshot + let mut conflicts_and_updates = to_rebase_workspace_snapshot .detect_conflicts_and_updates( to_rebase_vector_clock_id, &onto_workspace_snapshot, onto_vector_clock_id, ) .await?; + info!( "count: conflicts ({}) and updates ({}), {:?}", conflicts_and_updates.conflicts.len(), @@ -98,6 +105,19 @@ pub async fn perform_rebase( start.elapsed() ); + let len_before = conflicts_and_updates.conflicts.len(); + if !conflicts_and_updates.conflicts.is_empty() { + conflicts_and_updates = fix_prototype_race_conflicts( + conflicts_and_updates.clone(), + &to_rebase_workspace_snapshot, + ) + .await?; + } + + if conflicts_and_updates.conflicts.len() < len_before { + info!("automatically resolved prototype edge exclusive edge mismatch"); + } + // If there are conflicts, immediately assemble a reply message that conflicts were found. // Otherwise, we can perform updates and assemble a "success" reply message. let message: RebaseStatus = if conflicts_and_updates.conflicts.is_empty() { @@ -190,3 +210,137 @@ pub(crate) async fn evict_unused_snapshots( } Ok(()) } + +/// If the same user modifies two attributes in two+ components in quick +/// succession, they can race against themselves, producing a conflict where +/// an out of date attribute that was *not* changed in "onto" attempts to +/// stomp on the more up to date attribute in "to_rebase". We can fix this +/// by just removing the new edge update if there are no other updates for +/// the source id. +async fn fix_prototype_race_conflicts( + mut conflicts_and_updates: ConflictsAndUpdates, + to_rebase_snapshot: &WorkspaceSnapshot, +) -> RebaseResult { + let original_conflicts = conflicts_and_updates.conflicts.clone(); + for conflict in &original_conflicts { + match conflict { + Conflict::ExclusiveEdgeMismatch { + source, edge_kind, .. + } if edge_kind == &EdgeWeightKindDiscriminants::Prototype => { + let mut new_edge_updates = find_new_edge_updates_for_source( + &conflicts_and_updates.updates, + source.id, + *edge_kind, + ); + if new_edge_updates.len() != 1 { + // We can't resolve this one automatically because there + // is either no new edge update or more than one for + // this kind + continue; + } + let to_rebase_edge = to_rebase_snapshot + .edges_directed_for_edge_weight_kind( + source.id, + dal::workspace_snapshot::Direction::Outgoing, + *edge_kind, + ) + .await? + .pop() + .map(|(edge_weight, _, _)| edge_weight); + + if let (Some(to_rebase_edge), Some((_, _, onto_edge_weight))) = + (to_rebase_edge, new_edge_updates.pop()) + { + let to_rebase_clock = to_rebase_edge.vector_clock_write().max(None); + let onto_clock = onto_edge_weight.vector_clock_write().max(None); + if let ( + Some((to_rebase_clock_id, to_rebase_clock_stamp)), + Some((onto_clock_id, onto_clock_stamp)), + ) = (to_rebase_clock, onto_clock) + { + if to_rebase_clock_id == onto_clock_id + && to_rebase_clock_stamp > onto_clock_stamp + { + conflicts_and_updates = + remove_new_edge_update_and_conflict_for_source_if_safe( + conflicts_and_updates, + source.id, + *edge_kind, + ); + } + } + } + } + _ => {} + } + } + + Ok(conflicts_and_updates) +} + +// NOTE: This is only safe when used by the fix_prototype_race_conflicts function... +fn remove_new_edge_update_and_conflict_for_source_if_safe( + mut conflicts_and_updates: ConflictsAndUpdates, + source_id: NodeId, + new_edge_kind: EdgeWeightKindDiscriminants, +) -> ConflictsAndUpdates { + let is_it_safe = !conflicts_and_updates + .updates + .iter() + .any(|update| match update { + Update::NewEdge { + source, + edge_weight, + .. + } => source.id == source_id && new_edge_kind != edge_weight.kind().into(), + Update::RemoveEdge { source, .. } => source.id == source_id, + Update::ReplaceSubgraph { onto, .. } => onto.id == source_id, + _ => false, + }); + + if !is_it_safe { + return conflicts_and_updates; + } + + conflicts_and_updates.updates.retain(|update| match update { + Update::NewEdge { + source, + edge_weight, + .. + } if source.id == source_id => new_edge_kind != edge_weight.kind().into(), + _ => true, + }); + + conflicts_and_updates + .conflicts + .retain(|conflict| match conflict { + Conflict::ExclusiveEdgeMismatch { + source, edge_kind, .. + } if source.id == source_id => edge_kind != &new_edge_kind, + _ => true, + }); + + conflicts_and_updates +} + +fn find_new_edge_updates_for_source( + updates: &[Update], + source_id: NodeId, + kind: EdgeWeightKindDiscriminants, +) -> Vec<(NodeInformation, NodeInformation, EdgeWeight)> { + updates + .iter() + .filter_map(|update| match update { + Update::NewEdge { + source, + destination, + edge_weight, + } if source.id == source_id && kind == edge_weight.kind().into() => Some(( + source.to_owned(), + destination.to_owned(), + edge_weight.to_owned(), + )), + _ => None, + }) + .collect() +}