diff --git a/posthog/api/capture.py b/posthog/api/capture.py index ee7e09c8ae154..73998505bb822 100644 --- a/posthog/api/capture.py +++ b/posthog/api/capture.py @@ -1,37 +1,40 @@ import hashlib import json import re -import time -from datetime import datetime -from typing import Any, Dict, Iterator, List, Optional, Tuple - import structlog +import time +from datetime import datetime, timedelta from dateutil import parser from django.conf import settings from django.http import JsonResponse from django.utils import timezone from django.views.decorators.csrf import csrf_exempt +from enum import Enum from kafka.errors import KafkaError, MessageSizeTooLargeError from kafka.producer.future import FutureRecordMetadata -from prometheus_client import Counter +from prometheus_client import Counter, Gauge from rest_framework import status from sentry_sdk import configure_scope from sentry_sdk.api import capture_exception, start_span from statshog.defaults.django import statsd from token_bucket import Limiter, MemoryStorage +from typing import Any, Dict, Iterator, List, Optional, Tuple, Set from ee.billing.quota_limiting import QuotaLimitingCaches from posthog.api.utils import get_data, get_token, safe_clickhouse_string +from posthog.cache_utils import cache_for from posthog.exceptions import generate_exception_response from posthog.kafka_client.client import KafkaProducer, sessionRecordingKafkaProducer from posthog.kafka_client.topics import ( KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL, KAFKA_SESSION_RECORDING_EVENTS, KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, + KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW, ) from posthog.logging.timing import timed from posthog.metrics import KLUDGES_COUNTER, LABEL_RESOURCE_TYPE from posthog.models.utils import UUIDT +from posthog.redis import get_client from posthog.session_recordings.session_recording_helpers import ( preprocess_replay_events_for_blob_ingestion, split_replay_events, @@ -85,6 +88,12 @@ labelnames=["reason"], ) +OVERFLOWING_KEYS_LOADED_GAUGE = Gauge( + "capture_overflowing_keys_loaded", + "Number of keys loaded for the overflow redirection, per resource_type.", + labelnames=[LABEL_RESOURCE_TYPE], +) + # This is a heuristic of ids we have seen used as anonymous. As they frequently # have significantly more traffic than non-anonymous distinct_ids, and likely # don't refer to the same underlying person we prefer to partition them randomly @@ -111,6 +120,13 @@ "undefined", } +OVERFLOWING_REDIS_KEY = "@posthog/capture-overflow/" + + +class InputType(Enum): + EVENTS = "events" + REPLAY = "replay" + def build_kafka_event_data( distinct_id: str, @@ -135,7 +151,7 @@ def build_kafka_event_data( } -def _kafka_topic(event_name: str, data: Dict, historical: bool = False) -> str: +def _kafka_topic(event_name: str, historical: bool = False, overflowing: bool = False) -> str: # To allow for different quality of service on session recordings # and other events, we push to a different topic. 
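As context for the routing below: capture treats a replay session as overflowing when its session_id appears in a Redis sorted set whose score is an expiry timestamp. A minimal sketch of how such an entry could be written, assuming the key layout exercised by the tests ("@posthog/capture-overflow/replay", i.e. OVERFLOWING_REDIS_KEY + InputType.REPLAY.value) and a hypothetical helper name that is not part of this diff:

from datetime import timedelta

from django.utils import timezone

from posthog.redis import get_client


def mark_replay_session_overflowing(session_id: str, duration: timedelta) -> None:
    # Hypothetical helper, not part of this diff: the member is the session_id
    # and the score is the expiry timestamp, so the zrangebyscore(min=now)
    # read in _list_overflowing_keys only returns entries that are still active.
    get_client().zadd(
        "@posthog/capture-overflow/replay",
        {session_id: (timezone.now() + duration).timestamp()},
    )
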
@@ -143,6 +159,8 @@ def _kafka_topic(event_name: str, data: Dict, historical: bool = False) -> str: case "$snapshot": return KAFKA_SESSION_RECORDING_EVENTS case "$snapshot_items": + if overflowing: + return KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW return KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS case _: # If the token is in the TOKENS_HISTORICAL_DATA list, we push to the @@ -158,8 +176,9 @@ def log_event( partition_key: Optional[str], headers: Optional[List] = None, historical: bool = False, + overflowing: bool = False, ) -> FutureRecordMetadata: - kafka_topic = _kafka_topic(event_name, data, historical=historical) + kafka_topic = _kafka_topic(event_name, historical=historical, overflowing=overflowing) logger.debug("logging_event", event_name=event_name, kafka_topic=kafka_topic) @@ -589,13 +608,20 @@ def capture_internal( kafka_partition_key = None if event["event"] in SESSION_RECORDING_EVENT_NAMES: - kafka_partition_key = event["properties"]["$session_id"] - + session_id = event["properties"]["$session_id"] headers = [ ("token", token), ] + extra_headers - return log_event(parsed_event, event["event"], partition_key=kafka_partition_key, headers=headers) + overflowing = False + if token in settings.REPLAY_OVERFLOW_FORCED_TOKENS: + overflowing = True + elif settings.REPLAY_OVERFLOW_SESSIONS_ENABLED: + overflowing = session_id in _list_overflowing_keys(InputType.REPLAY) + + return log_event( + parsed_event, event["event"], partition_key=session_id, headers=headers, overflowing=overflowing + ) candidate_partition_key = f"{token}:{distinct_id}" @@ -657,3 +683,19 @@ def is_randomly_partitioned(candidate_partition_key: str) -> bool: keys_to_override = settings.EVENT_PARTITION_KEYS_TO_OVERRIDE return candidate_partition_key in keys_to_override + + +@cache_for(timedelta(seconds=30), background_refresh=True) +def _list_overflowing_keys(input_type: InputType) -> Set[str]: + """Retrieve the active overflows from Redis with caching and pre-fetching + + cache_for will keep the old value if Redis is temporarily unavailable. + In case of a prolonged Redis outage, new pods would fail to retrieve anything and fail + to ingest, but Django is currently unable to start if the common Redis is unhealthy. + Setting REPLAY_OVERFLOW_SESSIONS_ENABLED back to false neutralizes this code path. 
+ """ + now = timezone.now() + redis_client = get_client() + results = redis_client.zrangebyscore(f"{OVERFLOWING_REDIS_KEY}{input_type.value}", min=now.timestamp(), max="+inf") + OVERFLOWING_KEYS_LOADED_GAUGE.labels(input_type.value).set(len(results)) + return {x.decode("utf-8") for x in results} diff --git a/posthog/api/test/__snapshots__/test_cohort.ambr b/posthog/api/test/__snapshots__/test_cohort.ambr index 9789176ddd21b..c7c000f17f8ed 100644 --- a/posthog/api/test/__snapshots__/test_cohort.ambr +++ b/posthog/api/test/__snapshots__/test_cohort.ambr @@ -1,7 +1,7 @@ # serializer version: 1 # name: TestCohort.test_async_deletion_of_cohort ''' - /* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:126 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ SELECT count(DISTINCT person_id) FROM cohortpeople WHERE team_id = 2 @@ -11,7 +11,7 @@ # --- # name: TestCohort.test_async_deletion_of_cohort.1 ''' - /* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:126 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ INSERT INTO cohortpeople SELECT id, 2 as cohort_id, @@ -84,7 +84,7 @@ # --- # name: TestCohort.test_async_deletion_of_cohort.2 ''' - /* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:126 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ SELECT count(DISTINCT person_id) FROM cohortpeople WHERE team_id = 2 @@ -94,7 +94,7 @@ # --- # name: TestCohort.test_async_deletion_of_cohort.3 ''' - /* user_id:123 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */ + /* user_id:126 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */ SELECT count() FROM cohortpeople WHERE team_id = 2 @@ -104,7 +104,7 @@ # --- # name: TestCohort.test_async_deletion_of_cohort.4 ''' - /* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:126 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ SELECT count(DISTINCT person_id) FROM cohortpeople WHERE team_id = 2 @@ -114,7 +114,7 @@ # --- # name: TestCohort.test_async_deletion_of_cohort.5 ''' - /* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:126 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ INSERT INTO cohortpeople SELECT id, 2 as cohort_id, @@ -148,7 +148,7 @@ # --- # name: TestCohort.test_async_deletion_of_cohort.6 ''' - /* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:126 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ SELECT count(DISTINCT person_id) FROM cohortpeople WHERE team_id = 2 @@ -158,7 +158,7 @@ # --- # name: TestCohort.test_async_deletion_of_cohort.7 ''' - /* user_id:123 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */ + /* user_id:126 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */ SELECT count() FROM cohortpeople WHERE team_id = 2 diff --git a/posthog/api/test/__snapshots__/test_feature_flag.ambr b/posthog/api/test/__snapshots__/test_feature_flag.ambr index fbf8995d66e68..f38f19faf3f04 100644 --- a/posthog/api/test/__snapshots__/test_feature_flag.ambr +++ b/posthog/api/test/__snapshots__/test_feature_flag.ambr @@ -1739,7 +1739,7 @@ # --- # name: TestFeatureFlag.test_creating_static_cohort.14 ''' - /* user_id:197 celery:posthog.tasks.calculate_cohort.insert_cohort_from_feature_flag */ + /* user_id:200 celery:posthog.tasks.calculate_cohort.insert_cohort_from_feature_flag */ SELECT count(DISTINCT person_id) FROM person_static_cohort 
WHERE team_id = 2 diff --git a/posthog/api/test/__snapshots__/test_query.ambr b/posthog/api/test/__snapshots__/test_query.ambr index 628038e741728..246efec9566f1 100644 --- a/posthog/api/test/__snapshots__/test_query.ambr +++ b/posthog/api/test/__snapshots__/test_query.ambr @@ -157,7 +157,7 @@ # --- # name: TestQuery.test_full_hogql_query_async ''' - /* user_id:464 celery:posthog.tasks.tasks.process_query_task */ + /* user_id:467 celery:posthog.tasks.tasks.process_query_task */ SELECT events.uuid AS uuid, events.event AS event, events.properties AS properties, diff --git a/posthog/api/test/test_capture.py b/posthog/api/test/test_capture.py index 70ea768f2677b..2a80186082dea 100644 --- a/posthog/api/test/test_capture.py +++ b/posthog/api/test/test_capture.py @@ -1,21 +1,18 @@ +from collections import Counter +from unittest import mock + import base64 import gzip import json +import lzstring import pathlib +import pytest import random import string +import structlog import zlib -from collections import Counter from datetime import datetime, timedelta from datetime import timezone as tz -from typing import Any, Dict, List, Union, cast -from unittest import mock -from unittest.mock import ANY, MagicMock, call, patch -from urllib.parse import quote - -import lzstring -import pytest -import structlog from django.http import HttpResponse from django.test.client import MULTIPART_CONTENT, Client from django.utils import timezone @@ -27,6 +24,9 @@ from prance import ResolvingParser from rest_framework import status from token_bucket import Limiter, MemoryStorage +from typing import Any, Dict, List, Union, cast +from unittest.mock import ANY, MagicMock, call, patch +from urllib.parse import quote from ee.billing.quota_limiting import QuotaLimitingCaches from posthog.api import capture @@ -41,7 +41,9 @@ from posthog.kafka_client.topics import ( KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL, KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, + KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW, ) +from posthog.redis import get_client from posthog.settings import ( DATA_UPLOAD_MAX_MEMORY_SIZE, KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC, @@ -1598,6 +1600,65 @@ def test_recording_ingestion_can_write_to_blob_ingestion_topic(self, kafka_produ assert topic_counter == Counter({KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS: 1}) + @patch("posthog.kafka_client.client._KafkaProducer.produce") + def test_recording_ingestion_can_overflow_from_forced_tokens(self, kafka_produce) -> None: + with self.settings( + SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=20480, + REPLAY_OVERFLOW_FORCED_TOKENS={"another", self.team.api_token}, + REPLAY_OVERFLOW_SESSIONS_ENABLED=False, + ): + self._send_august_2023_version_session_recording_event(event_data=large_data_array) + topic_counter = Counter([call[1]["topic"] for call in kafka_produce.call_args_list]) + + assert topic_counter == Counter({KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW: 1}) + + @patch("posthog.kafka_client.client._KafkaProducer.produce") + def test_recording_ingestion_can_overflow_from_redis_instructions(self, kafka_produce) -> None: + with self.settings(SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=20480, REPLAY_OVERFLOW_SESSIONS_ENABLED=True): + redis = get_client() + redis.zadd( + "@posthog/capture-overflow/replay", + { + "overflowing": timezone.now().timestamp() + 1000, + "expired_overflow": timezone.now().timestamp() - 1000, + }, + ) + + # Session is currently overflowing + self._send_august_2023_version_session_recording_event( + event_data=large_data_array, 
session_id="overflowing" + ) + topic_counter = Counter([call[1]["topic"] for call in kafka_produce.call_args_list]) + assert topic_counter == Counter({KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW: 1}) + + # This session's entry is expired, data should go to the main topic + kafka_produce.reset_mock() + self._send_august_2023_version_session_recording_event( + event_data=large_data_array, session_id="expired_overflow" + ) + topic_counter = Counter([call[1]["topic"] for call in kafka_produce.call_args_list]) + assert topic_counter == Counter({KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS: 1}) + + @patch("posthog.kafka_client.client._KafkaProducer.produce") + def test_recording_ingestion_ignores_overflow_from_redis_if_disabled(self, kafka_produce) -> None: + with self.settings( + SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=20480, REPLAY_OVERFLOW_SESSIONS_ENABLED=False + ): + redis = get_client() + redis.zadd( + "@posthog/capture-overflow/replay", + { + "overflowing": timezone.now().timestamp() + 1000, + }, + ) + + # Session is currently overflowing but REPLAY_OVERFLOW_SESSIONS_ENABLED is false + self._send_august_2023_version_session_recording_event( + event_data=large_data_array, session_id="overflowing" + ) + topic_counter = Counter([call[1]["topic"] for call in kafka_produce.call_args_list]) + assert topic_counter == Counter({KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS: 1}) + @patch("posthog.kafka_client.client._KafkaProducer.produce") def test_recording_ingestion_can_write_headers_with_the_message(self, kafka_produce: MagicMock) -> None: with self.settings( diff --git a/posthog/kafka_client/topics.py b/posthog/kafka_client/topics.py index c15b6024be4cd..27e1ce307deae 100644 --- a/posthog/kafka_client/topics.py +++ b/posthog/kafka_client/topics.py @@ -24,6 +24,8 @@ KAFKA_SESSION_RECORDING_EVENTS = f"{KAFKA_PREFIX}session_recording_events{SUFFIX}" # from capture to recordings blob ingestion consumer KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS = f"{KAFKA_PREFIX}session_recording_snapshot_item_events{SUFFIX}" +KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW = f"{KAFKA_PREFIX}session_recording_snapshot_item_overflow{SUFFIX}" + # from recordings consumer to clickhouse KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = f"{KAFKA_PREFIX}clickhouse_session_replay_events{SUFFIX}" KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS = f"{KAFKA_PREFIX}clickhouse_session_recording_events{SUFFIX}" diff --git a/posthog/settings/ingestion.py b/posthog/settings/ingestion.py index ce0882aeb439f..2bc532ac9cd92 100644 --- a/posthog/settings/ingestion.py +++ b/posthog/settings/ingestion.py @@ -1,5 +1,4 @@ import os - import structlog from posthog.settings.utils import get_from_env, get_list, get_set @@ -23,6 +22,8 @@ QUOTA_LIMITING_ENABLED = get_from_env("QUOTA_LIMITING_ENABLED", False, type_cast=str_to_bool) +# Capture-side overflow detection for analytics events. +# Not accurate enough, superseded by detection in plugin-server and should be phased out. 
PARTITION_KEY_AUTOMATIC_OVERRIDE_ENABLED = get_from_env( "PARTITION_KEY_AUTOMATIC_OVERRIDE_ENABLED", type_cast=bool, default=False ) @@ -31,6 +32,10 @@ "PARTITION_KEY_BUCKET_REPLENTISH_RATE", type_cast=float, default=1.0 ) +# Overflow configuration for session replay +REPLAY_OVERFLOW_FORCED_TOKENS = get_set(os.getenv("REPLAY_OVERFLOW_FORCED_TOKENS", "")) +REPLAY_OVERFLOW_SESSIONS_ENABLED = get_from_env("REPLAY_OVERFLOW_SESSIONS_ENABLED", type_cast=bool, default=False) + REPLAY_RETENTION_DAYS_MIN = get_from_env("REPLAY_RETENTION_DAYS_MIN", type_cast=int, default=30) REPLAY_RETENTION_DAYS_MAX = get_from_env("REPLAY_RETENTION_DAYS_MAX", type_cast=int, default=90)
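
Taken together, the forced-token setting, the enable flag, and the cached Redis set determine which topic a snapshot lands on. A condensed sketch of the decision this diff adds to capture_internal (illustrative only; the real logic is inlined in capture_internal rather than factored into a helper like this):

from django.conf import settings

from posthog.api.capture import InputType, _list_overflowing_keys
from posthog.kafka_client.topics import (
    KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
    KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW,
)


def resolve_replay_topic(token: str, session_id: str) -> str:
    # Forced tokens always overflow; otherwise consult the cached Redis set,
    # but only when REPLAY_OVERFLOW_SESSIONS_ENABLED is set.
    overflowing = token in settings.REPLAY_OVERFLOW_FORCED_TOKENS or (
        settings.REPLAY_OVERFLOW_SESSIONS_ENABLED
        and session_id in _list_overflowing_keys(InputType.REPLAY)
    )
    return (
        KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW
        if overflowing
        else KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS
    )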