Skip to content

Commit

Permalink
Merge pull request #4977 from systeminit/nick/eng-2851
Browse files Browse the repository at this point in the history
Add audit database migrations
  • Loading branch information
fnichol authored Nov 16, 2024
2 parents 55062a3 + 744cbc2 commit 1a1cae2
Show file tree
Hide file tree
Showing 18 changed files with 314 additions and 126 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions component/init/configs/service.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,12 @@ url = "$SI_NATS_URL"

[layer_db_config.cache_config]
disk_path = "$SI_LAYER_CACHE_DISK_PATH"

[audit.pg]
user = "si"
password = "$SI_PG_PASSWORD"
dbname = "$SI_AUDIT_DBNAME"
application_name = "$SI_SERVICE"
hostname = "$SI_PG_PROXY_HOST"
port = 5432
pool_max_size = $SI_PG_POOL_SIZE
4 changes: 2 additions & 2 deletions dev/docker-compose.platform.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ services:
- "PGPASSWORD=bugbear"
- "POSTGRES_USER=si"
- "POSTGRES_DB=si"
- "POSTGRES_MULTIPLE_DBS=si_layer_db,si_auth,si_auth_prisma_shadow_db,si_module_index"
- "POSTGRES_MULTIPLE_DBS=si_layer_db,si_auth,si_auth_prisma_shadow_db,si_module_index,si_audit"
ports:
- "7432:5432"
healthcheck:
Expand Down Expand Up @@ -57,7 +57,7 @@ services:
- "PGPASSWORD=bugbear"
- "POSTGRES_USER=si_test"
- "POSTGRES_DB=si_test"
- "POSTGRES_MULTIPLE_DBS=si_test_dal,si_test_sdf_server,si_test_layer_db"
- "POSTGRES_MULTIPLE_DBS=si_test_dal,si_test_sdf_server,si_test_layer_db,si_test_audit"
command:
- "-c"
- "fsync=off"
Expand Down
6 changes: 6 additions & 0 deletions lib/audit-logs/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,21 @@ rust_library(
name = "audit-logs",
deps = [
"//lib/si-data-nats:si-data-nats",
"//lib/si-data-pg:si-data-pg",
"//lib/si-events-rs:si-events",
"//lib/telemetry-nats-rs:telemetry-nats",
"//lib/telemetry-rs:telemetry",
"//third-party/rust:refinery",
"//third-party/rust:remain",
"//third-party/rust:serde",
"//third-party/rust:serde_json",
"//third-party/rust:thiserror",
],
srcs = glob([
"src/**/*.rs",
"src/migrations/*.sql",
]),
env = {
"CARGO_MANIFEST_DIR": ".",
},
)
2 changes: 2 additions & 0 deletions lib/audit-logs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ publish.workspace = true

[dependencies]
si-data-nats = { path = "../../lib/si-data-nats" }
si-data-pg = { path = "../../lib/si-data-pg" }
si-events = { path = "../../lib/si-events-rs" }
telemetry = { path = "../../lib/telemetry-rs" }
telemetry-nats = { path = "../../lib/telemetry-nats-rs" }

refinery = { workspace = true }
remain = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
106 changes: 5 additions & 101 deletions lib/audit-logs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! This crate provides a centralized location for working with the audit logs NATS JetStream stream.
//! This crate provides a centralized location for working with the audit logs domain.
#![warn(
bad_style,
Expand All @@ -25,104 +25,8 @@
while_true
)]

use std::time::Duration;
pub mod pg;
mod stream;

use serde_json::Error;
use si_data_nats::{
async_nats::{
self,
jetstream::{
context::{CreateStreamError, PublishError},
stream::{Config, RetentionPolicy},
},
},
jetstream, Subject,
};
use si_events::{ChangeSetId, WorkspacePk};
use telemetry::prelude::*;
use thiserror::Error;

const STREAM_NAME: &str = "AUDIT_LOGS";
const STREAM_DESCRIPTION: &str = "Audit logs";
const SUBJECT_PREFIX: &str = "audit.log";
const THIRTY_DAYS_IN_SECONDS: u64 = 30 * 24 * 60 * 60;

