fix(cohorts): Optimise person loop for flag matching (#18846)
neilkakkar authored Nov 23, 2023
1 parent: 3a32c10 · commit: 7540489
Showing 6 changed files with 170 additions and 622 deletions.
99 changes: 56 additions & 43 deletions posthog/api/cohort.py
@@ -2,6 +2,7 @@
 import json
 
 from django.db import DatabaseError
+from sentry_sdk import start_span
 import structlog
 
 from posthog.models.feature_flag.flag_matching import (
@@ -10,7 +11,8 @@
     get_feature_flag_hash_key_overrides,
 )
 from posthog.models.person.person import PersonDistinctId
-from posthog.models.property.property import Property
+from posthog.models.property.property import Property, PropertyGroup
+from posthog.queries.base import property_group_to_Q
 from posthog.queries.insight import insight_sync_execute
 import posthoganalytics
 from posthog.metrics import LABEL_TEAM_ID
@@ -47,6 +49,7 @@
     INSIGHT_TRENDS,
     LIMIT,
     OFFSET,
+    PropertyOperatorType,
 )
 from posthog.event_usage import report_user_action
 from posthog.hogql.context import HogQLContext
@@ -584,11 +587,20 @@ def get_cohort_actors_for_feature_flag(cohort_id: int, flag: str, team_id: int,
     for property in property_list:
         default_person_properties.update(get_default_person_property(property, cohorts_cache))
 
+    flag_property_conditions = [Filter(data=condition).property_groups for condition in feature_flag.conditions]
+    flag_property_group = PropertyGroup(type=PropertyOperatorType.OR, values=flag_property_conditions)
+
     try:
         # QuerySet.Iterator() doesn't work with pgbouncer, it will load everything into memory and then stream
         # which doesn't work for us, so need a manual chunking here.
         # Because of this pgbouncer transaction pooling mode, we can't use server-side cursors.
-        queryset = Person.objects.filter(team_id=team_id).order_by("id")
+        # We pre-filter all persons to be ones that will match the feature flag, so that we don't have to
+        # iterate through all persons
+        queryset = (
+            Person.objects.filter(team_id=team_id)
+            .filter(property_group_to_Q(flag_property_group, cohorts_cache=cohorts_cache))
+            .order_by("id")
+        )
         # get batchsize number of people at a time
         start = 0
         batch_of_persons = queryset[start : start + batchsize]
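Why an OR of the condition groups is a safe pre-filter: a flag matches a person when any one of its condition groups matches, so pushing the OR'd PropertyGroup down into Postgres via property_group_to_Q only discards persons that could never match, while every potential match still reaches the per-person loop in the next hunk. A minimal sketch of the idea using plain Django Q objects — the property values and the Person/team_id references are illustrative stand-ins, not PostHog's actual property_group_to_Q output:

    from django.db.models import Q

    # Two hypothetical flag condition groups, both on person properties
    # (Person.properties is a JSONField, so key lookups chain directly).
    condition_a = Q(properties__email__icontains="@example.com")
    condition_b = Q(properties__plan="enterprise")

    # A person needs to satisfy only one group for the flag to possibly
    # match, hence PropertyOperatorType.OR in the change above.
    prefilter = condition_a | condition_b

    # Evaluating the filter in the database shrinks the Python-side
    # matching loop to candidate persons only.
    candidates = Person.objects.filter(team_id=team_id).filter(prefilter).order_by("id")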
@@ -614,48 +626,49 @@ def get_cohort_actors_for_feature_flag(cohort_id: int, flag: str, team_id: int,
             if len(all_persons) == 0:
                 break
 
-            for person in all_persons:
-                # ignore almost-deleted persons / persons with no distinct ids
-                if len(person.distinct_ids) == 0:
-                    continue
-
-                distinct_id = person.distinct_ids[0]
-                person_overrides = {}
-                if feature_flag.ensure_experience_continuity:
-                    # :TRICKY: This is inefficient because it tries to get the hashkey overrides one by one.
-                    # But reusing functions is better for maintainability. Revisit optimising if this becomes a bottleneck.
-                    person_overrides = get_feature_flag_hash_key_overrides(
-                        team_id, [distinct_id], person_id_to_distinct_id_mapping={person.id: distinct_id}
-                    )
+            with start_span(op="batch_flag_matching_with_overrides"):
+                for person in all_persons:
+                    # ignore almost-deleted persons / persons with no distinct ids
+                    if len(person.distinct_ids) == 0:
+                        continue
+
+                    distinct_id = person.distinct_ids[0]
+                    person_overrides = {}
+                    if feature_flag.ensure_experience_continuity:
+                        # :TRICKY: This is inefficient because it tries to get the hashkey overrides one by one.
+                        # But reusing functions is better for maintainability. Revisit optimising if this becomes a bottleneck.
+                        person_overrides = get_feature_flag_hash_key_overrides(
+                            team_id, [distinct_id], person_id_to_distinct_id_mapping={person.id: distinct_id}
+                        )
 
-                try:
-                    match = FeatureFlagMatcher(
-                        [feature_flag],
-                        distinct_id,
-                        groups={},
-                        cache=matcher_cache,
-                        hash_key_overrides=person_overrides,
-                        property_value_overrides={**default_person_properties, **person.properties},
-                        group_property_value_overrides={},
-                        cohorts_cache=cohorts_cache,
-                    ).get_match(feature_flag)
-                    if match.match:
-                        uuids_to_add_to_cohort.append(str(person.uuid))
-                except (DatabaseError, ValueError, ValidationError):
-                    logger.exception(
-                        "Error evaluating feature flag for person", person_uuid=str(person.uuid), team_id=team_id
-                    )
-                except Exception as err:
-                    # matching errors are not fatal, so we just log them and move on.
-                    # Capturing in sentry for now just in case there are some unexpected errors
-                    # we did not account for.
-                    capture_exception(err)
-
-            if len(uuids_to_add_to_cohort) >= batchsize - 1:
-                cohort.insert_users_list_by_uuid(
-                    uuids_to_add_to_cohort, insert_in_clickhouse=True, batchsize=batchsize
-                )
-                uuids_to_add_to_cohort = []
+                    try:
+                        match = FeatureFlagMatcher(
+                            [feature_flag],
+                            distinct_id,
+                            groups={},
+                            cache=matcher_cache,
+                            hash_key_overrides=person_overrides,
+                            property_value_overrides={**default_person_properties, **person.properties},
+                            group_property_value_overrides={},
+                            cohorts_cache=cohorts_cache,
+                        ).get_match(feature_flag)
+                        if match.match:
+                            uuids_to_add_to_cohort.append(str(person.uuid))
+                    except (DatabaseError, ValueError, ValidationError):
+                        logger.exception(
+                            "Error evaluating feature flag for person", person_uuid=str(person.uuid), team_id=team_id
+                        )
+                    except Exception as err:
+                        # matching errors are not fatal, so we just log them and move on.
+                        # Capturing in sentry for now just in case there are some unexpected errors
+                        # we did not account for.
+                        capture_exception(err)
+
+                if len(uuids_to_add_to_cohort) >= batchsize:
+                    cohort.insert_users_list_by_uuid(
+                        uuids_to_add_to_cohort, insert_in_clickhouse=True, batchsize=batchsize
+                    )
+                    uuids_to_add_to_cohort = []
 
             start += batchsize
             batch_of_persons = queryset[start : start + batchsize]
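The loop above implements the manual chunking the comment calls out: with pgbouncer in transaction pooling mode, the server-side cursors behind QuerySet.iterator() aren't usable, so the code pages through the ordered queryset in fixed-size slices instead. A standalone sketch of that pattern, with a hypothetical helper name and default batch size:

    def iterate_in_batches(queryset, batchsize=1000):
        # Slicing an ordered queryset issues plain LIMIT/OFFSET queries,
        # which work through pgbouncer's transaction pooling, unlike
        # .iterator()'s server-side cursors.
        start = 0
        batch = list(queryset[start : start + batchsize])
        while batch:
            yield batch
            start += batchsize
            batch = list(queryset[start : start + batchsize])

The .order_by("id") in the diff matters for this pattern: without a stable ordering, successive LIMIT/OFFSET pages could overlap or skip rows between queries.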
1 change: 1 addition & 0 deletions posthog/api/feature_flag.py
@@ -636,6 +636,7 @@ def create_static_cohort_for_flag(self, request: request.Request, **kwargs):
                 "is_static": True,
                 "key": feature_flag_key,
                 "name": f'Users with feature flag {feature_flag_key} enabled at {datetime.now().strftime("%Y-%m-%d %H:%M:%S")}',
+                "is_calculating": True,
             },
             context={
                 "request": request,