diff --git a/rust/feature-flags/src/flag_analytics.rs b/rust/feature-flags/src/flag_analytics.rs new file mode 100644 index 0000000000000..6bdfcb4b2e903 --- /dev/null +++ b/rust/feature-flags/src/flag_analytics.rs @@ -0,0 +1,112 @@ +use anyhow::Result; +use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; + +use crate::flag_request::FlagRequestType; +use crate::redis::{Client as RedisClient, CustomRedisError}; + +const CACHE_BUCKET_SIZE: u64 = 60 * 2; // duration in seconds + +pub fn get_team_request_key(team_id: i32, request_type: FlagRequestType) -> String { + match request_type { + FlagRequestType::Decide => format!("posthog:decide_requests:{}", team_id), + FlagRequestType::LocalEvaluation => { + format!("posthog:local_evaluation_requests:{}", team_id) + } + } +} + +pub async fn increment_request_count( + redis_client: Arc, + team_id: i32, + count: i32, + request_type: FlagRequestType, +) -> Result<(), CustomRedisError> { + let time_bucket = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() + / CACHE_BUCKET_SIZE; + let key_name = get_team_request_key(team_id, request_type); + redis_client + .hincrby(key_name, time_bucket.to_string(), Some(count)) + .await?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::test_utils::setup_redis_client; + + #[tokio::test] + async fn test_get_team_request_key() { + assert_eq!( + get_team_request_key(123, FlagRequestType::Decide), + "posthog:decide_requests:123" + ); + assert_eq!( + get_team_request_key(456, FlagRequestType::LocalEvaluation), + "posthog:local_evaluation_requests:456" + ); + } + + #[tokio::test] + async fn test_increment_request_count() { + let redis_client = setup_redis_client(None); + + let team_id = 789; + let count = 5; + + // Test for Decide request type + increment_request_count( + redis_client.clone(), + team_id, + count, + FlagRequestType::Decide, + ) + .await + .unwrap(); + + // Test for LocalEvaluation request type + increment_request_count( + redis_client.clone(), + team_id, + count, + FlagRequestType::LocalEvaluation, + ) + .await + .unwrap(); + + let decide_key = get_team_request_key(team_id, FlagRequestType::Decide); + let local_eval_key = get_team_request_key(team_id, FlagRequestType::LocalEvaluation); + + // Get the current time bucket + let time_bucket = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_secs() + / CACHE_BUCKET_SIZE; + + // Verify the counts in Redis + let decide_count: i32 = redis_client + .hget(decide_key.clone(), time_bucket.to_string()) + .await + .unwrap() + .parse() + .unwrap(); + let local_eval_count: i32 = redis_client + .hget(local_eval_key.clone(), time_bucket.to_string()) + .await + .unwrap() + .parse() + .unwrap(); + + assert_eq!(decide_count, count); + assert_eq!(local_eval_count, count); + + // Clean up Redis after the test + redis_client.del(decide_key).await.unwrap(); + redis_client.del(local_eval_key).await.unwrap(); + } +} diff --git a/rust/feature-flags/src/flag_request.rs b/rust/feature-flags/src/flag_request.rs index 78f8637177745..05c4ceff047be 100644 --- a/rust/feature-flags/src/flag_request.rs +++ b/rust/feature-flags/src/flag_request.rs @@ -10,6 +10,12 @@ use crate::{ redis::Client as RedisClient, team::Team, }; +#[derive(Debug, Clone, Copy)] +pub enum FlagRequestType { + Decide, + LocalEvaluation, +} + #[derive(Default, Debug, Deserialize, Serialize)] pub struct FlagRequest { #[serde( diff --git a/rust/feature-flags/src/lib.rs b/rust/feature-flags/src/lib.rs index ed25ba7e318d6..ecce361fabc4c 100644 --- a/rust/feature-flags/src/lib.rs +++ b/rust/feature-flags/src/lib.rs @@ -2,6 +2,7 @@ pub mod api; pub mod config; pub mod database; pub mod feature_flag_match_reason; +pub mod flag_analytics; pub mod flag_definitions; pub mod flag_matching; pub mod flag_request; diff --git a/rust/feature-flags/src/redis.rs b/rust/feature-flags/src/redis.rs index 954ffe1a09f04..56d15b48c00ba 100644 --- a/rust/feature-flags/src/redis.rs +++ b/rust/feature-flags/src/redis.rs @@ -25,16 +25,22 @@ pub enum CustomRedisError { } /// A simple redis wrapper /// Copied from capture/src/redis.rs. -/// TODO: Modify this to support hincrby +/// Supports get, set, del, zrangebyscore, and hincrby operations #[async_trait] pub trait Client { // A very simplified wrapper, but works for our usage async fn zrangebyscore(&self, k: String, min: String, max: String) -> Result>; - + async fn hincrby( + &self, + k: String, + v: String, + count: Option, + ) -> Result<(), CustomRedisError>; async fn get(&self, k: String) -> Result; async fn set(&self, k: String, v: String) -> Result<()>; async fn del(&self, k: String) -> Result<(), CustomRedisError>; + async fn hget(&self, k: String, field: String) -> Result; } pub struct RedisClient { @@ -60,6 +66,21 @@ impl Client for RedisClient { Ok(fut?) } + async fn hincrby( + &self, + k: String, + v: String, + count: Option, + ) -> Result<(), CustomRedisError> { + let mut conn = self.client.get_async_connection().await?; + + let count = count.unwrap_or(1); + let results = conn.hincr(k, v, count); + let fut = timeout(Duration::from_secs(REDIS_TIMEOUT_MILLISECS), results).await?; + + fut.map_err(CustomRedisError::from) + } + async fn get(&self, k: String) -> Result { let mut conn = self.client.get_async_connection().await?; @@ -99,9 +120,21 @@ impl Client for RedisClient { let mut conn = self.client.get_async_connection().await?; let results = conn.del(k); - let fut: Result<(), RedisError> = - timeout(Duration::from_secs(REDIS_TIMEOUT_MILLISECS), results).await?; + let fut = timeout(Duration::from_secs(REDIS_TIMEOUT_MILLISECS), results).await?; fut.map_err(CustomRedisError::from) } + + async fn hget(&self, k: String, field: String) -> Result { + let mut conn = self.client.get_async_connection().await?; + + let results = conn.hget(k, field); + let fut: Result, RedisError> = + timeout(Duration::from_secs(REDIS_TIMEOUT_MILLISECS), results).await?; + + match fut? { + Some(value) => Ok(value), + None => Err(CustomRedisError::NotFound), + } + } }