diff --git a/posthog/api/capture.py b/posthog/api/capture.py index 73998505bb822..3d67911f4a8b4 100644 --- a/posthog/api/capture.py +++ b/posthog/api/capture.py @@ -602,11 +602,6 @@ def capture_internal( token=token, ) - # We aim to always partition by {team_id}:{distinct_id} but allow - # overriding this to deal with hot partitions in specific cases. - # Setting the partition key to None means using random partitioning. - kafka_partition_key = None - if event["event"] in SESSION_RECORDING_EVENT_NAMES: session_id = event["properties"]["$session_id"] headers = [ @@ -623,13 +618,17 @@ def capture_internal( parsed_event, event["event"], partition_key=session_id, headers=headers, overflowing=overflowing ) + # We aim to always partition by {team_id}:{distinct_id} but allow + # overriding this to deal with hot partitions in specific cases. + # Setting the partition key to None means using random partitioning. candidate_partition_key = f"{token}:{distinct_id}" - if ( - distinct_id.lower() not in LIKELY_ANONYMOUS_IDS - and not is_randomly_partitioned(candidate_partition_key) - or historical + not historical + and settings.CAPTURE_OVERFLOW_ENABLED + and (distinct_id.lower() in LIKELY_ANONYMOUS_IDS or is_randomly_partitioned(candidate_partition_key)) ): + kafka_partition_key = None + else: kafka_partition_key = hashlib.sha256(candidate_partition_key.encode()).hexdigest() return log_event(parsed_event, event["event"], partition_key=kafka_partition_key, historical=historical) diff --git a/posthog/settings/ingestion.py b/posthog/settings/ingestion.py index 2bc532ac9cd92..3b4d039a25b20 100644 --- a/posthog/settings/ingestion.py +++ b/posthog/settings/ingestion.py @@ -11,6 +11,10 @@ # KEEP IN SYNC WITH plugin-server/src/config/config.ts BUFFER_CONVERSION_SECONDS = get_from_env("BUFFER_CONVERSION_SECONDS", default=60, type_cast=int) +# Whether or not overflow (random partitioning) should be enabled *at all*. +# Note that this setting takes precedence over other overflow-related settings +# below, if disabled. +CAPTURE_OVERFLOW_ENABLED = get_from_env("CAPTURE_OVERFLOW_ENABLED", True, type_cast=str_to_bool) # A list of pairs (in the format 2:myLovelyId) that we should use # random partitioning for when producing events to the Kafka topic consumed by the plugin server.