From 029061fcec2e772d39968b83375da8b041f1f996 Mon Sep 17 00:00:00 2001 From: dylan Date: Wed, 20 Nov 2024 18:29:42 -0600 Subject: [PATCH 1/7] first pass of easy cleanups --- rust/feature-flags/src/api/handler.rs | 64 +- .../src/cohort/cohort_cache_manager.rs | 84 +- .../src/cohort/cohort_operations.rs | 113 +-- rust/feature-flags/src/flags/flag_matching.rs | 774 ++++++++---------- .../src/flags/flag_operations.rs | 116 +-- rust/feature-flags/src/router.rs | 12 +- rust/feature-flags/src/server.rs | 11 +- .../tests/test_flag_matching_consistency.rs | 20 +- 8 files changed, 522 insertions(+), 672 deletions(-) diff --git a/rust/feature-flags/src/api/handler.rs b/rust/feature-flags/src/api/handler.rs index 26c7527678526..21dfa60c67e8a 100644 --- a/rust/feature-flags/src/api/handler.rs +++ b/rust/feature-flags/src/api/handler.rs @@ -69,8 +69,8 @@ pub struct FeatureFlagEvaluationContext { team_id: i32, distinct_id: String, feature_flags: FeatureFlagList, - postgres_reader: Arc, - postgres_writer: Arc, + reader: Arc, + writer: Arc, cohort_cache: Arc, #[builder(default)] person_property_overrides: Option>, @@ -93,10 +93,10 @@ pub async fn process_request(context: RequestContext) -> Result Result Result FlagsResponse { let group_type_mapping_cache = - GroupTypeMappingCache::new(context.team_id, context.postgres_reader.clone()); + GroupTypeMappingCache::new(context.team_id, context.reader.clone()); let mut feature_flag_matcher = FeatureFlagMatcher::new( context.distinct_id, context.team_id, - context.postgres_reader, - context.postgres_writer, + context.reader, + context.writer, context.cohort_cache, Some(group_type_mapping_cache), context.groups, @@ -362,9 +362,9 @@ mod tests { #[tokio::test] async fn test_evaluate_feature_flags() { - let postgres_reader: Arc = setup_pg_reader_client(None).await; - let postgres_writer: Arc = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); + let reader: Arc = setup_pg_reader_client(None).await; + let writer: Arc = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); let flag = FeatureFlag { name: Some("Test Flag".to_string()), id: 1, @@ -402,8 +402,8 @@ mod tests { .team_id(1) .distinct_id("user123".to_string()) .feature_flags(feature_flag_list) - .postgres_reader(postgres_reader) - .postgres_writer(postgres_writer) + .reader(reader) + .writer(writer) .cohort_cache(cohort_cache) .person_property_overrides(Some(person_properties)) .build() @@ -511,9 +511,9 @@ mod tests { #[tokio::test] async fn test_evaluate_feature_flags_multiple_flags() { - let postgres_reader: Arc = setup_pg_reader_client(None).await; - let postgres_writer: Arc = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); + let reader: Arc = setup_pg_reader_client(None).await; + let writer: Arc = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); let flags = vec![ FeatureFlag { name: Some("Flag 1".to_string()), @@ -563,8 +563,8 @@ mod tests { .team_id(1) .distinct_id("user123".to_string()) .feature_flags(feature_flag_list) - .postgres_reader(postgres_reader) - .postgres_writer(postgres_writer) + .reader(reader) + .writer(writer) .cohort_cache(cohort_cache) .build() .expect("Failed to build FeatureFlagEvaluationContext"); @@ -616,12 +616,10 @@ mod tests { #[tokio::test] async fn 
test_evaluate_feature_flags_with_overrides() { - let postgres_reader: Arc = setup_pg_reader_client(None).await; - let postgres_writer: Arc = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader: Arc = setup_pg_reader_client(None).await; + let writer: Arc = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); let flag = FeatureFlag { name: Some("Test Flag".to_string()), @@ -665,8 +663,8 @@ mod tests { .team_id(team.id) .distinct_id("user123".to_string()) .feature_flags(feature_flag_list) - .postgres_reader(postgres_reader) - .postgres_writer(postgres_writer) + .reader(reader) + .writer(writer) .cohort_cache(cohort_cache) .group_property_overrides(Some(group_property_overrides)) .groups(Some(groups)) @@ -699,9 +697,9 @@ mod tests { #[tokio::test] async fn test_long_distinct_id() { let long_id = "a".repeat(1000); - let postgres_reader: Arc = setup_pg_reader_client(None).await; - let postgres_writer: Arc = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); + let reader: Arc = setup_pg_reader_client(None).await; + let writer: Arc = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); let flag = FeatureFlag { name: Some("Test Flag".to_string()), id: 1, @@ -729,8 +727,8 @@ mod tests { .team_id(1) .distinct_id(long_id) .feature_flags(feature_flag_list) - .postgres_reader(postgres_reader) - .postgres_writer(postgres_writer) + .reader(reader) + .writer(writer) .cohort_cache(cohort_cache) .build() .expect("Failed to build FeatureFlagEvaluationContext"); diff --git a/rust/feature-flags/src/cohort/cohort_cache_manager.rs b/rust/feature-flags/src/cohort/cohort_cache_manager.rs index 549553565930c..d2e43f529dea6 100644 --- a/rust/feature-flags/src/cohort/cohort_cache_manager.rs +++ b/rust/feature-flags/src/cohort/cohort_cache_manager.rs @@ -2,7 +2,9 @@ use crate::api::errors::FlagError; use crate::cohort::cohort_models::Cohort; use crate::flags::flag_matching::{PostgresReader, TeamId}; use moka::future::Cache; +use std::sync::Arc; use std::time::Duration; +use tokio::sync::Mutex; /// CohortCacheManager manages the in-memory cache of cohorts using `moka` for caching. 
/// @@ -12,8 +14,8 @@ use std::time::Duration; /// /// ```text /// CohortCacheManager { -/// postgres_reader: PostgresReader, -/// per_team_cohorts: Cache<TeamId, Vec<Cohort>> { +/// reader: PostgresReader, +/// cache: Cache<TeamId, Vec<Cohort>> { /// // Example: /// 2: [ /// Cohort { id: 1, name: "Power Users", filters: {...} }, @@ -22,35 +24,37 @@ use std::time::Duration; /// 5: [ /// Cohort { id: 3, name: "Beta Users", filters: {...} } /// ] -/// } +/// }, +/// fetch_lock: Mutex<()> // Manager-wide lock /// } /// ``` /// #[derive(Clone)] pub struct CohortCacheManager { - postgres_reader: PostgresReader, - per_team_cohort_cache: Cache<TeamId, Vec<Cohort>>, + reader: PostgresReader, + cache: Cache<TeamId, Vec<Cohort>>, + fetch_lock: Arc<Mutex<()>>, // Added fetch_lock } impl CohortCacheManager { pub fn new( - postgres_reader: PostgresReader, + reader: PostgresReader, max_capacity: Option<u64>, ttl_seconds: Option<u64>, ) -> Self { - // We use the size of the cohort list (i.e., the number of cohorts for a given team)as the weight of the entry - let weigher = - |_: &TeamId, value: &Vec<Cohort>| -> u32 { value.len().try_into().unwrap_or(u32::MAX) }; + // We use the size of the cohort list (i.e., the number of cohorts for a given team) as the weight of the entry + let weigher = |_: &TeamId, value: &Vec<Cohort>| -> u32 { value.len() as u32 }; let cache = Cache::builder() .time_to_live(Duration::from_secs(ttl_seconds.unwrap_or(300))) // Default to 5 minutes .weigher(weigher) - .max_capacity(max_capacity.unwrap_or(10_000)) // Default to 10,000 cohorts + .max_capacity(max_capacity.unwrap_or(100_000)) // Default to 100,000 cohorts .build(); Self { - postgres_reader, - per_team_cohort_cache: cache, + reader, + cache, + fetch_lock: Arc::new(Mutex::new(())), // Initialize the lock } } @@ -58,14 +62,21 @@ impl CohortCacheManager { /// /// If the cohorts are not present in the cache or have expired, it fetches them from the database, /// caches the result upon successful retrieval, and then returns it.
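// The get_cohorts() body below pairs a lock-free cache read with a second read
// taken after acquiring fetch_lock, the classic double-checked pattern: without
// the re-check, every request that missed concurrently would run its own
// Postgres query. A minimal standalone sketch of the same pattern, with String
// values standing in for Vec<Cohort> and a stubbed fetch in place of
// Cohort::list_from_pg (names here are illustrative, not the patch's API):
use moka::future::Cache;
use std::sync::Arc;
use tokio::sync::Mutex;

async fn get_or_fetch(
    cache: &Cache<i32, Vec<String>>, // plays the role of Cache<TeamId, Vec<Cohort>>
    fetch_lock: &Arc<Mutex<()>>,     // the manager-wide lock
    team_id: i32,
) -> Vec<String> {
    if let Some(hit) = cache.get(&team_id).await {
        return hit; // fast path: serve a warm entry without taking the lock
    }
    let _guard = fetch_lock.lock().await; // serialize cache fills
    if let Some(hit) = cache.get(&team_id).await {
        return hit; // another task filled the entry while we waited
    }
    let fetched = vec!["Power Users".to_string()]; // stand-in for the DB query
    cache.insert(team_id, fetched.clone()).await;
    fetched
}
// One manager-wide lock also serializes fetches for unrelated teams; a
// per-team lock map would trade extra bookkeeping for more concurrency.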
- pub async fn get_cohorts_for_team(&self, team_id: TeamId) -> Result<Vec<Cohort>, FlagError> { - if let Some(cached_cohorts) = self.per_team_cohort_cache.get(&team_id).await { + pub async fn get_cohorts(&self, team_id: TeamId) -> Result<Vec<Cohort>, FlagError> { + if let Some(cached_cohorts) = self.cache.get(&team_id).await { return Ok(cached_cohorts.clone()); } - let fetched_cohorts = Cohort::list_from_pg(self.postgres_reader.clone(), team_id).await?; - self.per_team_cohort_cache - .insert(team_id, fetched_cohorts.clone()) - .await; + + // Acquire the lock before fetching + let _lock = self.fetch_lock.lock().await; + + // Double-check the cache after acquiring the lock + if let Some(cached_cohorts) = self.cache.get(&team_id).await { + return Ok(cached_cohorts.clone()); + } + + let fetched_cohorts = Cohort::list_from_pg(self.reader.clone(), team_id).await?; + self.cache.insert(team_id, fetched_cohorts.clone()).await; Ok(fetched_cohorts) } @@ -116,18 +127,18 @@ mod tests { Some(1), // 1-second TTL ); - let cohorts = cohort_cache.get_cohorts_for_team(team_id).await?; + let cohorts = cohort_cache.get_cohorts(team_id).await?; assert_eq!(cohorts.len(), 1); assert_eq!(cohorts[0].team_id, team_id); - let cached_cohorts = cohort_cache.per_team_cohort_cache.get(&team_id).await; + let cached_cohorts = cohort_cache.cache.get(&team_id).await; assert!(cached_cohorts.is_some()); // Wait for TTL to expire sleep(Duration::from_secs(2)).await; // Attempt to retrieve from cache again - let cached_cohorts = cohort_cache.per_team_cohort_cache.get(&team_id).await; + let cached_cohorts = cohort_cache.cache.get(&team_id).await; assert!(cached_cohorts.is_none(), "Cache entry should have expired"); Ok(()) } @@ -152,11 +163,11 @@ mod tests { let team_id = team.id; inserted_team_ids.push(team_id); setup_test_cohort(writer_client.clone(), team_id, None).await?; - cohort_cache.get_cohorts_for_team(team_id).await?; + cohort_cache.get_cohorts(team_id).await?; } - cohort_cache.per_team_cohort_cache.run_pending_tasks().await; - let cache_size = cohort_cache.per_team_cohort_cache.entry_count(); + cohort_cache.cache.run_pending_tasks().await; + let cache_size = cohort_cache.cache.entry_count(); assert_eq!( cache_size, max_capacity, "Cache size should be equal to max_capacity" ); let new_team = insert_new_team_in_pg(writer_client.clone(), None).await?; let new_team_id = new_team.id; setup_test_cohort(writer_client.clone(), new_team_id, None).await?; - cohort_cache.get_cohorts_for_team(new_team_id).await?; + cohort_cache.get_cohorts(new_team_id).await?; - cohort_cache.per_team_cohort_cache.run_pending_tasks().await; - let cache_size_after = cohort_cache.per_team_cohort_cache.entry_count(); + cohort_cache.cache.run_pending_tasks().await; + let cache_size_after = cohort_cache.cache.entry_count(); assert_eq!( cache_size_after, max_capacity, "Cache size should remain equal to max_capacity after eviction" ); let evicted_team_id = &inserted_team_ids[0]; - let cached_cohorts = cohort_cache - .per_team_cohort_cache - .get(evicted_team_id) - .await; + let cached_cohorts = cohort_cache.cache.get(evicted_team_id).await; assert!( cached_cohorts.is_none(), "Least recently used cache entry should have been evicted" ); - let cached_new_team = cohort_cache.per_team_cohort_cache.get(&new_team_id).await; + let cached_new_team = cohort_cache.cache.get(&new_team_id).await; assert!( cached_new_team.is_some(), "Newly added cache entry should be present" ); Ok(()) } #[tokio::test] - async fn test_get_cohorts_for_team() 
-> Result<(), anyhow::Error> { + async fn test_get_cohorts() -> Result<(), anyhow::Error> { let writer_client = setup_pg_writer_client(None).await; let reader_client = setup_pg_reader_client(None).await; let team_id = setup_test_team(writer_client.clone()).await?; let _cohort = setup_test_cohort(writer_client.clone(), team_id, None).await?; let cohort_cache = CohortCacheManager::new(reader_client.clone(), None, None); - let cached_cohorts = cohort_cache.per_team_cohort_cache.get(&team_id).await; + let cached_cohorts = cohort_cache.cache.get(&team_id).await; assert!(cached_cohorts.is_none(), "Cache should initially be empty"); - let cohorts = cohort_cache.get_cohorts_for_team(team_id).await?; + let cohorts = cohort_cache.get_cohorts(team_id).await?; assert_eq!(cohorts.len(), 1); assert_eq!(cohorts[0].team_id, team_id); - let cached_cohorts = cohort_cache - .per_team_cohort_cache - .get(&team_id) - .await - .unwrap(); + let cached_cohorts = cohort_cache.cache.get(&team_id).await.unwrap(); assert_eq!(cached_cohorts.len(), 1); assert_eq!(cached_cohorts[0].team_id, team_id); diff --git a/rust/feature-flags/src/cohort/cohort_operations.rs b/rust/feature-flags/src/cohort/cohort_operations.rs index b987ae3e225ba..9ff522f89f236 100644 --- a/rust/feature-flags/src/cohort/cohort_operations.rs +++ b/rust/feature-flags/src/cohort/cohort_operations.rs @@ -9,39 +9,6 @@ use crate::{ }; impl Cohort { - /// Returns a cohort from postgres given a cohort_id and team_id - #[instrument(skip_all)] - pub async fn from_pg( - client: Arc, - cohort_id: i32, - team_id: i32, - ) -> Result { - let mut conn = client.get_connection().await.map_err(|e| { - tracing::error!("Failed to get database connection: {}", e); - // TODO should I model my errors more generally? Like, yes, everything behind this API is technically a FlagError, - // but I'm not sure if accessing Cohort definitions should be a FlagError (vs idk, a CohortError? A more general API error?) 
- FlagError::DatabaseUnavailable - })?; - - let query = "SELECT id, name, description, team_id, deleted, filters, query, version, pending_version, count, is_calculating, is_static, errors_calculating, groups, created_by_id FROM posthog_cohort WHERE id = $1 AND team_id = $2"; - let cohort = sqlx::query_as::<_, Cohort>(query) - .bind(cohort_id) - .bind(team_id) - .fetch_optional(&mut *conn) - .await - .map_err(|e| { - tracing::error!("Failed to fetch cohort from database: {}", e); - FlagError::Internal(format!("Database query error: {}", e)) - })?; - - cohort.ok_or_else(|| { - FlagError::CohortNotFound(format!( - "Cohort with id {} not found for team {}", - cohort_id, team_id - )) - }) - } - /// Returns all cohorts for a given team #[instrument(skip_all)] pub async fn list_from_pg( @@ -76,12 +43,10 @@ impl Cohort { tracing::error!("Failed to parse filters for cohort {}: {}", self.id, e); FlagError::CohortFiltersParsingError })?; - Ok(cohort_property - .properties - .to_property_filters() - .into_iter() - .filter(|f| !(f.key == "id" && f.prop_type == "cohort")) - .collect()) + + let mut props = cohort_property.properties.to_inner(); + props.retain(|f| !(f.key == "id" && f.prop_type == "cohort")); + return Ok(props); } /// Extracts dependent CohortIds from the cohort's filters @@ -175,11 +140,10 @@ impl InnerCohortProperty { /// ] /// } /// ``` - pub fn to_property_filters(&self) -> Vec { + pub fn to_inner(self) -> Vec { self.values - .iter() - .flat_map(|value| &value.values) - .cloned() + .into_iter() + .flat_map(|value| value.values) .collect() } } @@ -196,46 +160,18 @@ mod tests { }; use serde_json::json; - #[tokio::test] - async fn test_cohort_from_pg() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .expect("Failed to insert team"); - - let cohort = insert_cohort_for_team_in_pg( - postgres_writer.clone(), - team.id, - None, - json!({"properties": {"type": "OR", "values": [{"type": "OR", "values": [{"key": "$initial_browser_version", "type": "person", "value": ["125"], "negation": false, "operator": "exact"}]}]}}), - false, - ) - .await - .expect("Failed to insert cohort"); - - let fetched_cohort = Cohort::from_pg(postgres_reader, cohort.id, team.id) - .await - .expect("Failed to fetch cohort"); - - assert_eq!(fetched_cohort.id, cohort.id); - assert_eq!(fetched_cohort.name, "Test Cohort"); - assert_eq!(fetched_cohort.team_id, team.id); - } - #[tokio::test] async fn test_list_from_pg() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; - let team = insert_new_team_in_pg(postgres_reader.clone(), None) + let team = insert_new_team_in_pg(reader.clone(), None) .await .expect("Failed to insert team"); // Insert multiple cohorts for the team insert_cohort_for_team_in_pg( - postgres_writer.clone(), + writer.clone(), team.id, Some("Cohort 1".to_string()), json!({"properties": {"type": "AND", "values": [{"type": "property", "values": [{"key": "age", "type": "person", "value": [30], "negation": false, "operator": "gt"}]}]}}), @@ -245,7 +181,7 @@ mod tests { .expect("Failed to insert cohort1"); insert_cohort_for_team_in_pg( - postgres_writer.clone(), + writer.clone(), team.id, Some("Cohort 2".to_string()), json!({"properties": {"type": "OR", "values": [{"type": "property", 
"values": [{"key": "country", "type": "person", "value": ["USA"], "negation": false, "operator": "exact"}]}]}}), @@ -254,7 +190,7 @@ mod tests { .await .expect("Failed to insert cohort2"); - let cohorts = Cohort::list_from_pg(postgres_reader, team.id) + let cohorts = Cohort::list_from_pg(reader, team.id) .await .expect("Failed to list cohorts"); @@ -292,7 +228,7 @@ mod tests { } #[test] - fn test_cohort_property_to_property_filters() { + fn test_cohort_property_to_inner() { let cohort_property = InnerCohortProperty { prop_type: CohortPropertyType::AND, values: vec![CohortValues { @@ -318,7 +254,7 @@ mod tests { }], }; - let result = cohort_property.to_property_filters(); + let result = cohort_property.to_inner(); assert_eq!(result.len(), 2); assert_eq!(result[0].key, "email"); assert_eq!(result[0].value, json!("test@example.com")); @@ -328,16 +264,16 @@ mod tests { #[tokio::test] async fn test_extract_dependencies() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; - let team = insert_new_team_in_pg(postgres_reader.clone(), None) + let team = insert_new_team_in_pg(reader.clone(), None) .await .expect("Failed to insert team"); // Insert a single cohort that is dependent on another cohort let dependent_cohort = insert_cohort_for_team_in_pg( - postgres_writer.clone(), + writer.clone(), team.id, Some("Dependent Cohort".to_string()), json!({"properties": {"type": "OR", "values": [{"type": "OR", "values": [{"key": "$browser", "type": "person", "value": ["Safari"], "negation": false, "operator": "exact"}]}]}}), @@ -348,7 +284,7 @@ mod tests { // Insert main cohort with a single dependency let main_cohort = insert_cohort_for_team_in_pg( - postgres_writer.clone(), + writer.clone(), team.id, Some("Main Cohort".to_string()), json!({"properties": {"type": "OR", "values": [{"type": "OR", "values": [{"key": "id", "type": "cohort", "value": dependent_cohort.id, "negation": false}]}]}}), @@ -357,9 +293,14 @@ mod tests { .await .expect("Failed to insert main_cohort"); - let fetched_main_cohort = Cohort::from_pg(postgres_reader.clone(), main_cohort.id, team.id) + let cohorts = Cohort::list_from_pg(reader.clone(), team.id) .await - .expect("Failed to fetch main cohort"); + .expect("Failed to fetch cohorts"); + + let fetched_main_cohort = cohorts + .into_iter() + .find(|c| c.id == main_cohort.id) + .expect("Failed to find main cohort"); println!("fetched_main_cohort: {:?}", fetched_main_cohort); diff --git a/rust/feature-flags/src/flags/flag_matching.rs b/rust/feature-flags/src/flags/flag_matching.rs index ea04f6fb00b0a..aeda22a1efeff 100644 --- a/rust/feature-flags/src/flags/flag_matching.rs +++ b/rust/feature-flags/src/flags/flag_matching.rs @@ -74,17 +74,17 @@ pub struct GroupTypeMappingCache { failed_to_fetch_flags: bool, group_types_to_indexes: HashMap, group_indexes_to_types: HashMap, - postgres_reader: PostgresReader, + reader: PostgresReader, } impl GroupTypeMappingCache { - pub fn new(team_id: TeamId, postgres_reader: PostgresReader) -> Self { + pub fn new(team_id: TeamId, reader: PostgresReader) -> Self { GroupTypeMappingCache { team_id, failed_to_fetch_flags: false, group_types_to_indexes: HashMap::new(), group_indexes_to_types: HashMap::new(), - postgres_reader, + reader, } } @@ -101,7 +101,7 @@ impl GroupTypeMappingCache { let team_id = self.team_id; let mapping = match self - 
.fetch_group_type_mapping(self.postgres_reader.clone(), team_id) + .fetch_group_type_mapping(self.reader.clone(), team_id) .await { Ok(mapping) if !mapping.is_empty() => mapping, @@ -142,10 +142,10 @@ impl GroupTypeMappingCache { async fn fetch_group_type_mapping( &mut self, - postgres_reader: PostgresReader, + reader: PostgresReader, team_id: TeamId, ) -> Result, FlagError> { - let mut conn = postgres_reader.as_ref().get_connection().await?; + let mut conn = reader.as_ref().get_connection().await?; let query = r#" SELECT group_type, group_type_index @@ -186,8 +186,8 @@ pub struct PropertiesCache { pub struct FeatureFlagMatcher { pub distinct_id: String, pub team_id: TeamId, - pub postgres_reader: PostgresReader, - pub postgres_writer: PostgresWriter, + pub reader: PostgresReader, + pub writer: PostgresWriter, pub cohort_cache: Arc, group_type_mapping_cache: GroupTypeMappingCache, properties_cache: PropertiesCache, @@ -200,8 +200,8 @@ impl FeatureFlagMatcher { pub fn new( distinct_id: String, team_id: TeamId, - postgres_reader: PostgresReader, - postgres_writer: PostgresWriter, + reader: PostgresReader, + writer: PostgresWriter, cohort_cache: Arc, group_type_mapping_cache: Option, groups: Option>, @@ -209,11 +209,11 @@ impl FeatureFlagMatcher { FeatureFlagMatcher { distinct_id, team_id, - postgres_reader: postgres_reader.clone(), - postgres_writer: postgres_writer.clone(), + reader: reader.clone(), + writer: writer.clone(), cohort_cache, group_type_mapping_cache: group_type_mapping_cache - .unwrap_or_else(|| GroupTypeMappingCache::new(team_id, postgres_reader.clone())), + .unwrap_or_else(|| GroupTypeMappingCache::new(team_id, reader.clone())), groups: groups.unwrap_or_default(), properties_cache: PropertiesCache::default(), } @@ -292,7 +292,7 @@ impl FeatureFlagMatcher { target_distinct_ids: Vec, ) -> (Option>, bool) { let should_write = match should_write_hash_key_override( - self.postgres_reader.clone(), + self.reader.clone(), self.team_id, self.distinct_id.clone(), hash_key.clone(), @@ -320,7 +320,7 @@ impl FeatureFlagMatcher { if should_write { if let Err(e) = set_feature_flag_hash_key_overrides( // NB: this is the only method that writes to the database, so it's the only one that should use the writer - self.postgres_writer.clone(), + self.writer.clone(), self.team_id, target_distinct_ids.clone(), hash_key.clone(), @@ -354,7 +354,7 @@ impl FeatureFlagMatcher { ); match get_feature_flag_hash_key_overrides( - self.postgres_reader.clone(), + self.reader.clone(), self.team_id, target_distinct_ids, ) @@ -430,13 +430,13 @@ impl FeatureFlagMatcher { .filter_map(|flag| flag.get_group_type_index()) .collect(); - let postgres_reader = self.postgres_reader.clone(); + let reader = self.reader.clone(); let distinct_id = self.distinct_id.clone(); let team_id = self.team_id; match fetch_and_locally_cache_all_properties( &mut self.properties_cache, - postgres_reader, + reader, distinct_id, team_id, &group_type_indexes, @@ -844,10 +844,10 @@ impl FeatureFlagMatcher { /// Fetches the `PersonId` from the database based on the current `distinct_id` and `team_id`. /// This method is called when the `PersonId` is not present in the properties cache. 
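// The properties cache mentioned above is per-request memoization: the first
// condition that needs person properties pays for one DB round-trip, and every
// later condition in the same request is served from memory. A minimal sketch
// of that shape (struct and helper names are illustrative, not the patch's
// exact API; `fetch` stands in for fetch_person_properties_from_db):
use serde_json::Value;
use std::collections::HashMap;
use std::future::Future;

#[derive(Default)]
struct RequestPropertiesCache {
    person_properties: Option<HashMap<String, Value>>,
}

impl RequestPropertiesCache {
    async fn person_properties<F, Fut>(&mut self, fetch: F) -> HashMap<String, Value>
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = HashMap<String, Value>>,
    {
        if let Some(props) = &self.person_properties {
            return props.clone(); // cached: no second round-trip this request
        }
        let fetched = fetch().await;
        self.person_properties = Some(fetched.clone());
        fetched
    }
}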
async fn get_person_id_from_db(&mut self) -> Result { - let postgres_reader = self.postgres_reader.clone(); + let reader = self.reader.clone(); let distinct_id = self.distinct_id.clone(); let team_id = self.team_id; - fetch_person_properties_from_db(postgres_reader, distinct_id, team_id) + fetch_person_properties_from_db(reader, distinct_id, team_id) .await .map(|(_, person_id)| person_id) } @@ -884,7 +884,7 @@ impl FeatureFlagMatcher { // At the start of the request, fetch all of the cohorts for the team from the cache // This method also caches any cohorts for a given team in memory for the duration of the application, so we don't need to fetch from // the database again until we restart the application. See the CohortCacheManager for more details. - let cohorts = self.cohort_cache.get_cohorts_for_team(self.team_id).await?; + let cohorts = self.cohort_cache.get_cohorts(self.team_id).await?; // Split the cohorts into static and dynamic, since the dynamic ones have property filters // and we need to evaluate them based on the target properties, whereas the static ones are @@ -898,7 +898,7 @@ impl FeatureFlagMatcher { if !static_cohorts.is_empty() { let results = evaluate_static_cohorts( - self.postgres_reader.clone(), + self.reader.clone(), person_id, static_cohorts.iter().map(|c| c.id).collect(), ) @@ -912,7 +912,7 @@ impl FeatureFlagMatcher { .get_cohort_id() .ok_or(FlagError::CohortFiltersParsingError)?; let match_result = - evaluate_dynamic_cohorts(cohort_id, target_properties, cohorts.clone())?; + evaluate_dynamic_cohorts(cohort_id, target_properties, &cohorts)?; cohort_matches.insert(cohort_id, match_result); } } @@ -1002,10 +1002,10 @@ impl FeatureFlagMatcher { return Ok(result); } - let postgres_reader = self.postgres_reader.clone(); + let reader = self.reader.clone(); let team_id = self.team_id; let db_properties = - fetch_group_properties_from_db(postgres_reader, team_id, group_type_index).await?; + fetch_group_properties_from_db(reader, team_id, group_type_index).await?; // once the properties are fetched, cache them so we don't need to fetch again in a given request self.properties_cache @@ -1030,11 +1030,11 @@ impl FeatureFlagMatcher { return Ok(result); } - let postgres_reader = self.postgres_reader.clone(); + let reader = self.reader.clone(); let distinct_id = self.distinct_id.clone(); let team_id = self.team_id; let (db_properties, person_id) = - fetch_person_properties_from_db(postgres_reader, distinct_id, team_id).await?; + fetch_person_properties_from_db(reader, distinct_id, team_id).await?; // once the properties and person ID are fetched, cache them so we don't need to fetch again in a given request self.properties_cache.person_properties = Some(db_properties.clone()); @@ -1167,11 +1167,11 @@ impl FeatureFlagMatcher { /// Evaluate static cohort filters by checking if the person is in each cohort. 
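// Static cohorts are precomputed memberships, so the function below reduces to
// a set-membership query rather than property evaluation. A trimmed sketch of
// that lookup pattern (the membership table `posthog_cohortpeople` and its
// columns are assumptions for illustration, not taken from this patch):
use std::collections::HashSet;

async fn static_memberships(
    pool: &sqlx::PgPool,
    person_id: i32,
    cohort_ids: &[i32],
) -> Result<Vec<(i32, bool)>, sqlx::Error> {
    // One round-trip: which of the requested cohorts contain this person?
    let rows: Vec<(i32,)> = sqlx::query_as::<_, (i32,)>(
        "SELECT cohort_id FROM posthog_cohortpeople WHERE person_id = $1 AND cohort_id = ANY($2)",
    )
    .bind(person_id)
    .bind(cohort_ids)
    .fetch_all(pool)
    .await?;
    let found: HashSet<i32> = rows.into_iter().map(|(id,)| id).collect();
    // Cohorts absent from the result are reported as non-matches, not errors.
    Ok(cohort_ids
        .iter()
        .map(|&id| (id, found.contains(&id)))
        .collect())
}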
async fn evaluate_static_cohorts( - postgres_reader: PostgresReader, + reader: PostgresReader, person_id: i32, // Change this parameter from distinct_id to person_id cohort_ids: Vec, ) -> Result, FlagError> { - let mut conn = postgres_reader.get_connection().await?; + let mut conn = reader.get_connection().await?; let query = r#" WITH cohort_membership AS ( @@ -1210,10 +1210,9 @@ async fn evaluate_static_cohorts( fn evaluate_dynamic_cohorts( initial_cohort_id: CohortId, target_properties: &HashMap, - cohorts: Vec, + cohorts: &[Cohort], ) -> Result { - let cohort_dependency_graph = - build_cohort_dependency_graph(initial_cohort_id, cohorts.clone())?; + let cohort_dependency_graph = build_cohort_dependency_graph(initial_cohort_id, cohorts)?; // We need to sort cohorts topologically to ensure we evaluate dependencies before the cohorts that depend on them. // For example, if cohort A depends on cohort B, we need to evaluate B first to know if A matches. @@ -1318,7 +1317,7 @@ fn apply_cohort_membership_logic( /// The graph is acyclic, which is required for valid cohort dependencies. fn build_cohort_dependency_graph( initial_cohort_id: CohortId, - cohorts: Vec, + cohorts: &[Cohort], ) -> Result, FlagError> { let mut graph = DiGraph::new(); let mut node_map = HashMap::new(); @@ -1386,12 +1385,12 @@ fn build_cohort_dependency_graph( /// It updates the properties cache with the fetched properties and returns the result. async fn fetch_and_locally_cache_all_properties( properties_cache: &mut PropertiesCache, - postgres_reader: PostgresReader, + reader: PostgresReader, distinct_id: String, team_id: TeamId, group_type_indexes: &HashSet, ) -> Result<(), FlagError> { - let mut conn = postgres_reader.as_ref().get_connection().await?; + let mut conn = reader.as_ref().get_connection().await?; let query = r#" SELECT @@ -1479,11 +1478,11 @@ async fn fetch_and_locally_cache_all_properties( /// This function constructs and executes a SQL query to fetch the person properties for a specified distinct ID and team ID. /// It returns the fetched properties as a HashMap. async fn fetch_person_properties_from_db( - postgres_reader: PostgresReader, + reader: PostgresReader, distinct_id: String, team_id: TeamId, ) -> Result<(HashMap, i32), FlagError> { - let mut conn = postgres_reader.as_ref().get_connection().await?; + let mut conn = reader.as_ref().get_connection().await?; let query = r#" SELECT "posthog_person"."id" as person_id, "posthog_person"."properties" as person_properties @@ -1520,11 +1519,11 @@ async fn fetch_person_properties_from_db( /// This function constructs and executes a SQL query to fetch the group properties for a specified team ID and group type index. /// It returns the fetched properties as a HashMap. 
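// Returning to the dependency-graph code above: build_cohort_dependency_graph
// feeds petgraph's toposort so that a cohort's dependencies are always evaluated
// before the cohort itself. A self-contained miniature of that ordering step
// (the (dependent, dependency) pair encoding of edges is illustrative):
use petgraph::algo::toposort;
use petgraph::graph::DiGraph;
use std::collections::HashMap;

fn evaluation_order(deps: &[(i32, i32)]) -> Option<Vec<i32>> {
    let mut graph = DiGraph::<i32, ()>::new();
    let mut nodes = HashMap::new();
    for &(dependent, dependency) in deps {
        let d = *nodes
            .entry(dependent)
            .or_insert_with(|| graph.add_node(dependent));
        let p = *nodes
            .entry(dependency)
            .or_insert_with(|| graph.add_node(dependency));
        graph.add_edge(d, p, ()); // edge points from a cohort to the cohort it needs
    }
    // toposort errors out on a cycle, mirroring the patch's cycle detection.
    let sorted = toposort(&graph, None).ok()?;
    // The sort yields dependents before their dependencies here, so reverse it
    // to evaluate dependencies first.
    Some(sorted.into_iter().rev().map(|ix| graph[ix]).collect())
}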
async fn fetch_group_properties_from_db( - postgres_reader: PostgresReader, + reader: PostgresReader, team_id: TeamId, group_type_index: GroupTypeIndex, ) -> Result, FlagError> { - let mut conn = postgres_reader.as_ref().get_connection().await?; + let mut conn = reader.as_ref().get_connection().await?; let query = r#" SELECT "posthog_group"."group_properties" @@ -1582,12 +1581,12 @@ fn all_properties_match( } async fn get_feature_flag_hash_key_overrides( - postgres_reader: PostgresReader, + reader: PostgresReader, team_id: TeamId, distinct_id_and_hash_key_override: Vec, ) -> Result, FlagError> { let mut feature_flag_hash_key_overrides = HashMap::new(); - let mut conn = postgres_reader.as_ref().get_connection().await?; + let mut conn = reader.as_ref().get_connection().await?; let person_and_distinct_id_query = r#" SELECT person_id, distinct_id @@ -1637,7 +1636,7 @@ async fn get_feature_flag_hash_key_overrides( } async fn set_feature_flag_hash_key_overrides( - postgres_writer: PostgresWriter, + writer: PostgresWriter, team_id: TeamId, distinct_ids: Vec, hash_key_override: String, @@ -1646,7 +1645,7 @@ async fn set_feature_flag_hash_key_overrides( const RETRY_DELAY: Duration = Duration::from_millis(100); for retry in 0..MAX_RETRIES { - let mut conn = postgres_writer.get_connection().await?; + let mut conn = writer.get_connection().await?; let mut transaction = conn.begin().await?; let query = r#" @@ -1713,7 +1712,7 @@ async fn set_feature_flag_hash_key_overrides( } async fn should_write_hash_key_override( - postgres_reader: PostgresReader, + reader: PostgresReader, team_id: TeamId, distinct_id: String, hash_key_override: String, @@ -1746,7 +1745,7 @@ async fn should_write_hash_key_override( for retry in 0..MAX_RETRIES { let result = timeout(QUERY_TIMEOUT, async { - let mut conn = postgres_reader.get_connection().await.map_err(|e| { + let mut conn = reader.get_connection().await.map_err(|e| { FlagError::DatabaseError(format!("Failed to acquire connection: {}", e)) })?; @@ -1842,22 +1841,22 @@ mod tests { #[tokio::test] async fn test_fetch_properties_from_pg_to_match() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) + let team = insert_new_team_in_pg(reader.clone(), None) .await .expect("Failed to insert team in pg"); let distinct_id = "user_distinct_id".to_string(); - insert_person_for_team_in_pg(postgres_reader.clone(), team.id, distinct_id.clone(), None) + insert_person_for_team_in_pg(reader.clone(), team.id, distinct_id.clone(), None) .await .expect("Failed to insert person"); let not_matching_distinct_id = "not_matching_distinct_id".to_string(); insert_person_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, not_matching_distinct_id.clone(), Some(json!({ "email": "a@x.com"})), @@ -1893,8 +1892,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( distinct_id.clone(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -1907,8 +1906,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( not_matching_distinct_id.clone(), team.id, - postgres_reader.clone(), - 
postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -1921,8 +1920,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( "other_distinct_id".to_string(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -1935,12 +1934,10 @@ mod tests { #[tokio::test] async fn test_person_property_overrides() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); let flag = create_test_flag( None, @@ -1975,8 +1972,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( "test_user".to_string(), team.id, - postgres_reader, - postgres_writer, + reader, + writer, cohort_cache, None, None, @@ -1997,12 +1994,10 @@ mod tests { #[tokio::test] async fn test_group_property_overrides() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); let flag = create_test_flag( None, @@ -2032,8 +2027,7 @@ mod tests { None, ); - let mut group_type_mapping_cache = - GroupTypeMappingCache::new(team.id, postgres_reader.clone()); + let mut group_type_mapping_cache = GroupTypeMappingCache::new(team.id, reader.clone()); let group_types_to_indexes = [("organization".to_string(), 1)].into_iter().collect(); group_type_mapping_cache.group_types_to_indexes = group_types_to_indexes; group_type_mapping_cache.group_indexes_to_types = @@ -2052,8 +2046,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( "test_user".to_string(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), Some(group_type_mapping_cache), Some(groups), @@ -2076,10 +2070,10 @@ mod tests { #[tokio::test] async fn test_get_matching_variant_with_cache() { let flag = create_test_flag_with_variants(1); - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let mut group_type_mapping_cache = GroupTypeMappingCache::new(1, postgres_reader.clone()); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let mut group_type_mapping_cache = GroupTypeMappingCache::new(1, reader.clone()); let group_types_to_indexes = [("group_type_1".to_string(), 1)].into_iter().collect(); let group_type_index_to_name = [(1, "group_type_1".to_string())].into_iter().collect(); @@ -2092,8 +2086,8 @@ mod tests 
{ let mut matcher = FeatureFlagMatcher::new( "test_user".to_string(), 1, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), Some(group_type_mapping_cache), Some(groups), @@ -2108,20 +2102,18 @@ mod tests { #[tokio::test] async fn test_get_matching_variant_with_db() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); let flag = create_test_flag_with_variants(team.id); let mut matcher = FeatureFlagMatcher::new( "test_user".to_string(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -2134,9 +2126,9 @@ mod tests { #[tokio::test] async fn test_is_condition_match_empty_properties() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); let flag = create_test_flag( Some(1), None, @@ -2167,8 +2159,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( "test_user".to_string(), 1, - postgres_reader, - postgres_writer, + reader, + writer, cohort_cache, None, None, @@ -2224,12 +2216,10 @@ mod tests { #[tokio::test] async fn test_overrides_avoid_db_lookups() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); let flag = create_test_flag( None, @@ -2265,8 +2255,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( "test_user".to_string(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -2295,12 +2285,10 @@ mod tests { #[tokio::test] async fn test_fallback_to_db_when_overrides_insufficient() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); let flag = create_test_flag( None, @@ -2346,7 +2334,7 @@ mod tests { )])); 
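// At this point the override map supplies only "email", while the flag also
// filters on other person properties, so the matcher cannot decide from the
// overrides alone; the person row inserted next is what the DB fallback reads.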
insert_person_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, "test_user".to_string(), Some(json!({"email": "test@example.com", "age": 30})), @@ -2357,8 +2345,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( "test_user".to_string(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -2381,16 +2369,14 @@ mod tests { #[tokio::test] async fn test_property_fetching_and_caching() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); let distinct_id = "test_user".to_string(); insert_person_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, distinct_id.clone(), Some(json!({"email": "test@example.com", "age": 30})), @@ -2401,8 +2387,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( distinct_id, team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -2426,16 +2412,14 @@ mod tests { #[tokio::test] async fn test_property_caching() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); let distinct_id = "test_user".to_string(); insert_person_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, distinct_id.clone(), Some(json!({"email": "test@example.com", "age": 30})), @@ -2446,8 +2430,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( distinct_id.clone(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -2479,8 +2463,8 @@ mod tests { let mut new_matcher = FeatureFlagMatcher::new( distinct_id.clone(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -2570,12 +2554,10 @@ mod tests { #[tokio::test] async fn test_concurrent_flag_evaluation() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); let flag = Arc::new(create_test_flag( None, Some(team.id), @@ -2600,15 +2582,15 @@ mod tests { let 
mut handles = vec![]; for i in 0..100 { let flag_clone = flag.clone(); - let postgres_reader_clone = postgres_reader.clone(); - let postgres_writer_clone = postgres_writer.clone(); + let reader_clone = reader.clone(); + let writer_clone = writer.clone(); let cohort_cache_clone = cohort_cache.clone(); handles.push(tokio::spawn(async move { let mut matcher = FeatureFlagMatcher::new( format!("test_user_{}", i), team.id, - postgres_reader_clone, - postgres_writer_clone, + reader_clone, + writer_clone, cohort_cache_clone, None, None, @@ -2629,12 +2611,10 @@ mod tests { #[tokio::test] async fn test_property_operators() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); let flag = create_test_flag( None, @@ -2675,7 +2655,7 @@ mod tests { ); insert_person_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, "test_user".to_string(), Some(json!({"email": "user@example@domain.com", "age": 30})), @@ -2686,8 +2666,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( "test_user".to_string(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -2700,9 +2680,9 @@ mod tests { #[tokio::test] async fn test_empty_hashed_identifier() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); let flag = create_test_flag( Some(1), None, @@ -2724,15 +2704,8 @@ mod tests { None, ); - let mut matcher = FeatureFlagMatcher::new( - "".to_string(), - 1, - postgres_reader, - postgres_writer, - cohort_cache, - None, - None, - ); + let mut matcher = + FeatureFlagMatcher::new("".to_string(), 1, reader, writer, cohort_cache, None, None); let result = matcher.get_match(&flag, None, None).await.unwrap(); @@ -2741,9 +2714,9 @@ mod tests { #[tokio::test] async fn test_rollout_percentage() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); let mut flag = create_test_flag( Some(1), None, @@ -2768,8 +2741,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( "test_user".to_string(), 1, - postgres_reader, - postgres_writer, + reader, + writer, cohort_cache, None, None, @@ -2789,9 +2762,9 @@ mod tests { #[tokio::test] async fn test_uneven_variant_distribution() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = 
Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); let mut flag = create_test_flag_with_variants(1); // Adjust variant rollout percentages to be uneven @@ -2819,8 +2792,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( "test_user".to_string(), 1, - postgres_reader, - postgres_writer, + reader, + writer, cohort_cache, None, None, @@ -2851,22 +2824,15 @@ mod tests { #[tokio::test] async fn test_missing_properties_in_db() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); // Insert a person without properties - insert_person_for_team_in_pg( - postgres_reader.clone(), - team.id, - "test_user".to_string(), - None, - ) - .await - .unwrap(); + insert_person_for_team_in_pg(reader.clone(), team.id, "test_user".to_string(), None) + .await + .unwrap(); let flag = create_test_flag( None, @@ -2899,8 +2865,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( "test_user".to_string(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache, None, None, @@ -2913,16 +2879,14 @@ mod tests { #[tokio::test] async fn test_malformed_property_data() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); // Insert a person with malformed properties insert_person_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, "test_user".to_string(), Some(json!({"age": "not_a_number"})), @@ -2961,8 +2925,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( "test_user".to_string(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache, None, None, @@ -2976,12 +2940,10 @@ mod tests { #[tokio::test] async fn test_get_match_with_insufficient_overrides() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); let flag = create_test_flag( None, @@ -3027,7 +2989,7 @@ mod tests { 
)])); insert_person_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, "test_user".to_string(), Some(json!({"email": "test@example.com", "age": 30})), @@ -3038,8 +3000,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( "test_user".to_string(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache, None, None, @@ -3055,9 +3017,9 @@ mod tests { #[tokio::test] async fn test_evaluation_reasons() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); let flag = create_test_flag( Some(1), None, @@ -3082,8 +3044,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( "test_user".to_string(), 1, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache, None, None, @@ -3100,12 +3062,10 @@ mod tests { #[tokio::test] async fn test_complex_conditions() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); let flag = create_test_flag( Some(1), @@ -3150,7 +3110,7 @@ mod tests { ); insert_person_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, "test_user".to_string(), Some(json!({"email": "user2@example.com", "age": 35})), @@ -3161,8 +3121,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( "test_user".to_string(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache, None, None, @@ -3175,12 +3135,10 @@ mod tests { #[tokio::test] async fn test_super_condition_matches_boolean() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); let flag = create_test_flag( Some(1), @@ -3241,7 +3199,7 @@ mod tests { ); insert_person_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, "test_id".to_string(), Some(json!({"email": "test@posthog.com", "is_enabled": true})), @@ -3249,24 +3207,19 @@ mod tests { .await .unwrap(); - insert_person_for_team_in_pg(postgres_reader.clone(), team.id, "lil_id".to_string(), None) + insert_person_for_team_in_pg(reader.clone(), team.id, "lil_id".to_string(), None) .await .unwrap(); - insert_person_for_team_in_pg( - postgres_reader.clone(), - team.id, - "another_id".to_string(), - None, - ) - .await - .unwrap(); + 
insert_person_for_team_in_pg(reader.clone(), team.id, "another_id".to_string(), None) + .await + .unwrap(); let mut matcher_test_id = FeatureFlagMatcher::new( "test_id".to_string(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -3275,8 +3228,8 @@ mod tests { let mut matcher_example_id = FeatureFlagMatcher::new( "lil_id".to_string(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -3285,8 +3238,8 @@ mod tests { let mut matcher_another_id = FeatureFlagMatcher::new( "another_id".to_string(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -3312,15 +3265,13 @@ mod tests { #[tokio::test] async fn test_super_condition_matches_string() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); insert_person_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, "test_id".to_string(), Some(json!({"email": "test@posthog.com", "is_enabled": "true"})), @@ -3389,8 +3340,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( "test_id".to_string(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -3405,15 +3356,13 @@ mod tests { #[tokio::test] async fn test_super_condition_matches_and_false() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); insert_person_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, "test_id".to_string(), Some(json!({"email": "test@posthog.com", "is_enabled": true})), @@ -3421,16 +3370,11 @@ mod tests { .await .unwrap(); - insert_person_for_team_in_pg( - postgres_reader.clone(), - team.id, - "another_id".to_string(), - None, - ) - .await - .unwrap(); + insert_person_for_team_in_pg(reader.clone(), team.id, "another_id".to_string(), None) + .await + .unwrap(); - insert_person_for_team_in_pg(postgres_reader.clone(), team.id, "lil_id".to_string(), None) + insert_person_for_team_in_pg(reader.clone(), team.id, "lil_id".to_string(), None) .await .unwrap(); @@ -3495,8 +3439,8 @@ mod tests { let mut matcher_test_id = FeatureFlagMatcher::new( "test_id".to_string(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -3505,8 +3449,8 @@ mod tests { let mut matcher_example_id = FeatureFlagMatcher::new( "lil_id".to_string(), 
team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -3515,8 +3459,8 @@ mod tests { let mut matcher_another_id = FeatureFlagMatcher::new( "another_id".to_string(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -3556,16 +3500,14 @@ mod tests { #[tokio::test] async fn test_basic_cohort_matching() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); // Insert a cohort with the condition that matches the test user's properties let cohort_row = insert_cohort_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, None, json!({ @@ -3590,7 +3532,7 @@ mod tests { // Insert a person with properties that match the cohort condition insert_person_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, "test_user".to_string(), Some(json!({"$browser_version": 126})), @@ -3630,8 +3572,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( "test_user".to_string(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -3644,16 +3586,14 @@ mod tests { #[tokio::test] async fn test_not_in_cohort_matching() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); // Insert a cohort with a condition that does not match the test user's properties let cohort_row = insert_cohort_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, None, json!({ @@ -3678,7 +3618,7 @@ mod tests { // Insert a person with properties that do not match the cohort condition insert_person_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, "test_user".to_string(), Some(json!({"$browser_version": 126})), @@ -3718,8 +3658,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( "test_user".to_string(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -3732,16 +3672,14 @@ mod tests { #[tokio::test] async fn test_not_in_cohort_matching_user_in_cohort() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = 
setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); // Insert a cohort with a condition that matches the test user's properties let cohort_row = insert_cohort_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, None, json!({ @@ -3766,7 +3704,7 @@ mod tests { // Insert a person with properties that match the cohort condition insert_person_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, "test_user".to_string(), Some(json!({"$browser_version": 126})), @@ -3806,8 +3744,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( "test_user".to_string(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -3821,16 +3759,14 @@ mod tests { #[tokio::test] async fn test_cohort_dependent_on_another_cohort() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); // Insert a base cohort let base_cohort_row = insert_cohort_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, None, json!({ @@ -3855,7 +3791,7 @@ mod tests { // Insert a dependent cohort that includes the base cohort let dependent_cohort_row = insert_cohort_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, None, json!({ @@ -3880,7 +3816,7 @@ mod tests { // Insert a person with properties that match the base cohort condition insert_person_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, "test_user".to_string(), Some(json!({"$browser_version": 126})), @@ -3920,8 +3856,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( "test_user".to_string(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -3934,16 +3870,14 @@ mod tests { #[tokio::test] async fn test_in_cohort_matching_user_not_in_cohort() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); // Insert a cohort with a condition that does not match the test user's properties let cohort_row = insert_cohort_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, None, json!({ @@ -3968,7 +3902,7 @@ mod tests { // Insert a person with properties that do not match the cohort condition insert_person_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, "test_user".to_string(), Some(json!({"$browser_version": 125})), @@ -4008,8 +3942,8 @@ mod tests { let mut 
matcher = FeatureFlagMatcher::new( "test_user".to_string(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -4023,16 +3957,14 @@ mod tests { #[tokio::test] async fn test_static_cohort_matching_user_in_cohort() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); // Insert a static cohort let cohort = insert_cohort_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, Some("Static Cohort".to_string()), json!({}), // Static cohorts don't have property filters @@ -4044,7 +3976,7 @@ mod tests { // Insert a person let distinct_id = "static_user".to_string(); insert_person_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, distinct_id.clone(), Some(json!({"email": "static@user.com"})), @@ -4053,13 +3985,12 @@ mod tests { .unwrap(); // Retrieve the person's ID - let person_id = - get_person_id_by_distinct_id(postgres_reader.clone(), team.id, &distinct_id) - .await - .unwrap(); + let person_id = get_person_id_by_distinct_id(reader.clone(), team.id, &distinct_id) + .await + .unwrap(); // Associate the person with the static cohort - add_person_to_cohort(postgres_reader.clone(), person_id, cohort.id) + add_person_to_cohort(reader.clone(), person_id, cohort.id) .await .unwrap(); @@ -4095,8 +4026,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( distinct_id.clone(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -4112,16 +4043,14 @@ mod tests { #[tokio::test] async fn test_static_cohort_matching_user_not_in_cohort() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); // Insert a static cohort let cohort = insert_cohort_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, Some("Another Static Cohort".to_string()), json!({}), // Static cohorts don't have property filters @@ -4133,7 +4062,7 @@ mod tests { // Insert a person let distinct_id = "non_static_user".to_string(); insert_person_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, distinct_id.clone(), Some(json!({"email": "nonstatic@user.com"})), @@ -4175,8 +4104,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( distinct_id.clone(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -4192,16 +4121,14 @@ mod tests { #[tokio::test] async fn test_static_cohort_not_in_matching_user_not_in_cohort() { - let 
postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); // Insert a static cohort let cohort = insert_cohort_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, Some("Static Cohort NotIn".to_string()), json!({}), // Static cohorts don't have property filters @@ -4213,7 +4140,7 @@ mod tests { // Insert a person let distinct_id = "not_in_static_user".to_string(); insert_person_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, distinct_id.clone(), Some(json!({"email": "notinstatic@user.com"})), @@ -4255,8 +4182,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( distinct_id.clone(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -4272,16 +4199,14 @@ mod tests { #[tokio::test] async fn test_static_cohort_not_in_matching_user_in_cohort() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); // Insert a static cohort let cohort = insert_cohort_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, Some("Static Cohort NotIn User In".to_string()), json!({}), // Static cohorts don't have property filters @@ -4293,7 +4218,7 @@ mod tests { // Insert a person let distinct_id = "in_not_in_static_user".to_string(); insert_person_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, distinct_id.clone(), Some(json!({"email": "innotinstatic@user.com"})), @@ -4302,13 +4227,12 @@ mod tests { .unwrap(); // Retrieve the person's ID - let person_id = - get_person_id_by_distinct_id(postgres_reader.clone(), team.id, &distinct_id) - .await - .unwrap(); + let person_id = get_person_id_by_distinct_id(reader.clone(), team.id, &distinct_id) + .await + .unwrap(); // Associate the person with the static cohort - add_person_to_cohort(postgres_reader.clone(), person_id, cohort.id) + add_person_to_cohort(reader.clone(), person_id, cohort.id) .await .unwrap(); @@ -4344,8 +4268,8 @@ mod tests { let mut matcher = FeatureFlagMatcher::new( distinct_id.clone(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -4361,15 +4285,13 @@ mod tests { #[tokio::test] async fn test_set_feature_flag_hash_key_overrides_success() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = 
setup_pg_writer_client(None).await; + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); let distinct_id = "user2".to_string(); // Insert person - insert_person_for_team_in_pg(postgres_reader.clone(), team.id, distinct_id.clone(), None) + insert_person_for_team_in_pg(reader.clone(), team.id, distinct_id.clone(), None) .await .unwrap(); @@ -4404,13 +4326,13 @@ mod tests { }; // Insert the feature flag into the database - insert_flag_for_team_in_pg(postgres_writer.clone(), team.id, Some(flag_row)) + insert_flag_for_team_in_pg(writer.clone(), team.id, Some(flag_row)) .await .unwrap(); // Set hash key override set_feature_flag_hash_key_overrides( - postgres_writer.clone(), + writer.clone(), team.id, vec![distinct_id.clone()], "hash_key_2".to_string(), @@ -4419,13 +4341,10 @@ mod tests { .unwrap(); // Retrieve hash key overrides - let overrides = get_feature_flag_hash_key_overrides( - postgres_reader.clone(), - team.id, - vec![distinct_id.clone()], - ) - .await - .unwrap(); + let overrides = + get_feature_flag_hash_key_overrides(reader.clone(), team.id, vec![distinct_id.clone()]) + .await + .unwrap(); assert_eq!( overrides.get("test_flag"), @@ -4436,15 +4355,13 @@ mod tests { #[tokio::test] async fn test_get_feature_flag_hash_key_overrides_success() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); let distinct_id = "user2".to_string(); // Insert person - insert_person_for_team_in_pg(postgres_reader.clone(), team.id, distinct_id.clone(), None) + insert_person_for_team_in_pg(reader.clone(), team.id, distinct_id.clone(), None) .await .unwrap(); @@ -4479,13 +4396,13 @@ mod tests { }; // Insert the feature flag into the database - insert_flag_for_team_in_pg(postgres_writer.clone(), team.id, Some(flag_row)) + insert_flag_for_team_in_pg(writer.clone(), team.id, Some(flag_row)) .await .unwrap(); // Set hash key override set_feature_flag_hash_key_overrides( - postgres_writer.clone(), + writer.clone(), team.id, vec![distinct_id.clone()], "hash_key_2".to_string(), @@ -4494,13 +4411,10 @@ mod tests { .unwrap(); // Retrieve hash key overrides - let overrides = get_feature_flag_hash_key_overrides( - postgres_reader.clone(), - team.id, - vec![distinct_id.clone()], - ) - .await - .unwrap(); + let overrides = + get_feature_flag_hash_key_overrides(reader.clone(), team.id, vec![distinct_id.clone()]) + .await + .unwrap(); assert_eq!( overrides.get("test_flag"), @@ -4511,17 +4425,15 @@ mod tests { #[tokio::test] async fn test_evaluate_feature_flags_with_experience_continuity() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); let distinct_id = "user3".to_string(); // Insert person insert_person_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, 
distinct_id.clone(), Some(json!({"email": "user3@example.com"})), @@ -4560,7 +4472,7 @@ mod tests { // Set hash key override set_feature_flag_hash_key_overrides( - postgres_writer.clone(), + writer.clone(), team.id, vec![distinct_id.clone()], "hash_key_continuity".to_string(), @@ -4575,8 +4487,8 @@ mod tests { let result = FeatureFlagMatcher::new( distinct_id.clone(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -4594,16 +4506,14 @@ mod tests { #[tokio::test] async fn test_evaluate_feature_flags_with_continuity_missing_override() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); let distinct_id = "user4".to_string(); insert_person_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, distinct_id.clone(), Some(json!({"email": "user4@example.com"})), @@ -4647,8 +4557,8 @@ mod tests { let result = FeatureFlagMatcher::new( distinct_id.clone(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, @@ -4666,16 +4576,14 @@ mod tests { #[tokio::test] async fn test_evaluate_all_feature_flags_mixed_continuity() { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); - let team = insert_new_team_in_pg(postgres_reader.clone(), None) - .await - .unwrap(); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let team = insert_new_team_in_pg(reader.clone(), None).await.unwrap(); let distinct_id = "user5".to_string(); insert_person_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, distinct_id.clone(), Some(json!({"email": "user5@example.com"})), @@ -4743,7 +4651,7 @@ mod tests { // Set hash key override for the continuity flag set_feature_flag_hash_key_overrides( - postgres_writer.clone(), + writer.clone(), team.id, vec![distinct_id.clone()], "hash_key_mixed".to_string(), @@ -4758,8 +4666,8 @@ mod tests { let result = FeatureFlagMatcher::new( distinct_id.clone(), team.id, - postgres_reader.clone(), - postgres_writer.clone(), + reader.clone(), + writer.clone(), cohort_cache.clone(), None, None, diff --git a/rust/feature-flags/src/flags/flag_operations.rs b/rust/feature-flags/src/flags/flag_operations.rs index 0bb357b7ecae9..205afd6479d23 100644 --- a/rust/feature-flags/src/flags/flag_operations.rs +++ b/rust/feature-flags/src/flags/flag_operations.rs @@ -208,17 +208,17 @@ mod tests { #[tokio::test] async fn test_fetch_flags_from_pg() { - let postgres_reader = setup_pg_reader_client(None).await; + let reader = setup_pg_reader_client(None).await; - let team = insert_new_team_in_pg(postgres_reader.clone(), None) + let team = insert_new_team_in_pg(reader.clone(), None) .await .expect("Failed to insert team in pg"); - 
insert_flag_for_team_in_pg(postgres_reader.clone(), team.id, None) + insert_flag_for_team_in_pg(reader.clone(), team.id, None) .await .expect("Failed to insert flags"); - let flags_from_pg = FeatureFlagList::from_pg(postgres_reader.clone(), team.id) + let flags_from_pg = FeatureFlagList::from_pg(reader.clone(), team.id) .await .expect("Failed to fetch flags from pg"); @@ -347,9 +347,9 @@ mod tests { #[tokio::test] async fn test_fetch_empty_team_from_pg() { - let postgres_reader = setup_pg_reader_client(None).await; + let reader = setup_pg_reader_client(None).await; - let FeatureFlagList { flags } = FeatureFlagList::from_pg(postgres_reader.clone(), 1234) + let FeatureFlagList { flags } = FeatureFlagList::from_pg(reader.clone(), 1234) .await .expect("Failed to fetch flags from pg"); { @@ -359,9 +359,9 @@ mod tests { #[tokio::test] async fn test_fetch_nonexistent_team_from_pg() { - let postgres_reader = setup_pg_reader_client(None).await; + let reader = setup_pg_reader_client(None).await; - match FeatureFlagList::from_pg(postgres_reader.clone(), -1).await { + match FeatureFlagList::from_pg(reader.clone(), -1).await { Ok(flags) => assert_eq!(flags.flags.len(), 0), Err(err) => panic!("Expected empty result, got error: {:?}", err), } @@ -380,9 +380,9 @@ mod tests { #[tokio::test] async fn test_fetch_multiple_flags_from_pg() { - let postgres_reader = setup_pg_reader_client(None).await; + let reader = setup_pg_reader_client(None).await; - let team = insert_new_team_in_pg(postgres_reader.clone(), None) + let team = insert_new_team_in_pg(reader.clone(), None) .await .expect("Failed to insert team in pg"); @@ -412,15 +412,15 @@ mod tests { }; // Insert multiple flags for the team - insert_flag_for_team_in_pg(postgres_reader.clone(), team.id, Some(flag1)) + insert_flag_for_team_in_pg(reader.clone(), team.id, Some(flag1)) .await .expect("Failed to insert flags"); - insert_flag_for_team_in_pg(postgres_reader.clone(), team.id, Some(flag2)) + insert_flag_for_team_in_pg(reader.clone(), team.id, Some(flag2)) .await .expect("Failed to insert flags"); - let flags_from_pg = FeatureFlagList::from_pg(postgres_reader.clone(), team.id) + let flags_from_pg = FeatureFlagList::from_pg(reader.clone(), team.id) .await .expect("Failed to fetch flags from pg"); @@ -468,9 +468,9 @@ mod tests { #[tokio::test] async fn test_multivariate_flag_parsing() { let redis_client = setup_redis_client(None); - let postgres_reader = setup_pg_reader_client(None).await; + let reader = setup_pg_reader_client(None).await; - let team = insert_new_team_in_pg(postgres_reader.clone(), None) + let team = insert_new_team_in_pg(reader.clone(), None) .await .expect("Failed to insert team in pg"); @@ -521,7 +521,7 @@ mod tests { // Insert into Postgres insert_flag_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, Some(FeatureFlagRow { id: 1, @@ -548,7 +548,7 @@ mod tests { assert_eq!(redis_flag.get_variants().len(), 3); // Fetch and verify from Postgres - let pg_flags = FeatureFlagList::from_pg(postgres_reader, team.id) + let pg_flags = FeatureFlagList::from_pg(reader, team.id) .await .expect("Failed to fetch flags from Postgres"); @@ -561,9 +561,9 @@ mod tests { #[tokio::test] async fn test_multivariate_flag_with_payloads() { let redis_client = setup_redis_client(None); - let postgres_reader = setup_pg_reader_client(None).await; + let reader = setup_pg_reader_client(None).await; - let team = insert_new_team_in_pg(postgres_reader.clone(), None) + let team = insert_new_team_in_pg(reader.clone(), None) .await .expect("Failed to 
insert team in pg"); @@ -619,7 +619,7 @@ mod tests { // Insert into Postgres insert_flag_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, Some(FeatureFlagRow { id: 1, @@ -645,7 +645,7 @@ mod tests { assert_eq!(redis_flag.key, "multivariate_flag_with_payloads"); // Fetch and verify from Postgres - let pg_flags = FeatureFlagList::from_pg(postgres_reader, team.id) + let pg_flags = FeatureFlagList::from_pg(reader, team.id) .await .expect("Failed to fetch flags from Postgres"); @@ -704,9 +704,9 @@ mod tests { #[tokio::test] async fn test_flag_with_super_groups() { let redis_client = setup_redis_client(None); - let postgres_reader = setup_pg_reader_client(None).await; + let reader = setup_pg_reader_client(None).await; - let team = insert_new_team_in_pg(postgres_reader.clone(), None) + let team = insert_new_team_in_pg(reader.clone(), None) .await .expect("Failed to insert team in pg"); @@ -751,7 +751,7 @@ mod tests { // Insert into Postgres insert_flag_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, Some(FeatureFlagRow { id: 1, @@ -779,7 +779,7 @@ mod tests { assert_eq!(redis_flag.filters.super_groups.as_ref().unwrap().len(), 1); // Fetch and verify from Postgres - let pg_flags = FeatureFlagList::from_pg(postgres_reader, team.id) + let pg_flags = FeatureFlagList::from_pg(reader, team.id) .await .expect("Failed to fetch flags from Postgres"); @@ -793,9 +793,9 @@ mod tests { #[tokio::test] async fn test_flags_with_different_property_types() { let redis_client = setup_redis_client(None); - let postgres_reader = setup_pg_reader_client(None).await; + let reader = setup_pg_reader_client(None).await; - let team = insert_new_team_in_pg(postgres_reader.clone(), None) + let team = insert_new_team_in_pg(reader.clone(), None) .await .expect("Failed to insert team in pg"); @@ -846,7 +846,7 @@ mod tests { // Insert into Postgres insert_flag_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, Some(FeatureFlagRow { id: 1, @@ -877,7 +877,7 @@ mod tests { assert_eq!(redis_properties[2].prop_type, "event"); // Fetch and verify from Postgres - let pg_flags = FeatureFlagList::from_pg(postgres_reader, team.id) + let pg_flags = FeatureFlagList::from_pg(reader, team.id) .await .expect("Failed to fetch flags from Postgres"); @@ -894,9 +894,9 @@ mod tests { #[tokio::test] async fn test_deleted_and_inactive_flags() { let redis_client = setup_redis_client(None); - let postgres_reader = setup_pg_reader_client(None).await; + let reader = setup_pg_reader_client(None).await; - let team = insert_new_team_in_pg(postgres_reader.clone(), None) + let team = insert_new_team_in_pg(reader.clone(), None) .await .expect("Failed to insert team in pg"); @@ -931,7 +931,7 @@ mod tests { // Insert into Postgres insert_flag_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, Some(FeatureFlagRow { id: 0, @@ -948,7 +948,7 @@ mod tests { .expect("Failed to insert deleted flag in Postgres"); insert_flag_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, Some(FeatureFlagRow { id: 0, @@ -980,7 +980,7 @@ mod tests { .any(|f| f.key == "inactive_flag" && !f.active)); // Fetch and verify from Postgres - let pg_flags = FeatureFlagList::from_pg(postgres_reader, team.id) + let pg_flags = FeatureFlagList::from_pg(reader, team.id) .await .expect("Failed to fetch flags from Postgres"); @@ -998,7 +998,7 @@ mod tests { #[tokio::test] async fn test_error_handling() { let redis_client = setup_redis_client(Some("redis://localhost:6379/".to_string())); - let 
postgres_reader = setup_pg_reader_client(None).await; + let reader = setup_pg_reader_client(None).await; // Test Redis connection error let bad_redis_client = setup_redis_client(Some("redis://localhost:1111/".to_string())); @@ -1006,7 +1006,7 @@ mod tests { assert!(matches!(result, Err(FlagError::RedisUnavailable))); // Test malformed JSON in Redis - let team = insert_new_team_in_pg(postgres_reader.clone(), None) + let team = insert_new_team_in_pg(reader.clone(), None) .await .expect("Failed to insert team in pg"); @@ -1023,7 +1023,7 @@ mod tests { // Test database query error (using a non-existent table) let result = sqlx::query("SELECT * FROM non_existent_table") - .fetch_all(&mut *postgres_reader.get_connection().await.unwrap()) + .fetch_all(&mut *reader.get_connection().await.unwrap()) .await; assert!(result.is_err()); } @@ -1031,9 +1031,9 @@ mod tests { #[tokio::test] async fn test_concurrent_access() { let redis_client = setup_redis_client(None); - let postgres_reader = setup_pg_reader_client(None).await; + let reader = setup_pg_reader_client(None).await; - let team = insert_new_team_in_pg(postgres_reader.clone(), None) + let team = insert_new_team_in_pg(reader.clone(), None) .await .expect("Failed to insert team in pg"); @@ -1056,7 +1056,7 @@ mod tests { .expect("Failed to insert flag in Redis"); insert_flag_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, Some(FeatureFlagRow { id: 0, @@ -1075,14 +1075,14 @@ mod tests { let mut handles = vec![]; for _ in 0..10 { let redis_client = redis_client.clone(); - let postgres_reader = postgres_reader.clone(); + let reader = reader.clone(); let team_id = team.id; let handle = task::spawn(async move { let redis_flags = FeatureFlagList::from_redis(redis_client, team_id) .await .unwrap(); - let pg_flags = FeatureFlagList::from_pg(postgres_reader, team_id) + let pg_flags = FeatureFlagList::from_pg(reader, team_id) .await .unwrap(); (redis_flags, pg_flags) @@ -1104,9 +1104,9 @@ mod tests { #[ignore] async fn test_performance() { let redis_client = setup_redis_client(None); - let postgres_reader = setup_pg_reader_client(None).await; + let reader = setup_pg_reader_client(None).await; - let team = insert_new_team_in_pg(postgres_reader.clone(), None) + let team = insert_new_team_in_pg(reader.clone(), None) .await .expect("Failed to insert team in pg"); @@ -1136,7 +1136,7 @@ mod tests { for flag in flags { insert_flag_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, Some(FeatureFlagRow { id: 0, @@ -1160,7 +1160,7 @@ mod tests { let redis_duration = start.elapsed(); let start = Instant::now(); - let pg_flags = FeatureFlagList::from_pg(postgres_reader, team.id) + let pg_flags = FeatureFlagList::from_pg(reader, team.id) .await .expect("Failed to fetch flags from Postgres"); let pg_duration = start.elapsed(); @@ -1178,9 +1178,9 @@ mod tests { #[tokio::test] async fn test_edge_cases() { let redis_client = setup_redis_client(None); - let postgres_reader = setup_pg_reader_client(None).await; + let reader = setup_pg_reader_client(None).await; - let team = insert_new_team_in_pg(postgres_reader.clone(), None) + let team = insert_new_team_in_pg(reader.clone(), None) .await .expect("Failed to insert team in pg"); @@ -1225,7 +1225,7 @@ mod tests { for flag in edge_case_flags.as_array().unwrap() { insert_flag_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, Some(FeatureFlagRow { id: 0, @@ -1246,7 +1246,7 @@ mod tests { let redis_flags = FeatureFlagList::from_redis(redis_client, team.id) .await 
.expect("Failed to fetch flags from Redis"); - let pg_flags = FeatureFlagList::from_pg(postgres_reader, team.id) + let pg_flags = FeatureFlagList::from_pg(reader, team.id) .await .expect("Failed to fetch flags from Postgres"); @@ -1271,9 +1271,9 @@ mod tests { #[tokio::test] async fn test_consistent_behavior_from_both_clients() { let redis_client = setup_redis_client(None); - let postgres_reader = setup_pg_reader_client(None).await; + let reader = setup_pg_reader_client(None).await; - let team = insert_new_team_in_pg(postgres_reader.clone(), None) + let team = insert_new_team_in_pg(reader.clone(), None) .await .expect("Failed to insert team in pg"); @@ -1305,7 +1305,7 @@ mod tests { for flag in flags.as_array().unwrap() { insert_flag_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, Some(FeatureFlagRow { id: 0, @@ -1326,7 +1326,7 @@ mod tests { let mut redis_flags = FeatureFlagList::from_redis(redis_client, team.id) .await .expect("Failed to fetch flags from Redis"); - let mut pg_flags = FeatureFlagList::from_pg(postgres_reader, team.id) + let mut pg_flags = FeatureFlagList::from_pg(reader, team.id) .await .expect("Failed to fetch flags from Postgres"); @@ -1370,9 +1370,9 @@ mod tests { #[tokio::test] async fn test_rollout_percentage_edge_cases() { let redis_client = setup_redis_client(None); - let postgres_reader = setup_pg_reader_client(None).await; + let reader = setup_pg_reader_client(None).await; - let team = insert_new_team_in_pg(postgres_reader.clone(), None) + let team = insert_new_team_in_pg(reader.clone(), None) .await .expect("Failed to insert team in pg"); @@ -1413,7 +1413,7 @@ mod tests { for flag in flags.as_array().unwrap() { insert_flag_for_team_in_pg( - postgres_reader.clone(), + reader.clone(), team.id, Some(FeatureFlagRow { id: 0, @@ -1434,7 +1434,7 @@ mod tests { let redis_flags = FeatureFlagList::from_redis(redis_client, team.id) .await .expect("Failed to fetch flags from Redis"); - let pg_flags = FeatureFlagList::from_pg(postgres_reader, team.id) + let pg_flags = FeatureFlagList::from_pg(reader, team.id) .await .expect("Failed to fetch flags from Postgres"); diff --git a/rust/feature-flags/src/router.rs b/rust/feature-flags/src/router.rs index 46706586a07e6..0353aefa40339 100644 --- a/rust/feature-flags/src/router.rs +++ b/rust/feature-flags/src/router.rs @@ -21,8 +21,8 @@ use crate::{ #[derive(Clone)] pub struct State { pub redis: Arc, - pub postgres_reader: Arc, - pub postgres_writer: Arc, + pub reader: Arc, + pub writer: Arc, pub cohort_cache: Arc, // TODO does this need a better name than just `cohort_cache`? pub geoip: Arc, pub team_ids_to_track: TeamIdsToTrack, @@ -30,8 +30,8 @@ pub struct State { pub fn router( redis: Arc, - postgres_reader: Arc, - postgres_writer: Arc, + reader: Arc, + writer: Arc, cohort_cache: Arc, geoip: Arc, liveness: HealthRegistry, @@ -43,8 +43,8 @@ where { let state = State { redis, - postgres_reader, - postgres_writer, + reader, + writer, cohort_cache, geoip, team_ids_to_track: config.team_ids_to_track.clone(), diff --git a/rust/feature-flags/src/server.rs b/rust/feature-flags/src/server.rs index 12a79b4f1b499..fa30ac51c8220 100644 --- a/rust/feature-flags/src/server.rs +++ b/rust/feature-flags/src/server.rs @@ -27,8 +27,7 @@ where // TODO - we should have a dedicated URL for both this and the writer – the reader will read // from the replica, and the writer will write to the main database. 
- let postgres_reader = match get_pool(&config.read_database_url, config.max_pg_connections).await - { + let reader = match get_pool(&config.read_database_url, config.max_pg_connections).await { Ok(client) => Arc::new(client), Err(e) => { tracing::error!("Failed to create read Postgres client: {}", e); @@ -36,7 +35,7 @@ where } }; - let postgres_writer = + let writer = // TODO - we should have a dedicated URL for both this and the reader – the reader will read // from the replica, and the writer will write to the main database. match get_pool(&config.write_database_url, config.max_pg_connections).await { @@ -55,7 +54,7 @@ where } }; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); let health = HealthRegistry::new("liveness"); @@ -68,8 +67,8 @@ where // You can decide which client to pass to the router, or pass both if needed let app = router::router( redis_client, - postgres_reader, - postgres_writer, + reader, + writer, cohort_cache, geoip_service, health, diff --git a/rust/feature-flags/tests/test_flag_matching_consistency.rs b/rust/feature-flags/tests/test_flag_matching_consistency.rs index c31ac2094ad68..82e45a6a6d963 100644 --- a/rust/feature-flags/tests/test_flag_matching_consistency.rs +++ b/rust/feature-flags/tests/test_flag_matching_consistency.rs @@ -112,17 +112,17 @@ async fn it_is_consistent_with_rollout_calculation_for_simple_flags() { ]; for (i, result) in results.iter().enumerate().take(1000) { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); let distinct_id = format!("distinct_id_{}", i); let feature_flag_match = FeatureFlagMatcher::new( distinct_id, 1, - postgres_reader, - postgres_writer, + reader, + writer, cohort_cache, None, None, @@ -1212,16 +1212,16 @@ async fn it_is_consistent_with_rollout_calculation_for_multivariate_flags() { ]; for (i, result) in results.iter().enumerate().take(1000) { - let postgres_reader = setup_pg_reader_client(None).await; - let postgres_writer = setup_pg_writer_client(None).await; - let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None)); + let reader = setup_pg_reader_client(None).await; + let writer = setup_pg_writer_client(None).await; + let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); let distinct_id = format!("distinct_id_{}", i); let feature_flag_match = FeatureFlagMatcher::new( distinct_id, 1, - postgres_reader, - postgres_writer, + reader, + writer, cohort_cache, None, None, From 172a6741afaea3908ee8973cc3605c9f91c513e9 Mon Sep 17 00:00:00 2001 From: dylan Date: Tue, 26 Nov 2024 11:47:05 -0800 Subject: [PATCH 2/7] made some code review-related changes --- rust/Cargo.toml | 2 +- rust/feature-flags/Cargo.toml | 2 +- rust/feature-flags/src/api/handler.rs | 2 +- rust/feature-flags/src/config.rs | 8 ++++++++ rust/feature-flags/src/router.rs | 4 ++-- rust/feature-flags/src/server.rs | 6 +++++- 6 files changed, 18 insertions(+), 6 deletions(-) diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 137470fb33406..3a2301337da90 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -96,4 +96,4 @@ ahash = 
"0.8.11" aws-config = { version = "1.1.7", features = ["behavior-version-latest"] } aws-sdk-s3 = "1.58.0" mockall = "0.13.0" -moka = { version = "0.12.8", features = ["sync"] } +moka = { version = "0.12.8", features = ["sync", "future"] } diff --git a/rust/feature-flags/Cargo.toml b/rust/feature-flags/Cargo.toml index 4099fd8ab06fd..536c85e836a7a 100644 --- a/rust/feature-flags/Cargo.toml +++ b/rust/feature-flags/Cargo.toml @@ -40,7 +40,7 @@ common-metrics = { path = "../common/metrics" } tower = { workspace = true } derive_builder = "0.20.1" petgraph = "0.6.5" -moka = { version = "0.12.8", features = ["future"] } +moka = { workspace = true } [lints] workspace = true diff --git a/rust/feature-flags/src/api/handler.rs b/rust/feature-flags/src/api/handler.rs index 21dfa60c67e8a..0fdade8d95128 100644 --- a/rust/feature-flags/src/api/handler.rs +++ b/rust/feature-flags/src/api/handler.rs @@ -121,7 +121,7 @@ pub async fn process_request(context: RequestContext) -> Result, pub reader: Arc, pub writer: Arc, - pub cohort_cache: Arc, // TODO does this need a better name than just `cohort_cache`? + pub cohort_cache_manager: Arc, pub geoip: Arc, pub team_ids_to_track: TeamIdsToTrack, } @@ -45,7 +45,7 @@ where redis, reader, writer, - cohort_cache, + cohort_cache_manager: cohort_cache, geoip, team_ids_to_track: config.team_ids_to_track.clone(), }; diff --git a/rust/feature-flags/src/server.rs b/rust/feature-flags/src/server.rs index fa30ac51c8220..7a5fde80a629f 100644 --- a/rust/feature-flags/src/server.rs +++ b/rust/feature-flags/src/server.rs @@ -54,7 +54,11 @@ where } }; - let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); + let cohort_cache = Arc::new(CohortCacheManager::new( + reader.clone(), + Some(config.cache_max_capacity), + Some(config.cache_ttl_seconds), + )); let health = HealthRegistry::new("liveness"); From 574d4a64b63526d07f2bb1c0709f9704807a7ff3 Mon Sep 17 00:00:00 2001 From: dylan Date: Tue, 26 Nov 2024 11:53:02 -0800 Subject: [PATCH 3/7] fix some formatting --- .../src/flags/flag_operations.rs | 4 +-- .../tests/test_flag_matching_consistency.rs | 34 ++++++------------- 2 files changed, 11 insertions(+), 27 deletions(-) diff --git a/rust/feature-flags/src/flags/flag_operations.rs b/rust/feature-flags/src/flags/flag_operations.rs index 205afd6479d23..d0d2aa65a0912 100644 --- a/rust/feature-flags/src/flags/flag_operations.rs +++ b/rust/feature-flags/src/flags/flag_operations.rs @@ -1082,9 +1082,7 @@ mod tests { let redis_flags = FeatureFlagList::from_redis(redis_client, team_id) .await .unwrap(); - let pg_flags = FeatureFlagList::from_pg(reader, team_id) - .await - .unwrap(); + let pg_flags = FeatureFlagList::from_pg(reader, team_id).await.unwrap(); (redis_flags, pg_flags) }); diff --git a/rust/feature-flags/tests/test_flag_matching_consistency.rs b/rust/feature-flags/tests/test_flag_matching_consistency.rs index 82e45a6a6d963..8d61e6d5fb23e 100644 --- a/rust/feature-flags/tests/test_flag_matching_consistency.rs +++ b/rust/feature-flags/tests/test_flag_matching_consistency.rs @@ -118,18 +118,11 @@ async fn it_is_consistent_with_rollout_calculation_for_simple_flags() { let distinct_id = format!("distinct_id_{}", i); - let feature_flag_match = FeatureFlagMatcher::new( - distinct_id, - 1, - reader, - writer, - cohort_cache, - None, - None, - ) - .get_match(&flags[0], None, None) - .await - .unwrap(); + let feature_flag_match = + FeatureFlagMatcher::new(distinct_id, 1, reader, writer, cohort_cache, None, None) + .get_match(&flags[0], None, None) + .await + 
.unwrap(); if *result { assert_eq!( @@ -1217,18 +1210,11 @@ async fn it_is_consistent_with_rollout_calculation_for_multivariate_flags() { let cohort_cache = Arc::new(CohortCacheManager::new(reader.clone(), None, None)); let distinct_id = format!("distinct_id_{}", i); - let feature_flag_match = FeatureFlagMatcher::new( - distinct_id, - 1, - reader, - writer, - cohort_cache, - None, - None, - ) - .get_match(&flags[0], None, None) - .await - .unwrap(); + let feature_flag_match = + FeatureFlagMatcher::new(distinct_id, 1, reader, writer, cohort_cache, None, None) + .get_match(&flags[0], None, None) + .await + .unwrap(); if let Some(variant) = &result { assert_eq!( From 386ff7b68d500cd64cf248ef51d3692ebc84210d Mon Sep 17 00:00:00 2001 From: dylan Date: Tue, 26 Nov 2024 12:16:18 -0800 Subject: [PATCH 4/7] clip y --- rust/feature-flags/src/cohort/cohort_operations.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/feature-flags/src/cohort/cohort_operations.rs b/rust/feature-flags/src/cohort/cohort_operations.rs index 9ff522f89f236..60afc7ca30f1c 100644 --- a/rust/feature-flags/src/cohort/cohort_operations.rs +++ b/rust/feature-flags/src/cohort/cohort_operations.rs @@ -46,7 +46,7 @@ impl Cohort { let mut props = cohort_property.properties.to_inner(); props.retain(|f| !(f.key == "id" && f.prop_type == "cohort")); - return Ok(props); + Ok(props) } /// Extracts dependent CohortIds from the cohort's filters From 9068c0b14294abee7ffa44defa77d9b4fca837db Mon Sep 17 00:00:00 2001 From: dylan Date: Tue, 26 Nov 2024 14:08:04 -0800 Subject: [PATCH 5/7] more metrics more metrics --- .../src/cohort/cohort_cache_manager.rs | 46 +++++- rust/feature-flags/src/flags/flag_matching.rs | 103 ++++++++++-- rust/feature-flags/src/flags/flag_request.rs | 153 ++++++++++++------ .../src/metrics/metrics_consts.rs | 25 ++- 4 files changed, 260 insertions(+), 67 deletions(-) diff --git a/rust/feature-flags/src/cohort/cohort_cache_manager.rs b/rust/feature-flags/src/cohort/cohort_cache_manager.rs index d2e43f529dea6..c545321fe2542 100644 --- a/rust/feature-flags/src/cohort/cohort_cache_manager.rs +++ b/rust/feature-flags/src/cohort/cohort_cache_manager.rs @@ -1,6 +1,10 @@ use crate::api::errors::FlagError; use crate::cohort::cohort_models::Cohort; use crate::flags::flag_matching::{PostgresReader, TeamId}; +use crate::metrics::metrics_consts::{ + COHORT_CACHE_HIT_COUNTER, COHORT_CACHE_MISS_COUNTER, DB_COHORT_ERRORS_COUNTER, + DB_COHORT_READS_COUNTER, +}; use moka::future::Cache; use std::sync::Arc; use std::time::Duration; @@ -63,22 +67,56 @@ impl CohortCacheManager { /// If the cohorts are not present in the cache or have expired, it fetches them from the database, /// caches the result upon successful retrieval, and then returns it. 
pub async fn get_cohorts(&self, team_id: TeamId) -> Result, FlagError> { + // First check cache before acquiring lock if let Some(cached_cohorts) = self.cache.get(&team_id).await { + common_metrics::inc( + COHORT_CACHE_HIT_COUNTER, + &[("team_id".to_string(), team_id.to_string())], + 1, + ); return Ok(cached_cohorts.clone()); } // Acquire the lock before fetching let _lock = self.fetch_lock.lock().await; - // Double-check the cache after acquiring the lock + // Double-check the cache after acquiring lock if let Some(cached_cohorts) = self.cache.get(&team_id).await { + common_metrics::inc( + COHORT_CACHE_HIT_COUNTER, + &[("team_id".to_string(), team_id.to_string())], + 1, + ); return Ok(cached_cohorts.clone()); } - let fetched_cohorts = Cohort::list_from_pg(self.reader.clone(), team_id).await?; - self.cache.insert(team_id, fetched_cohorts.clone()).await; + // If we get here, we have a cache miss + common_metrics::inc( + COHORT_CACHE_MISS_COUNTER, + &[("team_id".to_string(), team_id.to_string())], + 1, + ); - Ok(fetched_cohorts) + // Attempt to fetch from DB + match Cohort::list_from_pg(self.reader.clone(), team_id).await { + Ok(fetched_cohorts) => { + common_metrics::inc( + DB_COHORT_READS_COUNTER, + &[("team_id".to_string(), team_id.to_string())], + 1, + ); + self.cache.insert(team_id, fetched_cohorts.clone()).await; + Ok(fetched_cohorts) + } + Err(e) => { + common_metrics::inc( + DB_COHORT_ERRORS_COUNTER, + &[("team_id".to_string(), team_id.to_string())], + 1, + ); + Err(e) + } + } } } diff --git a/rust/feature-flags/src/flags/flag_matching.rs b/rust/feature-flags/src/flags/flag_matching.rs index aeda22a1efeff..73666f02f74eb 100644 --- a/rust/feature-flags/src/flags/flag_matching.rs +++ b/rust/feature-flags/src/flags/flag_matching.rs @@ -5,7 +5,11 @@ use crate::cohort::cohort_cache_manager::CohortCacheManager; use crate::cohort::cohort_models::{Cohort, CohortId}; use crate::flags::flag_match_reason::FeatureFlagMatchReason; use crate::flags::flag_models::{FeatureFlag, FeatureFlagList, FlagGroupType}; -use crate::metrics::metrics_consts::{FLAG_EVALUATION_ERROR_COUNTER, FLAG_HASH_KEY_WRITES_COUNTER}; +use crate::metrics::metrics_consts::{ + DB_GROUP_PROPERTIES_READS_COUNTER, DB_PERSON_AND_GROUP_PROPERTIES_READS_COUNTER, + DB_PERSON_PROPERTIES_READS_COUNTER, FLAG_EVALUATION_ERROR_COUNTER, + FLAG_HASH_KEY_WRITES_COUNTER, PROPERTY_CACHE_HITS_COUNTER, PROPERTY_CACHE_MISSES_COUNTER, +}; use crate::metrics::metrics_utils::parse_exception_for_prometheus_label; use crate::properties::property_matching::match_property; use crate::properties::property_models::{OperatorType, PropertyFilter}; @@ -107,11 +111,22 @@ impl GroupTypeMappingCache { Ok(mapping) if !mapping.is_empty() => mapping, Ok(_) => { self.failed_to_fetch_flags = true; - // TODO add the `"Failed to fetch group"` type of lable. See posthog/models/feature_flag/flag_matching.py:parse_exception_for_error_message + let reason = "no_group_type_mappings"; + inc( + FLAG_EVALUATION_ERROR_COUNTER, + &[("reason".to_string(), reason.to_string())], + 1, + ); return Err(FlagError::NoGroupTypeMappings); } Err(e) => { self.failed_to_fetch_flags = true; + let reason = parse_exception_for_prometheus_label(&e); + inc( + FLAG_EVALUATION_ERROR_COUNTER, + &[("reason".to_string(), reason.to_string())], + 1, + ); return Err(e); } }; @@ -135,7 +150,12 @@ impl GroupTypeMappingCache { self.group_indexes_to_types.clone_from(&result); Ok(result) } else { - // TODO add the `"Failed to fetch group"` type of lable. 
See posthog/models/feature_flag/flag_matching.py:parse_exception_for_error_message + let reason = "no_group_type_mappings"; + inc( + FLAG_EVALUATION_ERROR_COUNTER, + &[("reason".to_string(), reason.to_string())], + 1, + ); Err(FlagError::NoGroupTypeMappings) } } @@ -164,7 +184,12 @@ impl GroupTypeMappingCache { .collect(); if mapping.is_empty() { - // TODO add the `"Failed to fetch group"` type of lable. See posthog/models/feature_flag/flag_matching.py:parse_exception_for_error_message + let reason = "no_group_type_mappings"; + inc( + FLAG_EVALUATION_ERROR_COUNTER, + &[("reason".to_string(), reason.to_string())], + 1, + ); Err(FlagError::NoGroupTypeMappings) } else { Ok(mapping) @@ -328,7 +353,6 @@ impl FeatureFlagMatcher { .await { error!("Failed to set feature flag hash key overrides: {:?}", e); - // Increment the counter for failed write let reason = parse_exception_for_prometheus_label(&e); inc( FLAG_EVALUATION_ERROR_COUNTER, @@ -340,7 +364,6 @@ impl FeatureFlagMatcher { writing_hash_key_override = true; } - // TODO I'm not sure if this is the right place to increment this counter inc( FLAG_HASH_KEY_WRITES_COUNTER, &[ @@ -443,7 +466,13 @@ impl FeatureFlagMatcher { ) .await { - Ok(_) => {} + Ok(_) => { + inc( + DB_PERSON_AND_GROUP_PROPERTIES_READS_COUNTER, + &[("team_id".to_string(), team_id.to_string())], + 1, + ); + } Err(e) => { error_while_computing_flags = true; // TODO add sentry exception tracking @@ -806,7 +835,7 @@ impl FeatureFlagMatcher { } } - /// Get group properties from cache or database. + /// Get group properties from overrides, cache or database. /// /// This function attempts to retrieve group properties either from a cache or directly from the database. /// It first checks if there are any locally computable property overrides. If so, it returns those. @@ -832,9 +861,26 @@ impl FeatureFlagMatcher { /// and updates the cache accordingly. async fn get_person_id(&mut self) -> Result { match self.properties_cache.person_id { - Some(id) => Ok(id), + Some(id) => { + inc( + PROPERTY_CACHE_HITS_COUNTER, + &[("type".to_string(), "person_id".to_string())], + 1, + ); + Ok(id) + } None => { + inc( + PROPERTY_CACHE_MISSES_COUNTER, + &[("type".to_string(), "person_id".to_string())], + 1, + ); let id = self.get_person_id_from_db().await?; + inc( + DB_PERSON_PROPERTIES_READS_COUNTER, + &[("team_id".to_string(), self.team_id.to_string())], + 1, + ); self.properties_cache.person_id = Some(id); Ok(id) } @@ -852,7 +898,7 @@ impl FeatureFlagMatcher { .map(|(_, person_id)| person_id) } - /// Get person properties from cache or database. + /// Get person properties from overrides, cache or database. /// /// This function attempts to retrieve person properties either from a cache or directly from the database. /// It first checks if there are any locally computable property overrides. If so, it returns those. 
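Taken together, the cohort and property lookups in this patch follow one read-through shape: check a cache, count a hit or a miss, fall back to Postgres, count the read, then populate the cache. The shared cohort cache additionally serializes fallers-back behind a tokio Mutex and re-checks the cache after acquiring it, so concurrent requests for the same team trigger at most one database query per expiry window. A minimal sketch of that double-checked read-through, assuming the moka future Cache and tokio Mutex used in cohort_cache_manager.rs; fetch_from_db and the String values are stand-ins for Cohort::list_from_pg and real cohorts, and the counter calls are elided to comments:

    use moka::future::Cache;
    use tokio::sync::Mutex;

    async fn fetch_from_db(team_id: i32) -> Vec<String> {
        // placeholder for Cohort::list_from_pg(reader, team_id)
        vec![format!("cohort_for_team_{team_id}")]
    }

    async fn get_or_fetch(
        cache: &Cache<i32, Vec<String>>, // stand-in for Cache<TeamId, Vec<Cohort>>
        fetch_lock: &Mutex<()>,
        team_id: i32,
    ) -> Vec<String> {
        if let Some(hit) = cache.get(&team_id).await {
            return hit; // fast path, lock never taken; the hit counter fires here
        }
        let _guard = fetch_lock.lock().await; // serialize would-be fetchers
        if let Some(hit) = cache.get(&team_id).await {
            return hit; // another task filled the cache while we waited on the lock
        }
        // genuine miss: the miss and db-read counters fire here
        let fetched = fetch_from_db(team_id).await;
        cache.insert(team_id, fetched.clone()).await;
        fetched
    }

The per-request property caches in flag_matching.rs use the same hit/miss accounting without the lock, since each PropertiesCache is owned by a single matcher for the duration of one request.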
@@ -997,16 +1043,33 @@ impl FeatureFlagMatcher { .group_properties .get(&group_type_index) { + inc( + PROPERTY_CACHE_HITS_COUNTER, + &[("type".to_string(), "group_properties".to_string())], + 1, + ); let mut result = HashMap::new(); result.clone_from(properties); return Ok(result); } + inc( + PROPERTY_CACHE_MISSES_COUNTER, + &[("type".to_string(), "group_properties".to_string())], + 1, + ); + let reader = self.reader.clone(); let team_id = self.team_id; let db_properties = fetch_group_properties_from_db(reader, team_id, group_type_index).await?; + inc( + DB_GROUP_PROPERTIES_READS_COUNTER, + &[("team_id".to_string(), team_id.to_string())], + 1, + ); + // once the properties are fetched, cache them so we don't need to fetch again in a given request self.properties_cache .group_properties @@ -1025,17 +1088,34 @@ impl FeatureFlagMatcher { ) -> Result, FlagError> { // check if the properties are already cached, if so return them if let Some(properties) = &self.properties_cache.person_properties { + inc( + PROPERTY_CACHE_HITS_COUNTER, + &[("type".to_string(), "person_properties".to_string())], + 1, + ); let mut result = HashMap::new(); result.clone_from(properties); return Ok(result); } + inc( + PROPERTY_CACHE_MISSES_COUNTER, + &[("type".to_string(), "person_properties".to_string())], + 1, + ); + let reader = self.reader.clone(); let distinct_id = self.distinct_id.clone(); let team_id = self.team_id; let (db_properties, person_id) = fetch_person_properties_from_db(reader, distinct_id, team_id).await?; + inc( + DB_PERSON_PROPERTIES_READS_COUNTER, + &[("team_id".to_string(), team_id.to_string())], + 1, + ); + // once the properties and person ID are fetched, cache them so we don't need to fetch again in a given request self.properties_cache.person_properties = Some(db_properties.clone()); self.properties_cache.person_id = Some(person_id); @@ -1555,9 +1635,6 @@ fn locally_computable_property_overrides( property_filters: &[PropertyFilter], ) -> Option> { property_overrides.as_ref().and_then(|overrides| { - // TODO handle note from Neil: https://github.com/PostHog/posthog/pull/24589#discussion_r1735828561 - // TL;DR – we'll need to handle cohort properties at the DB level, i.e. 
we'll need to adjust the cohort query - // to account for if a given person is an element of the cohort X, Y, Z, etc let should_prefer_overrides = property_filters .iter() .all(|prop| overrides.contains_key(&prop.key) && prop.prop_type != "cohort"); diff --git a/rust/feature-flags/src/flags/flag_request.rs b/rust/feature-flags/src/flags/flag_request.rs index 89890505c6c4b..1a9878791c381 100644 --- a/rust/feature-flags/src/flags/flag_request.rs +++ b/rust/feature-flags/src/flags/flag_request.rs @@ -10,7 +10,11 @@ use crate::{ api::errors::FlagError, client::{database::Client as DatabaseClient, redis::Client as RedisClient}, flags::flag_models::FeatureFlagList, - metrics::metrics_consts::FLAG_CACHE_HIT_COUNTER, + metrics::metrics_consts::{ + DB_FLAG_READS_COUNTER, DB_TEAM_READS_COUNTER, FLAG_CACHE_ERRORS_COUNTER, + FLAG_CACHE_HIT_COUNTER, TEAM_CACHE_ERRORS_COUNTER, TEAM_CACHE_HIT_COUNTER, + TOKEN_VALIDATION_ERRORS_COUNTER, + }, team::team_models::Team, }; @@ -83,23 +87,50 @@ impl FlagRequest { _ => return Err(FlagError::NoTokenError), }; - match Team::from_redis(redis_client.clone(), token.clone()).await { - Ok(_) => Ok(token), + let (result, cache_hit) = match Team::from_redis(redis_client.clone(), token.clone()).await + { + Ok(_) => (Ok(token.clone()), true), Err(_) => { - // Fallback: Check PostgreSQL if not found in Redis match Team::from_pg(pg_client, token.clone()).await { Ok(team) => { + inc( + DB_TEAM_READS_COUNTER, + &[("token".to_string(), token.clone())], + 1, + ); // Token found in PostgreSQL, update Redis cache so that we can verify it from Redis next time if let Err(e) = Team::update_redis_cache(redis_client, &team).await { tracing::warn!("Failed to update Redis cache: {}", e); + inc( + TEAM_CACHE_ERRORS_COUNTER, + &[("reason".to_string(), "redis_update_failed".to_string())], + 1, + ); } - Ok(token) + (Ok(token.clone()), false) + } + Err(_) => { + inc( + TOKEN_VALIDATION_ERRORS_COUNTER, + &[("reason".to_string(), "token_not_found".to_string())], + 1, + ); + (Err(FlagError::TokenValidationError), false) } - // TODO do we need a custom error here to track the fallback - Err(_) => Err(FlagError::TokenValidationError), } } - } + }; + + inc( + &TEAM_CACHE_HIT_COUNTER, + &[ + ("token".to_string(), token.clone()), + ("cache_hit".to_string(), cache_hit.to_string()), + ], + 1, + ); + + result } /// Fetches the team from the cache or the database. 
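validate_token above, and get_team and get_flags below, now share the same Redis-first shape: try the cache, fall back to Postgres, backfill Redis on success, and emit exactly one hit counter labeled with whether the fast path answered. A compact sketch of that shape under stand-in types and fetchers (the real code uses Team, FlagError, and the common-metrics inc helper; every name defined here is a placeholder):

    type Team = String;      // stand-in for team::team_models::Team
    type FlagError = String; // stand-in for api::errors::FlagError

    fn inc(_name: &str, _labels: &[(String, String)], _n: u64) {} // mirrors the inc() helper used above

    async fn team_from_redis(_token: &str) -> Result<Team, FlagError> { Err("miss".into()) }
    async fn team_from_pg(_token: &str) -> Result<Team, FlagError> { Ok("team".into()) }
    async fn backfill_redis(_team: &Team) -> Result<(), FlagError> { Ok(()) }

    async fn get_team_cached(token: String) -> Result<Team, FlagError> {
        let (result, cache_hit) = match team_from_redis(&token).await {
            Ok(team) => (Ok(team), true),
            Err(_) => match team_from_pg(&token).await {
                Ok(team) => {
                    // slow path succeeded: count the db read, then repopulate Redis
                    inc("db_team_reads_total", &[("token".to_string(), token.clone())], 1);
                    if backfill_redis(&team).await.is_err() {
                        // a failed backfill is counted and logged, never surfaced to the caller
                        inc("team_cache_errors_total", &[("reason".to_string(), "redis_update_failed".to_string())], 1);
                    }
                    (Ok(team), false)
                }
                Err(e) => (Err(e), false),
            },
        };
        // exactly one counter per lookup either way, labeled by whether Redis answered
        inc("team_cache_hit_total", &[("cache_hit".to_string(), cache_hit.to_string())], 1);
        result
    }

get_flags follows the identical shape with FeatureFlagList::from_redis / from_pg and the flag-specific counters.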
@@ -111,22 +142,42 @@ impl FlagRequest {
         redis_client: Arc,
         pg_client: Arc,
     ) -> Result {
-        match Team::from_redis(redis_client.clone(), token.to_owned()).await {
-            Ok(team) => Ok(team),
-            Err(_) => match Team::from_pg(pg_client, token.to_owned()).await {
-                Ok(team) => {
-                    // If we have the team in postgres, but not redis, update redis so we're faster next time
-                    // TODO: we have some counters in django for tracking these cache misses
-                    // we should probably do the same here
-                    if let Err(e) = Team::update_redis_cache(redis_client, &team).await {
-                        tracing::warn!("Failed to update Redis cache: {}", e);
+        let (team_result, cache_hit) =
+            match Team::from_redis(redis_client.clone(), token.to_owned()).await {
+                Ok(team) => (Ok(team), true),
+                Err(_) => match Team::from_pg(pg_client, token.to_owned()).await {
+                    Ok(team) => {
+                        inc(
+                            DB_TEAM_READS_COUNTER,
+                            &[("token".to_string(), token.to_string())],
+                            1,
+                        );
+                        // If we have the team in postgres, but not redis, update redis so we're faster next time
+                        if let Err(e) = Team::update_redis_cache(redis_client, &team).await {
+                            tracing::warn!("Failed to update Redis cache: {}", e);
+                            inc(
+                                TEAM_CACHE_ERRORS_COUNTER,
+                                &[("reason".to_string(), "redis_update_failed".to_string())],
+                                1,
+                            );
+                        }
+                        (Ok(team), false)
                     }
-                    Ok(team)
-                }
-                // TODO what kind of error should we return here?
-                Err(e) => Err(e),
-            },
-        }
+                    // TODO what kind of error should we return here?
+                    Err(e) => (Err(e), false),
+                },
+            };
+
+        inc(
+            &TEAM_CACHE_HIT_COUNTER,
+            &[
+                ("token".to_string(), token.to_string()),
+                ("cache_hit".to_string(), cache_hit.to_string()),
+            ],
+            1,
+        );
+
+        team_result
     }
 
     /// Extracts the distinct_id from the request.
@@ -164,31 +215,37 @@ impl FlagRequest {
         redis_client: &Arc,
         pg_client: &Arc,
     ) -> Result {
-        let mut cache_hit = false;
-        let flags = match FeatureFlagList::from_redis(redis_client.clone(), team_id).await {
-            Ok(flags) => {
-                cache_hit = true;
-                Ok(flags)
-            }
-            Err(_) => match FeatureFlagList::from_pg(pg_client.clone(), team_id).await {
-                Ok(flags) => {
-                    if let Err(e) = FeatureFlagList::update_flags_in_redis(
-                        redis_client.clone(),
-                        team_id,
-                        &flags,
-                    )
-                    .await
-                    {
-                        tracing::warn!("Failed to update Redis cache: {}", e);
-                        // TODO add new metric category for this
+        let (flags_result, cache_hit) =
+            match FeatureFlagList::from_redis(redis_client.clone(), team_id).await {
+                Ok(flags) => (Ok(flags), true),
+                Err(_) => match FeatureFlagList::from_pg(pg_client.clone(), team_id).await {
+                    Ok(flags) => {
+                        inc(
+                            DB_FLAG_READS_COUNTER,
+                            &[("team_id".to_string(), team_id.to_string())],
+                            1,
+                        );
+                        if let Err(e) = FeatureFlagList::update_flags_in_redis(
+                            redis_client.clone(),
+                            team_id,
+                            &flags,
+                        )
+                        .await
+                        {
+                            tracing::warn!("Failed to update Redis cache: {}", e);
+                            inc(
+                                FLAG_CACHE_ERRORS_COUNTER,
+                                &[("reason".to_string(), "redis_update_failed".to_string())],
+                                1,
+                            );
+                        }
+                        (Ok(flags), false)
                     }
-                    Ok(flags)
-                }
-                // TODO what kind of error should we return here? This should be postgres
-                // I guess it can be whatever the FlagError is
-                Err(e) => Err(e),
-            },
-        };
+                    // TODO what kind of error should we return here? This should be postgres
+                    // I guess it can be whatever the FlagError is
+                    Err(e) => (Err(e), false),
+                },
+            };
 
         inc(
             FLAG_CACHE_HIT_COUNTER,
@@ -199,7 +256,7 @@ impl FlagRequest {
             1,
         );
 
-        flags
+        flags_result
     }
 }
diff --git a/rust/feature-flags/src/metrics/metrics_consts.rs b/rust/feature-flags/src/metrics/metrics_consts.rs
index 5ece796159739..477b88175eb6b 100644
--- a/rust/feature-flags/src/metrics/metrics_consts.rs
+++ b/rust/feature-flags/src/metrics/metrics_consts.rs
@@ -1,5 +1,26 @@
 pub const FLAG_EVALUATION_ERROR_COUNTER: &str = "flag_evaluation_error_total";
 pub const FLAG_CACHE_HIT_COUNTER: &str = "flag_cache_hit_total";
+pub const FLAG_CACHE_ERRORS_COUNTER: &str = "flag_cache_errors_total";
 pub const FLAG_HASH_KEY_WRITES_COUNTER: &str = "flag_hash_key_writes_total";
-// TODO add metrics for failing to update redis? Does that really happen?
-// maybe worth adding for rollout, since writing to redis is a critical path thing
+pub const TEAM_CACHE_HIT_COUNTER: &str = "team_cache_hit_total";
+pub const TEAM_CACHE_ERRORS_COUNTER: &str = "team_cache_errors_total";
+pub const DB_TEAM_READS_COUNTER: &str = "db_team_reads_total";
+pub const TOKEN_VALIDATION_ERRORS_COUNTER: &str = "token_validation_errors_total";
+pub const DB_FLAG_READS_COUNTER: &str = "db_flag_reads_total";
+pub const DB_FLAG_ERRORS_COUNTER: &str = "db_flag_errors_total";
+pub const DB_COHORT_READS_COUNTER: &str = "db_cohort_reads_total";
+pub const DB_COHORT_WRITES_COUNTER: &str = "db_cohort_writes_total";
+pub const DB_COHORT_ERRORS_COUNTER: &str = "db_cohort_errors_total";
+pub const COHORT_CACHE_HIT_COUNTER: &str = "cohort_cache_hit_total";
+pub const COHORT_CACHE_MISS_COUNTER: &str = "cohort_cache_miss_total";
+pub const COHORT_CACHE_ERRORS_COUNTER: &str = "cohort_cache_errors_total";
+pub const GROUP_TYPE_CACHE_HIT_COUNTER: &str = "group_type_cache_hit_total";
+pub const GROUP_TYPE_CACHE_MISS_COUNTER: &str = "group_type_cache_miss_total";
+pub const GROUP_TYPE_CACHE_ERRORS_COUNTER: &str = "group_type_cache_errors_total";
+pub const FAILED_TO_FETCH_GROUP_COUNTER: &str = "failed_to_fetch_group_total";
+pub const PROPERTY_CACHE_HITS_COUNTER: &str = "property_cache_hits_total";
+pub const PROPERTY_CACHE_MISSES_COUNTER: &str = "property_cache_misses_total";
+pub const DB_PERSON_AND_GROUP_PROPERTIES_READS_COUNTER: &str =
+    "db_person_and_group_properties_reads_total";
+pub const DB_PERSON_PROPERTIES_READS_COUNTER: &str = "db_person_properties_reads_total";
+pub const DB_GROUP_PROPERTIES_READS_COUNTER: &str = "db_group_properties_reads_total";

From ffe10d00a874dc60e8377cda0d23d9b6b3352e30 Mon Sep 17 00:00:00 2001
From: dylan
Date: Tue, 26 Nov 2024 14:15:50 -0800
Subject: [PATCH 6/7] fix

---
 rust/feature-flags/src/flags/flag_request.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/rust/feature-flags/src/flags/flag_request.rs b/rust/feature-flags/src/flags/flag_request.rs
index 1a9878791c381..cab455e13bbbc 100644
--- a/rust/feature-flags/src/flags/flag_request.rs
+++ b/rust/feature-flags/src/flags/flag_request.rs
@@ -122,7 +122,7 @@ impl FlagRequest {
         };
 
         inc(
-            &TEAM_CACHE_HIT_COUNTER,
+            TEAM_CACHE_HIT_COUNTER,
             &[
                 ("token".to_string(), token.clone()),
                 ("cache_hit".to_string(), cache_hit.to_string()),
@@ -169,7 +169,7 @@ impl FlagRequest {
         };
 
         inc(
-            &TEAM_CACHE_HIT_COUNTER,
+            TEAM_CACHE_HIT_COUNTER,
             &[
                 ("token".to_string(), token.to_string()),
                 ("cache_hit".to_string(), cache_hit.to_string()),

From 21a9e25b8052532b3eebdeccc3145e81667af1b6 Mon Sep 17 00:00:00 2001
From: dylan
Date: Wed, 27 Nov 2024 14:11:11 -0800
Subject: [PATCH 7/7] will this commit?

---
 frontend/src/lib/lemon-ui/Popover/Popover.tsx | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/frontend/src/lib/lemon-ui/Popover/Popover.tsx b/frontend/src/lib/lemon-ui/Popover/Popover.tsx
index 3a8d3b9fda802..67da3be6ebae7 100644
--- a/frontend/src/lib/lemon-ui/Popover/Popover.tsx
+++ b/frontend/src/lib/lemon-ui/Popover/Popover.tsx
@@ -155,7 +155,7 @@ export const Popover = React.forwardRef(function P
     const mergedReferenceRef = useMergeRefs([
         referenceRef,
         extraReferenceRef || null,
-        children?.ref,
+        (children as any)?.ref,
     ]) as React.RefCallback
 
     const arrowStyle = middlewareData.arrow
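
The flag_request.rs hunks above all apply the same refactor: instead of returning from inside each match arm, the Redis-then-Postgres fallback now evaluates to a (result, cache_hit) tuple, and the cache-hit counter is incremented exactly once after the match, on every path. A minimal self-contained sketch of that shape, assuming a hypothetical inc(name, labels, value) helper in place of the crate's real metrics wrapper and closures in place of the Redis/Postgres clients:

    // Sketch only: the "(result, cache_hit) tuple + single inc()" shape.
    // `inc` and the fetch closures are stand-ins so this compiles without
    // Redis or Postgres; they are not the crate's real API.
    fn inc(name: &str, labels: &[(String, String)], value: u64) {
        // The real helper would forward to a metrics registry.
        println!("{name}{labels:?} +{value}");
    }

    fn get_with_fallback<T>(
        from_cache: impl FnOnce() -> Option<T>,
        from_db: impl FnOnce() -> Result<T, String>,
    ) -> Result<T, String> {
        // Every arm yields the tuple; nothing returns early, so the counter
        // below fires exactly once per call with the right cache_hit label.
        let (result, cache_hit) = match from_cache() {
            Some(value) => (Ok(value), true),
            None => match from_db() {
                Ok(value) => (Ok(value), false),
                Err(e) => (Err(e), false),
            },
        };

        inc(
            "team_cache_hit_total",
            &[("cache_hit".to_string(), cache_hit.to_string())],
            1,
        );

        result
    }

    fn main() {
        // A cache miss falls through to the "database" and still records the metric.
        let team = get_with_fallback(|| None, || Ok("team-123".to_string()));
        assert_eq!(team.unwrap(), "team-123");
    }

The payoff of this shape is that no arm can forget to record the metric, and the label value comes from the same variable that decided the control flow.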
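
Two smaller details from the remaining hunks. The constants added in metrics_consts.rs follow the Prometheus convention of naming monotonic counters with a _total suffix. And patch 6/7 drops the & from inc(&TEAM_CACHE_HIT_COUNTER, ...): the constant is already declared as &str, so borrowing it again hands the helper a &&str, which presumably did not satisfy the helper's parameter type. A std-only sketch of how one of these counter names and its labels would render in the Prometheus exposition format; render_counter and the sample token value are illustrative assumptions, not code from the repository:

    // Mirrors one constant from metrics_consts.rs; `render_counter` is a
    // hypothetical helper shown only to illustrate the label scheme the
    // hunks above attach to each counter name.
    pub const TEAM_CACHE_HIT_COUNTER: &str = "team_cache_hit_total";

    fn render_counter(name: &str, labels: &[(&str, &str)], value: u64) -> String {
        let labels = labels
            .iter()
            .map(|(k, v)| format!("{k}=\"{v}\""))
            .collect::<Vec<_>>()
            .join(",");
        format!("{name}{{{labels}}} {value}")
    }

    fn main() {
        // Prints: team_cache_hit_total{token="phc_abc",cache_hit="true"} 1
        println!(
            "{}",
            render_counter(
                TEAM_CACHE_HIT_COUNTER,
                &[("token", "phc_abc"), ("cache_hit", "true")],
                1,
            )
        );
    }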