Skip to content

Commit

Permalink
WIP: split out approvals API
Browse files Browse the repository at this point in the history
- Add change set approvals table with integration test
- Collect approval status for a change set (only current approvals right
  now and without the validity check)
- Add new approval route
- Detect added, removed and changed nodes
- Collapse the dectector into one module and rename it
- Rename new change detection logic to "detect_changes"
- Add new unit test for "detect_changes"
- Calculate changes hash in an integration test
- Move changes hash contents into approval
- Generate the checksum upon approval (which may come from the frontend
  in the future due to potential desync issues, etc.)
- Calculate the "is_valid" check for real

Co-authored-by: Jacob Helwig <[email protected]>
Co-authored-by: John Obelenus <[email protected]>
Signed-off-by: Nick Gerace <[email protected]>
  • Loading branch information
3 people committed Dec 18, 2024
1 parent 48ea059 commit 145a4d5
Show file tree
Hide file tree
Showing 37 changed files with 578 additions and 58 deletions.
1 change: 1 addition & 0 deletions lib/dal/src/change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::{
WorkspaceError,
};

pub mod approval;
pub mod event;
pub mod status;
pub mod view;
Expand Down
162 changes: 162 additions & 0 deletions lib/dal/src/change_set/approval.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
//! Provides the ability to approve change sets and calculate their approval status.
#![warn(
bad_style,
clippy::missing_panics_doc,
clippy::panic,
clippy::panic_in_result_fn,
clippy::unwrap_in_result,
clippy::unwrap_used,
dead_code,
improper_ctypes,
missing_debug_implementations,
missing_docs,
no_mangle_generic_items,
non_shorthand_field_patterns,
overflowing_literals,
path_statements,
patterns_in_fns_without_body,
unconditional_recursion,
unreachable_pub,
unused,
unused_allocation,
unused_comparisons,
unused_parens,
while_true
)]

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use si_data_pg::{PgError, PgRow};
use si_events::ChangesChecksum;
use si_id::{ChangeSetApprovalId, ChangeSetId, UserPk};
use telemetry::prelude::*;
use thiserror::Error;

pub use si_events::ChangeSetApprovalStatus;

use crate::{DalContext, HistoryActor, TransactionsError, WorkspaceSnapshotError};

