diff --git a/app/web/src/api/sdf/dal/workspace.ts b/app/web/src/api/sdf/dal/workspace.ts index 39d8b8ec19..4f680a2589 100644 --- a/app/web/src/api/sdf/dal/workspace.ts +++ b/app/web/src/api/sdf/dal/workspace.ts @@ -1,3 +1,5 @@ +import { ChangeSetId, ChangeSet } from "./change_set"; + export interface Workspace { pk: string; name: string; @@ -5,3 +7,11 @@ export interface Workspace { updated_at: IsoDateString; token: string; } + +export interface WorkspaceMetadata { + name: string; + id: string; + default_change_set_id: ChangeSetId; + change_sets: ChangeSet[]; + approvers: string[]; +} diff --git a/app/web/src/store/change_sets.store.ts b/app/web/src/store/change_sets.store.ts index 72e4d25ffb..f411558a47 100644 --- a/app/web/src/store/change_sets.store.ts +++ b/app/web/src/store/change_sets.store.ts @@ -1,13 +1,14 @@ import { defineStore } from "pinia"; import * as _ from "lodash-es"; import { watch } from "vue"; -import { ApiRequest, addStoreHooks } from "@si/vue-lib/pinia"; +import { ApiRequest, addStoreHooks, URLPattern } from "@si/vue-lib/pinia"; import { useToast } from "vue-toastification"; import { ChangeSet, ChangeSetId, ChangeSetStatus, } from "@/api/sdf/dal/change_set"; +import { WorkspaceMetadata } from "@/api/sdf/dal/workspace"; import router from "@/router"; import { UserId } from "@/store/auth.store"; import IncomingChangesMerging from "@/components/toasts/IncomingChangesMerging.vue"; @@ -35,6 +36,12 @@ export interface OpenChangeSetsView { export function useChangeSetsStore() { const workspacesStore = useWorkspacesStore(); const workspacePk = workspacesStore.selectedWorkspacePk; + const BASE_API = [ + "v2", + "workspaces", + { workspacePk }, + "change-sets", + ] as URLPattern; return addStoreHooks( workspacePk, @@ -226,6 +233,92 @@ export function useChangeSetsStore() { }, }); }, + async FORCE_APPLY_CHANGE_SET(username: string) { + if (!this.selectedChangeSet) throw new Error("Select a change set"); + const selectedChangeSetId = this.selectedChangeSetId; + return new ApiRequest<{ changeSet: ChangeSet }>({ + method: "post", + url: BASE_API.concat([{ selectedChangeSetId }, "force_apply"]), + // todo(brit): decide what needs to happen here + optimistic: () => { + toast({ + component: IncomingChangesMerging, + props: { + username, + }, + }); + }, + _delay: 2000, + onSuccess: (response) => { + // this.changeSetsById[response.changeSet.id] = response.changeSet; + }, + onFail: () => { + // todo: show something! + }, + }); + }, + async REQUEST_CHANGE_SET_APPROVAL() { + if (!this.selectedChangeSet) throw new Error("Select a change set"); + const selectedChangeSetId = this.selectedChangeSetId; + return new ApiRequest({ + method: "post", + url: BASE_API.concat([{ selectedChangeSetId }, "request_approval"]), + }); + }, + async APPROVE_CHANGE_SET_FOR_APPLY() { + if (!this.selectedChangeSet) throw new Error("Select a change set"); + const selectedChangeSetId = this.selectedChangeSetId; + return new ApiRequest({ + method: "post", + url: BASE_API.concat([{ selectedChangeSetId }, "approve"]), + }); + }, + async REJECT_CHANGE_SET_APPLY() { + if (!this.selectedChangeSet) throw new Error("Select a change set"); + const selectedChangeSetId = this.selectedChangeSetId; + return new ApiRequest({ + method: "post", + url: BASE_API.concat([{ selectedChangeSetId }, "reject"]), + }); + }, + async CANCEL_APPROVAL_REQUEST() { + if (!this.selectedChangeSet) throw new Error("Select a change set"); + const selectedChangeSetId = this.selectedChangeSetId; + return new ApiRequest({ + method: "post", + url: BASE_API.concat([ + { selectedChangeSetId }, + "cancel_approval_request", + ]), + }); + }, + async REOPEN_CHANGE_SET() { + if (!this.selectedChangeSet) throw new Error("Select a change set"); + const selectedChangeSetId = this.selectedChangeSetId; + return new ApiRequest({ + method: "post", + url: BASE_API.concat([{ selectedChangeSetId }, "reopen"]), + }); + }, + async APPLY_CHANGE_SET_V2() { + if (!this.selectedChangeSet) throw new Error("Select a change set"); + const selectedChangeSetId = this.selectedChangeSetId; + return new ApiRequest({ + method: "post", + url: BASE_API.concat([{ selectedChangeSetId }, "apply"]), + }); + }, + async FETCH_CHANGE_SETS_V2() { + if (!this.selectedChangeSet) throw new Error("Select a change set"); + return new ApiRequest({ + method: "get", + url: BASE_API.concat(["list"]), + onSuccess: (response) => { + this.headChangeSetId = response.default_change_set_id; + this.changeSetsById = _.keyBy(response.change_sets, "id"); + }, + }); + }, async APPLY_CHANGE_SET_VOTE(vote: string) { if (!this.selectedChangeSet) throw new Error("Select a change set"); return new ApiRequest({ diff --git a/lib/dal-test/src/helpers/change_set.rs b/lib/dal-test/src/helpers/change_set.rs index c71a417052..9ee382c7a7 100644 --- a/lib/dal-test/src/helpers/change_set.rs +++ b/lib/dal-test/src/helpers/change_set.rs @@ -56,34 +56,28 @@ impl ChangeSetTestHelpers { )) } - /// Applies the current [`ChangeSet`] to its base [`ChangeSet`]. Then, it updates the snapshot - /// to the visibility without using an editing [`ChangeSet`]. In other words, the resulting, - /// snapshot is "HEAD" without an editing [`ChangeSet`]. - /// Also locks existing editing funcs and schema variants to mimic SDF - pub async fn apply_change_set_to_base(ctx: &mut DalContext) -> Result<()> { - // Lock all unlocked variants - for schema_id in Schema::list_ids(ctx).await? { - let schema = Schema::get_by_id_or_error(ctx, schema_id).await?; - let Some(variant) = SchemaVariant::get_unlocked_for_schema(ctx, schema_id).await? - else { - continue; - }; + /// Apply Changeset To base Approvals + pub async fn apply_change_set_to_base_approvals(ctx: &mut DalContext) -> Result<()> { + ChangeSet::prepare_for_apply(ctx).await?; - let variant_id = variant.id(); + Self::commit_and_update_snapshot_to_visibility(ctx).await?; - variant.lock(ctx).await?; - schema.set_default_schema_variant(ctx, variant_id).await?; - } - // Lock all unlocked functions too - for func in Func::list_for_default_and_editing(ctx).await? { - if !func.is_locked { - func.lock(ctx).await?; - } - } + Self::apply_change_set_to_base_inner(ctx).await?; + Ok(()) + } + + /// Force Apply Changeset To base Approvals + pub async fn force_apply_change_set_to_base_approvals(ctx: &mut DalContext) -> Result<()> { + ChangeSet::prepare_for_force_apply(ctx).await?; Self::commit_and_update_snapshot_to_visibility(ctx).await?; - let mut open_change_sets = ChangeSet::list_open(ctx) + Self::apply_change_set_to_base_inner(ctx).await?; + Ok(()) + } + + async fn apply_change_set_to_base_inner(ctx: &mut DalContext) -> Result<()> { + let mut open_change_sets = ChangeSet::list_active(ctx) .await? .iter() .map(|change_set| (change_set.id, change_set.updated_at)) @@ -137,6 +131,36 @@ impl ChangeSetTestHelpers { Ok(()) } + /// Applies the current [`ChangeSet`] to its base [`ChangeSet`]. Then, it updates the snapshot + /// to the visibility without using an editing [`ChangeSet`]. In other words, the resulting, + /// snapshot is "HEAD" without an editing [`ChangeSet`]. + /// Also locks existing editing funcs and schema variants to mimic SDF + pub async fn apply_change_set_to_base(ctx: &mut DalContext) -> Result<()> { + // Lock all unlocked variants + for schema_id in Schema::list_ids(ctx).await? { + let schema = Schema::get_by_id_or_error(ctx, schema_id).await?; + let Some(variant) = SchemaVariant::get_unlocked_for_schema(ctx, schema_id).await? + else { + continue; + }; + + let variant_id = variant.id(); + + variant.lock(ctx).await?; + schema.set_default_schema_variant(ctx, variant_id).await?; + } + // Lock all unlocked functions too + for func in Func::list_for_default_and_editing(ctx).await? { + if !func.is_locked { + func.lock(ctx).await?; + } + } + + Self::commit_and_update_snapshot_to_visibility(ctx).await?; + Self::apply_change_set_to_base_inner(ctx).await?; + Ok(()) + } + /// Abandons the current [`ChangeSet`]. pub async fn abandon_change_set(ctx: &mut DalContext) -> Result<()> { let mut abandonment_change_set = ChangeSet::find(ctx, ctx.change_set_id()) diff --git a/lib/dal/src/change_set.rs b/lib/dal/src/change_set.rs index 331084be7b..1516c01ff1 100644 --- a/lib/dal/src/change_set.rs +++ b/lib/dal/src/change_set.rs @@ -21,7 +21,10 @@ use crate::{ TransactionsError, User, UserError, UserPk, Workspace, WorkspacePk, WorkspaceSnapshot, WorkspaceSnapshotError, WsEvent, WsEventError, }; -use crate::{billing_publish, WorkspaceError}; +use crate::{ + billing_publish, Func, FuncError, Schema, SchemaError, SchemaVariant, SchemaVariantError, + WorkspaceError, +}; pub mod event; pub mod status; @@ -34,14 +37,20 @@ const FIND_ANCESTORS_QUERY: &str = include_str!("queries/change_set/find_ancesto pub enum ChangeSetError { #[error("billing publish error: {0}")] BillingPublish(#[from] Box), + #[error("change set not approved for apply. Current state: {0}")] + ChangeSetNotApprovedForApply(ChangeSetStatus), #[error("change set with id {0} not found")] ChangeSetNotFound(ChangeSetId), #[error("could not find default change set: {0}")] DefaultChangeSetNotFound(ChangeSetId), #[error("default change set {0} has no workspace snapshot pointer")] DefaultChangeSetNoWorkspaceSnapshotPointer(ChangeSetId), + #[error("dvu roots are not empty for change set: {0}")] + DvuRootsNotEmpty(ChangeSetId), #[error("enum parse error: {0}")] EnumParse(#[from] strum::ParseError), + #[error("func error: {0}")] + Func(#[from] Box), #[error("history event error: {0}")] HistoryEvent(#[from] HistoryEventError), #[error("invalid user actor pk")] @@ -68,6 +77,10 @@ pub enum ChangeSetError { Pg(#[from] PgError), #[error("rebaser client error: {0}")] RebaserClient(#[from] rebaser_client::ClientError), + #[error("schema error: {0}")] + Schema(#[from] Box), + #[error("schema variant error: {0}")] + SchemaVariant(#[from] Box), #[error("serde json error: {0}")] SerdeJson(#[from] serde_json::Error), #[error("slow runtime error: {0}")] @@ -161,6 +174,9 @@ pub struct ChangeSet { pub workspace_snapshot_address: WorkspaceSnapshotAddress, pub workspace_id: Option, pub merge_requested_by_user_id: Option, + pub merge_requested_at: Option>, + pub reviewed_by_user_id: Option, + pub reviewed_at: Option>, } impl TryFrom for ChangeSet { @@ -179,6 +195,9 @@ impl TryFrom for ChangeSet { workspace_snapshot_address: value.try_get("workspace_snapshot_address")?, workspace_id: value.try_get("workspace_id")?, merge_requested_by_user_id: value.try_get("merge_requested_by_user_id")?, + merge_requested_at: value.try_get("merge_requested_at")?, + reviewed_by_user_id: value.try_get("reviewed_by_user_id")?, + reviewed_at: value.try_get("reviewed_at")?, }) } } @@ -252,14 +271,45 @@ impl ChangeSet { Ok(change_set) } - // pub fn generate_ulid(&self) -> ChangeSetResult { - // self.generator - // .lock() - // .map_err(|e| ChangeSetError::Mutex(e.to_string()))? - // .generate() - // .map(Into::into) - // .map_err(Into::into) - // } + pub async fn into_frontend_type( + &self, + ctx: &DalContext, + ) -> ChangeSetResult { + let merge_requested_by_email = + if let Some(merge_requested_by) = self.merge_requested_by_user_id { + User::get_by_pk(ctx, merge_requested_by) + .await? + .map(|user| user.email().clone()) + } else { + None + }; + + let reviewed_by_email = if let Some(reviewed_by) = self.reviewed_by_user_id { + User::get_by_pk(ctx, reviewed_by) + .await? + .map(|user| user.email().clone()) + } else { + None + }; + + let change_set = si_frontend_types::ChangeSet { + created_at: self.created_at, + id: self.id.into(), + updated_at: self.updated_at, + name: self.name.clone(), + status: self.status.into(), + base_change_set_id: self.base_change_set_id.map(|id| id.into()), + workspace_id: self.workspace_id.map_or("".to_owned(), |id| id.to_string()), + merge_requested_by_user_id: self.merge_requested_by_user_id.map(|s| s.to_string()), + merge_requested_by_user_email: merge_requested_by_email, + merge_requested_at: self.merge_requested_at, + reviewed_by_user_id: self.reviewed_by_user_id.map(|id| id.into()), + reviewed_by_user_email: reviewed_by_email, + reviewed_at: self.reviewed_at, + }; + + Ok(change_set) + } pub async fn update_workspace_id( &mut self, @@ -320,15 +370,53 @@ impl ChangeSet { ctx: &DalContext, status: ChangeSetStatus, ) -> ChangeSetResult<()> { + ctx.txns() + .await? + .pg() + .query_none( + "UPDATE change_set_pointers SET status = $2, updated_at = CLOCK_TIMESTAMP() WHERE id = $1", + &[&self.id, &status.to_string()], + ) + .await?; + + self.status = status; billing_publish::for_change_set_status_update(ctx, self) .await .map_err(Box::new)?; + Ok(()) + } + pub async fn request_change_set_approval(&mut self, ctx: &DalContext) -> ChangeSetResult<()> { + let user_pk = Self::extract_userid_from_context_or_error(ctx).await?; + let status = ChangeSetStatus::NeedsApproval; ctx.txns() .await? .pg() .query_none( - "UPDATE change_set_pointers SET status = $2, updated_at = CLOCK_TIMESTAMP() WHERE id = $1", + "UPDATE change_set_pointers SET merge_requested_by_user_id = $2, merge_requested_at = CLOCK_TIMESTAMP(), status = $3, updated_at = CLOCK_TIMESTAMP() WHERE id = $1", + &[&self.id, &user_pk, &status.to_string()], + ) + .await?; + + self.status = status; + + Ok(()) + } + + /// Set the status to Open, and clear any reviewed/merge requested info + pub async fn reopen_change_set(&mut self, ctx: &DalContext) -> ChangeSetResult<()> { + let status = ChangeSetStatus::Open; + ctx.txns() + .await? + .pg() + .query_none( + "UPDATE change_set_pointers + SET reviewed_by_user_id = NULL, + reviewed_at = NULL, + merge_requested_by_user_id = NULL, + merge_requested_at = NULL, + status = $2, + updated_at = CLOCK_TIMESTAMP() WHERE id = $1", &[&self.id, &status.to_string()], ) .await?; @@ -338,6 +426,121 @@ impl ChangeSet { Ok(()) } + /// First, transitions the status of the [`ChangeSet`] to [`ChangeSetStatus::NeedsApproval`] + /// then [`ChangeSetStatus::Approved`]. Next, checks if DVU Roots still exist. Finally, + /// lock every [`SchemaVariant`] and [`Func`] that is currently unlocked + pub async fn prepare_for_force_apply(ctx: &DalContext) -> ChangeSetResult<()> { + // first change the status to approved and who did it + let mut change_set = ChangeSet::find(ctx, ctx.change_set_id()) + .await? + .ok_or(TransactionsError::ChangeSetNotFound(ctx.change_set_id()))?; + + change_set.request_change_set_approval(ctx).await?; + // then approve it + change_set.approve_change_set_for_apply(ctx).await?; + // then do the rest + Self::prepare_for_apply(ctx).await + } + + /// First, checks if DVU Roots still exist. Next, ensures the [`ChangeSet`] has an + /// [`ChangeSetStatus::Approved`]. Finally, + /// lock every [`SchemaVariant`] and [`Func`] that is currently unlocked + pub async fn prepare_for_apply(ctx: &DalContext) -> ChangeSetResult<()> { + let change_set = ChangeSet::find(ctx, ctx.change_set_id()) + .await? + .ok_or(TransactionsError::ChangeSetNotFound(ctx.change_set_id()))?; + + // Ensure that DVU roots are empty before continuing. + if !ctx + .workspace_snapshot() + .map_err(Box::new)? + .get_dependent_value_roots() + .await + .map_err(Box::new)? + .is_empty() + { + // TODO(nick): we should consider requiring this check in integration tests too. Why did I + // not do this at the time of writing? Tests have multiple ways to call "apply", whether + // its via helpers or through the change set methods directly. In addition, they test + // for success and failure, not solely for success. We should still do this, but not within + // the PR corresponding to when this message was written. + return Err(ChangeSetError::DvuRootsNotEmpty(ctx.change_set_id())); + } + + // if the change set status isn't approved, we shouldn't go + // locking stuff + if change_set.status != ChangeSetStatus::Approved { + return Err(ChangeSetError::ChangeSetNotApprovedForApply( + change_set.status, + )); + } + + // Lock all unlocked variants + for schema_id in Schema::list_ids(ctx).await.map_err(Box::new)? { + let schema = Schema::get_by_id_or_error(ctx, schema_id) + .await + .map_err(Box::new)?; + let Some(variant) = SchemaVariant::get_unlocked_for_schema(ctx, schema_id) + .await + .map_err(Box::new)? + else { + continue; + }; + + let variant_id = variant.id(); + + variant.lock(ctx).await.map_err(Box::new)?; + schema + .set_default_schema_variant(ctx, variant_id) + .await + .map_err(Box::new)?; + } + // Lock all unlocked functions too + for func in Func::list_for_default_and_editing(ctx) + .await + .map_err(Box::new)? + { + if !func.is_locked { + func.lock(ctx).await.map_err(Box::new)?; + } + } + Ok(()) + } + + pub async fn approve_change_set_for_apply(&mut self, ctx: &DalContext) -> ChangeSetResult<()> { + let user_pk = Self::extract_userid_from_context_or_error(ctx).await?; + let status = ChangeSetStatus::Approved; + ctx.txns() + .await? + .pg() + .query_none( + "UPDATE change_set_pointers SET reviewed_by_user_id = $2, reviewed_at = CLOCK_TIMESTAMP(), status = $3, updated_at = CLOCK_TIMESTAMP() WHERE id = $1", + &[&self.id, &user_pk, &status.to_string()], + ) + .await?; + + self.status = status; + + Ok(()) + } + + pub async fn reject_change_set_for_apply(&mut self, ctx: &DalContext) -> ChangeSetResult<()> { + let user_pk = Self::extract_userid_from_context_or_error(ctx).await?; + let status = ChangeSetStatus::Rejected; + ctx.txns() + .await? + .pg() + .query_none( + "UPDATE change_set_pointers SET reviewed_by_user_id = $2, reviewed_at = CLOCK_TIMESTAMP(), status = $3, updated_at = CLOCK_TIMESTAMP() WHERE id = $1", + &[&self.id, &user_pk, &status.to_string()], + ) + .await?; + + self.status = status; + + Ok(()) + } + pub async fn update_merge_requested_by_user_id( &mut self, ctx: &DalContext, @@ -395,19 +598,21 @@ impl ChangeSet { } } - pub async fn list_open(ctx: &DalContext) -> ChangeSetResult> { + pub async fn list_active(ctx: &DalContext) -> ChangeSetResult> { let mut result = vec![]; let rows = ctx .txns() .await? .pg() .query( - "SELECT * from change_set_pointers WHERE workspace_id = $1 AND status IN ($2, $3, $4)", + "SELECT * from change_set_pointers WHERE workspace_id = $1 AND status IN ($2, $3, $4, $5, $6)", &[ &ctx.tenancy().workspace_pk_opt(), &ChangeSetStatus::Open.to_string(), &ChangeSetStatus::NeedsApproval.to_string(), &ChangeSetStatus::NeedsAbandonApproval.to_string(), + &ChangeSetStatus::Approved.to_string(), + &ChangeSetStatus::Rejected.to_string(), ], ) .await?; @@ -690,6 +895,19 @@ impl ChangeSet { }; user_id } + pub async fn extract_userid_from_context_or_error(ctx: &DalContext) -> ChangeSetResult { + let user_id = match ctx.history_actor() { + HistoryActor::User(user_pk) => { + let maybe_user = User::get_by_pk_or_error(ctx, *user_pk).await; + match maybe_user { + Ok(user) => user.pk(), + Err(err) => return Err(ChangeSetError::User(err)), + } + } + HistoryActor::SystemInit => return Err(ChangeSetError::InvalidUserSystemInit), + }; + Ok(user_id) + } #[instrument( name = "change_set.workspace_snapshot_in_use", diff --git a/lib/dal/src/change_set/event.rs b/lib/dal/src/change_set/event.rs index 71c3057517..2058ee6c07 100644 --- a/lib/dal/src/change_set/event.rs +++ b/lib/dal/src/change_set/event.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -use crate::{ChangeSetId, DalContext, UserPk, WsEvent, WsEventResult, WsPayload}; +use crate::{ChangeSetId, ChangeSetStatus, DalContext, UserPk, WsEvent, WsEventResult, WsPayload}; impl WsEvent { pub async fn change_set_written( @@ -17,6 +17,25 @@ impl WsEvent { WsEvent::new(ctx, WsPayload::ChangeSetCreated(change_set_id)).await } + pub async fn change_set_status_changed( + ctx: &DalContext, + change_set_id: ChangeSetId, + user_pk: Option, + from_status: ChangeSetStatus, + to_status: ChangeSetStatus, + ) -> WsEventResult { + WsEvent::new( + ctx, + WsPayload::ChangeSetStatusChanged(ChangeSetStateChangePayload { + change_set_id, + from_status, + to_status, + user_pk, + }), + ) + .await + } + pub async fn change_set_abandoned( ctx: &DalContext, change_set_id: ChangeSetId, @@ -157,6 +176,14 @@ pub struct ChangeSetActorPayload { change_set_id: ChangeSetId, user_pk: Option, } +#[derive(Clone, Deserialize, Serialize, Debug, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +pub struct ChangeSetStateChangePayload { + change_set_id: ChangeSetId, + from_status: ChangeSetStatus, + to_status: ChangeSetStatus, + user_pk: Option, +} #[derive(Clone, Deserialize, Serialize, Debug, PartialEq, Eq)] #[serde(rename_all = "camelCase")] diff --git a/lib/dal/src/change_set/status.rs b/lib/dal/src/change_set/status.rs index cdbfcf2aa2..a702f58c6c 100644 --- a/lib/dal/src/change_set/status.rs +++ b/lib/dal/src/change_set/status.rs @@ -13,16 +13,19 @@ pub enum ChangeSetStatus { Abandoned, /// Applied this changeset to its parent Applied, - /// TODO appears to be unused - Closed, - /// TODO appears to be unused + /// Approved by relevant parties and ready to be applied + Approved, + /// Migration of Workspace Snapshot for this change set failed Failed, /// Planned to be abandoned but needs approval first + /// todo(brit): Remove once rebac is done NeedsAbandonApproval, /// Planned to be applied but needs approval first NeedsApproval, - /// Normal state: potentially usable + /// Available for user's to modify Open, + /// Request to apply was rejected + Rejected, } impl From for ChangeSetStatus { @@ -30,13 +33,14 @@ impl From for ChangeSetStatus { match value { si_events::ChangeSetStatus::Abandoned => Self::Abandoned, si_events::ChangeSetStatus::Applied => Self::Applied, - si_events::ChangeSetStatus::Closed => Self::Closed, si_events::ChangeSetStatus::Failed => Self::Failed, si_events::ChangeSetStatus::NeedsAbandonApproval => { ChangeSetStatus::NeedsAbandonApproval } si_events::ChangeSetStatus::NeedsApproval => Self::NeedsApproval, si_events::ChangeSetStatus::Open => Self::Open, + si_events::ChangeSetStatus::Approved => Self::Approved, + si_events::ChangeSetStatus::Rejected => Self::Rejected, } } } @@ -46,11 +50,12 @@ impl From for si_events::ChangeSetStatus { match value { ChangeSetStatus::Abandoned => Self::Abandoned, ChangeSetStatus::Applied => Self::Applied, - ChangeSetStatus::Closed => Self::Closed, ChangeSetStatus::Failed => Self::Failed, ChangeSetStatus::NeedsAbandonApproval => Self::NeedsAbandonApproval, ChangeSetStatus::NeedsApproval => Self::NeedsApproval, ChangeSetStatus::Open => Self::Open, + ChangeSetStatus::Approved => Self::Approved, + ChangeSetStatus::Rejected => Self::Rejected, } } } diff --git a/lib/dal/src/change_set/view.rs b/lib/dal/src/change_set/view.rs index 93bb2b11b1..61972361cb 100644 --- a/lib/dal/src/change_set/view.rs +++ b/lib/dal/src/change_set/view.rs @@ -27,7 +27,7 @@ pub struct ChangeSetView { impl OpenChangeSetsView { pub async fn assemble(ctx: &DalContext) -> ChangeSetResult { // List all open change sets and assemble them into individual views. - let open_change_sets = ChangeSet::list_open(ctx).await?; + let open_change_sets = ChangeSet::list_active(ctx).await?; let mut views = Vec::with_capacity(open_change_sets.len()); for change_set in open_change_sets { views.push(ChangeSetView { diff --git a/lib/dal/src/func/binding/action.rs b/lib/dal/src/func/binding/action.rs index 6c9e73d28c..03265e93da 100644 --- a/lib/dal/src/func/binding/action.rs +++ b/lib/dal/src/func/binding/action.rs @@ -58,7 +58,6 @@ impl ActionBinding { let func_id = ActionPrototype::func_id(ctx, action_prototype_id).await?; let func = Func::get_by_id_or_error(ctx, func_id).await?; // delete and recreate the prototype - //brit todo: there might be existing actions enqueued, we should find them and reassociate the prototype? ActionPrototype::remove(ctx, action_prototype_id).await?; ActionPrototype::new( ctx, diff --git a/lib/dal/src/migrations/U3002__change_set_approvers.sql b/lib/dal/src/migrations/U3002__change_set_approvers.sql new file mode 100644 index 0000000000..9f2f6c6111 --- /dev/null +++ b/lib/dal/src/migrations/U3002__change_set_approvers.sql @@ -0,0 +1,3 @@ +ALTER TABLE change_set_pointers ADD COLUMN reviewed_by_user_id ident; +ALTER TABLE change_set_pointers ADD COLUMN reviewed_at TIMESTAMPTZ; +ALTER TABLE change_set_pointers ADD COLUMN merge_requested_at TIMESTAMPTZ; \ No newline at end of file diff --git a/lib/dal/src/workspace.rs b/lib/dal/src/workspace.rs index 3786e6312d..6331424ebc 100644 --- a/lib/dal/src/workspace.rs +++ b/lib/dal/src/workspace.rs @@ -496,7 +496,7 @@ impl Workspace { let mut content_hashes = vec![]; let mut change_sets: HashMap> = HashMap::new(); let mut default_change_set_base = Ulid::nil(); - for change_set in ChangeSet::list_open(ctx).await? { + for change_set in ChangeSet::list_active(ctx).await? { let snap = WorkspaceSnapshot::find_for_change_set(ctx, change_set.id).await?; // From root, get every value from every node, store with hash @@ -592,7 +592,7 @@ impl Workspace { } = workspace_data.into_latest(); // ABANDON PREVIOUS CHANGESETS - for mut change_set in ChangeSet::list_open(ctx).await? { + for mut change_set in ChangeSet::list_active(ctx).await? { change_set.abandon(ctx).await?; } diff --git a/lib/dal/src/ws_event.rs b/lib/dal/src/ws_event.rs index 3598f7fbe1..e38e4aa1dc 100644 --- a/lib/dal/src/ws_event.rs +++ b/lib/dal/src/ws_event.rs @@ -9,6 +9,7 @@ use ulid::Ulid; use crate::change_set::event::{ ChangeSetActorPayload, ChangeSetAppliedPayload, ChangeSetMergeVotePayload, + ChangeSetStateChangePayload, }; use crate::component::{ ComponentCreatedPayload, ComponentDeletedPayload, ComponentSetPositionPayload, @@ -80,6 +81,7 @@ pub enum WsPayload { ChangeSetCanceled(ChangeSetId), ChangeSetCreated(ChangeSetId), ChangeSetMergeVote(ChangeSetMergeVotePayload), + ChangeSetStatusChanged(ChangeSetStateChangePayload), ChangeSetWritten(ChangeSetId), CheckedQualifications(QualificationCheckPayload), ComponentCreated(ComponentCreatedPayload), diff --git a/lib/dal/tests/integration_test/change_set.rs b/lib/dal/tests/integration_test/change_set.rs index c7a3bd7547..b983384445 100644 --- a/lib/dal/tests/integration_test/change_set.rs +++ b/lib/dal/tests/integration_test/change_set.rs @@ -3,8 +3,12 @@ use dal::{ context::TransactionsErrorDiscriminants, DalContext, DalContextBuilder, HistoryActor, RequestContext, Workspace, WorkspacePk, }; -use dal_test::helpers::{create_user, ChangeSetTestHelpers}; +use dal::{ChangeSet, ChangeSetStatus, Component}; +use dal_test::helpers::{ + create_component_for_default_schema_name, create_user, ChangeSetTestHelpers, +}; use dal_test::test; +use itertools::Itertools; use pretty_assertions_sorted::assert_eq; use std::collections::HashSet; @@ -302,3 +306,169 @@ async fn build_from_request_context_allows_change_set_from_workspace_with_access } assert!(builder_result.is_ok()); } + +#[test] +async fn change_set_approval_flow(ctx: &mut DalContext) { + // create a new change set + let new_change_set = ChangeSetTestHelpers::fork_from_head_change_set(ctx) + .await + .expect("could not fork head"); + let current_user = ChangeSet::extract_userid_from_context(ctx).await; + + // do something in it + let component = create_component_for_default_schema_name(ctx, "small odd lego", "small") + .await + .expect("could not create component"); + ChangeSetTestHelpers::commit_and_update_snapshot_to_visibility(ctx) + .await + .expect("could not commit and update"); + // request approval + let mut change_set = ChangeSet::find(ctx, new_change_set.id) + .await + .expect("could not find change set") + .expect("change set is some"); + + change_set + .request_change_set_approval(ctx) + .await + .expect("could not request approval"); + + ChangeSetTestHelpers::commit_and_update_snapshot_to_visibility(ctx) + .await + .expect("could not commit and update"); + let mut change_set = ChangeSet::find(ctx, new_change_set.id) + .await + .expect("could not find change set") + .expect("change set is some"); + + // make sure everything looks right + assert_eq!(change_set.status, ChangeSetStatus::NeedsApproval); + assert!(change_set.merge_requested_at.is_some()); + assert_eq!(change_set.merge_requested_by_user_id, current_user); + assert_eq!(change_set.reviewed_at, None); + assert_eq!(change_set.reviewed_by_user_id, None); + + // let's reject it + change_set + .reject_change_set_for_apply(ctx) + .await + .expect("could not reject change set"); + ChangeSetTestHelpers::commit_and_update_snapshot_to_visibility(ctx) + .await + .expect("could not commit and update"); + let mut change_set = ChangeSet::find(ctx, new_change_set.id) + .await + .expect("could not find change set") + .expect("change set is some"); + assert_eq!(change_set.status, ChangeSetStatus::Rejected); + assert!(change_set.merge_requested_at.is_some()); + assert_eq!(change_set.merge_requested_by_user_id, current_user); + assert!(change_set.reviewed_at.is_some()); + assert_eq!(change_set.reviewed_by_user_id, current_user); + + // let's see if we can apply now, it should fail because the change set has not been approved + let apply_result = ChangeSetTestHelpers::apply_change_set_to_base_approvals(ctx).await; + assert!(apply_result.is_err()); + + // now let's re-open it + change_set + .reopen_change_set(ctx) + .await + .expect("could not update status"); + ChangeSetTestHelpers::commit_and_update_snapshot_to_visibility(ctx) + .await + .expect("could not commit and update"); + let mut change_set = ChangeSet::find(ctx, new_change_set.id) + .await + .expect("could not find change set") + .expect("change set is some"); + assert_eq!(change_set.status, ChangeSetStatus::Open); + assert_eq!(change_set.merge_requested_at, None); + assert_eq!(change_set.merge_requested_by_user_id, None); + assert_eq!(change_set.reviewed_at, None); + assert_eq!(change_set.reviewed_by_user_id, None); + + // now let's request approval again + change_set + .request_change_set_approval(ctx) + .await + .expect("could not request approval"); + + ChangeSetTestHelpers::commit_and_update_snapshot_to_visibility(ctx) + .await + .expect("could not commit and update"); + let mut change_set = ChangeSet::find(ctx, new_change_set.id) + .await + .expect("could not find change set") + .expect("change set is some"); + + // make sure everything looks right + assert_eq!(change_set.status, ChangeSetStatus::NeedsApproval); + assert!(change_set.merge_requested_at.is_some()); + assert_eq!(change_set.merge_requested_by_user_id, current_user); + assert_eq!(change_set.reviewed_at, None); + assert_eq!(change_set.reviewed_by_user_id, None); + + // this time we will approve + change_set + .approve_change_set_for_apply(ctx) + .await + .expect("could not approve"); + + ChangeSetTestHelpers::commit_and_update_snapshot_to_visibility(ctx) + .await + .expect("could not commit and update"); + let change_set = ChangeSet::find(ctx, new_change_set.id) + .await + .expect("could not find change set") + .expect("change set is some"); + + // make sure everything looks right + assert_eq!(change_set.status, ChangeSetStatus::Approved); + assert!(change_set.merge_requested_at.is_some()); + assert_eq!(change_set.merge_requested_by_user_id, current_user); + assert!(change_set.reviewed_at.is_some()); + assert_eq!(change_set.reviewed_by_user_id, current_user); + + // now let's apply it! + + ChangeSetTestHelpers::apply_change_set_to_base_approvals(ctx) + .await + .expect("could not apply to head"); + + // should have one component + let mut components = Component::list(ctx) + .await + .expect("could not list components"); + assert_eq!(components.len(), 1); + let only_component = components.pop().expect("has one in there"); + assert_eq!(only_component.id(), component.id()); + + // now let's create another change set and ensure force_apply works as expected + let _new_change_set = ChangeSetTestHelpers::fork_from_head_change_set(ctx) + .await + .expect("could not fork head"); + + // do something in it + let second_component = create_component_for_default_schema_name(ctx, "small odd lego", "small") + .await + .expect("could not create component"); + ChangeSetTestHelpers::commit_and_update_snapshot_to_visibility(ctx) + .await + .expect("could not commit and update"); + // force apply + ChangeSetTestHelpers::force_apply_change_set_to_base_approvals(ctx) + .await + .expect("could not force apply change set"); + // should have two components now + let components = Component::list(ctx) + .await + .expect("could not list components"); + assert_eq!(components.len(), 2); + let component_ids = [component.id(), second_component.id()]; + let components = components + .into_iter() + .filter(|comp| component_ids.contains(&comp.id())) + .collect_vec(); + assert_eq!(components.len(), 2); +} diff --git a/lib/rebaser-server/src/rebase.rs b/lib/rebaser-server/src/rebase.rs index 100ca5ccc1..5f22bf3263 100644 --- a/lib/rebaser-server/src/rebase.rs +++ b/lib/rebaser-server/src/rebase.rs @@ -146,7 +146,10 @@ pub async fn perform_rebase( } if updating_head && *workspace.pk() != WorkspacePk::NONE { - let all_open_change_sets = ChangeSet::list_open(ctx).await?; + //todo(brit): what do we want to do about change sets that haven't + // been applied yet, but are approved? (like gh merge-queue) + // should we 'unapprove' them? + let all_open_change_sets = ChangeSet::list_active(ctx).await?; for target_change_set in all_open_change_sets.into_iter().filter(|cs| { cs.id != workspace.default_change_set_id() && cs.id != to_rebase_change_set.id diff --git a/lib/sdf-server/src/service/v2.rs b/lib/sdf-server/src/service/v2.rs index 5b9ea0b4da..10bebe9c6a 100644 --- a/lib/sdf-server/src/service/v2.rs +++ b/lib/sdf-server/src/service/v2.rs @@ -15,9 +15,9 @@ const PREFIX: &str = "/workspaces/:workspace_id/change-sets/:change_set_id"; pub fn routes(state: AppState) -> Router { Router::new() - .nest("/admin", admin::v2_routes(state)) + .nest("/admin", admin::v2_routes(state.clone())) .nest(&format!("{PREFIX}/audit-logs"), audit_log::v2_routes()) - .nest(&format!("{PREFIX}/change-sets"), change_set::v2_routes()) + .nest("{PREFIX}", change_set::v2_routes(state.clone())) .nest(&format!("{PREFIX}/funcs"), func::v2_routes()) .nest(&format!("{PREFIX}/modules"), module::v2_routes()) .nest(&format!("{PREFIX}/schema-variants"), variant::v2_routes()) diff --git a/lib/sdf-server/src/service/v2/change_set.rs b/lib/sdf-server/src/service/v2/change_set.rs index 025e09b559..09cff2f1ef 100644 --- a/lib/sdf-server/src/service/v2/change_set.rs +++ b/lib/sdf-server/src/service/v2/change_set.rs @@ -1,30 +1,56 @@ use std::result; -use axum::{http::StatusCode, response::IntoResponse, routing::post, Router}; -use dal::ChangeSetId; +use axum::{ + http::StatusCode, + response::IntoResponse, + routing::{get, post}, + Router, +}; +use dal::{ChangeSetId, ChangeSetStatus, WsEventError}; use thiserror::Error; -use crate::{service::ApiError, AppState}; +use crate::{middleware::WorkspacePermissionLayer, service::ApiError, AppState}; mod apply; +mod approve; +mod cancel_approval_request; +mod force_apply; +mod list; +mod reject; +mod reopen; +mod request_approval; #[remain::sorted] #[derive(Debug, Error)] pub enum Error { + #[error("change set error: {0}")] + ChangeSet(#[from] dal::ChangeSetError), #[error("change set apply error: {0}")] ChangeSetApply(#[from] dal::ChangeSetApplyError), + #[error("change set not approved for apply. Current state: {0}")] + ChangeSetNotApprovedForApply(ChangeSetStatus), + #[error("change set not found: {0}")] + ChangeSetNotFound(ChangeSetId), #[error("dvu roots are not empty for change set: {0}")] DvuRootsNotEmpty(ChangeSetId), #[error("func error: {0}")] Func(#[from] dal::FuncError), + #[error("permissions error: {0}")] + Permissions(#[from] permissions::Error), #[error("schema error: {0}")] Schema(#[from] dal::SchemaError), #[error("schema variant error: {0}")] SchemaVariant(#[from] dal::SchemaVariantError), + #[error("spicedb not found")] + SpiceDBNotFound, #[error("transactions error: {0}")] Transactions(#[from] dal::TransactionsError), + #[error("found an unexpected number of open change sets matching default change set (should be one, found {0:?})")] + UnexpectedNumberOfOpenChangeSetsMatchingDefaultChangeSet(Vec), #[error("workspace snapshot error: {0}")] WorkspaceSnapshot(#[from] dal::WorkspaceSnapshotError), + #[error("ws event error: {0}")] + WsEvent(#[from] WsEventError), } impl IntoResponse for Error { @@ -46,6 +72,39 @@ pub type ChangeSetAPIError = Error; type Result = result::Result; -pub fn v2_routes() -> Router { - Router::new().route("/apply", post(apply::apply)) +pub fn v2_routes(state: AppState) -> Router { + Router::new() + .route("/apply", post(apply::apply)) + .route( + "/request_approval", + post(request_approval::request_approval), + ) + .route( + "/approve", + post(approve::approve).layer(WorkspacePermissionLayer::new( + state.clone(), + permissions::Permission::Approve, + )), + ) + .route( + "/reject", + post(reject::reject).layer(WorkspacePermissionLayer::new( + state.clone(), + permissions::Permission::Approve, + )), + ) + .route( + "/cancel_approval_request", + post(cancel_approval_request::cancel_approval_request), + ) + // Consider how we make it editable again after it's been rejected + .route("/reopen", post(reopen::reopen)) + .route("/list", get(list::list_actionable)) + .route( + "/force_apply", + post(force_apply::force_apply).layer(WorkspacePermissionLayer::new( + state.clone(), + permissions::Permission::Approve, + )), + ) } diff --git a/lib/sdf-server/src/service/v2/change_set/apply.rs b/lib/sdf-server/src/service/v2/change_set/apply.rs index 7e6900dde3..aca9346e77 100644 --- a/lib/sdf-server/src/service/v2/change_set/apply.rs +++ b/lib/sdf-server/src/service/v2/change_set/apply.rs @@ -1,7 +1,7 @@ use axum::extract::{Host, OriginalUri, Path}; -use dal::{ChangeSet, ChangeSetId, Func, Schema, SchemaVariant, WorkspacePk}; +use dal::{ChangeSet, ChangeSetId, WorkspacePk}; -use super::{Error, Result}; +use super::Result; use crate::{ extract::{AccessBuilder, HandlerContext, PosthogClient}, track, @@ -19,39 +19,7 @@ pub async fn apply( .build(request_ctx.build(change_set_id.into())) .await?; - // Ensure that DVU roots are empty before continuing. - if !ctx - .workspace_snapshot()? - .get_dependent_value_roots() - .await? - .is_empty() - { - // TODO(nick): we should consider requiring this check in integration tests too. Why did I - // not do this at the time of writing? Tests have multiple ways to call "apply", whether - // its via helpers or through the change set methods directly. In addition, they test - // for success and failure, not solely for success. We should still do this, but not within - // the PR corresponding to when this message was written. - return Err(Error::DvuRootsNotEmpty(ctx.change_set_id())); - } - - // Lock all unlocked variants - for schema_id in Schema::list_ids(&ctx).await? { - let schema = Schema::get_by_id_or_error(&ctx, schema_id).await?; - let Some(variant) = SchemaVariant::get_unlocked_for_schema(&ctx, schema_id).await? else { - continue; - }; - - let variant_id = variant.id(); - - variant.lock(&ctx).await?; - schema.set_default_schema_variant(&ctx, variant_id).await?; - } - // Lock all unlocked functions too - for func in Func::list_for_default_and_editing(&ctx).await? { - if !func.is_locked { - func.lock(&ctx).await?; - } - } + ChangeSet::prepare_for_apply(&ctx).await?; // We need to run a commit before apply so changes get saved ctx.commit().await?; @@ -69,13 +37,7 @@ pub async fn apply( }), ); - // // If anything fails with uploading the workspace backup module, just log it. We shouldn't - // // have the change set apply itself fail because of this. - // tokio::task::spawn( - // super::upload_workspace_backup_module(ctx, raw_access_token) - // .instrument(info_span!("Workspace backup module upload")), - // ); - + // WS Event fires from the dal ctx.commit().await?; Ok(()) diff --git a/lib/sdf-server/src/service/v2/change_set/approve.rs b/lib/sdf-server/src/service/v2/change_set/approve.rs new file mode 100644 index 0000000000..b91446290d --- /dev/null +++ b/lib/sdf-server/src/service/v2/change_set/approve.rs @@ -0,0 +1,69 @@ +use axum::extract::{Host, OriginalUri, Path}; +use dal::{ChangeSet, ChangeSetId, WorkspacePk, WsEvent}; + +use super::{Error, Result}; +use crate::{ + extract::{AccessBuilder, HandlerContext, PosthogClient}, + track, +}; + +pub async fn approve( + HandlerContext(builder): HandlerContext, + AccessBuilder(request_ctx): AccessBuilder, + PosthogClient(posthog_client): PosthogClient, + OriginalUri(original_uri): OriginalUri, + Host(host_name): Host, + Path((_workspace_pk, change_set_id)): Path<(WorkspacePk, ChangeSetId)>, +) -> Result<()> { + let ctx = builder + .build(request_ctx.build(change_set_id.into())) + .await?; + + // Ensure that DVU roots are empty before continuing? + // todo(brit): maybe we can get away without this. Ex: Approve a PR before tests finish + if !ctx + .workspace_snapshot()? + .get_dependent_value_roots() + .await? + .is_empty() + { + // TODO(nick): we should consider requiring this check in integration tests too. Why did I + // not do this at the time of writing? Tests have multiple ways to call "apply", whether + // its via helpers or through the change set methods directly. In addition, they test + // for success and failure, not solely for success. We should still do this, but not within + // the PR corresponding to when this message was written. + return Err(Error::DvuRootsNotEmpty(ctx.change_set_id())); + } + + let mut change_set = ChangeSet::find(&ctx, ctx.visibility().change_set_id) + .await? + .ok_or(Error::ChangeSetNotFound(ctx.change_set_id()))?; + let old_status = change_set.status; + change_set.approve_change_set_for_apply(&ctx).await?; + + track( + &posthog_client, + &ctx, + &original_uri, + &host_name, + "approve_change_set_apply", + serde_json::json!({ + "merged_change_set": change_set_id, + }), + ); + + WsEvent::change_set_status_changed( + &ctx, + change_set.id, + ChangeSet::extract_userid_from_context(&ctx).await, + old_status, + change_set.status, + ) + .await? + .publish_on_commit(&ctx) + .await?; + + ctx.commit().await?; + + Ok(()) +} diff --git a/lib/sdf-server/src/service/v2/change_set/cancel_approval_request.rs b/lib/sdf-server/src/service/v2/change_set/cancel_approval_request.rs new file mode 100644 index 0000000000..05bb4185fb --- /dev/null +++ b/lib/sdf-server/src/service/v2/change_set/cancel_approval_request.rs @@ -0,0 +1,54 @@ +use axum::extract::{Host, OriginalUri, Path}; +use dal::{ChangeSet, ChangeSetId, WorkspacePk, WsEvent}; + +use super::{Error, Result}; +use crate::{ + extract::{AccessBuilder, HandlerContext, PosthogClient}, + track, +}; + +pub async fn cancel_approval_request( + HandlerContext(builder): HandlerContext, + AccessBuilder(request_ctx): AccessBuilder, + PosthogClient(posthog_client): PosthogClient, + OriginalUri(original_uri): OriginalUri, + Host(host_name): Host, + Path((_workspace_pk, change_set_id)): Path<(WorkspacePk, ChangeSetId)>, +) -> Result<()> { + let ctx = builder + .build(request_ctx.build(change_set_id.into())) + .await?; + + let mut change_set = ChangeSet::find(&ctx, ctx.visibility().change_set_id) + .await? + .ok_or(Error::ChangeSetNotFound(ctx.change_set_id()))?; + let old_status = change_set.status; + + change_set.reopen_change_set(&ctx).await?; + + track( + &posthog_client, + &ctx, + &original_uri, + &host_name, + "approve_change_set_apply", + serde_json::json!({ + "merged_change_set": change_set_id, + }), + ); + + WsEvent::change_set_status_changed( + &ctx, + change_set.id, + ChangeSet::extract_userid_from_context(&ctx).await, + old_status, + change_set.status, + ) + .await? + .publish_on_commit(&ctx) + .await?; + + ctx.commit().await?; + + Ok(()) +} diff --git a/lib/sdf-server/src/service/v2/change_set/force_apply.rs b/lib/sdf-server/src/service/v2/change_set/force_apply.rs new file mode 100644 index 0000000000..da7441ff97 --- /dev/null +++ b/lib/sdf-server/src/service/v2/change_set/force_apply.rs @@ -0,0 +1,49 @@ +use axum::extract::{Host, OriginalUri, Path}; +use dal::{ChangeSet, ChangeSetId, WorkspacePk}; + +use super::{Error, Result}; +use crate::{ + extract::{AccessBuilder, HandlerContext, PosthogClient}, + track, +}; + +pub async fn force_apply( + HandlerContext(builder): HandlerContext, + AccessBuilder(request_ctx): AccessBuilder, + PosthogClient(posthog_client): PosthogClient, + OriginalUri(original_uri): OriginalUri, + Host(host_name): Host, + Path((_workspace_pk, change_set_id)): Path<(WorkspacePk, ChangeSetId)>, +) -> Result<()> { + let mut ctx = builder + .build(request_ctx.build(change_set_id.into())) + .await?; + + ChangeSet::prepare_for_force_apply(&ctx).await?; + + // We need to run a commit before apply so changes get saved + ctx.commit().await?; + + ChangeSet::apply_to_base_change_set(&mut ctx).await?; + + track( + &posthog_client, + &ctx, + &original_uri, + &host_name, + "apply_change_set", + serde_json::json!({ + "merged_change_set": change_set_id, + }), + ); + + let _change_set = ChangeSet::find(&ctx, ctx.visibility().change_set_id) + .await? + .ok_or(Error::ChangeSetNotFound(ctx.change_set_id()))?; + + // Ws Event fires from the dal + + ctx.commit().await?; + + Ok(()) +} diff --git a/lib/sdf-server/src/service/v2/change_set/list.rs b/lib/sdf-server/src/service/v2/change_set/list.rs new file mode 100644 index 0000000000..85be4db6ba --- /dev/null +++ b/lib/sdf-server/src/service/v2/change_set/list.rs @@ -0,0 +1,82 @@ +use axum::{ + extract::{Host, OriginalUri, Path, State}, + Json, +}; +use dal::{ChangeSet, ChangeSetId, WorkspacePk}; +use permissions::{ObjectType, Relation, RelationBuilder}; + +use super::{AppState, Error, Result}; +use crate::{ + extract::{AccessBuilder, HandlerContext, PosthogClient}, + track, +}; + +pub async fn list_actionable( + HandlerContext(builder): HandlerContext, + AccessBuilder(request_ctx): AccessBuilder, + PosthogClient(posthog_client): PosthogClient, + OriginalUri(original_uri): OriginalUri, + Host(host_name): Host, + State(mut state): State, + Path(workspace_pk): Path, +) -> Result> { + let ctx = builder.build_head(request_ctx).await?; + + // List all actionable change sets and assemble them into individual views. + let open_change_sets = ChangeSet::list_active(&ctx).await?; + let mut views = Vec::with_capacity(open_change_sets.len()); + for change_set in open_change_sets { + views.push(change_set.into_frontend_type(&ctx).await?); + } + let client = state.spicedb_client().ok_or(Error::SpiceDBNotFound)?; + let existing_approvers = RelationBuilder::new() + .object(ObjectType::Workspace, workspace_pk) + .relation(Relation::Approver) + .read(client) + .await?; + + let existing_approver_ids: Vec = existing_approvers + .into_iter() + .map(|w| w.subject().id().to_string()) + .collect(); + + // Ensure that we find exactly one change set view that matches the open change sets found. + let head_change_set_id = ctx.get_workspace_default_change_set_id().await?; + let maybe_head_change_set_id: Vec = views + .iter() + .filter_map(|v| { + if v.id == head_change_set_id.into() { + Some(head_change_set_id) + } else { + None + } + }) + .collect(); + if maybe_head_change_set_id.len() != 1 { + return Err( + Error::UnexpectedNumberOfOpenChangeSetsMatchingDefaultChangeSet( + maybe_head_change_set_id, + ), + ); + } + let workspace = &ctx.get_workspace().await?; + let workspace_view = si_frontend_types::WorkspaceMetadata { + name: workspace.name().to_string(), + id: workspace.pk().to_string(), + default_change_set_id: head_change_set_id.into(), + change_sets: views, + approvers: existing_approver_ids, + }; + track( + &posthog_client, + &ctx, + &original_uri, + &host_name, + "list", + serde_json::json!({ + "workspace_id": workspace_pk, + }), + ); + + Ok(Json(workspace_view)) +} diff --git a/lib/sdf-server/src/service/v2/change_set/reject.rs b/lib/sdf-server/src/service/v2/change_set/reject.rs new file mode 100644 index 0000000000..0dd5399578 --- /dev/null +++ b/lib/sdf-server/src/service/v2/change_set/reject.rs @@ -0,0 +1,54 @@ +use axum::extract::{Host, OriginalUri, Path}; +use dal::{ChangeSet, ChangeSetId, WorkspacePk, WsEvent}; + +use super::{Error, Result}; +use crate::{ + extract::{AccessBuilder, HandlerContext, PosthogClient}, + track, +}; + +pub async fn reject( + HandlerContext(builder): HandlerContext, + AccessBuilder(request_ctx): AccessBuilder, + PosthogClient(posthog_client): PosthogClient, + OriginalUri(original_uri): OriginalUri, + Host(host_name): Host, + Path((_workspace_pk, change_set_id)): Path<(WorkspacePk, ChangeSetId)>, +) -> Result<()> { + let ctx = builder + .build(request_ctx.build(change_set_id.into())) + .await?; + + let mut change_set = ChangeSet::find(&ctx, ctx.visibility().change_set_id) + .await? + .ok_or(Error::ChangeSetNotFound(ctx.change_set_id()))?; + let old_status = change_set.status; + + change_set.reject_change_set_for_apply(&ctx).await?; + + track( + &posthog_client, + &ctx, + &original_uri, + &host_name, + "reject_change_set_apply", + serde_json::json!({ + "change_set": change_set_id, + }), + ); + + WsEvent::change_set_status_changed( + &ctx, + change_set.id, + ChangeSet::extract_userid_from_context(&ctx).await, + old_status, + change_set.status, + ) + .await? + .publish_on_commit(&ctx) + .await?; + + ctx.commit().await?; + + Ok(()) +} diff --git a/lib/sdf-server/src/service/v2/change_set/reopen.rs b/lib/sdf-server/src/service/v2/change_set/reopen.rs new file mode 100644 index 0000000000..a060e73336 --- /dev/null +++ b/lib/sdf-server/src/service/v2/change_set/reopen.rs @@ -0,0 +1,56 @@ +use axum::extract::{Host, OriginalUri, Path}; +use dal::{ChangeSet, ChangeSetId, WorkspacePk, WsEvent}; + +use super::{Error, Result}; +use crate::{ + extract::{AccessBuilder, HandlerContext, PosthogClient}, + track, +}; + +pub async fn reopen( + HandlerContext(builder): HandlerContext, + AccessBuilder(request_ctx): AccessBuilder, + PosthogClient(posthog_client): PosthogClient, + OriginalUri(original_uri): OriginalUri, + Host(host_name): Host, + Path((_workspace_pk, change_set_id)): Path<(WorkspacePk, ChangeSetId)>, +) -> Result<()> { + let ctx = builder + .build(request_ctx.build(change_set_id.into())) + .await?; + + let mut change_set = ChangeSet::find(&ctx, ctx.visibility().change_set_id) + .await? + .ok_or(Error::ChangeSetNotFound(ctx.change_set_id()))?; + let old_status = change_set.status; + + //todo(brit): should we guard against re-opening abandoned change sets? + // this might be helpful if we don't... + change_set.reopen_change_set(&ctx).await?; + + WsEvent::change_set_status_changed( + &ctx, + change_set.id, + ChangeSet::extract_userid_from_context(&ctx).await, + old_status, + change_set.status, + ) + .await? + .publish_on_commit(&ctx) + .await?; + + track( + &posthog_client, + &ctx, + &original_uri, + &host_name, + "reject_change_set_apply", + serde_json::json!({ + "change_set": change_set_id, + }), + ); + + ctx.commit().await?; + + Ok(()) +} diff --git a/lib/sdf-server/src/service/v2/change_set/request_approval.rs b/lib/sdf-server/src/service/v2/change_set/request_approval.rs new file mode 100644 index 0000000000..17d190e14b --- /dev/null +++ b/lib/sdf-server/src/service/v2/change_set/request_approval.rs @@ -0,0 +1,54 @@ +use axum::extract::{Host, OriginalUri, Path}; +use dal::{ChangeSet, ChangeSetId, WorkspacePk, WsEvent}; + +use super::{Error, Result}; +use crate::{ + extract::{AccessBuilder, HandlerContext, PosthogClient}, + track, +}; + +pub async fn request_approval( + HandlerContext(builder): HandlerContext, + AccessBuilder(request_ctx): AccessBuilder, + PosthogClient(posthog_client): PosthogClient, + OriginalUri(original_uri): OriginalUri, + Host(host_name): Host, + Path((_workspace_pk, change_set_id)): Path<(WorkspacePk, ChangeSetId)>, +) -> Result<()> { + let ctx = builder + .build(request_ctx.build(change_set_id.into())) + .await?; + + let mut change_set = ChangeSet::find(&ctx, ctx.visibility().change_set_id) + .await? + .ok_or(Error::ChangeSetNotFound(ctx.change_set_id()))?; + let old_status = change_set.status; + + change_set.request_change_set_approval(&ctx).await?; + + track( + &posthog_client, + &ctx, + &original_uri, + &host_name, + "request_change_set_approval", + serde_json::json!({ + "change_set": change_set_id, + }), + ); + + WsEvent::change_set_status_changed( + &ctx, + change_set.id, + ChangeSet::extract_userid_from_context(&ctx).await, + old_status, + change_set.status, + ) + .await? + .publish_on_commit(&ctx) + .await?; + + ctx.commit().await?; + + Ok(()) +} diff --git a/lib/si-events-rs/src/change_set_status.rs b/lib/si-events-rs/src/change_set_status.rs index cae555ac54..7b6540ef94 100644 --- a/lib/si-events-rs/src/change_set_status.rs +++ b/lib/si-events-rs/src/change_set_status.rs @@ -8,9 +8,10 @@ use strum::{AsRefStr, Display, EnumString}; pub enum ChangeSetStatus { Abandoned, Applied, - Closed, + Approved, Failed, NeedsAbandonApproval, NeedsApproval, Open, + Rejected, } diff --git a/lib/si-frontend-types-rs/src/change_set.rs b/lib/si-frontend-types-rs/src/change_set.rs new file mode 100644 index 0000000000..2960f8987b --- /dev/null +++ b/lib/si-frontend-types-rs/src/change_set.rs @@ -0,0 +1,21 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use si_events::{ChangeSetId, ChangeSetStatus}; + +#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct ChangeSet { + pub created_at: DateTime, + pub updated_at: DateTime, + pub name: String, + pub id: ChangeSetId, + pub status: ChangeSetStatus, + pub base_change_set_id: Option, + pub workspace_id: String, + pub merge_requested_by_user_id: Option, + pub merge_requested_by_user_email: Option, + pub merge_requested_at: Option>, + pub reviewed_by_user_id: Option, + pub reviewed_by_user_email: Option, + pub reviewed_at: Option>, +} diff --git a/lib/si-frontend-types-rs/src/lib.rs b/lib/si-frontend-types-rs/src/lib.rs index ac4dcb6a21..7e6120d511 100644 --- a/lib/si-frontend-types-rs/src/lib.rs +++ b/lib/si-frontend-types-rs/src/lib.rs @@ -1,11 +1,14 @@ mod audit_log; +mod change_set; mod component; mod conflict; mod func; mod module; mod schema_variant; +mod workspace; pub use crate::audit_log::AuditLog; +pub use crate::change_set::ChangeSet; pub use crate::component::{ ChangeStatus, ConnectionAnnotation, DiagramSocket, DiagramSocketDirection, DiagramSocketNodeSide, GridPoint, Size2D, SummaryDiagramComponent, @@ -21,3 +24,4 @@ pub use crate::module::{ pub use crate::schema_variant::{ ComponentType, InputSocket, OutputSocket, Prop, PropKind, SchemaVariant, UninstalledVariant, }; +pub use crate::workspace::WorkspaceMetadata; diff --git a/lib/si-frontend-types-rs/src/workspace.rs b/lib/si-frontend-types-rs/src/workspace.rs new file mode 100644 index 0000000000..58b937bbb8 --- /dev/null +++ b/lib/si-frontend-types-rs/src/workspace.rs @@ -0,0 +1,15 @@ +use serde::{Deserialize, Serialize}; +use si_events::ChangeSetId; + +use crate::change_set::ChangeSet; + +#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct WorkspaceMetadata { + pub name: String, + pub id: String, + pub default_change_set_id: ChangeSetId, + pub change_sets: Vec, + /// list of user ids that are approvers for this workspace + pub approvers: Vec, +}