feat(capture): Add setting to be able to disable capture overflow entirely #21168

Merged: 7 commits, Mar 28, 2024
17 changes: 8 additions & 9 deletions posthog/api/capture.py
@@ -602,11 +602,6 @@ def capture_internal(
token=token,
)

# We aim to always partition by {team_id}:{distinct_id} but allow
# overriding this to deal with hot partitions in specific cases.
# Setting the partition key to None means using random partitioning.
kafka_partition_key = None

if event["event"] in SESSION_RECORDING_EVENT_NAMES:
session_id = event["properties"]["$session_id"]
headers = [
@@ -623,13 +618,17 @@ def capture_internal(
parsed_event, event["event"], partition_key=session_id, headers=headers, overflowing=overflowing
)

# We aim to always partition by {team_id}:{distinct_id} but allow
# overriding this to deal with hot partitions in specific cases.
# Setting the partition key to None means using random partitioning.
candidate_partition_key = f"{token}:{distinct_id}"

if (
distinct_id.lower() not in LIKELY_ANONYMOUS_IDS
and not is_randomly_partitioned(candidate_partition_key)
or historical
not historical
and settings.CAPTURE_ALLOW_RANDOM_PARTITIONING
and (distinct_id.lower() in LIKELY_ANONYMOUS_IDS or is_randomly_partitioned(candidate_partition_key))
Contributor comment:
Love me some parens, and also bringing kafka_partition_key down here.

):
kafka_partition_key = None
else:
kafka_partition_key = hashlib.sha256(candidate_partition_key.encode()).hexdigest()

return log_event(parsed_event, event["event"], partition_key=kafka_partition_key, historical=historical)
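
To make the new control flow easier to follow, here is a minimal, self-contained sketch of the partition-key decision as it reads after this change. This is not PostHog's actual implementation: the `choose_partition_key` name, the inlined `LIKELY_ANONYMOUS_IDS` subset, and the `is_randomly_partitioned` stand-in are illustrative assumptions.

```python
import hashlib
from typing import Callable, Optional

# Illustrative subset; the real LIKELY_ANONYMOUS_IDS list lives in PostHog's capture code.
LIKELY_ANONYMOUS_IDS = {"anonymous", "undefined", "null", "0"}


def choose_partition_key(
    token: str,
    distinct_id: str,
    historical: bool,
    allow_random_partitioning: bool,
    is_randomly_partitioned: Callable[[str], bool] = lambda key: False,
) -> Optional[str]:
    """Return None to request random (overflow) partitioning, or a stable hashed key otherwise."""
    candidate_partition_key = f"{token}:{distinct_id}"
    if (
        not historical
        and allow_random_partitioning
        and (
            distinct_id.lower() in LIKELY_ANONYMOUS_IDS
            or is_randomly_partitioned(candidate_partition_key)
        )
    ):
        # Random partitioning: the Kafka producer picks the partition, spreading hot keys.
        return None
    # Stable partitioning: events for the same {token}:{distinct_id} land on one partition.
    return hashlib.sha256(candidate_partition_key.encode()).hexdigest()


# With CAPTURE_ALLOW_RANDOM_PARTITIONING disabled, even likely-anonymous ids get a stable key.
assert choose_partition_key("tok", "anonymous", historical=False, allow_random_partitioning=False) is not None
assert choose_partition_key("tok", "anonymous", historical=False, allow_random_partitioning=True) is None
```
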
20 changes: 17 additions & 3 deletions posthog/api/test/test_capture.py
@@ -4,6 +4,7 @@
import base64
import gzip
import json
from django.test import override_settings
import lzstring
import pathlib
import pytest
@@ -281,8 +282,7 @@ def test_is_randomly_partitioned(self):
assert is_randomly_partitioned(override_key) is True

@patch("posthog.kafka_client.client._KafkaProducer.produce")
def test_capture_randomly_partitions_with_likely_anonymous_ids(self, kafka_produce):
"""Test is_randomly_partitioned in the prescence of likely anonymous ids."""
def _do_test_capture_with_likely_anonymous_ids(self, kafka_produce, expect_random_partitioning: bool):
for distinct_id in LIKELY_ANONYMOUS_IDS:
data = {
"event": "$autocapture",
@@ -298,9 +298,23 @@ def test_capture_randomly_partitions_with_likely_anonymous_ids(self, kafka_produce):
)

kafka_produce.assert_called_with(
topic=KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC, data=ANY, key=None, headers=None
topic=KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC,
data=ANY,
key=None if expect_random_partitioning else ANY,
headers=None,
)

if not expect_random_partitioning:
assert kafka_produce.mock_calls[0].kwargs["key"] is not None

def test_capture_randomly_partitions_with_likely_anonymous_ids(self):
"""Test is_randomly_partitioned in the prescence of likely anonymous ids, if enabled."""
with override_settings(CAPTURE_ALLOW_RANDOM_PARTITIONING=True):
self._do_test_capture_with_likely_anonymous_ids(expect_random_partitioning=True)

with override_settings(CAPTURE_ALLOW_RANDOM_PARTITIONING=False):
self._do_test_capture_with_likely_anonymous_ids(expect_random_partitioning=False)

def test_cached_is_randomly_partitioned(self):
"""Assert the behavior of is_randomly_partitioned under certain cache settings.

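
A side note on the test refactor above: because the shared helper keeps the `@patch` decorator, unittest injects the mock on every call, so the two new wrapper tests only pass `expect_random_partitioning` (the real wrappers additionally run under `override_settings(CAPTURE_ALLOW_RANDOM_PARTITIONING=...)`, as shown in the diff). A minimal sketch of that injection pattern, with an illustrative patch target rather than the real Kafka producer:

```python
from unittest import TestCase
from unittest.mock import MagicMock, patch


class PatchedHelperPatternTest(TestCase):
    # @patch injects the mock each time the decorated helper is called, so wrapper
    # tests that call the helper only pass the remaining keyword arguments.
    @patch("json.dumps")  # illustrative target; the real test patches _KafkaProducer.produce
    def _do_check(self, mocked_dumps, expect_flag: bool):
        self.assertIsInstance(mocked_dumps, MagicMock)
        return expect_flag

    def test_helper_receives_mock_on_each_call(self):
        self.assertTrue(self._do_check(expect_flag=True))
        self.assertFalse(self._do_check(expect_flag=False))
```
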
5 changes: 5 additions & 0 deletions posthog/settings/ingestion.py
@@ -11,6 +11,11 @@
# KEEP IN SYNC WITH plugin-server/src/config/config.ts
BUFFER_CONVERSION_SECONDS = get_from_env("BUFFER_CONVERSION_SECONDS", default=60, type_cast=int)

# Whether random partitioning (i.e. overflow routing) is allowed at all.
# Enabling this setting does not by itself cause messages to be randomly
# partitioned. If disabled, it takes precedence over the other
# partitioning-related settings below.
CAPTURE_ALLOW_RANDOM_PARTITIONING = get_from_env("CAPTURE_ALLOW_RANDOM_PARTITIONING", True, type_cast=str_to_bool)

# A list of <team_id:distinct_id> pairs (in the format 2:myLovelyId) that we should use
# random partitioning for when producing events to the Kafka topic consumed by the plugin server.
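
For completeness, a rough illustration of how a boolean setting like this is read from the environment. `get_from_env` and `str_to_bool` are existing PostHog helpers; the stand-in implementations below are assumptions for the sake of a runnable example, not their actual code.

```python
import os


def str_to_bool(value) -> bool:
    """Illustrative parser: how a str_to_bool-style cast commonly interprets env values."""
    return str(value).strip().lower() in ("y", "yes", "t", "true", "on", "1")


def get_from_env(key: str, default=None, *, type_cast=None):
    """Illustrative stand-in for PostHog's get_from_env helper."""
    raw = os.environ.get(key)
    if raw is None:
        return default
    return type_cast(raw) if type_cast else raw


# Deployments that want to disable overflow routing entirely would set
# CAPTURE_ALLOW_RANDOM_PARTITIONING=false (or 0/no/off) in the environment.
CAPTURE_ALLOW_RANDOM_PARTITIONING = get_from_env("CAPTURE_ALLOW_RANDOM_PARTITIONING", True, type_cast=str_to_bool)
```
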