From f3b0e3441a96444a1fbbd65206c884ce870a2542 Mon Sep 17 00:00:00 2001 From: Nick Gerace Date: Tue, 19 Nov 2024 12:28:24 -0500 Subject: [PATCH] Write migration for audit logs table This commit contains the migration for the audit logs table. It contains a mirrored Rust struct with an insertion method. None of the Rust code replaces what exists in "si-frontend-types" or "si-events" but may do so in the future. Signed-off-by: Nick Gerace --- Cargo.lock | 1 + lib/audit-logs/BUCK | 1 + lib/audit-logs/Cargo.toml | 1 + lib/audit-logs/src/lib.rs | 162 ++++++++++++++++++ .../src/migrations/U0002__audit_logs.sql | 14 ++ lib/audit-logs/src/pg.rs | 5 + 6 files changed, 184 insertions(+) create mode 100644 lib/audit-logs/src/migrations/U0002__audit_logs.sql diff --git a/Cargo.lock b/Cargo.lock index e2ef2c68bf..9757ac44b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -419,6 +419,7 @@ dependencies = [ "si-data-nats", "si-data-pg", "si-events", + "strum", "telemetry", "telemetry-nats", "thiserror", diff --git a/lib/audit-logs/BUCK b/lib/audit-logs/BUCK index 6cb9a05b94..c22330bfbd 100644 --- a/lib/audit-logs/BUCK +++ b/lib/audit-logs/BUCK @@ -12,6 +12,7 @@ rust_library( "//third-party/rust:remain", "//third-party/rust:serde", "//third-party/rust:serde_json", + "//third-party/rust:strum", "//third-party/rust:thiserror", ], srcs = glob([ diff --git a/lib/audit-logs/Cargo.toml b/lib/audit-logs/Cargo.toml index d89c94a79d..1dfbd13924 100644 --- a/lib/audit-logs/Cargo.toml +++ b/lib/audit-logs/Cargo.toml @@ -19,4 +19,5 @@ refinery = { workspace = true } remain = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } +strum = { workspace = true } thiserror = { workspace = true } diff --git a/lib/audit-logs/src/lib.rs b/lib/audit-logs/src/lib.rs index a8608eaa9e..fb0d7532fd 100644 --- a/lib/audit-logs/src/lib.rs +++ b/lib/audit-logs/src/lib.rs @@ -28,5 +28,167 @@ pub mod pg; mod stream; +use pg::AuditDatabaseContext; +use serde::Deserialize; +use serde::Serialize; +use si_data_pg::PgError; +use si_data_pg::PgPoolError; +use si_events::Actor; +use si_events::ChangeSetId; +use si_events::ChangeSetStatus; +use si_events::UserPk; +use si_events::WorkspacePk; +use strum::Display; +use strum::EnumDiscriminants; +use telemetry::prelude::*; +use thiserror::Error; + pub use stream::AuditLogsStream; pub use stream::AuditLogsStreamError; + +#[allow(missing_docs)] +#[derive(Debug, Error)] +pub enum AuditLogError { + #[error("pg error: {0}")] + Pg(#[from] PgError), + #[error("pg pool error: {0}")] + PgPool(#[from] PgPoolError), + #[error("serde json error: {0}")] + SerdeJson(#[from] serde_json::Error), +} + +type Result = std::result::Result; + +// FIXME(nick): delete this once accessor patterns are in place. +// impl TryFrom for AuditLogRow { +// type Error = AuditLogError; +// +// fn try_from(value: PgRow) -> std::result::Result { +// Ok(Self { +// actor: todo!(), +// kind: AuditLogKind::CreateChangeSet, +// entity_name: todo!(), +// timestamp: todo!(), +// change_set_id: todo!(), +// }) +// let status_string: String = value.try_get("status")?; +// let status = ChangeSetStatus::try_from(status_string.as_str())?; +// Ok(Self { +// id: value.try_get("id")?, +// created_at: value.try_get("created_at")?, +// updated_at: value.try_get("updated_at")?, +// name: value.try_get("name")?, +// status, +// base_change_set_id: value.try_get("base_change_set_id")?, +// workspace_snapshot_address: value.try_get("workspace_snapshot_address")?, +// workspace_id: value.try_get("workspace_id")?, +// merge_requested_by_user_id: value.try_get("merge_requested_by_user_id")?, +// merge_requested_at: value.try_get("merge_requested_at")?, +// reviewed_by_user_id: value.try_get("reviewed_by_user_id")?, +// reviewed_at: value.try_get("reviewed_at")?, +// }) +// } +// } + +#[allow(missing_docs)] +#[remain::sorted] +#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq, Display, EnumDiscriminants)] +pub enum AuditLogKind { + #[allow(missing_docs)] + AbandonChangeSet { from_status: ChangeSetStatus }, + #[allow(missing_docs)] + CreateChangeSet, +} + +#[allow(missing_docs)] +#[remain::sorted] +#[derive(Debug, Serialize, Deserialize, EnumDiscriminants)] +#[serde(untagged, rename_all = "camelCase")] +pub enum AuditLogMetadata { + #[allow(missing_docs)] + #[serde(rename_all = "camelCase")] + AbandonChangeSet { from_status: ChangeSetStatus }, + #[allow(missing_docs)] + #[serde(rename_all = "camelCase")] + CreateChangeSet, +} + +impl From for AuditLogMetadata { + fn from(value: AuditLogKind) -> Self { + match value { + AuditLogKind::AbandonChangeSet { from_status } => { + Self::AbandonChangeSet { from_status } + } + AuditLogKind::CreateChangeSet => Self::CreateChangeSet, + } + } +} + +#[allow(clippy::too_many_arguments, missing_docs)] +#[instrument( + name = "audit_log.insert", + level = "debug", + skip_all, + fields( + si.workspace.id = %workspace_id, + ), +)] +pub async fn insert( + context: &AuditDatabaseContext, + workspace_id: WorkspacePk, + kind: AuditLogKind, + timestamp: String, + title: String, + change_set_id: Option, + actor: Actor, + entity_name: Option, + entity_type: Option, +) -> Result<()> { + let kind_as_string = kind.to_string(); + let user_id: Option = match actor { + Actor::System => None, + Actor::User(user_id) => Some(user_id), + }; + let serialized_metadata = serde_json::to_value(AuditLogMetadata::from(kind))?; + + let _ = context + .pg_pool() + .get() + .await? + .query_one( + "INSERT INTO audit_logs ( + workspace_id, + kind, + timestamp, + title, + change_set_id, + user_id, + entity_name, + entity_type, + metadata + ) VALUES ( + $1, + $2, + $3, + $4, + $5, + $6, + $7, + $8, + $9 + )", + &[ + &workspace_id, + &kind_as_string, + ×tamp, + &title, + &change_set_id, + &user_id, + &entity_name, + &entity_type, + &serialized_metadata, + ], + ) + .await?; + Ok(()) +} diff --git a/lib/audit-logs/src/migrations/U0002__audit_logs.sql b/lib/audit-logs/src/migrations/U0002__audit_logs.sql new file mode 100644 index 0000000000..318b924934 --- /dev/null +++ b/lib/audit-logs/src/migrations/U0002__audit_logs.sql @@ -0,0 +1,14 @@ +CREATE TABLE audit_logs ( + pk bigserial PRIMARY KEY, + workspace_id text NOT NULL, + kind text NOT NULL, + timestamp timestamp with time zone NOT NULL, + title text NOT NULL, + change_set_id text, + user_id text, + entity_name text, + entity_type text, + metadata jsonb +); + +CREATE INDEX audit_logs_workspace_and_change_set ON audit_logs (workspace_id, change_set_id); diff --git a/lib/audit-logs/src/pg.rs b/lib/audit-logs/src/pg.rs index 8c45e352e2..a4fbaf45ed 100644 --- a/lib/audit-logs/src/pg.rs +++ b/lib/audit-logs/src/pg.rs @@ -37,6 +37,11 @@ impl AuditDatabaseContext { pg_pool: PgPool::new(&config.pg).await?, }) } + + /// Returns a reference to the [`PgPool`]. + pub fn pg_pool(&self) -> &PgPool { + &self.pg_pool + } } /// The configuration used for communicating with and setting up the audit database.