feat(capture-rs): implement replay capture in capture-rs #24461

Merged · 13 commits · Aug 21, 2024
29 changes: 14 additions & 15 deletions docker-compose.base.yml
@@ -34,7 +34,7 @@ services:
}

handle @replay-capture {
reverse_proxy replay-capture:8000
reverse_proxy replay-capture:3000
}

handle {
@@ -155,32 +155,31 @@ services:

capture:
image: ghcr.io/posthog/posthog/capture:master
build:
context: rust/
args:
BIN: capture
restart: on-failure
environment:
ADDRESS: '0.0.0.0:3000'
KAFKA_TOPIC: 'events_plugin_ingestion'
KAFKA_HOSTS: 'kafka:9092'
REDIS_URL: 'redis://redis:6379/'
CAPTURE_MODE: events

replay-capture:
image: ghcr.io/posthog/posthog/replay-capture:master
image: ghcr.io/posthog/posthog/capture:master
build:
context: vector/replay-capture
context: rust/
args:
BIN: capture
restart: on-failure
entrypoint: ['sh', '-c']
command:
- |
set -x
# seed empty required data files
mkdir -p /etc/vector/data
echo "token" > /etc/vector/data/quota_limited_teams.csv
echo "session_id" > /etc/vector/data/overflow_sessions.csv
exec vector -v --watch-config
environment:
KAFKA_EVENTS_TOPIC: session_recording_snapshot_item_events
KAFKA_OVERFLOW_TOPIC: session_recording_snapshot_item_overflow
KAFKA_BOOSTRAP_SERVERS: 'kafka:9092'
ADDRESS: '0.0.0.0:3000'
KAFKA_TOPIC: 'session_recording_snapshot_item_events'
KAFKA_HOSTS: 'kafka:9092'
REDIS_URL: 'redis://redis:6379/'
CAPTURE_MODE: recordings

plugins:
command: ./bin/plugin-server --no-restart-loop
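Both services now run the same rust/capture binary and differ only in `CAPTURE_MODE` and the Kafka topic they feed. A hypothetical smoke test against this compose stack might look like the sketch below. It is not part of the PR: it assumes the `reqwest` (with the `json` feature), `tokio`, and `serde_json` crates, that port 3000 is reachable from the host, and that `/s` expects a PostHog-style `$snapshot` event carrying `$session_id`, `$window_id`, and `$snapshot_data` properties (the new `MissingSessionId`/`MissingWindowId`/`MissingSnapshotData` errors suggest those are required).

```rust
// Hypothetical smoke test against the compose stack; not part of this PR.
// Posts a minimal $snapshot event to the replay-capture service on port 3000.
// Requires: reqwest (json feature), tokio (macros + rt), serde_json.
use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let payload = json!({
        "api_key": "phc_local_test_token",           // any token the local stack accepts
        "event": "$snapshot",
        "properties": {
            "distinct_id": "smoke-test-user",
            "$session_id": "0191-smoke-test-session",
            "$window_id": "0191-smoke-test-window",
            // rrweb-style payload; the exact shape is an assumption here
            "$snapshot_data": [{ "type": 3, "timestamp": 1_724_198_400_000i64 }]
        }
    });

    let res = reqwest::Client::new()
        .post("http://localhost:3000/s")
        .json(&payload)
        .send()
        .await?;

    // Expect HTTP 200 with {"status":"Ok"} once the event is queued to Kafka.
    println!("replay-capture responded: {}", res.status());
    Ok(())
}
```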
17 changes: 16 additions & 1 deletion rust/capture/src/api.rs
@@ -15,6 +15,9 @@ pub enum CaptureResponseCode {
#[derive(Debug, PartialEq, Eq, Deserialize, Serialize)]
pub struct CaptureResponse {
pub status: CaptureResponseCode,

#[serde(skip_serializing_if = "Option::is_none")]
pub quota_limited: Option<Vec<String>>,
}

#[derive(Error, Debug)]
@@ -32,6 +35,12 @@ pub enum CaptureError {
EmptyDistinctId,
#[error("event submitted without a distinct_id")]
MissingDistinctId,
#[error("replay event submitted without snapshot data")]
MissingSnapshotData,
#[error("replay event submitted without session id")]
MissingSessionId,
#[error("replay event submitted without window id")]
MissingWindowId,

#[error("event submitted without an api_key")]
NoTokenError,
@@ -64,7 +73,10 @@ impl IntoResponse for CaptureError {
| CaptureError::EmptyDistinctId
| CaptureError::MissingDistinctId
| CaptureError::EventTooBig
| CaptureError::NonRetryableSinkError => (StatusCode::BAD_REQUEST, self.to_string()),
| CaptureError::NonRetryableSinkError
| CaptureError::MissingSessionId
| CaptureError::MissingWindowId
| CaptureError::MissingSnapshotData => (StatusCode::BAD_REQUEST, self.to_string()),

CaptureError::NoTokenError
| CaptureError::MultipleTokensError
@@ -87,6 +99,7 @@ pub enum DataType {
ClientIngestionWarning,
HeatmapMain,
ExceptionMain,
SnapshotMain,
}
#[derive(Clone, Debug, Serialize, Eq, PartialEq)]
pub struct ProcessedEvent {
@@ -103,6 +116,8 @@ pub struct ProcessedEvent {
)]
pub sent_at: Option<OffsetDateTime>,
pub token: String,
#[serde(skip_serializing)]
pub session_id: Option<String>,
}

impl ProcessedEvent {
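A quick illustration of the two serde attributes introduced here, using stand-in structs rather than the real capture types: `skip_serializing_if = "Option::is_none"` keeps `quota_limited` out of the response body unless the billing limiter actually populated it, while `#[serde(skip_serializing)]` keeps the new `session_id` field internal, since it only exists so the Kafka sink can pick a partition key.

```rust
// Stand-in types showing the serde behavior of the new fields; these are not
// the actual capture structs. Requires serde (derive) and serde_json.
use serde::Serialize;

#[derive(Serialize)]
struct Response {
    status: &'static str,
    #[serde(skip_serializing_if = "Option::is_none")]
    quota_limited: Option<Vec<String>>,
}

#[derive(Serialize)]
struct Event {
    data: &'static str,
    #[serde(skip_serializing)]
    session_id: Option<String>, // used internally for partitioning, never serialized
}

fn main() {
    let ok = Response { status: "Ok", quota_limited: None };
    let limited = Response { status: "Ok", quota_limited: Some(vec!["token1".into()]) };
    // Prints {"status":"Ok"} and {"status":"Ok","quota_limited":["token1"]}
    println!("{}", serde_json::to_string(&ok).unwrap());
    println!("{}", serde_json::to_string(&limited).unwrap());

    let event = Event { data: "{...}", session_id: Some("0191-session".into()) };
    // session_id never appears in the serialized output
    println!("{}", serde_json::to_string(&event).unwrap());
}
```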
21 changes: 21 additions & 0 deletions rust/capture/src/config.rs
@@ -2,6 +2,24 @@ use std::{net::SocketAddr, num::NonZeroU32};

use envconfig::Envconfig;

#[derive(Debug, PartialEq, Clone)]
pub enum CaptureMode {
Events,
Recordings,
}

impl std::str::FromStr for CaptureMode {
type Err = String;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.trim().to_lowercase().as_ref() {
"events" => Ok(CaptureMode::Events),
"recordings" => Ok(CaptureMode::Recordings),
_ => Err(format!("Unknown Capture Type: {s}")),
}
}
}

#[derive(Envconfig, Clone)]
pub struct Config {
#[envconfig(default = "false")]
@@ -37,6 +55,9 @@ pub struct Config {
#[envconfig(default = "true")]
pub export_prometheus: bool,
pub redis_key_prefix: Option<String>,

#[envconfig(default = "events")]
pub capture_mode: CaptureMode,
}

#[derive(Envconfig, Clone)]
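envconfig feeds the raw `CAPTURE_MODE` string through this `FromStr` impl, so matching is case-insensitive and whitespace-tolerant, and any unrecognised value aborts startup. A self-contained sketch of that parsing behaviour, mirroring the impl above rather than importing the capture crate:

```rust
// Mirrors the FromStr impl above to show how CAPTURE_MODE values are parsed;
// self-contained and compilable on its own.
use std::str::FromStr;

#[derive(Debug, PartialEq, Clone)]
enum CaptureMode {
    Events,
    Recordings,
}

impl FromStr for CaptureMode {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.trim().to_lowercase().as_ref() {
            "events" => Ok(CaptureMode::Events),
            "recordings" => Ok(CaptureMode::Recordings),
            _ => Err(format!("Unknown Capture Type: {s}")),
        }
    }
}

fn main() {
    // What the docker-compose values resolve to:
    assert_eq!("events".parse::<CaptureMode>(), Ok(CaptureMode::Events));
    assert_eq!(" Recordings ".parse::<CaptureMode>(), Ok(CaptureMode::Recordings));
    // Anything else is rejected, so a typo in CAPTURE_MODE fails fast at startup.
    assert!("replays".parse::<CaptureMode>().is_err());
    println!("CAPTURE_MODE parsing behaves as expected");
}
```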
35 changes: 27 additions & 8 deletions rust/capture/src/router.rs
@@ -15,10 +15,12 @@ use crate::{
limiters::billing::BillingLimiter, redis::Client, sinks, time::TimeSource, v0_endpoint,
};

use crate::config::CaptureMode;
use crate::prometheus::{setup_metrics_recorder, track_metrics};

const EVENT_BODY_SIZE: usize = 2 * 1024 * 1024; // 2MB
const BATCH_BODY_SIZE: usize = 20 * 1024 * 1024; // 20MB, up from the default 2MB used for normal event payloads
const RECORDING_BODY_SIZE: usize = 20 * 1024 * 1024; // 20MB, up from the default 2MB used for normal event payloads

#[derive(Clone)]
pub struct State {
@@ -43,6 +45,7 @@ pub fn router<
redis: Arc<R>,
billing: BillingLimiter,
metrics: bool,
capture_mode: CaptureMode,
) -> Router {
let state = State {
sink: Arc::new(sink),
@@ -106,14 +109,30 @@ pub fn router<
.route("/_readiness", get(index))
.route("/_liveness", get(move || ready(liveness.get_status())));

let router = Router::new()
.merge(batch_router)
.merge(event_router)
.merge(status_router)
.layer(TraceLayer::new_for_http())
.layer(cors)
.layer(axum::middleware::from_fn(track_metrics))
.with_state(state);
let recordings_router = Router::new()
.route(
"/s",
post(v0_endpoint::recording)
.get(v0_endpoint::recording)
.options(v0_endpoint::options),
)
.route(
"/s/",
post(v0_endpoint::recording)
.get(v0_endpoint::recording)
.options(v0_endpoint::options),
)
.layer(DefaultBodyLimit::max(RECORDING_BODY_SIZE));

let router = match capture_mode {
CaptureMode::Events => Router::new().merge(batch_router).merge(event_router),
CaptureMode::Recordings => Router::new().merge(recordings_router),
}
.merge(status_router)
.layer(TraceLayer::new_for_http())
.layer(cors)
.layer(axum::middleware::from_fn(track_metrics))
.with_state(state);

// Don't install metrics unless asked to
// Installing a global recorder when capture is used as a library (during tests etc)
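The effect is that a single binary serves two disjoint route sets: events mode mounts the existing event and batch endpoints, recordings mode mounts only `/s` and `/s/` with the larger 20 MB body limit, and the status routes are merged either way. Below is a stripped-down sketch of that composition pattern with dummy handlers and no capture state, assuming axum 0.7; it is not the real router, which also layers CORS, tracing, and metrics.

```rust
// Stripped-down sketch of the mode-switched router composition; dummy handlers,
// no shared State. Assumes axum 0.7 and tokio.
use axum::{
    routing::{get, post},
    Router,
};

#[derive(Clone, Copy)]
enum CaptureMode {
    Events,
    Recordings,
}

fn build_router(mode: CaptureMode) -> Router {
    let status_router = Router::new().route("/_liveness", get(|| async { "ok" }));
    let event_router = Router::new().route("/e", post(|| async { "event accepted" }));
    let recordings_router = Router::new().route("/s", post(|| async { "snapshot accepted" }));

    // Exactly one ingestion route set is mounted; status routes are always there.
    match mode {
        CaptureMode::Events => Router::new().merge(event_router),
        CaptureMode::Recordings => Router::new().merge(recordings_router),
    }
    .merge(status_router)
}

#[tokio::main]
async fn main() {
    // In recordings mode, /e is simply absent and returns 404.
    let app = build_router(CaptureMode::Recordings);
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}
```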
2 changes: 2 additions & 0 deletions rust/capture/src/server.rs
@@ -46,6 +46,7 @@ where
redis_client,
billing,
config.export_prometheus,
config.capture_mode,
)
} else {
let sink_liveness = liveness
@@ -86,6 +87,7 @@ where
redis_client,
billing,
config.export_prometheus,
config.capture_mode,
)
};

14 changes: 13 additions & 1 deletion rust/capture/src/sinks/kafka.rs
@@ -4,6 +4,7 @@ use async_trait::async_trait;
use health::HealthHandle;
use metrics::{counter, gauge, histogram};
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord, Producer};
use rdkafka::util::Timeout;
use rdkafka::ClientConfig;
@@ -179,6 +180,8 @@ impl KafkaSink {
})?;

let event_key = event.key();
let session_id = event.session_id.as_deref();

let (topic, partition_key): (&str, Option<&str>) = match &event.data_type {
DataType::AnalyticsHistorical => (&self.historical_topic, Some(event_key.as_str())), // We never trigger overflow on historical events
DataType::AnalyticsMain => {
@@ -199,6 +202,10 @@
),
DataType::HeatmapMain => (&self.heatmaps_topic, Some(event_key.as_str())),
DataType::ExceptionMain => (&self.exceptions_topic, Some(event_key.as_str())),
DataType::SnapshotMain => (
&self.main_topic,
Some(session_id.ok_or(CaptureError::MissingSessionId)?),
),
};

match self.producer.send_result(FutureRecord {
@@ -207,7 +214,10 @@
partition: None,
key: partition_key,
timestamp: None,
headers: None,
headers: Some(OwnedHeaders::new().insert(Header {
key: "token",
value: Some(&event.token),
})),
}) {
Ok(ack) => Ok(ack),
Err((e, _)) => match e.rdkafka_error_code() {
@@ -361,6 +371,7 @@ mod tests {
now: "".to_string(),
sent_at: None,
token: "token1".to_string(),
session_id: None,
};

// Wait for producer to be healthy, to keep kafka_message_timeout_ms short and tests faster
@@ -393,6 +404,7 @@
now: "".to_string(),
sent_at: None,
token: "token1".to_string(),
session_id: None,
};
match sink.send(big_event).await {
Err(CaptureError::EventTooBig) => {} // Expected
Expand Down
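With these sink changes, every record now carries the team token as a Kafka message header, and snapshot records are partitioned by session id so that all chunks of one recording stay ordered on a single partition. A standalone sketch of producing such a record, assuming the same `rdkafka` `FutureProducer` API used above:

```rust
// Standalone sketch of the record shape produced for snapshot events, mirroring
// the sink change above: partition key = session id, team token as a header.
// Assumes the rdkafka crate with its tokio-based FutureProducer.
use rdkafka::error::KafkaError;
use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::util::Timeout;

async fn send_snapshot(
    producer: &FutureProducer,
    topic: &str,
    token: &str,
    session_id: &str,
    payload: &str,
) -> Result<(), KafkaError> {
    let record = FutureRecord::to(topic)
        // Keying by session id keeps every chunk of one recording on one partition.
        .key(session_id)
        .payload(payload)
        // The token travels as a header instead of being folded into the key.
        .headers(OwnedHeaders::new().insert(Header {
            key: "token",
            value: Some(token),
        }));

    producer
        .send(record, Timeout::Never)
        .await
        .map(|_delivery| ())
        .map_err(|(err, _original_message)| err)
}
```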