#[allow(missing_docs)]
#[derive(Debug, Error)]
pub enum ChangeSetApprovalError {
#[error("invalid user for creating a change set approval")]
InvalidUserForCreation,
#[error("pg error: {0}")]
Pg(#[from] PgError),
#[error("strum parse error: {0}")]
StrumParse(#[from] strum::ParseError),
#[error("transactions error: {0}")]
Transactions(#[from] TransactionsError),
#[error("workspace snapshot error: {0}")]
WorkspaceSnapshot(#[from] WorkspaceSnapshotError),
}

type Result<T> = std::result::Result<T, ChangeSetApprovalError>;

/// An individual approval for applying a [`ChangeSet`](crate::ChangeSet).
#[derive(Debug, Serialize, Deserialize)]
pub struct ChangeSetApproval {
id: ChangeSetApprovalId,
created_at: DateTime<Utc>,
updated_at: DateTime<Utc>,
change_set_id: ChangeSetId,
status: ChangeSetApprovalStatus,
user_id: UserPk,
checksum: String,
}

impl TryFrom<PgRow> for ChangeSetApproval {
type Error = ChangeSetApprovalError;

fn try_from(value: PgRow) -> std::result::Result<Self, Self::Error> {
let status_string: String = value.try_get("status")?;
let status = ChangeSetApprovalStatus::try_from(status_string.as_str())?;
Ok(Self {
id: value.try_get("id")?,
created_at: value.try_get("created_at")?,
updated_at: value.try_get("updated_at")?,
change_set_id: value.try_get("change_set_id")?,
status,
user_id: value.try_get("user_id")?,
checksum: value.try_get("checksum")?,
})
}
}

impl ChangeSetApproval {
/// Creates a new approval.
#[instrument(name = "change_set.approval.new", level = "info", skip_all)]
pub async fn new(ctx: &DalContext, status: ChangeSetApprovalStatus) -> Result<Self> {
let change_set_id = ctx.change_set_id();
let user_id = match ctx.history_actor() {
HistoryActor::User(user_id) => user_id,
HistoryActor::SystemInit => return Err(ChangeSetApprovalError::InvalidUserForCreation),
};
let checksum = Self::calculate_checksum(ctx).await?;
let row = ctx
.txns()
.await?
.pg()
.query_one(
"INSERT INTO change_set_approvals (change_set_id, status, user_id, checksum) VALUES ($1, $2, $3, $4) RETURNING *",
&[&change_set_id, &status.to_string(), &user_id, &checksum.to_string()]
)
.await?;
Self::try_from(row)
}

/// Returns the ID of the approval.
pub fn id(&self) -> ChangeSetApprovalId {
self.id
}

/// Returns the status of the approval.
pub fn status(&self) -> ChangeSetApprovalStatus {
self.status
}

/// Returns the ID of the approver.
pub fn user_id(&self) -> UserPk {
self.user_id
}

/// Returns the checksum based on the changes compared to HEAD when the approval was performed.
pub fn checksum(&self) -> &str {
self.checksum.as_str()
}

/// Lists all approvals in the [`ChangeSet`](crate::ChangeSet).
#[instrument(name = "change_set.approval.list", level = "info", skip_all)]
pub async fn list(ctx: &DalContext) -> Result<Vec<Self>> {
let change_set_id = ctx.change_set_id();
let rows = ctx
.txns()
.await?
.pg()
.query(
"SELECT * from change_set_approvals WHERE change_set_id = $1 ORDER BY id ASC",
&[&change_set_id],
)
.await?;
let mut approvals = Vec::with_capacity(rows.len());
for row in rows {
approvals.push(Self::try_from(row)?);
}
Ok(approvals)
}

/// Calculates a checksum of all detected changes between the current [`ChangeSet`] and HEAD.
pub async fn calculate_checksum(ctx: &DalContext) -> Result<ChangesChecksum> {
let mut changes = ctx
.workspace_snapshot()?
.detect_changes_from_head(ctx)
.await?;
changes.sort_by_key(|c| c.id);
let mut hasher = ChangesChecksum::hasher();
for change in changes {
hasher.update(change.merkle_tree_hash.as_bytes());
}
Ok(hasher.finalize())
}
}
10 changes: 10 additions & 0 deletions lib/dal/src/migrations/U3500__change_set_approvals.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
CREATE TABLE change_set_approvals
(
id ident primary key NOT NULL DEFAULT ident_create_v1(),
created_at timestamp with time zone NOT NULL DEFAULT CLOCK_TIMESTAMP(),
updated_at timestamp with time zone NOT NULL DEFAULT CLOCK_TIMESTAMP(),
change_set_id ident NOT NULL,
status text NOT NULL,
user_id ident NOT NULL,
checksum text NOT NULL
);
44 changes: 37 additions & 7 deletions lib/dal/src/workspace_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub mod vector_clock;
pub use traits::{schema::variant::SchemaVariantExt, socket::input::InputSocketExt};

use graph::correct_transforms::correct_transforms;
use graph::detect_updates::Update;
use graph::detector::{Change, Update};
use graph::{RebaseBatch, WorkspaceSnapshotGraph};
use node_weight::traits::CorrectTransformsError;
use std::collections::{HashMap, HashSet};
Expand Down Expand Up @@ -713,12 +713,7 @@ impl WorkspaceSnapshot {
Ok(())
}

#[instrument(
name = "workspace_snapshot.detect_updates",
level = "debug",
skip_all,
fields()
)]
#[instrument(name = "workspace_snapshot.detect_updates", level = "debug", skip_all)]
pub async fn detect_updates(
&self,
onto_workspace_snapshot: &WorkspaceSnapshot,
Expand All @@ -735,6 +730,41 @@ impl WorkspaceSnapshot {
.await?)
}

#[instrument(name = "workspace_snapshot.detect_changes", level = "debug", skip_all)]
pub async fn detect_changes(
&self,
onto_workspace_snapshot: &WorkspaceSnapshot,
) -> WorkspaceSnapshotResult<Vec<Change>> {
let self_clone = self.clone();
let onto_clone = onto_workspace_snapshot.clone();

Ok(slow_rt::spawn(async move {
self_clone
.working_copy()
.await
.detect_changes(&*onto_clone.working_copy().await)
})?
.await?)
}

/// A wrapper around [`Self::detect_changes`](Self::detect_changes) where the "onto" snapshot is derived from the
/// workspace's default [`ChangeSet`](crate::ChangeSet).
#[instrument(
name = "workspace_snapshot.detect_changes_from_head",
level = "debug",
skip_all
)]
pub async fn detect_changes_from_head(
&self,
ctx: &DalContext,
) -> WorkspaceSnapshotResult<Vec<Change>> {
let head_change_set_id = ctx.get_workspace_default_change_set_id().await?;
let head_snapshot = Self::find_for_change_set(&ctx, head_change_set_id).await?;
Ok(head_snapshot
.detect_changes(&ctx.workspace_snapshot()?.clone())
.await?)
}

