Skip to content

Commit

Permalink
chore: rename billing limiter to redis limiter in rust capture (#24640)
Browse files Browse the repository at this point in the history
  • Loading branch information
frankh authored Aug 29, 2024
1 parent 1e1a740 commit f061c47
Show file tree
Hide file tree
Showing 8 changed files with 29 additions and 28 deletions.
2 changes: 1 addition & 1 deletion rust/capture/src/limiters/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
pub mod billing;
pub mod overflow;
pub mod redis;
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@ use std::{collections::HashSet, ops::Sub, sync::Arc};

use crate::redis::Client;

/// Limit accounts by team ID if they hit a billing limit
/// Limit events by checking if a value is present in Redis
///
/// We have an async celery worker that regularly checks on accounts + assesses if they are beyond
/// a billing limit. If this is the case, a key is set in redis.
///
/// For replay sessions we also check if too many events are coming in in ingestion for a single session
/// and set a redis key to redirect further events to overflow.
///
/// Requirements
///
/// 1. Updates from the celery worker should be reflected in capture within a short period of time
Expand Down Expand Up @@ -50,16 +53,16 @@ pub enum LimiterError {
}

#[derive(Clone)]
pub struct BillingLimiter {
pub struct RedisLimiter {
limited: Arc<RwLock<HashSet<String>>>,
redis: Arc<dyn Client + Send + Sync>,
redis_key_prefix: String,
interval: Duration,
updated: Arc<RwLock<OffsetDateTime>>,
}

impl BillingLimiter {
/// Create a new BillingLimiter.
impl RedisLimiter {
/// Create a new RedisLimiter.
///
/// This connects to a redis cluster - pass in a vec of addresses for the initial nodes.
///
Expand All @@ -71,14 +74,14 @@ impl BillingLimiter {
interval: Duration,
redis: Arc<dyn Client + Send + Sync>,
redis_key_prefix: Option<String>,
) -> anyhow::Result<BillingLimiter> {
) -> anyhow::Result<RedisLimiter> {
let limited = Arc::new(RwLock::new(HashSet::new()));

// Force an update immediately if we have any reasonable interval
let updated = OffsetDateTime::from_unix_timestamp(0)?;
let updated = Arc::new(RwLock::new(updated));

Ok(BillingLimiter {
Ok(RedisLimiter {
interval,
limited,
updated,
Expand Down Expand Up @@ -172,7 +175,7 @@ mod tests {
use time::Duration;

use crate::{
limiters::billing::{BillingLimiter, QuotaResource},
limiters::redis::{QuotaResource, RedisLimiter},
redis::MockRedisClient,
};

Expand All @@ -182,7 +185,7 @@ mod tests {
.zrangebyscore_ret("@posthog/quota-limits/events", vec![String::from("banana")]);
let client = Arc::new(client);

let limiter = BillingLimiter::new(Duration::microseconds(1), client, None)
let limiter = RedisLimiter::new(Duration::microseconds(1), client, None)
.expect("Failed to create billing limiter");

assert!(
Expand All @@ -202,12 +205,12 @@ mod tests {
let client = Arc::new(client);

// Default lookup without prefix fails
let limiter = BillingLimiter::new(Duration::microseconds(1), client.clone(), None)
let limiter = RedisLimiter::new(Duration::microseconds(1), client.clone(), None)
.expect("Failed to create billing limiter");
assert!(!limiter.is_limited("banana", QuotaResource::Events).await);

// Limiter using the correct prefix
let prefixed_limiter = BillingLimiter::new(
let prefixed_limiter = RedisLimiter::new(
Duration::microseconds(1),
client,
Some("prefix//".to_string()),
Expand Down
10 changes: 4 additions & 6 deletions rust/capture/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ use health::HealthRegistry;
use tower_http::cors::{AllowHeaders, AllowOrigin, CorsLayer};
use tower_http::trace::TraceLayer;

use crate::{
limiters::billing::BillingLimiter, redis::Client, sinks, time::TimeSource, v0_endpoint,
};
use crate::{limiters::redis::RedisLimiter, redis::Client, sinks, time::TimeSource, v0_endpoint};

use crate::config::CaptureMode;
use crate::prometheus::{setup_metrics_recorder, track_metrics};
Expand All @@ -27,7 +25,7 @@ pub struct State {
pub sink: Arc<dyn sinks::Event + Send + Sync>,
pub timesource: Arc<dyn TimeSource + Send + Sync>,
pub redis: Arc<dyn Client + Send + Sync>,
pub billing: BillingLimiter,
pub billing_limiter: RedisLimiter,
}

async fn index() -> &'static str {
Expand All @@ -43,15 +41,15 @@ pub fn router<
liveness: HealthRegistry,
sink: S,
redis: Arc<R>,
billing: BillingLimiter,
billing_limiter: RedisLimiter,
metrics: bool,
capture_mode: CaptureMode,
) -> Router {
let state = State {
sink: Arc::new(sink),
timesource: Arc::new(timesource),
redis,
billing,
billing_limiter,
};

// Very permissive CORS policy, as old SDK versions
Expand Down
8 changes: 4 additions & 4 deletions rust/capture/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use tokio::net::TcpListener;

use crate::config::Config;

use crate::limiters::billing::BillingLimiter;
use crate::limiters::overflow::OverflowLimiter;
use crate::limiters::redis::RedisLimiter;
use crate::redis::RedisClient;
use crate::router;
use crate::sinks::kafka::KafkaSink;
Expand All @@ -24,7 +24,7 @@ where
let redis_client =
Arc::new(RedisClient::new(config.redis_url).expect("failed to create redis client"));

let billing = BillingLimiter::new(
let billing_limiter = RedisLimiter::new(
Duration::seconds(5),
redis_client.clone(),
config.redis_key_prefix,
Expand All @@ -44,7 +44,7 @@ where
liveness,
PrintSink {},
redis_client,
billing,
billing_limiter,
config.export_prometheus,
config.capture_mode,
)
Expand Down Expand Up @@ -85,7 +85,7 @@ where
liveness,
sink,
redis_client,
billing,
billing_limiter,
config.export_prometheus,
config.capture_mode,
)
Expand Down
4 changes: 2 additions & 2 deletions rust/capture/src/v0_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use serde_json::json;
use serde_json::Value;
use tracing::instrument;

use crate::limiters::billing::QuotaResource;
use crate::limiters::redis::QuotaResource;
use crate::prometheus::report_dropped_events;
use crate::v0_request::{Compression, ProcessingContext, RawRequest};
use crate::{
Expand Down Expand Up @@ -117,7 +117,7 @@ async fn handle_common(
};

let billing_limited = state
.billing
.billing_limiter
.is_limited(context.token.as_str(), quota_resource)
.await;

Expand Down
2 changes: 1 addition & 1 deletion rust/capture/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use tokio::time::timeout;
use tracing::{debug, warn};

use capture::config::{CaptureMode, Config, KafkaConfig};
use capture::limiters::billing::QuotaResource;
use capture::limiters::redis::QuotaResource;
use capture::server::serve;

pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
Expand Down
6 changes: 3 additions & 3 deletions rust/capture/tests/django_compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use base64::engine::general_purpose;
use base64::Engine;
use capture::api::{CaptureError, CaptureResponse, CaptureResponseCode, DataType, ProcessedEvent};
use capture::config::CaptureMode;
use capture::limiters::billing::BillingLimiter;
use capture::limiters::redis::RedisLimiter;
use capture::redis::MockRedisClient;
use capture::router::router;
use capture::sinks::Event;
Expand Down Expand Up @@ -101,15 +101,15 @@ async fn it_matches_django_capture_behaviour() -> anyhow::Result<()> {
let timesource = FixedTime { time: case.now };

let redis = Arc::new(MockRedisClient::new());
let billing = BillingLimiter::new(Duration::weeks(1), redis.clone(), None)
let billing_limiter = RedisLimiter::new(Duration::weeks(1), redis.clone(), None)
.expect("failed to create billing limiter");

let app = router(
timesource,
liveness.clone(),
sink.clone(),
redis,
billing,
billing_limiter,
false,
CaptureMode::Events,
);
Expand Down
2 changes: 1 addition & 1 deletion rust/capture/tests/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use time::Duration;
use crate::common::*;
use anyhow::Result;
use assert_json_diff::assert_json_include;
use capture::limiters::billing::QuotaResource;
use capture::limiters::redis::QuotaResource;
use reqwest::StatusCode;
use serde_json::json;
use uuid::Uuid;
Expand Down

0 comments on commit f061c47

Please sign in to comment.