Skip to content

Commit

Permalink
Merge pull request #4283 from systeminit/zack/remove-conflicts-ignore…
Browse files Browse the repository at this point in the history
…-vector-clocks

feat(dal,rebaser): remove conflicts, ignore vector clocks
  • Loading branch information
zacharyhamm authored Aug 6, 2024
2 parents 0409a9e + 2242050 commit e487fbc
Show file tree
Hide file tree
Showing 35 changed files with 708 additions and 2,931 deletions.
6 changes: 2 additions & 4 deletions lib/dal-test/src/helpers/change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,7 @@ impl ChangeSetTestHelpers {
async fn blocking_commit(ctx: &DalContext) -> Result<()> {
// TODO(nick,brit): we need to expand Brit's 409 conflict work to work with blocking commits
// too rather than evaluating an optional set of conflicts.
match ctx.blocking_commit().await? {
Some(conflicts) => Err(eyre!("found conflicts after commit: {conflicts:?}")),
None => Ok(()),
}
ctx.blocking_commit().await?;
Ok(())
}
}
17 changes: 3 additions & 14 deletions lib/dal/examples/rebase/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,12 @@ fn main() -> Result<()> {
let to_rebase_path = args.get(1).expect(USAGE);
let onto_path = args.get(2).expect(USAGE);

let mut to_rebase_graph = load_snapshot_graph(&to_rebase_path)?;
let to_rebase_graph = load_snapshot_graph(&to_rebase_path)?;
let onto_graph = load_snapshot_graph(&onto_path)?;

let to_rebase_vector_clock_id = dbg!(to_rebase_graph
.max_recently_seen_clock_id(None)
.expect("Unable to find a vector clock id in to_rebase"));
let onto_vector_clock_id = dbg!(onto_graph
.max_recently_seen_clock_id(None)
.expect("Unable to find a vector clock id in onto"));
let updates = to_rebase_graph.detect_updates(&onto_graph);

let conflicts_and_updates = to_rebase_graph.detect_conflicts_and_updates(
dbg!(to_rebase_vector_clock_id),
&onto_graph,
onto_vector_clock_id,
)?;

for update in &conflicts_and_updates.updates {
for update in &updates {
dbg!(update);
}

Expand Down
1 change: 0 additions & 1 deletion lib/dal/src/attribute/prototype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,6 @@ impl AttributePrototype {

workspace_snapshot
.remove_edge(
ctx.vector_clock_id()?,
attribute_prototype_idx,
current_func_node_idx,
EdgeWeightKindDiscriminants::Use,
Expand Down
1 change: 0 additions & 1 deletion lib/dal/src/attribute/prototype/argument.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,6 @@ impl AttributePrototypeArgument {
let self_node_index = workspace_snapshot.get_node_index_by_id(self.id).await?;
workspace_snapshot
.remove_edge(
ctx.vector_clock_id()?,
self_node_index,
existing_value_source,
EdgeWeightKindDiscriminants::PrototypeArgumentValue,
Expand Down
40 changes: 11 additions & 29 deletions lib/dal/src/change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,15 @@ use std::sync::Arc;

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use si_events::VectorClockChangeSetId;
use si_layer_cache::LayerDbError;
use thiserror::Error;

use si_data_pg::{PgError, PgRow};
use si_events::{ulid::Ulid, WorkspaceSnapshotAddress};
use telemetry::prelude::*;

use crate::context::{Conflicts, RebaseRequest};
use crate::context::RebaseRequest;
use crate::slow_rt::SlowRuntimeError;
use crate::workspace_snapshot::vector_clock::VectorClockId;
use crate::{
action::{ActionError, ActionId},
id, ChangeSetStatus, ComponentError, DalContext, HistoryActor, HistoryEvent, HistoryEventError,
Expand Down Expand Up @@ -109,8 +107,6 @@ pub enum ChangeSetApplyError {
ChangeSetNotFound(ChangeSetId),
#[error("component error: {0}")]
Component(#[from] ComponentError),
#[error("could not apply to head because of merge conflicts")]
ConflictsOnApply(Conflicts),
#[error("invalid user: {0}")]
InvalidUser(UserPk),
#[error("invalid user system init")]
Expand Down Expand Up @@ -177,10 +173,6 @@ impl ChangeSet {
workspace_snapshot_address: WorkspaceSnapshotAddress,
) -> ChangeSetResult<Self> {
let id: Ulid = Ulid::new();
let vector_clock_id = VectorClockId::new(
VectorClockChangeSetId::new(id),
ctx.vector_clock_id()?.actor_id(),
);
let change_set_id: ChangeSetId = id.into();

let workspace_snapshot = WorkspaceSnapshot::find(ctx, workspace_snapshot_address)
Expand All @@ -191,10 +183,7 @@ impl ChangeSet {
// the edit session vs what the changeset already contained. The "onto"
// changeset needs to have seen the "to_rebase" or we will treat them as
// completely disjoint changesets.
let workspace_snapshot_address = workspace_snapshot
.write(ctx, vector_clock_id)
.await
.map_err(Box::new)?;
let workspace_snapshot_address = workspace_snapshot.write(ctx).await.map_err(Box::new)?;

let workspace_id = ctx.tenancy().workspace_pk();
let name = name.as_ref();
Expand Down Expand Up @@ -437,16 +426,10 @@ impl ChangeSet {
.apply_to_base_change_set_inner(ctx)
.await?;

// do we need this commit?
if let Some(conflicts) = ctx.blocking_commit().await? {
error!("Conflicts when commiting again:{:?}", conflicts);

return Err(ChangeSetApplyError::ConflictsOnApply(conflicts));
}

let change_set_that_was_applied = change_set_to_be_applied;
// This is just to send the ws events
ctx.blocking_commit_no_rebase().await?;

Ok(change_set_that_was_applied)
Ok(change_set_to_be_applied)
}

/// Applies the current [`ChangeSet`] in the provided [`DalContext`] to its base
Expand All @@ -456,33 +439,32 @@ impl ChangeSet {
/// This function neither changes the visibility nor the snapshot after performing the
/// aforementioned actions.
async fn apply_to_base_change_set_inner(&mut self, ctx: &DalContext) -> ChangeSetResult<()> {
let to_rebase_change_set_id = self
let base_change_set_id = self
.base_change_set_id
.ok_or(ChangeSetError::NoBaseChangeSet(self.id))?;

let to_rebase_workspace_snapshot = Arc::new(
WorkspaceSnapshot::find_for_change_set(ctx, to_rebase_change_set_id)
let base_snapshot = Arc::new(
WorkspaceSnapshot::find_for_change_set(ctx, base_change_set_id)
.await
.map_err(Box::new)?,
);

if let Some(rebase_batch) = WorkspaceSnapshot::calculate_rebase_batch(
to_rebase_change_set_id,
to_rebase_workspace_snapshot,
base_snapshot,
ctx.workspace_snapshot().map_err(Box::new)?,
)
.await
.map_err(Box::new)?
{
let rebase_batch_address = ctx.write_rebase_batch(rebase_batch).await?;

let rebase_request = RebaseRequest::new(to_rebase_change_set_id, rebase_batch_address);
let rebase_request = RebaseRequest::new(base_change_set_id, rebase_batch_address);
ctx.do_rebase_request(rebase_request).await?;
}

self.update_status(ctx, ChangeSetStatus::Applied).await?;
let user = Self::extract_userid_from_context(ctx).await;
WsEvent::change_set_applied(ctx, self.id, to_rebase_change_set_id, user)
WsEvent::change_set_applied(ctx, self.id, base_change_set_id, user)
.await?
.publish_on_commit(ctx)
.await?;
Expand Down
Loading

0 comments on commit e487fbc

Please sign in to comment.