feat: add replay overflow limiter to rust capture #24803

Merged · 7 commits · Sep 10, 2024
2 changes: 2 additions & 0 deletions rust/capture/src/config.rs
@@ -85,6 +85,8 @@ pub struct KafkaConfig {
pub kafka_exceptions_topic: String,
#[envconfig(default = "events_plugin_ingestion")]
pub kafka_heatmaps_topic: String,
#[envconfig(default = "session_recording_snapshot_item_overflow")]
pub kafka_replay_overflow_topic: String,
#[envconfig(default = "false")]
pub kafka_tls: bool,
#[envconfig(default = "")]
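For context, a minimal sketch of how an `envconfig`-derived field like the one above resolves at startup (assuming the `envconfig` crate's field-name-to-env-var inference; the standalone `main` is illustrative, not capture's real entry point):

```rust
use envconfig::Envconfig;

#[derive(Envconfig)]
pub struct KafkaConfig {
    // Falls back to this default when KAFKA_REPLAY_OVERFLOW_TOPIC is unset.
    #[envconfig(default = "session_recording_snapshot_item_overflow")]
    pub kafka_replay_overflow_topic: String,
}

fn main() {
    let config = KafkaConfig::init_from_env().expect("invalid Kafka config");
    println!("replay overflow topic: {}", config.kafka_replay_overflow_topic);
}
```

So deployments can point overflow traffic at a different topic per environment without a code change.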
29 changes: 22 additions & 7 deletions rust/capture/src/limiters/redis.rs
@@ -29,7 +29,10 @@ use crate::redis::Client;
///
/// Some small delay between an account being limited and the limit taking effect is acceptable.
/// However, ideally we should not allow requests from some pods but 429 from others.
const QUOTA_LIMITER_CACHE_KEY: &str = "@posthog/quota-limits/";

// todo: fetch from env
pub const QUOTA_LIMITER_CACHE_KEY: &str = "@posthog/quota-limits/";
pub const OVERFLOW_LIMITER_CACHE_KEY: &str = "@posthog/capture-overflow/";

#[derive(Debug)]
pub enum QuotaResource {
@@ -66,17 +69,18 @@ impl RedisLimiter {
pub fn new(
interval: Duration,
redis: Arc<dyn Client + Send + Sync>,
limiter_cache_key: String,
redis_key_prefix: Option<String>,
resource: QuotaResource,
) -> anyhow::Result<RedisLimiter> {
let limited = Arc::new(RwLock::new(HashSet::new()));
let key_prefix = redis_key_prefix.unwrap_or_default();

let limiter = RedisLimiter {
interval,
limited,
redis: redis.clone(),
key: format!("{key_prefix}{QUOTA_LIMITER_CACHE_KEY}{}", resource.as_str()),
key: format!("{key_prefix}{limiter_cache_key}{}", resource.as_str()),
interval,
};

// Spawn a background task to periodically fetch data from Redis
@@ -133,6 +137,7 @@ impl RedisLimiter {

#[cfg(test)]
mod tests {
use crate::limiters::redis::{OVERFLOW_LIMITER_CACHE_KEY, QUOTA_LIMITER_CACHE_KEY};
use std::sync::Arc;
use time::Duration;

@@ -143,12 +148,20 @@ mod tests {

#[tokio::test]
async fn test_dynamic_limited() {
let client = MockRedisClient::new()
.zrangebyscore_ret("@posthog/quota-limits/events", vec![String::from("banana")]);
let client = MockRedisClient::new().zrangebyscore_ret(
"@posthog/capture-overflow/recordings",
vec![String::from("banana")],
);
let client = Arc::new(client);

let limiter = RedisLimiter::new(Duration::seconds(1), client, None, QuotaResource::Events)
.expect("Failed to create billing limiter");
let limiter = RedisLimiter::new(
Duration::seconds(1),
client,
OVERFLOW_LIMITER_CACHE_KEY.to_string(),
None,
QuotaResource::Recordings,
)
.expect("Failed to create billing limiter");
tokio::time::sleep(std::time::Duration::from_millis(30)).await;

assert!(!limiter.is_limited("not_limited").await);
@@ -167,6 +180,7 @@ mod tests {
let limiter = RedisLimiter::new(
Duration::seconds(1),
client.clone(),
QUOTA_LIMITER_CACHE_KEY.to_string(),
None,
QuotaResource::Events,
)
@@ -178,6 +192,7 @@ mod tests {
let prefixed_limiter = RedisLimiter::new(
Duration::microseconds(1),
client,
QUOTA_LIMITER_CACHE_KEY.to_string(),
Some("prefix//".to_string()),
QuotaResource::Events,
)
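The constructor now takes the cache key explicitly, so the one `RedisLimiter` type serves both billing quotas and overflow routing. A sketch of the overflow flavor, mirroring the updated test above and assuming `MockRedisClient` behaves as shown in this diff:

```rust
use std::sync::Arc;
use time::Duration;

use capture::limiters::redis::{QuotaResource, RedisLimiter, OVERFLOW_LIMITER_CACHE_KEY};
use capture::redis::MockRedisClient;

#[tokio::main]
async fn main() {
    // Pretend Redis already holds this session in the overflow sorted set.
    let client = MockRedisClient::new().zrangebyscore_ret(
        "@posthog/capture-overflow/recordings",
        vec!["overflowing_session".to_string()],
    );

    let limiter = RedisLimiter::new(
        Duration::seconds(1),
        Arc::new(client),
        OVERFLOW_LIMITER_CACHE_KEY.to_string(),
        None, // no redis key prefix
        QuotaResource::Recordings,
    )
    .expect("failed to create overflow limiter");

    // Give the background refresh task a moment to poll Redis.
    tokio::time::sleep(std::time::Duration::from_millis(30)).await;
    assert!(limiter.is_limited("overflowing_session").await);
    assert!(!limiter.is_limited("regular_session").await);
}
```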
28 changes: 25 additions & 3 deletions rust/capture/src/server.rs
@@ -10,7 +10,9 @@ use crate::config::CaptureMode;
use crate::config::Config;

use crate::limiters::overflow::OverflowLimiter;
use crate::limiters::redis::{QuotaResource, RedisLimiter};
use crate::limiters::redis::{
QuotaResource, RedisLimiter, OVERFLOW_LIMITER_CACHE_KEY, QUOTA_LIMITER_CACHE_KEY,
};
use crate::redis::RedisClient;
use crate::router;
use crate::sinks::kafka::KafkaSink;
@@ -25,9 +27,24 @@
let redis_client =
Arc::new(RedisClient::new(config.redis_url).expect("failed to create redis client"));

let replay_overflow_limiter = match config.capture_mode {
CaptureMode::Recordings => Some(
RedisLimiter::new(
Duration::seconds(5),
redis_client.clone(),
OVERFLOW_LIMITER_CACHE_KEY.to_string(),
config.redis_key_prefix.clone(),
QuotaResource::Recordings,
)
.expect("failed to start replay overflow limiter"),
),
_ => None,
};

let billing_limiter = RedisLimiter::new(
Duration::seconds(5),
redis_client.clone(),
QUOTA_LIMITER_CACHE_KEY.to_string(),
config.redis_key_prefix,
match config.capture_mode {
CaptureMode::Events => QuotaResource::Events,
@@ -86,8 +103,13 @@
Some(partition)
}
};
let sink = KafkaSink::new(config.kafka, sink_liveness, partition)
.expect("failed to start Kafka sink");
let sink = KafkaSink::new(
config.kafka,
sink_liveness,
partition,
replay_overflow_limiter,
)
.expect("failed to start Kafka sink");

router::router(
crate::time::SystemTime {},
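The net effect of this wiring: in `CaptureMode::Recordings` the sink polls a Redis key assembled the same way as the billing one, just under the overflow namespace. A small sketch of the resulting key shape, mirroring the `format!` call in `RedisLimiter::new` (the helper name here is illustrative):

```rust
/// Mirrors RedisLimiter::new's key construction:
/// format!("{key_prefix}{limiter_cache_key}{}", resource.as_str())
fn overflow_key(redis_key_prefix: Option<&str>) -> String {
    let prefix = redis_key_prefix.unwrap_or_default();
    format!("{prefix}@posthog/capture-overflow/recordings")
}

#[test]
fn builds_expected_overflow_key() {
    assert_eq!(overflow_key(None), "@posthog/capture-overflow/recordings");
    assert_eq!(
        overflow_key(Some("prefix//")),
        "prefix//@posthog/capture-overflow/recordings"
    );
}
```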
35 changes: 24 additions & 11 deletions rust/capture/src/sinks/kafka.rs
@@ -1,5 +1,4 @@
use std::time::Duration;

use crate::limiters::redis::RedisLimiter;
use async_trait::async_trait;
use health::HealthHandle;
use metrics::{counter, gauge, histogram};
@@ -8,6 +7,7 @@ use rdkafka::message::{Header, OwnedHeaders};
use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord, Producer};
use rdkafka::util::Timeout;
use rdkafka::ClientConfig;
use std::time::Duration;
use tokio::task::JoinSet;
use tracing::log::{debug, error, info};
use tracing::{info_span, instrument, Instrument};
@@ -114,13 +114,16 @@ pub struct KafkaSink {
client_ingestion_warning_topic: String,
exceptions_topic: String,
heatmaps_topic: String,
replay_overflow_limiter: Option<RedisLimiter>,
replay_overflow_topic: String,
}

impl KafkaSink {
pub fn new(
config: KafkaConfig,
liveness: HealthHandle,
partition: Option<OverflowLimiter>,
replay_overflow_limiter: Option<RedisLimiter>,
) -> anyhow::Result<KafkaSink> {
info!("connecting to Kafka brokers at {}...", config.kafka_hosts);

@@ -181,6 +184,8 @@ impl KafkaSink {
client_ingestion_warning_topic: config.kafka_client_ingestion_warning_topic,
exceptions_topic: config.kafka_exceptions_topic,
heatmaps_topic: config.kafka_heatmaps_topic,
replay_overflow_topic: config.kafka_replay_overflow_topic,
replay_overflow_limiter,
})
}

@@ -222,14 +227,21 @@
),
DataType::HeatmapMain => (&self.heatmaps_topic, Some(event_key.as_str())),
DataType::ExceptionMain => (&self.exceptions_topic, Some(event_key.as_str())),
DataType::SnapshotMain => (
&self.main_topic,
Some(
session_id
.as_deref()
.ok_or(CaptureError::MissingSessionId)?,
),
),
DataType::SnapshotMain => {
let session_id = session_id
.as_deref()
.ok_or(CaptureError::MissingSessionId)?;
let is_overflowing = match &self.replay_overflow_limiter {
None => false,
Some(limiter) => limiter.is_limited(session_id).await,
};

if is_overflowing {
(&self.replay_overflow_topic, Some(session_id))
} else {
(&self.main_topic, Some(session_id))
}
}
};

match self.producer.send_result(FutureRecord {
@@ -377,12 +389,13 @@ mod tests {
kafka_client_ingestion_warning_topic: "events_plugin_ingestion".to_string(),
kafka_exceptions_topic: "events_plugin_ingestion".to_string(),
kafka_heatmaps_topic: "events_plugin_ingestion".to_string(),
kafka_replay_overflow_topic: "session_recording_snapshot_item_overflow".to_string(),
kafka_tls: false,
kafka_client_id: "".to_string(),
kafka_metadata_max_age_ms: 60000,
kafka_producer_max_retries: 2,
};
let sink = KafkaSink::new(config, handle, limiter).expect("failed to create sink");
let sink = KafkaSink::new(config, handle, limiter, None).expect("failed to create sink");
(cluster, sink)
}

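The new `SnapshotMain` arm is the heart of the change: snapshots keep their session-keyed partitioning, but any session flagged in Redis is diverted wholesale to the overflow topic. The decision reduces to this standalone sketch (function name and shape are illustrative; `is_limited` is the async check from `limiters/redis.rs`):

```rust
use capture::limiters::redis::RedisLimiter;

// Sketch: choose the destination topic for one replay snapshot event.
// `limiter` is None outside CaptureMode::Recordings, which disables
// overflow routing entirely.
async fn snapshot_topic<'a>(
    main_topic: &'a str,
    overflow_topic: &'a str,
    limiter: Option<&RedisLimiter>,
    session_id: &str,
) -> &'a str {
    let is_overflowing = match limiter {
        None => false,
        Some(limiter) => limiter.is_limited(session_id).await,
    };
    if is_overflowing {
        overflow_topic
    } else {
        main_topic
    }
}
```

Either way the session id stays the Kafka message key, so a diverted session remains ordered within the overflow topic.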
27 changes: 25 additions & 2 deletions rust/capture/tests/common.rs
@@ -26,7 +26,9 @@ use tokio::time::timeout;
use tracing::{debug, warn};

use capture::config::{CaptureMode, Config, KafkaConfig};
use capture::limiters::redis::QuotaResource;
use capture::limiters::redis::{
QuotaResource, OVERFLOW_LIMITER_CACHE_KEY, QUOTA_LIMITER_CACHE_KEY,
};
use capture::server::serve;

pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
@@ -49,6 +51,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
kafka_client_ingestion_warning_topic: "events_plugin_ingestion".to_string(),
kafka_exceptions_topic: "events_plugin_ingestion".to_string(),
kafka_heatmaps_topic: "events_plugin_ingestion".to_string(),
kafka_replay_overflow_topic: "session_recording_snapshot_item_overflow".to_string(),
kafka_tls: false,
kafka_client_id: "".to_string(),
kafka_metadata_max_age_ms: 60000,
@@ -278,7 +281,27 @@ impl PrefixedRedis {
}

pub fn add_billing_limit(&self, res: QuotaResource, token: &str, until: time::Duration) {
let key = format!("{}@posthog/quota-limits/{}", self.key_prefix, res.as_str());
let key = format!(
"{}{}{}",
self.key_prefix,
QUOTA_LIMITER_CACHE_KEY,
res.as_str()
);
let score = OffsetDateTime::now_utc().add(until).unix_timestamp();
self.client
.get_connection()
.expect("failed to get connection")
.zadd::<String, i64, &str, i64>(key, token, score)
.expect("failed to insert in redis");
}

pub fn add_overflow_limit(&self, res: QuotaResource, token: &str, until: time::Duration) {
let key = format!(
"{}{}{}",
self.key_prefix,
OVERFLOW_LIMITER_CACHE_KEY,
res.as_str()
);
let score = OffsetDateTime::now_utc().add(until).unix_timestamp();
self.client
.get_connection()
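The new helper mirrors `add_billing_limit` apart from the key constant: overflow entries are sorted-set members scored by their expiry timestamp. Outside the test harness, the equivalent raw write would look roughly like this sketch with the `redis` crate (key shown unprefixed; the function is illustrative):

```rust
use std::ops::Add;

use redis::Commands;
use time::OffsetDateTime;

fn mark_session_overflowed(
    client: &redis::Client,
    session_id: &str,
    ttl: time::Duration,
) -> redis::RedisResult<()> {
    // Score is the unix timestamp after which the entry no longer applies.
    let expires = OffsetDateTime::now_utc().add(ttl).unix_timestamp();
    client
        .get_connection()?
        .zadd("@posthog/capture-overflow/recordings", session_id, expires)
}
```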
4 changes: 2 additions & 2 deletions rust/capture/tests/django_compat.rs
@@ -6,8 +6,7 @@ use base64::engine::general_purpose;
use base64::Engine;
use capture::api::{CaptureError, CaptureResponse, CaptureResponseCode, DataType, ProcessedEvent};
use capture::config::CaptureMode;
use capture::limiters::redis::QuotaResource;
use capture::limiters::redis::RedisLimiter;
use capture::limiters::redis::{QuotaResource, RedisLimiter, QUOTA_LIMITER_CACHE_KEY};
use capture::redis::MockRedisClient;
use capture::router::router;
use capture::sinks::Event;
@@ -105,6 +104,7 @@ async fn it_matches_django_capture_behaviour() -> anyhow::Result<()> {
let billing_limiter = RedisLimiter::new(
Duration::weeks(1),
redis.clone(),
QUOTA_LIMITER_CACHE_KEY.to_string(),
None,
QuotaResource::Events,
)