Skip to content

Commit

Permalink
chore(decide): More meaningful profiling (#17729)
Browse files Browse the repository at this point in the history
  • Loading branch information
neilkakkar authored Oct 3, 2023
1 parent b9e358c commit 9b85453
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 112 deletions.
210 changes: 106 additions & 104 deletions posthog/models/feature_flag/flag_matching.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from django.db.models.fields import BooleanField
from django.db.models import Q
from django.db.models.query import QuerySet
from sentry_sdk.api import capture_exception
from sentry_sdk.api import capture_exception, start_span
from posthog.metrics import LABEL_TEAM_ID

from posthog.models.filters import Filter
Expand Down Expand Up @@ -609,115 +609,117 @@ def get_all_feature_flags(
feature_flag.ensure_experience_continuity for feature_flag in all_feature_flags
)

# check every 10 seconds whether the database is alive or not
is_database_alive = postgres_healthcheck.is_connected()

if not is_database_alive or not flags_have_experience_continuity_enabled:
return _get_all_feature_flags(
all_feature_flags,
team_id,
distinct_id,
groups=groups,
property_value_overrides=property_value_overrides,
group_property_value_overrides=group_property_value_overrides,
skip_database_flags=not is_database_alive,
)

# For flags with experience continuity enabled, we want a consistent distinct_id that doesn't change,
# no matter what other distinct_ids the user has.
# FeatureFlagHashKeyOverride stores a distinct_id (hash_key_override) given a flag, person_id, and team_id.
with start_span(op="without_experience_continuity"):
# check every 10 seconds whether the database is alive or not
is_database_alive = postgres_healthcheck.is_connected()

if not is_database_alive or not flags_have_experience_continuity_enabled:
return _get_all_feature_flags(
all_feature_flags,
team_id,
distinct_id,
groups=groups,
property_value_overrides=property_value_overrides,
group_property_value_overrides=group_property_value_overrides,
skip_database_flags=not is_database_alive,
)

should_write_hash_key_override = False
writing_hash_key_override = False
# This is the write-path for experience continuity flags. When a hash_key_override is sent to decide,
# we want to store it in the database, and then use it in the read-path to get flags with experience continuity enabled.
if hash_key_override is not None and not settings.DECIDE_SKIP_HASH_KEY_OVERRIDE_WRITES:
# First, check if the hash_key_override is already in the database.
# We don't have to check this in an ideal world, but read replica operations are much more resilient than write operations.
# So, if an extra query check helps us avoid the write path, it's worth it.
with start_span(op="with_experience_continuity_write_path"):
# For flags with experience continuity enabled, we want a consistent distinct_id that doesn't change,
# no matter what other distinct_ids the user has.
# FeatureFlagHashKeyOverride stores a distinct_id (hash_key_override) given a flag, person_id, and team_id.
should_write_hash_key_override = False
writing_hash_key_override = False
# This is the write-path for experience continuity flags. When a hash_key_override is sent to decide,
# we want to store it in the database, and then use it in the read-path to get flags with experience continuity enabled.
if hash_key_override is not None and not settings.DECIDE_SKIP_HASH_KEY_OVERRIDE_WRITES:
# First, check if the hash_key_override is already in the database.
# We don't have to check this in an ideal world, but read replica operations are much more resilient than write operations.
# So, if an extra query check helps us avoid the write path, it's worth it.

try:
with execute_with_timeout(FLAG_MATCHING_QUERY_TIMEOUT_MS, DATABASE_FOR_FLAG_MATCHING) as cursor:
distinct_ids = [distinct_id, str(hash_key_override)]
query = """
WITH target_person_ids AS (
SELECT team_id, person_id FROM posthog_persondistinctid WHERE team_id = %(team_id)s AND distinct_id IN %(distinct_ids)s
),
existing_overrides AS (
SELECT team_id, person_id, feature_flag_key, hash_key FROM posthog_featureflaghashkeyoverride
WHERE team_id = %(team_id)s AND person_id IN (SELECT person_id FROM target_person_ids)
)
SELECT key FROM posthog_featureflag WHERE team_id = %(team_id)s AND ensure_experience_continuity = TRUE AND active = TRUE AND deleted = FALSE
AND key NOT IN (SELECT feature_flag_key FROM existing_overrides)
"""
cursor.execute(query, {"team_id": team_id, "distinct_ids": tuple(distinct_ids)}) # type: ignore
flags_with_no_overrides = [row[0] for row in cursor.fetchall()]
should_write_hash_key_override = len(flags_with_no_overrides) > 0
except Exception as e:
handle_feature_flag_exception(e, "[Feature Flags] Error figuring out hash key overrides")

if should_write_hash_key_override:
try:
hash_key_override = str(hash_key_override)

# :TRICKY: There are a few cases for write we need to handle:
# 1. Ingestion delay causing the person to not have been created yet or the distinct_id not yet associated
# 2. Merging of two different already existing persons, which results in 1 person_id being deleted and ff hash key overrides to be moved.
# 3. Person being deleted via UI or API (this is rare)
#
# In all cases, we simply try to find all personIDs associated with the distinct_id
# and the hash_key_override, and add overrides for all these personIDs.
# On merge, if a person is deleted, it is fine because the below line in plugin-server will take care of it.
# https://github.com/PostHog/posthog/blob/master/plugin-server/src/worker/ingestion/person-state.ts#L696 (addFeatureFlagHashKeysForMergedPerson)

writing_hash_key_override = set_feature_flag_hash_key_overrides(
team_id, [distinct_id, hash_key_override], hash_key_override
)
team_id_label = label_for_team_id_to_track(team_id)
FLAG_HASH_KEY_WRITES_COUNTER.labels(
team_id=team_id_label, successful_write=writing_hash_key_override
).inc()
with execute_with_timeout(FLAG_MATCHING_QUERY_TIMEOUT_MS, DATABASE_FOR_FLAG_MATCHING) as cursor:
distinct_ids = [distinct_id, str(hash_key_override)]
query = """
WITH target_person_ids AS (
SELECT team_id, person_id FROM posthog_persondistinctid WHERE team_id = %(team_id)s AND distinct_id IN %(distinct_ids)s
),
existing_overrides AS (
SELECT team_id, person_id, feature_flag_key, hash_key FROM posthog_featureflaghashkeyoverride
WHERE team_id = %(team_id)s AND person_id IN (SELECT person_id FROM target_person_ids)
)
SELECT key FROM posthog_featureflag WHERE team_id = %(team_id)s AND ensure_experience_continuity = TRUE AND active = TRUE AND deleted = FALSE
AND key NOT IN (SELECT feature_flag_key FROM existing_overrides)
"""
cursor.execute(query, {"team_id": team_id, "distinct_ids": tuple(distinct_ids)}) # type: ignore
flags_with_no_overrides = [row[0] for row in cursor.fetchall()]
should_write_hash_key_override = len(flags_with_no_overrides) > 0
except Exception as e:
# If the database is in read-only mode, we can't handle experience continuity flags,
# since the set_feature_flag_hash_key_overrides call will fail.

# For this case, and for any other case, do not error out on decide, just continue assuming continuity couldn't happen.
# At the same time, don't set db down, because the read-replica might still be up.
handle_feature_flag_exception(
e, "[Feature Flags] Error while setting feature flag hash key overrides", set_healthcheck=False
)
handle_feature_flag_exception(e, "[Feature Flags] Error figuring out hash key overrides")

if should_write_hash_key_override:
try:
hash_key_override = str(hash_key_override)

# :TRICKY: There are a few cases for write we need to handle:
# 1. Ingestion delay causing the person to not have been created yet or the distinct_id not yet associated
# 2. Merging of two different already existing persons, which results in 1 person_id being deleted and ff hash key overrides to be moved.
# 3. Person being deleted via UI or API (this is rare)
#
# In all cases, we simply try to find all personIDs associated with the distinct_id
# and the hash_key_override, and add overrides for all these personIDs.
# On merge, if a person is deleted, it is fine because the below line in plugin-server will take care of it.
# https://github.com/PostHog/posthog/blob/master/plugin-server/src/worker/ingestion/person-state.ts#L696 (addFeatureFlagHashKeysForMergedPerson)

writing_hash_key_override = set_feature_flag_hash_key_overrides(
team_id, [distinct_id, hash_key_override], hash_key_override
)
team_id_label = label_for_team_id_to_track(team_id)
FLAG_HASH_KEY_WRITES_COUNTER.labels(
team_id=team_id_label, successful_write=writing_hash_key_override
).inc()
except Exception as e:
# If the database is in read-only mode, we can't handle experience continuity flags,
# since the set_feature_flag_hash_key_overrides call will fail.

# For this case, and for any other case, do not error out on decide, just continue assuming continuity couldn't happen.
# At the same time, don't set db down, because the read-replica might still be up.
handle_feature_flag_exception(
e, "[Feature Flags] Error while setting feature flag hash key overrides", set_healthcheck=False
)

# This is the read-path for experience continuity. We need to get the overrides, and to do that, we get the person_id.
using_database = None
try:
# when we're writing a hash_key_override, we query the main database, not the replica
# this is because we need to make sure the write is successful before we read it
using_database = "default" if writing_hash_key_override else DATABASE_FOR_FLAG_MATCHING
person_overrides = {}
with execute_with_timeout(FLAG_MATCHING_QUERY_TIMEOUT_MS, using_database):
target_distinct_ids = [distinct_id]
if hash_key_override is not None:
target_distinct_ids.append(str(hash_key_override))
person_overrides = get_feature_flag_hash_key_overrides(team_id, target_distinct_ids, using_database)

except Exception as e:
handle_feature_flag_exception(
e,
f"[Feature Flags] Error fetching hash key overrides from {using_database} db",
set_healthcheck=not writing_hash_key_override,
)
# database is down, we can't handle experience continuity flags at all.
# Treat this same as if there are no experience continuity flags.
# This automatically sets 'errorsWhileComputingFlags' to True.
return _get_all_feature_flags(
all_feature_flags,
team_id,
distinct_id,
groups=groups,
property_value_overrides=property_value_overrides,
group_property_value_overrides=group_property_value_overrides,
skip_database_flags=True,
)
with start_span(op="with_experience_continuity_read_path"):
using_database = None
try:
# when we're writing a hash_key_override, we query the main database, not the replica
# this is because we need to make sure the write is successful before we read it
using_database = "default" if writing_hash_key_override else DATABASE_FOR_FLAG_MATCHING
person_overrides = {}
with execute_with_timeout(FLAG_MATCHING_QUERY_TIMEOUT_MS, using_database):
target_distinct_ids = [distinct_id]
if hash_key_override is not None:
target_distinct_ids.append(str(hash_key_override))
person_overrides = get_feature_flag_hash_key_overrides(team_id, target_distinct_ids, using_database)

except Exception as e:
handle_feature_flag_exception(
e,
f"[Feature Flags] Error fetching hash key overrides from {using_database} db",
set_healthcheck=not writing_hash_key_override,
)
# database is down, we can't handle experience continuity flags at all.
# Treat this same as if there are no experience continuity flags.
# This automatically sets 'errorsWhileComputingFlags' to True.
return _get_all_feature_flags(
all_feature_flags,
team_id,
distinct_id,
groups=groups,
property_value_overrides=property_value_overrides,
group_property_value_overrides=group_property_value_overrides,
skip_database_flags=True,
)

return _get_all_feature_flags(
all_feature_flags,
Expand Down
45 changes: 37 additions & 8 deletions posthog/settings/sentry.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,39 @@
from posthog.settings import get_from_env
from posthog.settings.base_variables import TEST

from datetime import datetime, timezone
from dateutil import parser
from random import random
from datetime import timedelta


def before_send_transaction(event, hint):
url_string = event.get("request", {}).get("url")
if url_string and "decide" in url_string:
DECIDE_SAMPLE_RATE = 0.00001 # 0.001%
should_sample = random() < DECIDE_SAMPLE_RATE

transaction_start_time = event.get("start_timestamp")
transaction_end_time = event.get("timestamp")
if transaction_start_time and transaction_end_time:
try:
parsed_start_time = parser.parse(transaction_start_time)
parsed_end_time = parser.parse(transaction_end_time)

duration = parsed_end_time - parsed_start_time

if duration >= timedelta(seconds=8):
# return all events for transactions that took more than 8 seconds
return event
elif duration > timedelta(seconds=2):
# very high sample rate for transactions that took more than 2 seconds
return event if random() < 0.5 else None

except Exception:
return event if should_sample else None

return event if should_sample else None
else:
return event


def traces_sampler(sampling_context: dict) -> float:
Expand Down Expand Up @@ -38,13 +70,9 @@ def traces_sampler(sampling_context: dict) -> float:
return 0.0000001 # 0.00001%
# Get more traces for /decide than other high volume endpoints
elif path.startswith("/decide"):
# Get the current time in GMT
current_time_gmt = datetime.now(timezone.utc)
# Check if the time is between 5 and 6:59 am GMT, where we get spikes of latency
# so we can get more traces to debug
if 5 <= current_time_gmt.hour < 7:
return 0.001 # 0.1%
return 0.00001 # 0.001%
# decide sampling happens in before_send_transaction,
# where we sample on duration instead of no. of requests
return 1.0 # 100%
# Probes/monitoring endpoints
elif path.startswith(("/_health", "/_readyz", "/_livez")):
return 0.00001 # 0.001%
Expand Down Expand Up @@ -103,6 +131,7 @@ def sentry_init() -> None:
# Configures the sample rate for error events, in the range of 0.0 to 1.0 (default).
# If set to 0.1 only 10% of error events will be sent. Events are picked randomly.
traces_sampler=traces_sampler,
before_send_transaction=before_send_transaction,
_experiments={
# https://docs.sentry.io/platforms/python/profiling/
# The profiles_sample_rate setting is relative to the traces_sample_rate setting.
Expand Down

0 comments on commit 9b85453

Please sign in to comment.