/// The error type carried by this module's `Result` alias.
///
/// Every variant wraps an underlying error and is built through the `From`
/// conversion generated by `#[from]`, so `?` can be used directly on the wrapped
/// error types.
#[allow(missing_docs)]
#[remain::sorted]
#[derive(Debug, Error)]
pub enum AuditLogsError {
// Wraps the JetStream `CreateStreamError`.
#[error("create stream error: {0}")]
CreateStream(#[from] CreateStreamError),
// Wraps the JetStream `PublishError`.
#[error("publish error: {0}")]
Publish(#[from] PublishError),
// NOTE: the payload `Error` is `serde_json::Error` (imported at the top of the
// file); the `Error` in the derive list above is the `thiserror` derive macro.
#[error("serde json error: {0}")]
SerdeJson(#[from] Error),
}

type Result<T> = std::result::Result<T, AuditLogsError>;

/// A wrapper around the audit logs stream's NATS Jetstream context with helper methods for
/// interacting with the stream.
///
/// The wrapper derives `Clone`, so it can be duplicated and handed to several owners.
#[derive(Debug, Clone)]
pub struct AuditLogsStream {
// Used both to get-or-create the stream and to look up the optional subject prefix.
context: jetstream::Context,
}

impl AuditLogsStream {
    /// Wraps `context` and makes sure the backing NATS JetStream stream exists.
    ///
    /// # Errors
    ///
    /// Returns an error if the stream can be neither fetched nor created.
    pub async fn get_or_create(context: jetstream::Context) -> Result<Self> {
        let object = Self { context };
        // Only the get-or-create side effect matters here; the stream handle is dropped.
        object.stream().await?;
        Ok(object)
    }

    /// "Gets" or creates the NATS JetStream stream backing this wrapper.
    ///
    /// The stream is configured with the (optionally prefixed) `AUDIT_LOGS` name, the
    /// `audit.log.>` subject filter, limits-based retention and a maximum message age of
    /// thirty days.
    ///
    /// # Errors
    ///
    /// Returns an error if the stream can be neither fetched nor created.
    pub async fn stream(&self) -> Result<async_nats::jetstream::stream::Stream> {
        let config = Config {
            name: self.prefixed_stream_name(STREAM_NAME),
            description: Some(STREAM_DESCRIPTION.to_string()),
            subjects: vec![self.prefixed_subject(SUBJECT_PREFIX, ">")],
            retention: RetentionPolicy::Limits,
            max_age: Duration::from_secs(THIRTY_DAYS_IN_SECONDS),
            ..Default::default()
        };

        Ok(self.context.get_or_create_stream(config).await?)
    }

    /// Returns the subject for publishing audit logs for the entire workspace.
    ///
    /// The subject has the shape `[<prefix>.]audit.log.<workspace_id>`.
    pub fn publishing_subject_for_workspace(&self, workspace_id: WorkspacePk) -> Subject {
        Subject::from(self.prefixed_subject(SUBJECT_PREFIX, &workspace_id.to_string()))
    }

    /// Returns the wildcard subject for consuming audit logs for the entire workspace.
    ///
    /// The subject has the shape `[<prefix>.]audit.log.<workspace_id>.>`.
    ///
    /// NOTE(review): in NATS the `>` wildcard needs at least one further token, so this
    /// subject would not match messages published to the bare subject returned by
    /// [`Self::publishing_subject_for_workspace`] -- confirm that this is intended.
    pub fn consuming_subject_for_workspace(&self, workspace_id: WorkspacePk) -> Subject {
        Subject::from(self.prefixed_subject(SUBJECT_PREFIX, &format!("{workspace_id}.>")))
    }

    /// Returns the subject for publishing and consuming audit logs for a given change set.
    ///
    /// The subject has the shape `[<prefix>.]audit.log.<workspace_id>.<change_set_id>`.
    pub fn subject_for_change_set(
        &self,
        workspace_id: WorkspacePk,
        change_set_id: ChangeSetId,
    ) -> Subject {
        Subject::from(
            self.prefixed_subject(SUBJECT_PREFIX, &format!("{workspace_id}.{change_set_id}")),
        )
    }

    /// Prepends the context's subject prefix (if any) to `stream_name`, joined by `_`.
    fn prefixed_stream_name(&self, stream_name: &str) -> String {
        match self.context.metadata().subject_prefix() {
            Some(prefix) => format!("{prefix}_{stream_name}"),
            None => stream_name.to_owned(),
        }
    }

    /// Builds `[<prefix>.]<subject>.<suffix>`, using the context's subject prefix if set.
    fn prefixed_subject(&self, subject: &str, suffix: &str) -> String {
        match self.context.metadata().subject_prefix() {
            Some(prefix) => format!("{prefix}.{subject}.{suffix}"),
            None => format!("{subject}.{suffix}"),
        }
    }
}
pub use stream::AuditLogsStream;
pub use stream::AuditLogsStreamError;
1 change: 1 addition & 0 deletions lib/audit-logs/src/migrations/U0001__test_migration.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
SELECT 1;
61 changes: 61 additions & 0 deletions lib/audit-logs/src/pg.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
//! Contains functionality for setting up and communicating with the audit database.
use serde::{Deserialize, Serialize};
use si_data_pg::{PgPool, PgPoolConfig, PgPoolError};
use telemetry::prelude::*;
use thiserror::Error;

mod migrate;

pub use migrate::{migrate, AuditDatabaseMigrationError};

/// The name of the audit database.
pub const DBNAME: &str = "si_audit";
const APPLICATION_NAME: &str = "si-audit";

/// The error type carried by this module's `Result` alias.
#[allow(missing_docs)]
#[derive(Error, Debug)]
pub enum AuditDatabaseContextError {
// Wraps a `PgPoolError`; `#[from]` generates the `From` impl that lets `?` convert it.
#[error("pg pool error: {0}")]
PgPool(#[from] PgPoolError),
}

type Result<T> = std::result::Result<T, AuditDatabaseContextError>;

/// The context used for communicating with and setting up the audit database.
///
/// Build one with [`AuditDatabaseContext::from_config`].
#[allow(missing_debug_implementations)]
#[derive(Clone)]
pub struct AuditDatabaseContext {
// Connection pool for the audit database; private, but read by the `migrate` submodule.
pg_pool: PgPool,
}

impl AuditDatabaseContext {
    /// Builds an [`AuditDatabaseContext`] by creating a PostgreSQL pool from the `pg`
    /// section of the given [`AuditDatabaseConfig`].
    ///
    /// # Errors
    ///
    /// Returns an error if the pool cannot be created.
    #[instrument(level = "info", name = "audit.context.from_config", skip_all)]
    pub async fn from_config(config: &AuditDatabaseConfig) -> Result<Self> {
        let pg_pool = PgPool::new(&config.pg).await?;

        Ok(Self { pg_pool })
    }
}

/// The configuration used for communicating with and setting up the audit database.
///
/// The [`Default`] implementation points the pool at the database named by [`DBNAME`].
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AuditDatabaseConfig {
/// The configuration for the PostgreSQL pool.
///
/// _Note:_ this is called "pg" for ease of use with layered load configuration files.
pub pg: PgPoolConfig,
}

impl Default for AuditDatabaseConfig {
    /// Starts from the default pool configuration and overrides only the database name
    /// and the application name with the audit-specific constants.
    fn default() -> Self {
        let pg = PgPoolConfig {
            dbname: DBNAME.into(),
            application_name: APPLICATION_NAME.into(),
            ..Default::default()
        };

        Self { pg }
    }
}
34 changes: 34 additions & 0 deletions lib/audit-logs/src/pg/migrate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//! Contains functionality for migrating the audit database.
use si_data_pg::{PgPool, PgPoolError};
use telemetry::prelude::*;
use thiserror::Error;

use super::AuditDatabaseContext;

/// The error type carried by this module's `Result` alias.
#[allow(missing_docs)]
#[derive(Error, Debug)]
pub enum AuditDatabaseMigrationError {
// Wraps a `PgPoolError`; `#[from]` generates the `From` impl that lets `?` convert it.
#[error("pg pool error: {0}")]
PgPool(#[from] PgPoolError),
}

type Result<T> = std::result::Result<T, AuditDatabaseMigrationError>;

/// Runs the embedded migrations against the audit database behind `context`.
///
/// # Errors
///
/// Returns an error if the pool fails to apply the migrations.
#[instrument(level = "info", name = "audit.init.migrate", skip_all)]
pub async fn migrate(context: &AuditDatabaseContext) -> Result<()> {
    let pool = &context.pg_pool;
    migrate_inner(pool).await
}

/// Hands the embedded migration runner to the pool; kept separate from [`migrate`] so it
/// is recorded under its own tracing span.
#[instrument(level = "info", name = "audit.init.migrate.inner", skip_all)]
async fn migrate_inner(pg: &PgPool) -> Result<()> {
    let runner = embedded::migrations::runner();
    pg.migrate(runner).await?;

    Ok(())
}

// Holds the migrations embedded from `./src/migrations` by the macro below. The macro
// provides the `migrations::runner()` used by `migrate_inner`.
mod embedded {
use refinery::embed_migrations;

embed_migrations!("./src/migrations");
}
Loading

0 comments on commit 1a1cae2

Please sign in to comment.