Skip to content

Commit

Permalink
Merge pull request #4855 from systeminit/nick/eng-2756
Browse files Browse the repository at this point in the history
Consume and publish pending audit logs
  • Loading branch information
nickgerace authored Oct 29, 2024
2 parents 6fe7545 + b89009e commit 908e8f6
Show file tree
Hide file tree
Showing 52 changed files with 960 additions and 241 deletions.
29 changes: 28 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ members = [
"bin/rebaser",
"bin/sdf",
"bin/veritech",
"lib/naxum-api-types",
"lib/audit-logs",
"lib/auth-api-client",
"lib/billing-events",
"lib/buck2-resources",
Expand All @@ -32,6 +32,7 @@ members = [
"lib/nats-multiplexer-core",
"lib/nats-subscriber",
"lib/naxum",
"lib/naxum-api-types",
"lib/object-tree",
"lib/pending-events",
"lib/pinga-core",
Expand All @@ -41,7 +42,8 @@ members = [
"lib/rebaser-core",
"lib/rebaser-server",
"lib/sdf-server",
"lib/shuttle",
"lib/shuttle-core",
"lib/shuttle-server",
"lib/si-crypto",
"lib/si-data-nats",
"lib/si-data-pg",
Expand Down
18 changes: 18 additions & 0 deletions lib/audit-logs/BUCK
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
load("@prelude-si//:macros.bzl", "rust_library")

rust_library(
name = "audit-logs",
deps = [
"//lib/si-data-nats:si-data-nats",
"//lib/si-events-rs:si-events",
"//lib/telemetry-nats-rs:telemetry-nats",
"//lib/telemetry-rs:telemetry",
"//third-party/rust:remain",
"//third-party/rust:serde",
"//third-party/rust:serde_json",
"//third-party/rust:thiserror",
],
srcs = glob([
"src/**/*.rs",
]),
)
20 changes: 20 additions & 0 deletions lib/audit-logs/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "audit-logs"
edition = "2021"
version.workspace = true
authors.workspace = true
license.workspace = true
repository.workspace = true
rust-version.workspace = true
publish.workspace = true

[dependencies]
si-data-nats = { path = "../../lib/si-data-nats" }
si-events = { path = "../../lib/si-events-rs" }
telemetry = { path = "../../lib/telemetry-rs" }
telemetry-nats = { path = "../../lib/telemetry-nats-rs" }

remain = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
thiserror = { workspace = true }
144 changes: 144 additions & 0 deletions lib/audit-logs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
//! This crate provides a centralized location for working with the audit logs NATS JetStream stream.
#![warn(
bad_style,
clippy::missing_panics_doc,
clippy::panic,
clippy::panic_in_result_fn,
clippy::unwrap_in_result,
clippy::unwrap_used,
dead_code,
improper_ctypes,
missing_debug_implementations,
missing_docs,
no_mangle_generic_items,
non_shorthand_field_patterns,
overflowing_literals,
path_statements,
patterns_in_fns_without_body,
unconditional_recursion,
unreachable_pub,
unused,
unused_allocation,
unused_comparisons,
unused_parens,
while_true
)]

use std::time::Duration;

use serde::Serialize;
use serde_json::Error;
use si_data_nats::{
async_nats::{
self,
jetstream::{
context::{CreateStreamError, PublishError},
stream::{Config, RetentionPolicy},
},
},
jetstream, Subject,
};
use si_events::{audit_log::AuditLog, WorkspacePk};
use telemetry::prelude::*;
use telemetry_nats::propagation;
use thiserror::Error;

// TODO(nick): switch out of beta.
// const STREAM_NAME: &str = "AUDIT_LOGS";
// const STREAM_DESCRIPTION: &str = "Audit logs"
// const SUBJECT_PREFIX: &str = "audit.log";
const STREAM_NAME: &str = "AUDIT_LOGS_BETA";
const STREAM_DESCRIPTION: &str = "Audit logs (beta)";
const SUBJECT_PREFIX: &str = "beta.audit.log";
const THIRTY_DAYS_IN_SECONDS: u64 = 30 * 24 * 60 * 60;

#[allow(missing_docs)]
#[remain::sorted]
#[derive(Debug, Error)]
pub enum AuditLogsError {
#[error("create stream error: {0}")]
CreateStream(#[from] CreateStreamError),
#[error("publish error: {0}")]
Publish(#[from] PublishError),
#[error("serde json error: {0}")]
SerdeJson(#[from] Error),
}

type Result<T> = std::result::Result<T, AuditLogsError>;

/// A wrapper around the audit logs stream's NATS Jetstream context with helper methods for
/// interacting with the stream.
#[derive(Debug, Clone)]
pub struct AuditLogsStream {
context: jetstream::Context,
}

impl AuditLogsStream {
/// "Gets" or creates the audit logs stream wrapper with an underlying NATS JetStream stream.
pub async fn get_or_create(context: jetstream::Context) -> Result<Self> {
let object = Self { context };
object.stream().await?;
Ok(object)
}

/// "Gets" or creates the NATS JetStream stream for the audit logs stream wrapper.
pub async fn stream(&self) -> Result<async_nats::jetstream::stream::Stream> {
Ok(self
.context
.get_or_create_stream(Config {
name: self.prefixed_stream_name(STREAM_NAME),
description: Some(STREAM_DESCRIPTION.to_string()),
subjects: vec![self.prefixed_subject(SUBJECT_PREFIX, ">")],
retention: RetentionPolicy::Limits,
max_age: Duration::from_secs(THIRTY_DAYS_IN_SECONDS),
..Default::default()
})
.await?)
}

/// Publishes a audit log.
#[instrument(name = "audit_logs_stream.publish", level = "debug", skip_all)]
pub async fn publish(&self, workspace_id: WorkspacePk, audit_log: &AuditLog) -> Result<()> {
self.publish_message_inner(SUBJECT_PREFIX, &workspace_id.to_string(), audit_log)
.await
}

/// Returns the subject for publishing and consuming [`AuditLogs`](AuditLog).
pub fn subject(&self, workspace_id: WorkspacePk) -> Subject {
Subject::from(self.prefixed_subject(SUBJECT_PREFIX, &workspace_id.to_string()))
}

async fn publish_message_inner(
&self,
subject: &str,
parameters: &str,
message: &impl Serialize,
) -> Result<()> {
let subject = self.prefixed_subject(subject, parameters);
let ack = self
.context
.publish_with_headers(
subject,
propagation::empty_injected_headers(),
serde_json::to_vec(message)?.into(),
)
.await?;
ack.await?;
Ok(())
}

fn prefixed_stream_name(&self, stream_name: &str) -> String {
match self.context.metadata().subject_prefix() {
Some(prefix) => format!("{prefix}_{stream_name}"),
None => stream_name.to_owned(),
}
}

fn prefixed_subject(&self, subject: &str, suffix: &str) -> String {
match self.context.metadata().subject_prefix() {
Some(prefix) => format!("{prefix}.{subject}.{suffix}"),
None => format!("{subject}.{suffix}"),
}
}
}
6 changes: 6 additions & 0 deletions lib/dal/BUCK
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ load(
rust_library(
name = "dal",
deps = [
"//lib/audit-logs:audit-logs",
"//lib/billing-events:billing-events",
"//lib/dal-macros:dal-macros",
"//lib/module-index-client:module-index-client",
"//lib/object-tree:object-tree",
"//lib/pending-events:pending-events",
"//lib/pinga-core:pinga-core",
"//lib/rebaser-client:rebaser-client",
"//lib/shuttle-server:shuttle-server",
"//lib/si-crypto:si-crypto",
"//lib/si-data-nats:si-data-nats",
"//lib/si-data-pg:si-data-pg",
Expand Down Expand Up @@ -65,6 +68,7 @@ rust_library(
"//third-party/rust:thiserror",
"//third-party/rust:tokio",
"//third-party/rust:tokio-stream",
"//third-party/rust:tokio-util",
"//third-party/rust:tryhard",
"//third-party/rust:ulid",
"//third-party/rust:url",
Expand All @@ -89,7 +93,9 @@ rust_library(
rust_test(
name = "test-integration",
deps = [
"//lib/audit-logs:audit-logs",
"//lib/dal-test:dal-test",
"//lib/pending-events:pending-events",
"//lib/rebaser-server:rebaser-server",
"//lib/si-events-rs:si-events",
"//lib/si-frontend-types-rs:si-frontend-types",
Expand Down
Loading

0 comments on commit 908e8f6

Please sign in to comment.