Skip to content

Commit

Permalink
feat: add replay overflow limiter to rust capture (#24803)
Browse files Browse the repository at this point in the history
  • Loading branch information
frankh authored and timgl committed Sep 10, 2024
1 parent 7a67664 commit 47e860b
Show file tree
Hide file tree
Showing 7 changed files with 205 additions and 26 deletions.
2 changes: 2 additions & 0 deletions rust/capture/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ pub struct KafkaConfig {
pub kafka_exceptions_topic: String,
#[envconfig(default = "events_plugin_ingestion")]
pub kafka_heatmaps_topic: String,
#[envconfig(default = "session_recording_snapshot_item_overflow")]
pub kafka_replay_overflow_topic: String,
#[envconfig(default = "false")]
pub kafka_tls: bool,
#[envconfig(default = "")]
Expand Down
29 changes: 22 additions & 7 deletions rust/capture/src/limiters/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ use crate::redis::Client;
///
/// Some small delay between an account being limited and the limit taking effect is acceptable.
/// However, ideally we should not allow requests from some pods but 429 from others.
const QUOTA_LIMITER_CACHE_KEY: &str = "@posthog/quota-limits/";
// todo: fetch from env
pub const QUOTA_LIMITER_CACHE_KEY: &str = "@posthog/quota-limits/";
pub const OVERFLOW_LIMITER_CACHE_KEY: &str = "@posthog/capture-overflow/";

#[derive(Debug)]
pub enum QuotaResource {
Expand Down Expand Up @@ -66,17 +69,18 @@ impl RedisLimiter {
pub fn new(
interval: Duration,
redis: Arc<dyn Client + Send + Sync>,
limiter_cache_key: String,
redis_key_prefix: Option<String>,
resource: QuotaResource,
) -> anyhow::Result<RedisLimiter> {
let limited = Arc::new(RwLock::new(HashSet::new()));
let key_prefix = redis_key_prefix.unwrap_or_default();

let limiter = RedisLimiter {
interval,
limited,
redis: redis.clone(),
key: format!("{key_prefix}{QUOTA_LIMITER_CACHE_KEY}{}", resource.as_str()),
key: format!("{key_prefix}{limiter_cache_key}{}", resource.as_str()),
interval,
};

// Spawn a background task to periodically fetch data from Redis
Expand Down Expand Up @@ -133,6 +137,7 @@ impl RedisLimiter {

#[cfg(test)]
mod tests {
use crate::limiters::redis::{OVERFLOW_LIMITER_CACHE_KEY, QUOTA_LIMITER_CACHE_KEY};
use std::sync::Arc;
use time::Duration;

Expand All @@ -143,12 +148,20 @@ mod tests {

#[tokio::test]
async fn test_dynamic_limited() {
let client = MockRedisClient::new()
.zrangebyscore_ret("@posthog/quota-limits/events", vec![String::from("banana")]);
let client = MockRedisClient::new().zrangebyscore_ret(
"@posthog/capture-overflow/recordings",
vec![String::from("banana")],
);
let client = Arc::new(client);

let limiter = RedisLimiter::new(Duration::seconds(1), client, None, QuotaResource::Events)
.expect("Failed to create billing limiter");
let limiter = RedisLimiter::new(
Duration::seconds(1),
client,
OVERFLOW_LIMITER_CACHE_KEY.to_string(),
None,
QuotaResource::Recordings,
)
.expect("Failed to create billing limiter");
tokio::time::sleep(std::time::Duration::from_millis(30)).await;

assert!(!limiter.is_limited("not_limited").await);
Expand All @@ -167,6 +180,7 @@ mod tests {
let limiter = RedisLimiter::new(
Duration::seconds(1),
client.clone(),
QUOTA_LIMITER_CACHE_KEY.to_string(),
None,
QuotaResource::Events,
)
Expand All @@ -178,6 +192,7 @@ mod tests {
let prefixed_limiter = RedisLimiter::new(
Duration::microseconds(1),
client,
QUOTA_LIMITER_CACHE_KEY.to_string(),
Some("prefix//".to_string()),
QuotaResource::Events,
)
Expand Down
28 changes: 25 additions & 3 deletions rust/capture/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use crate::config::CaptureMode;
use crate::config::Config;

use crate::limiters::overflow::OverflowLimiter;
use crate::limiters::redis::{QuotaResource, RedisLimiter};
use crate::limiters::redis::{
QuotaResource, RedisLimiter, OVERFLOW_LIMITER_CACHE_KEY, QUOTA_LIMITER_CACHE_KEY,
};
use crate::redis::RedisClient;
use crate::router;
use crate::sinks::kafka::KafkaSink;
Expand All @@ -25,9 +27,24 @@ where
let redis_client =
Arc::new(RedisClient::new(config.redis_url).expect("failed to create redis client"));

let replay_overflow_limiter = match config.capture_mode {
CaptureMode::Recordings => Some(
RedisLimiter::new(
Duration::seconds(5),
redis_client.clone(),
OVERFLOW_LIMITER_CACHE_KEY.to_string(),
config.redis_key_prefix.clone(),
QuotaResource::Recordings,
)
.expect("failed to start replay overflow limiter"),
),
_ => None,
};

let billing_limiter = RedisLimiter::new(
Duration::seconds(5),
redis_client.clone(),
QUOTA_LIMITER_CACHE_KEY.to_string(),
config.redis_key_prefix,
match config.capture_mode {
CaptureMode::Events => QuotaResource::Events,
Expand Down Expand Up @@ -86,8 +103,13 @@ where
Some(partition)
}
};
let sink = KafkaSink::new(config.kafka, sink_liveness, partition)
.expect("failed to start Kafka sink");
let sink = KafkaSink::new(
config.kafka,
sink_liveness,
partition,
replay_overflow_limiter,
)
.expect("failed to start Kafka sink");

router::router(
crate::time::SystemTime {},
Expand Down
35 changes: 24 additions & 11 deletions rust/capture/src/sinks/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::time::Duration;

use crate::limiters::redis::RedisLimiter;
use async_trait::async_trait;
use health::HealthHandle;
use metrics::{counter, gauge, histogram};
Expand All @@ -8,6 +7,7 @@ use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord, Producer};
use rdkafka::util::Timeout;
use rdkafka::ClientConfig;
use std::time::Duration;
use tokio::task::JoinSet;
use tracing::log::{debug, error, info};
use tracing::{info_span, instrument, Instrument};
Expand Down Expand Up @@ -114,13 +114,16 @@ pub struct KafkaSink {
client_ingestion_warning_topic: String,
exceptions_topic: String,
heatmaps_topic: String,
replay_overflow_limiter: Option<RedisLimiter>,
replay_overflow_topic: String,
}

impl KafkaSink {
pub fn new(
config: KafkaConfig,
liveness: HealthHandle,
partition: Option<OverflowLimiter>,
replay_overflow_limiter: Option<RedisLimiter>,
) -> anyhow::Result<KafkaSink> {
info!("connecting to Kafka brokers at {}...", config.kafka_hosts);

Expand Down Expand Up @@ -181,6 +184,8 @@ impl KafkaSink {
client_ingestion_warning_topic: config.kafka_client_ingestion_warning_topic,
exceptions_topic: config.kafka_exceptions_topic,
heatmaps_topic: config.kafka_heatmaps_topic,
replay_overflow_topic: config.kafka_replay_overflow_topic,
replay_overflow_limiter,
})
}

Expand Down Expand Up @@ -222,14 +227,21 @@ impl KafkaSink {
),
DataType::HeatmapMain => (&self.heatmaps_topic, Some(event_key.as_str())),
DataType::ExceptionMain => (&self.exceptions_topic, Some(event_key.as_str())),
DataType::SnapshotMain => (
&self.main_topic,
Some(
session_id
.as_deref()
.ok_or(CaptureError::MissingSessionId)?,
),
),
DataType::SnapshotMain => {
let session_id = session_id
.as_deref()
.ok_or(CaptureError::MissingSessionId)?;
let is_overflowing = match &self.replay_overflow_limiter {
None => false,
Some(limiter) => limiter.is_limited(session_id).await,
};

if is_overflowing {
(&self.replay_overflow_topic, Some(session_id))
} else {
(&self.main_topic, Some(session_id))
}
}
};

match self.producer.send_result(FutureRecord {
Expand Down Expand Up @@ -377,12 +389,13 @@ mod tests {
kafka_client_ingestion_warning_topic: "events_plugin_ingestion".to_string(),
kafka_exceptions_topic: "events_plugin_ingestion".to_string(),
kafka_heatmaps_topic: "events_plugin_ingestion".to_string(),
kafka_replay_overflow_topic: "session_recording_snapshot_item_overflow".to_string(),
kafka_tls: false,
kafka_client_id: "".to_string(),
kafka_metadata_max_age_ms: 60000,
kafka_producer_max_retries: 2,
};
let sink = KafkaSink::new(config, handle, limiter).expect("failed to create sink");
let sink = KafkaSink::new(config, handle, limiter, None).expect("failed to create sink");
(cluster, sink)
}

Expand Down
27 changes: 25 additions & 2 deletions rust/capture/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use tokio::time::timeout;
use tracing::{debug, warn};

use capture::config::{CaptureMode, Config, KafkaConfig};
use capture::limiters::redis::QuotaResource;
use capture::limiters::redis::{
QuotaResource, OVERFLOW_LIMITER_CACHE_KEY, QUOTA_LIMITER_CACHE_KEY,
};
use capture::server::serve;

pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
Expand All @@ -49,6 +51,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
kafka_client_ingestion_warning_topic: "events_plugin_ingestion".to_string(),
kafka_exceptions_topic: "events_plugin_ingestion".to_string(),
kafka_heatmaps_topic: "events_plugin_ingestion".to_string(),
kafka_replay_overflow_topic: "session_recording_snapshot_item_overflow".to_string(),
kafka_tls: false,
kafka_client_id: "".to_string(),
kafka_metadata_max_age_ms: 60000,
Expand Down Expand Up @@ -278,7 +281,27 @@ impl PrefixedRedis {
}

pub fn add_billing_limit(&self, res: QuotaResource, token: &str, until: time::Duration) {
let key = format!("{}@posthog/quota-limits/{}", self.key_prefix, res.as_str());
let key = format!(
"{}{}{}",
self.key_prefix,
QUOTA_LIMITER_CACHE_KEY,
res.as_str()
);
let score = OffsetDateTime::now_utc().add(until).unix_timestamp();
self.client
.get_connection()
.expect("failed to get connection")
.zadd::<String, i64, &str, i64>(key, token, score)
.expect("failed to insert in redis");
}

pub fn add_overflow_limit(&self, res: QuotaResource, token: &str, until: time::Duration) {
let key = format!(
"{}{}{}",
self.key_prefix,
OVERFLOW_LIMITER_CACHE_KEY,
res.as_str()
);
let score = OffsetDateTime::now_utc().add(until).unix_timestamp();
self.client
.get_connection()
Expand Down
4 changes: 2 additions & 2 deletions rust/capture/tests/django_compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ use base64::engine::general_purpose;
use base64::Engine;
use capture::api::{CaptureError, CaptureResponse, CaptureResponseCode, DataType, ProcessedEvent};
use capture::config::CaptureMode;
use capture::limiters::redis::QuotaResource;
use capture::limiters::redis::RedisLimiter;
use capture::limiters::redis::{QuotaResource, RedisLimiter, QUOTA_LIMITER_CACHE_KEY};
use capture::redis::MockRedisClient;
use capture::router::router;
use capture::sinks::Event;
Expand Down Expand Up @@ -105,6 +104,7 @@ async fn it_matches_django_capture_behaviour() -> anyhow::Result<()> {
let billing_limiter = RedisLimiter::new(
Duration::weeks(1),
redis.clone(),
QUOTA_LIMITER_CACHE_KEY.to_string(),
None,
QuotaResource::Events,
)
Expand Down
Loading

0 comments on commit 47e860b

Please sign in to comment.