feat(flags): dynamic cohort matching in rust #25776
Conversation
```rust
OperatorType::In | OperatorType::NotIn => {
    // TODO: we handle these in cohort matching, so we can just return false here
    // because by the time we match properties, we've already decomposed the cohort
    // filter into multiple property filters
    Ok(false)
}
```
This seems safe enough to do – I handle these conditions within `evaluate_cohort_filters`, not with `match_property` directly.
```rust
// Separate cohort and non-cohort filters
let (cohort_filters, non_cohort_filters): (Vec<PropertyFilter>, Vec<PropertyFilter>) =
    flag_property_filters
        .iter()
        .cloned()
        .partition(|prop| prop.prop_type == "cohort");
```
I want to be able to evaluate the cohort properties and non-cohort properties with different methods, since I have to handle the `in`/`not_in` case for cohorts that I don't have to deal with when using `match_properties`.
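The `partition` call above can be sketched in isolation. This is a minimal, hedged sketch with a simplified stand-in for the real `PropertyFilter` struct (only the fields needed here), showing how one pass over the filters splits them by `prop_type`:

```rust
// Simplified stand-in for the real PropertyFilter; the real struct has
// more fields (value, operator, negation, ...).
#[derive(Debug, Clone, PartialEq)]
struct PropertyFilter {
    key: String,
    prop_type: String,
}

// Split filters into (cohort, non-cohort) in a single pass.
fn split_filters(filters: &[PropertyFilter]) -> (Vec<PropertyFilter>, Vec<PropertyFilter>) {
    filters
        .iter()
        .cloned()
        .partition(|prop| prop.prop_type == "cohort")
}

fn main() {
    let filters = vec![
        PropertyFilter { key: "id".into(), prop_type: "cohort".into() },
        PropertyFilter { key: "$browser".into(), prop_type: "person".into() },
    ];
    let (cohort, non_cohort) = split_filters(&filters);
    assert_eq!(cohort.len(), 1);
    assert_eq!(non_cohort.len(), 1);
    assert_eq!(cohort[0].key, "id");
    println!("cohort: {cohort:?}, non-cohort: {non_cohort:?}");
}
```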
```rust
// Separate cohort filters from non-cohort filters
for filter in filters {
    if filter.prop_type == "cohort" {
        cohort_filters.push(filter);
    } else {
        non_cohort_filters.push(filter);
    }
}
```
It's not so much that I'm separating as differentiating: `non_cohort_filters` can be evaluated using `match_property`, but `cohort_filters` have to be pulled apart until they are plain property filters. This lets us evaluate cohort properties like this:

```json
{"properties": {"type": "OR", "values": [{"type": "OR", "values": [{"key": "id", "type": "cohort", "value": 8, "negation": false}, {"key": "$browser", "type": "person", "value": ["Safari"], "negation": false, "operator": "exact"}]}]}}
```

where one property is a regular person property, and the other one needs to look up the cohort to evaluate.
```rust
if !cohort_filters.is_empty() {
    let cohort_ids: HashSet<CohortId> = cohort_filters
        .iter()
        .filter_map(|f| f.value.as_i64().map(|id| id as CohortId))
```
`value` needs to be `i64`, but then I cast it to match the DB type. It's dumb but safe enough.
```rust
        .map(|cohort| (cohort.id, CohortOrEmpty::Cohort(cohort)))
        .collect();

    let sorted_cohort_ids = sort_cohorts_topologically(cohort_ids, &seen_cohorts_cache);
```
If this cohort filter depends on multiple cohorts, sort them topologically so each cohort is evaluated only after the cohorts it depends on.
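The idea behind `sort_cohorts_topologically` can be sketched with a std-only Kahn's algorithm (the real implementation uses petgraph; the `deps` shape here, mapping a cohort id to the ids it references, is an assumption for illustration):

```rust
use std::collections::{HashMap, HashSet, VecDeque};

// Order cohort ids so every cohort appears after the cohorts it depends on.
// Returns None if the dependencies contain a cycle (invalid cohort graph).
fn topo_sort(deps: &HashMap<i32, Vec<i32>>) -> Option<Vec<i32>> {
    // Collect every node mentioned, keys and referenced ids alike.
    let mut nodes: HashSet<i32> = deps.keys().copied().collect();
    for vs in deps.values() {
        nodes.extend(vs.iter().copied());
    }
    // Edge: prerequisite -> dependent; dependents accumulate in-degree.
    let mut adj: HashMap<i32, Vec<i32>> = HashMap::new();
    let mut in_degree: HashMap<i32, usize> = nodes.iter().map(|&n| (n, 0)).collect();
    for (&dependent, prerequisites) in deps {
        for &p in prerequisites {
            adj.entry(p).or_default().push(dependent);
            *in_degree.get_mut(&dependent).unwrap() += 1;
        }
    }
    // Start from cohorts with no dependencies and peel the graph layer by layer.
    let mut queue: VecDeque<i32> = in_degree
        .iter()
        .filter(|&(_, &d)| d == 0)
        .map(|(&n, _)| n)
        .collect();
    let mut order = Vec::new();
    while let Some(n) = queue.pop_front() {
        order.push(n);
        for &next in adj.get(&n).map(|v| v.as_slice()).unwrap_or(&[]) {
            let d = in_degree.get_mut(&next).unwrap();
            *d -= 1;
            if *d == 0 {
                queue.push_back(next);
            }
        }
    }
    // A cycle leaves some nodes with non-zero in-degree, i.e. unprocessed.
    (order.len() == nodes.len()).then_some(order)
}

fn main() {
    // Cohort 8 depends on cohort 3, so 3 must be evaluated first.
    let deps = HashMap::from([(8, vec![3])]);
    assert_eq!(topo_sort(&deps), Some(vec![3, 8]));

    // A dependency cycle is invalid.
    let cyclic = HashMap::from([(1, vec![2]), (2, vec![1])]);
    assert_eq!(topo_sort(&cyclic), None);
}
```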
```rust
match filter.operator {
    Some(OperatorType::In) if !cohort_match => return Ok(false),
    Some(OperatorType::NotIn) if cohort_match => return Ok(false),
    _ => {}
```
We handle `In` and `NotIn` here, which is a cool feature. Oh yeah, this also implements this almost 2-year-old feature request! #13145
Only real concern to me is that cohort recursion thing – that's going to brutalise PG if you do it on every request. Sketched out a couple of mitigations you could make: one order-of-evaluation approach and one caching/fetching approach.
…ht idea. Next up I will implement a version that stores the dependency graph as well so that we can only cache the relevant cohorts instead of caching and iterating through cohort
```rust
// TODO: see if you can combine these two structs, like we do with cohort models
// this will require not deserializing on read and instead doing it lazily, on-demand
// (which, tbh, is probably a better idea)
```
this is a problem for future me, not in scope for this PR
```diff
@@ -39,6 +39,8 @@
 health = { path = "../common/health" }
 common-metrics = { path = "../common/metrics" }
 tower = { workspace = true }
 derive_builder = "0.20.1"
+petgraph = "0.6.5"
+moka = { version = "0.12.8", features = ["future"] }
```
Caching lib with support for TTL and per-entry weighting.
As a heads up, this is already in the workspace, you can probably pull it in (we're using it in error tracking).
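If the workspace already pins moka, the crate-local entry could be simplified to inherit that version rather than duplicating `0.12.8`. A hedged sketch, assuming the root `Cargo.toml` declares moka under `[workspace.dependencies]`:

```toml
# Hypothetical: inherit the workspace-pinned version instead of repeating it here.
moka = { workspace = true, features = ["future"] }
```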
Bunch of comments here that I think you should write down somewhere separately, and then just merge this as-is. It's been around too long and is too big to keep doing review iterations on efficiently for anyone involved, it getting merged is blocking you, and IMO it's totally shippable – if this was deployed today I'd still approve.
```rust
) -> Self {
    // We use the size of the cohort list (i.e., the number of cohorts for a given team) as the weight of the entry
    let weigher =
        |_: &TeamId, value: &Vec<Cohort>| -> u32 { value.len().try_into().unwrap_or(u32::MAX) };
```
Just a thought about casts generally – I think this is totally fine and you shouldn't pay the CI time to change it. I'd almost argue for a raw `unwrap` here (or an `expect` with a helpful message), under the consideration that you probably do want to fail loudly if a team has more than `u32::MAX` cohorts. But also, you'll never end up in this situation, because fetching them would bring down Postgres, you'd OOM, etc., so I'd then almost go for an `as` cast instead, knowing the truncation will never happen.
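The three options being weighed can be shown side by side on the same length-to-`u32` conversion (a sketch of the trade-off, not the PR's code):

```rust
fn main() {
    let n: usize = 5_000;

    // 1. Saturate on overflow (what the PR does): never panics.
    let w1: u32 = n.try_into().unwrap_or(u32::MAX);

    // 2. Fail loudly with context if the value ever exceeds u32::MAX.
    let w2: u32 = n.try_into().expect("cohort count exceeded u32::MAX");

    // 3. Plain `as` cast: silently keeps the low 32 bits on overflow,
    //    fine when the value provably can't get that large.
    let w3 = n as u32;

    assert_eq!(w1, 5_000);
    assert_eq!(w2, 5_000);
    assert_eq!(w3, 5_000);

    // Where they differ: a value that doesn't fit.
    let big = u64::MAX;
    assert_eq!(u32::try_from(big).unwrap_or(u32::MAX), u32::MAX); // saturates
    assert_eq!(big as u32, u32::MAX); // truncation: low 32 bits are all ones here
}
```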
```diff
@@ -54,6 +55,8 @@ where
         }
     };
+
+    let cohort_cache = Arc::new(CohortCacheManager::new(postgres_reader.clone(), None, None));
```
If I was you, I'd do the effort now of piping the cache sizes all the way into the Config object - during initial deployment and tuning, it's WAY nicer to be able to change those by editing the deployment's env vars directly, rather than needing a whole build cycle, and it's easy to forget since everything will work.
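A minimal sketch of that env-var plumbing, assuming nothing about the real Config struct (the variable names `COHORT_CACHE_TTL_SECONDS` and `COHORT_CACHE_MAX_CAPACITY` are made up for illustration):

```rust
use std::env;

// Read a u64 from an env var, falling back to a default when the var is
// unset or unparseable - so the deployment can tune cache sizes without a rebuild.
fn env_u64(name: &str, default: u64) -> u64 {
    env::var(name)
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(default)
}

fn main() {
    // Hypothetical knob names; both fall back to the PR's current defaults.
    let ttl_seconds = env_u64("COHORT_CACHE_TTL_SECONDS", 300);
    let max_capacity = env_u64("COHORT_CACHE_MAX_CAPACITY", 10_000);
    println!("ttl={ttl_seconds}s capacity={max_capacity}");
}
```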
```rust
let cache = Cache::builder()
    .time_to_live(Duration::from_secs(ttl_seconds.unwrap_or(300))) // Default to 5 minutes
    .weigher(weigher)
    .max_capacity(max_capacity.unwrap_or(10_000)) // Default to 10,000 cohorts
```
This default strikes me as quite low, I'd bump it an order of magnitude (or set it an order of magnitude larger) - that's a pure gut feeling though.
```rust
impl CohortCacheManager {
    pub fn new(
        postgres_reader: PostgresReader,
```
Nit, but if the variable name is the same as the type name, I go for stuff like `pr: PostgresReader` – the IDE tells me everything I need to know about it anyway. I'd make it `reader: PostgresReader` in the struct declaration.
```rust
#[derive(Clone)]
pub struct CohortCacheManager {
    postgres_reader: PostgresReader,
    per_team_cohort_cache: Cache<TeamId, Vec<Cohort>>,
```
Nit, same as below re: postgres_reader I suppose, but I know from the type that the cache is per-team (it's got TeamId as a key), and I know it's caching cohorts. You can be shorter here, the type shows up everywhere it's used.
This is purely taste though, if you disagree feel free to ignore.
```rust
/// Returns all cohorts for a given team
#[instrument(skip_all)]
pub async fn list_from_pg(
```
A note for later – this and the cache hit/miss paths are a GREAT place to put a metric, btw. A counter of cohort fetches lets you see rates super easily, which is extremely useful when you're first tuning the deployment.
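A std-only sketch of the suggested counters (a real deployment would report these through the common-metrics crate rather than raw atomics; the names are made up here):

```rust
use std::sync::atomic::{AtomicU64, Ordering};

// Hypothetical hit/miss counters for the cohort cache.
static COHORT_CACHE_HITS: AtomicU64 = AtomicU64::new(0);
static COHORT_CACHE_MISSES: AtomicU64 = AtomicU64::new(0);

// Called on every cache lookup; the hit/miss ratio tells you whether the
// TTL and capacity are tuned sensibly.
fn record_lookup(hit: bool) {
    if hit {
        COHORT_CACHE_HITS.fetch_add(1, Ordering::Relaxed);
    } else {
        COHORT_CACHE_MISSES.fetch_add(1, Ordering::Relaxed);
    }
}

fn main() {
    record_lookup(true);
    record_lookup(false);
    record_lookup(false);
    assert_eq!(COHORT_CACHE_HITS.load(Ordering::Relaxed), 1);
    assert_eq!(COHORT_CACHE_MISSES.load(Ordering::Relaxed), 2);
}
```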
```rust
})?;

let query = "SELECT id, name, description, team_id, deleted, filters, query, version, pending_version, count, is_calculating, is_static, errors_calculating, groups, created_by_id FROM posthog_cohort WHERE team_id = $1";
let cohorts = sqlx::query_as::<_, Cohort>(query)
```
An aside, but I'm happy to log on a bit late if you want to sync up for an hour some day next week – I can show you how to get sqlx query macros working nicely with CI and your local dev flow. We're finding them super useful over in error tracking land because they give errors at compile time (and therefore also from rust-analyzer while writing the code) if there's a problem with a query or a struct definition, letting you skip writing a lot of tests asserting simple queries are correct. I wouldn't go re-writing all the existing ones in this codebase, but for new ones you might find it handy. Feel free to throw something in my calendar.
I'd love that; I'm in Austin next week so I'll be even closer to you timezone-wise. Grabbed some time on Wednesday
````rust
/// ]
/// }
/// ```
pub fn to_property_filters(&self) -> Vec<PropertyFilter> {
````
Just checked – you can consume the `self` here and nothing breaks; there's nowhere you want to both get a cloned copy of the inner filters and keep the wrapper around. This function becomes:

```rust
pub fn to_property_filters(self) -> Vec<PropertyFilter> {
    self.values
        .into_iter()
        .flat_map(|value| value.values)
        .collect()
}
```

And I'd call it something like `into_inner()`. A semantic note, but the Rust convention is that `into_`-prefixed functions consume `self`, `as_`-prefixed ones return a cheap borrow, and `to_` implies a (usually allocating) conversion from a borrowed `&self`.
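That naming convention can be illustrated on a tiny hypothetical wrapper type (not the PR's struct):

```rust
// `as_` borrows cheaply, `to_` clones/allocates, `into_` consumes self.
struct Wrapper {
    values: Vec<i32>,
}

impl Wrapper {
    // Cheap borrow: no allocation, wrapper stays usable.
    fn as_slice(&self) -> &[i32] {
        &self.values
    }
    // Allocating conversion: clones the data, wrapper stays usable.
    fn to_vec(&self) -> Vec<i32> {
        self.values.clone()
    }
    // Consuming conversion: moves the data out, wrapper is gone afterwards.
    fn into_inner(self) -> Vec<i32> {
        self.values
    }
}

fn main() {
    let w = Wrapper { values: vec![1, 2, 3] };
    assert_eq!(w.as_slice(), &[1, 2, 3]); // borrow; w still usable
    assert_eq!(w.to_vec(), vec![1, 2, 3]); // clone; w still usable
    assert_eq!(w.into_inner(), vec![1, 2, 3]); // consumes w
}
```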
```rust
    .properties
    .to_property_filters()
    .into_iter()
    .filter(|f| !(f.key == "id" && f.prop_type == "cohort"))
```
You can use `retain` here, like:

```rust
let mut props = cohort_property.properties.to_property_filters();
props.retain(|f| !(f.key == "id" && f.prop_type == "cohort"));
Ok(props)
```

Or you can make `to_property_filters` return an `impl Iterator<Item = PropertyFilter>` and then do the `filter().collect()` as you already do – collecting to a vec just to `into_iter` and then collect again is an antipattern.
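Both options can be sketched on plain strings standing in for `PropertyFilter` (pure illustration, not the PR's types):

```rust
fn main() {
    // Option 1: collect once, then drop elements in place with `retain`.
    let mut props = vec!["id:cohort".to_string(), "$browser:person".to_string()];
    props.retain(|p| !p.ends_with(":cohort"));
    assert_eq!(props, vec!["$browser:person".to_string()]);

    // Option 2: stay in iterator land and collect exactly once,
    // avoiding the collect -> into_iter -> collect round trip.
    let filtered: Vec<String> = ["id:cohort", "$browser:person"]
        .into_iter()
        .filter(|p| !p.ends_with(":cohort"))
        .map(String::from)
        .collect();
    assert_eq!(filtered, vec!["$browser:person".to_string()]);
}
```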
```rust
    .get_cohort_id()
    .ok_or(FlagError::CohortFiltersParsingError)?;
let match_result =
    evaluate_cohort_dependencies(cohort_id, target_properties, cohorts.clone())?;
```
This /definitely/ doesn't need a clone of the cohorts vec. Cohorts are cheap, but not free – cloning the cohort set once per request is already expensive enough. This diff compiles:

```diff
@@ -1108,10 +1108,9 @@ impl FeatureFlagMatcher {
 fn evaluate_cohort_dependencies(
     initial_cohort_id: CohortId,
     target_properties: &HashMap<String, Value>,
-    cohorts: Vec<Cohort>,
+    cohorts: &[Cohort],
 ) -> Result<bool, FlagError> {
-    let cohort_dependency_graph =
-        build_cohort_dependency_graph(initial_cohort_id, cohorts.clone())?;
+    let cohort_dependency_graph = build_cohort_dependency_graph(initial_cohort_id, cohorts)?;

     // We need to sort cohorts topologically to ensure we evaluate dependencies before the cohorts that depend on them.
     // For example, if cohort A depends on cohort B, we need to evaluate B first to know if A matches.
@@ -1216,7 +1215,7 @@ fn apply_cohort_membership_logic(
 /// The graph is acyclic, which is required for valid cohort dependencies.
 fn build_cohort_dependency_graph(
     initial_cohort_id: CohortId,
-    cohorts: Vec<Cohort>,
+    cohorts: &[Cohort],
 ) -> Result<DiGraph<CohortId, ()>, FlagError> {
     let mut graph = DiGraph::new();
     let mut node_map = HashMap::new();
```
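The borrow-a-slice pattern suggested above can be shown in miniature (hypothetical helpers, not the PR's functions): the same `&[T]` flows through every helper, no clone is needed, and the caller keeps ownership.

```rust
// Outer helper borrows the slice and passes the same borrow onward.
fn count_deep(cohorts: &[String]) -> usize {
    build_ids(cohorts).len()
}

// Inner helper also takes &[String]; before the change it would have
// required its own owned Vec<String>, forcing a clone at each call site.
fn build_ids(cohorts: &[String]) -> Vec<usize> {
    cohorts.iter().map(|c| c.len()).collect()
}

fn main() {
    let cohorts = vec!["a".to_string(), "bb".to_string()];
    assert_eq!(count_deep(&cohorts), 2);
    // The caller still owns `cohorts` afterwards - nothing was moved or cloned.
    assert_eq!(cohorts.len(), 2);
}
```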
Problem

Implements cohort matching for dynamic cohorts in Rust. Required for the new `/flags` endpoint. One very exciting thing that this change does is support Cohort `not_in` matching for the new flags service, something that we didn't support in the original feature flags service. There is more work to be done to support this flow in the feature flags product (need to change the UI to expand cohort matching options and also redo `user_blast_radius`), but having the platform to support this type of behavior is great.

Changes

NB: I know the diff is huge, but it's also kinda something that makes sense to do in a big chunk. I'm going to do a rigorous self-review and call out the parts that I want extra attention on, but if this is still too beefy to review then please let me know and I can break it into two parts – data modeling, and then matching.

That said, here's all that this change introduces:

- `cohort_definitions`: similar to `flag_definitions`, this module introduces an interface for mapping Cohorts to Postgres. It also includes some utility methods for working with cohorts, including `parse_filters`, which turns a cohort property into property filters, and `sort_cohorts_topologically`, which I rewrote in Rust to support doing nested property matching for cohorts that contain cohorts. All of these methods have tests.
- `OperatorType`: rather than trying to resize `property_matching` to handle `in`/`not_in` for cohorts, I do that at a higher level. I'll call out my implementation.
- Matching of `in` and `not_in` values in the `flag_matching` module. This has tests too.

A few things I want to call out that I want to revisit.
Does this work well for both Cloud and self-hosted?
No impact yet
How did you test this code?
added the following new tests:
Could add more cases here, too