diff --git a/rust/capture/src/config.rs b/rust/capture/src/config.rs
index c74bd9d395a75..5cb5cf006edde 100644
--- a/rust/capture/src/config.rs
+++ b/rust/capture/src/config.rs
@@ -41,6 +41,7 @@ pub struct Config {
     pub overflow_burst_limit: NonZeroU32,
     pub overflow_forced_keys: Option<String>, // Coma-delimited keys
 
+    pub dropped_keys: Option<String>, // "<token>:<id1>,<id2>;..."
     #[envconfig(nested = true)]
     pub kafka: KafkaConfig,
 
diff --git a/rust/capture/src/limiters/mod.rs b/rust/capture/src/limiters/mod.rs
index b389f6e16e348..d31e3eb59dc48 100644
--- a/rust/capture/src/limiters/mod.rs
+++ b/rust/capture/src/limiters/mod.rs
@@ -1,2 +1,3 @@
 pub mod overflow;
 pub mod redis;
+pub mod token_dropper;
diff --git a/rust/capture/src/limiters/token_dropper.rs b/rust/capture/src/limiters/token_dropper.rs
new file mode 100644
index 0000000000000..0d6358443c619
--- /dev/null
+++ b/rust/capture/src/limiters/token_dropper.rs
@@ -0,0 +1,76 @@
+use std::collections::HashMap;
+
+use tracing::warn;
+
+#[derive(Default)]
+pub struct TokenDropper {
+    to_drop: HashMap<String, Vec<String>>,
+}
+
+impl TokenDropper {
+    // Takes "<token>:<id1>,<id2>;..."
+    pub fn new(config: &str) -> Self {
+        let mut to_drop = HashMap::new();
+        for pair in config.split(';') {
+            let mut parts = pair.split(':');
+            let Some(token) = parts.next() else {
+                warn!("No distinct id's configured for pair {}", pair);
+                continue;
+            };
+            let Some(ids) = parts.next() else {
+                warn!("No distinct id's configured for token {}", token);
+                continue;
+            };
+            let ids = ids.split(',').map(|s| s.to_string()).collect();
+            to_drop.insert(token.to_string(), ids);
+        }
+        Self { to_drop }
+    }
+
+    pub fn should_drop(&self, token: &str, distinct_id: &str) -> bool {
+        self.to_drop
+            .get(token)
+            .map(|ids| ids.iter().any(|id| id == distinct_id || id == "*"))
+            .unwrap_or(false)
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn test_empty_config() {
+        let dropper = TokenDropper::new("");
+        assert!(!dropper.should_drop("token", "id"));
+    }
+
+    #[test]
+    fn test_single_token_id() {
+        let dropper = TokenDropper::new("token:id");
+        assert!(dropper.should_drop("token", "id"));
+        assert!(!dropper.should_drop("token", "other"));
+    }
+
+    #[test]
+    fn test_multiple_ids() {
+        let dropper = TokenDropper::new("token:id1,id2");
+        assert!(dropper.should_drop("token", "id1"));
+        assert!(dropper.should_drop("token", "id2"));
+        assert!(!dropper.should_drop("token", "id3"));
+    }
+
+    #[test]
+    fn test_wildcard() {
+        let dropper = TokenDropper::new("token:*");
+        assert!(dropper.should_drop("token", "anything"));
+    }
+
+    #[test]
+    fn test_multiple_tokens() {
+        let dropper = TokenDropper::new("token1:id1;token2:id2");
+        assert!(dropper.should_drop("token1", "id1"));
+        assert!(dropper.should_drop("token2", "id2"));
+        assert!(!dropper.should_drop("token1", "id2"));
+    }
+}
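
Note: to make the accepted config format concrete, here is a minimal usage sketch (not part of the diff) exercising the parsing rules above; the token and distinct_id values are made up for illustration.

use capture::limiters::token_dropper::TokenDropper;

fn main() {
    // Drop two specific distinct_ids for token_a, and every event for token_b.
    let dropper = TokenDropper::new("token_a:user_1,user_2;token_b:*");

    assert!(dropper.should_drop("token_a", "user_1"));
    assert!(!dropper.should_drop("token_a", "user_3")); // id not listed
    assert!(dropper.should_drop("token_b", "anything")); // "*" wildcard matches any id
    assert!(!dropper.should_drop("token_c", "user_1")); // unknown token is never dropped
}
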
diff --git a/rust/capture/src/router.rs b/rust/capture/src/router.rs
index 011bb49d67502..dfb176779222d 100644
--- a/rust/capture/src/router.rs
+++ b/rust/capture/src/router.rs
@@ -12,6 +12,7 @@ use tower::limit::ConcurrencyLimitLayer;
 use tower_http::cors::{AllowHeaders, AllowOrigin, CorsLayer};
 use tower_http::trace::TraceLayer;
 
+use crate::limiters::token_dropper::TokenDropper;
 use crate::test_endpoint;
 use crate::{limiters::redis::RedisLimiter, redis::Client, sinks, time::TimeSource, v0_endpoint};
 
@@ -28,6 +29,7 @@ pub struct State {
     pub timesource: Arc<dyn TimeSource + Send + Sync>,
     pub redis: Arc<dyn Client + Send + Sync>,
     pub billing_limiter: RedisLimiter,
+    pub token_dropper: Arc<TokenDropper>,
     pub event_size_limit: usize,
 }
 
@@ -46,6 +48,7 @@ pub fn router<
     sink: S,
     redis: Arc<R>,
     billing_limiter: RedisLimiter,
+    token_dropper: TokenDropper,
     metrics: bool,
     capture_mode: CaptureMode,
     concurrency_limit: Option<usize>,
@@ -57,6 +60,7 @@ pub fn router<
         redis,
         billing_limiter,
         event_size_limit,
+        token_dropper: Arc::new(token_dropper),
     };
 
     // Very permissive CORS policy, as old SDK versions
diff --git a/rust/capture/src/server.rs b/rust/capture/src/server.rs
index d20f75c3d3a38..d935e106cf3b6 100644
--- a/rust/capture/src/server.rs
+++ b/rust/capture/src/server.rs
@@ -13,6 +13,8 @@ use crate::limiters::overflow::OverflowLimiter;
 use crate::limiters::redis::{
     QuotaResource, RedisLimiter, OVERFLOW_LIMITER_CACHE_KEY, QUOTA_LIMITER_CACHE_KEY,
 };
+
+use crate::limiters::token_dropper::TokenDropper;
 use crate::redis::RedisClient;
 use crate::router;
 use crate::router::BATCH_BODY_SIZE;
@@ -54,6 +56,11 @@ where
     )
     .expect("failed to create billing limiter");
 
+    let token_dropper = config
+        .dropped_keys
+        .map(|k| TokenDropper::new(&k))
+        .unwrap_or_default();
+
     // In Recordings capture mode, we unpack a batch of events, and then pack them back up into
     // a big blob and send to kafka all at once - so we should abort unpacking a batch if the data
     // size crosses the kafka limit. In the Events mode, we can unpack the batch and send each
@@ -78,6 +85,7 @@ where
             PrintSink {},
             redis_client,
             billing_limiter,
+            token_dropper,
             config.export_prometheus,
             config.capture_mode,
             config.concurrency_limit,
@@ -126,6 +134,7 @@ where
             sink,
             redis_client,
             billing_limiter,
+            token_dropper,
             config.export_prometheus,
             config.capture_mode,
             config.concurrency_limit,
diff --git a/rust/capture/src/v0_endpoint.rs b/rust/capture/src/v0_endpoint.rs
index e21e3a5c94520..c30981c6239c2 100644
--- a/rust/capture/src/v0_endpoint.rs
+++ b/rust/capture/src/v0_endpoint.rs
@@ -14,6 +14,7 @@ use serde_json::json;
 use serde_json::Value;
 use tracing::instrument;
 
+use crate::limiters::token_dropper::TokenDropper;
 use crate::prometheus::report_dropped_events;
 use crate::v0_request::{
     Compression, DataType, ProcessedEvent, ProcessedEventMetadata, ProcessingContext, RawRequest,
@@ -172,7 +173,14 @@ pub async fn event(
         }
         Err(err) => Err(err),
         Ok((context, events)) => {
-            if let Err(err) = process_events(state.sink.clone(), &events, &context).await {
+            if let Err(err) = process_events(
+                state.sink.clone(),
+                state.token_dropper.clone(),
+                &events,
+                &context,
+            )
+            .await
+            {
                 let cause = match err {
                     CaptureError::EmptyDistinctId => "empty_distinct_id",
                     CaptureError::MissingDistinctId => "missing_distinct_id",
@@ -300,14 +308,24 @@ pub fn process_single_event(
 #[instrument(skip_all, fields(events = events.len()))]
 pub async fn process_events<'a>(
     sink: Arc<dyn sinks::Event + Send + Sync>,
+    dropper: Arc<TokenDropper>,
     events: &'a [RawEvent],
     context: &'a ProcessingContext,
 ) -> Result<(), CaptureError> {
-    let events: Vec<ProcessedEvent> = events
+    let mut events: Vec<ProcessedEvent> = events
         .iter()
         .map(|e| process_single_event(e, context))
         .collect::<Result<Vec<ProcessedEvent>, CaptureError>>()?;
 
+    events.retain(|e| {
+        if dropper.should_drop(&e.event.token, &e.event.distinct_id) {
+            report_dropped_events("token_dropper", 1);
+            false
+        } else {
+            true
+        }
+    });
+
     tracing::debug!(events=?events, "processed {} events", events.len());
 
     if events.len() == 1 {
diff --git a/rust/capture/tests/common.rs b/rust/capture/tests/common.rs
index 02a0d2caa8a09..3660c16f6ab85 100644
--- a/rust/capture/tests/common.rs
+++ b/rust/capture/tests/common.rs
@@ -39,6 +39,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
     overflow_burst_limit: NonZeroU32::new(5).unwrap(),
     overflow_per_second_limit: NonZeroU32::new(10).unwrap(),
     overflow_forced_keys: None,
+    dropped_keys: None,
     kafka: KafkaConfig {
         kafka_producer_linger_ms: 0, // Send messages as soon as possible
         kafka_producer_queue_mib: 10,
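
Note: a self-contained sketch of the retain-based filtering that process_events now performs; FakeEvent is a made-up stand-in for the crate's ProcessedEvent, keeping only the two fields the drop check consults.

use capture::limiters::token_dropper::TokenDropper;

// Made-up stand-in for ProcessedEvent; only token and distinct_id matter here.
struct FakeEvent {
    token: String,
    distinct_id: String,
}

fn main() {
    let dropper = TokenDropper::new("token_a:user_1");
    let mut events = vec![
        FakeEvent { token: "token_a".into(), distinct_id: "user_1".into() },
        FakeEvent { token: "token_a".into(), distinct_id: "user_2".into() },
    ];

    // Same shape as the new retain() call: dropped events never reach the sink.
    events.retain(|e| !dropper.should_drop(&e.token, &e.distinct_id));

    assert_eq!(events.len(), 1);
    assert_eq!(events[0].distinct_id, "user_2");
}
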
diff --git a/rust/capture/tests/django_compat.rs b/rust/capture/tests/django_compat.rs
index 67be98629cf11..c792f57173c61 100644
--- a/rust/capture/tests/django_compat.rs
+++ b/rust/capture/tests/django_compat.rs
@@ -7,6 +7,7 @@ use base64::Engine;
 use capture::api::{CaptureError, CaptureResponse, CaptureResponseCode};
 use capture::config::CaptureMode;
 use capture::limiters::redis::{QuotaResource, RedisLimiter, QUOTA_LIMITER_CACHE_KEY};
+use capture::limiters::token_dropper::TokenDropper;
 use capture::redis::MockRedisClient;
 use capture::router::router;
 use capture::sinks::Event;
@@ -117,6 +118,7 @@ async fn it_matches_django_capture_behaviour() -> anyhow::Result<()> {
         sink.clone(),
         redis,
         billing_limiter,
+        TokenDropper::default(),
         false,
         CaptureMode::Events,
         None,
diff --git a/rust/capture/tests/events.rs b/rust/capture/tests/events.rs
index e872d2ea9f171..763506839b62d 100644
--- a/rust/capture/tests/events.rs
+++ b/rust/capture/tests/events.rs
@@ -40,6 +40,62 @@ async fn it_captures_one_event() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn it_drops_events_if_dropper_enabled() -> Result<()> {
+    setup_tracing();
+    let token = random_string("token", 16);
+    let distinct_id = random_string("id", 16);
+    let dropped_id = random_string("id", 16);
+
+    let main_topic = EphemeralTopic::new().await;
+    let histo_topic = EphemeralTopic::new().await;
+    let mut config = DEFAULT_CONFIG.clone();
+    config.kafka.kafka_topic = main_topic.topic_name().to_string();
+    config.kafka.kafka_historical_topic = histo_topic.topic_name().to_string();
+    config.dropped_keys = Some(format!("{}:{}", token, dropped_id));
+    let server = ServerHandle::for_config(config).await;
+
+    let event = json!({
+        "token": token,
+        "event": "testing",
+        "distinct_id": distinct_id
+    });
+
+    let dropped = json!({
+        "token": token,
+        "event": "testing",
+        "distinct_id": dropped_id
+    });
+
+    let res = server.capture_events(event.to_string()).await;
+    assert_eq!(StatusCode::OK, res.status());
+    let res = server.capture_events(dropped.to_string()).await;
+    assert_eq!(StatusCode::OK, res.status());
+    let res = server.capture_events(event.to_string()).await;
+    assert_eq!(StatusCode::OK, res.status());
+
+    let event = main_topic.next_event()?;
+    assert_json_include!(
+        actual: event,
+        expected: json!({
+            "token": token,
+            "distinct_id": distinct_id
+        })
+    );
+
+    // Next event we get is identical to the first, because the dropped event is not captured
+    let event = main_topic.next_event()?;
+    assert_json_include!(
+        actual: event,
+        expected: json!({
+            "token": token,
+            "distinct_id": distinct_id
+        })
+    );
+
+    Ok(())
+}
+
 #[tokio::test]
 async fn it_captures_a_posthogjs_array() -> Result<()> {
     setup_tracing();
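
Note: a hedged sketch of how the new setting could be supplied at deploy time. The DROPPED_KEYS environment variable name is an assumption based on envconfig's usual upper-snake-case mapping of the dropped_keys field, and build_dropper is a made-up helper mirroring the wiring in server.rs.

use capture::limiters::token_dropper::TokenDropper;

// Made-up helper mirroring the server.rs wiring: an absent setting falls back
// to TokenDropper::default(), which drops nothing.
fn build_dropper(dropped_keys: Option<String>) -> TokenDropper {
    dropped_keys
        .map(|k| TokenDropper::new(&k))
        .unwrap_or_default()
}

fn main() {
    // With no configuration, nothing is dropped.
    let disabled = build_dropper(None);
    assert!(!disabled.should_drop("any_token", "any_id"));

    // e.g. DROPPED_KEYS="token_a:user_1,user_2;token_b:*" (env var name assumed).
    let enabled = build_dropper(std::env::var("DROPPED_KEYS").ok());
    println!("drops token_a/user_1: {}", enabled.should_drop("token_a", "user_1"));
}
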