From 24490ca0d72ca2ed2c916ffd8aa5633df085aaab Mon Sep 17 00:00:00 2001 From: Frank Hamand Date: Thu, 5 Sep 2024 09:28:48 +0100 Subject: [PATCH 1/7] replay overflow limiter --- rust/capture/src/config.rs | 2 ++ rust/capture/src/limiters/redis.rs | 13 +++++++--- rust/capture/src/server.rs | 26 ++++++++++++++++--- rust/capture/src/sinks/kafka.rs | 39 +++++++++++++++++++++-------- rust/capture/tests/common.rs | 1 + rust/capture/tests/django_compat.rs | 4 +-- 6 files changed, 66 insertions(+), 19 deletions(-) diff --git a/rust/capture/src/config.rs b/rust/capture/src/config.rs index ae502e5f20b8d..c74bd9d395a75 100644 --- a/rust/capture/src/config.rs +++ b/rust/capture/src/config.rs @@ -85,6 +85,8 @@ pub struct KafkaConfig { pub kafka_exceptions_topic: String, #[envconfig(default = "events_plugin_ingestion")] pub kafka_heatmaps_topic: String, + #[envconfig(default = "session_recording_snapshot_item_overflow")] + pub kafka_replay_overflow_topic: String, #[envconfig(default = "false")] pub kafka_tls: bool, #[envconfig(default = "")] diff --git a/rust/capture/src/limiters/redis.rs b/rust/capture/src/limiters/redis.rs index cc7e7d119d89b..7122f93a4391d 100644 --- a/rust/capture/src/limiters/redis.rs +++ b/rust/capture/src/limiters/redis.rs @@ -29,7 +29,10 @@ use crate::redis::Client; /// /// Some small delay between an account being limited and the limit taking effect is acceptable. /// However, ideally we should not allow requests from some pods but 429 from others. -const QUOTA_LIMITER_CACHE_KEY: &str = "@posthog/quota-limits/"; + +// todo: fetch from env +pub const QUOTA_LIMITER_CACHE_KEY: &str = "@posthog/quota-limits/"; +pub const OVERFLOW_LIMITER_CACHE_KEY: &str = "@posthog/capture-overflow/"; #[derive(Debug)] pub enum QuotaResource { @@ -66,6 +69,7 @@ impl RedisLimiter { pub fn new( interval: Duration, redis: Arc, + limiter_cache_key: String, redis_key_prefix: Option, resource: QuotaResource, ) -> anyhow::Result { @@ -76,7 +80,7 @@ impl RedisLimiter { interval, limited, redis: redis.clone(), - key: format!("{key_prefix}{QUOTA_LIMITER_CACHE_KEY}{}", resource.as_str()), + key: format!("{key_prefix}{limiter_cache_key}{}", resource.as_str()), }; // Spawn a background task to periodically fetch data from Redis @@ -133,6 +137,7 @@ impl RedisLimiter { #[cfg(test)] mod tests { + use crate::limiters::redis::QUOTA_LIMITER_CACHE_KEY; use std::sync::Arc; use time::Duration; @@ -147,7 +152,7 @@ mod tests { .zrangebyscore_ret("@posthog/quota-limits/events", vec![String::from("banana")]); let client = Arc::new(client); - let limiter = RedisLimiter::new(Duration::seconds(1), client, None, QuotaResource::Events) + let limiter = RedisLimiter::new(Duration::seconds(1), client, QUOTA_LIMITER_CACHE_KEY.to_string(), None, QuotaResource::Events) .expect("Failed to create billing limiter"); tokio::time::sleep(std::time::Duration::from_millis(30)).await; @@ -167,6 +172,7 @@ mod tests { let limiter = RedisLimiter::new( Duration::seconds(1), client.clone(), + QUOTA_LIMITER_CACHE_KEY.to_string(), None, QuotaResource::Events, ) @@ -178,6 +184,7 @@ mod tests { let prefixed_limiter = RedisLimiter::new( Duration::microseconds(1), client, + QUOTA_LIMITER_CACHE_KEY.to_string(), Some("prefix//".to_string()), QuotaResource::Events, ) diff --git a/rust/capture/src/server.rs b/rust/capture/src/server.rs index bb6f7aaf5dd5b..bb328245b5138 100644 --- a/rust/capture/src/server.rs +++ b/rust/capture/src/server.rs @@ -10,7 +10,7 @@ use crate::config::CaptureMode; use crate::config::Config; use crate::limiters::overflow::OverflowLimiter; -use crate::limiters::redis::{QuotaResource, RedisLimiter}; +use crate::limiters::redis::{QuotaResource, RedisLimiter, OVERFLOW_LIMITER_CACHE_KEY, QUOTA_LIMITER_CACHE_KEY}; use crate::redis::RedisClient; use crate::router; use crate::sinks::kafka::KafkaSink; @@ -25,9 +25,24 @@ where let redis_client = Arc::new(RedisClient::new(config.redis_url).expect("failed to create redis client")); + let replay_overflow_limiter = match config.capture_mode { + CaptureMode::Recordings => Some( + RedisLimiter::new( + Duration::seconds(5), + redis_client.clone(), + OVERFLOW_LIMITER_CACHE_KEY.to_string(), + config.redis_key_prefix.clone(), + QuotaResource::Recordings, + ) + .expect("failed to start replay overflow limiter"), + ), + _ => None, + }; + let billing_limiter = RedisLimiter::new( Duration::seconds(5), redis_client.clone(), + QUOTA_LIMITER_CACHE_KEY.to_string(), config.redis_key_prefix, match config.capture_mode { CaptureMode::Events => QuotaResource::Events, @@ -86,8 +101,13 @@ where Some(partition) } }; - let sink = KafkaSink::new(config.kafka, sink_liveness, partition) - .expect("failed to start Kafka sink"); + let sink = KafkaSink::new( + config.kafka, + sink_liveness, + partition, + replay_overflow_limiter, + ) + .expect("failed to start Kafka sink"); router::router( crate::time::SystemTime {}, diff --git a/rust/capture/src/sinks/kafka.rs b/rust/capture/src/sinks/kafka.rs index b1d5171390347..912dd5c5f20cf 100644 --- a/rust/capture/src/sinks/kafka.rs +++ b/rust/capture/src/sinks/kafka.rs @@ -1,5 +1,4 @@ -use std::time::Duration; - +use crate::limiters::redis::QuotaResource; use async_trait::async_trait; use health::HealthHandle; use metrics::{counter, gauge, histogram}; @@ -8,6 +7,7 @@ use rdkafka::message::{Header, OwnedHeaders}; use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord, Producer}; use rdkafka::util::Timeout; use rdkafka::ClientConfig; +use std::time::Duration; use tokio::task::JoinSet; use tracing::log::{debug, error, info}; use tracing::{info_span, instrument, Instrument}; @@ -114,6 +114,8 @@ pub struct KafkaSink { client_ingestion_warning_topic: String, exceptions_topic: String, heatmaps_topic: String, + replay_overflow_limiter: Option, + replay_overflow_topic: String, } impl KafkaSink { @@ -121,6 +123,7 @@ impl KafkaSink { config: KafkaConfig, liveness: HealthHandle, partition: Option, + replay_overflow_limiter: Option, ) -> anyhow::Result { info!("connecting to Kafka brokers at {}...", config.kafka_hosts); @@ -181,6 +184,8 @@ impl KafkaSink { client_ingestion_warning_topic: config.kafka_client_ingestion_warning_topic, exceptions_topic: config.kafka_exceptions_topic, heatmaps_topic: config.kafka_heatmaps_topic, + replay_overflow_topic: config.kafka_replay_overflow_topic, + replay_overflow_limiter: replay_overflow_limiter, }) } @@ -222,14 +227,25 @@ impl KafkaSink { ), DataType::HeatmapMain => (&self.heatmaps_topic, Some(event_key.as_str())), DataType::ExceptionMain => (&self.exceptions_topic, Some(event_key.as_str())), - DataType::SnapshotMain => ( - &self.main_topic, - Some( - session_id - .as_deref() - .ok_or(CaptureError::MissingSessionId)?, - ), - ), + DataType::SnapshotMain => { + let session_id = session_id + .as_deref() + .ok_or(CaptureError::MissingSessionId)?; + let is_overflowing = match &self.replay_overflow_limiter { + None => false, + Some(limiter) => { + limiter + .is_limited(&session_id) + .await + } + }; + + if is_overflowing { + (&self.replay_overflow_topic, Some(session_id)) + } else { + (&self.main_topic, Some(session_id)) + } + } }; match self.producer.send_result(FutureRecord { @@ -377,12 +393,13 @@ mod tests { kafka_client_ingestion_warning_topic: "events_plugin_ingestion".to_string(), kafka_exceptions_topic: "events_plugin_ingestion".to_string(), kafka_heatmaps_topic: "events_plugin_ingestion".to_string(), + kafka_replay_overflow_topic: "session_recording_snapshot_item_overflow".to_string(), kafka_tls: false, kafka_client_id: "".to_string(), kafka_metadata_max_age_ms: 60000, kafka_producer_max_retries: 2, }; - let sink = KafkaSink::new(config, handle, limiter).expect("failed to create sink"); + let sink = KafkaSink::new(config, handle, limiter, None).expect("failed to create sink"); (cluster, sink) } diff --git a/rust/capture/tests/common.rs b/rust/capture/tests/common.rs index e9b636ac9a735..b91d343971cce 100644 --- a/rust/capture/tests/common.rs +++ b/rust/capture/tests/common.rs @@ -49,6 +49,7 @@ pub static DEFAULT_CONFIG: Lazy = Lazy::new(|| Config { kafka_client_ingestion_warning_topic: "events_plugin_ingestion".to_string(), kafka_exceptions_topic: "events_plugin_ingestion".to_string(), kafka_heatmaps_topic: "events_plugin_ingestion".to_string(), + kafka_replay_overflow_topic: "session_recording_snapshot_item_overflow".to_string(), kafka_tls: false, kafka_client_id: "".to_string(), kafka_metadata_max_age_ms: 60000, diff --git a/rust/capture/tests/django_compat.rs b/rust/capture/tests/django_compat.rs index a5f81aa589c51..f6509750ddf78 100644 --- a/rust/capture/tests/django_compat.rs +++ b/rust/capture/tests/django_compat.rs @@ -6,8 +6,7 @@ use base64::engine::general_purpose; use base64::Engine; use capture::api::{CaptureError, CaptureResponse, CaptureResponseCode, DataType, ProcessedEvent}; use capture::config::CaptureMode; -use capture::limiters::redis::QuotaResource; -use capture::limiters::redis::RedisLimiter; +use capture::limiters::redis::{QuotaResource, RedisLimiter, QUOTA_LIMITER_CACHE_KEY}; use capture::redis::MockRedisClient; use capture::router::router; use capture::sinks::Event; @@ -105,6 +104,7 @@ async fn it_matches_django_capture_behaviour() -> anyhow::Result<()> { let billing_limiter = RedisLimiter::new( Duration::weeks(1), redis.clone(), + QUOTA_LIMITER_CACHE_KEY.to_string(), None, QuotaResource::Events, ) From 9f1057848025468513e5451268b8eb85dfe6d3e3 Mon Sep 17 00:00:00 2001 From: Frank Hamand Date: Thu, 5 Sep 2024 10:15:39 +0100 Subject: [PATCH 2/7] update test so we're covering the new cache key option --- rust/capture/src/limiters/redis.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rust/capture/src/limiters/redis.rs b/rust/capture/src/limiters/redis.rs index 7122f93a4391d..88f7ac8e17989 100644 --- a/rust/capture/src/limiters/redis.rs +++ b/rust/capture/src/limiters/redis.rs @@ -149,10 +149,10 @@ mod tests { #[tokio::test] async fn test_dynamic_limited() { let client = MockRedisClient::new() - .zrangebyscore_ret("@posthog/quota-limits/events", vec![String::from("banana")]); + .zrangebyscore_ret("@posthog/capture-overflow/events", vec![String::from("banana")]); let client = Arc::new(client); - let limiter = RedisLimiter::new(Duration::seconds(1), client, QUOTA_LIMITER_CACHE_KEY.to_string(), None, QuotaResource::Events) + let limiter = RedisLimiter::new(Duration::seconds(1), client, OVERFLOW_LIMITER_CACHE_KEY.to_string(), None, QuotaResource::Recordings) .expect("Failed to create billing limiter"); tokio::time::sleep(std::time::Duration::from_millis(30)).await; From 2e2a2fd5ac82bc4638fa9c841cc0e842094e35d7 Mon Sep 17 00:00:00 2001 From: Frank Hamand Date: Thu, 5 Sep 2024 11:00:30 +0100 Subject: [PATCH 3/7] Add test --- rust/capture/src/limiters/redis.rs | 8 ++- rust/capture/tests/common.rs | 26 ++++++- rust/capture/tests/recordings.rs | 106 ++++++++++++++++++++++++++++- 3 files changed, 134 insertions(+), 6 deletions(-) diff --git a/rust/capture/src/limiters/redis.rs b/rust/capture/src/limiters/redis.rs index 88f7ac8e17989..2ccfa2b27d839 100644 --- a/rust/capture/src/limiters/redis.rs +++ b/rust/capture/src/limiters/redis.rs @@ -137,7 +137,7 @@ impl RedisLimiter { #[cfg(test)] mod tests { - use crate::limiters::redis::QUOTA_LIMITER_CACHE_KEY; + use crate::limiters::redis::{OVERFLOW_LIMITER_CACHE_KEY, QUOTA_LIMITER_CACHE_KEY}; use std::sync::Arc; use time::Duration; @@ -148,8 +148,10 @@ mod tests { #[tokio::test] async fn test_dynamic_limited() { - let client = MockRedisClient::new() - .zrangebyscore_ret("@posthog/capture-overflow/events", vec![String::from("banana")]); + let client = MockRedisClient::new().zrangebyscore_ret( + "@posthog/capture-overflow/events", + vec![String::from("banana")], + ); let client = Arc::new(client); let limiter = RedisLimiter::new(Duration::seconds(1), client, OVERFLOW_LIMITER_CACHE_KEY.to_string(), None, QuotaResource::Recordings) diff --git a/rust/capture/tests/common.rs b/rust/capture/tests/common.rs index b91d343971cce..02a0d2caa8a09 100644 --- a/rust/capture/tests/common.rs +++ b/rust/capture/tests/common.rs @@ -26,7 +26,9 @@ use tokio::time::timeout; use tracing::{debug, warn}; use capture::config::{CaptureMode, Config, KafkaConfig}; -use capture::limiters::redis::QuotaResource; +use capture::limiters::redis::{ + QuotaResource, OVERFLOW_LIMITER_CACHE_KEY, QUOTA_LIMITER_CACHE_KEY, +}; use capture::server::serve; pub static DEFAULT_CONFIG: Lazy = Lazy::new(|| Config { @@ -279,7 +281,27 @@ impl PrefixedRedis { } pub fn add_billing_limit(&self, res: QuotaResource, token: &str, until: time::Duration) { - let key = format!("{}@posthog/quota-limits/{}", self.key_prefix, res.as_str()); + let key = format!( + "{}{}{}", + self.key_prefix, + QUOTA_LIMITER_CACHE_KEY, + res.as_str() + ); + let score = OffsetDateTime::now_utc().add(until).unix_timestamp(); + self.client + .get_connection() + .expect("failed to get connection") + .zadd::(key, token, score) + .expect("failed to insert in redis"); + } + + pub fn add_overflow_limit(&self, res: QuotaResource, token: &str, until: time::Duration) { + let key = format!( + "{}{}{}", + self.key_prefix, + OVERFLOW_LIMITER_CACHE_KEY, + res.as_str() + ); let score = OffsetDateTime::now_utc().add(until).unix_timestamp(); self.client .get_connection() diff --git a/rust/capture/tests/recordings.rs b/rust/capture/tests/recordings.rs index 1dfd763701164..fab1ce38a57b1 100644 --- a/rust/capture/tests/recordings.rs +++ b/rust/capture/tests/recordings.rs @@ -1,8 +1,12 @@ use crate::common::*; use anyhow::Result; use assert_json_diff::assert_json_include; +use capture::config::CaptureMode; +use capture::limiters::redis::QuotaResource; use reqwest::StatusCode; -use serde_json::json; +use serde_json::{json, value::Value}; +use time::Duration; + mod common; #[tokio::test] @@ -117,3 +121,103 @@ async fn it_defaults_window_id_to_session_id() -> Result<()> { assert_eq!(StatusCode::OK, res.status()); Ok(()) } + +#[tokio::test] +async fn it_applies_overflow_limits() -> Result<()> { + setup_tracing(); + let token = random_string("token", 16); + let session1 = random_string("session1", 16); + let session2 = random_string("session2", 16); + let session3 = random_string("session3", 16); + let distinct_id = random_string("id", 16); + + let topic = EphemeralTopic::new().await; + let overflow_topic = EphemeralTopic::new().await; + + // Setup overflow limits: + // - session1 limit is expired -> accept messages + // - session2 limit is active -> send to overflow + // - session3 is not in redis -> accept by default + let redis = PrefixedRedis::new().await; + redis.add_overflow_limit(QuotaResource::Recordings, &session1, Duration::seconds(-60)); + redis.add_overflow_limit(QuotaResource::Recordings, &session2, Duration::seconds(60)); + + let mut config = DEFAULT_CONFIG.clone(); + config.redis_key_prefix = redis.key_prefix(); + config.kafka.kafka_topic = topic.topic_name().to_string(); + config.kafka.kafka_replay_overflow_topic = overflow_topic.topic_name().to_string(); + config.kafka.kafka_replay_overflow_topic = overflow_topic.topic_name().to_string(); + config.capture_mode = CaptureMode::Recordings; + let server = ServerHandle::for_config(config).await; + + for payload in [ + json!({ + "token": token, + "event": "testing", + "distinct_id": distinct_id, + "properties": { + "$session_id": session1, + "$snapshot_data": [], + }, + }), + json!({ + "token": token, + "event": "testing", + "distinct_id": distinct_id, + "properties": { + "$session_id": session2, + "$snapshot_data": [], + }, + }), + json!({ + "token": token, + "event": "testing", + "distinct_id": distinct_id, + "properties": { + "$session_id": session3, + "$snapshot_data": [], + }, + }), + ] { + let res = server.capture_recording(payload.to_string()).await; + assert_eq!(StatusCode::OK, res.status()); + } + + // Batches 1 and 3 go through, batch 2 is sent to overflow + assert_json_include!( + actual: serde_json::from_str::(topic.next_event()?.get("data").unwrap().as_str().unwrap())?, + expected: json!({ + "event": "$snapshot_items", + "properties": { + "$session_id": session1, + "distinct_id": distinct_id, + "$snapshot_items": [], + }, + }) + ); + assert_json_include!( + actual: serde_json::from_str::(topic.next_event()?.get("data").unwrap().as_str().unwrap())?, + expected: json!({ + "event": "$snapshot_items", + "properties": { + "$session_id": session3, + "distinct_id": distinct_id, + "$snapshot_items": [], + }, + }) + ); + + assert_json_include!( + actual: serde_json::from_str::(overflow_topic.next_event()?.get("data").unwrap().as_str().unwrap())?, + expected: json!({ + "event": "$snapshot_items", + "properties": { + "$session_id": session2, + "distinct_id": distinct_id, + "$snapshot_items": [], + }, + }) + ); + + Ok(()) +} From c93ae7a0e1918688e242175abb5c912973960222 Mon Sep 17 00:00:00 2001 From: Frank Hamand Date: Thu, 5 Sep 2024 13:44:13 +0100 Subject: [PATCH 4/7] clippy fixes --- rust/capture/src/limiters/redis.rs | 2 +- rust/capture/src/sinks/kafka.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/capture/src/limiters/redis.rs b/rust/capture/src/limiters/redis.rs index 2ccfa2b27d839..da4974e771209 100644 --- a/rust/capture/src/limiters/redis.rs +++ b/rust/capture/src/limiters/redis.rs @@ -77,10 +77,10 @@ impl RedisLimiter { let key_prefix = redis_key_prefix.unwrap_or_default(); let limiter = RedisLimiter { - interval, limited, redis: redis.clone(), key: format!("{key_prefix}{limiter_cache_key}{}", resource.as_str()), + interval, }; // Spawn a background task to periodically fetch data from Redis diff --git a/rust/capture/src/sinks/kafka.rs b/rust/capture/src/sinks/kafka.rs index 912dd5c5f20cf..7c014f754bece 100644 --- a/rust/capture/src/sinks/kafka.rs +++ b/rust/capture/src/sinks/kafka.rs @@ -185,7 +185,7 @@ impl KafkaSink { exceptions_topic: config.kafka_exceptions_topic, heatmaps_topic: config.kafka_heatmaps_topic, replay_overflow_topic: config.kafka_replay_overflow_topic, - replay_overflow_limiter: replay_overflow_limiter, + replay_overflow_limiter, }) } @@ -235,7 +235,7 @@ impl KafkaSink { None => false, Some(limiter) => { limiter - .is_limited(&session_id) + .is_limited(session_id) .await } }; From efe2a4a965ebef69ce7f17d30fdd7917eae79fb8 Mon Sep 17 00:00:00 2001 From: Frank Hamand Date: Fri, 6 Sep 2024 15:10:45 +0100 Subject: [PATCH 5/7] fix import --- rust/capture/src/sinks/kafka.rs | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/rust/capture/src/sinks/kafka.rs b/rust/capture/src/sinks/kafka.rs index 7c014f754bece..2d189abefa6b3 100644 --- a/rust/capture/src/sinks/kafka.rs +++ b/rust/capture/src/sinks/kafka.rs @@ -1,4 +1,4 @@ -use crate::limiters::redis::QuotaResource; +use crate::limiters::redis::RedisLimiter; use async_trait::async_trait; use health::HealthHandle; use metrics::{counter, gauge, histogram}; @@ -233,11 +233,7 @@ impl KafkaSink { .ok_or(CaptureError::MissingSessionId)?; let is_overflowing = match &self.replay_overflow_limiter { None => false, - Some(limiter) => { - limiter - .is_limited(session_id) - .await - } + Some(limiter) => limiter.is_limited(session_id).await, }; if is_overflowing { From fba05f82b4391dd943e3478fd9c033a307557c58 Mon Sep 17 00:00:00 2001 From: Frank Hamand Date: Fri, 6 Sep 2024 15:10:52 +0100 Subject: [PATCH 6/7] fmt --- rust/capture/src/limiters/redis.rs | 10 ++++++++-- rust/capture/src/server.rs | 4 +++- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/rust/capture/src/limiters/redis.rs b/rust/capture/src/limiters/redis.rs index da4974e771209..22d384d08c5a3 100644 --- a/rust/capture/src/limiters/redis.rs +++ b/rust/capture/src/limiters/redis.rs @@ -154,8 +154,14 @@ mod tests { ); let client = Arc::new(client); - let limiter = RedisLimiter::new(Duration::seconds(1), client, OVERFLOW_LIMITER_CACHE_KEY.to_string(), None, QuotaResource::Recordings) - .expect("Failed to create billing limiter"); + let limiter = RedisLimiter::new( + Duration::seconds(1), + client, + OVERFLOW_LIMITER_CACHE_KEY.to_string(), + None, + QuotaResource::Recordings, + ) + .expect("Failed to create billing limiter"); tokio::time::sleep(std::time::Duration::from_millis(30)).await; assert!(!limiter.is_limited("not_limited").await); diff --git a/rust/capture/src/server.rs b/rust/capture/src/server.rs index bb328245b5138..85d84a1d6fb7d 100644 --- a/rust/capture/src/server.rs +++ b/rust/capture/src/server.rs @@ -10,7 +10,9 @@ use crate::config::CaptureMode; use crate::config::Config; use crate::limiters::overflow::OverflowLimiter; -use crate::limiters::redis::{QuotaResource, RedisLimiter, OVERFLOW_LIMITER_CACHE_KEY, QUOTA_LIMITER_CACHE_KEY}; +use crate::limiters::redis::{ + QuotaResource, RedisLimiter, OVERFLOW_LIMITER_CACHE_KEY, QUOTA_LIMITER_CACHE_KEY, +}; use crate::redis::RedisClient; use crate::router; use crate::sinks::kafka::KafkaSink; From f14deb9b8c6627957808936944d605b10fb3a686 Mon Sep 17 00:00:00 2001 From: Frank Hamand Date: Mon, 9 Sep 2024 16:47:18 +0100 Subject: [PATCH 7/7] fix test --- rust/capture/src/limiters/redis.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/capture/src/limiters/redis.rs b/rust/capture/src/limiters/redis.rs index 22d384d08c5a3..1a59ddd3b0b54 100644 --- a/rust/capture/src/limiters/redis.rs +++ b/rust/capture/src/limiters/redis.rs @@ -149,7 +149,7 @@ mod tests { #[tokio::test] async fn test_dynamic_limited() { let client = MockRedisClient::new().zrangebyscore_ret( - "@posthog/capture-overflow/events", + "@posthog/capture-overflow/recordings", vec![String::from("banana")], ); let client = Arc::new(client);