Skip to content

Commit

Permalink
Merge pull request #4269 from systeminit/zack/automatically-resolve-p…
Browse files Browse the repository at this point in the history
…rototype-edge-mismatches

fix(dal): automatically resolve prototype edge conflicts
  • Loading branch information
zacharyhamm authored Aug 1, 2024
2 parents 577fad3 + 6d7778e commit 20e1373
Showing 1 changed file with 158 additions and 4 deletions.
162 changes: 158 additions & 4 deletions lib/rebaser-server/src/rebase.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
use dal::change_set::{ChangeSet, ChangeSetError, ChangeSetId};
use dal::workspace_snapshot::vector_clock::VectorClockId;
use dal::workspace_snapshot::WorkspaceSnapshotError;
use dal::{DalContext, TransactionsError, WorkspacePk, WorkspaceSnapshot, WsEventError};
use dal::workspace_snapshot::conflict::Conflict;
use dal::workspace_snapshot::graph::ConflictsAndUpdates;
use dal::workspace_snapshot::update::Update;
use dal::workspace_snapshot::vector_clock::{HasVectorClocks, VectorClockId};
use dal::workspace_snapshot::{NodeId, NodeInformation, WorkspaceSnapshotError};
use dal::{
DalContext, EdgeWeight, EdgeWeightKindDiscriminants, TransactionsError, WorkspacePk,
WorkspaceSnapshot, WsEventError,
};
use si_events::WorkspaceSnapshotAddress;
use si_layer_cache::activities::rebase::RebaseStatus;
use si_layer_cache::activities::ActivityRebaseRequest;
Expand Down Expand Up @@ -84,20 +90,34 @@ pub async fn perform_rebase(
to_rebase_change_set.id,
))?;

let conflicts_and_updates = to_rebase_workspace_snapshot
let mut conflicts_and_updates = to_rebase_workspace_snapshot
.detect_conflicts_and_updates(
to_rebase_vector_clock_id,
&onto_workspace_snapshot,
onto_vector_clock_id,
)
.await?;

info!(
"count: conflicts ({}) and updates ({}), {:?}",
conflicts_and_updates.conflicts.len(),
conflicts_and_updates.updates.len(),
start.elapsed()
);

let len_before = conflicts_and_updates.conflicts.len();
if !conflicts_and_updates.conflicts.is_empty() {
conflicts_and_updates = fix_prototype_race_conflicts(
conflicts_and_updates.clone(),
&to_rebase_workspace_snapshot,
)
.await?;
}

if conflicts_and_updates.conflicts.len() < len_before {
info!("automatically resolved prototype edge exclusive edge mismatch");
}

// If there are conflicts, immediately assemble a reply message that conflicts were found.
// Otherwise, we can perform updates and assemble a "success" reply message.
let message: RebaseStatus = if conflicts_and_updates.conflicts.is_empty() {
Expand Down Expand Up @@ -190,3 +210,137 @@ pub(crate) async fn evict_unused_snapshots(
}
Ok(())
}

/// If the same user modifies two attributes in two+ components in quick
/// succession, they can race against themselves, producing a conflict where
/// an out of date attribute that was *not* changed in "onto" attempts to
/// stomp on the more up to date attribute in "to_rebase". We can fix this
/// by just removing the new edge update if there are no other updates for
/// the source id.
async fn fix_prototype_race_conflicts(
mut conflicts_and_updates: ConflictsAndUpdates,
to_rebase_snapshot: &WorkspaceSnapshot,
) -> RebaseResult<ConflictsAndUpdates> {
let original_conflicts = conflicts_and_updates.conflicts.clone();
for conflict in &original_conflicts {
match conflict {
Conflict::ExclusiveEdgeMismatch {
source, edge_kind, ..
} if edge_kind == &EdgeWeightKindDiscriminants::Prototype => {
let mut new_edge_updates = find_new_edge_updates_for_source(
&conflicts_and_updates.updates,
source.id,
*edge_kind,
);
if new_edge_updates.len() != 1 {
// We can't resolve this one automatically because there
// is either no new edge update or more than one for
// this kind
continue;
}
let to_rebase_edge = to_rebase_snapshot
.edges_directed_for_edge_weight_kind(
source.id,
dal::workspace_snapshot::Direction::Outgoing,
*edge_kind,
)
.await?
.pop()
.map(|(edge_weight, _, _)| edge_weight);

if let (Some(to_rebase_edge), Some((_, _, onto_edge_weight))) =
(to_rebase_edge, new_edge_updates.pop())
{
let to_rebase_clock = to_rebase_edge.vector_clock_write().max(None);
let onto_clock = onto_edge_weight.vector_clock_write().max(None);
if let (
Some((to_rebase_clock_id, to_rebase_clock_stamp)),
Some((onto_clock_id, onto_clock_stamp)),
) = (to_rebase_clock, onto_clock)
{
if to_rebase_clock_id == onto_clock_id
&& to_rebase_clock_stamp > onto_clock_stamp
{
conflicts_and_updates =
remove_new_edge_update_and_conflict_for_source_if_safe(
conflicts_and_updates,
source.id,
*edge_kind,
);
}
}
}
}
_ => {}
}
}

Ok(conflicts_and_updates)
}

// NOTE: This is only safe when used by the fix_prototype_race_conflicts function...
fn remove_new_edge_update_and_conflict_for_source_if_safe(
mut conflicts_and_updates: ConflictsAndUpdates,
source_id: NodeId,
new_edge_kind: EdgeWeightKindDiscriminants,
) -> ConflictsAndUpdates {
let is_it_safe = !conflicts_and_updates
.updates
.iter()
.any(|update| match update {
Update::NewEdge {
source,
edge_weight,
..
} => source.id == source_id && new_edge_kind != edge_weight.kind().into(),
Update::RemoveEdge { source, .. } => source.id == source_id,
Update::ReplaceSubgraph { onto, .. } => onto.id == source_id,
_ => false,
});

if !is_it_safe {
return conflicts_and_updates;
}

conflicts_and_updates.updates.retain(|update| match update {
Update::NewEdge {
source,
edge_weight,
..
} if source.id == source_id => new_edge_kind != edge_weight.kind().into(),
_ => true,
});

conflicts_and_updates
.conflicts
.retain(|conflict| match conflict {
Conflict::ExclusiveEdgeMismatch {
source, edge_kind, ..
} if source.id == source_id => edge_kind != &new_edge_kind,
_ => true,
});

conflicts_and_updates
}

fn find_new_edge_updates_for_source(
updates: &[Update],
source_id: NodeId,
kind: EdgeWeightKindDiscriminants,
) -> Vec<(NodeInformation, NodeInformation, EdgeWeight)> {
updates
.iter()
.filter_map(|update| match update {
Update::NewEdge {
source,
destination,
edge_weight,
} if source.id == source_id && kind == edge_weight.kind().into() => Some((
source.to_owned(),
destination.to_owned(),
edge_weight.to_owned(),
)),
_ => None,
})
.collect()
}

0 comments on commit 20e1373

Please sign in to comment.