From 68a760592088df50e85f6b10a5a450a78abc5aef Mon Sep 17 00:00:00 2001 From: Nick Gerace Date: Tue, 19 Nov 2024 17:51:42 -0500 Subject: [PATCH] Write audit logs to database in forklift This commit adds the ability for forklift to write audit logs to the new audit database. By default, this functionality is disabled and can be enabled with the "--enable-audit-logs-app" flag or configuration file option for forklift. Changes to audit logs: - Move all postgres-related logic to new "database" module, splitting up all functionality into sub-modules - Refactor insert method to ensure insert query works - Move all functionality out of the root library file - Move metadata struct from the frontend types crate to the events crate and ensure that it is versioned because we need it for database insertions - Make entity type optional for the metadata rather than using whitespaces or empty strings Changes to forklift: - Add optional inner audit logs app based on the new config option - Move all apps to a new app module, including the handlers and app states - Ensure app setup flows through a single error type which then owns app setup error types for each app - Ensure audit logs app durable consumer uses an ack layer, but relies on the limits-based queue semantics to keep the messages on it - Add postgres cert for local development, including development environment detection - Add flag and config option to enable audit logs app - Add audit config option for the corresponding postgres config Misc changes: - Ensure the existing audit logging list function works from other changes in this commit - Remove unused pub export for development environment detection for sdf Considerations: - Empty metadata for audit logs will be written as "JSON null" and not "PG null" for the database despite the column being nullable (this is okay, just so long as we know!) - We have faith in naxum concurrency limits and PgPool configuration enough such that we can singularly insert audit logs into the database one at a time (this is subject to change) Signed-off-by: Nick Gerace --- Cargo.lock | 7 + bin/forklift/BUCK | 3 + bin/forklift/src/args.rs | 5 + lib/audit-logs/BUCK | 1 + lib/audit-logs/Cargo.toml | 1 + lib/audit-logs/src/database.rs | 109 ++++ lib/audit-logs/src/database/config.rs | 27 + lib/audit-logs/src/database/context.rs | 35 ++ .../src/{pg => database}/migrate.rs | 2 +- lib/audit-logs/src/lib.rs | 164 +----- lib/audit-logs/src/pg.rs | 66 --- lib/audit-logs/src/stream.rs | 5 + lib/dal/src/audit_logging.rs | 12 +- lib/forklift-server/BUCK | 6 + lib/forklift-server/Cargo.toml | 15 +- lib/forklift-server/src/config.rs | 103 +++- lib/forklift-server/src/lib.rs | 2 - lib/forklift-server/src/server.rs | 269 +++------ lib/forklift-server/src/server/app.rs | 73 +++ .../src/server/app/audit_logs.rs | 153 ++++++ .../src/server/app/audit_logs/app_state.rs | 25 + .../src/server/app/audit_logs/handlers.rs | 83 +++ .../src/server/app/billing_events.rs | 217 ++++++++ .../app/billing_events}/app_state.rs | 0 .../app/billing_events}/handlers.rs | 2 +- lib/sdf-server/src/config.rs | 8 +- lib/sdf-server/src/lib.rs | 7 +- lib/sdf-server/src/migrations.rs | 4 +- lib/sdf-server/src/server.rs | 2 +- lib/si-events-rs/src/audit_log.rs | 3 +- lib/si-events-rs/src/audit_log/v1.rs | 512 +++++++++++++++++ lib/si-frontend-types-rs/src/audit_log.rs | 520 +----------------- lib/si-frontend-types-rs/src/lib.rs | 2 +- 33 files changed, 1458 insertions(+), 985 deletions(-) create mode 100644 lib/audit-logs/src/database.rs create mode 100644 lib/audit-logs/src/database/config.rs create mode 100644 lib/audit-logs/src/database/context.rs rename lib/audit-logs/src/{pg => database}/migrate.rs (95%) delete mode 100644 lib/audit-logs/src/pg.rs create mode 100644 lib/forklift-server/src/server/app.rs create mode 100644 lib/forklift-server/src/server/app/audit_logs.rs create mode 100644 lib/forklift-server/src/server/app/audit_logs/app_state.rs create mode 100644 lib/forklift-server/src/server/app/audit_logs/handlers.rs create mode 100644 lib/forklift-server/src/server/app/billing_events.rs rename lib/forklift-server/src/{ => server/app/billing_events}/app_state.rs (100%) rename lib/forklift-server/src/{ => server/app/billing_events}/handlers.rs (96%) diff --git a/Cargo.lock b/Cargo.lock index 9757ac44b0..a32449bc47 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -412,6 +412,7 @@ dependencies = [ name = "audit-logs" version = "0.1.0" dependencies = [ + "chrono", "refinery", "remain", "serde", @@ -2622,18 +2623,24 @@ dependencies = [ name = "forklift-server" version = "0.1.0" dependencies = [ + "audit-logs", "billing-events", + "buck2-resources", "data-warehouse-stream-client", "derive_builder", + "futures", "naxum", "remain", "serde", "serde_json", "si-data-nats", + "si-events", "si-settings", + "si-std", "telemetry", "telemetry-nats", "thiserror", + "tokio", "tokio-util", "ulid", ] diff --git a/bin/forklift/BUCK b/bin/forklift/BUCK index adebe2d04e..c9610da103 100644 --- a/bin/forklift/BUCK +++ b/bin/forklift/BUCK @@ -15,6 +15,9 @@ rust_binary( ], srcs = glob(["src/**/*.rs"]), env = {"CARGO_BIN_NAME": "forklift"}, + resources = { + "dev.postgres.root.crt": "//config/keys:dev.postgres.root.crt", + }, ) docker_image( diff --git a/bin/forklift/src/args.rs b/bin/forklift/src/args.rs index da59d9579b..d0a7968dbe 100644 --- a/bin/forklift/src/args.rs +++ b/bin/forklift/src/args.rs @@ -79,6 +79,10 @@ pub(crate) struct Args { /// The name of the data warehouse stream #[arg(long)] pub(crate) data_warehouse_stream_name: Option, + + /// Enables the audit logs app + #[arg(long, default_value = "false")] + pub(crate) enable_audit_logs_app: bool, } impl TryFrom for Config { @@ -105,6 +109,7 @@ impl TryFrom for Config { if let Some(data_warehouse_stream_name) = args.data_warehouse_stream_name { config_map.set("data_warehouse_stream_name", data_warehouse_stream_name); } + config_map.set("enable_audit_logs_app", args.enable_audit_logs_app); })? .try_into() } diff --git a/lib/audit-logs/BUCK b/lib/audit-logs/BUCK index c22330bfbd..8676b309ad 100644 --- a/lib/audit-logs/BUCK +++ b/lib/audit-logs/BUCK @@ -8,6 +8,7 @@ rust_library( "//lib/si-events-rs:si-events", "//lib/telemetry-nats-rs:telemetry-nats", "//lib/telemetry-rs:telemetry", + "//third-party/rust:chrono", "//third-party/rust:refinery", "//third-party/rust:remain", "//third-party/rust:serde", diff --git a/lib/audit-logs/Cargo.toml b/lib/audit-logs/Cargo.toml index 1dfbd13924..9889cddf1e 100644 --- a/lib/audit-logs/Cargo.toml +++ b/lib/audit-logs/Cargo.toml @@ -15,6 +15,7 @@ si-events = { path = "../../lib/si-events-rs" } telemetry = { path = "../../lib/telemetry-rs" } telemetry-nats = { path = "../../lib/telemetry-nats-rs" } +chrono = { workspace = true } refinery = { workspace = true } remain = { workspace = true } serde = { workspace = true } diff --git a/lib/audit-logs/src/database.rs b/lib/audit-logs/src/database.rs new file mode 100644 index 0000000000..55daa1cd2b --- /dev/null +++ b/lib/audit-logs/src/database.rs @@ -0,0 +1,109 @@ +//! Contains functionality for setting up and communicating with the audit database. + +use chrono::DateTime; +use chrono::Utc; +use si_data_pg::PgError; +use si_data_pg::PgPoolError; +use si_events::audit_log::AuditLogKind; +use si_events::audit_log::AuditLogMetadata; +use si_events::Actor; +use si_events::ChangeSetId; +use si_events::WorkspacePk; +use telemetry::prelude::*; +use thiserror::Error; + +mod config; +mod context; +mod migrate; + +pub use config::AuditDatabaseConfig; +pub use config::DBNAME; +pub use context::AuditDatabaseContext; +pub use context::AuditDatabaseContextError; +pub use migrate::{migrate, AuditDatabaseMigrationError}; + +#[allow(missing_docs)] +#[derive(Error, Debug)] +pub enum AuditDatabaseError { + #[error("chrono parse error: {0}")] + ChronoParse(#[from] chrono::ParseError), + #[error("pg error: {0}")] + Pg(#[from] PgError), + #[error("pg pool error: {0}")] + PgPool(#[from] PgPoolError), + #[error("serde json error: {0}")] + SerdeJson(#[from] serde_json::Error), +} + +type Result = std::result::Result; + +#[allow(clippy::too_many_arguments, missing_docs)] +#[instrument( + name = "audit_log.insert", + level = "debug", + skip_all, + fields( + si.workspace.id = %workspace_id, + ), +)] +pub async fn insert( + context: &AuditDatabaseContext, + workspace_id: WorkspacePk, + kind: AuditLogKind, + timestamp: String, + change_set_id: Option, + actor: Actor, + entity_name: Option, +) -> Result<()> { + let kind_as_string = kind.to_string(); + let user_id = match actor { + Actor::System => None, + Actor::User(user_id) => Some(user_id), + }; + + let metadata = AuditLogMetadata::from(kind); + let (title, entity_type) = metadata.title_and_entity_type(); + let serialized_metadata = serde_json::to_value(metadata)?; + let timestamp: DateTime = timestamp.parse()?; + + context + .pg_pool() + .get() + .await? + .query_one( + "INSERT INTO audit_logs ( + workspace_id, + kind, + timestamp, + title, + change_set_id, + user_id, + entity_name, + entity_type, + metadata + ) VALUES ( + $1, + $2, + $3, + $4, + $5, + $6, + $7, + $8, + $9 + ) RETURNING *", + &[ + &workspace_id, + &kind_as_string, + ×tamp, + &title, + &change_set_id.map(|id| id.to_string()), + &user_id.map(|id| id.to_string()), + &entity_name, + &entity_type, + &serialized_metadata, + ], + ) + .await?; + Ok(()) +} diff --git a/lib/audit-logs/src/database/config.rs b/lib/audit-logs/src/database/config.rs new file mode 100644 index 0000000000..8e3d63924c --- /dev/null +++ b/lib/audit-logs/src/database/config.rs @@ -0,0 +1,27 @@ +use serde::{Deserialize, Serialize}; +use si_data_pg::PgPoolConfig; + +/// The name of the audit database. +pub const DBNAME: &str = "si_audit"; +const APPLICATION_NAME: &str = "si-audit"; + +/// The configuration used for communicating with and setting up the audit database. +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct AuditDatabaseConfig { + /// The configuration for the PostgreSQL pool. + /// + /// _Note:_ this is called "pg" for ease of use with layered load configuration files. + pub pg: PgPoolConfig, +} + +impl Default for AuditDatabaseConfig { + fn default() -> Self { + Self { + pg: PgPoolConfig { + dbname: DBNAME.into(), + application_name: APPLICATION_NAME.into(), + ..Default::default() + }, + } + } +} diff --git a/lib/audit-logs/src/database/context.rs b/lib/audit-logs/src/database/context.rs new file mode 100644 index 0000000000..5647d85ad0 --- /dev/null +++ b/lib/audit-logs/src/database/context.rs @@ -0,0 +1,35 @@ +use si_data_pg::{PgPool, PgPoolError}; +use telemetry::prelude::*; +use thiserror::Error; + +use super::AuditDatabaseConfig; + +#[allow(missing_docs)] +#[derive(Error, Debug)] +pub enum AuditDatabaseContextError { + #[error("pg pool error: {0}")] + PgPool(#[from] PgPoolError), +} + +type Result = std::result::Result; + +/// The context used for communicating with and setting up the audit database. +#[derive(Debug, Clone)] +pub struct AuditDatabaseContext { + pg_pool: PgPool, +} + +impl AuditDatabaseContext { + /// Creates an [`AuditDatabaseContext`] from an [`AuditDatabaseConfig`]. + #[instrument(level = "info", name = "audit.context.from_config", skip_all)] + pub async fn from_config(config: &AuditDatabaseConfig) -> Result { + Ok(Self { + pg_pool: PgPool::new(&config.pg).await?, + }) + } + + /// Returns a reference to the [`PgPool`]. + pub fn pg_pool(&self) -> &PgPool { + &self.pg_pool + } +} diff --git a/lib/audit-logs/src/pg/migrate.rs b/lib/audit-logs/src/database/migrate.rs similarity index 95% rename from lib/audit-logs/src/pg/migrate.rs rename to lib/audit-logs/src/database/migrate.rs index fe44531c01..6ae894a02f 100644 --- a/lib/audit-logs/src/pg/migrate.rs +++ b/lib/audit-logs/src/database/migrate.rs @@ -18,7 +18,7 @@ type Result = std::result::Result; /// Performs migrations for the audit database. #[instrument(level = "info", name = "audit.init.migrate", skip_all)] pub async fn migrate(context: &AuditDatabaseContext) -> Result<()> { - migrate_inner(&context.pg_pool).await + migrate_inner(context.pg_pool()).await } #[instrument(level = "info", name = "audit.init.migrate.inner", skip_all)] diff --git a/lib/audit-logs/src/lib.rs b/lib/audit-logs/src/lib.rs index fb0d7532fd..a68b326556 100644 --- a/lib/audit-logs/src/lib.rs +++ b/lib/audit-logs/src/lib.rs @@ -25,170 +25,8 @@ while_true )] -pub mod pg; +pub mod database; mod stream; -use pg::AuditDatabaseContext; -use serde::Deserialize; -use serde::Serialize; -use si_data_pg::PgError; -use si_data_pg::PgPoolError; -use si_events::Actor; -use si_events::ChangeSetId; -use si_events::ChangeSetStatus; -use si_events::UserPk; -use si_events::WorkspacePk; -use strum::Display; -use strum::EnumDiscriminants; -use telemetry::prelude::*; -use thiserror::Error; - pub use stream::AuditLogsStream; pub use stream::AuditLogsStreamError; - -#[allow(missing_docs)] -#[derive(Debug, Error)] -pub enum AuditLogError { - #[error("pg error: {0}")] - Pg(#[from] PgError), - #[error("pg pool error: {0}")] - PgPool(#[from] PgPoolError), - #[error("serde json error: {0}")] - SerdeJson(#[from] serde_json::Error), -} - -type Result = std::result::Result; - -// FIXME(nick): delete this once accessor patterns are in place. -// impl TryFrom for AuditLogRow { -// type Error = AuditLogError; -// -// fn try_from(value: PgRow) -> std::result::Result { -// Ok(Self { -// actor: todo!(), -// kind: AuditLogKind::CreateChangeSet, -// entity_name: todo!(), -// timestamp: todo!(), -// change_set_id: todo!(), -// }) -// let status_string: String = value.try_get("status")?; -// let status = ChangeSetStatus::try_from(status_string.as_str())?; -// Ok(Self { -// id: value.try_get("id")?, -// created_at: value.try_get("created_at")?, -// updated_at: value.try_get("updated_at")?, -// name: value.try_get("name")?, -// status, -// base_change_set_id: value.try_get("base_change_set_id")?, -// workspace_snapshot_address: value.try_get("workspace_snapshot_address")?, -// workspace_id: value.try_get("workspace_id")?, -// merge_requested_by_user_id: value.try_get("merge_requested_by_user_id")?, -// merge_requested_at: value.try_get("merge_requested_at")?, -// reviewed_by_user_id: value.try_get("reviewed_by_user_id")?, -// reviewed_at: value.try_get("reviewed_at")?, -// }) -// } -// } - -#[allow(missing_docs)] -#[remain::sorted] -#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq, Display, EnumDiscriminants)] -pub enum AuditLogKind { - #[allow(missing_docs)] - AbandonChangeSet { from_status: ChangeSetStatus }, - #[allow(missing_docs)] - CreateChangeSet, -} - -#[allow(missing_docs)] -#[remain::sorted] -#[derive(Debug, Serialize, Deserialize, EnumDiscriminants)] -#[serde(untagged, rename_all = "camelCase")] -pub enum AuditLogMetadata { - #[allow(missing_docs)] - #[serde(rename_all = "camelCase")] - AbandonChangeSet { from_status: ChangeSetStatus }, - #[allow(missing_docs)] - #[serde(rename_all = "camelCase")] - CreateChangeSet, -} - -impl From for AuditLogMetadata { - fn from(value: AuditLogKind) -> Self { - match value { - AuditLogKind::AbandonChangeSet { from_status } => { - Self::AbandonChangeSet { from_status } - } - AuditLogKind::CreateChangeSet => Self::CreateChangeSet, - } - } -} - -#[allow(clippy::too_many_arguments, missing_docs)] -#[instrument( - name = "audit_log.insert", - level = "debug", - skip_all, - fields( - si.workspace.id = %workspace_id, - ), -)] -pub async fn insert( - context: &AuditDatabaseContext, - workspace_id: WorkspacePk, - kind: AuditLogKind, - timestamp: String, - title: String, - change_set_id: Option, - actor: Actor, - entity_name: Option, - entity_type: Option, -) -> Result<()> { - let kind_as_string = kind.to_string(); - let user_id: Option = match actor { - Actor::System => None, - Actor::User(user_id) => Some(user_id), - }; - let serialized_metadata = serde_json::to_value(AuditLogMetadata::from(kind))?; - - let _ = context - .pg_pool() - .get() - .await? - .query_one( - "INSERT INTO audit_logs ( - workspace_id, - kind, - timestamp, - title, - change_set_id, - user_id, - entity_name, - entity_type, - metadata - ) VALUES ( - $1, - $2, - $3, - $4, - $5, - $6, - $7, - $8, - $9 - )", - &[ - &workspace_id, - &kind_as_string, - ×tamp, - &title, - &change_set_id, - &user_id, - &entity_name, - &entity_type, - &serialized_metadata, - ], - ) - .await?; - Ok(()) -} diff --git a/lib/audit-logs/src/pg.rs b/lib/audit-logs/src/pg.rs deleted file mode 100644 index a4fbaf45ed..0000000000 --- a/lib/audit-logs/src/pg.rs +++ /dev/null @@ -1,66 +0,0 @@ -//! Contains functionality for setting up and communicating with the audit database. - -use serde::{Deserialize, Serialize}; -use si_data_pg::{PgPool, PgPoolConfig, PgPoolError}; -use telemetry::prelude::*; -use thiserror::Error; - -mod migrate; - -pub use migrate::{migrate, AuditDatabaseMigrationError}; - -/// The name of the audit database. -pub const DBNAME: &str = "si_audit"; -const APPLICATION_NAME: &str = "si-audit"; - -#[allow(missing_docs)] -#[derive(Error, Debug)] -pub enum AuditDatabaseContextError { - #[error("pg pool error: {0}")] - PgPool(#[from] PgPoolError), -} - -type Result = std::result::Result; - -/// The context used for communicating with and setting up the audit database. -#[allow(missing_debug_implementations)] -#[derive(Clone)] -pub struct AuditDatabaseContext { - pg_pool: PgPool, -} - -impl AuditDatabaseContext { - /// Creates an [`AuditDatabaseContext`] from an [`AuditDatabaseConfig`]. - #[instrument(level = "info", name = "audit.context.from_config", skip_all)] - pub async fn from_config(config: &AuditDatabaseConfig) -> Result { - Ok(Self { - pg_pool: PgPool::new(&config.pg).await?, - }) - } - - /// Returns a reference to the [`PgPool`]. - pub fn pg_pool(&self) -> &PgPool { - &self.pg_pool - } -} - -/// The configuration used for communicating with and setting up the audit database. -#[derive(Debug, Clone, Deserialize, Serialize)] -pub struct AuditDatabaseConfig { - /// The configuration for the PostgreSQL pool. - /// - /// _Note:_ this is called "pg" for ease of use with layered load configuration files. - pub pg: PgPoolConfig, -} - -impl Default for AuditDatabaseConfig { - fn default() -> Self { - Self { - pg: PgPoolConfig { - dbname: DBNAME.into(), - application_name: APPLICATION_NAME.into(), - ..Default::default() - }, - } - } -} diff --git a/lib/audit-logs/src/stream.rs b/lib/audit-logs/src/stream.rs index 03969d77a9..0374b4d944 100644 --- a/lib/audit-logs/src/stream.rs +++ b/lib/audit-logs/src/stream.rs @@ -76,6 +76,11 @@ impl AuditLogsStream { Subject::from(self.prefixed_subject(SUBJECT_PREFIX, &format!("{workspace_id}.>"))) } + /// Returns the subject for consuming [`AuditLogs`](AuditLog) for all workspaces. + pub fn consuming_subject_for_all_workspaces(&self) -> Subject { + Subject::from(self.prefixed_subject(SUBJECT_PREFIX, ">")) + } + /// Returns the subject for publishing and consuming [`AuditLogs`](AuditLog) for a given change set. pub fn subject_for_change_set( &self, diff --git a/lib/dal/src/audit_logging.rs b/lib/dal/src/audit_logging.rs index 7dfea2f46b..122a66ed4a 100644 --- a/lib/dal/src/audit_logging.rs +++ b/lib/dal/src/audit_logging.rs @@ -17,9 +17,9 @@ use si_data_nats::async_nats::jetstream::context::RequestErrorKind; use si_data_nats::async_nats::jetstream::stream::ConsumerErrorKind; use si_events::audit_log::AuditLog; use si_events::audit_log::AuditLogKind; +use si_events::audit_log::AuditLogMetadata; use si_events::Actor; use si_frontend_types::AuditLog as FrontendAuditLog; -use si_frontend_types::AuditLogDeserializedMetadata as FrontendAuditLogDeserializedMetadata; use telemetry::prelude::*; use thiserror::Error; use tokio_util::task::TaskTracker; @@ -383,8 +383,7 @@ impl FrontendAuditLogAssembler { // If we are working on HEAD, we show all audit logs without a change set, all // audit logs on HEAD, and all audit logs for abandoned or applied change sets. // - // If we are not working on HEAD, we only show audit logs for our own change set as - // well as certain audit logs that are relevant on HEAD, like "CreateChangeSet". + // If we are not working on HEAD, we only show audit logs for our own change set. if self.working_on_head { if let Some((change_set_id, _, change_set_status)) = change_set_metadata { if change_set_id != self.change_set_id { @@ -418,8 +417,9 @@ impl FrontendAuditLogAssembler { self.find_user_metadata(ctx, inner.actor).await?; let kind = inner.kind.to_string(); - let deserialized_metadata = FrontendAuditLogDeserializedMetadata::from(inner.kind); - let (title, entity_type) = deserialized_metadata.title_and_entity_type(); + let metadata = AuditLogMetadata::from(inner.kind); + let (title, entity_type) = metadata.title_and_entity_type(); + let entity_type = entity_type.unwrap_or(" "); let (change_set_id, change_set_name) = match change_set_metadata { Some((change_set_id, change_set_name, _)) => { (Some(change_set_id), Some(change_set_name)) @@ -438,7 +438,7 @@ impl FrontendAuditLogAssembler { timestamp: inner.timestamp, change_set_id, change_set_name, - metadata: serde_json::to_value(deserialized_metadata)?, + metadata: serde_json::to_value(metadata)?, })) } } diff --git a/lib/forklift-server/BUCK b/lib/forklift-server/BUCK index b92b2830cd..8c9b699235 100644 --- a/lib/forklift-server/BUCK +++ b/lib/forklift-server/BUCK @@ -3,18 +3,24 @@ load("@prelude-si//:macros.bzl", "rust_library") rust_library( name = "forklift-server", deps = [ + "//lib/audit-logs:audit-logs", "//lib/billing-events:billing-events", + "//lib/buck2-resources:buck2-resources", "//lib/data-warehouse-stream-client:data-warehouse-stream-client", "//lib/naxum:naxum", "//lib/si-data-nats:si-data-nats", + "//lib/si-events-rs:si-events", "//lib/si-settings:si-settings", + "//lib/si-std:si-std", "//lib/telemetry-nats-rs:telemetry-nats", "//lib/telemetry-rs:telemetry", "//third-party/rust:derive_builder", + "//third-party/rust:futures", "//third-party/rust:remain", "//third-party/rust:serde", "//third-party/rust:serde_json", "//third-party/rust:thiserror", + "//third-party/rust:tokio", "//third-party/rust:tokio-util", "//third-party/rust:ulid", ], diff --git a/lib/forklift-server/Cargo.toml b/lib/forklift-server/Cargo.toml index 69b511372a..15bf19d4b6 100644 --- a/lib/forklift-server/Cargo.toml +++ b/lib/forklift-server/Cargo.toml @@ -9,17 +9,24 @@ rust-version.workspace = true publish.workspace = true [dependencies] +audit-logs = { path = "../../lib/audit-logs" } billing-events = { path = "../../lib/billing-events" } +buck2-resources = { path = "../../lib/buck2-resources" } data-warehouse-stream-client = { path = "../../lib/data-warehouse-stream-client" } -derive_builder = { workspace = true } naxum = { path = "../../lib/naxum" } -remain = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } si-data-nats = { path = "../../lib/si-data-nats" } +si-events = { path = "../../lib/si-events-rs" } si-settings = { path = "../../lib/si-settings" } +si-std = { path = "../../lib/si-std" } telemetry = { path = "../../lib/telemetry-rs" } telemetry-nats = { path = "../../lib/telemetry-nats-rs" } + +derive_builder = { workspace = true } +futures = { workspace = true } +remain = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } thiserror = { workspace = true } +tokio = { workspace = true } tokio-util = { workspace = true } ulid = { workspace = true } diff --git a/lib/forklift-server/src/config.rs b/lib/forklift-server/src/config.rs index c3bb25258a..5545a0f350 100644 --- a/lib/forklift-server/src/config.rs +++ b/lib/forklift-server/src/config.rs @@ -1,6 +1,11 @@ +use std::{env, path::Path}; + +use audit_logs::database::AuditDatabaseConfig; +use buck2_resources::Buck2Resources; use derive_builder::Builder; use serde::{Deserialize, Serialize}; use si_data_nats::NatsConfig; +use si_std::CanonicalFileError; use telemetry::prelude::*; use thiserror::Error; use ulid::Ulid; @@ -15,12 +20,22 @@ const DEFAULT_CONCURRENCY_LIMIT: usize = 1000; #[remain::sorted] #[derive(Debug, Error)] pub enum ConfigError { + #[error("canonical file error: {0}")] + CanonicalFile(#[from] CanonicalFileError), #[error("config builder error: {0}")] ConfigBuilder(#[from] ConfigBuilderError), + #[error("error configuring for development")] + Development(#[source] Box), #[error("si settings error: {0}")] SiSettings(#[from] si_settings::SettingsError), } +impl ConfigError { + fn development(err: impl std::error::Error + 'static + Sync + Send) -> Self { + Self::Development(Box::new(err)) + } +} + type Result = std::result::Result; /// The config for the forklift server. @@ -37,6 +52,12 @@ pub struct Config { #[builder(default = "default_data_warehouse_stream_name()")] data_warehouse_stream_name: Option, + + #[builder(default)] + enable_audit_logs_app: bool, + + #[builder(default)] + audit: AuditDatabaseConfig, } impl StandardConfig for Config { @@ -69,6 +90,16 @@ impl Config { pub fn data_warehouse_stream_name(&self) -> Option<&str> { self.data_warehouse_stream_name.as_deref() } + + /// Indicates whether or not the audit logs app will be enabled. + pub fn enable_audit_logs_app(&self) -> bool { + self.enable_audit_logs_app + } + + /// Gets a reference to the audit database config. + pub fn audit(&self) -> &AuditDatabaseConfig { + &self.audit + } } #[allow(missing_docs)] @@ -82,6 +113,10 @@ pub struct ConfigFile { pub nats: NatsConfig, #[serde(default = "default_data_warehouse_stream_name")] pub data_warehouse_stream_name: Option, + #[serde(default)] + pub enable_audit_logs_app: bool, + #[serde(default)] + pub audit: AuditDatabaseConfig, } impl Default for ConfigFile { @@ -91,6 +126,8 @@ impl Default for ConfigFile { concurrency_limit: default_concurrency_limit(), nats: Default::default(), data_warehouse_stream_name: default_data_warehouse_stream_name(), + enable_audit_logs_app: Default::default(), + audit: Default::default(), } } } @@ -102,13 +139,17 @@ impl StandardConfigFile for ConfigFile { impl TryFrom for Config { type Error = ConfigError; - fn try_from(value: ConfigFile) -> Result { - let mut config = Config::builder(); - config.instance_id(value.instance_id); - config.concurrency_limit(value.concurrency_limit); - config.nats(value.nats); - config.data_warehouse_stream_name(value.data_warehouse_stream_name); - config.build().map_err(Into::into) + fn try_from(mut value: ConfigFile) -> Result { + detect_and_configure_development(&mut value)?; + + Ok(Config { + instance_id: value.instance_id, + concurrency_limit: value.concurrency_limit, + nats: value.nats, + data_warehouse_stream_name: value.data_warehouse_stream_name, + enable_audit_logs_app: value.enable_audit_logs_app, + audit: value.audit, + }) } } @@ -123,3 +164,51 @@ fn default_concurrency_limit() -> usize { fn default_data_warehouse_stream_name() -> Option { None } + +#[allow(clippy::disallowed_methods)] // Used to determine if running in development +fn detect_and_configure_development(config: &mut ConfigFile) -> Result<()> { + if env::var("BUCK_RUN_BUILD_ID").is_ok() || env::var("BUCK_BUILD_ID").is_ok() { + buck2_development(config) + } else if let Ok(dir) = env::var("CARGO_MANIFEST_DIR") { + cargo_development(dir, config) + } else { + Ok(()) + } +} + +fn buck2_development(config: &mut ConfigFile) -> Result<()> { + let resources = Buck2Resources::read().map_err(ConfigError::development)?; + + let postgres_cert = resources + .get_ends_with("dev.postgres.root.crt") + .map_err(ConfigError::development)? + .to_string_lossy() + .to_string(); + + warn!( + postgres_cert = postgres_cert.as_str(), + "detected development run", + ); + + config.audit.pg.certificate_path = Some(postgres_cert.clone().try_into()?); + config.audit.pg.dbname = audit_logs::database::DBNAME.to_string(); + + Ok(()) +} + +fn cargo_development(dir: String, config: &mut ConfigFile) -> Result<()> { + let postgres_cert = Path::new(&dir) + .join("../../config/keys/dev.postgres.root.crt") + .to_string_lossy() + .to_string(); + + warn!( + postgres_cert = postgres_cert.as_str(), + "detected development run", + ); + + config.audit.pg.certificate_path = Some(postgres_cert.clone().try_into()?); + config.audit.pg.dbname = audit_logs::database::DBNAME.to_string(); + + Ok(()) +} diff --git a/lib/forklift-server/src/lib.rs b/lib/forklift-server/src/lib.rs index 1fdb18898a..81192f9646 100644 --- a/lib/forklift-server/src/lib.rs +++ b/lib/forklift-server/src/lib.rs @@ -25,9 +25,7 @@ while_true )] -mod app_state; mod config; -mod handlers; mod server; pub use config::Config; diff --git a/lib/forklift-server/src/server.rs b/lib/forklift-server/src/server.rs index 90c8b14081..e980b3e6c6 100644 --- a/lib/forklift-server/src/server.rs +++ b/lib/forklift-server/src/server.rs @@ -1,61 +1,32 @@ -use std::{ - fmt, - future::{Future, IntoFuture as _}, - io, - sync::Arc, -}; +use std::{fmt, future::Future, io, sync::Arc}; -use billing_events::{BillingEventsError, BillingEventsWorkQueue}; -use data_warehouse_stream_client::DataWarehouseStreamClient; -use naxum::{ - extract::MatchedSubject, - handler::Handler as _, - middleware::{ - ack::AckLayer, - matched_subject::{ForSubject, MatchedSubjectLayer}, - trace::TraceLayer, - }, - response::{IntoResponse, Response}, - MessageHead, ServiceBuilder, ServiceExt as _, TowerServiceExt as _, -}; -use si_data_nats::{async_nats, jetstream, NatsClient}; -use si_data_nats::{ - async_nats::{ - error::Error as AsyncNatsError, - jetstream::{ - consumer::{pull::Stream, StreamErrorKind}, - stream::ConsumerErrorKind, - }, - }, - ConnectionMetadata, -}; +use si_data_nats::{jetstream, NatsClient}; use telemetry::prelude::*; use thiserror::Error; +use tokio::task::JoinError; use tokio_util::sync::CancellationToken; -use crate::{ - app_state::{AppState, NoopAppState}, - config::Config, - handlers, -}; +use crate::config::Config; -const CONSUMER_NAME: &str = "forklift-server"; +mod app; + +pub(crate) use app::AppSetupError; + +const DURABLE_CONSUMER_NAME: &str = "forklift-server"; #[derive(Debug, Error)] pub enum ServerError { - #[error("async nats consumer error: {0}")] - AsyncNatsConsumer(#[from] AsyncNatsError), - #[error("async nats stream error: {0}")] - AsyncNatsStream(#[from] AsyncNatsError), - #[error("billing events error: {0}")] - BillingEvents(#[from] BillingEventsError), + #[error("app setup error: {0}")] + AppSetup(#[from] AppSetupError), + #[error("join error: {0}")] + Join(#[from] JoinError), #[error("naxum error: {0}")] Naxum(#[source] io::Error), #[error("si data nats error: {0}")] SiDataNats(#[from] si_data_nats::Error), } -type ServerResult = Result; +type Result = std::result::Result; /// Server metadata, used with telemetry. #[derive(Clone, Debug)] @@ -83,8 +54,10 @@ impl ServerMetadata { /// The forklift server instance with its inner naxum task. pub struct Server { metadata: Arc, - inner: Box> + Unpin + Send>, shutdown_token: CancellationToken, + // TODO(nick): remove option once this is working. + inner_audit_logs: Option> + Unpin + Send>>, + inner_billing_events: Box> + Unpin + Send>, } impl fmt::Debug for Server { @@ -99,93 +72,50 @@ impl fmt::Debug for Server { impl Server { /// Creates a forklift server with a running naxum task. #[instrument(name = "forklift.init.from_config", level = "info", skip_all)] - pub async fn from_config(config: Config, token: CancellationToken) -> ServerResult { + pub async fn from_config(config: Config, token: CancellationToken) -> Result { let metadata = Arc::new(ServerMetadata { instance_id: config.instance_id().into(), job_invoked_provider: "si", }); let nats = Self::connect_to_nats(&config).await?; - let connection_metadata = nats.metadata_clone(); - - let incoming = { - let queue = BillingEventsWorkQueue::get_or_create(jetstream::new(nats)).await?; - let consumer_subject = queue.workspace_update_subject("*"); - queue - .stream() - .await? - .create_consumer(Self::incoming_consumer_config(consumer_subject)) - .await? - .messages() - .await? - }; - - let inner = match config.data_warehouse_stream_name() { - Some(stream_name) => { - info!(%stream_name, "creating billing events app in data warehouse stream delivery mode..."); - let client = DataWarehouseStreamClient::new(stream_name).await; - let state = AppState::new(client); - Self::build_app( - state, - connection_metadata, - incoming, + let jetstream_context = jetstream::new(nats); + + let inner_audit_logs = if config.enable_audit_logs_app() { + Some( + app::audit_logs( + jetstream_context.clone(), + DURABLE_CONSUMER_NAME.to_string(), + connection_metadata.clone(), config.concurrency_limit(), + config.audit(), token.clone(), - )? - } - None => { - info!("creating billing events app in no-op mode..."); - let state = NoopAppState::new(); - Self::build_noop_app( - state, - connection_metadata, - incoming, - config.concurrency_limit(), - token.clone(), - )? - } + ) + .await?, + ) + } else { + None }; + let inner_billing_events = app::billing_events( + jetstream_context, + DURABLE_CONSUMER_NAME.to_string(), + connection_metadata, + config.concurrency_limit(), + config.data_warehouse_stream_name(), + token.clone(), + ) + .await?; Ok(Self { metadata, - inner, + inner_audit_logs, + inner_billing_events, shutdown_token: token, }) } - fn build_app( - state: AppState, - connection_metadata: Arc, - incoming: Stream, - concurrency_limit: usize, - token: CancellationToken, - ) -> ServerResult> + Unpin + Send>> { - let app = ServiceBuilder::new() - .layer( - MatchedSubjectLayer::new().for_subject(ForkliftForSubject::with_prefix( - connection_metadata.subject_prefix(), - )), - ) - .layer( - TraceLayer::new() - .make_span_with( - telemetry_nats::NatsMakeSpan::builder(connection_metadata).build(), - ) - .on_response(telemetry_nats::NatsOnResponse::new()), - ) - .layer(AckLayer::new()) - .service(handlers::process_request.with_state(state)) - .map_response(Response::into_response); - - let inner = - naxum::serve_with_incoming_limit(incoming, app.into_make_service(), concurrency_limit) - .with_graceful_shutdown(naxum::wait_on_cancelled(token)); - - Ok(Box::new(inner.into_future())) - } - - /// Infallible wrapper around running the inner naxum task. + /// Infallible wrapper around running the inner naxum task(s). #[inline] pub async fn run(self) { if let Err(err) = self.try_run().await { @@ -193,104 +123,33 @@ impl Server { } } - /// Fallibly awaits the inner naxum task. - pub async fn try_run(self) -> ServerResult<()> { - self.inner.await.map_err(ServerError::Naxum)?; + /// Fallibly awaits the inner naxum task(s). + pub async fn try_run(self) -> Result<()> { + match self.inner_audit_logs { + Some(inner_audit_logs) => { + info!("running two apps: audit logs and billing events"); + let (inner_audit_logs_result, inner_billing_events_result) = futures::join!( + tokio::spawn(inner_audit_logs), + tokio::spawn(self.inner_billing_events) + ); + inner_audit_logs_result?.map_err(ServerError::Naxum)?; + inner_billing_events_result?.map_err(ServerError::Naxum)?; + } + None => { + info!("running one app: billing events"); + self.inner_billing_events + .await + .map_err(ServerError::Naxum)?; + } + } info!("forklift main loop shutdown complete"); Ok(()) } - fn build_noop_app( - state: NoopAppState, - connection_metadata: Arc, - incoming: Stream, - concurrency_limit: usize, - token: CancellationToken, - ) -> ServerResult> + Unpin + Send>> { - let app = ServiceBuilder::new() - .layer( - MatchedSubjectLayer::new().for_subject(ForkliftForSubject::with_prefix( - connection_metadata.subject_prefix(), - )), - ) - .layer( - TraceLayer::new() - .make_span_with( - telemetry_nats::NatsMakeSpan::builder(connection_metadata).build(), - ) - .on_response(telemetry_nats::NatsOnResponse::new()), - ) - .layer(AckLayer::new()) - .service(handlers::process_request_noop.with_state(state)) - .map_response(Response::into_response); - - let inner = - naxum::serve_with_incoming_limit(incoming, app.into_make_service(), concurrency_limit) - .with_graceful_shutdown(naxum::wait_on_cancelled(token)); - - Ok(Box::new(inner.into_future())) - } - #[instrument(name = "forklift.init.connect_to_nats", level = "info", skip_all)] - async fn connect_to_nats(config: &Config) -> ServerResult { + async fn connect_to_nats(config: &Config) -> Result { let client = NatsClient::new(config.nats()).await?; debug!("successfully connected nats client"); Ok(client) } - - #[inline] - fn incoming_consumer_config( - subject: impl Into, - ) -> async_nats::jetstream::consumer::pull::Config { - async_nats::jetstream::consumer::pull::Config { - durable_name: Some(CONSUMER_NAME.to_owned()), - filter_subject: subject.into(), - ..Default::default() - } - } -} - -#[derive(Clone, Debug)] -struct ForkliftForSubject { - prefix: Option<()>, -} - -impl ForkliftForSubject { - fn with_prefix(prefix: Option<&str>) -> Self { - Self { - prefix: prefix.map(|_p| ()), - } - } -} - -impl ForSubject for ForkliftForSubject -where - R: MessageHead, -{ - fn call(&mut self, req: &mut naxum::Message) { - let mut parts = req.subject().split('.'); - - match self.prefix { - Some(_) => { - if let (Some(prefix), Some(p1), Some(p2), Some(_workspace_id), None) = ( - parts.next(), - parts.next(), - parts.next(), - parts.next(), - parts.next(), - ) { - let matched = format!("{prefix}.{p1}.{p2}.:workspace_id"); - req.extensions_mut().insert(MatchedSubject::from(matched)); - }; - } - None => { - if let (Some(p1), Some(p2), Some(_workspace_id), None) = - (parts.next(), parts.next(), parts.next(), parts.next()) - { - let matched = format!("{p1}.{p2}.:workspace_id"); - req.extensions_mut().insert(MatchedSubject::from(matched)); - }; - } - } - } } diff --git a/lib/forklift-server/src/server/app.rs b/lib/forklift-server/src/server/app.rs new file mode 100644 index 0000000000..dc55faed18 --- /dev/null +++ b/lib/forklift-server/src/server/app.rs @@ -0,0 +1,73 @@ +use std::{future::Future, io, sync::Arc}; + +use ::audit_logs::database::AuditDatabaseConfig; +use si_data_nats::{jetstream::Context, ConnectionMetadata}; +use telemetry::prelude::*; +use thiserror::Error; +use tokio_util::sync::CancellationToken; + +mod audit_logs; +mod billing_events; + +pub(crate) use audit_logs::AuditLogsAppSetupError; +pub(crate) use billing_events::BillingEventsAppSetupError; + +#[derive(Debug, Error)] +pub enum AppSetupError { + #[error("audit logs app setup: {0}")] + AuditLogsAppSetup(#[from] AuditLogsAppSetupError), + #[error("billing events app setup: {0}")] + BillingEventsAppSetup(#[from] BillingEventsAppSetupError), +} + +type Result = std::result::Result; + +#[instrument( + name = "forklift.init.app.audit_logs", + level = "info", + skip_all, + fields(durable_consumer_name) +)] +pub(crate) async fn audit_logs( + jetstream_context: Context, + durable_consumer_name: String, + connection_metadata: Arc, + concurrency_limit: usize, + audit_database_config: &AuditDatabaseConfig, + token: CancellationToken, +) -> Result> + Unpin + Send>> { + Ok(audit_logs::build_and_run( + jetstream_context, + durable_consumer_name, + connection_metadata, + concurrency_limit, + audit_database_config, + token, + ) + .await?) +} + +#[instrument( + name = "forklift.init.app.billing_events", + level = "info", + skip_all, + fields(durable_consumer_name) +)] +pub(crate) async fn billing_events( + jetstream_context: Context, + durable_consumer_name: String, + connection_metadata: Arc, + concurrency_limit: usize, + data_warehouse_stream_name: Option<&str>, + token: CancellationToken, +) -> Result> + Unpin + Send>> { + Ok(billing_events::build_and_run( + jetstream_context, + durable_consumer_name, + connection_metadata, + concurrency_limit, + data_warehouse_stream_name, + token, + ) + .await?) +} diff --git a/lib/forklift-server/src/server/app/audit_logs.rs b/lib/forklift-server/src/server/app/audit_logs.rs new file mode 100644 index 0000000000..92aeb4b00f --- /dev/null +++ b/lib/forklift-server/src/server/app/audit_logs.rs @@ -0,0 +1,153 @@ +use std::{ + future::{Future, IntoFuture as _}, + io, + sync::Arc, +}; + +use app_state::AppState; +use audit_logs::{ + database::{AuditDatabaseConfig, AuditDatabaseContext, AuditDatabaseContextError}, + AuditLogsStream, AuditLogsStreamError, +}; +use naxum::{ + extract::MatchedSubject, + handler::Handler as _, + middleware::{ + ack::AckLayer, + matched_subject::{ForSubject, MatchedSubjectLayer}, + trace::TraceLayer, + }, + response::{IntoResponse, Response}, + MessageHead, ServiceBuilder, ServiceExt as _, TowerServiceExt as _, +}; +use si_data_nats::{ + async_nats::{ + self, + error::Error as AsyncNatsError, + jetstream::{consumer::StreamErrorKind, stream::ConsumerErrorKind}, + }, + jetstream::Context, + ConnectionMetadata, +}; +use telemetry::prelude::*; +use thiserror::Error; +use tokio_util::sync::CancellationToken; + +mod app_state; +mod handlers; + +#[derive(Debug, Error)] +pub enum AuditLogsAppSetupError { + #[error("async nats consumer error: {0}")] + AsyncNatsConsumer(#[from] AsyncNatsError), + #[error("async nats stream error: {0}")] + AsyncNatsStream(#[from] AsyncNatsError), + #[error("audit database context error: {0}")] + AuditDatabaseContext(#[from] AuditDatabaseContextError), + #[error("audit logs stream error: {0}")] + AuditLogsStream(#[from] AuditLogsStreamError), +} + +type Result = std::result::Result; + +/// Builds a naxum app for audit logs. Note that despite having an ack layer, all audit logs remain on the stream when +/// processed. This is because the audit logs stream is limits-based and is not a work queue. Sneaky! +#[instrument( + name = "forklift.init.app.audit_logs.build_and_run", + level = "debug", + skip_all +)] +pub(crate) async fn build_and_run( + jetstream_context: Context, + durable_consumer_name: String, + connection_metadata: Arc, + concurrency_limit: usize, + audit_database_config: &AuditDatabaseConfig, + token: CancellationToken, +) -> Result> + Unpin + Send>> { + let incoming = { + let stream = AuditLogsStream::get_or_create(jetstream_context).await?; + let consumer_subject = stream.consuming_subject_for_all_workspaces(); + stream + .stream() + .await? + .create_consumer(async_nats::jetstream::consumer::pull::Config { + durable_name: Some(durable_consumer_name), + filter_subject: consumer_subject.into_string(), + ..Default::default() + }) + .await? + .messages() + .await? + }; + + let context = AuditDatabaseContext::from_config(audit_database_config).await?; + let state = AppState::new(context, connection_metadata.subject_prefix().is_some()); + + // NOTE(nick,fletcher): the "NatsMakeSpan" builder defaults to "info" level logging. Bump it down, if needed. + let app = ServiceBuilder::new() + .layer( + MatchedSubjectLayer::new().for_subject(ForkliftAuditLogsForSubject::with_prefix( + connection_metadata.subject_prefix(), + )), + ) + .layer( + TraceLayer::new() + .make_span_with(telemetry_nats::NatsMakeSpan::builder(connection_metadata).build()) + .on_response(telemetry_nats::NatsOnResponse::new()), + ) + .layer(AckLayer::new()) + .service(handlers::default.with_state(state)) + .map_response(Response::into_response); + + let inner = + naxum::serve_with_incoming_limit(incoming, app.into_make_service(), concurrency_limit) + .with_graceful_shutdown(naxum::wait_on_cancelled(token)); + + Ok(Box::new(inner.into_future())) +} + +#[derive(Clone, Debug)] +struct ForkliftAuditLogsForSubject { + prefix: Option<()>, +} + +impl ForkliftAuditLogsForSubject { + fn with_prefix(prefix: Option<&str>) -> Self { + Self { + prefix: prefix.map(|_p| ()), + } + } +} + +impl ForSubject for ForkliftAuditLogsForSubject +where + R: MessageHead, +{ + fn call(&mut self, req: &mut naxum::Message) { + let mut parts = req.subject().split('.'); + + match self.prefix { + Some(_) => { + if let (Some(prefix), Some(p1), Some(p2), Some(_workspace_id), None) = ( + parts.next(), + parts.next(), + parts.next(), + parts.next(), + parts.next(), + ) { + let matched = format!("{prefix}.{p1}.{p2}.:workspace_id"); + req.extensions_mut().insert(MatchedSubject::from(matched)); + }; + } + None => { + if let (Some(p1), Some(p2), Some(_workspace_id), None) = + (parts.next(), parts.next(), parts.next(), parts.next()) + { + let matched = format!("{p1}.{p2}.:workspace_id"); + req.extensions_mut().insert(MatchedSubject::from(matched)); + }; + } + } + } +} diff --git a/lib/forklift-server/src/server/app/audit_logs/app_state.rs b/lib/forklift-server/src/server/app/audit_logs/app_state.rs new file mode 100644 index 0000000000..312b2e936f --- /dev/null +++ b/lib/forklift-server/src/server/app/audit_logs/app_state.rs @@ -0,0 +1,25 @@ +use audit_logs::database::AuditDatabaseContext; + +// NOTE(nick): we need an app state for all naxum apps at the time of writing, even if they are unused. +#[derive(Debug, Clone)] +pub(crate) struct AppState { + context: AuditDatabaseContext, + using_prefix: bool, +} + +impl AppState { + pub(crate) fn new(context: AuditDatabaseContext, using_prefix: bool) -> Self { + Self { + context, + using_prefix, + } + } + + pub(crate) fn context(&self) -> &AuditDatabaseContext { + &self.context + } + + pub(crate) fn using_prefix(&self) -> bool { + self.using_prefix + } +} diff --git a/lib/forklift-server/src/server/app/audit_logs/handlers.rs b/lib/forklift-server/src/server/app/audit_logs/handlers.rs new file mode 100644 index 0000000000..5567196571 --- /dev/null +++ b/lib/forklift-server/src/server/app/audit_logs/handlers.rs @@ -0,0 +1,83 @@ +use std::str::FromStr; + +use audit_logs::database::AuditDatabaseError; +use naxum::{ + extract::State, + response::{IntoResponse, Response}, + Json, +}; +use si_data_nats::Subject; +use si_events::{audit_log::AuditLog, WorkspacePk}; +use telemetry::prelude::*; +use thiserror::Error; + +use super::app_state::AppState; + +#[remain::sorted] +#[derive(Debug, Error)] +pub(crate) enum HandlerError { + #[error("audit database error: {0}")] + AuditDatabase(#[from] AuditDatabaseError), + #[error("serde json error: {0}")] + SerdeJson(#[from] serde_json::Error), + #[error("ulid decode error: {0}")] + UlidDecode(#[from] ulid::DecodeError), + #[error("unexpected subject shape: {0}")] + UnexpectedSubjectShape(Subject), +} + +type Result = std::result::Result; + +impl IntoResponse for HandlerError { + fn into_response(self) -> Response { + error!(si.error.message = ?self, "failed to process message"); + Response::default_internal_server_error() + } +} + +pub(crate) async fn default( + State(state): State, + subject: Subject, + Json(audit_log): Json, +) -> Result<()> { + // Hitting an error when finding the workspace id should be impossible as we match the subject using middleware + // before we get here. + let workspace_id = find_workspace_id(subject, state.using_prefix())?; + + match audit_log { + AuditLog::V1(inner) => { + audit_logs::database::insert( + state.context(), + workspace_id, + inner.kind, + inner.timestamp, + inner.change_set_id, + inner.actor, + Some(inner.entity_name), + ) + .await?; + } + } + Ok(()) +} + +// NOTE(nick,fletcher): we may be able to remove this if we store the workspace id on the audit log object itself, and +// we have a plan for old messages. +fn find_workspace_id(subject: Subject, using_prefix: bool) -> Result { + let mut parts = subject.split('.'); + if using_prefix { + if let (Some(_prefix), Some(_p1), Some(_p2), Some(workspace_id)) = + (parts.next(), parts.next(), parts.next(), parts.next()) + { + Ok(WorkspacePk::from_str(workspace_id)?) + } else { + Err(HandlerError::UnexpectedSubjectShape(subject)) + } + } else if let (Some(_p1), Some(_p2), Some(workspace_id)) = + (parts.next(), parts.next(), parts.next()) + { + Ok(WorkspacePk::from_str(workspace_id)?) + } else { + Err(HandlerError::UnexpectedSubjectShape(subject)) + } +} diff --git a/lib/forklift-server/src/server/app/billing_events.rs b/lib/forklift-server/src/server/app/billing_events.rs new file mode 100644 index 0000000000..970164b2fa --- /dev/null +++ b/lib/forklift-server/src/server/app/billing_events.rs @@ -0,0 +1,217 @@ +use std::{ + future::{Future, IntoFuture as _}, + io, + sync::Arc, +}; + +use app_state::{AppState, NoopAppState}; +use billing_events::{BillingEventsError, BillingEventsWorkQueue}; +use data_warehouse_stream_client::DataWarehouseStreamClient; +use naxum::{ + extract::MatchedSubject, + handler::Handler as _, + middleware::{ + ack::AckLayer, + matched_subject::{ForSubject, MatchedSubjectLayer}, + trace::TraceLayer, + }, + response::{IntoResponse, Response}, + MessageHead, ServiceBuilder, ServiceExt as _, TowerServiceExt as _, +}; +use si_data_nats::{ + async_nats::{ + self, + error::Error as AsyncNatsError, + jetstream::{ + consumer::{pull::Stream, StreamErrorKind}, + stream::ConsumerErrorKind, + }, + }, + jetstream::Context, + ConnectionMetadata, +}; +use telemetry::prelude::*; +use thiserror::Error; +use tokio_util::sync::CancellationToken; + +mod app_state; +mod handlers; + +#[derive(Debug, Error)] +pub enum BillingEventsAppSetupError { + #[error("async nats consumer error: {0}")] + AsyncNatsConsumer(#[from] AsyncNatsError), + #[error("async nats stream error: {0}")] + AsyncNatsStream(#[from] AsyncNatsError), + #[error("billing events error: {0}")] + BillingEvents(#[from] BillingEventsError), +} + +type Result = std::result::Result; + +#[instrument( + name = "forklift.init.app.billing_events.build_and_run", + level = "debug", + skip_all +)] +pub(crate) async fn build_and_run( + jetstream_context: Context, + durable_consumer_name: String, + connection_metadata: Arc, + concurrency_limit: usize, + data_warehouse_stream_name: Option<&str>, + token: CancellationToken, +) -> Result> + Unpin + Send>> { + let incoming = { + let queue = BillingEventsWorkQueue::get_or_create(jetstream_context).await?; + let consumer_subject = queue.workspace_update_subject("*"); + queue + .stream() + .await? + .create_consumer(async_nats::jetstream::consumer::pull::Config { + durable_name: Some(durable_consumer_name), + filter_subject: consumer_subject, + ..Default::default() + }) + .await? + .messages() + .await? + }; + + let inner = match data_warehouse_stream_name { + Some(stream_name) => { + info!(%stream_name, "creating billing events app in data warehouse stream delivery mode..."); + let client = DataWarehouseStreamClient::new(stream_name).await; + let state = AppState::new(client); + build_app( + state, + connection_metadata, + incoming, + concurrency_limit, + token.clone(), + )? + } + None => { + info!("creating billing events app in no-op mode..."); + let state = NoopAppState::new(); + build_noop_app( + state, + connection_metadata, + incoming, + concurrency_limit, + token.clone(), + )? + } + }; + + Ok(inner) +} + +#[instrument( + name = "forklift.init.app.billing_events.build_and_run.build_app", + level = "debug", + skip_all +)] +fn build_app( + state: AppState, + connection_metadata: Arc, + incoming: Stream, + concurrency_limit: usize, + token: CancellationToken, +) -> Result> + Unpin + Send>> { + let app = ServiceBuilder::new() + .layer(MatchedSubjectLayer::new().for_subject( + ForkliftBillingEventsForSubject::with_prefix(connection_metadata.subject_prefix()), + )) + .layer( + TraceLayer::new() + .make_span_with(telemetry_nats::NatsMakeSpan::builder(connection_metadata).build()) + .on_response(telemetry_nats::NatsOnResponse::new()), + ) + .layer(AckLayer::new()) + .service(handlers::process_request.with_state(state)) + .map_response(Response::into_response); + + let inner = + naxum::serve_with_incoming_limit(incoming, app.into_make_service(), concurrency_limit) + .with_graceful_shutdown(naxum::wait_on_cancelled(token)); + + Ok(Box::new(inner.into_future())) +} + +#[instrument( + name = "forklift.init.app.billing_events.build_and_run.build_noop_app", + level = "debug", + skip_all +)] +fn build_noop_app( + state: NoopAppState, + connection_metadata: Arc, + incoming: Stream, + concurrency_limit: usize, + token: CancellationToken, +) -> Result> + Unpin + Send>> { + let app = ServiceBuilder::new() + .layer(MatchedSubjectLayer::new().for_subject( + ForkliftBillingEventsForSubject::with_prefix(connection_metadata.subject_prefix()), + )) + .layer( + TraceLayer::new() + .make_span_with(telemetry_nats::NatsMakeSpan::builder(connection_metadata).build()) + .on_response(telemetry_nats::NatsOnResponse::new()), + ) + .layer(AckLayer::new()) + .service(handlers::process_request_noop.with_state(state)) + .map_response(Response::into_response); + + let inner = + naxum::serve_with_incoming_limit(incoming, app.into_make_service(), concurrency_limit) + .with_graceful_shutdown(naxum::wait_on_cancelled(token)); + + Ok(Box::new(inner.into_future())) +} + +#[derive(Clone, Debug)] +struct ForkliftBillingEventsForSubject { + prefix: Option<()>, +} + +impl ForkliftBillingEventsForSubject { + fn with_prefix(prefix: Option<&str>) -> Self { + Self { + prefix: prefix.map(|_p| ()), + } + } +} + +impl ForSubject for ForkliftBillingEventsForSubject +where + R: MessageHead, +{ + fn call(&mut self, req: &mut naxum::Message) { + let mut parts = req.subject().split('.'); + + match self.prefix { + Some(_) => { + if let (Some(prefix), Some(p1), Some(p2), Some(_workspace_id), None) = ( + parts.next(), + parts.next(), + parts.next(), + parts.next(), + parts.next(), + ) { + let matched = format!("{prefix}.{p1}.{p2}.:workspace_id"); + req.extensions_mut().insert(MatchedSubject::from(matched)); + }; + } + None => { + if let (Some(p1), Some(p2), Some(_workspace_id), None) = + (parts.next(), parts.next(), parts.next(), parts.next()) + { + let matched = format!("{p1}.{p2}.:workspace_id"); + req.extensions_mut().insert(MatchedSubject::from(matched)); + }; + } + } + } +} diff --git a/lib/forklift-server/src/app_state.rs b/lib/forklift-server/src/server/app/billing_events/app_state.rs similarity index 100% rename from lib/forklift-server/src/app_state.rs rename to lib/forklift-server/src/server/app/billing_events/app_state.rs diff --git a/lib/forklift-server/src/handlers.rs b/lib/forklift-server/src/server/app/billing_events/handlers.rs similarity index 96% rename from lib/forklift-server/src/handlers.rs rename to lib/forklift-server/src/server/app/billing_events/handlers.rs index e262f1d548..98c9c26408 100644 --- a/lib/forklift-server/src/handlers.rs +++ b/lib/forklift-server/src/server/app/billing_events/handlers.rs @@ -9,7 +9,7 @@ use si_data_nats::Subject; use telemetry::prelude::*; use thiserror::Error; -use crate::{app_state::AppState, app_state::NoopAppState}; +use super::app_state::{AppState, NoopAppState}; #[remain::sorted] #[derive(Debug, Error)] diff --git a/lib/sdf-server/src/config.rs b/lib/sdf-server/src/config.rs index d13e679160..7d7502f951 100644 --- a/lib/sdf-server/src/config.rs +++ b/lib/sdf-server/src/config.rs @@ -1,5 +1,5 @@ use asset_sprayer::config::{AssetSprayerConfig, SIOpenAIConfig}; -use audit_logs::pg::AuditDatabaseConfig; +use audit_logs::database::AuditDatabaseConfig; use dal::jwt_key::JwtConfig; use serde_with::{DeserializeFromStr, SerializeDisplay}; use si_crypto::VeritechCryptoConfig; @@ -437,7 +437,7 @@ fn default_layer_db_config() -> LayerDbConfig { } #[allow(clippy::disallowed_methods)] // Used to determine if running in development -pub fn detect_and_configure_development(config: &mut ConfigFile) -> Result<()> { +fn detect_and_configure_development(config: &mut ConfigFile) -> Result<()> { if env::var("BUCK_RUN_BUILD_ID").is_ok() || env::var("BUCK_BUILD_ID").is_ok() { buck2_development(config) } else if let Ok(dir) = env::var("CARGO_MANIFEST_DIR") { @@ -512,7 +512,7 @@ fn buck2_development(config: &mut ConfigFile) -> Result<()> { config.layer_db_config.pg_pool_config.dbname = si_layer_cache::pg::DBNAME.to_string(); config.spicedb.enabled = true; config.audit.pg.certificate_path = Some(postgres_cert.clone().try_into()?); - config.audit.pg.dbname = audit_logs::pg::DBNAME.to_string(); + config.audit.pg.dbname = audit_logs::database::DBNAME.to_string(); Ok(()) } @@ -574,7 +574,7 @@ fn cargo_development(dir: String, config: &mut ConfigFile) -> Result<()> { config.pkgs_path = pkgs_path; config.spicedb.enabled = true; config.audit.pg.certificate_path = Some(postgres_cert.clone().try_into()?); - config.audit.pg.dbname = audit_logs::pg::DBNAME.to_string(); + config.audit.pg.dbname = audit_logs::database::DBNAME.to_string(); Ok(()) } diff --git a/lib/sdf-server/src/lib.rs b/lib/sdf-server/src/lib.rs index 3d74821975..f2b2c22cc6 100644 --- a/lib/sdf-server/src/lib.rs +++ b/lib/sdf-server/src/lib.rs @@ -14,7 +14,7 @@ use std::io; -use audit_logs::pg::AuditDatabaseContextError; +use audit_logs::database::AuditDatabaseContextError; use si_data_spicedb::SpiceDbError; use thiserror::Error; @@ -38,9 +38,8 @@ pub use self::{ app::AxumApp, app_state::ApplicationRuntimeMode, config::{ - detect_and_configure_development, Config, ConfigBuilder, ConfigError, ConfigFile, - IncomingStream, MigrationMode, StandardConfig, StandardConfigFile, WorkspacePermissions, - WorkspacePermissionsMode, + Config, ConfigBuilder, ConfigError, ConfigFile, IncomingStream, MigrationMode, + StandardConfig, StandardConfigFile, WorkspacePermissions, WorkspacePermissionsMode, }, migrations::Migrator, nats_multiplexer::CRDT_MULTIPLEXER_SUBJECT, diff --git a/lib/sdf-server/src/migrations.rs b/lib/sdf-server/src/migrations.rs index cf2945472f..58e7317210 100644 --- a/lib/sdf-server/src/migrations.rs +++ b/lib/sdf-server/src/migrations.rs @@ -1,6 +1,6 @@ use std::{future::IntoFuture as _, time::Duration}; -use audit_logs::pg::{ +use audit_logs::database::{ AuditDatabaseContext, AuditDatabaseContextError, AuditDatabaseMigrationError, }; use dal::{ @@ -146,7 +146,7 @@ impl Migrator { #[instrument(name = "sdf.migrator.migrate_audit_database", level = "info", skip_all)] async fn migrate_audit_database(&self) -> MigratorResult<()> { - audit_logs::pg::migrate(&self.audit_database_context) + audit_logs::database::migrate(&self.audit_database_context) .await .map_err(MigratorError::MigrateAuditDatabase) } diff --git a/lib/sdf-server/src/server.rs b/lib/sdf-server/src/server.rs index 2a37032557..a27cd610a3 100644 --- a/lib/sdf-server/src/server.rs +++ b/lib/sdf-server/src/server.rs @@ -1,7 +1,7 @@ use std::{fmt, future::IntoFuture as _, net::SocketAddr, path::PathBuf, sync::Arc}; use asset_sprayer::AssetSprayer; -use audit_logs::pg::AuditDatabaseContext; +use audit_logs::database::AuditDatabaseContext; use axum::{async_trait, routing::IntoMakeService, Router}; use dal::{JwtPublicSigningKey, ServicesContext}; use hyper::server::accept::Accept; diff --git a/lib/si-events-rs/src/audit_log.rs b/lib/si-events-rs/src/audit_log.rs index 0af7b2cca7..626d432934 100644 --- a/lib/si-events-rs/src/audit_log.rs +++ b/lib/si-events-rs/src/audit_log.rs @@ -1,13 +1,14 @@ use chrono::Utc; use serde::{Deserialize, Serialize}; -use v1::{AuditLogKindV1, AuditLogV1}; +use v1::{AuditLogKindV1, AuditLogMetadataV1, AuditLogV1}; use crate::{Actor, ChangeSetId}; mod v1; pub type AuditLogKind = AuditLogKindV1; +pub type AuditLogMetadata = AuditLogMetadataV1; // TODO(nick): switch to something like "naxum-api-types" crate to avoid sizing issues. #[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)] diff --git a/lib/si-events-rs/src/audit_log/v1.rs b/lib/si-events-rs/src/audit_log/v1.rs index 938e1c3c0e..87cceafbc2 100644 --- a/lib/si-events-rs/src/audit_log/v1.rs +++ b/lib/si-events-rs/src/audit_log/v1.rs @@ -7,6 +7,10 @@ use crate::{ SecretId, WorkspacePk, }; +type MetadataDiscrim = AuditLogMetadataV1Discriminants; +type Kind = AuditLogKindV1; +type Metadata = AuditLogMetadataV1; + #[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq)] pub struct AuditLogV1 { pub actor: Actor, @@ -190,3 +194,511 @@ pub enum AuditLogKindV1 { from_status: ChangeSetStatus, }, } + +/// This is an identical copy of latest [`AuditLogKind`], but uses "serde untagged" wrapper. This is used for inserting +/// and reading from the "metadata" column in the table as well as for additional columns. +/// +/// Reference: https://serde.rs/enum-representations.html#untagged +/// +/// _Note:_ there are multiple uses of renaming to camel case are related to this: https://github.com/serde-rs/serde/issues/1560 +#[remain::sorted] +#[derive(Debug, Deserialize, Serialize, EnumDiscriminants)] +#[serde(untagged, rename_all = "camelCase")] +pub enum AuditLogMetadataV1 { + #[serde(rename_all = "camelCase")] + AbandonChangeSet { from_status: ChangeSetStatus }, + #[serde(rename_all = "camelCase")] + AddAction { + prototype_id: ActionPrototypeId, + action_kind: ActionKind, + func_id: FuncId, + func_display_name: Option, + func_name: String, + }, + #[serde(rename_all = "camelCase")] + ApplyChangeSet, + #[serde(rename_all = "camelCase")] + ApproveChangeSetApply { from_status: ChangeSetStatus }, + #[serde(rename_all = "camelCase")] + CancelAction { + prototype_id: ActionPrototypeId, + action_kind: ActionKind, + func_id: FuncId, + func_display_name: Option, + func_name: String, + }, + + #[serde(rename_all = "camelCase")] + CreateChangeSet, + #[serde(rename_all = "camelCase")] + CreateComponent { + name: String, + component_id: ComponentId, + schema_variant_id: SchemaVariantId, + schema_variant_name: String, + }, + #[serde(rename_all = "camelCase")] + CreateSecret { name: String, secret_id: SecretId }, + #[serde(rename_all = "camelCase")] + DeleteComponent { + name: String, + component_id: ComponentId, + schema_variant_id: SchemaVariantId, + schema_variant_name: String, + }, + #[serde(rename_all = "camelCase")] + DeleteSecret { name: String, secret_id: SecretId }, + #[serde(rename_all = "camelCase")] + ExportWorkspace { + id: WorkspacePk, + name: String, + version: String, + }, + #[serde(rename_all = "camelCase")] + InstallWorkspace { + id: WorkspacePk, + name: String, + version: String, + }, + #[serde(rename_all = "camelCase")] + Login, + #[serde(rename_all = "camelCase")] + PutActionOnHold { + prototype_id: ActionPrototypeId, + action_kind: ActionKind, + func_id: FuncId, + func_display_name: Option, + func_name: String, + }, + #[serde(rename_all = "camelCase")] + RejectChangeSetApply { from_status: ChangeSetStatus }, + #[serde(rename_all = "camelCase")] + ReopenChangeSet { from_status: ChangeSetStatus }, + #[serde(rename_all = "camelCase")] + RequestChangeSetApproval { from_status: ChangeSetStatus }, + #[serde(rename_all = "camelCase")] + RetryAction { + prototype_id: ActionPrototypeId, + action_kind: ActionKind, + func_id: FuncId, + func_display_name: Option, + func_name: String, + }, + #[serde(rename_all = "camelCase")] + RunAction { + prototype_id: ActionPrototypeId, + action_kind: ActionKind, + func_id: FuncId, + func_display_name: Option, + func_name: String, + run_status: bool, + }, + #[serde(rename_all = "camelCase")] + UpdateDependentInputSocket { + input_socket_id: InputSocketId, + input_socket_name: String, + attribute_value_id: AttributeValueId, + input_attribute_value_ids: Vec, + func_id: FuncId, + func_display_name: Option, + func_name: String, + component_id: ComponentId, + component_name: String, + schema_variant_id: SchemaVariantId, + schema_variant_display_name: String, + before_value: Option, + after_value: Option, + }, + #[serde(rename_all = "camelCase")] + UpdateDependentOutputSocket { + output_socket_id: OutputSocketId, + output_socket_name: String, + attribute_value_id: AttributeValueId, + input_attribute_value_ids: Vec, + func_id: FuncId, + func_display_name: Option, + func_name: String, + component_id: ComponentId, + component_name: String, + schema_variant_id: SchemaVariantId, + schema_variant_display_name: String, + before_value: Option, + after_value: Option, + }, + #[serde(rename_all = "camelCase")] + UpdateDependentProperty { + prop_id: PropId, + prop_name: String, + attribute_value_id: AttributeValueId, + input_attribute_value_ids: Vec, + func_id: FuncId, + func_display_name: Option, + func_name: String, + component_id: ComponentId, + component_name: String, + schema_variant_id: SchemaVariantId, + schema_variant_display_name: String, + before_value: Option, + after_value: Option, + }, + #[serde(rename_all = "camelCase")] + UpdatePropertyEditorValue { + component_id: ComponentId, + component_name: String, + schema_variant_id: SchemaVariantId, + schema_variant_display_name: String, + prop_id: PropId, + prop_name: String, + attribute_value_id: AttributeValueId, + before_value: Option, + after_value: Option, + }, + #[serde(rename_all = "camelCase")] + UpdatePropertyEditorValueForSecret { + component_id: ComponentId, + component_name: String, + schema_variant_id: SchemaVariantId, + schema_variant_display_name: String, + prop_id: PropId, + prop_name: String, + attribute_value_id: AttributeValueId, + before_secret_name: Option, + before_secret_id: Option, + after_secret_name: Option, + after_secret_id: Option, + }, + #[serde(rename_all = "camelCase")] + UpdateSecret { name: String, secret_id: SecretId }, + #[serde(rename_all = "camelCase")] + UpgradeComponent { + name: String, + component_id: ComponentId, + schema_id: SchemaId, + new_schema_variant_id: SchemaVariantId, + new_schema_variant_name: String, + old_schema_variant_id: SchemaVariantId, + old_schema_variant_name: String, + }, + #[serde(rename_all = "camelCase")] + WithdrawRequestForChangeSetApply { from_status: ChangeSetStatus }, +} + +impl AuditLogMetadataV1 { + pub fn title_and_entity_type(&self) -> (&'static str, Option<&'static str>) { + // Please keep this in alphabetical order! + // #[remain::sorted] // NOTE(nick): this is not yet stable + match self.into() { + MetadataDiscrim::AbandonChangeSet => ("Abandoned", Some("Change Set")), + MetadataDiscrim::AddAction => ("Enqueued", Some("Action")), + MetadataDiscrim::ApplyChangeSet => ("Applied", Some("Change Set")), + MetadataDiscrim::ApproveChangeSetApply => { + ("Approved Request to Apply", Some("Change Set")) + } + MetadataDiscrim::CancelAction => ("Removed", Some("Action")), + MetadataDiscrim::CreateChangeSet => ("Created", Some("Change Set")), + MetadataDiscrim::CreateComponent => ("Created", Some("Component")), + MetadataDiscrim::CreateSecret => ("Created", Some("Secret")), + MetadataDiscrim::DeleteComponent => ("Deleted", Some("Component")), + MetadataDiscrim::DeleteSecret => ("Deleted", Some("Secret")), + MetadataDiscrim::ExportWorkspace => ("Exported", Some("Workspace")), + MetadataDiscrim::InstallWorkspace => ("Installed", Some("Workspace")), + MetadataDiscrim::Login => ("Authenticated", None), + MetadataDiscrim::PutActionOnHold => ("Paused", Some("Action")), + MetadataDiscrim::RejectChangeSetApply => { + ("Rejected Request to Apply", Some("Change Set")) + } + MetadataDiscrim::ReopenChangeSet => ("Reopened", Some("Change Set")), + MetadataDiscrim::RequestChangeSetApproval => ("Requested to Apply", Some("Change Set")), + MetadataDiscrim::RetryAction => ("Retried", Some("Action")), + MetadataDiscrim::RunAction => ("Ran", Some("Action")), + MetadataDiscrim::UpdateDependentInputSocket => ("Set Dependent", Some("Input Socket")), + MetadataDiscrim::UpdateDependentOutputSocket => { + ("Set Dependent", Some("Output Socket")) + } + MetadataDiscrim::UpdateDependentProperty => ("Set Dependent", Some("Property")), + MetadataDiscrim::UpdatePropertyEditorValue => ("Updated Component", Some("Property")), + MetadataDiscrim::UpdatePropertyEditorValueForSecret => { + ("Updated Component", Some("Property for Secret")) + } + MetadataDiscrim::UpdateSecret => ("Updated", Some("Secret")), + MetadataDiscrim::UpgradeComponent => ("Upgraded", Some("Component")), + MetadataDiscrim::WithdrawRequestForChangeSetApply => { + ("Withdrew Request to Apply", Some("Change Set")) + } + } + } +} + +impl From for Metadata { + fn from(value: Kind) -> Self { + // Please keep this in alphabetical order! + // #[remain::sorted] // NOTE(nick): this is not yet stable + match value { + Kind::AbandonChangeSet { from_status } => Self::AbandonChangeSet { from_status }, + Kind::AddAction { + prototype_id, + action_kind, + func_id, + func_display_name, + func_name, + } => Self::AddAction { + prototype_id, + action_kind, + func_id, + func_display_name, + func_name, + }, + Kind::ApplyChangeSet => Self::ApplyChangeSet, + Kind::ApproveChangeSetApply { from_status } => { + Self::ApproveChangeSetApply { from_status } + } + Kind::CancelAction { + prototype_id, + action_kind, + func_id, + func_display_name, + func_name, + } => Self::CancelAction { + prototype_id, + action_kind, + func_id, + func_display_name, + func_name, + }, + Kind::CreateChangeSet => Self::CreateChangeSet, + Kind::CreateComponent { + name, + component_id, + schema_variant_id, + schema_variant_name, + } => Self::CreateComponent { + name, + component_id, + schema_variant_id, + schema_variant_name, + }, + Kind::CreateSecret { name, secret_id } => Self::CreateSecret { name, secret_id }, + Kind::DeleteComponent { + name, + component_id, + schema_variant_id, + schema_variant_name, + } => Self::DeleteComponent { + name, + component_id, + schema_variant_id, + schema_variant_name, + }, + Kind::DeleteSecret { name, secret_id } => Self::DeleteSecret { name, secret_id }, + Kind::ExportWorkspace { id, name, version } => { + Self::ExportWorkspace { id, name, version } + } + Kind::InstallWorkspace { id, name, version } => { + Self::InstallWorkspace { id, name, version } + } + Kind::Login => Self::Login, + Kind::PutActionOnHold { + prototype_id, + action_kind, + func_id, + func_display_name, + func_name, + } => Self::PutActionOnHold { + prototype_id, + action_kind, + func_id, + func_display_name, + func_name, + }, + Kind::RejectChangeSetApply { from_status } => { + Self::RejectChangeSetApply { from_status } + } + Kind::ReopenChangeSet { from_status } => Self::ReopenChangeSet { from_status }, + Kind::RequestChangeSetApproval { from_status } => { + Self::RequestChangeSetApproval { from_status } + } + Kind::RetryAction { + prototype_id, + action_kind, + func_id, + func_display_name, + func_name, + } => Self::RetryAction { + prototype_id, + action_kind, + func_id, + func_display_name, + func_name, + }, + Kind::RunAction { + prototype_id, + action_kind, + func_id, + func_display_name, + func_name, + run_status, + } => Self::RunAction { + prototype_id, + action_kind, + func_id, + func_display_name, + func_name, + run_status, + }, + Kind::UpdateDependentInputSocket { + input_socket_id, + input_socket_name, + attribute_value_id, + input_attribute_value_ids, + func_id, + func_display_name, + func_name, + component_id, + component_name, + schema_variant_id, + schema_variant_display_name, + before_value, + after_value, + } => Self::UpdateDependentInputSocket { + input_socket_id, + input_socket_name, + attribute_value_id, + input_attribute_value_ids, + func_id, + func_display_name, + func_name, + component_id, + component_name, + schema_variant_id, + schema_variant_display_name, + before_value, + after_value, + }, + Kind::UpdateDependentOutputSocket { + output_socket_id, + output_socket_name, + attribute_value_id, + input_attribute_value_ids, + func_id, + func_display_name, + func_name, + component_id, + component_name, + schema_variant_id, + schema_variant_display_name, + before_value, + after_value, + } => Self::UpdateDependentOutputSocket { + output_socket_id, + output_socket_name, + attribute_value_id, + input_attribute_value_ids, + func_id, + func_display_name, + func_name, + component_id, + component_name, + schema_variant_id, + schema_variant_display_name, + before_value, + after_value, + }, + Kind::UpdateDependentProperty { + prop_id, + prop_name, + attribute_value_id, + input_attribute_value_ids, + func_id, + func_display_name, + func_name, + component_id, + component_name, + schema_variant_id, + schema_variant_display_name, + before_value, + after_value, + } => Self::UpdateDependentProperty { + prop_id, + prop_name, + attribute_value_id, + input_attribute_value_ids, + func_id, + func_display_name, + func_name, + component_id, + component_name, + schema_variant_id, + schema_variant_display_name, + before_value, + after_value, + }, + Kind::UpdatePropertyEditorValue { + component_id, + component_name, + schema_variant_id, + schema_variant_display_name, + prop_id, + prop_name, + attribute_value_id, + before_value, + after_value, + } => Self::UpdatePropertyEditorValue { + component_id, + component_name, + schema_variant_id, + schema_variant_display_name, + prop_id, + prop_name, + attribute_value_id, + before_value, + after_value, + }, + Kind::UpdatePropertyEditorValueForSecret { + component_id, + component_name, + schema_variant_id, + schema_variant_display_name, + prop_id, + prop_name, + attribute_value_id, + before_secret_name, + before_secret_id, + after_secret_name, + after_secret_id, + } => Self::UpdatePropertyEditorValueForSecret { + component_id, + component_name, + schema_variant_id, + schema_variant_display_name, + prop_id, + prop_name, + attribute_value_id, + before_secret_name, + before_secret_id, + after_secret_name, + after_secret_id, + }, + Kind::UpdateSecret { name, secret_id } => Self::UpdateSecret { name, secret_id }, + Kind::UpgradeComponent { + name, + component_id, + schema_id, + new_schema_variant_id, + new_schema_variant_name, + old_schema_variant_id, + old_schema_variant_name, + } => Self::UpgradeComponent { + name, + component_id, + schema_id, + new_schema_variant_id, + new_schema_variant_name, + old_schema_variant_id, + old_schema_variant_name, + }, + Kind::WithdrawRequestForChangeSetApply { from_status } => { + Self::WithdrawRequestForChangeSetApply { from_status } + } + } + } +} diff --git a/lib/si-frontend-types-rs/src/audit_log.rs b/lib/si-frontend-types-rs/src/audit_log.rs index 190e642a27..6578c32757 100644 --- a/lib/si-frontend-types-rs/src/audit_log.rs +++ b/lib/si-frontend-types-rs/src/audit_log.rs @@ -1,10 +1,5 @@ use serde::Serialize; -use si_events::{ - audit_log::AuditLogKind, ActionKind, ActionPrototypeId, AttributeValueId, ChangeSetId, - ChangeSetStatus, ComponentId, FuncId, InputSocketId, OutputSocketId, PropId, SchemaId, - SchemaVariantId, SecretId, UserPk, WorkspacePk, -}; -use strum::EnumDiscriminants; +use si_events::{ChangeSetId, UserPk}; #[derive(Debug, Serialize, Clone, PartialEq, Eq)] #[serde(rename_all = "camelCase")] @@ -30,516 +25,7 @@ pub struct AuditLog { pub change_set_id: Option, /// The name of the change set. pub change_set_name: Option, - /// Serialized version of [`AuditLogDeserializedMetadata`], which is an untagged version of the specific - /// [`AuditLogKind`]. + /// Serialized version of [`AuditLogMetadata`](si_events::audit_log::AuditLogMetadata), which is an + /// untagged version of the specific [`AuditLogKind`](si_events::audit_log::AuditLogKind). pub metadata: serde_json::Value, } - -/// This is an identical copy of latest [`AuditLogKind`], but uses "serde untagged" wrapper. -/// -/// Reference: https://serde.rs/enum-representations.html#untagged -/// -/// _Note:_ there are multiple uses of renaming to camel case are related to this: https://github.com/serde-rs/serde/issues/1560 -#[remain::sorted] -#[derive(Debug, Serialize, EnumDiscriminants)] -#[serde(untagged, rename_all = "camelCase")] -pub enum AuditLogDeserializedMetadata { - #[serde(rename_all = "camelCase")] - AbandonChangeSet { from_status: ChangeSetStatus }, - #[serde(rename_all = "camelCase")] - AddAction { - prototype_id: ActionPrototypeId, - action_kind: ActionKind, - func_id: FuncId, - func_display_name: Option, - func_name: String, - }, - #[serde(rename_all = "camelCase")] - ApplyChangeSet, - #[serde(rename_all = "camelCase")] - ApproveChangeSetApply { from_status: ChangeSetStatus }, - #[serde(rename_all = "camelCase")] - CancelAction { - prototype_id: ActionPrototypeId, - action_kind: ActionKind, - func_id: FuncId, - func_display_name: Option, - func_name: String, - }, - - #[serde(rename_all = "camelCase")] - CreateChangeSet, - #[serde(rename_all = "camelCase")] - CreateComponent { - name: String, - component_id: ComponentId, - schema_variant_id: SchemaVariantId, - schema_variant_name: String, - }, - #[serde(rename_all = "camelCase")] - CreateSecret { name: String, secret_id: SecretId }, - #[serde(rename_all = "camelCase")] - DeleteComponent { - name: String, - component_id: ComponentId, - schema_variant_id: SchemaVariantId, - schema_variant_name: String, - }, - #[serde(rename_all = "camelCase")] - DeleteSecret { name: String, secret_id: SecretId }, - #[serde(rename_all = "camelCase")] - ExportWorkspace { - id: WorkspacePk, - name: String, - version: String, - }, - #[serde(rename_all = "camelCase")] - InstallWorkspace { - id: WorkspacePk, - name: String, - version: String, - }, - #[serde(rename_all = "camelCase")] - Login, - #[serde(rename_all = "camelCase")] - PutActionOnHold { - prototype_id: ActionPrototypeId, - action_kind: ActionKind, - func_id: FuncId, - func_display_name: Option, - func_name: String, - }, - #[serde(rename_all = "camelCase")] - RejectChangeSetApply { from_status: ChangeSetStatus }, - #[serde(rename_all = "camelCase")] - ReopenChangeSet { from_status: ChangeSetStatus }, - #[serde(rename_all = "camelCase")] - RequestChangeSetApproval { from_status: ChangeSetStatus }, - #[serde(rename_all = "camelCase")] - RetryAction { - prototype_id: ActionPrototypeId, - action_kind: ActionKind, - func_id: FuncId, - func_display_name: Option, - func_name: String, - }, - #[serde(rename_all = "camelCase")] - RunAction { - prototype_id: ActionPrototypeId, - action_kind: ActionKind, - func_id: FuncId, - func_display_name: Option, - func_name: String, - run_status: bool, - }, - #[serde(rename_all = "camelCase")] - UpdateDependentInputSocket { - input_socket_id: InputSocketId, - input_socket_name: String, - attribute_value_id: AttributeValueId, - input_attribute_value_ids: Vec, - func_id: FuncId, - func_display_name: Option, - func_name: String, - component_id: ComponentId, - component_name: String, - schema_variant_id: SchemaVariantId, - schema_variant_display_name: String, - before_value: Option, - after_value: Option, - }, - #[serde(rename_all = "camelCase")] - UpdateDependentOutputSocket { - output_socket_id: OutputSocketId, - output_socket_name: String, - attribute_value_id: AttributeValueId, - input_attribute_value_ids: Vec, - func_id: FuncId, - func_display_name: Option, - func_name: String, - component_id: ComponentId, - component_name: String, - schema_variant_id: SchemaVariantId, - schema_variant_display_name: String, - before_value: Option, - after_value: Option, - }, - #[serde(rename_all = "camelCase")] - UpdateDependentProperty { - prop_id: PropId, - prop_name: String, - attribute_value_id: AttributeValueId, - input_attribute_value_ids: Vec, - func_id: FuncId, - func_display_name: Option, - func_name: String, - component_id: ComponentId, - component_name: String, - schema_variant_id: SchemaVariantId, - schema_variant_display_name: String, - before_value: Option, - after_value: Option, - }, - #[serde(rename_all = "camelCase")] - UpdatePropertyEditorValue { - component_id: ComponentId, - component_name: String, - schema_variant_id: SchemaVariantId, - schema_variant_display_name: String, - prop_id: PropId, - prop_name: String, - attribute_value_id: AttributeValueId, - before_value: Option, - after_value: Option, - }, - #[serde(rename_all = "camelCase")] - UpdatePropertyEditorValueForSecret { - component_id: ComponentId, - component_name: String, - schema_variant_id: SchemaVariantId, - schema_variant_display_name: String, - prop_id: PropId, - prop_name: String, - attribute_value_id: AttributeValueId, - before_secret_name: Option, - before_secret_id: Option, - after_secret_name: Option, - after_secret_id: Option, - }, - #[serde(rename_all = "camelCase")] - UpdateSecret { name: String, secret_id: SecretId }, - #[serde(rename_all = "camelCase")] - UpgradeComponent { - name: String, - component_id: ComponentId, - schema_id: SchemaId, - new_schema_variant_id: SchemaVariantId, - new_schema_variant_name: String, - old_schema_variant_id: SchemaVariantId, - old_schema_variant_name: String, - }, - #[serde(rename_all = "camelCase")] - WithdrawRequestForChangeSetApply { from_status: ChangeSetStatus }, -} - -impl AuditLogDeserializedMetadata { - pub fn title_and_entity_type(&self) -> (&'static str, &'static str) { - type Kind = AuditLogDeserializedMetadataDiscriminants; - - // Reflect updates to the entity type in "app/web/src/api/sdf/dal/audit_log.ts" and please keep this in - // alphabetical order! - match self.into() { - Kind::AbandonChangeSet => ("Abandoned", "Change Set"), - Kind::AddAction => ("Enqueued", "Action"), - Kind::ApplyChangeSet => ("Applied", "Change Set"), - Kind::ApproveChangeSetApply => ("Approved Request to Apply", "Change Set"), - Kind::CancelAction => ("Removed", "Action"), - Kind::CreateChangeSet => ("Created", "Change Set"), - Kind::CreateComponent => ("Created", "Component"), - Kind::CreateSecret => ("Created", "Secret"), - Kind::DeleteComponent => ("Deleted", "Component"), - Kind::DeleteSecret => ("Deleted", "Secret"), - Kind::ExportWorkspace => ("Exported", "Workspace"), - Kind::InstallWorkspace => ("Installed", "Workspace"), - Kind::Login => ("Authenticated", " "), - Kind::PutActionOnHold => ("Paused", "Action"), - Kind::RejectChangeSetApply => ("Rejected Request to Apply", "Change Set"), - Kind::ReopenChangeSet => ("Reopened", "Change Set"), - Kind::RequestChangeSetApproval => ("Requested to Apply", "Change Set"), - Kind::RetryAction => ("Retried", "Action"), - Kind::RunAction => ("Ran", "Action"), - Kind::UpdateDependentInputSocket => ("Set Dependent", "Input Socket"), - Kind::UpdateDependentOutputSocket => ("Set Dependent", "Output Socket"), - Kind::UpdateDependentProperty => ("Set Dependent", "Property"), - Kind::UpdatePropertyEditorValue => ("Updated Component", "Property"), - Kind::UpdatePropertyEditorValueForSecret => { - ("Updated Component", "Property for Secret") - } - Kind::UpdateSecret => ("Updated", "Secret"), - Kind::UpgradeComponent => ("Upgraded", "Component"), - Kind::WithdrawRequestForChangeSetApply => ("Withdrew Request to Apply", "Change Set"), - } - } -} - -impl From for AuditLogDeserializedMetadata { - fn from(value: AuditLogKind) -> Self { - // Reflect updates to the audit log kind in "app/web/src/api/sdf/dal/audit_log.ts" and please keep this in - // alphabetical order! - match value { - AuditLogKind::AbandonChangeSet { from_status } => { - Self::AbandonChangeSet { from_status } - } - AuditLogKind::AddAction { - prototype_id, - action_kind, - func_id, - func_display_name, - func_name, - } => Self::AddAction { - prototype_id, - action_kind, - func_id, - func_display_name, - func_name, - }, - AuditLogKind::ApplyChangeSet => Self::ApplyChangeSet, - AuditLogKind::ApproveChangeSetApply { from_status } => { - Self::ApproveChangeSetApply { from_status } - } - AuditLogKind::CancelAction { - prototype_id, - action_kind, - func_id, - func_display_name, - func_name, - } => Self::CancelAction { - prototype_id, - action_kind, - func_id, - func_display_name, - func_name, - }, - AuditLogKind::CreateChangeSet => Self::CreateChangeSet, - AuditLogKind::CreateComponent { - name, - component_id, - schema_variant_id, - schema_variant_name, - } => Self::CreateComponent { - name, - component_id, - schema_variant_id, - schema_variant_name, - }, - AuditLogKind::CreateSecret { name, secret_id } => { - Self::CreateSecret { name, secret_id } - } - AuditLogKind::DeleteComponent { - name, - component_id, - schema_variant_id, - schema_variant_name, - } => Self::DeleteComponent { - name, - component_id, - schema_variant_id, - schema_variant_name, - }, - AuditLogKind::DeleteSecret { name, secret_id } => { - Self::DeleteSecret { name, secret_id } - } - AuditLogKind::ExportWorkspace { id, name, version } => { - Self::ExportWorkspace { id, name, version } - } - AuditLogKind::InstallWorkspace { id, name, version } => { - Self::InstallWorkspace { id, name, version } - } - AuditLogKind::Login => Self::Login, - AuditLogKind::PutActionOnHold { - prototype_id, - action_kind, - func_id, - func_display_name, - func_name, - } => Self::PutActionOnHold { - prototype_id, - action_kind, - func_id, - func_display_name, - func_name, - }, - AuditLogKind::RejectChangeSetApply { from_status } => { - Self::RejectChangeSetApply { from_status } - } - AuditLogKind::ReopenChangeSet { from_status } => Self::ReopenChangeSet { from_status }, - AuditLogKind::RequestChangeSetApproval { from_status } => { - Self::RequestChangeSetApproval { from_status } - } - AuditLogKind::RetryAction { - prototype_id, - action_kind, - func_id, - func_display_name, - func_name, - } => Self::RetryAction { - prototype_id, - action_kind, - func_id, - func_display_name, - func_name, - }, - AuditLogKind::RunAction { - prototype_id, - action_kind, - func_id, - func_display_name, - func_name, - run_status, - } => Self::RunAction { - prototype_id, - action_kind, - func_id, - func_display_name, - func_name, - run_status, - }, - AuditLogKind::UpdateDependentInputSocket { - input_socket_id, - input_socket_name, - attribute_value_id, - input_attribute_value_ids, - func_id, - func_display_name, - func_name, - component_id, - component_name, - schema_variant_id, - schema_variant_display_name, - before_value, - after_value, - } => Self::UpdateDependentInputSocket { - input_socket_id, - input_socket_name, - attribute_value_id, - input_attribute_value_ids, - func_id, - func_display_name, - func_name, - component_id, - component_name, - schema_variant_id, - schema_variant_display_name, - before_value, - after_value, - }, - AuditLogKind::UpdateDependentOutputSocket { - output_socket_id, - output_socket_name, - attribute_value_id, - input_attribute_value_ids, - func_id, - func_display_name, - func_name, - component_id, - component_name, - schema_variant_id, - schema_variant_display_name, - before_value, - after_value, - } => Self::UpdateDependentOutputSocket { - output_socket_id, - output_socket_name, - attribute_value_id, - input_attribute_value_ids, - func_id, - func_display_name, - func_name, - component_id, - component_name, - schema_variant_id, - schema_variant_display_name, - before_value, - after_value, - }, - AuditLogKind::UpdateDependentProperty { - prop_id, - prop_name, - attribute_value_id, - input_attribute_value_ids, - func_id, - func_display_name, - func_name, - component_id, - component_name, - schema_variant_id, - schema_variant_display_name, - before_value, - after_value, - } => Self::UpdateDependentProperty { - prop_id, - prop_name, - attribute_value_id, - input_attribute_value_ids, - func_id, - func_display_name, - func_name, - component_id, - component_name, - schema_variant_id, - schema_variant_display_name, - before_value, - after_value, - }, - AuditLogKind::UpdatePropertyEditorValue { - component_id, - component_name, - schema_variant_id, - schema_variant_display_name, - prop_id, - prop_name, - attribute_value_id, - before_value, - after_value, - } => Self::UpdatePropertyEditorValue { - component_id, - component_name, - schema_variant_id, - schema_variant_display_name, - prop_id, - prop_name, - attribute_value_id, - before_value, - after_value, - }, - AuditLogKind::UpdatePropertyEditorValueForSecret { - component_id, - component_name, - schema_variant_id, - schema_variant_display_name, - prop_id, - prop_name, - attribute_value_id, - before_secret_name, - before_secret_id, - after_secret_name, - after_secret_id, - } => Self::UpdatePropertyEditorValueForSecret { - component_id, - component_name, - schema_variant_id, - schema_variant_display_name, - prop_id, - prop_name, - attribute_value_id, - before_secret_name, - before_secret_id, - after_secret_name, - after_secret_id, - }, - AuditLogKind::UpdateSecret { name, secret_id } => { - Self::UpdateSecret { name, secret_id } - } - AuditLogKind::UpgradeComponent { - name, - component_id, - schema_id, - new_schema_variant_id, - new_schema_variant_name, - old_schema_variant_id, - old_schema_variant_name, - } => Self::UpgradeComponent { - name, - component_id, - schema_id, - new_schema_variant_id, - new_schema_variant_name, - old_schema_variant_id, - old_schema_variant_name, - }, - AuditLogKind::WithdrawRequestForChangeSetApply { from_status } => { - Self::WithdrawRequestForChangeSetApply { from_status } - } - } - } -} diff --git a/lib/si-frontend-types-rs/src/lib.rs b/lib/si-frontend-types-rs/src/lib.rs index ace3236595..eaeeb6e6a7 100644 --- a/lib/si-frontend-types-rs/src/lib.rs +++ b/lib/si-frontend-types-rs/src/lib.rs @@ -7,7 +7,7 @@ mod module; mod schema_variant; mod workspace; -pub use crate::audit_log::{AuditLog, AuditLogDeserializedMetadata}; +pub use crate::audit_log::AuditLog; pub use crate::change_set::ChangeSet; pub use crate::component::{ ChangeStatus, ConnectionAnnotation, DiagramComponentView, DiagramSocket,