From d929ca9fd98570ffd808b1f8bd060b1d1433d7b2 Mon Sep 17 00:00:00 2001
From: ted kaemming <65315+tkaemming@users.noreply.github.com>
Date: Thu, 28 Mar 2024 11:35:12 -0700
Subject: [PATCH] feat(capture): Add setting to be able to disable capture
 overflow entirely (#21168)

---
 posthog/api/capture.py           | 17 ++++++++---------
 posthog/api/test/test_capture.py | 20 +++++++++++++++++---
 posthog/settings/ingestion.py    |  5 +++++
 3 files changed, 30 insertions(+), 12 deletions(-)

diff --git a/posthog/api/capture.py b/posthog/api/capture.py
index 73998505bb822..6b921dd27ea48 100644
--- a/posthog/api/capture.py
+++ b/posthog/api/capture.py
@@ -602,11 +602,6 @@ def capture_internal(
         token=token,
     )
 
-    # We aim to always partition by {team_id}:{distinct_id} but allow
-    # overriding this to deal with hot partitions in specific cases.
-    # Setting the partition key to None means using random partitioning.
-    kafka_partition_key = None
-
     if event["event"] in SESSION_RECORDING_EVENT_NAMES:
         session_id = event["properties"]["$session_id"]
         headers = [
@@ -623,13 +618,17 @@ def capture_internal(
             parsed_event, event["event"], partition_key=session_id, headers=headers, overflowing=overflowing
         )
 
+    # We aim to always partition by {team_id}:{distinct_id} but allow
+    # overriding this to deal with hot partitions in specific cases.
+    # Setting the partition key to None means using random partitioning.
     candidate_partition_key = f"{token}:{distinct_id}"
-
     if (
-        distinct_id.lower() not in LIKELY_ANONYMOUS_IDS
-        and not is_randomly_partitioned(candidate_partition_key)
-        or historical
+        not historical
+        and settings.CAPTURE_ALLOW_RANDOM_PARTITIONING
+        and (distinct_id.lower() in LIKELY_ANONYMOUS_IDS or is_randomly_partitioned(candidate_partition_key))
     ):
+        kafka_partition_key = None
+    else:
         kafka_partition_key = hashlib.sha256(candidate_partition_key.encode()).hexdigest()
 
     return log_event(parsed_event, event["event"], partition_key=kafka_partition_key, historical=historical)
diff --git a/posthog/api/test/test_capture.py b/posthog/api/test/test_capture.py
index 2a80186082dea..a0fc8826c95c6 100644
--- a/posthog/api/test/test_capture.py
+++ b/posthog/api/test/test_capture.py
@@ -4,6 +4,7 @@
 import base64
 import gzip
 import json
+from django.test import override_settings
 import lzstring
 import pathlib
 import pytest
@@ -281,8 +282,7 @@ def test_is_randomly_partitioned(self):
         assert is_randomly_partitioned(override_key) is True
 
     @patch("posthog.kafka_client.client._KafkaProducer.produce")
-    def test_capture_randomly_partitions_with_likely_anonymous_ids(self, kafka_produce):
-        """Test is_randomly_partitioned in the prescence of likely anonymous ids."""
+    def _do_test_capture_with_likely_anonymous_ids(self, kafka_produce, expect_random_partitioning: bool):
         for distinct_id in LIKELY_ANONYMOUS_IDS:
             data = {
                 "event": "$autocapture",
@@ -298,9 +298,23 @@ def test_capture_randomly_partitions_with_likely_anonymous_ids(self, kafka_produ
             )
 
         kafka_produce.assert_called_with(
-            topic=KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC, data=ANY, key=None, headers=None
+            topic=KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC,
+            data=ANY,
+            key=None if expect_random_partitioning else ANY,
+            headers=None,
         )
 
+        if not expect_random_partitioning:
+            assert kafka_produce.mock_calls[0].kwargs["key"] is not None
+
+    def test_capture_randomly_partitions_with_likely_anonymous_ids(self):
+        """Test is_randomly_partitioned in the presence of likely anonymous ids, if enabled."""
+        with override_settings(CAPTURE_ALLOW_RANDOM_PARTITIONING=True):
+            self._do_test_capture_with_likely_anonymous_ids(expect_random_partitioning=True)
+
+        with override_settings(CAPTURE_ALLOW_RANDOM_PARTITIONING=False):
+            self._do_test_capture_with_likely_anonymous_ids(expect_random_partitioning=False)
+
     def test_cached_is_randomly_partitioned(self):
         """Assert the behavior of is_randomly_partitioned under certain cache settings.
 
diff --git a/posthog/settings/ingestion.py b/posthog/settings/ingestion.py
index 2bc532ac9cd92..8a3a24d981ce0 100644
--- a/posthog/settings/ingestion.py
+++ b/posthog/settings/ingestion.py
@@ -11,6 +11,11 @@
 # KEEP IN SYNC WITH plugin-server/src/config/config.ts
 BUFFER_CONVERSION_SECONDS = get_from_env("BUFFER_CONVERSION_SECONDS", default=60, type_cast=int)
 
+# Whether or not random partitioning (i.e. overflow routing) should be allowed.
+# (Enabling this setting does not, by itself, cause messages to be randomly
+# partitioned.) If disabled, this setting takes precedence over the other
+# partitioning-related settings below.
+CAPTURE_ALLOW_RANDOM_PARTITIONING = get_from_env("CAPTURE_ALLOW_RANDOM_PARTITIONING", True, type_cast=str_to_bool)
 # A list of pairs (in the format 2:myLovelyId) that we should use
 # random partitioning for when producing events to the Kafka topic consumed by the plugin server.
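For reviewers who want the new partition-key behaviour in isolation: the sketch below restates the post-change logic from capture_internal in posthog/api/capture.py as a standalone function. It is illustrative only; LIKELY_ANONYMOUS_IDS, is_randomly_partitioned, and the settings lookup are replaced with hypothetical stand-ins here, so their values and bodies are assumptions rather than the real implementations.

import hashlib
from typing import Optional

# Hypothetical stand-ins for the real objects in posthog/api/capture.py.
LIKELY_ANONYMOUS_IDS = {"anonymous", "undefined", "null"}  # illustrative subset only
CAPTURE_ALLOW_RANDOM_PARTITIONING = True  # mirrors settings.CAPTURE_ALLOW_RANDOM_PARTITIONING


def is_randomly_partitioned(candidate_partition_key: str) -> bool:
    """Stand-in for the real overflow check (capacity limiter / manual key overrides)."""
    return False


def choose_kafka_partition_key(token: str, distinct_id: str, historical: bool) -> Optional[str]:
    """Return None to request random partitioning, or a deterministic hash otherwise."""
    candidate_partition_key = f"{token}:{distinct_id}"
    if (
        not historical
        and CAPTURE_ALLOW_RANDOM_PARTITIONING
        and (distinct_id.lower() in LIKELY_ANONYMOUS_IDS or is_randomly_partitioned(candidate_partition_key))
    ):
        return None  # overflow routing is allowed and applies: let Kafka pick a partition
    # Otherwise always partition by {token}:{distinct_id}, as the comment in the diff describes.
    return hashlib.sha256(candidate_partition_key.encode()).hexdigest()


if __name__ == "__main__":
    # With the setting enabled, an anonymous-looking distinct_id gets random partitioning.
    assert choose_kafka_partition_key("phc_dummy_token", "anonymous", historical=False) is None
    # Flipping CAPTURE_ALLOW_RANDOM_PARTITIONING to False makes the same event hash-partitioned.

The practical effect of the new setting shows up in the first branch: with CAPTURE_ALLOW_RANDOM_PARTITIONING disabled that branch can never be taken, so every event falls through to the sha256 key, which is what the expect_random_partitioning=False path of the updated test asserts.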
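The setting itself is read in posthog/settings/ingestion.py through PostHog's get_from_env helper with a str_to_bool cast; neither helper's body appears in this patch. As a rough idea of the operator-facing behaviour, a self-contained approximation might look like the following, where env_flag and the accepted spellings are assumptions rather than the real str_to_bool:

import os


def env_flag(name: str, default: bool) -> bool:
    """Hypothetical stand-in for get_from_env(name, default, type_cast=str_to_bool)."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in ("1", "true", "t", "yes", "y", "on")


# Defaults to True, so existing deployments keep overflow routing unless they opt out,
# e.g. by exporting CAPTURE_ALLOW_RANDOM_PARTITIONING=false on the capture service.
CAPTURE_ALLOW_RANDOM_PARTITIONING = env_flag("CAPTURE_ALLOW_RANDOM_PARTITIONING", default=True)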