feat(capture): implement capture overflow for recordings capture #20753

Merged · 8 commits · Mar 19, 2024
62 changes: 52 additions & 10 deletions posthog/api/capture.py
@@ -1,37 +1,40 @@
import hashlib
import json
import re
import time
from datetime import datetime
from typing import Any, Dict, Iterator, List, Optional, Tuple

import structlog
import time
from datetime import datetime, timedelta
from dateutil import parser
from django.conf import settings
from django.http import JsonResponse
from django.utils import timezone
from django.views.decorators.csrf import csrf_exempt
from enum import Enum
from kafka.errors import KafkaError, MessageSizeTooLargeError
from kafka.producer.future import FutureRecordMetadata
from prometheus_client import Counter
from prometheus_client import Counter, Gauge
from rest_framework import status
from sentry_sdk import configure_scope
from sentry_sdk.api import capture_exception, start_span
from statshog.defaults.django import statsd
from token_bucket import Limiter, MemoryStorage
from typing import Any, Dict, Iterator, List, Optional, Tuple, Set

from ee.billing.quota_limiting import QuotaLimitingCaches
from posthog.api.utils import get_data, get_token, safe_clickhouse_string
from posthog.cache_utils import cache_for
from posthog.exceptions import generate_exception_response
from posthog.kafka_client.client import KafkaProducer, sessionRecordingKafkaProducer
from posthog.kafka_client.topics import (
KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL,
KAFKA_SESSION_RECORDING_EVENTS,
KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW,
)
from posthog.logging.timing import timed
from posthog.metrics import KLUDGES_COUNTER, LABEL_RESOURCE_TYPE
from posthog.models.utils import UUIDT
from posthog.redis import get_client
from posthog.session_recordings.session_recording_helpers import (
preprocess_replay_events_for_blob_ingestion,
split_replay_events,
@@ -85,6 +88,12 @@
labelnames=["reason"],
)

OVERFLOWING_KEYS_LOADED_GAUGE = Gauge(
"capture_overflowing_keys_loaded",
"Number of keys loaded for the overflow redirection, per resource_type.",
labelnames=[LABEL_RESOURCE_TYPE],
)

# This is a heuristic of ids we have seen used as anonymous. As they frequently
# have significantly more traffic than non-anonymous distinct_ids, and likely
# don't refer to the same underlying person we prefer to partition them randomly
@@ -111,6 +120,13 @@
"undefined",
}

OVERFLOWING_REDIS_KEY = "@posthog/capture-overflow/"


class InputType(Enum):
EVENTS = "events"
REPLAY = "replay"


def build_kafka_event_data(
distinct_id: str,
@@ -135,14 +151,16 @@ def build_kafka_event_data(
}


def _kafka_topic(event_name: str, data: Dict, historical: bool = False) -> str:
def _kafka_topic(event_name: str, historical: bool = False, overflowing: bool = False) -> str:
# To allow for different quality of service on session recordings
# and other events, we push to a different topic.

match event_name:
case "$snapshot":
return KAFKA_SESSION_RECORDING_EVENTS
case "$snapshot_items":
if overflowing:
return KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW
return KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS
case _:
# If the token is in the TOKENS_HISTORICAL_DATA list, we push to the
@@ -158,8 +176,9 @@ def log_event(
partition_key: Optional[str],
headers: Optional[List] = None,
historical: bool = False,
overflowing: bool = False,
) -> FutureRecordMetadata:
kafka_topic = _kafka_topic(event_name, data, historical=historical)
kafka_topic = _kafka_topic(event_name, historical=historical, overflowing=overflowing)

logger.debug("logging_event", event_name=event_name, kafka_topic=kafka_topic)

@@ -589,13 +608,20 @@ def capture_internal(
kafka_partition_key = None

if event["event"] in SESSION_RECORDING_EVENT_NAMES:
kafka_partition_key = event["properties"]["$session_id"]

session_id = event["properties"]["$session_id"]
headers = [
("token", token),
] + extra_headers

return log_event(parsed_event, event["event"], partition_key=kafka_partition_key, headers=headers)
overflowing = False
if token in settings.REPLAY_OVERFLOW_FORCED_TOKENS:
overflowing = True
elif settings.REPLAY_OVERFLOW_SESSIONS_ENABLED:
overflowing = session_id in _list_overflowing_keys(InputType.REPLAY)

return log_event(
parsed_event, event["event"], partition_key=session_id, headers=headers, overflowing=overflowing
)

candidate_partition_key = f"{token}:{distinct_id}"

@@ -657,3 +683,19 @@ def is_randomly_partitioned(candidate_partition_key: str) -> bool:
keys_to_override = settings.EVENT_PARTITION_KEYS_TO_OVERRIDE

return candidate_partition_key in keys_to_override


@cache_for(timedelta(seconds=30), background_refresh=True)
def _list_overflowing_keys(input_type: InputType) -> Set[str]:
"""Retrieve the active overflows from Redis with caching and pre-fetching

cache_for will keep the old value if Redis is temporarily unavailable.
In case of a prolonged Redis outage, new pods would fail to retrieve anything and fail
to ingest, but Django is currently unable to start if the common Redis is unhealthy.
Setting REPLAY_OVERFLOW_SESSIONS_ENABLED back to false neutralizes this code path.
"""
now = timezone.now()
redis_client = get_client()
results = redis_client.zrangebyscore(f"{OVERFLOWING_REDIS_KEY}{input_type.value}", min=now.timestamp(), max="+inf")
OVERFLOWING_KEYS_LOADED_GAUGE.labels(input_type.value).set(len(results))
return {x.decode("utf-8") for x in results}
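
Distilled from the hunks above, the routing decision for replay events reduces to a small predicate: forced tokens always overflow, and the cached Redis set is consulted only when REPLAY_OVERFLOW_SESSIONS_ENABLED is set. A minimal sketch in the context of posthog/api/capture.py (not the verbatim production code; settings, _list_overflowing_keys, and InputType are the names from the diff):

from django.conf import settings

def _replay_overflowing(token: str, session_id: str) -> bool:
    # Tokens in REPLAY_OVERFLOW_FORCED_TOKENS are always routed to the overflow topic.
    if token in settings.REPLAY_OVERFLOW_FORCED_TOKENS:
        return True
    # Otherwise, consult the 30-second-cached Redis set only when the flag is on.
    if settings.REPLAY_OVERFLOW_SESSIONS_ENABLED:
        return session_id in _list_overflowing_keys(InputType.REPLAY)
    return False

log_event then maps overflowing=True for "$snapshot_items" onto KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW, while partitioning by $session_id is unchanged.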
16 changes: 8 additions & 8 deletions posthog/api/test/__snapshots__/test_cohort.ambr
@@ -1,7 +1,7 @@
# serializer version: 1
# name: TestCohort.test_async_deletion_of_cohort
'''
/* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
/* user_id:126 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
SELECT count(DISTINCT person_id)
FROM cohortpeople
WHERE team_id = 2
@@ -11,7 +11,7 @@
# ---
# name: TestCohort.test_async_deletion_of_cohort.1
'''
/* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
/* user_id:126 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
INSERT INTO cohortpeople
SELECT id,
2 as cohort_id,
@@ -84,7 +84,7 @@
# ---
# name: TestCohort.test_async_deletion_of_cohort.2
'''
/* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
/* user_id:126 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
SELECT count(DISTINCT person_id)
FROM cohortpeople
WHERE team_id = 2
@@ -94,7 +94,7 @@
# ---
# name: TestCohort.test_async_deletion_of_cohort.3
'''
/* user_id:123 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
/* user_id:126 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
SELECT count()
FROM cohortpeople
WHERE team_id = 2
@@ -104,7 +104,7 @@
# ---
# name: TestCohort.test_async_deletion_of_cohort.4
'''
/* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
/* user_id:126 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
SELECT count(DISTINCT person_id)
FROM cohortpeople
WHERE team_id = 2
@@ -114,7 +114,7 @@
# ---
# name: TestCohort.test_async_deletion_of_cohort.5
'''
/* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
/* user_id:126 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
INSERT INTO cohortpeople
SELECT id,
2 as cohort_id,
@@ -148,7 +148,7 @@
# ---
# name: TestCohort.test_async_deletion_of_cohort.6
'''
/* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
/* user_id:126 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
SELECT count(DISTINCT person_id)
FROM cohortpeople
WHERE team_id = 2
@@ -158,7 +158,7 @@
# ---
# name: TestCohort.test_async_deletion_of_cohort.7
'''
/* user_id:123 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
/* user_id:126 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
SELECT count()
FROM cohortpeople
WHERE team_id = 2
2 changes: 1 addition & 1 deletion posthog/api/test/__snapshots__/test_feature_flag.ambr
@@ -1739,7 +1739,7 @@
# ---
# name: TestFeatureFlag.test_creating_static_cohort.14
'''
/* user_id:197 celery:posthog.tasks.calculate_cohort.insert_cohort_from_feature_flag */
/* user_id:200 celery:posthog.tasks.calculate_cohort.insert_cohort_from_feature_flag */
SELECT count(DISTINCT person_id)
FROM person_static_cohort
WHERE team_id = 2
2 changes: 1 addition & 1 deletion posthog/api/test/__snapshots__/test_query.ambr
@@ -157,7 +157,7 @@
# ---
# name: TestQuery.test_full_hogql_query_async
'''
/* user_id:464 celery:posthog.tasks.tasks.process_query_task */
/* user_id:467 celery:posthog.tasks.tasks.process_query_task */
SELECT events.uuid AS uuid,
events.event AS event,
events.properties AS properties,
79 changes: 70 additions & 9 deletions posthog/api/test/test_capture.py
@@ -1,21 +1,18 @@
from collections import Counter
from unittest import mock

import base64
import gzip
import json
import lzstring
import pathlib
import pytest
import random
import string
import structlog
import zlib
from collections import Counter
from datetime import datetime, timedelta
from datetime import timezone as tz
from typing import Any, Dict, List, Union, cast
from unittest import mock
from unittest.mock import ANY, MagicMock, call, patch
from urllib.parse import quote

import lzstring
import pytest
import structlog
from django.http import HttpResponse
from django.test.client import MULTIPART_CONTENT, Client
from django.utils import timezone
@@ -27,6 +24,9 @@
from prance import ResolvingParser
from rest_framework import status
from token_bucket import Limiter, MemoryStorage
from typing import Any, Dict, List, Union, cast
from unittest.mock import ANY, MagicMock, call, patch
from urllib.parse import quote

from ee.billing.quota_limiting import QuotaLimitingCaches
from posthog.api import capture
@@ -41,7 +41,9 @@
from posthog.kafka_client.topics import (
KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL,
KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW,
)
from posthog.redis import get_client
from posthog.settings import (
DATA_UPLOAD_MAX_MEMORY_SIZE,
KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC,
@@ -1598,6 +1600,65 @@ def test_recording_ingestion_can_write_to_blob_ingestion_topic(self, kafka_produce) -> None:

assert topic_counter == Counter({KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS: 1})

@patch("posthog.kafka_client.client._KafkaProducer.produce")
def test_recording_ingestion_can_overflow_from_forced_tokens(self, kafka_produce) -> None:
with self.settings(
SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=20480,
REPLAY_OVERFLOW_FORCED_TOKENS={"another", self.team.api_token},
REPLAY_OVERFLOW_SESSIONS_ENABLED=False,
):
self._send_august_2023_version_session_recording_event(event_data=large_data_array)
topic_counter = Counter([call[1]["topic"] for call in kafka_produce.call_args_list])

assert topic_counter == Counter({KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW: 1})

@patch("posthog.kafka_client.client._KafkaProducer.produce")
def test_recording_ingestion_can_overflow_from_redis_instructions(self, kafka_produce) -> None:
with self.settings(SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=20480, REPLAY_OVERFLOW_SESSIONS_ENABLED=True):
redis = get_client()
redis.zadd(
"@posthog/capture-overflow/replay",
{
"overflowing": timezone.now().timestamp() + 1000,
"expired_overflow": timezone.now().timestamp() - 1000,
},
)

# Session is currently overflowing
self._send_august_2023_version_session_recording_event(
event_data=large_data_array, session_id="overflowing"
)
topic_counter = Counter([call[1]["topic"] for call in kafka_produce.call_args_list])
assert topic_counter == Counter({KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW: 1})

# This session's entry is expired, data should go to the main topic
kafka_produce.reset_mock()
self._send_august_2023_version_session_recording_event(
event_data=large_data_array, session_id="expired_overflow"
)
topic_counter = Counter([call[1]["topic"] for call in kafka_produce.call_args_list])
assert topic_counter == Counter({KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS: 1})

@patch("posthog.kafka_client.client._KafkaProducer.produce")
def test_recording_ingestion_ignores_overflow_from_redis_if_disabled(self, kafka_produce) -> None:
with self.settings(
SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=20480, REPLAY_OVERFLOW_SESSIONS_ENABLED=False
):
redis = get_client()
redis.zadd(
"@posthog/capture-overflow/replay",
{
"overflowing": timezone.now().timestamp() + 1000,
},
)

# Session is currently overflowing but REPLAY_OVERFLOW_SESSIONS_ENABLED is false
self._send_august_2023_version_session_recording_event(
event_data=large_data_array, session_id="overflowing"
)
topic_counter = Counter([call[1]["topic"] for call in kafka_produce.call_args_list])
assert topic_counter == Counter({KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS: 1})

@patch("posthog.kafka_client.client._KafkaProducer.produce")
def test_recording_ingestion_can_write_headers_with_the_message(self, kafka_produce: MagicMock) -> None:
with self.settings(
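
The tests above also pin down the contract of the Redis sorted set: the member is the session ID and the score is an expiry timestamp, so an entry silently stops matching the zrangebyscore(min=now, max="+inf") read once it expires. A hedged sketch of how a producer of this key (an operator script or a detection job, neither part of this PR) might mark a session as overflowing for one hour:

from datetime import timedelta

from django.utils import timezone

from posthog.redis import get_client


def mark_replay_session_overflowing(session_id: str, duration: timedelta = timedelta(hours=1)) -> None:
    # Illustrative helper: the score is the expiry timestamp, matching the
    # zrangebyscore(min=now, max="+inf") read in _list_overflowing_keys.
    expiry = (timezone.now() + duration).timestamp()
    get_client().zadd("@posthog/capture-overflow/replay", {session_id: expiry})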
2 changes: 2 additions & 0 deletions posthog/kafka_client/topics.py
@@ -24,6 +24,8 @@
KAFKA_SESSION_RECORDING_EVENTS = f"{KAFKA_PREFIX}session_recording_events{SUFFIX}"
# from capture to recordings blob ingestion consumer
KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS = f"{KAFKA_PREFIX}session_recording_snapshot_item_events{SUFFIX}"
KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW = f"{KAFKA_PREFIX}session_recording_snapshot_item_overflow{SUFFIX}"

# from recordings consumer to clickhouse
KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = f"{KAFKA_PREFIX}clickhouse_session_replay_events{SUFFIX}"
KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS = f"{KAFKA_PREFIX}clickhouse_session_recording_events{SUFFIX}"
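
If KAFKA_PREFIX and SUFFIX are both empty (as in a plain local setup), the new constant resolves to the topic name session_recording_snapshot_item_overflow, alongside the existing session_recording_snapshot_item_events topic.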
7 changes: 6 additions & 1 deletion posthog/settings/ingestion.py
@@ -1,5 +1,4 @@
import os

import structlog

from posthog.settings.utils import get_from_env, get_list, get_set
@@ -23,6 +22,8 @@

QUOTA_LIMITING_ENABLED = get_from_env("QUOTA_LIMITING_ENABLED", False, type_cast=str_to_bool)

# Capture-side overflow detection for analytics events.
# Not accurate enough, superseded by detection in plugin-server and should be phased out.
PARTITION_KEY_AUTOMATIC_OVERRIDE_ENABLED = get_from_env(
"PARTITION_KEY_AUTOMATIC_OVERRIDE_ENABLED", type_cast=bool, default=False
)
@@ -31,6 +32,10 @@
"PARTITION_KEY_BUCKET_REPLENTISH_RATE", type_cast=float, default=1.0
)

# Overflow configuration for session replay
REPLAY_OVERFLOW_FORCED_TOKENS = get_set(os.getenv("REPLAY_OVERFLOW_FORCED_TOKENS", ""))
REPLAY_OVERFLOW_SESSIONS_ENABLED = get_from_env("REPLAY_OVERFLOW_SESSIONS_ENABLED", type_cast=bool, default=False)

REPLAY_RETENTION_DAYS_MIN = get_from_env("REPLAY_RETENTION_DAYS_MIN", type_cast=int, default=30)
REPLAY_RETENTION_DAYS_MAX = get_from_env("REPLAY_RETENTION_DAYS_MAX", type_cast=int, default=90)
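
For completeness, a sketch of the expected environment configuration (values are hypothetical; get_set is assumed to parse a comma-separated list, and the variables must be set before Django settings are imported):

import os

os.environ["REPLAY_OVERFLOW_FORCED_TOKENS"] = "phc_token_a,phc_token_b"  # teams forced to overflow
os.environ["REPLAY_OVERFLOW_SESSIONS_ENABLED"] = "true"  # enable the Redis-driven session lookup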
