Skip to content

Commit

Permalink
Merge pull request #5002 from systeminit/nick/eng-2854
Browse files Browse the repository at this point in the history
Write audit logs to database in forklift
  • Loading branch information
nickgerace authored Nov 21, 2024
2 parents 2aa840d + 68a7605 commit e9ef245
Show file tree
Hide file tree
Showing 33 changed files with 1,458 additions and 985 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions bin/forklift/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ rust_binary(
],
srcs = glob(["src/**/*.rs"]),
env = {"CARGO_BIN_NAME": "forklift"},
resources = {
"dev.postgres.root.crt": "//config/keys:dev.postgres.root.crt",
},
)

docker_image(
Expand Down
5 changes: 5 additions & 0 deletions bin/forklift/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ pub(crate) struct Args {
/// The name of the data warehouse stream
#[arg(long)]
pub(crate) data_warehouse_stream_name: Option<String>,

/// Enables the audit logs app
#[arg(long, default_value = "false")]
pub(crate) enable_audit_logs_app: bool,
}

impl TryFrom<Args> for Config {
Expand All @@ -105,6 +109,7 @@ impl TryFrom<Args> for Config {
if let Some(data_warehouse_stream_name) = args.data_warehouse_stream_name {
config_map.set("data_warehouse_stream_name", data_warehouse_stream_name);
}
config_map.set("enable_audit_logs_app", args.enable_audit_logs_app);
})?
.try_into()
}
Expand Down
1 change: 1 addition & 0 deletions lib/audit-logs/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ rust_library(
"//lib/si-events-rs:si-events",
"//lib/telemetry-nats-rs:telemetry-nats",
"//lib/telemetry-rs:telemetry",
"//third-party/rust:chrono",
"//third-party/rust:refinery",
"//third-party/rust:remain",
"//third-party/rust:serde",
Expand Down
1 change: 1 addition & 0 deletions lib/audit-logs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ si-events = { path = "../../lib/si-events-rs" }
telemetry = { path = "../../lib/telemetry-rs" }
telemetry-nats = { path = "../../lib/telemetry-nats-rs" }

chrono = { workspace = true }
refinery = { workspace = true }
remain = { workspace = true }
serde = { workspace = true }
Expand Down
109 changes: 109 additions & 0 deletions lib/audit-logs/src/database.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
//! Contains functionality for setting up and communicating with the audit database.
use chrono::DateTime;
use chrono::Utc;
use si_data_pg::PgError;
use si_data_pg::PgPoolError;
use si_events::audit_log::AuditLogKind;
use si_events::audit_log::AuditLogMetadata;
use si_events::Actor;
use si_events::ChangeSetId;
use si_events::WorkspacePk;
use telemetry::prelude::*;
use thiserror::Error;

mod config;
mod context;
mod migrate;

pub use config::AuditDatabaseConfig;
pub use config::DBNAME;
pub use context::AuditDatabaseContext;
pub use context::AuditDatabaseContextError;
pub use migrate::{migrate, AuditDatabaseMigrationError};

#[allow(missing_docs)]
#[derive(Error, Debug)]
pub enum AuditDatabaseError {
#[error("chrono parse error: {0}")]
ChronoParse(#[from] chrono::ParseError),
#[error("pg error: {0}")]
Pg(#[from] PgError),
#[error("pg pool error: {0}")]
PgPool(#[from] PgPoolError),
#[error("serde json error: {0}")]
SerdeJson(#[from] serde_json::Error),
}

type Result<T> = std::result::Result<T, AuditDatabaseError>;

#[allow(clippy::too_many_arguments, missing_docs)]
#[instrument(
name = "audit_log.insert",
level = "debug",
skip_all,
fields(
si.workspace.id = %workspace_id,
),
)]
pub async fn insert(
context: &AuditDatabaseContext,
workspace_id: WorkspacePk,
kind: AuditLogKind,
timestamp: String,
change_set_id: Option<ChangeSetId>,
actor: Actor,
entity_name: Option<String>,
) -> Result<()> {
let kind_as_string = kind.to_string();
let user_id = match actor {
Actor::System => None,
Actor::User(user_id) => Some(user_id),
};

let metadata = AuditLogMetadata::from(kind);
let (title, entity_type) = metadata.title_and_entity_type();
let serialized_metadata = serde_json::to_value(metadata)?;
let timestamp: DateTime<Utc> = timestamp.parse()?;

context
.pg_pool()
.get()
.await?
.query_one(
"INSERT INTO audit_logs (
workspace_id,
kind,
timestamp,
title,
change_set_id,
user_id,
entity_name,
entity_type,
metadata
) VALUES (
$1,
$2,
$3,
$4,
$5,
$6,
$7,
$8,
$9
) RETURNING *",
&[
&workspace_id,
&kind_as_string,
&timestamp,
&title,
&change_set_id.map(|id| id.to_string()),
&user_id.map(|id| id.to_string()),
&entity_name,
&entity_type,
&serialized_metadata,
],
)
.await?;
Ok(())
}
27 changes: 27 additions & 0 deletions lib/audit-logs/src/database/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use serde::{Deserialize, Serialize};
use si_data_pg::PgPoolConfig;

/// The name of the audit database.
pub const DBNAME: &str = "si_audit";
const APPLICATION_NAME: &str = "si-audit";

/// The configuration used for communicating with and setting up the audit database.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct AuditDatabaseConfig {
/// The configuration for the PostgreSQL pool.
///
/// _Note:_ this is called "pg" for ease of use with layered load configuration files.
pub pg: PgPoolConfig,
}

impl Default for AuditDatabaseConfig {
fn default() -> Self {
Self {
pg: PgPoolConfig {
dbname: DBNAME.into(),
application_name: APPLICATION_NAME.into(),
..Default::default()
},
}
}
}
35 changes: 35 additions & 0 deletions lib/audit-logs/src/database/context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use si_data_pg::{PgPool, PgPoolError};
use telemetry::prelude::*;
use thiserror::Error;

use super::AuditDatabaseConfig;

#[allow(missing_docs)]
#[derive(Error, Debug)]
pub enum AuditDatabaseContextError {
#[error("pg pool error: {0}")]
PgPool(#[from] PgPoolError),
}

type Result<T> = std::result::Result<T, AuditDatabaseContextError>;

/// The context used for communicating with and setting up the audit database.
#[derive(Debug, Clone)]
pub struct AuditDatabaseContext {
pg_pool: PgPool,
}

impl AuditDatabaseContext {
/// Creates an [`AuditDatabaseContext`] from an [`AuditDatabaseConfig`].
#[instrument(level = "info", name = "audit.context.from_config", skip_all)]
pub async fn from_config(config: &AuditDatabaseConfig) -> Result<Self> {
Ok(Self {
pg_pool: PgPool::new(&config.pg).await?,
})
}

/// Returns a reference to the [`PgPool`].
pub fn pg_pool(&self) -> &PgPool {
&self.pg_pool
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Result<T> = std::result::Result<T, AuditDatabaseMigrationError>;
/// Performs migrations for the audit database.
#[instrument(level = "info", name = "audit.init.migrate", skip_all)]
pub async fn migrate(context: &AuditDatabaseContext) -> Result<()> {
migrate_inner(&context.pg_pool).await
migrate_inner(context.pg_pool()).await
}

#[instrument(level = "info", name = "audit.init.migrate.inner", skip_all)]
Expand Down
Loading

0 comments on commit e9ef245

Please sign in to comment.