Skip to content

Commit

Permalink
Merge pull request #5261 from systeminit/jkeiser/change-set-not-found
Browse files Browse the repository at this point in the history
chore(dal): Use ChangeSet::get_by_id() anywhere change set is already required
  • Loading branch information
jkeiser authored Jan 15, 2025
2 parents b8a0cb1 + 99c494a commit bebdfe9
Show file tree
Hide file tree
Showing 33 changed files with 115 additions and 203 deletions.
5 changes: 2 additions & 3 deletions lib/dal-test/src/expand_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,9 @@ pub async fn create_change_set_and_update_ctx(
ctx: &mut DalContext,
base_change_set_id: ChangeSetId,
) {
let base_change_set = ChangeSet::find(ctx, base_change_set_id)
let base_change_set = ChangeSet::get_by_id(ctx, base_change_set_id)
.await
.expect("could not perform find change set")
.expect("no change set found");
.expect("could not find change set");
let workspace_snapshot_address = base_change_set.workspace_snapshot_address;
let change_set = ChangeSet::new(
ctx,
Expand Down
4 changes: 1 addition & 3 deletions lib/dal-test/src/helpers/change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,7 @@ impl ChangeSetTestHelpers {

/// Abandons the current [`ChangeSet`].
pub async fn abandon_change_set(ctx: &mut DalContext) -> Result<()> {
let mut abandonment_change_set = ChangeSet::find(ctx, ctx.change_set_id())
.await?
.ok_or(eyre!("change set not found by id: {}", ctx.change_set_id()))?;
let mut abandonment_change_set = ChangeSet::get_by_id(ctx, ctx.change_set_id()).await?;
abandonment_change_set.abandon(ctx).await?;
Ok(())
}
Expand Down
60 changes: 26 additions & 34 deletions lib/dal/src/change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ pub enum ChangeSetError {
ChangeSetNotApprovedForApply(ChangeSetStatus),
#[error("change set with id {0} not found")]
ChangeSetNotFound(ChangeSetId),
#[error("could not find default change set: {0}")]
DefaultChangeSetNotFound(ChangeSetId),
#[error("default change set {0} has no workspace snapshot pointer")]
DefaultChangeSetNoWorkspaceSnapshotPointer(ChangeSetId),
#[error("dvu roots are not empty for change set: {0}")]
Expand Down Expand Up @@ -96,8 +94,6 @@ pub enum ChangeSetError {
User(#[from] UserError),
#[error("workspace error: {0}")]
Workspace(#[from] Box<WorkspaceError>),
#[error("workspace not found: {0}")]
WorkspaceNotFound(WorkspacePk),
#[error("workspace snapshot error: {0}")]
WorkspaceSnapshot(#[from] Box<WorkspaceSnapshotError>),
#[error("ws event error: {0}")]
Expand Down Expand Up @@ -129,8 +125,6 @@ pub enum ChangeSetApplyError {
ActionPrototypeNotFound(ActionId),
#[error("change set error: {0}")]
ChangeSet(#[from] ChangeSetError),
#[error("change set not found by id: {0}")]
ChangeSetNotFound(ChangeSetId),
#[error("component error: {0}")]
Component(#[from] ComponentError),
#[error("invalid user: {0}")]
Expand Down Expand Up @@ -232,28 +226,14 @@ impl ChangeSet {
}

pub async fn fork_head(ctx: &DalContext, name: impl AsRef<str>) -> ChangeSetResult<Self> {
let workspace_pk = ctx
.tenancy()
.workspace_pk_opt()
.ok_or(ChangeSetError::NoTenancySet)?;
let workspace_pk = ctx.workspace_pk()?;

let workspace = Workspace::get_by_pk(ctx, &workspace_pk)
.await?
.ok_or(ChangeSetError::WorkspaceNotFound(workspace_pk))?;
let workspace = Workspace::get_by_pk_or_error(ctx, workspace_pk).await?;

let base_change_set = ChangeSet::find(ctx, workspace.default_change_set_id())
.await?
.ok_or(ChangeSetError::DefaultChangeSetNotFound(
workspace.default_change_set_id(),
))?;
let head = workspace.default_change_set(ctx).await?;

let change_set = ChangeSet::new(
ctx,
name,
Some(workspace.default_change_set_id()),
base_change_set.workspace_snapshot_address,
)
.await?;
let change_set =
ChangeSet::new(ctx, name, Some(head.id), head.workspace_snapshot_address).await?;

Ok(change_set)
}
Expand Down Expand Up @@ -426,9 +406,7 @@ impl ChangeSet {
/// lock every [`SchemaVariant`] and [`Func`] that is currently unlocked
pub async fn prepare_for_force_apply(ctx: &DalContext) -> ChangeSetResult<()> {
// first change the status to approved and who did it
let mut change_set = ChangeSet::find(ctx, ctx.change_set_id())
.await?
.ok_or(TransactionsError::ChangeSetNotFound(ctx.change_set_id()))?;
let mut change_set = ChangeSet::get_by_id(ctx, ctx.change_set_id()).await?;

change_set.request_change_set_approval(ctx).await?;
// then approve it
Expand All @@ -441,9 +419,7 @@ impl ChangeSet {
/// [`ChangeSetStatus::Approved`]. Finally,
/// lock every [`SchemaVariant`] and [`Func`] that is currently unlocked
pub async fn prepare_for_apply(ctx: &DalContext) -> ChangeSetResult<()> {
let change_set = ChangeSet::find(ctx, ctx.change_set_id())
.await?
.ok_or(TransactionsError::ChangeSetNotFound(ctx.change_set_id()))?;
let change_set = ChangeSet::get_by_id(ctx, ctx.change_set_id()).await?;

// Ensure that DVU roots are empty before continuing.
if !ctx
Expand Down Expand Up @@ -555,6 +531,17 @@ impl ChangeSet {
Ok(())
}

/// Finds a [`ChangeSet`] across all workspaces, ignoring the provided [`WorkspacePk`] on the
/// current [`DalContext`]
pub async fn get_by_id_across_workspaces(
ctx: &DalContext,
change_set_id: ChangeSetId,
) -> ChangeSetResult<Self> {
Self::find_across_workspaces(ctx, change_set_id)
.await?
.ok_or_else(|| ChangeSetError::ChangeSetNotFound(change_set_id))
}

/// Finds a [`ChangeSet`] across all workspaces, ignoring the provided [`WorkspacePk`] on the
/// current [`DalContext`]
#[instrument(
Expand Down Expand Up @@ -595,6 +582,13 @@ impl ChangeSet {
}
}

/// Get a change set within the [`WorkspacePk`] set for the current [`DalContext`]
pub async fn get_by_id(ctx: &DalContext, change_set_id: ChangeSetId) -> ChangeSetResult<Self> {
Self::find(ctx, change_set_id)
.await?
.ok_or_else(|| ChangeSetError::ChangeSetNotFound(change_set_id))
}

/// Find a change set within the [`WorkspacePk`] set for the current [`DalContext`]
#[instrument(
name = "change_set.find",
Expand Down Expand Up @@ -745,9 +739,7 @@ impl ChangeSet {
#[instrument(level = "info", skip_all)]
pub async fn apply_to_base_change_set(ctx: &mut DalContext) -> ChangeSetApplyResult<ChangeSet> {
// Apply to the base change with the current change set (non-editing) and commit.
let mut change_set_to_be_applied = Self::find(ctx, ctx.change_set_id())
.await?
.ok_or(ChangeSetApplyError::ChangeSetNotFound(ctx.change_set_id()))?;
let mut change_set_to_be_applied = Self::get_by_id(ctx, ctx.change_set_id()).await?;
ctx.update_visibility_and_snapshot_to_visibility(ctx.change_set_id())
.await?;
change_set_to_be_applied
Expand Down
19 changes: 10 additions & 9 deletions lib/dal/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use crate::layer_db_types::ContentTypes;
use crate::slow_rt::SlowRuntimeError;
use crate::workspace_snapshot::graph::{RebaseBatch, WorkspaceSnapshotGraph};
use crate::workspace_snapshot::DependentValueRoot;
use crate::{audit_logging, slow_rt, EncryptedSecret, Workspace, WorkspaceError};
use crate::{audit_logging, slow_rt, ChangeSetError, EncryptedSecret, Workspace, WorkspaceError};
use crate::{
change_set::{ChangeSet, ChangeSetId},
job::{
Expand Down Expand Up @@ -407,10 +407,7 @@ impl DalContext {
/// Note: This does not guarantee that the [`ChangeSetId`] is contained within the [`WorkspacePk`]
/// for the current [`DalContext`]
pub async fn update_snapshot_to_visibility(&mut self) -> TransactionsResult<()> {
let change_set = ChangeSet::find_across_workspaces(self, self.change_set_id())
.await
.map_err(|err| TransactionsError::ChangeSet(err.to_string()))?
.ok_or(TransactionsError::ChangeSetNotFound(self.change_set_id()))?;
let change_set = ChangeSet::get_by_id_across_workspaces(self, self.change_set_id()).await?;

let workspace_snapshot = WorkspaceSnapshot::find_for_change_set(self, change_set.id)
.await
Expand Down Expand Up @@ -1353,9 +1350,7 @@ pub enum TransactionsError {
#[error("Bad Workspace & Change Set")]
BadWorkspaceAndChangeSet,
#[error("change set error: {0}")]
ChangeSet(String),
#[error("change set not found for change set id: {0}")]
ChangeSetNotFound(ChangeSetId),
ChangeSet(#[from] Box<ChangeSetError>),
#[error("change set not set on DalContext")]
ChangeSetNotSet,
#[error("job queue processor error: {0}")]
Expand Down Expand Up @@ -1406,7 +1401,13 @@ pub type TransactionsResult<T> = Result<T, TransactionsError>;

impl From<WorkspaceError> for TransactionsError {
fn from(err: WorkspaceError) -> Self {
TransactionsError::Workspace(Box::new(err))
Box::new(err).into()
}
}

impl From<ChangeSetError> for TransactionsError {
fn from(err: ChangeSetError) -> Self {
Box::new(err).into()
}
}

Expand Down
16 changes: 11 additions & 5 deletions lib/dal/src/workspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ pub enum WorkspaceError {
BuiltinWorkspaceNotFound,
#[error("change set error: {0}")]
ChangeSet(#[from] ChangeSetError),
#[error("change set not found by id: {0}")]
ChangeSetNotFound(ChangeSetId),
#[error("could not find default change set {1} for workspace {0}")]
DefaultChangeSetNotFound(WorkspacePk, ChangeSetId),
#[error("Trying to export from system actor. This can only be done by a user actor")]
ExportingFromSystemActor,
#[error(transparent)]
Expand Down Expand Up @@ -589,9 +589,7 @@ impl Workspace {
let base_changeset_for_default = {
let changeset_id = self.default_change_set_id();

let changeset = ChangeSet::find(ctx, changeset_id)
.await?
.ok_or(WorkspaceError::ChangeSetNotFound(changeset_id))?;
let changeset = ChangeSet::get_by_id(ctx, changeset_id).await?;

changeset.base_change_set_id
};
Expand Down Expand Up @@ -679,6 +677,14 @@ impl Workspace {
Ok(has_change_set)
}

pub async fn default_change_set(&self, ctx: &DalContext) -> WorkspaceResult<ChangeSet> {
ChangeSet::find(ctx, self.default_change_set_id)
.await?
.ok_or_else(|| {
WorkspaceError::DefaultChangeSetNotFound(self.pk, self.default_change_set_id)
})
}

/// Mark all workspaces in the database with a given snapshot version. Use
/// only if you know you have migrated the snapshots for these workspaces to
/// this version!
Expand Down
4 changes: 1 addition & 3 deletions lib/dal/src/workspace_snapshot/migrator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,7 @@ impl SnapshotGraphMigrator {
info!("Migrating {} snapshot(s)", open_change_sets.len(),);

for change_set in open_change_sets {
let mut change_set = ChangeSet::find_across_workspaces(ctx, change_set.id)
.await?
.ok_or(ChangeSetError::ChangeSetNotFound(change_set.id))?;
let mut change_set = ChangeSet::get_by_id_across_workspaces(ctx, change_set.id).await?;
if change_set.workspace_id.is_none() || change_set.status == ChangeSetStatus::Failed {
// These are broken/garbage change sets generated during migrations of the
// "universal" workspace/change set. They're not actually accessible via normal
Expand Down
39 changes: 15 additions & 24 deletions lib/dal/tests/integration_test/change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,12 +319,9 @@ async fn cannot_find_change_set_across_workspaces(
assert!(user_2_change_set_unfound.is_none());

// But if we search for the change set across all workspaces, we find it
let user_2_change_set_found_harshly =
ChangeSet::find_across_workspaces(&user_1_dal_context, user_2_change_set.id)
.await
.expect("could not find change set");

assert!(user_2_change_set_found_harshly.is_some());
ChangeSet::get_by_id_across_workspaces(&user_1_dal_context, user_2_change_set.id)
.await
.expect("could not find change set");
}

#[test]
Expand Down Expand Up @@ -402,10 +399,9 @@ async fn change_set_approval_flow(ctx: &mut DalContext) {
.await
.expect("could not commit and update");
// request approval
let mut change_set = ChangeSet::find(ctx, new_change_set.id)
let mut change_set = ChangeSet::get_by_id(ctx, new_change_set.id)
.await
.expect("could not find change set")
.expect("change set is some");
.expect("could not find change set");

change_set
.request_change_set_approval(ctx)
Expand All @@ -415,10 +411,9 @@ async fn change_set_approval_flow(ctx: &mut DalContext) {
ChangeSetTestHelpers::commit_and_update_snapshot_to_visibility(ctx)
.await
.expect("could not commit and update");
let mut change_set = ChangeSet::find(ctx, new_change_set.id)
let mut change_set = ChangeSet::get_by_id(ctx, new_change_set.id)
.await
.expect("could not find change set")
.expect("change set is some");
.expect("could not find change set");

// make sure everything looks right
assert_eq!(change_set.status, ChangeSetStatus::NeedsApproval);
Expand All @@ -435,10 +430,9 @@ async fn change_set_approval_flow(ctx: &mut DalContext) {
ChangeSetTestHelpers::commit_and_update_snapshot_to_visibility(ctx)
.await
.expect("could not commit and update");
let mut change_set = ChangeSet::find(ctx, new_change_set.id)
let mut change_set = ChangeSet::get_by_id(ctx, new_change_set.id)
.await
.expect("could not find change set")
.expect("change set is some");
.expect("could not find change set");
assert_eq!(change_set.status, ChangeSetStatus::Rejected);
assert!(change_set.merge_requested_at.is_some());
assert_eq!(change_set.merge_requested_by_user_id, current_user);
Expand All @@ -457,10 +451,9 @@ async fn change_set_approval_flow(ctx: &mut DalContext) {
ChangeSetTestHelpers::commit_and_update_snapshot_to_visibility(ctx)
.await
.expect("could not commit and update");
let mut change_set = ChangeSet::find(ctx, new_change_set.id)
let mut change_set = ChangeSet::get_by_id(ctx, new_change_set.id)
.await
.expect("could not find change set")
.expect("change set is some");
.expect("could not find change set");
assert_eq!(change_set.status, ChangeSetStatus::Open);
assert_eq!(change_set.merge_requested_at, None);
assert_eq!(change_set.merge_requested_by_user_id, None);
Expand All @@ -476,10 +469,9 @@ async fn change_set_approval_flow(ctx: &mut DalContext) {
ChangeSetTestHelpers::commit_and_update_snapshot_to_visibility(ctx)
.await
.expect("could not commit and update");
let mut change_set = ChangeSet::find(ctx, new_change_set.id)
let mut change_set = ChangeSet::get_by_id(ctx, new_change_set.id)
.await
.expect("could not find change set")
.expect("change set is some");
.expect("could not find change set");

// make sure everything looks right
assert_eq!(change_set.status, ChangeSetStatus::NeedsApproval);
Expand All @@ -497,10 +489,9 @@ async fn change_set_approval_flow(ctx: &mut DalContext) {
ChangeSetTestHelpers::commit_and_update_snapshot_to_visibility(ctx)
.await
.expect("could not commit and update");
let change_set = ChangeSet::find(ctx, new_change_set.id)
let change_set = ChangeSet::get_by_id(ctx, new_change_set.id)
.await
.expect("could not find change set")
.expect("change set is some");
.expect("could not find change set");

// make sure everything looks right
assert_eq!(change_set.status, ChangeSetStatus::Approved);
Expand Down
5 changes: 2 additions & 3 deletions lib/rebaser-server/src/change_set_processor_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,8 @@ mod handlers {

if let Some(workspace) = Workspace::get_by_pk(&ctx, &workspace_id).await? {
if workspace.default_change_set_id() == ctx.visibility().change_set_id {
let mut change_set = ChangeSet::find(&ctx, ctx.visibility().change_set_id)
.await?
.ok_or(RebaseError::MissingChangeSet(change_set_id))?;
let mut change_set =
ChangeSet::get_by_id(&ctx, ctx.visibility().change_set_id).await?;
if WorkspaceSnapshot::dispatch_actions(&ctx).await? {
// Write out the snapshot to get the new address/id.
let new_snapshot_id = ctx
Expand Down
6 changes: 1 addition & 5 deletions lib/rebaser-server/src/rebase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ pub(crate) enum RebaseError {
ChangeSet(#[from] ChangeSetError),
#[error("layerdb error: {0}")]
LayerDb(#[from] LayerDbError),
#[error("missing change set")]
MissingChangeSet(ChangeSetId),
#[error("missing rebase batch {0}")]
MissingRebaseBatch(RebaseBatchAddress),
#[error("pending events error: {0}")]
Expand Down Expand Up @@ -74,9 +72,7 @@ pub async fn perform_rebase(
let updating_head = request.change_set_id == workspace.default_change_set_id();

// Gather everything we need to detect conflicts and updates from the inbound message.
let mut to_rebase_change_set = ChangeSet::find(ctx, request.change_set_id)
.await?
.ok_or(RebaseError::MissingChangeSet(request.change_set_id))?;
let mut to_rebase_change_set = ChangeSet::get_by_id(ctx, request.change_set_id).await?;
let to_rebase_workspace_snapshot_address = to_rebase_change_set.workspace_snapshot_address;
debug!("before snapshot fetch and parse: {:?}", start.elapsed());
let to_rebase_workspace_snapshot =
Expand Down
6 changes: 3 additions & 3 deletions lib/sdf-server/src/service/change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ pub enum ChangeSetError {
ActionPrototype(#[from] ActionPrototypeError),
#[error("cannot abandon head change set")]
CannotAbandonHead,
#[error("change set not found")]
ChangeSetNotFound,
#[error("component error: {0}")]
Component(#[from] ComponentError),
#[error("dal change set error: {0}")]
Expand Down Expand Up @@ -80,7 +78,9 @@ impl IntoResponse for ChangeSetError {
ChangeSetError::ActionAlreadyEnqueued(_) => {
(StatusCode::NOT_MODIFIED, self.to_string())
}
ChangeSetError::ChangeSetNotFound => (StatusCode::NOT_FOUND, self.to_string()),
ChangeSetError::DalChangeSet(DalChangeSetError::ChangeSetNotFound(..)) => {
(StatusCode::NOT_FOUND, self.to_string())
}
ChangeSetError::DalChangeSetApply(_) => (StatusCode::CONFLICT, self.to_string()),
ChangeSetError::DvuRootsNotEmpty(_) => (
StatusCode::PRECONDITION_REQUIRED,
Expand Down
Loading

0 comments on commit bebdfe9

Please sign in to comment.