diff --git a/rust/common/metrics/src/lib.rs b/rust/common/metrics/src/lib.rs index 0089736300ba4..9e82e98cc004f 100644 --- a/rust/common/metrics/src/lib.rs +++ b/rust/common/metrics/src/lib.rs @@ -5,6 +5,22 @@ use axum::{ routing::get, Router, }; use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}; +use std::sync::OnceLock; + +type LabelFilterFn = + Box Vec<(String, String)> + Send + Sync + 'static>; + +static LABEL_FILTER: OnceLock = OnceLock::new(); + +pub fn set_label_filter(filter: F) +where + F: Fn(&[(String, String)]) -> Vec<(String, String)> + Send + Sync + 'static, +{ + let boxed_filter: LabelFilterFn = Box::new(filter); + if LABEL_FILTER.set(boxed_filter).is_err() { + panic!("Label filter already set"); + } +} /// Bind a `TcpListener` on the provided bind address to serve a `Router` on it. /// This function is intended to take a Router as returned by `setup_metrics_router`, potentially with more routes added by the caller. @@ -83,7 +99,16 @@ pub fn get_current_timestamp_seconds() -> f64 { // Shorthand for common metric types pub fn inc(name: &'static str, labels: &[(String, String)], value: u64) { - metrics::counter!(name, labels).increment(value); + let filtered_labels = apply_label_filter(labels); + metrics::counter!(name, &filtered_labels).increment(value); +} + +fn apply_label_filter(labels: &[(String, String)]) -> Vec<(String, String)> { + if let Some(filter) = LABEL_FILTER.get() { + filter(labels) + } else { + labels.to_vec() + } } pub fn gauge(name: &'static str, lables: &[(String, String)], value: f64) { diff --git a/rust/feature-flags/src/config.rs b/rust/feature-flags/src/config.rs index 02fd621224345..2194f84ac3eff 100644 --- a/rust/feature-flags/src/config.rs +++ b/rust/feature-flags/src/config.rs @@ -1,10 +1,72 @@ use envconfig::Envconfig; use once_cell::sync::Lazy; use std::net::SocketAddr; +use std::num::ParseIntError; use std::path::{Path, PathBuf}; use std::str::FromStr; -// TODO rewrite this to follow the AppConfig pattern in other files +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum TeamIdsToTrack { + All, + TeamIds(Vec), +} + +#[derive(Debug)] +pub enum ParseTeamIdsError { + InvalidRange(String), + InvalidNumber(ParseIntError), +} + +impl std::fmt::Display for ParseTeamIdsError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ParseTeamIdsError::InvalidRange(r) => write!(f, "Invalid range: {}", r), + ParseTeamIdsError::InvalidNumber(e) => write!(f, "Invalid number: {}", e), + } + } +} + +impl std::error::Error for ParseTeamIdsError {} + +impl FromStr for TeamIdsToTrack { + type Err = ParseTeamIdsError; + + fn from_str(s: &str) -> Result { + let s = s.trim(); + if s.eq_ignore_ascii_case("all") { + Ok(TeamIdsToTrack::All) + } else { + let mut team_ids = Vec::new(); + for part in s.split(',').map(|p| p.trim()) { + if part.contains(':') { + let bounds: Vec<&str> = part.split(':').collect(); + if bounds.len() != 2 { + return Err(ParseTeamIdsError::InvalidRange(part.to_string())); + } + let start = bounds[0] + .parse::() + .map_err(ParseTeamIdsError::InvalidNumber)?; + let end = bounds[1] + .parse::() + .map_err(ParseTeamIdsError::InvalidNumber)?; + if end < start { + return Err(ParseTeamIdsError::InvalidRange(part.to_string())); + } + for id in start..=end { + team_ids.push(id); + } + } else { + let id = part + .parse::() + .map_err(ParseTeamIdsError::InvalidNumber)?; + team_ids.push(id); + } + } + Ok(TeamIdsToTrack::TeamIds(team_ids)) + } + } +} + #[derive(Envconfig, Clone, Debug)] pub struct Config { #[envconfig(default = "127.0.0.1:3001")] @@ -33,6 +95,9 @@ pub struct Config { #[envconfig(default = "false")] pub enable_metrics: bool, + + #[envconfig(from = "TEAM_IDS_TO_TRACK", default = "all")] + pub team_ids_to_track: TeamIdsToTrack, } impl Config { @@ -48,6 +113,7 @@ impl Config { acquire_timeout_secs: 1, maxmind_db_path: "".to_string(), enable_metrics: false, + team_ids_to_track: TeamIdsToTrack::All, } } @@ -90,6 +156,7 @@ mod tests { assert_eq!(config.max_concurrency, 1000); assert_eq!(config.max_pg_connections, 10); assert_eq!(config.redis_url, "redis://localhost:6379/"); + assert_eq!(config.team_ids_to_track, TeamIdsToTrack::All); } #[test] @@ -107,6 +174,7 @@ mod tests { assert_eq!(config.max_concurrency, 1000); assert_eq!(config.max_pg_connections, 10); assert_eq!(config.redis_url, "redis://localhost:6379/"); + assert_eq!(config.team_ids_to_track, TeamIdsToTrack::All); } #[test] @@ -124,5 +192,42 @@ mod tests { assert_eq!(config.max_concurrency, 1000); assert_eq!(config.max_pg_connections, 10); assert_eq!(config.redis_url, "redis://localhost:6379/"); + assert_eq!(config.team_ids_to_track, TeamIdsToTrack::All); + } + + #[test] + fn test_team_ids_to_track_all() { + let team_ids: TeamIdsToTrack = "all".parse().unwrap(); + assert_eq!(team_ids, TeamIdsToTrack::All); + } + + #[test] + fn test_team_ids_to_track_single_ids() { + let team_ids: TeamIdsToTrack = "1,5,7,13".parse().unwrap(); + assert_eq!(team_ids, TeamIdsToTrack::TeamIds(vec![1, 5, 7, 13])); + } + + #[test] + fn test_team_ids_to_track_ranges() { + let team_ids: TeamIdsToTrack = "1:3".parse().unwrap(); + assert_eq!(team_ids, TeamIdsToTrack::TeamIds(vec![1, 2, 3])); + } + + #[test] + fn test_team_ids_to_track_mixed() { + let team_ids: TeamIdsToTrack = "1:3,5,7:9".parse().unwrap(); + assert_eq!(team_ids, TeamIdsToTrack::TeamIds(vec![1, 2, 3, 5, 7, 8, 9])); + } + + #[test] + fn test_invalid_range() { + let result: Result = "5:3".parse(); + assert!(result.is_err()); + } + + #[test] + fn test_invalid_number() { + let result: Result = "abc".parse(); + assert!(result.is_err()); } } diff --git a/rust/feature-flags/src/flag_matching.rs b/rust/feature-flags/src/flag_matching.rs index 4dd72ed32aba3..d6449f993d15d 100644 --- a/rust/feature-flags/src/flag_matching.rs +++ b/rust/feature-flags/src/flag_matching.rs @@ -3,9 +3,12 @@ use crate::{ database::Client as DatabaseClient, feature_flag_match_reason::FeatureFlagMatchReason, flag_definitions::{FeatureFlag, FeatureFlagList, FlagGroupType, PropertyFilter}, + metrics_consts::{FLAG_EVALUATION_ERROR_COUNTER, FLAG_HASH_KEY_WRITES_COUNTER}, property_matching::match_property, + utils::parse_exception_for_prometheus_label, }; use anyhow::Result; +use common_metrics::inc; use serde_json::Value; use sha1::{Digest, Sha1}; use sqlx::{postgres::PgQueryResult, Acquire, FromRow}; @@ -99,6 +102,7 @@ impl GroupTypeMappingCache { Ok(mapping) if !mapping.is_empty() => mapping, Ok(_) => { self.failed_to_fetch_flags = true; + // TODO add the `"Failed to fetch group"` type of lable. See posthog/models/feature_flag/flag_matching.py:parse_exception_for_error_message return Err(FlagError::NoGroupTypeMappings); } Err(e) => { @@ -126,6 +130,7 @@ impl GroupTypeMappingCache { self.group_indexes_to_types.clone_from(&result); Ok(result) } else { + // TODO add the `"Failed to fetch group"` type of lable. See posthog/models/feature_flag/flag_matching.py:parse_exception_for_error_message Err(FlagError::NoGroupTypeMappings) } } @@ -154,6 +159,7 @@ impl GroupTypeMappingCache { .collect(); if mapping.is_empty() { + // TODO add the `"Failed to fetch group"` type of lable. See posthog/models/feature_flag/flag_matching.py:parse_exception_for_error_message Err(FlagError::NoGroupTypeMappings) } else { Ok(mapping) @@ -237,6 +243,16 @@ impl FeatureFlagMatcher { (None, false) }; + // If there was an initial error in processing hash key overrides, increment the error counter + if initial_error { + let reason = "hash_key_override_error"; + common_metrics::inc( + FLAG_EVALUATION_ERROR_COUNTER, + &[("reason".to_string(), reason.to_string())], + 1, + ); + } + let flags_response = self .evaluate_flags_with_overrides( feature_flags, @@ -272,25 +288,54 @@ impl FeatureFlagMatcher { "Failed to check if hash key override should be written: {:?}", e ); + let reason = parse_exception_for_prometheus_label(&e); + inc( + FLAG_EVALUATION_ERROR_COUNTER, + &[("reason".to_string(), reason.to_string())], + 1, + ); return (None, true); } }; + let mut writing_hash_key_override = false; + if should_write { if let Err(e) = set_feature_flag_hash_key_overrides( // NB: this is the only method that writes to the database, so it's the only one that should use the writer self.postgres_writer.clone(), self.team_id, target_distinct_ids.clone(), - hash_key, + hash_key.clone(), ) .await { error!("Failed to set feature flag hash key overrides: {:?}", e); + // Increment the counter for failed write + let reason = parse_exception_for_prometheus_label(&e); + inc( + FLAG_EVALUATION_ERROR_COUNTER, + &[("reason".to_string(), reason.to_string())], + 1, + ); return (None, true); } + writing_hash_key_override = true; } + // TODO I'm not sure if this is the right place to increment this counter + inc( + FLAG_HASH_KEY_WRITES_COUNTER, + &[ + ("team_id".to_string(), self.team_id.to_string()), + ( + "successful_write".to_string(), + writing_hash_key_override.to_string(), + ), + ], + 1, + ); + match get_feature_flag_hash_key_overrides( self.postgres_reader.clone(), self.team_id, @@ -301,6 +346,12 @@ impl FeatureFlagMatcher { Ok(overrides) => (Some(overrides), false), Err(e) => { error!("Failed to get feature flag hash key overrides: {:?}", e); + let reason = parse_exception_for_prometheus_label(&e); + common_metrics::inc( + FLAG_EVALUATION_ERROR_COUNTER, + &[("reason".to_string(), reason.to_string())], + 1, + ); (None, true) } } @@ -345,6 +396,12 @@ impl FeatureFlagMatcher { "Error evaluating feature flag '{}' with overrides for distinct_id '{}': {:?}", flag.key, self.distinct_id, e ); + let reason = parse_exception_for_prometheus_label(&e); + inc( + FLAG_EVALUATION_ERROR_COUNTER, + &[("reason".to_string(), reason.to_string())], + 1, + ); } } } @@ -374,6 +431,12 @@ impl FeatureFlagMatcher { error_while_computing_flags = true; // TODO add sentry exception tracking error!("Error fetching properties: {:?}", e); + let reason = parse_exception_for_prometheus_label(&e); + inc( + FLAG_EVALUATION_ERROR_COUNTER, + &[("reason".to_string(), reason.to_string())], + 1, + ); } } @@ -397,6 +460,12 @@ impl FeatureFlagMatcher { "Error evaluating feature flag '{}' for distinct_id '{}': {:?}", flag.key, self.distinct_id, e ); + let reason = parse_exception_for_prometheus_label(&e); + inc( + FLAG_EVALUATION_ERROR_COUNTER, + &[("reason".to_string(), reason.to_string())], + 1, + ); } } } @@ -1210,7 +1279,7 @@ async fn get_feature_flag_hash_key_overrides( } async fn set_feature_flag_hash_key_overrides( - postgres_writer: PostgresReader, + postgres_writer: PostgresWriter, team_id: TeamId, distinct_ids: Vec, hash_key_override: String, diff --git a/rust/feature-flags/src/flag_request.rs b/rust/feature-flags/src/flag_request.rs index 4d215867813e9..771c216834c96 100644 --- a/rust/feature-flags/src/flag_request.rs +++ b/rust/feature-flags/src/flag_request.rs @@ -1,13 +1,14 @@ use std::{collections::HashMap, sync::Arc}; use bytes::Bytes; +use common_metrics::inc; use serde::{Deserialize, Serialize}; use serde_json::Value; use tracing::instrument; use crate::{ api::FlagError, database::Client as DatabaseClient, flag_definitions::FeatureFlagList, - redis::Client as RedisClient, team::Team, + metrics_consts::FLAG_CACHE_HIT_COUNTER, redis::Client as RedisClient, team::Team, }; #[derive(Debug, Clone, Copy)] @@ -160,25 +161,38 @@ impl FlagRequest { redis_client: Arc, pg_client: Arc, ) -> Result { - // TODO add a cache hit/miss counter - match FeatureFlagList::from_redis(redis_client.clone(), team_id).await { - Ok(flags) => Ok(flags), + let mut cache_hit = false; + let flags = match FeatureFlagList::from_redis(redis_client.clone(), team_id).await { + Ok(flags) => { + cache_hit = true; + Ok(flags) + } Err(_) => match FeatureFlagList::from_pg(pg_client, team_id).await { Ok(flags) => { - // If we have the flags in postgres, but not redis, update redis so we're faster next time - // TODO: we have some counters in django for tracking these cache misses - // we should probably do the same here if let Err(e) = FeatureFlagList::update_flags_in_redis(redis_client, team_id, &flags).await { tracing::warn!("Failed to update Redis cache: {}", e); + // TODO add new metric category for this } Ok(flags) } - // TODO what kind of error should we return here? + // TODO what kind of error should we return here? This should be postgres + // I guess it can be whatever the FlagError is Err(e) => Err(e), }, - } + }; + + inc( + FLAG_CACHE_HIT_COUNTER, + &[ + ("team_id".to_string(), team_id.to_string()), + ("cache_hit".to_string(), cache_hit.to_string()), + ], + 1, + ); + + flags } } diff --git a/rust/feature-flags/src/lib.rs b/rust/feature-flags/src/lib.rs index ecce361fabc4c..051b3e27697f3 100644 --- a/rust/feature-flags/src/lib.rs +++ b/rust/feature-flags/src/lib.rs @@ -7,12 +7,14 @@ pub mod flag_definitions; pub mod flag_matching; pub mod flag_request; pub mod geoip; +pub mod metrics_consts; pub mod property_matching; pub mod redis; pub mod request_handler; pub mod router; pub mod server; pub mod team; +pub mod utils; pub mod v0_endpoint; // Test modules don't need to be compiled with main binary diff --git a/rust/feature-flags/src/metrics_consts.rs b/rust/feature-flags/src/metrics_consts.rs index e69de29bb2d1d..5ece796159739 100644 --- a/rust/feature-flags/src/metrics_consts.rs +++ b/rust/feature-flags/src/metrics_consts.rs @@ -0,0 +1,5 @@ +pub const FLAG_EVALUATION_ERROR_COUNTER: &str = "flag_evaluation_error_total"; +pub const FLAG_CACHE_HIT_COUNTER: &str = "flag_cache_hit_total"; +pub const FLAG_HASH_KEY_WRITES_COUNTER: &str = "flag_hash_key_writes_total"; +// TODO add metrics for failing to update redis? Does that really happen? +// maybe worth adding for rollout, since writing to redis is a critical path thing diff --git a/rust/feature-flags/src/router.rs b/rust/feature-flags/src/router.rs index e12b32b464795..505f18adfb008 100644 --- a/rust/feature-flags/src/router.rs +++ b/rust/feature-flags/src/router.rs @@ -9,7 +9,11 @@ use health::HealthRegistry; use tower::limit::ConcurrencyLimitLayer; use crate::{ - database::Client as DatabaseClient, geoip::GeoIpClient, redis::Client as RedisClient, + config::{Config, TeamIdsToTrack}, + database::Client as DatabaseClient, + geoip::GeoIpClient, + redis::Client as RedisClient, + utils::team_id_label_filter, v0_endpoint, }; @@ -19,6 +23,7 @@ pub struct State { pub postgres_reader: Arc, pub postgres_writer: Arc, pub geoip: Arc, + pub team_ids_to_track: TeamIdsToTrack, } pub fn router( @@ -27,8 +32,7 @@ pub fn router( postgres_writer: Arc, geoip: Arc, liveness: HealthRegistry, - metrics: bool, - concurrency: usize, + config: Config, ) -> Router where R: RedisClient + Send + Sync + 'static, @@ -39,6 +43,7 @@ where postgres_reader, postgres_writer, geoip, + team_ids_to_track: config.team_ids_to_track.clone(), }; let status_router = Router::new() @@ -48,14 +53,15 @@ where let flags_router = Router::new() .route("/flags", post(v0_endpoint::flags).get(v0_endpoint::flags)) - .layer(ConcurrencyLimitLayer::new(concurrency)) + .layer(ConcurrencyLimitLayer::new(config.max_concurrency)) .with_state(state); let router = Router::new().merge(status_router).merge(flags_router); // Don't install metrics unless asked to // Global metrics recorders can play poorly with e.g. tests - if metrics { + if config.enable_metrics { + common_metrics::set_label_filter(team_id_label_filter(config.team_ids_to_track.clone())); let recorder_handle = setup_metrics_recorder(); router.route("/metrics", get(move || ready(recorder_handle.render()))) } else { diff --git a/rust/feature-flags/src/server.rs b/rust/feature-flags/src/server.rs index dbda44fe244e1..c9e238fa8fd4e 100644 --- a/rust/feature-flags/src/server.rs +++ b/rust/feature-flags/src/server.rs @@ -69,8 +69,7 @@ where postgres_writer, geoip_service, health, - config.enable_metrics, - config.max_concurrency, + config, ); tracing::info!("listening on {:?}", listener.local_addr().unwrap()); diff --git a/rust/feature-flags/src/utils.rs b/rust/feature-flags/src/utils.rs new file mode 100644 index 0000000000000..e17b4caf13d91 --- /dev/null +++ b/rust/feature-flags/src/utils.rs @@ -0,0 +1,162 @@ +use crate::{api::FlagError, config::TeamIdsToTrack}; + +pub fn team_id_label_filter( + team_ids_to_track: TeamIdsToTrack, +) -> impl Fn(&[(String, String)]) -> Vec<(String, String)> { + move |labels: &[(String, String)]| { + labels + .iter() + .map(|(key, value)| { + if key == "team_id" { + match value.parse::() { + Ok(team_id) => { + let filtered_value = match &team_ids_to_track { + TeamIdsToTrack::All => value.clone(), + TeamIdsToTrack::TeamIds(ids) => { + if ids.contains(&team_id) { + value.clone() + } else { + "unknown".to_string() + } + } + }; + (key.clone(), filtered_value) + } + Err(_) => (key.clone(), "unknown".to_string()), + } + } else { + (key.clone(), value.clone()) + } + }) + .collect() + } +} + +pub fn parse_exception_for_prometheus_label(err: &FlagError) -> &'static str { + match err { + FlagError::DatabaseError(msg) => { + if msg.contains("statement timeout") { + "timeout" + } else if msg.contains("no more connections") { + "no_more_connections" + } else if msg.contains("Failed to fetch conditions") { + "flag_condition_retry" + } else if msg.contains("Failed to fetch group") { + "group_mapping_retry" + } else if msg.contains("Database healthcheck failed") { + "healthcheck_failed" + } else if msg.contains("query_wait_timeout") { + "query_wait_timeout" + } else { + "database_error" + } + } + FlagError::DatabaseUnavailable => "database_unavailable", + FlagError::RedisUnavailable => "redis_unavailable", + FlagError::TimeoutError => "timeout_error", + FlagError::NoGroupTypeMappings => "no_group_type_mappings", + _ => "unknown", + } +} + +#[cfg(test)] +#[test] +fn test_all_team_ids_pass_through() { + let filter = team_id_label_filter(TeamIdsToTrack::All); + + let labels = vec![ + ("env".to_string(), "production".to_string()), + ("team_id".to_string(), "123".to_string()), + ("version".to_string(), "1.0".to_string()), + ]; + + let filtered_labels = filter(&labels); + + assert_eq!(filtered_labels, labels); +} + +#[test] +fn test_specific_team_id_matches() { + let filter = team_id_label_filter(TeamIdsToTrack::TeamIds(vec![123])); + + let labels = vec![ + ("env".to_string(), "production".to_string()), + ("team_id".to_string(), "123".to_string()), + ("version".to_string(), "1.0".to_string()), + ]; + + let filtered_labels = filter(&labels); + + assert_eq!(filtered_labels, labels); +} + +#[test] +fn test_specific_team_id_does_not_match() { + let filter = team_id_label_filter(TeamIdsToTrack::TeamIds(vec![456])); + + let labels = vec![ + ("env".to_string(), "production".to_string()), + ("team_id".to_string(), "123".to_string()), + ("version".to_string(), "1.0".to_string()), + ]; + + let expected_labels = vec![ + ("env".to_string(), "production".to_string()), + ("team_id".to_string(), "unknown".to_string()), + ("version".to_string(), "1.0".to_string()), + ]; + + let filtered_labels = filter(&labels); + + assert_eq!(filtered_labels, expected_labels); +} + +#[test] +fn test_invalid_team_id_value() { + let filter = team_id_label_filter(TeamIdsToTrack::TeamIds(vec![123])); + + let labels = vec![ + ("env".to_string(), "production".to_string()), + ("team_id".to_string(), "abc".to_string()), // Invalid team_id + ("version".to_string(), "1.0".to_string()), + ]; + + let expected_labels = vec![ + ("env".to_string(), "production".to_string()), + ("team_id".to_string(), "unknown".to_string()), + ("version".to_string(), "1.0".to_string()), + ]; + + let filtered_labels = filter(&labels); + + assert_eq!(filtered_labels, expected_labels); +} + +#[test] +fn test_missing_team_id_label() { + let filter = team_id_label_filter(TeamIdsToTrack::TeamIds(vec![123])); + + let labels = vec![ + ("env".to_string(), "production".to_string()), + ("version".to_string(), "1.0".to_string()), + ]; + + let filtered_labels = filter(&labels); + + assert_eq!(filtered_labels, labels); +} + +#[test] +fn test_multiple_team_ids() { + let filter = team_id_label_filter(TeamIdsToTrack::TeamIds(vec![123, 456])); + + let labels = vec![ + ("env".to_string(), "staging".to_string()), + ("team_id".to_string(), "456".to_string()), + ("version".to_string(), "2.0".to_string()), + ]; + + let filtered_labels = filter(&labels); + + assert_eq!(filtered_labels, labels); +}