/// Gives the exact node index endpoints of an edge.
pub async fn edge_endpoints(
&self,
Expand Down
11 changes: 5 additions & 6 deletions lib/dal/src/workspace_snapshot/graph.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::{fs::File, io::Write};

use deprecated::DeprecatedWorkspaceSnapshotGraphV1;
use detect_updates::Update;
use detector::Update;
use petgraph::prelude::*;
/// Ensure [`NodeIndex`], and [`Direction`] are usable externally.
pub use petgraph::{graph::NodeIndex, Direction};
use serde::{Deserialize, Serialize};
use si_events::{merkle_tree_hash::MerkleTreeHash, ulid::Ulid};
use si_layer_cache::db::serialize;
Expand All @@ -13,6 +11,9 @@ use strum::{EnumDiscriminants, EnumIter, EnumString, IntoEnumIterator};
use telemetry::prelude::*;
use thiserror::Error;

/// Ensure [`NodeIndex`], and [`Direction`] are usable externally.
pub use petgraph::{graph::NodeIndex, Direction};

use crate::{
socket::input::InputSocketError,
workspace_snapshot::node_weight::{category_node_weight::CategoryNodeKind, NodeWeightError},
Expand All @@ -21,7 +22,7 @@ use crate::{

pub mod correct_transforms;
pub mod deprecated;
pub mod detect_updates;
pub mod detector;
mod tests;
pub mod traits;
pub mod v2;
Expand All @@ -44,8 +45,6 @@ pub enum WorkspaceSnapshotGraphError {
CannotCompareOrderedAndUnorderedContainers(NodeIndex, NodeIndex),
#[error("could not find category node of kind: {0:?}")]
CategoryNodeNotFound(CategoryNodeKind),
// #[error("ChangeSet error: {0}")]
// ChangeSet(#[from] ChangeSetError),
#[error("Component error: {0}")]
Component(#[from] Box<ComponentError>),
#[error("Unable to retrieve content for ContentHash")]
Expand Down
2 changes: 1 addition & 1 deletion lib/dal/src/workspace_snapshot/graph/correct_transforms.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::workspace_snapshot::node_weight::NodeWeight;
use crate::workspace_snapshot::NodeInformation;
use crate::{EdgeWeight, EdgeWeightKind, NodeWeightDiscriminants};

use super::{detect_updates::Update, WorkspaceSnapshotGraphVCurrent};
use super::{detector::Update, WorkspaceSnapshotGraphVCurrent};

pub fn correct_transforms(
graph: &WorkspaceSnapshotGraphVCurrent,
Expand Down
Loading

0 comments on commit 145a4d5

Please sign in to comment.