diff --git a/Cargo.lock b/Cargo.lock
index 9757ac44b0..a32449bc47 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -412,6 +412,7 @@ dependencies = [
 name = "audit-logs"
 version = "0.1.0"
 dependencies = [
+ "chrono",
  "refinery",
  "remain",
  "serde",
@@ -2622,18 +2623,24 @@ dependencies = [
 name = "forklift-server"
 version = "0.1.0"
 dependencies = [
+ "audit-logs",
  "billing-events",
+ "buck2-resources",
  "data-warehouse-stream-client",
  "derive_builder",
+ "futures",
  "naxum",
  "remain",
  "serde",
  "serde_json",
  "si-data-nats",
+ "si-events",
  "si-settings",
+ "si-std",
  "telemetry",
  "telemetry-nats",
  "thiserror",
+ "tokio",
  "tokio-util",
  "ulid",
 ]
diff --git a/bin/forklift/BUCK b/bin/forklift/BUCK
index adebe2d04e..c9610da103 100644
--- a/bin/forklift/BUCK
+++ b/bin/forklift/BUCK
@@ -15,6 +15,9 @@ rust_binary(
     ],
     srcs = glob(["src/**/*.rs"]),
     env = {"CARGO_BIN_NAME": "forklift"},
+    resources = {
+        "dev.postgres.root.crt": "//config/keys:dev.postgres.root.crt",
+    },
 )
 
 docker_image(
diff --git a/bin/forklift/src/args.rs b/bin/forklift/src/args.rs
index da59d9579b..d0a7968dbe 100644
--- a/bin/forklift/src/args.rs
+++ b/bin/forklift/src/args.rs
@@ -79,6 +79,10 @@ pub(crate) struct Args {
     /// The name of the data warehouse stream
     #[arg(long)]
     pub(crate) data_warehouse_stream_name: Option<String>,
+
+    /// Enables the audit logs app
+    #[arg(long, default_value = "false")]
+    pub(crate) enable_audit_logs_app: bool,
 }
 
 impl TryFrom<Args> for Config {
@@ -105,6 +109,7 @@ impl TryFrom<Args> for Config {
             if let Some(data_warehouse_stream_name) = args.data_warehouse_stream_name {
                 config_map.set("data_warehouse_stream_name", data_warehouse_stream_name);
             }
+            config_map.set("enable_audit_logs_app", args.enable_audit_logs_app);
         })?
         .try_into()
     }
diff --git a/lib/audit-logs/BUCK b/lib/audit-logs/BUCK
index c22330bfbd..8676b309ad 100644
--- a/lib/audit-logs/BUCK
+++ b/lib/audit-logs/BUCK
@@ -8,6 +8,7 @@ rust_library(
         "//lib/si-events-rs:si-events",
         "//lib/telemetry-nats-rs:telemetry-nats",
         "//lib/telemetry-rs:telemetry",
+        "//third-party/rust:chrono",
         "//third-party/rust:refinery",
         "//third-party/rust:remain",
         "//third-party/rust:serde",
diff --git a/lib/audit-logs/Cargo.toml b/lib/audit-logs/Cargo.toml
index 1dfbd13924..9889cddf1e 100644
--- a/lib/audit-logs/Cargo.toml
+++ b/lib/audit-logs/Cargo.toml
@@ -15,6 +15,7 @@ si-events = { path = "../../lib/si-events-rs" }
 telemetry = { path = "../../lib/telemetry-rs" }
 telemetry-nats = { path = "../../lib/telemetry-nats-rs" }
 
+chrono = { workspace = true }
 refinery = { workspace = true }
 remain = { workspace = true }
 serde = { workspace = true }
diff --git a/lib/audit-logs/src/database.rs b/lib/audit-logs/src/database.rs
new file mode 100644
index 0000000000..55daa1cd2b
--- /dev/null
+++ b/lib/audit-logs/src/database.rs
@@ -0,0 +1,109 @@
+//! Contains functionality for setting up and communicating with the audit database.
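+//!
+//! A minimal usage sketch (illustrative only; assumes a reachable audit database and a
+//! populated [`AuditDatabaseConfig`]):
+//!
+//! ```ignore
+//! let context = AuditDatabaseContext::from_config(&config).await?;
+//! migrate(&context).await?;
+//! ```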
+
+use chrono::DateTime;
+use chrono::Utc;
+use si_data_pg::PgError;
+use si_data_pg::PgPoolError;
+use si_events::audit_log::AuditLogKind;
+use si_events::audit_log::AuditLogMetadata;
+use si_events::Actor;
+use si_events::ChangeSetId;
+use si_events::WorkspacePk;
+use telemetry::prelude::*;
+use thiserror::Error;
+
+mod config;
+mod context;
+mod migrate;
+
+pub use config::AuditDatabaseConfig;
+pub use config::DBNAME;
+pub use context::AuditDatabaseContext;
+pub use context::AuditDatabaseContextError;
+pub use migrate::{migrate, AuditDatabaseMigrationError};
+
+#[allow(missing_docs)]
+#[derive(Error, Debug)]
+pub enum AuditDatabaseError {
+    #[error("chrono parse error: {0}")]
+    ChronoParse(#[from] chrono::ParseError),
+    #[error("pg error: {0}")]
+    Pg(#[from] PgError),
+    #[error("pg pool error: {0}")]
+    PgPool(#[from] PgPoolError),
+    #[error("serde json error: {0}")]
+    SerdeJson(#[from] serde_json::Error),
+}
+
+type Result<T> = std::result::Result<T, AuditDatabaseError>;
+
+#[allow(clippy::too_many_arguments, missing_docs)]
+#[instrument(
+    name = "audit_log.insert",
+    level = "debug",
+    skip_all,
+    fields(
+        si.workspace.id = %workspace_id,
+    ),
+)]
+pub async fn insert(
+    context: &AuditDatabaseContext,
+    workspace_id: WorkspacePk,
+    kind: AuditLogKind,
+    timestamp: String,
+    change_set_id: Option<ChangeSetId>,
+    actor: Actor,
+    entity_name: Option<String>,
+) -> Result<()> {
+    let kind_as_string = kind.to_string();
+    let user_id = match actor {
+        Actor::System => None,
+        Actor::User(user_id) => Some(user_id),
+    };
+
+    let metadata = AuditLogMetadata::from(kind);
+    let (title, entity_type) = metadata.title_and_entity_type();
+    let serialized_metadata = serde_json::to_value(metadata)?;
+    let timestamp: DateTime<Utc> = timestamp.parse()?;
+
+    context
+        .pg_pool()
+        .get()
+        .await?
+        .query_one(
+            "INSERT INTO audit_logs (
+                workspace_id,
+                kind,
+                timestamp,
+                title,
+                change_set_id,
+                user_id,
+                entity_name,
+                entity_type,
+                metadata
+            ) VALUES (
+                $1,
+                $2,
+                $3,
+                $4,
+                $5,
+                $6,
+                $7,
+                $8,
+                $9
+            ) RETURNING *",
+            &[
+                &workspace_id,
+                &kind_as_string,
+                &timestamp,
+                &title,
+                &change_set_id.map(|id| id.to_string()),
+                &user_id.map(|id| id.to_string()),
+                &entity_name,
+                &entity_type,
+                &serialized_metadata,
+            ],
+        )
+        .await?;
+    Ok(())
+}
diff --git a/lib/audit-logs/src/database/config.rs b/lib/audit-logs/src/database/config.rs
new file mode 100644
index 0000000000..8e3d63924c
--- /dev/null
+++ b/lib/audit-logs/src/database/config.rs
@@ -0,0 +1,27 @@
+use serde::{Deserialize, Serialize};
+use si_data_pg::PgPoolConfig;
+
+/// The name of the audit database.
+pub const DBNAME: &str = "si_audit";
+const APPLICATION_NAME: &str = "si-audit";
+
+/// The configuration used for communicating with and setting up the audit database.
+#[derive(Debug, Clone, Deserialize, Serialize)]
+pub struct AuditDatabaseConfig {
+    /// The configuration for the PostgreSQL pool.
+    ///
+    /// _Note:_ this is called "pg" for ease of use with layered load configuration files.
+    pub pg: PgPoolConfig,
+}
+
+impl Default for AuditDatabaseConfig {
+    fn default() -> Self {
+        Self {
+            pg: PgPoolConfig {
+                dbname: DBNAME.into(),
+                application_name: APPLICATION_NAME.into(),
+                ..Default::default()
+            },
+        }
+    }
+}
diff --git a/lib/audit-logs/src/database/context.rs b/lib/audit-logs/src/database/context.rs
new file mode 100644
index 0000000000..5647d85ad0
--- /dev/null
+++ b/lib/audit-logs/src/database/context.rs
@@ -0,0 +1,35 @@
+use si_data_pg::{PgPool, PgPoolError};
+use telemetry::prelude::*;
+use thiserror::Error;
+
+use super::AuditDatabaseConfig;
+
+#[allow(missing_docs)]
+#[derive(Error, Debug)]
+pub enum AuditDatabaseContextError {
+    #[error("pg pool error: {0}")]
+    PgPool(#[from] PgPoolError),
+}
+
+type Result<T> = std::result::Result<T, AuditDatabaseContextError>;
+
+/// The context used for communicating with and setting up the audit database.
+#[derive(Debug, Clone)]
+pub struct AuditDatabaseContext {
+    pg_pool: PgPool,
+}
+
+impl AuditDatabaseContext {
+    /// Creates an [`AuditDatabaseContext`] from an [`AuditDatabaseConfig`].
+    #[instrument(level = "info", name = "audit.context.from_config", skip_all)]
+    pub async fn from_config(config: &AuditDatabaseConfig) -> Result<Self> {
+        Ok(Self {
+            pg_pool: PgPool::new(&config.pg).await?,
+        })
+    }
+
+    /// Returns a reference to the [`PgPool`].
+    pub fn pg_pool(&self) -> &PgPool {
+        &self.pg_pool
+    }
+}
diff --git a/lib/audit-logs/src/pg/migrate.rs b/lib/audit-logs/src/database/migrate.rs
similarity index 95%
rename from lib/audit-logs/src/pg/migrate.rs
rename to lib/audit-logs/src/database/migrate.rs
index fe44531c01..6ae894a02f 100644
--- a/lib/audit-logs/src/pg/migrate.rs
+++ b/lib/audit-logs/src/database/migrate.rs
@@ -18,7 +18,7 @@ type Result<T> = std::result::Result<T, AuditDatabaseMigrationError>;
 /// Performs migrations for the audit database.
 #[instrument(level = "info", name = "audit.init.migrate", skip_all)]
 pub async fn migrate(context: &AuditDatabaseContext) -> Result<()> {
-    migrate_inner(&context.pg_pool).await
+    migrate_inner(context.pg_pool()).await
 }
 
 #[instrument(level = "info", name = "audit.init.migrate.inner", skip_all)]
diff --git a/lib/audit-logs/src/lib.rs b/lib/audit-logs/src/lib.rs
index fb0d7532fd..a68b326556 100644
--- a/lib/audit-logs/src/lib.rs
+++ b/lib/audit-logs/src/lib.rs
@@ -25,170 +25,8 @@
     while_true
 )]
 
-pub mod pg;
+pub mod database;
 mod stream;
 
-use pg::AuditDatabaseContext;
-use serde::Deserialize;
-use serde::Serialize;
-use si_data_pg::PgError;
-use si_data_pg::PgPoolError;
-use si_events::Actor;
-use si_events::ChangeSetId;
-use si_events::ChangeSetStatus;
-use si_events::UserPk;
-use si_events::WorkspacePk;
-use strum::Display;
-use strum::EnumDiscriminants;
-use telemetry::prelude::*;
-use thiserror::Error;
-
 pub use stream::AuditLogsStream;
 pub use stream::AuditLogsStreamError;
-
-#[allow(missing_docs)]
-#[derive(Debug, Error)]
-pub enum AuditLogError {
-    #[error("pg error: {0}")]
-    Pg(#[from] PgError),
-    #[error("pg pool error: {0}")]
-    PgPool(#[from] PgPoolError),
-    #[error("serde json error: {0}")]
-    SerdeJson(#[from] serde_json::Error),
-}
-
-type Result<T> = std::result::Result<T, AuditLogError>;
-
-// FIXME(nick): delete this once accessor patterns are in place.
-// impl TryFrom<PgRow> for AuditLogRow {
-//     type Error = AuditLogError;
-//
-//     fn try_from(value: PgRow) -> std::result::Result<Self, Self::Error> {
-//         Ok(Self {
-//             actor: todo!(),
-//             kind: AuditLogKind::CreateChangeSet,
-//             entity_name: todo!(),
-//             timestamp: todo!(),
-//             change_set_id: todo!(),
-//         })
-//         let status_string: String = value.try_get("status")?;
-//         let status = ChangeSetStatus::try_from(status_string.as_str())?;
-//         Ok(Self {
-//             id: value.try_get("id")?,
-//             created_at: value.try_get("created_at")?,
-//             updated_at: value.try_get("updated_at")?,
-//             name: value.try_get("name")?,
-//             status,
-//             base_change_set_id: value.try_get("base_change_set_id")?,
-//             workspace_snapshot_address: value.try_get("workspace_snapshot_address")?,
-//             workspace_id: value.try_get("workspace_id")?,
-//             merge_requested_by_user_id: value.try_get("merge_requested_by_user_id")?,
-//             merge_requested_at: value.try_get("merge_requested_at")?,
-//             reviewed_by_user_id: value.try_get("reviewed_by_user_id")?,
-//             reviewed_at: value.try_get("reviewed_at")?,
-//         })
-//     }
-// }
-
-#[allow(missing_docs)]
-#[remain::sorted]
-#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Eq, Display, EnumDiscriminants)]
-pub enum AuditLogKind {
-    #[allow(missing_docs)]
-    AbandonChangeSet { from_status: ChangeSetStatus },
-    #[allow(missing_docs)]
-    CreateChangeSet,
-}
-
-#[allow(missing_docs)]
-#[remain::sorted]
-#[derive(Debug, Serialize, Deserialize, EnumDiscriminants)]
-#[serde(untagged, rename_all = "camelCase")]
-pub enum AuditLogMetadata {
-    #[allow(missing_docs)]
-    #[serde(rename_all = "camelCase")]
-    AbandonChangeSet { from_status: ChangeSetStatus },
-    #[allow(missing_docs)]
-    #[serde(rename_all = "camelCase")]
-    CreateChangeSet,
-}
-
-impl From<AuditLogKind> for AuditLogMetadata {
-    fn from(value: AuditLogKind) -> Self {
-        match value {
-            AuditLogKind::AbandonChangeSet { from_status } => {
-                Self::AbandonChangeSet { from_status }
-            }
-            AuditLogKind::CreateChangeSet => Self::CreateChangeSet,
-        }
-    }
-}
-
-#[allow(clippy::too_many_arguments, missing_docs)]
-#[instrument(
-    name = "audit_log.insert",
-    level = "debug",
-    skip_all,
-    fields(
-        si.workspace.id = %workspace_id,
-    ),
-)]
-pub async fn insert(
-    context: &AuditDatabaseContext,
-    workspace_id: WorkspacePk,
-    kind: AuditLogKind,
-    timestamp: String,
-    title: String,
-    change_set_id: Option<ChangeSetId>,
-    actor: Actor,
-    entity_name: Option<String>,
-    entity_type: Option<String>,
-) -> Result<()> {
-    let kind_as_string = kind.to_string();
-    let user_id: Option<UserPk> = match actor {
-        Actor::System => None,
-        Actor::User(user_id) => Some(user_id),
-    };
-    let serialized_metadata = serde_json::to_value(AuditLogMetadata::from(kind))?;
-
-    let _ = context
-        .pg_pool()
-        .get()
-        .await?
-        .query_one(
-            "INSERT INTO audit_logs (
-                workspace_id,
-                kind,
-                timestamp,
-                title,
-                change_set_id,
-                user_id,
-                entity_name,
-                entity_type,
-                metadata
-            ) VALUES (
-                $1,
-                $2,
-                $3,
-                $4,
-                $5,
-                $6,
-                $7,
-                $8,
-                $9
-            )",
-            &[
-                &workspace_id,
-                &kind_as_string,
-                &timestamp,
-                &title,
-                &change_set_id,
-                &user_id,
-                &entity_name,
-                &entity_type,
-                &serialized_metadata,
-            ],
-        )
-        .await?;
-    Ok(())
-}
diff --git a/lib/audit-logs/src/pg.rs b/lib/audit-logs/src/pg.rs
deleted file mode 100644
index a4fbaf45ed..0000000000
--- a/lib/audit-logs/src/pg.rs
+++ /dev/null
@@ -1,66 +0,0 @@
-//! Contains functionality for setting up and communicating with the audit database.
-
-use serde::{Deserialize, Serialize};
-use si_data_pg::{PgPool, PgPoolConfig, PgPoolError};
-use telemetry::prelude::*;
-use thiserror::Error;
-
-mod migrate;
-
-pub use migrate::{migrate, AuditDatabaseMigrationError};
-
-/// The name of the audit database.
-pub const DBNAME: &str = "si_audit";
-const APPLICATION_NAME: &str = "si-audit";
-
-#[allow(missing_docs)]
-#[derive(Error, Debug)]
-pub enum AuditDatabaseContextError {
-    #[error("pg pool error: {0}")]
-    PgPool(#[from] PgPoolError),
-}
-
-type Result<T> = std::result::Result<T, AuditDatabaseContextError>;
-
-/// The context used for communicating with and setting up the audit database.
-#[allow(missing_debug_implementations)]
-#[derive(Clone)]
-pub struct AuditDatabaseContext {
-    pg_pool: PgPool,
-}
-
-impl AuditDatabaseContext {
-    /// Creates an [`AuditDatabaseContext`] from an [`AuditDatabaseConfig`].
-    #[instrument(level = "info", name = "audit.context.from_config", skip_all)]
-    pub async fn from_config(config: &AuditDatabaseConfig) -> Result<Self> {
-        Ok(Self {
-            pg_pool: PgPool::new(&config.pg).await?,
-        })
-    }
-
-    /// Returns a reference to the [`PgPool`].
-    pub fn pg_pool(&self) -> &PgPool {
-        &self.pg_pool
-    }
-}
-
-/// The configuration used for communicating with and setting up the audit database.
-#[derive(Debug, Clone, Deserialize, Serialize)]
-pub struct AuditDatabaseConfig {
-    /// The configuration for the PostgreSQL pool.
-    ///
-    /// _Note:_ this is called "pg" for ease of use with layered load configuration files.
-    pub pg: PgPoolConfig,
-}
-
-impl Default for AuditDatabaseConfig {
-    fn default() -> Self {
-        Self {
-            pg: PgPoolConfig {
-                dbname: DBNAME.into(),
-                application_name: APPLICATION_NAME.into(),
-                ..Default::default()
-            },
-        }
-    }
-}
diff --git a/lib/audit-logs/src/stream.rs b/lib/audit-logs/src/stream.rs
index 03969d77a9..0374b4d944 100644
--- a/lib/audit-logs/src/stream.rs
+++ b/lib/audit-logs/src/stream.rs
@@ -76,6 +76,11 @@ impl AuditLogsStream {
         Subject::from(self.prefixed_subject(SUBJECT_PREFIX, &format!("{workspace_id}.>")))
     }
 
+    /// Returns the subject for consuming [`AuditLogs`](AuditLog) for all workspaces.
+    pub fn consuming_subject_for_all_workspaces(&self) -> Subject {
+        Subject::from(self.prefixed_subject(SUBJECT_PREFIX, ">"))
+    }
+
     /// Returns the subject for publishing and consuming [`AuditLogs`](AuditLog) for a given change set.
     pub fn subject_for_change_set(
         &self,
diff --git a/lib/dal/src/audit_logging.rs b/lib/dal/src/audit_logging.rs
index 7dfea2f46b..122a66ed4a 100644
--- a/lib/dal/src/audit_logging.rs
+++ b/lib/dal/src/audit_logging.rs
@@ -17,9 +17,9 @@ use si_data_nats::async_nats::jetstream::context::RequestErrorKind;
 use si_data_nats::async_nats::jetstream::stream::ConsumerErrorKind;
 use si_events::audit_log::AuditLog;
 use si_events::audit_log::AuditLogKind;
+use si_events::audit_log::AuditLogMetadata;
 use si_events::Actor;
 use si_frontend_types::AuditLog as FrontendAuditLog;
-use si_frontend_types::AuditLogDeserializedMetadata as FrontendAuditLogDeserializedMetadata;
 use telemetry::prelude::*;
 use thiserror::Error;
 use tokio_util::task::TaskTracker;
@@ -383,8 +383,7 @@ impl FrontendAuditLogAssembler {
         // If we are working on HEAD, we show all audit logs without a change set, all
         // audit logs on HEAD, and all audit logs for abandoned or applied change sets.
         //
-        // If we are not working on HEAD, we only show audit logs for our own change set as
-        // well as certain audit logs that are relevant on HEAD, like "CreateChangeSet".
+        // If we are not working on HEAD, we only show audit logs for our own change set.
         if self.working_on_head {
             if let Some((change_set_id, _, change_set_status)) = change_set_metadata {
                 if change_set_id != self.change_set_id {
@@ -418,8 +417,9 @@ impl FrontendAuditLogAssembler {
             self.find_user_metadata(ctx, inner.actor).await?;
 
         let kind = inner.kind.to_string();
-        let deserialized_metadata = FrontendAuditLogDeserializedMetadata::from(inner.kind);
-        let (title, entity_type) = deserialized_metadata.title_and_entity_type();
+        let metadata = AuditLogMetadata::from(inner.kind);
+        let (title, entity_type) = metadata.title_and_entity_type();
+        let entity_type = entity_type.unwrap_or(" ");
         let (change_set_id, change_set_name) = match change_set_metadata {
             Some((change_set_id, change_set_name, _)) => {
                 (Some(change_set_id), Some(change_set_name))
@@ -438,7 +438,7 @@
             timestamp: inner.timestamp,
             change_set_id,
             change_set_name,
-            metadata: serde_json::to_value(deserialized_metadata)?,
+            metadata: serde_json::to_value(metadata)?,
         }))
     }
 }
diff --git a/lib/forklift-server/BUCK b/lib/forklift-server/BUCK
index b92b2830cd..8c9b699235 100644
--- a/lib/forklift-server/BUCK
+++ b/lib/forklift-server/BUCK
@@ -3,18 +3,24 @@ load("@prelude-si//:macros.bzl", "rust_library")
 rust_library(
     name = "forklift-server",
     deps = [
+        "//lib/audit-logs:audit-logs",
         "//lib/billing-events:billing-events",
+        "//lib/buck2-resources:buck2-resources",
         "//lib/data-warehouse-stream-client:data-warehouse-stream-client",
         "//lib/naxum:naxum",
         "//lib/si-data-nats:si-data-nats",
+        "//lib/si-events-rs:si-events",
         "//lib/si-settings:si-settings",
+        "//lib/si-std:si-std",
         "//lib/telemetry-nats-rs:telemetry-nats",
         "//lib/telemetry-rs:telemetry",
         "//third-party/rust:derive_builder",
+        "//third-party/rust:futures",
         "//third-party/rust:remain",
         "//third-party/rust:serde",
         "//third-party/rust:serde_json",
        "//third-party/rust:thiserror",
+        "//third-party/rust:tokio",
        "//third-party/rust:tokio-util",
        "//third-party/rust:ulid",
     ],
diff --git a/lib/forklift-server/Cargo.toml b/lib/forklift-server/Cargo.toml
index 69b511372a..15bf19d4b6 100644
--- a/lib/forklift-server/Cargo.toml
+++ b/lib/forklift-server/Cargo.toml
@@ -9,17 +9,24 @@ rust-version.workspace = true
 publish.workspace = true
 
 [dependencies]
+audit-logs = { path = "../../lib/audit-logs" }
 billing-events = { path = "../../lib/billing-events" }
+buck2-resources = { path = "../../lib/buck2-resources" }
 data-warehouse-stream-client = { path = "../../lib/data-warehouse-stream-client" }
-derive_builder = { workspace = true }
 naxum = { path = "../../lib/naxum" }
-remain = { workspace = true }
-serde = { workspace = true }
-serde_json = { workspace = true }
 si-data-nats = { path = "../../lib/si-data-nats" }
+si-events = { path = "../../lib/si-events-rs" }
 si-settings = { path = "../../lib/si-settings" }
+si-std = { path = "../../lib/si-std" }
 telemetry = { path = "../../lib/telemetry-rs" }
 telemetry-nats = { path = "../../lib/telemetry-nats-rs" }
+
+derive_builder = { workspace = true }
+futures = { workspace = true }
+remain = { workspace = true }
+serde = { workspace = true }
+serde_json = { workspace = true }
 thiserror = { workspace = true }
+tokio = { workspace = true }
 tokio-util = { workspace = true }
 ulid = { workspace = true }
diff --git a/lib/forklift-server/src/config.rs b/lib/forklift-server/src/config.rs
index c3bb25258a..5545a0f350 100644
--- a/lib/forklift-server/src/config.rs
+++ b/lib/forklift-server/src/config.rs
@@ -1,6 +1,11 @@
+use std::{env, path::Path};
+
+use audit_logs::database::AuditDatabaseConfig;
+use buck2_resources::Buck2Resources;
 use derive_builder::Builder;
 use serde::{Deserialize, Serialize};
 use si_data_nats::NatsConfig;
+use si_std::CanonicalFileError;
 use telemetry::prelude::*;
 use thiserror::Error;
 use ulid::Ulid;
@@ -15,12 +20,22 @@ const DEFAULT_CONCURRENCY_LIMIT: usize = 1000;
 #[remain::sorted]
 #[derive(Debug, Error)]
 pub enum ConfigError {
+    #[error("canonical file error: {0}")]
+    CanonicalFile(#[from] CanonicalFileError),
     #[error("config builder error: {0}")]
     ConfigBuilder(#[from] ConfigBuilderError),
+    #[error("error configuring for development")]
+    Development(#[source] Box<dyn std::error::Error + 'static + Sync + Send>),
     #[error("si settings error: {0}")]
     SiSettings(#[from] si_settings::SettingsError),
 }
 
+impl ConfigError {
+    fn development(err: impl std::error::Error + 'static + Sync + Send) -> Self {
+        Self::Development(Box::new(err))
+    }
+}
+
 type Result<T> = std::result::Result<T, ConfigError>;
 
 /// The config for the forklift server.
@@ -37,6 +52,12 @@ pub struct Config {
     #[builder(default = "default_data_warehouse_stream_name()")]
     data_warehouse_stream_name: Option<String>,
+
+    #[builder(default)]
+    enable_audit_logs_app: bool,
+
+    #[builder(default)]
+    audit: AuditDatabaseConfig,
 }
 
 impl StandardConfig for Config {
@@ -69,6 +90,16 @@ impl Config {
     pub fn data_warehouse_stream_name(&self) -> Option<&str> {
         self.data_warehouse_stream_name.as_deref()
     }
+
+    /// Indicates whether or not the audit logs app will be enabled.
+    pub fn enable_audit_logs_app(&self) -> bool {
+        self.enable_audit_logs_app
+    }
+
+    /// Gets a reference to the audit database config.
+    pub fn audit(&self) -> &AuditDatabaseConfig {
+        &self.audit
+    }
 }
 
 #[allow(missing_docs)]
@@ -82,6 +113,10 @@ pub struct ConfigFile {
     pub nats: NatsConfig,
     #[serde(default = "default_data_warehouse_stream_name")]
     pub data_warehouse_stream_name: Option<String>,
+    #[serde(default)]
+    pub enable_audit_logs_app: bool,
+    #[serde(default)]
+    pub audit: AuditDatabaseConfig,
 }
 
 impl Default for ConfigFile {
@@ -91,6 +126,8 @@
             concurrency_limit: default_concurrency_limit(),
             nats: Default::default(),
             data_warehouse_stream_name: default_data_warehouse_stream_name(),
+            enable_audit_logs_app: Default::default(),
+            audit: Default::default(),
         }
     }
 }
@@ -102,13 +139,17 @@ impl StandardConfigFile for ConfigFile {
 impl TryFrom<ConfigFile> for Config {
     type Error = ConfigError;
 
-    fn try_from(value: ConfigFile) -> Result<Self> {
-        let mut config = Config::builder();
-        config.instance_id(value.instance_id);
-        config.concurrency_limit(value.concurrency_limit);
-        config.nats(value.nats);
-        config.data_warehouse_stream_name(value.data_warehouse_stream_name);
-        config.build().map_err(Into::into)
+    fn try_from(mut value: ConfigFile) -> Result<Self> {
+        detect_and_configure_development(&mut value)?;
+
+        Ok(Config {
+            instance_id: value.instance_id,
+            concurrency_limit: value.concurrency_limit,
+            nats: value.nats,
+            data_warehouse_stream_name: value.data_warehouse_stream_name,
+            enable_audit_logs_app: value.enable_audit_logs_app,
+            audit: value.audit,
+        })
     }
 }
 
@@ -123,3 +164,51 @@ fn default_concurrency_limit() -> usize {
     DEFAULT_CONCURRENCY_LIMIT
 }
 
 fn default_data_warehouse_stream_name() -> Option<String> {
     None
 }
+
+#[allow(clippy::disallowed_methods)] // Used to determine if running in development
+fn detect_and_configure_development(config: &mut ConfigFile) -> Result<()> {
+    if env::var("BUCK_RUN_BUILD_ID").is_ok() || env::var("BUCK_BUILD_ID").is_ok() {
+        buck2_development(config)
+    } else if let Ok(dir) = env::var("CARGO_MANIFEST_DIR") {
+        cargo_development(dir, config)
+    } else {
+        Ok(())
+    }
+}
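+// Illustrative effect of the detection above (paths hypothetical): when run via Cargo,
+// `CARGO_MANIFEST_DIR` points at the binary's crate, so the helpers below end up setting
+// `audit.pg.certificate_path` to `<manifest dir>/../../config/keys/dev.postgres.root.crt`
+// and `audit.pg.dbname` to "si_audit".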
+
+fn buck2_development(config: &mut ConfigFile) -> Result<()> {
+    let resources = Buck2Resources::read().map_err(ConfigError::development)?;
+
+    let postgres_cert = resources
+        .get_ends_with("dev.postgres.root.crt")
+        .map_err(ConfigError::development)?
+        .to_string_lossy()
+        .to_string();
+
+    warn!(
+        postgres_cert = postgres_cert.as_str(),
+        "detected development run",
+    );
+
+    config.audit.pg.certificate_path = Some(postgres_cert.clone().try_into()?);
+    config.audit.pg.dbname = audit_logs::database::DBNAME.to_string();
+
+    Ok(())
+}
+
+fn cargo_development(dir: String, config: &mut ConfigFile) -> Result<()> {
+    let postgres_cert = Path::new(&dir)
+        .join("../../config/keys/dev.postgres.root.crt")
+        .to_string_lossy()
+        .to_string();
+
+    warn!(
+        postgres_cert = postgres_cert.as_str(),
+        "detected development run",
+    );
+
+    config.audit.pg.certificate_path = Some(postgres_cert.clone().try_into()?);
+    config.audit.pg.dbname = audit_logs::database::DBNAME.to_string();
+
+    Ok(())
+}
diff --git a/lib/forklift-server/src/lib.rs b/lib/forklift-server/src/lib.rs
index 1fdb18898a..81192f9646 100644
--- a/lib/forklift-server/src/lib.rs
+++ b/lib/forklift-server/src/lib.rs
@@ -25,9 +25,7 @@
     while_true
 )]
 
-mod app_state;
 mod config;
-mod handlers;
 mod server;
 
 pub use config::Config;
diff --git a/lib/forklift-server/src/server.rs b/lib/forklift-server/src/server.rs
index 90c8b14081..e980b3e6c6 100644
--- a/lib/forklift-server/src/server.rs
+++ b/lib/forklift-server/src/server.rs
@@ -1,61 +1,32 @@
-use std::{
-    fmt,
-    future::{Future, IntoFuture as _},
-    io,
-    sync::Arc,
-};
+use std::{fmt, future::Future, io, sync::Arc};
 
-use billing_events::{BillingEventsError, BillingEventsWorkQueue};
-use data_warehouse_stream_client::DataWarehouseStreamClient;
-use naxum::{
-    extract::MatchedSubject,
-    handler::Handler as _,
-    middleware::{
-        ack::AckLayer,
-        matched_subject::{ForSubject, MatchedSubjectLayer},
-        trace::TraceLayer,
-    },
-    response::{IntoResponse, Response},
-    MessageHead, ServiceBuilder, ServiceExt as _, TowerServiceExt as _,
-};
-use si_data_nats::{async_nats, jetstream, NatsClient};
-use si_data_nats::{
-    async_nats::{
-        error::Error as AsyncNatsError,
-        jetstream::{
-            consumer::{pull::Stream, StreamErrorKind},
-            stream::ConsumerErrorKind,
-        },
-    },
-    ConnectionMetadata,
-};
+use si_data_nats::{jetstream, NatsClient};
 use telemetry::prelude::*;
 use thiserror::Error;
+use tokio::task::JoinError;
 use tokio_util::sync::CancellationToken;
 
-use crate::{
-    app_state::{AppState, NoopAppState},
-    config::Config,
-    handlers,
-};
+use crate::config::Config;
 
-const CONSUMER_NAME: &str = "forklift-server";
+mod app;
+
+pub(crate) use app::AppSetupError;
+
+const DURABLE_CONSUMER_NAME: &str = "forklift-server";
 
 #[derive(Debug, Error)]
 pub enum ServerError {
-    #[error("async nats consumer error: {0}")]
-    AsyncNatsConsumer(#[from] AsyncNatsError<ConsumerErrorKind>),
-    #[error("async nats stream error: {0}")]
-    AsyncNatsStream(#[from] AsyncNatsError<StreamErrorKind>),
-    #[error("billing events error: {0}")]
-    BillingEvents(#[from] BillingEventsError),
+    #[error("app setup error: {0}")]
+    AppSetup(#[from] AppSetupError),
+    #[error("join error: {0}")]
+    Join(#[from] JoinError),
     #[error("naxum error: {0}")]
     Naxum(#[source] io::Error),
     #[error("si data nats error: {0}")]
     SiDataNats(#[from] si_data_nats::Error),
 }
 
-type ServerResult<T> = Result<T, ServerError>;
+type Result<T> = std::result::Result<T, ServerError>;
 
 /// Server metadata, used with telemetry.
 #[derive(Clone, Debug)]
@@ -83,8 +54,10 @@ impl ServerMetadata {
 /// The forklift server instance with its inner naxum task.
 pub struct Server {
     metadata: Arc<ServerMetadata>,
-    inner: Box<dyn Future<Output = io::Result<()>> + Unpin + Send>,
     shutdown_token: CancellationToken,
+    // TODO(nick): remove option once this is working.
+    inner_audit_logs: Option<Box<dyn Future<Output = io::Result<()>> + Unpin + Send>>,
+    inner_billing_events: Box<dyn Future<Output = io::Result<()>> + Unpin + Send>,
 }
 
 impl fmt::Debug for Server {
@@ -99,93 +72,50 @@ impl Server {
     /// Creates a forklift server with a running naxum task.
     #[instrument(name = "forklift.init.from_config", level = "info", skip_all)]
-    pub async fn from_config(config: Config, token: CancellationToken) -> ServerResult<Self> {
+    pub async fn from_config(config: Config, token: CancellationToken) -> Result<Self> {
         let metadata = Arc::new(ServerMetadata {
             instance_id: config.instance_id().into(),
             job_invoked_provider: "si",
         });
 
         let nats = Self::connect_to_nats(&config).await?;
         let connection_metadata = nats.metadata_clone();
-
-        let incoming = {
-            let queue = BillingEventsWorkQueue::get_or_create(jetstream::new(nats)).await?;
-            let consumer_subject = queue.workspace_update_subject("*");
-            queue
-                .stream()
-                .await?
-                .create_consumer(Self::incoming_consumer_config(consumer_subject))
-                .await?
-                .messages()
-                .await?
-        };
-
-        let inner = match config.data_warehouse_stream_name() {
-            Some(stream_name) => {
-                info!(%stream_name, "creating billing events app in data warehouse stream delivery mode...");
-                let client = DataWarehouseStreamClient::new(stream_name).await;
-                let state = AppState::new(client);
-                Self::build_app(
-                    state,
-                    connection_metadata,
-                    incoming,
+        let jetstream_context = jetstream::new(nats);
+
+        let inner_audit_logs = if config.enable_audit_logs_app() {
+            Some(
+                app::audit_logs(
+                    jetstream_context.clone(),
+                    DURABLE_CONSUMER_NAME.to_string(),
+                    connection_metadata.clone(),
                     config.concurrency_limit(),
+                    config.audit(),
                     token.clone(),
-                )?
-            }
-            None => {
-                info!("creating billing events app in no-op mode...");
-                let state = NoopAppState::new();
-                Self::build_noop_app(
-                    state,
-                    connection_metadata,
-                    incoming,
-                    config.concurrency_limit(),
-                    token.clone(),
-                )?
-            }
+                )
+                .await?,
+            )
+        } else {
+            None
         };
+        let inner_billing_events = app::billing_events(
+            jetstream_context,
+            DURABLE_CONSUMER_NAME.to_string(),
+            connection_metadata,
+            config.concurrency_limit(),
+            config.data_warehouse_stream_name(),
+            token.clone(),
+        )
+        .await?;
 
         Ok(Self {
             metadata,
-            inner,
+            inner_audit_logs,
+            inner_billing_events,
             shutdown_token: token,
         })
     }
 
-    fn build_app(
-        state: AppState,
-        connection_metadata: Arc<ConnectionMetadata>,
-        incoming: Stream,
-        concurrency_limit: usize,
-        token: CancellationToken,
-    ) -> ServerResult<Box<dyn Future<Output = io::Result<()>> + Unpin + Send>> {
-        let app = ServiceBuilder::new()
-            .layer(
-                MatchedSubjectLayer::new().for_subject(ForkliftForSubject::with_prefix(
-                    connection_metadata.subject_prefix(),
-                )),
-            )
-            .layer(
-                TraceLayer::new()
-                    .make_span_with(
-                        telemetry_nats::NatsMakeSpan::builder(connection_metadata).build(),
-                    )
-                    .on_response(telemetry_nats::NatsOnResponse::new()),
-            )
-            .layer(AckLayer::new())
-            .service(handlers::process_request.with_state(state))
-            .map_response(Response::into_response);
-
-        let inner =
-            naxum::serve_with_incoming_limit(incoming, app.into_make_service(), concurrency_limit)
-                .with_graceful_shutdown(naxum::wait_on_cancelled(token));
-
-        Ok(Box::new(inner.into_future()))
-    }
-
-    /// Infallible wrapper around running the inner naxum task.
+    /// Infallible wrapper around running the inner naxum task(s).
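+    ///
+    /// An illustrative call site (hypothetical; error handling elided):
+    ///
+    /// ```ignore
+    /// let server = Server::from_config(config, token).await?;
+    /// server.run().await;
+    /// ```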
     #[inline]
     pub async fn run(self) {
         if let Err(err) = self.try_run().await {
@@ -193,104 +123,33 @@ impl Server {
         }
     }
 
-    /// Fallibly awaits the inner naxum task.
-    pub async fn try_run(self) -> ServerResult<()> {
-        self.inner.await.map_err(ServerError::Naxum)?;
+    /// Fallibly awaits the inner naxum task(s).
+    pub async fn try_run(self) -> Result<()> {
+        match self.inner_audit_logs {
+            Some(inner_audit_logs) => {
+                info!("running two apps: audit logs and billing events");
+                let (inner_audit_logs_result, inner_billing_events_result) = futures::join!(
+                    tokio::spawn(inner_audit_logs),
+                    tokio::spawn(self.inner_billing_events)
+                );
+                inner_audit_logs_result?.map_err(ServerError::Naxum)?;
+                inner_billing_events_result?.map_err(ServerError::Naxum)?;
+            }
+            None => {
+                info!("running one app: billing events");
+                self.inner_billing_events
+                    .await
+                    .map_err(ServerError::Naxum)?;
+            }
+        }
         info!("forklift main loop shutdown complete");
         Ok(())
     }
 
-    fn build_noop_app(
-        state: NoopAppState,
-        connection_metadata: Arc<ConnectionMetadata>,
-        incoming: Stream,
-        concurrency_limit: usize,
-        token: CancellationToken,
-    ) -> ServerResult<Box<dyn Future<Output = io::Result<()>> + Unpin + Send>> {
-        let app = ServiceBuilder::new()
-            .layer(
-                MatchedSubjectLayer::new().for_subject(ForkliftForSubject::with_prefix(
-                    connection_metadata.subject_prefix(),
-                )),
-            )
-            .layer(
-                TraceLayer::new()
-                    .make_span_with(
-                        telemetry_nats::NatsMakeSpan::builder(connection_metadata).build(),
-                    )
-                    .on_response(telemetry_nats::NatsOnResponse::new()),
-            )
-            .layer(AckLayer::new())
-            .service(handlers::process_request_noop.with_state(state))
-            .map_response(Response::into_response);
-
-        let inner =
-            naxum::serve_with_incoming_limit(incoming, app.into_make_service(), concurrency_limit)
-                .with_graceful_shutdown(naxum::wait_on_cancelled(token));
-
-        Ok(Box::new(inner.into_future()))
-    }
-
     #[instrument(name = "forklift.init.connect_to_nats", level = "info", skip_all)]
-    async fn connect_to_nats(config: &Config) -> ServerResult<NatsClient> {
+    async fn connect_to_nats(config: &Config) -> Result<NatsClient> {
         let client = NatsClient::new(config.nats()).await?;
         debug!("successfully connected nats client");
         Ok(client)
     }
-
-    #[inline]
-    fn incoming_consumer_config(
-        subject: impl Into<String>,
-    ) -> async_nats::jetstream::consumer::pull::Config {
-        async_nats::jetstream::consumer::pull::Config {
-            durable_name: Some(CONSUMER_NAME.to_owned()),
-            filter_subject: subject.into(),
-            ..Default::default()
-        }
-    }
-}
-
-#[derive(Clone, Debug)]
-struct ForkliftForSubject {
-    prefix: Option<()>,
-}
-
-impl ForkliftForSubject {
-    fn with_prefix(prefix: Option<&str>) -> Self {
-        Self {
-            prefix: prefix.map(|_p| ()),
-        }
-    }
-}
-
-impl<R> ForSubject<R> for ForkliftForSubject
-where
-    R: MessageHead,
-{
-    fn call(&mut self, req: &mut naxum::Message<R>) {
-        let mut parts = req.subject().split('.');
-
-        match self.prefix {
-            Some(_) => {
-                if let (Some(prefix), Some(p1), Some(p2), Some(_workspace_id), None) = (
-                    parts.next(),
-                    parts.next(),
-                    parts.next(),
-                    parts.next(),
-                    parts.next(),
-                ) {
-                    let matched = format!("{prefix}.{p1}.{p2}.:workspace_id");
-                    req.extensions_mut().insert(MatchedSubject::from(matched));
-                };
-            }
-            None => {
-                if let (Some(p1), Some(p2), Some(_workspace_id), None) =
-                    (parts.next(), parts.next(), parts.next(), parts.next())
-                {
-                    let matched = format!("{p1}.{p2}.:workspace_id");
-                    req.extensions_mut().insert(MatchedSubject::from(matched));
-                };
-            }
-        }
-    }
-}
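For reference, the task-joining pattern that `try_run` uses above can be sketched in isolation as follows; `run_both` is a hypothetical helper and the `JoinError` handling is simplified relative to the real method:

    use std::{future::Future, io};

    // Both boxed app futures are moved onto their own tasks; join! waits for both,
    // and each result is checked afterwards so neither failure is silently dropped.
    async fn run_both(
        audit_logs: impl Future<Output = io::Result<()>> + Send + 'static,
        billing_events: impl Future<Output = io::Result<()>> + Send + 'static,
    ) -> io::Result<()> {
        let (a, b) = futures::join!(tokio::spawn(audit_logs), tokio::spawn(billing_events));
        a.expect("audit logs task panicked")?;
        b.expect("billing events task panicked")?;
        Ok(())
    }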
diff --git a/lib/forklift-server/src/server/app.rs b/lib/forklift-server/src/server/app.rs
new file mode 100644
index 0000000000..dc55faed18
--- /dev/null
+++ b/lib/forklift-server/src/server/app.rs
@@ -0,0 +1,73 @@
+use std::{future::Future, io, sync::Arc};
+
+use ::audit_logs::database::AuditDatabaseConfig;
+use si_data_nats::{jetstream::Context, ConnectionMetadata};
+use telemetry::prelude::*;
+use thiserror::Error;
+use tokio_util::sync::CancellationToken;
+
+mod audit_logs;
+mod billing_events;
+
+pub(crate) use audit_logs::AuditLogsAppSetupError;
+pub(crate) use billing_events::BillingEventsAppSetupError;
+
+#[derive(Debug, Error)]
+pub enum AppSetupError {
+    #[error("audit logs app setup: {0}")]
+    AuditLogsAppSetup(#[from] AuditLogsAppSetupError),
+    #[error("billing events app setup: {0}")]
+    BillingEventsAppSetup(#[from] BillingEventsAppSetupError),
+}
+
+type Result<T> = std::result::Result<T, AppSetupError>;
+
+#[instrument(
+    name = "forklift.init.app.audit_logs",
+    level = "info",
+    skip_all,
+    fields(durable_consumer_name)
+)]
+pub(crate) async fn audit_logs(
+    jetstream_context: Context,
+    durable_consumer_name: String,
+    connection_metadata: Arc<ConnectionMetadata>,
+    concurrency_limit: usize,
+    audit_database_config: &AuditDatabaseConfig,
+    token: CancellationToken,
+) -> Result<Box<dyn Future<Output = io::Result<()>> + Unpin + Send>> {
+    Ok(audit_logs::build_and_run(
+        jetstream_context,
+        durable_consumer_name,
+        connection_metadata,
+        concurrency_limit,
+        audit_database_config,
+        token,
+    )
+    .await?)
+}
+
+#[instrument(
+    name = "forklift.init.app.billing_events",
+    level = "info",
+    skip_all,
+    fields(durable_consumer_name)
+)]
+pub(crate) async fn billing_events(
+    jetstream_context: Context,
+    durable_consumer_name: String,
+    connection_metadata: Arc<ConnectionMetadata>,
+    concurrency_limit: usize,
+    data_warehouse_stream_name: Option<&str>,
+    token: CancellationToken,
+) -> Result<Box<dyn Future<Output = io::Result<()>> + Unpin + Send>> {
+    Ok(billing_events::build_and_run(
+        jetstream_context,
+        durable_consumer_name,
+        connection_metadata,
+        concurrency_limit,
+        data_warehouse_stream_name,
+        token,
+    )
+    .await?)
+}
diff --git a/lib/forklift-server/src/server/app/audit_logs.rs b/lib/forklift-server/src/server/app/audit_logs.rs
new file mode 100644
index 0000000000..92aeb4b00f
--- /dev/null
+++ b/lib/forklift-server/src/server/app/audit_logs.rs
@@ -0,0 +1,153 @@
+use std::{
+    future::{Future, IntoFuture as _},
+    io,
+    sync::Arc,
+};
+
+use app_state::AppState;
+use audit_logs::{
+    database::{AuditDatabaseConfig, AuditDatabaseContext, AuditDatabaseContextError},
+    AuditLogsStream, AuditLogsStreamError,
+};
+use naxum::{
+    extract::MatchedSubject,
+    handler::Handler as _,
+    middleware::{
+        ack::AckLayer,
+        matched_subject::{ForSubject, MatchedSubjectLayer},
+        trace::TraceLayer,
+    },
+    response::{IntoResponse, Response},
+    MessageHead, ServiceBuilder, ServiceExt as _, TowerServiceExt as _,
+};
+use si_data_nats::{
+    async_nats::{
+        self,
+        error::Error as AsyncNatsError,
+        jetstream::{consumer::StreamErrorKind, stream::ConsumerErrorKind},
+    },
+    jetstream::Context,
+    ConnectionMetadata,
+};
+use telemetry::prelude::*;
+use thiserror::Error;
+use tokio_util::sync::CancellationToken;
+
+mod app_state;
+mod handlers;
+
+#[derive(Debug, Error)]
+pub enum AuditLogsAppSetupError {
+    #[error("async nats consumer error: {0}")]
+    AsyncNatsConsumer(#[from] AsyncNatsError<ConsumerErrorKind>),
+    #[error("async nats stream error: {0}")]
+    AsyncNatsStream(#[from] AsyncNatsError<StreamErrorKind>),
+    #[error("audit database context error: {0}")]
+    AuditDatabaseContext(#[from] AuditDatabaseContextError),
+    #[error("audit logs stream error: {0}")]
+    AuditLogsStream(#[from] AuditLogsStreamError),
+}
+
+type Result<T> = std::result::Result<T, AuditLogsAppSetupError>;
+
+/// Builds a naxum app for audit logs. Note that despite having an ack layer, all audit logs remain
+/// on the stream when processed. This is because the audit logs stream is limits-based and is not
+/// a work queue. Sneaky!
+#[instrument(
+    name = "forklift.init.app.audit_logs.build_and_run",
+    level = "debug",
+    skip_all
+)]
+pub(crate) async fn build_and_run(
+    jetstream_context: Context,
+    durable_consumer_name: String,
+    connection_metadata: Arc<ConnectionMetadata>,
+    concurrency_limit: usize,
+    audit_database_config: &AuditDatabaseConfig,
+    token: CancellationToken,
+) -> Result<Box<dyn Future<Output = io::Result<()>> + Unpin + Send>> {
+    let incoming = {
+        let stream = AuditLogsStream::get_or_create(jetstream_context).await?;
+        let consumer_subject = stream.consuming_subject_for_all_workspaces();
+        stream
+            .stream()
+            .await?
+            .create_consumer(async_nats::jetstream::consumer::pull::Config {
+                durable_name: Some(durable_consumer_name),
+                filter_subject: consumer_subject.into_string(),
+                ..Default::default()
+            })
+            .await?
+            .messages()
+            .await?
+    };
+
+    let context = AuditDatabaseContext::from_config(audit_database_config).await?;
+    let state = AppState::new(context, connection_metadata.subject_prefix().is_some());
+
+    // NOTE(nick,fletcher): the "NatsMakeSpan" builder defaults to "info" level logging. Bump it down, if needed.
+    let app = ServiceBuilder::new()
+        .layer(
+            MatchedSubjectLayer::new().for_subject(ForkliftAuditLogsForSubject::with_prefix(
+                connection_metadata.subject_prefix(),
+            )),
+        )
+        .layer(
+            TraceLayer::new()
+                .make_span_with(telemetry_nats::NatsMakeSpan::builder(connection_metadata).build())
+                .on_response(telemetry_nats::NatsOnResponse::new()),
+        )
+        .layer(AckLayer::new())
+        .service(handlers::default.with_state(state))
+        .map_response(Response::into_response);
+
+    let inner =
+        naxum::serve_with_incoming_limit(incoming, app.into_make_service(), concurrency_limit)
+            .with_graceful_shutdown(naxum::wait_on_cancelled(token));
+
+    Ok(Box::new(inner.into_future()))
+}
+
+#[derive(Clone, Debug)]
+struct ForkliftAuditLogsForSubject {
+    prefix: Option<()>,
+}
+
+impl ForkliftAuditLogsForSubject {
+    fn with_prefix(prefix: Option<&str>) -> Self {
+        Self {
+            prefix: prefix.map(|_p| ()),
+        }
+    }
+}
+
+impl<R> ForSubject<R> for ForkliftAuditLogsForSubject
+where
+    R: MessageHead,
+{
+    fn call(&mut self, req: &mut naxum::Message<R>) {
+        let mut parts = req.subject().split('.');
+
+        match self.prefix {
+            Some(_) => {
+                if let (Some(prefix), Some(p1), Some(p2), Some(_workspace_id), None) = (
+                    parts.next(),
+                    parts.next(),
+                    parts.next(),
+                    parts.next(),
+                    parts.next(),
+                ) {
+                    let matched = format!("{prefix}.{p1}.{p2}.:workspace_id");
+                    req.extensions_mut().insert(MatchedSubject::from(matched));
+                };
+            }
+            None => {
+                if let (Some(p1), Some(p2), Some(_workspace_id), None) =
+                    (parts.next(), parts.next(), parts.next(), parts.next())
+                {
+                    let matched = format!("{p1}.{p2}.:workspace_id");
+                    req.extensions_mut().insert(MatchedSubject::from(matched));
+                };
+            }
+        }
+    }
+}
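As a side note on the "Sneaky!" remark above: because the audit logs stream is limits-based, acking only advances this durable consumer's cursor; messages stay on the stream until its retention limits drop them. A minimal sketch of the consumer config built above (the filter subject shown is hypothetical; the real one comes from `AuditLogsStream::consuming_subject_for_all_workspaces()`):

    let consumer_config = async_nats::jetstream::consumer::pull::Config {
        durable_name: Some("forklift-server".to_string()),
        // Hypothetical subject for illustration only.
        filter_subject: "si.audit_logs.>".to_string(),
        ..Default::default()
    };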
diff --git a/lib/forklift-server/src/server/app/audit_logs/app_state.rs b/lib/forklift-server/src/server/app/audit_logs/app_state.rs
new file mode 100644
index 0000000000..312b2e936f
--- /dev/null
+++ b/lib/forklift-server/src/server/app/audit_logs/app_state.rs
@@ -0,0 +1,25 @@
+use audit_logs::database::AuditDatabaseContext;
+
+// NOTE(nick): we need an app state for all naxum apps at the time of writing, even if they are unused.
+#[derive(Debug, Clone)]
+pub(crate) struct AppState {
+    context: AuditDatabaseContext,
+    using_prefix: bool,
+}
+
+impl AppState {
+    pub(crate) fn new(context: AuditDatabaseContext, using_prefix: bool) -> Self {
+        Self {
+            context,
+            using_prefix,
+        }
+    }
+
+    pub(crate) fn context(&self) -> &AuditDatabaseContext {
+        &self.context
+    }
+
+    pub(crate) fn using_prefix(&self) -> bool {
+        self.using_prefix
+    }
+}
diff --git a/lib/forklift-server/src/server/app/audit_logs/handlers.rs b/lib/forklift-server/src/server/app/audit_logs/handlers.rs
new file mode 100644
index 0000000000..5567196571
--- /dev/null
+++ b/lib/forklift-server/src/server/app/audit_logs/handlers.rs
@@ -0,0 +1,83 @@
+use std::str::FromStr;
+
+use audit_logs::database::AuditDatabaseError;
+use naxum::{
+    extract::State,
+    response::{IntoResponse, Response},
+    Json,
+};
+use si_data_nats::Subject;
+use si_events::{audit_log::AuditLog, WorkspacePk};
+use telemetry::prelude::*;
+use thiserror::Error;
+
+use super::app_state::AppState;
+
+#[remain::sorted]
+#[derive(Debug, Error)]
+pub(crate) enum HandlerError {
+    #[error("audit database error: {0}")]
+    AuditDatabase(#[from] AuditDatabaseError),
+    #[error("serde json error: {0}")]
+    SerdeJson(#[from] serde_json::Error),
+    #[error("ulid decode error: {0}")]
+    UlidDecode(#[from] ulid::DecodeError),
+    #[error("unexpected subject shape: {0}")]
+    UnexpectedSubjectShape(Subject),
+}
+
+type Result<T> = std::result::Result<T, HandlerError>;
+
+impl IntoResponse for HandlerError {
+    fn into_response(self) -> Response {
+        error!(si.error.message = ?self, "failed to process message");
+        Response::default_internal_server_error()
+    }
+}
+
+pub(crate) async fn default(
+    State(state): State<AppState>,
+    subject: Subject,
+    Json(audit_log): Json<AuditLog>,
+) -> Result<()> {
+    // Hitting an error when finding the workspace id should be impossible as we match the subject using middleware
+    // before we get here.
+    let workspace_id = find_workspace_id(subject, state.using_prefix())?;
+
+    match audit_log {
+        AuditLog::V1(inner) => {
+            audit_logs::database::insert(
+                state.context(),
+                workspace_id,
+                inner.kind,
+                inner.timestamp,
+                inner.change_set_id,
+                inner.actor,
+                Some(inner.entity_name),
+            )
+            .await?;
+        }
+    }
+    Ok(())
+}
+
+// NOTE(nick,fletcher): we may be able to remove this if we store the workspace id on the audit log object itself, and
+// we have a plan for old messages.
+fn find_workspace_id(subject: Subject, using_prefix: bool) -> Result<WorkspacePk> {
+    let mut parts = subject.split('.');
+    if using_prefix {
+        if let (Some(_prefix), Some(_p1), Some(_p2), Some(workspace_id)) =
+            (parts.next(), parts.next(), parts.next(), parts.next())
+        {
+            Ok(WorkspacePk::from_str(workspace_id)?)
+        } else {
+            Err(HandlerError::UnexpectedSubjectShape(subject))
+        }
+    } else if let (Some(_p1), Some(_p2), Some(workspace_id)) =
+        (parts.next(), parts.next(), parts.next())
+    {
+        Ok(WorkspacePk::from_str(workspace_id)?)
+    } else {
+        Err(HandlerError::UnexpectedSubjectShape(subject))
+    }
+}
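For illustration, the two subject shapes `find_workspace_id` accepts look like the following; the segment names are hypothetical (the real ones come from `AuditLogsStream`), and only the token count matters here:

    // With a NATS subject prefix:  "<prefix>.<p1>.<p2>.<workspace_id>"  (4 tokens)
    // Without a subject prefix:    "<p1>.<p2>.<workspace_id>"           (3 tokens)
    // Anything else is rejected as HandlerError::UnexpectedSubjectShape.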
diff --git a/lib/forklift-server/src/server/app/billing_events.rs b/lib/forklift-server/src/server/app/billing_events.rs
new file mode 100644
index 0000000000..970164b2fa
--- /dev/null
+++ b/lib/forklift-server/src/server/app/billing_events.rs
@@ -0,0 +1,217 @@
+use std::{
+    future::{Future, IntoFuture as _},
+    io,
+    sync::Arc,
+};
+
+use app_state::{AppState, NoopAppState};
+use billing_events::{BillingEventsError, BillingEventsWorkQueue};
+use data_warehouse_stream_client::DataWarehouseStreamClient;
+use naxum::{
+    extract::MatchedSubject,
+    handler::Handler as _,
+    middleware::{
+        ack::AckLayer,
+        matched_subject::{ForSubject, MatchedSubjectLayer},
+        trace::TraceLayer,
+    },
+    response::{IntoResponse, Response},
+    MessageHead, ServiceBuilder, ServiceExt as _, TowerServiceExt as _,
+};
+use si_data_nats::{
+    async_nats::{
+        self,
+        error::Error as AsyncNatsError,
+        jetstream::{
+            consumer::{pull::Stream, StreamErrorKind},
+            stream::ConsumerErrorKind,
+        },
+    },
+    jetstream::Context,
+    ConnectionMetadata,
+};
+use telemetry::prelude::*;
+use thiserror::Error;
+use tokio_util::sync::CancellationToken;
+
+mod app_state;
+mod handlers;
+
+#[derive(Debug, Error)]
+pub enum BillingEventsAppSetupError {
+    #[error("async nats consumer error: {0}")]
+    AsyncNatsConsumer(#[from] AsyncNatsError<ConsumerErrorKind>),
+    #[error("async nats stream error: {0}")]
+    AsyncNatsStream(#[from] AsyncNatsError<StreamErrorKind>),
+    #[error("billing events error: {0}")]
+    BillingEvents(#[from] BillingEventsError),
+}
+
+type Result<T> = std::result::Result<T, BillingEventsAppSetupError>;
+
+#[instrument(
+    name = "forklift.init.app.billing_events.build_and_run",
+    level = "debug",
+    skip_all
+)]
+pub(crate) async fn build_and_run(
+    jetstream_context: Context,
+    durable_consumer_name: String,
+    connection_metadata: Arc<ConnectionMetadata>,
+    concurrency_limit: usize,
+    data_warehouse_stream_name: Option<&str>,
+    token: CancellationToken,
+) -> Result<Box<dyn Future<Output = io::Result<()>> + Unpin + Send>> {
+    let incoming = {
+        let queue = BillingEventsWorkQueue::get_or_create(jetstream_context).await?;
+        let consumer_subject = queue.workspace_update_subject("*");
+        queue
+            .stream()
+            .await?
+            .create_consumer(async_nats::jetstream::consumer::pull::Config {
+                durable_name: Some(durable_consumer_name),
+                filter_subject: consumer_subject,
+                ..Default::default()
+            })
+            .await?
+            .messages()
+            .await?
+    };
+
+    let inner = match data_warehouse_stream_name {
+        Some(stream_name) => {
+            info!(%stream_name, "creating billing events app in data warehouse stream delivery mode...");
+            let client = DataWarehouseStreamClient::new(stream_name).await;
+            let state = AppState::new(client);
+            build_app(
+                state,
+                connection_metadata,
+                incoming,
+                concurrency_limit,
+                token.clone(),
+            )?
+        }
+        None => {
+            info!("creating billing events app in no-op mode...");
+            let state = NoopAppState::new();
+            build_noop_app(
+                state,
+                connection_metadata,
+                incoming,
+                concurrency_limit,
+                token.clone(),
+            )?
+        }
+    };
+
+    Ok(inner)
+}
+
+#[instrument(
+    name = "forklift.init.app.billing_events.build_and_run.build_app",
+    level = "debug",
+    skip_all
+)]
+fn build_app(
+    state: AppState,
+    connection_metadata: Arc<ConnectionMetadata>,
+    incoming: Stream,
+    concurrency_limit: usize,
+    token: CancellationToken,
+) -> Result<Box<dyn Future<Output = io::Result<()>> + Unpin + Send>> {
+    let app = ServiceBuilder::new()
+        .layer(MatchedSubjectLayer::new().for_subject(
+            ForkliftBillingEventsForSubject::with_prefix(connection_metadata.subject_prefix()),
+        ))
+        .layer(
+            TraceLayer::new()
+                .make_span_with(telemetry_nats::NatsMakeSpan::builder(connection_metadata).build())
+                .on_response(telemetry_nats::NatsOnResponse::new()),
+        )
+        .layer(AckLayer::new())
+        .service(handlers::process_request.with_state(state))
+        .map_response(Response::into_response);
+
+    let inner =
+        naxum::serve_with_incoming_limit(incoming, app.into_make_service(), concurrency_limit)
+            .with_graceful_shutdown(naxum::wait_on_cancelled(token));
+
+    Ok(Box::new(inner.into_future()))
+}
+
+#[instrument(
+    name = "forklift.init.app.billing_events.build_and_run.build_noop_app",
+    level = "debug",
+    skip_all
+)]
+fn build_noop_app(
+    state: NoopAppState,
+    connection_metadata: Arc<ConnectionMetadata>,
+    incoming: Stream,
+    concurrency_limit: usize,
+    token: CancellationToken,
+) -> Result<Box<dyn Future<Output = io::Result<()>> + Unpin + Send>> {
+    let app = ServiceBuilder::new()
+        .layer(MatchedSubjectLayer::new().for_subject(
+            ForkliftBillingEventsForSubject::with_prefix(connection_metadata.subject_prefix()),
+        ))
+        .layer(
+            TraceLayer::new()
+                .make_span_with(telemetry_nats::NatsMakeSpan::builder(connection_metadata).build())
+                .on_response(telemetry_nats::NatsOnResponse::new()),
+        )
+        .layer(AckLayer::new())
+        .service(handlers::process_request_noop.with_state(state))
+        .map_response(Response::into_response);
+
+    let inner =
+        naxum::serve_with_incoming_limit(incoming, app.into_make_service(), concurrency_limit)
+            .with_graceful_shutdown(naxum::wait_on_cancelled(token));
+
+    Ok(Box::new(inner.into_future()))
+}
+
+#[derive(Clone, Debug)]
+struct ForkliftBillingEventsForSubject {
+    prefix: Option<()>,
+}
+
+impl ForkliftBillingEventsForSubject {
+    fn with_prefix(prefix: Option<&str>) -> Self {
+        Self {
+            prefix: prefix.map(|_p| ()),
+        }
+    }
+}
+
+impl<R> ForSubject<R> for ForkliftBillingEventsForSubject
+where
+    R: MessageHead,
+{
+    fn call(&mut self, req: &mut naxum::Message<R>) {
+        let mut parts = req.subject().split('.');
+
+        match self.prefix {
+            Some(_) => {
+                if let (Some(prefix), Some(p1), Some(p2), Some(_workspace_id), None) = (
+                    parts.next(),
+                    parts.next(),
+                    parts.next(),
+                    parts.next(),
+                    parts.next(),
+                ) {
+                    let matched = format!("{prefix}.{p1}.{p2}.:workspace_id");
+                    req.extensions_mut().insert(MatchedSubject::from(matched));
+                };
+            }
+            None => {
+                if let (Some(p1), Some(p2), Some(_workspace_id), None) =
+                    (parts.next(), parts.next(), parts.next(), parts.next())
+                {
+                    let matched = format!("{p1}.{p2}.:workspace_id");
+                    req.extensions_mut().insert(MatchedSubject::from(matched));
+                };
+            }
+        }
+    }
+}
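For reference, the `ForSubject` implementations above (this one and its audit logs twin) rewrite concrete subjects into a parameterized `MatchedSubject` so traces group by shape rather than by workspace; with hypothetical tokens:

    // "prefix.p1.p2.01ARZ3NDEKTSV4RRFFQ69G5FAV" -> "prefix.p1.p2.:workspace_id"
    // "p1.p2.01ARZ3NDEKTSV4RRFFQ69G5FAV"        -> "p1.p2.:workspace_id"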
diff --git a/lib/forklift-server/src/app_state.rs b/lib/forklift-server/src/server/app/billing_events/app_state.rs
similarity index 100%
rename from lib/forklift-server/src/app_state.rs
rename to lib/forklift-server/src/server/app/billing_events/app_state.rs
diff --git a/lib/forklift-server/src/handlers.rs b/lib/forklift-server/src/server/app/billing_events/handlers.rs
similarity index 96%
rename from lib/forklift-server/src/handlers.rs
rename to lib/forklift-server/src/server/app/billing_events/handlers.rs
index e262f1d548..98c9c26408 100644
--- a/lib/forklift-server/src/handlers.rs
+++ b/lib/forklift-server/src/server/app/billing_events/handlers.rs
@@ -9,7 +9,7 @@ use si_data_nats::Subject;
 use telemetry::prelude::*;
 use thiserror::Error;
 
-use crate::{app_state::AppState, app_state::NoopAppState};
+use super::app_state::{AppState, NoopAppState};
 
 #[remain::sorted]
 #[derive(Debug, Error)]
diff --git a/lib/sdf-server/src/config.rs b/lib/sdf-server/src/config.rs
index d13e679160..7d7502f951 100644
--- a/lib/sdf-server/src/config.rs
+++ b/lib/sdf-server/src/config.rs
@@ -1,5 +1,5 @@
 use asset_sprayer::config::{AssetSprayerConfig, SIOpenAIConfig};
-use audit_logs::pg::AuditDatabaseConfig;
+use audit_logs::database::AuditDatabaseConfig;
 use dal::jwt_key::JwtConfig;
 use serde_with::{DeserializeFromStr, SerializeDisplay};
 use si_crypto::VeritechCryptoConfig;
@@ -437,7 +437,7 @@ fn default_layer_db_config() -> LayerDbConfig {
 }
 
 #[allow(clippy::disallowed_methods)] // Used to determine if running in development
-pub fn detect_and_configure_development(config: &mut ConfigFile) -> Result<()> {
+fn detect_and_configure_development(config: &mut ConfigFile) -> Result<()> {
     if env::var("BUCK_RUN_BUILD_ID").is_ok() || env::var("BUCK_BUILD_ID").is_ok() {
         buck2_development(config)
     } else if let Ok(dir) = env::var("CARGO_MANIFEST_DIR") {
@@ -512,7 +512,7 @@ fn buck2_development(config: &mut ConfigFile) -> Result<()> {
     config.layer_db_config.pg_pool_config.dbname = si_layer_cache::pg::DBNAME.to_string();
     config.spicedb.enabled = true;
     config.audit.pg.certificate_path = Some(postgres_cert.clone().try_into()?);
-    config.audit.pg.dbname = audit_logs::pg::DBNAME.to_string();
+    config.audit.pg.dbname = audit_logs::database::DBNAME.to_string();
 
     Ok(())
 }
@@ -574,7 +574,7 @@ fn cargo_development(dir: String, config: &mut ConfigFile) -> Result<()> {
     config.pkgs_path = pkgs_path;
     config.spicedb.enabled = true;
     config.audit.pg.certificate_path = Some(postgres_cert.clone().try_into()?);
-    config.audit.pg.dbname = audit_logs::pg::DBNAME.to_string();
+    config.audit.pg.dbname = audit_logs::database::DBNAME.to_string();
 
     Ok(())
 }
diff --git a/lib/sdf-server/src/lib.rs b/lib/sdf-server/src/lib.rs
index 3d74821975..f2b2c22cc6 100644
--- a/lib/sdf-server/src/lib.rs
+++ b/lib/sdf-server/src/lib.rs
@@ -14,7 +14,7 @@
 
 use std::io;
 
-use audit_logs::pg::AuditDatabaseContextError;
+use audit_logs::database::AuditDatabaseContextError;
 use si_data_spicedb::SpiceDbError;
 use thiserror::Error;
 
@@ -38,9 +38,8 @@ pub use self::{
     app::AxumApp,
     app_state::ApplicationRuntimeMode,
     config::{
-        detect_and_configure_development, Config, ConfigBuilder, ConfigError, ConfigFile,
-        IncomingStream, MigrationMode, StandardConfig, StandardConfigFile, WorkspacePermissions,
-        WorkspacePermissionsMode,
+        Config, ConfigBuilder, ConfigError, ConfigFile, IncomingStream, MigrationMode,
+        StandardConfig, StandardConfigFile, WorkspacePermissions, WorkspacePermissionsMode,
     },
     migrations::Migrator,
     nats_multiplexer::CRDT_MULTIPLEXER_SUBJECT,
diff --git a/lib/sdf-server/src/migrations.rs b/lib/sdf-server/src/migrations.rs
index cf2945472f..58e7317210 100644
--- a/lib/sdf-server/src/migrations.rs
+++ b/lib/sdf-server/src/migrations.rs
@@ -1,6 +1,6 @@
 use std::{future::IntoFuture as _, time::Duration};
 
-use audit_logs::pg::{
+use audit_logs::database::{
     AuditDatabaseContext, AuditDatabaseContextError, AuditDatabaseMigrationError,
 };
 use dal::{
@@ -146,7 +146,7 @@ impl Migrator {
     #[instrument(name = "sdf.migrator.migrate_audit_database", level = "info", skip_all)]
     async fn migrate_audit_database(&self) -> MigratorResult<()> {
-        audit_logs::pg::migrate(&self.audit_database_context)
+        audit_logs::database::migrate(&self.audit_database_context)
             .await
             .map_err(MigratorError::MigrateAuditDatabase)
     }
diff --git a/lib/sdf-server/src/server.rs b/lib/sdf-server/src/server.rs
index 2a37032557..a27cd610a3 100644
--- a/lib/sdf-server/src/server.rs
+++ b/lib/sdf-server/src/server.rs
@@ -1,7 +1,7 @@
 use std::{fmt, future::IntoFuture as _, net::SocketAddr, path::PathBuf, sync::Arc};
 
 use asset_sprayer::AssetSprayer;
-use audit_logs::pg::AuditDatabaseContext;
+use audit_logs::database::AuditDatabaseContext;
 use axum::{async_trait, routing::IntoMakeService, Router};
 use dal::{JwtPublicSigningKey, ServicesContext};
 use hyper::server::accept::Accept;
diff --git a/lib/si-events-rs/src/audit_log.rs b/lib/si-events-rs/src/audit_log.rs
index 0af7b2cca7..626d432934 100644
--- a/lib/si-events-rs/src/audit_log.rs
+++ b/lib/si-events-rs/src/audit_log.rs
@@ -1,13 +1,14 @@
 use chrono::Utc;
 use serde::{Deserialize, Serialize};
-use v1::{AuditLogKindV1, AuditLogV1};
+use v1::{AuditLogKindV1, AuditLogMetadataV1, AuditLogV1};
 
 use crate::{Actor, ChangeSetId};
 
 mod v1;
 
 pub type AuditLogKind = AuditLogKindV1;
+pub type AuditLogMetadata = AuditLogMetadataV1;
 
 // TODO(nick): switch to something like "naxum-api-types" crate to avoid sizing issues.
 #[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)]
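A quick sketch of what the untagged representation added in v1.rs below means in practice (assuming `serde_json`; the `Abandoned` status variant and its string form are illustrative):

    // Untagged enums serialize as the variant's fields only; the variant name is omitted.
    let metadata = AuditLogMetadata::AbandonChangeSet {
        from_status: ChangeSetStatus::Abandoned,
    };
    let value = serde_json::to_value(&metadata).expect("serialize metadata");
    // Yields {"fromStatus":"Abandoned"} rather than {"abandonChangeSet":{...}}.
    assert_eq!(value, serde_json::json!({ "fromStatus": "Abandoned" }));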
+/// +/// Reference: https://serde.rs/enum-representations.html#untagged +/// +/// _Note:_ there are multiple uses of renaming to camel case are related to this: https://github.com/serde-rs/serde/issues/1560 +#[remain::sorted] +#[derive(Debug, Deserialize, Serialize, EnumDiscriminants)] +#[serde(untagged, rename_all = "camelCase")] +pub enum AuditLogMetadataV1 { + #[serde(rename_all = "camelCase")] + AbandonChangeSet { from_status: ChangeSetStatus }, + #[serde(rename_all = "camelCase")] + AddAction { + prototype_id: ActionPrototypeId, + action_kind: ActionKind, + func_id: FuncId, + func_display_name: Option, + func_name: String, + }, + #[serde(rename_all = "camelCase")] + ApplyChangeSet, + #[serde(rename_all = "camelCase")] + ApproveChangeSetApply { from_status: ChangeSetStatus }, + #[serde(rename_all = "camelCase")] + CancelAction { + prototype_id: ActionPrototypeId, + action_kind: ActionKind, + func_id: FuncId, + func_display_name: Option, + func_name: String, + }, + + #[serde(rename_all = "camelCase")] + CreateChangeSet, + #[serde(rename_all = "camelCase")] + CreateComponent { + name: String, + component_id: ComponentId, + schema_variant_id: SchemaVariantId, + schema_variant_name: String, + }, + #[serde(rename_all = "camelCase")] + CreateSecret { name: String, secret_id: SecretId }, + #[serde(rename_all = "camelCase")] + DeleteComponent { + name: String, + component_id: ComponentId, + schema_variant_id: SchemaVariantId, + schema_variant_name: String, + }, + #[serde(rename_all = "camelCase")] + DeleteSecret { name: String, secret_id: SecretId }, + #[serde(rename_all = "camelCase")] + ExportWorkspace { + id: WorkspacePk, + name: String, + version: String, + }, + #[serde(rename_all = "camelCase")] + InstallWorkspace { + id: WorkspacePk, + name: String, + version: String, + }, + #[serde(rename_all = "camelCase")] + Login, + #[serde(rename_all = "camelCase")] + PutActionOnHold { + prototype_id: ActionPrototypeId, + action_kind: ActionKind, + func_id: FuncId, + func_display_name: Option, + func_name: String, + }, + #[serde(rename_all = "camelCase")] + RejectChangeSetApply { from_status: ChangeSetStatus }, + #[serde(rename_all = "camelCase")] + ReopenChangeSet { from_status: ChangeSetStatus }, + #[serde(rename_all = "camelCase")] + RequestChangeSetApproval { from_status: ChangeSetStatus }, + #[serde(rename_all = "camelCase")] + RetryAction { + prototype_id: ActionPrototypeId, + action_kind: ActionKind, + func_id: FuncId, + func_display_name: Option, + func_name: String, + }, + #[serde(rename_all = "camelCase")] + RunAction { + prototype_id: ActionPrototypeId, + action_kind: ActionKind, + func_id: FuncId, + func_display_name: Option, + func_name: String, + run_status: bool, + }, + #[serde(rename_all = "camelCase")] + UpdateDependentInputSocket { + input_socket_id: InputSocketId, + input_socket_name: String, + attribute_value_id: AttributeValueId, + input_attribute_value_ids: Vec, + func_id: FuncId, + func_display_name: Option, + func_name: String, + component_id: ComponentId, + component_name: String, + schema_variant_id: SchemaVariantId, + schema_variant_display_name: String, + before_value: Option, + after_value: Option, + }, + #[serde(rename_all = "camelCase")] + UpdateDependentOutputSocket { + output_socket_id: OutputSocketId, + output_socket_name: String, + attribute_value_id: AttributeValueId, + input_attribute_value_ids: Vec, + func_id: FuncId, + func_display_name: Option, + func_name: String, + component_id: ComponentId, + component_name: String, + schema_variant_id: SchemaVariantId, + 
schema_variant_display_name: String, + before_value: Option, + after_value: Option, + }, + #[serde(rename_all = "camelCase")] + UpdateDependentProperty { + prop_id: PropId, + prop_name: String, + attribute_value_id: AttributeValueId, + input_attribute_value_ids: Vec, + func_id: FuncId, + func_display_name: Option, + func_name: String, + component_id: ComponentId, + component_name: String, + schema_variant_id: SchemaVariantId, + schema_variant_display_name: String, + before_value: Option, + after_value: Option, + }, + #[serde(rename_all = "camelCase")] + UpdatePropertyEditorValue { + component_id: ComponentId, + component_name: String, + schema_variant_id: SchemaVariantId, + schema_variant_display_name: String, + prop_id: PropId, + prop_name: String, + attribute_value_id: AttributeValueId, + before_value: Option, + after_value: Option, + }, + #[serde(rename_all = "camelCase")] + UpdatePropertyEditorValueForSecret { + component_id: ComponentId, + component_name: String, + schema_variant_id: SchemaVariantId, + schema_variant_display_name: String, + prop_id: PropId, + prop_name: String, + attribute_value_id: AttributeValueId, + before_secret_name: Option, + before_secret_id: Option, + after_secret_name: Option, + after_secret_id: Option, + }, + #[serde(rename_all = "camelCase")] + UpdateSecret { name: String, secret_id: SecretId }, + #[serde(rename_all = "camelCase")] + UpgradeComponent { + name: String, + component_id: ComponentId, + schema_id: SchemaId, + new_schema_variant_id: SchemaVariantId, + new_schema_variant_name: String, + old_schema_variant_id: SchemaVariantId, + old_schema_variant_name: String, + }, + #[serde(rename_all = "camelCase")] + WithdrawRequestForChangeSetApply { from_status: ChangeSetStatus }, +} + +impl AuditLogMetadataV1 { + pub fn title_and_entity_type(&self) -> (&'static str, Option<&'static str>) { + // Please keep this in alphabetical order! 
+        // #[remain::sorted] // NOTE(nick): this is not yet stable
+        match self.into() {
+            MetadataDiscrim::AbandonChangeSet => ("Abandoned", Some("Change Set")),
+            MetadataDiscrim::AddAction => ("Enqueued", Some("Action")),
+            MetadataDiscrim::ApplyChangeSet => ("Applied", Some("Change Set")),
+            MetadataDiscrim::ApproveChangeSetApply => {
+                ("Approved Request to Apply", Some("Change Set"))
+            }
+            MetadataDiscrim::CancelAction => ("Removed", Some("Action")),
+            MetadataDiscrim::CreateChangeSet => ("Created", Some("Change Set")),
+            MetadataDiscrim::CreateComponent => ("Created", Some("Component")),
+            MetadataDiscrim::CreateSecret => ("Created", Some("Secret")),
+            MetadataDiscrim::DeleteComponent => ("Deleted", Some("Component")),
+            MetadataDiscrim::DeleteSecret => ("Deleted", Some("Secret")),
+            MetadataDiscrim::ExportWorkspace => ("Exported", Some("Workspace")),
+            MetadataDiscrim::InstallWorkspace => ("Installed", Some("Workspace")),
+            MetadataDiscrim::Login => ("Authenticated", None),
+            MetadataDiscrim::PutActionOnHold => ("Paused", Some("Action")),
+            MetadataDiscrim::RejectChangeSetApply => {
+                ("Rejected Request to Apply", Some("Change Set"))
+            }
+            MetadataDiscrim::ReopenChangeSet => ("Reopened", Some("Change Set")),
+            MetadataDiscrim::RequestChangeSetApproval => ("Requested to Apply", Some("Change Set")),
+            MetadataDiscrim::RetryAction => ("Retried", Some("Action")),
+            MetadataDiscrim::RunAction => ("Ran", Some("Action")),
+            MetadataDiscrim::UpdateDependentInputSocket => ("Set Dependent", Some("Input Socket")),
+            MetadataDiscrim::UpdateDependentOutputSocket => {
+                ("Set Dependent", Some("Output Socket"))
+            }
+            MetadataDiscrim::UpdateDependentProperty => ("Set Dependent", Some("Property")),
+            MetadataDiscrim::UpdatePropertyEditorValue => ("Updated Component", Some("Property")),
+            MetadataDiscrim::UpdatePropertyEditorValueForSecret => {
+                ("Updated Component", Some("Property for Secret"))
+            }
+            MetadataDiscrim::UpdateSecret => ("Updated", Some("Secret")),
+            MetadataDiscrim::UpgradeComponent => ("Upgraded", Some("Component")),
+            MetadataDiscrim::WithdrawRequestForChangeSetApply => {
+                ("Withdrew Request to Apply", Some("Change Set"))
+            }
+        }
+    }
+}
+
+impl From<Kind> for Metadata {
+    fn from(value: Kind) -> Self {
+        // Please keep this in alphabetical order!
+ // #[remain::sorted] // NOTE(nick): this is not yet stable + match value { + Kind::AbandonChangeSet { from_status } => Self::AbandonChangeSet { from_status }, + Kind::AddAction { + prototype_id, + action_kind, + func_id, + func_display_name, + func_name, + } => Self::AddAction { + prototype_id, + action_kind, + func_id, + func_display_name, + func_name, + }, + Kind::ApplyChangeSet => Self::ApplyChangeSet, + Kind::ApproveChangeSetApply { from_status } => { + Self::ApproveChangeSetApply { from_status } + } + Kind::CancelAction { + prototype_id, + action_kind, + func_id, + func_display_name, + func_name, + } => Self::CancelAction { + prototype_id, + action_kind, + func_id, + func_display_name, + func_name, + }, + Kind::CreateChangeSet => Self::CreateChangeSet, + Kind::CreateComponent { + name, + component_id, + schema_variant_id, + schema_variant_name, + } => Self::CreateComponent { + name, + component_id, + schema_variant_id, + schema_variant_name, + }, + Kind::CreateSecret { name, secret_id } => Self::CreateSecret { name, secret_id }, + Kind::DeleteComponent { + name, + component_id, + schema_variant_id, + schema_variant_name, + } => Self::DeleteComponent { + name, + component_id, + schema_variant_id, + schema_variant_name, + }, + Kind::DeleteSecret { name, secret_id } => Self::DeleteSecret { name, secret_id }, + Kind::ExportWorkspace { id, name, version } => { + Self::ExportWorkspace { id, name, version } + } + Kind::InstallWorkspace { id, name, version } => { + Self::InstallWorkspace { id, name, version } + } + Kind::Login => Self::Login, + Kind::PutActionOnHold { + prototype_id, + action_kind, + func_id, + func_display_name, + func_name, + } => Self::PutActionOnHold { + prototype_id, + action_kind, + func_id, + func_display_name, + func_name, + }, + Kind::RejectChangeSetApply { from_status } => { + Self::RejectChangeSetApply { from_status } + } + Kind::ReopenChangeSet { from_status } => Self::ReopenChangeSet { from_status }, + Kind::RequestChangeSetApproval { from_status } => { + Self::RequestChangeSetApproval { from_status } + } + Kind::RetryAction { + prototype_id, + action_kind, + func_id, + func_display_name, + func_name, + } => Self::RetryAction { + prototype_id, + action_kind, + func_id, + func_display_name, + func_name, + }, + Kind::RunAction { + prototype_id, + action_kind, + func_id, + func_display_name, + func_name, + run_status, + } => Self::RunAction { + prototype_id, + action_kind, + func_id, + func_display_name, + func_name, + run_status, + }, + Kind::UpdateDependentInputSocket { + input_socket_id, + input_socket_name, + attribute_value_id, + input_attribute_value_ids, + func_id, + func_display_name, + func_name, + component_id, + component_name, + schema_variant_id, + schema_variant_display_name, + before_value, + after_value, + } => Self::UpdateDependentInputSocket { + input_socket_id, + input_socket_name, + attribute_value_id, + input_attribute_value_ids, + func_id, + func_display_name, + func_name, + component_id, + component_name, + schema_variant_id, + schema_variant_display_name, + before_value, + after_value, + }, + Kind::UpdateDependentOutputSocket { + output_socket_id, + output_socket_name, + attribute_value_id, + input_attribute_value_ids, + func_id, + func_display_name, + func_name, + component_id, + component_name, + schema_variant_id, + schema_variant_display_name, + before_value, + after_value, + } => Self::UpdateDependentOutputSocket { + output_socket_id, + output_socket_name, + attribute_value_id, + input_attribute_value_ids, + func_id, + 
func_display_name,
+                func_name,
+                component_id,
+                component_name,
+                schema_variant_id,
+                schema_variant_display_name,
+                before_value,
+                after_value,
+            },
+            Kind::UpdateDependentProperty {
+                prop_id,
+                prop_name,
+                attribute_value_id,
+                input_attribute_value_ids,
+                func_id,
+                func_display_name,
+                func_name,
+                component_id,
+                component_name,
+                schema_variant_id,
+                schema_variant_display_name,
+                before_value,
+                after_value,
+            } => Self::UpdateDependentProperty {
+                prop_id,
+                prop_name,
+                attribute_value_id,
+                input_attribute_value_ids,
+                func_id,
+                func_display_name,
+                func_name,
+                component_id,
+                component_name,
+                schema_variant_id,
+                schema_variant_display_name,
+                before_value,
+                after_value,
+            },
+            Kind::UpdatePropertyEditorValue {
+                component_id,
+                component_name,
+                schema_variant_id,
+                schema_variant_display_name,
+                prop_id,
+                prop_name,
+                attribute_value_id,
+                before_value,
+                after_value,
+            } => Self::UpdatePropertyEditorValue {
+                component_id,
+                component_name,
+                schema_variant_id,
+                schema_variant_display_name,
+                prop_id,
+                prop_name,
+                attribute_value_id,
+                before_value,
+                after_value,
+            },
+            Kind::UpdatePropertyEditorValueForSecret {
+                component_id,
+                component_name,
+                schema_variant_id,
+                schema_variant_display_name,
+                prop_id,
+                prop_name,
+                attribute_value_id,
+                before_secret_name,
+                before_secret_id,
+                after_secret_name,
+                after_secret_id,
+            } => Self::UpdatePropertyEditorValueForSecret {
+                component_id,
+                component_name,
+                schema_variant_id,
+                schema_variant_display_name,
+                prop_id,
+                prop_name,
+                attribute_value_id,
+                before_secret_name,
+                before_secret_id,
+                after_secret_name,
+                after_secret_id,
+            },
+            Kind::UpdateSecret { name, secret_id } => Self::UpdateSecret { name, secret_id },
+            Kind::UpgradeComponent {
+                name,
+                component_id,
+                schema_id,
+                new_schema_variant_id,
+                new_schema_variant_name,
+                old_schema_variant_id,
+                old_schema_variant_name,
+            } => Self::UpgradeComponent {
+                name,
+                component_id,
+                schema_id,
+                new_schema_variant_id,
+                new_schema_variant_name,
+                old_schema_variant_id,
+                old_schema_variant_name,
+            },
+            Kind::WithdrawRequestForChangeSetApply { from_status } => {
+                Self::WithdrawRequestForChangeSetApply { from_status }
+            }
+        }
+    }
+}
diff --git a/lib/si-frontend-types-rs/src/audit_log.rs b/lib/si-frontend-types-rs/src/audit_log.rs
index 190e642a27..6578c32757 100644
--- a/lib/si-frontend-types-rs/src/audit_log.rs
+++ b/lib/si-frontend-types-rs/src/audit_log.rs
@@ -1,10 +1,5 @@
 use serde::Serialize;
-use si_events::{
-    audit_log::AuditLogKind, ActionKind, ActionPrototypeId, AttributeValueId, ChangeSetId,
-    ChangeSetStatus, ComponentId, FuncId, InputSocketId, OutputSocketId, PropId, SchemaId,
-    SchemaVariantId, SecretId, UserPk, WorkspacePk,
-};
-use strum::EnumDiscriminants;
+use si_events::{ChangeSetId, UserPk};
 
 #[derive(Debug, Serialize, Clone, PartialEq, Eq)]
 #[serde(rename_all = "camelCase")]
@@ -30,516 +25,7 @@ pub struct AuditLog {
     pub change_set_id: Option<ChangeSetId>,
     /// The name of the change set.
     pub change_set_name: Option<String>,
-    /// Serialized version of [`AuditLogDeserializedMetadata`], which is an untagged version of the specific
-    /// [`AuditLogKind`].
+    /// Serialized version of [`AuditLogMetadata`](si_events::audit_log::AuditLogMetadata), which is an
+    /// untagged version of the specific [`AuditLogKind`](si_events::audit_log::AuditLogKind).
     pub metadata: serde_json::Value,
 }
-
-/// This is an identical copy of latest [`AuditLogKind`], but uses "serde untagged" wrapper.
-///
-/// Reference: https://serde.rs/enum-representations.html#untagged
-///
-/// _Note:_ there are multiple uses of renaming to camel case are related to this: https://github.com/serde-rs/serde/issues/1560
-#[remain::sorted]
-#[derive(Debug, Serialize, EnumDiscriminants)]
-#[serde(untagged, rename_all = "camelCase")]
-pub enum AuditLogDeserializedMetadata {
-    #[serde(rename_all = "camelCase")]
-    AbandonChangeSet { from_status: ChangeSetStatus },
-    #[serde(rename_all = "camelCase")]
-    AddAction {
-        prototype_id: ActionPrototypeId,
-        action_kind: ActionKind,
-        func_id: FuncId,
-        func_display_name: Option<String>,
-        func_name: String,
-    },
-    #[serde(rename_all = "camelCase")]
-    ApplyChangeSet,
-    #[serde(rename_all = "camelCase")]
-    ApproveChangeSetApply { from_status: ChangeSetStatus },
-    #[serde(rename_all = "camelCase")]
-    CancelAction {
-        prototype_id: ActionPrototypeId,
-        action_kind: ActionKind,
-        func_id: FuncId,
-        func_display_name: Option<String>,
-        func_name: String,
-    },
-
-    #[serde(rename_all = "camelCase")]
-    CreateChangeSet,
-    #[serde(rename_all = "camelCase")]
-    CreateComponent {
-        name: String,
-        component_id: ComponentId,
-        schema_variant_id: SchemaVariantId,
-        schema_variant_name: String,
-    },
-    #[serde(rename_all = "camelCase")]
-    CreateSecret { name: String, secret_id: SecretId },
-    #[serde(rename_all = "camelCase")]
-    DeleteComponent {
-        name: String,
-        component_id: ComponentId,
-        schema_variant_id: SchemaVariantId,
-        schema_variant_name: String,
-    },
-    #[serde(rename_all = "camelCase")]
-    DeleteSecret { name: String, secret_id: SecretId },
-    #[serde(rename_all = "camelCase")]
-    ExportWorkspace {
-        id: WorkspacePk,
-        name: String,
-        version: String,
-    },
-    #[serde(rename_all = "camelCase")]
-    InstallWorkspace {
-        id: WorkspacePk,
-        name: String,
-        version: String,
-    },
-    #[serde(rename_all = "camelCase")]
-    Login,
-    #[serde(rename_all = "camelCase")]
-    PutActionOnHold {
-        prototype_id: ActionPrototypeId,
-        action_kind: ActionKind,
-        func_id: FuncId,
-        func_display_name: Option<String>,
-        func_name: String,
-    },
-    #[serde(rename_all = "camelCase")]
-    RejectChangeSetApply { from_status: ChangeSetStatus },
-    #[serde(rename_all = "camelCase")]
-    ReopenChangeSet { from_status: ChangeSetStatus },
-    #[serde(rename_all = "camelCase")]
-    RequestChangeSetApproval { from_status: ChangeSetStatus },
-    #[serde(rename_all = "camelCase")]
-    RetryAction {
-        prototype_id: ActionPrototypeId,
-        action_kind: ActionKind,
-        func_id: FuncId,
-        func_display_name: Option<String>,
-        func_name: String,
-    },
-    #[serde(rename_all = "camelCase")]
-    RunAction {
-        prototype_id: ActionPrototypeId,
-        action_kind: ActionKind,
-        func_id: FuncId,
-        func_display_name: Option<String>,
-        func_name: String,
-        run_status: bool,
-    },
-    #[serde(rename_all = "camelCase")]
-    UpdateDependentInputSocket {
-        input_socket_id: InputSocketId,
-        input_socket_name: String,
-        attribute_value_id: AttributeValueId,
-        input_attribute_value_ids: Vec<AttributeValueId>,
-        func_id: FuncId,
-        func_display_name: Option<String>,
-        func_name: String,
-        component_id: ComponentId,
-        component_name: String,
-        schema_variant_id: SchemaVariantId,
-        schema_variant_display_name: String,
-        before_value: Option<serde_json::Value>,
-        after_value: Option<serde_json::Value>,
-    },
-    #[serde(rename_all = "camelCase")]
-    UpdateDependentOutputSocket {
-        output_socket_id: OutputSocketId,
-        output_socket_name: String,
-        attribute_value_id: AttributeValueId,
-        input_attribute_value_ids: Vec<AttributeValueId>,
-        func_id: FuncId,
-        func_display_name: Option<String>,
-        func_name: String,
-        component_id: ComponentId,
-        component_name: String,
-        schema_variant_id: SchemaVariantId,
-        schema_variant_display_name: String,
-        before_value: Option<serde_json::Value>,
-        after_value: Option<serde_json::Value>,
-    },
-    #[serde(rename_all = "camelCase")]
-    UpdateDependentProperty {
-        prop_id: PropId,
-        prop_name: String,
-        attribute_value_id: AttributeValueId,
-        input_attribute_value_ids: Vec<AttributeValueId>,
-        func_id: FuncId,
-        func_display_name: Option<String>,
-        func_name: String,
-        component_id: ComponentId,
-        component_name: String,
-        schema_variant_id: SchemaVariantId,
-        schema_variant_display_name: String,
-        before_value: Option<serde_json::Value>,
-        after_value: Option<serde_json::Value>,
-    },
-    #[serde(rename_all = "camelCase")]
-    UpdatePropertyEditorValue {
-        component_id: ComponentId,
-        component_name: String,
-        schema_variant_id: SchemaVariantId,
-        schema_variant_display_name: String,
-        prop_id: PropId,
-        prop_name: String,
-        attribute_value_id: AttributeValueId,
-        before_value: Option<serde_json::Value>,
-        after_value: Option<serde_json::Value>,
-    },
-    #[serde(rename_all = "camelCase")]
-    UpdatePropertyEditorValueForSecret {
-        component_id: ComponentId,
-        component_name: String,
-        schema_variant_id: SchemaVariantId,
-        schema_variant_display_name: String,
-        prop_id: PropId,
-        prop_name: String,
-        attribute_value_id: AttributeValueId,
-        before_secret_name: Option<String>,
-        before_secret_id: Option<SecretId>,
-        after_secret_name: Option<String>,
-        after_secret_id: Option<SecretId>,
-    },
-    #[serde(rename_all = "camelCase")]
-    UpdateSecret { name: String, secret_id: SecretId },
-    #[serde(rename_all = "camelCase")]
-    UpgradeComponent {
-        name: String,
-        component_id: ComponentId,
-        schema_id: SchemaId,
-        new_schema_variant_id: SchemaVariantId,
-        new_schema_variant_name: String,
-        old_schema_variant_id: SchemaVariantId,
-        old_schema_variant_name: String,
-    },
-    #[serde(rename_all = "camelCase")]
-    WithdrawRequestForChangeSetApply { from_status: ChangeSetStatus },
-}
-
-impl AuditLogDeserializedMetadata {
-    pub fn title_and_entity_type(&self) -> (&'static str, &'static str) {
-        type Kind = AuditLogDeserializedMetadataDiscriminants;
-
-        // Reflect updates to the entity type in "app/web/src/api/sdf/dal/audit_log.ts" and please keep this in
-        // alphabetical order!
-        match self.into() {
-            Kind::AbandonChangeSet => ("Abandoned", "Change Set"),
-            Kind::AddAction => ("Enqueued", "Action"),
-            Kind::ApplyChangeSet => ("Applied", "Change Set"),
-            Kind::ApproveChangeSetApply => ("Approved Request to Apply", "Change Set"),
-            Kind::CancelAction => ("Removed", "Action"),
-            Kind::CreateChangeSet => ("Created", "Change Set"),
-            Kind::CreateComponent => ("Created", "Component"),
-            Kind::CreateSecret => ("Created", "Secret"),
-            Kind::DeleteComponent => ("Deleted", "Component"),
-            Kind::DeleteSecret => ("Deleted", "Secret"),
-            Kind::ExportWorkspace => ("Exported", "Workspace"),
-            Kind::InstallWorkspace => ("Installed", "Workspace"),
-            Kind::Login => ("Authenticated", " "),
-            Kind::PutActionOnHold => ("Paused", "Action"),
-            Kind::RejectChangeSetApply => ("Rejected Request to Apply", "Change Set"),
-            Kind::ReopenChangeSet => ("Reopened", "Change Set"),
-            Kind::RequestChangeSetApproval => ("Requested to Apply", "Change Set"),
-            Kind::RetryAction => ("Retried", "Action"),
-            Kind::RunAction => ("Ran", "Action"),
-            Kind::UpdateDependentInputSocket => ("Set Dependent", "Input Socket"),
-            Kind::UpdateDependentOutputSocket => ("Set Dependent", "Output Socket"),
-            Kind::UpdateDependentProperty => ("Set Dependent", "Property"),
-            Kind::UpdatePropertyEditorValue => ("Updated Component", "Property"),
-            Kind::UpdatePropertyEditorValueForSecret => {
-                ("Updated Component", "Property for Secret")
-            }
-            Kind::UpdateSecret => ("Updated", "Secret"),
-            Kind::UpgradeComponent => ("Upgraded", "Component"),
-            Kind::WithdrawRequestForChangeSetApply => ("Withdrew Request to Apply", "Change Set"),
-        }
-    }
-}
-
-impl From<AuditLogKind> for AuditLogDeserializedMetadata {
-    fn from(value: AuditLogKind) -> Self {
-        // Reflect updates to the audit log kind in "app/web/src/api/sdf/dal/audit_log.ts" and please keep this in
-        // alphabetical order!
- match value { - AuditLogKind::AbandonChangeSet { from_status } => { - Self::AbandonChangeSet { from_status } - } - AuditLogKind::AddAction { - prototype_id, - action_kind, - func_id, - func_display_name, - func_name, - } => Self::AddAction { - prototype_id, - action_kind, - func_id, - func_display_name, - func_name, - }, - AuditLogKind::ApplyChangeSet => Self::ApplyChangeSet, - AuditLogKind::ApproveChangeSetApply { from_status } => { - Self::ApproveChangeSetApply { from_status } - } - AuditLogKind::CancelAction { - prototype_id, - action_kind, - func_id, - func_display_name, - func_name, - } => Self::CancelAction { - prototype_id, - action_kind, - func_id, - func_display_name, - func_name, - }, - AuditLogKind::CreateChangeSet => Self::CreateChangeSet, - AuditLogKind::CreateComponent { - name, - component_id, - schema_variant_id, - schema_variant_name, - } => Self::CreateComponent { - name, - component_id, - schema_variant_id, - schema_variant_name, - }, - AuditLogKind::CreateSecret { name, secret_id } => { - Self::CreateSecret { name, secret_id } - } - AuditLogKind::DeleteComponent { - name, - component_id, - schema_variant_id, - schema_variant_name, - } => Self::DeleteComponent { - name, - component_id, - schema_variant_id, - schema_variant_name, - }, - AuditLogKind::DeleteSecret { name, secret_id } => { - Self::DeleteSecret { name, secret_id } - } - AuditLogKind::ExportWorkspace { id, name, version } => { - Self::ExportWorkspace { id, name, version } - } - AuditLogKind::InstallWorkspace { id, name, version } => { - Self::InstallWorkspace { id, name, version } - } - AuditLogKind::Login => Self::Login, - AuditLogKind::PutActionOnHold { - prototype_id, - action_kind, - func_id, - func_display_name, - func_name, - } => Self::PutActionOnHold { - prototype_id, - action_kind, - func_id, - func_display_name, - func_name, - }, - AuditLogKind::RejectChangeSetApply { from_status } => { - Self::RejectChangeSetApply { from_status } - } - AuditLogKind::ReopenChangeSet { from_status } => Self::ReopenChangeSet { from_status }, - AuditLogKind::RequestChangeSetApproval { from_status } => { - Self::RequestChangeSetApproval { from_status } - } - AuditLogKind::RetryAction { - prototype_id, - action_kind, - func_id, - func_display_name, - func_name, - } => Self::RetryAction { - prototype_id, - action_kind, - func_id, - func_display_name, - func_name, - }, - AuditLogKind::RunAction { - prototype_id, - action_kind, - func_id, - func_display_name, - func_name, - run_status, - } => Self::RunAction { - prototype_id, - action_kind, - func_id, - func_display_name, - func_name, - run_status, - }, - AuditLogKind::UpdateDependentInputSocket { - input_socket_id, - input_socket_name, - attribute_value_id, - input_attribute_value_ids, - func_id, - func_display_name, - func_name, - component_id, - component_name, - schema_variant_id, - schema_variant_display_name, - before_value, - after_value, - } => Self::UpdateDependentInputSocket { - input_socket_id, - input_socket_name, - attribute_value_id, - input_attribute_value_ids, - func_id, - func_display_name, - func_name, - component_id, - component_name, - schema_variant_id, - schema_variant_display_name, - before_value, - after_value, - }, - AuditLogKind::UpdateDependentOutputSocket { - output_socket_id, - output_socket_name, - attribute_value_id, - input_attribute_value_ids, - func_id, - func_display_name, - func_name, - component_id, - component_name, - schema_variant_id, - schema_variant_display_name, - before_value, - after_value, - } => 
Self::UpdateDependentOutputSocket { - output_socket_id, - output_socket_name, - attribute_value_id, - input_attribute_value_ids, - func_id, - func_display_name, - func_name, - component_id, - component_name, - schema_variant_id, - schema_variant_display_name, - before_value, - after_value, - }, - AuditLogKind::UpdateDependentProperty { - prop_id, - prop_name, - attribute_value_id, - input_attribute_value_ids, - func_id, - func_display_name, - func_name, - component_id, - component_name, - schema_variant_id, - schema_variant_display_name, - before_value, - after_value, - } => Self::UpdateDependentProperty { - prop_id, - prop_name, - attribute_value_id, - input_attribute_value_ids, - func_id, - func_display_name, - func_name, - component_id, - component_name, - schema_variant_id, - schema_variant_display_name, - before_value, - after_value, - }, - AuditLogKind::UpdatePropertyEditorValue { - component_id, - component_name, - schema_variant_id, - schema_variant_display_name, - prop_id, - prop_name, - attribute_value_id, - before_value, - after_value, - } => Self::UpdatePropertyEditorValue { - component_id, - component_name, - schema_variant_id, - schema_variant_display_name, - prop_id, - prop_name, - attribute_value_id, - before_value, - after_value, - }, - AuditLogKind::UpdatePropertyEditorValueForSecret { - component_id, - component_name, - schema_variant_id, - schema_variant_display_name, - prop_id, - prop_name, - attribute_value_id, - before_secret_name, - before_secret_id, - after_secret_name, - after_secret_id, - } => Self::UpdatePropertyEditorValueForSecret { - component_id, - component_name, - schema_variant_id, - schema_variant_display_name, - prop_id, - prop_name, - attribute_value_id, - before_secret_name, - before_secret_id, - after_secret_name, - after_secret_id, - }, - AuditLogKind::UpdateSecret { name, secret_id } => { - Self::UpdateSecret { name, secret_id } - } - AuditLogKind::UpgradeComponent { - name, - component_id, - schema_id, - new_schema_variant_id, - new_schema_variant_name, - old_schema_variant_id, - old_schema_variant_name, - } => Self::UpgradeComponent { - name, - component_id, - schema_id, - new_schema_variant_id, - new_schema_variant_name, - old_schema_variant_id, - old_schema_variant_name, - }, - AuditLogKind::WithdrawRequestForChangeSetApply { from_status } => { - Self::WithdrawRequestForChangeSetApply { from_status } - } - } - } -} diff --git a/lib/si-frontend-types-rs/src/lib.rs b/lib/si-frontend-types-rs/src/lib.rs index ace3236595..eaeeb6e6a7 100644 --- a/lib/si-frontend-types-rs/src/lib.rs +++ b/lib/si-frontend-types-rs/src/lib.rs @@ -7,7 +7,7 @@ mod module; mod schema_variant; mod workspace; -pub use crate::audit_log::{AuditLog, AuditLogDeserializedMetadata}; +pub use crate::audit_log::AuditLog; pub use crate::change_set::ChangeSet; pub use crate::component::{ ChangeStatus, ConnectionAnnotation, DiagramComponentView, DiagramSocket,
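Why `AuditLogMetadataV1` pairs `#[serde(untagged)]` with a per-variant `rename_all`: with an untagged enum the variant name never appears in the JSON, and serde's container-level `rename_all` only renames variant tags, not fields (the serde issue linked in the doc comment). A minimal runnable sketch, with hypothetical variant and field names:

```rust
// Sketch of the untagged + per-variant camelCase pattern used by
// AuditLogMetadataV1. Variant and field names here are illustrative only.
use serde::Serialize;

#[derive(Serialize)]
#[serde(untagged)]
enum Metadata {
    // The variant-level rename_all is what camel-cases the fields; a
    // container-level rename_all would only rename the (never-emitted) tag.
    #[serde(rename_all = "camelCase")]
    CreateSecret { secret_name: String, secret_id: u64 },
}

fn main() {
    let value = Metadata::CreateSecret {
        secret_name: "token".to_string(),
        secret_id: 42,
    };
    // No "createSecret" wrapper appears in the output.
    assert_eq!(
        serde_json::to_string(&value).expect("serialize"),
        r#"{"secretName":"token","secretId":42}"#
    );
}
```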
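The repeated `// Please keep this in alphabetical order!` comments and the commented-out `#[remain::sorted]` above the match arms exist because the attribute guards item declarations on stable Rust but, per the NOTE in the diff, is not yet usable on a `match` expression. A sketch of the item-level guard with a hypothetical enum:

```rust
// Sketch: #[remain::sorted] turns an out-of-order variant into a compile-time
// error, so the alphabetical ordering of the enums in this diff is enforced by
// the compiler rather than by the reminder comments.
#[remain::sorted]
#[derive(Debug)]
enum ChangeSetEvent {
    Abandoned,
    Applied,
    Created,
    Reopened,
}

fn main() {
    // Moving `Created` above `Applied` in the declaration would fail to build.
    println!("{:?}", ChangeSetEvent::Created);
}
```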
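`title_and_entity_type` matches on `self.into()` rather than on `self`: `strum::EnumDiscriminants` derives a parallel fieldless enum along with `From` impls for the enum and references to it, so the match can dispatch on the variant alone without destructuring any fields. A runnable sketch under hypothetical names:

```rust
// Sketch of the strum::EnumDiscriminants pattern behind title_and_entity_type:
// the derive generates a fieldless mirror enum (renamed via strum_discriminants)
// plus a From<&Enum> impl. Names here are illustrative only.
use strum::EnumDiscriminants;

#[derive(EnumDiscriminants)]
#[strum_discriminants(name(MetadataDiscrim))]
enum Metadata {
    CreateComponent { name: String },
    Login,
}

impl Metadata {
    fn title(&self) -> &'static str {
        // `self` (a &Metadata) converts into the fieldless discriminant enum,
        // so no arm needs to bind or ignore the variant's fields.
        match self.into() {
            MetadataDiscrim::CreateComponent => "Created",
            MetadataDiscrim::Login => "Authenticated",
        }
    }
}

fn main() {
    let event = Metadata::CreateComponent { name: "db".to_string() };
    assert_eq!(event.title(), "Created");
    assert_eq!(Metadata::Login.title(), "Authenticated");
}
```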
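The long `impl From<Kind> for Metadata` block is mechanical on purpose: every arm moves fields across by name, so the exhaustive match plus struct-field patterns make the compiler flag any variant or field that drifts between the kind enum and the metadata enum. A two-variant sketch of the pattern, with hypothetical enums:

```rust
// Sketch of the mirrored-enum conversion in this diff: the metadata enum
// repeats the kind enum's fields and a From impl moves them across verbatim.
enum Kind {
    CreateSecret { name: String, secret_id: u64 },
    Login,
}

enum Metadata {
    CreateSecret { name: String, secret_id: u64 },
    Login,
}

impl From<Kind> for Metadata {
    fn from(value: Kind) -> Self {
        match value {
            // Adding a field to Kind::CreateSecret without mirroring it in
            // Metadata makes this pattern fail to compile, keeping both enums
            // in lockstep.
            Kind::CreateSecret { name, secret_id } => Self::CreateSecret { name, secret_id },
            Kind::Login => Self::Login,
        }
    }
}

fn main() {
    let metadata = Metadata::from(Kind::CreateSecret {
        name: "token".to_string(),
        secret_id: 7,
    });
    assert!(matches!(metadata, Metadata::CreateSecret { .. }));
}
```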