From c24fceca4c226d5866caa484e3bd3e06c63af00e Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Mon, 18 Dec 2023 16:19:32 +0000 Subject: [PATCH] feat: count snapshot bytesless events (#19375) --- posthog/api/capture.py | 1 - .../session_recording_helpers.py | 18 +++++++++-- .../test/test_session_recording_helpers.py | 30 +++++++++++++++++++ 3 files changed, 45 insertions(+), 4 deletions(-) diff --git a/posthog/api/capture.py b/posthog/api/capture.py index 383e0b012e28a..b00c363e6de16 100644 --- a/posthog/api/capture.py +++ b/posthog/api/capture.py @@ -470,7 +470,6 @@ def get_event(request): try: if replay_events: - # The new flow we only enable if the dedicated kafka is enabled alternative_replay_events = preprocess_replay_events_for_blob_ingestion( replay_events, settings.SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES ) diff --git a/posthog/session_recordings/session_recording_helpers.py b/posthog/session_recordings/session_recording_helpers.py index 8cf0cfd70b1bb..1eccc2be26e32 100644 --- a/posthog/session_recordings/session_recording_helpers.py +++ b/posthog/session_recordings/session_recording_helpers.py @@ -6,8 +6,10 @@ from typing import Any, Callable, Dict, Generator, List, Tuple from dateutil.parser import parse +from prometheus_client import Counter from sentry_sdk.api import capture_exception +from posthog.metrics import LABEL_RESOURCE_TYPE from posthog.session_recordings.models.metadata import ( SessionRecordingEventSummary, ) @@ -21,6 +23,12 @@ # event.type +EVENTS_RECEIVED_WITHOUT_BYTES_COUNTER = Counter( + "recording_events_received_without_bytes", + "We want all recording events to be received with snapshot bytes so we can simplify processing, tagged by resource type.", + labelnames=[LABEL_RESOURCE_TYPE], +) + class RRWEB_MAP_EVENT_TYPE: DomContentLoaded = 0 @@ -107,7 +115,8 @@ def preprocess_replay_events( These are easy to group as we can simply make sure the total size is not higher than our max message size in Kafka. If one message has this property, they all do (thanks to batching). 2. If this property isn't set, we estimate the size (json.dumps) and if it is small enough - merge it all together in one event - 3. If not, we split out the "full snapshots" from the rest (they are typically bigger) and send them individually, trying one more time to group the rest, otherwise sending them individually + 3. If not, we split out the "full snapshots" from the rest (they are typically bigger) and send them individually, + trying one more time to group the rest, otherwise sending them individually """ if isinstance(_events, Generator): @@ -133,7 +142,7 @@ def new_event(items: List[dict] | None = None) -> Event: "properties": { "distinct_id": distinct_id, "$session_id": session_id, - "$window_id": window_id, + "$window_id": window_id or session_id, # We instantiate here instead of in the arg to avoid mutable default args "$snapshot_items": items or [], "$snapshot_source": snapshot_source, @@ -163,6 +172,8 @@ def new_event(items: List[dict] | None = None) -> Event: if current_event: yield current_event else: + EVENTS_RECEIVED_WITHOUT_BYTES_COUNTER.labels(resource_type="recordings").inc() + snapshot_data_list = list(flatten([event["properties"]["$snapshot_data"] for event in events], max_depth=1)) # 2. Otherwise, try and group all the events if they are small enough @@ -200,7 +211,8 @@ def _process_windowed_events( events: List[Event], fn: Callable[[List[Any]], Generator[Event, None, None]] ) -> List[Event]: """ - Helper method to simplify grouping events by window_id and session_id, processing them with the given function, and then returning the flattened list + Helper method to simplify grouping events by window_id and session_id, processing them with the given function, + and then returning the flattened list """ result: List[Event] = [] snapshots_by_session_and_window_id = defaultdict(list) diff --git a/posthog/session_recordings/test/test_session_recording_helpers.py b/posthog/session_recordings/test/test_session_recording_helpers.py index 652373c7868e0..8597b70084c42 100644 --- a/posthog/session_recordings/test/test_session_recording_helpers.py +++ b/posthog/session_recordings/test/test_session_recording_helpers.py @@ -183,6 +183,36 @@ def test_new_ingestion(raw_snapshot_events, mocker: MockerFixture): ] +def test_absent_window_id_is_added(raw_snapshot_events, mocker: MockerFixture): + mocker.patch("time.time", return_value=0) + + events = [ + { + "event": "$snapshot", + "properties": { + "$session_id": "1234", + "$snapshot_data": {"type": 3, "timestamp": MILLISECOND_TIMESTAMP}, + "distinct_id": "abc123", + }, + }, + ] + + assert list(mock_capture_flow(events, max_size_bytes=2000)[1]) == [ + { + "event": "$snapshot_items", + "properties": { + "distinct_id": "abc123", + "$session_id": "1234", + "$window_id": "1234", # window_id is defaulted to session id + "$snapshot_items": [ + {"type": 3, "timestamp": 1546300800000}, + ], + "$snapshot_source": "web", + }, + } + ] + + def test_received_snapshot_source_is_respected_for_first_event(raw_snapshot_events, mocker: MockerFixture): mocker.patch("time.time", return_value=0)