feat: count snapshot bytesless events (#19375)
pauldambra authored Dec 18, 2023
1 parent c4fe25e commit c24fcec
Showing 3 changed files with 45 additions and 4 deletions.
1 change: 0 additions & 1 deletion posthog/api/capture.py
@@ -470,7 +470,6 @@ def get_event(request):

try:
if replay_events:
-            # The new flow we only enable if the dedicated kafka is enabled
alternative_replay_events = preprocess_replay_events_for_blob_ingestion(
replay_events, settings.SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES
)
18 changes: 15 additions & 3 deletions posthog/session_recordings/session_recording_helpers.py
@@ -6,8 +6,10 @@
from typing import Any, Callable, Dict, Generator, List, Tuple

from dateutil.parser import parse
+from prometheus_client import Counter
from sentry_sdk.api import capture_exception

+from posthog.metrics import LABEL_RESOURCE_TYPE
from posthog.session_recordings.models.metadata import (
SessionRecordingEventSummary,
)
@@ -21,6 +23,12 @@

# event.type

+EVENTS_RECEIVED_WITHOUT_BYTES_COUNTER = Counter(
+    "recording_events_received_without_bytes",
+    "We want all recording events to be received with snapshot bytes so we can simplify processing, tagged by resource type.",
+    labelnames=[LABEL_RESOURCE_TYPE],
+)
+

class RRWEB_MAP_EVENT_TYPE:
DomContentLoaded = 0
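
The heart of this commit is the counter added above. For reference, here is a minimal standalone sketch of how a labelled `prometheus_client` counter behaves; it assumes `LABEL_RESOURCE_TYPE` is the label-name string `"resource_type"` (the real constant lives in `posthog.metrics`), which the `.labels(resource_type=...)` call later in this diff suggests:

```python
from prometheus_client import Counter

LABEL_RESOURCE_TYPE = "resource_type"  # assumption for this sketch

EVENTS_RECEIVED_WITHOUT_BYTES_COUNTER = Counter(
    "recording_events_received_without_bytes",
    "Recording events received without snapshot bytes, tagged by resource type.",
    labelnames=[LABEL_RESOURCE_TYPE],
)

# Each distinct label value gets its own time series; .inc() adds one to it.
EVENTS_RECEIVED_WITHOUT_BYTES_COUNTER.labels(resource_type="recordings").inc()
```

Prometheus exports a `Counter` with a `_total` suffix, so a dashboard would query `recording_events_received_without_bytes_total`.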
@@ -107,7 +115,8 @@ def preprocess_replay_events(
These are easy to group as we can simply make sure the total size is not higher than our max message size in Kafka.
If one message has this property, they all do (thanks to batching).
2. If this property isn't set, we estimate the size (json.dumps) and if it is small enough - merge it all together in one event
-    3. If not, we split out the "full snapshots" from the rest (they are typically bigger) and send them individually, trying one more time to group the rest, otherwise sending them individually
+    3. If not, we split out the "full snapshots" from the rest (they are typically bigger) and send them individually,
+       trying one more time to group the rest, otherwise sending them individually
"""

if isinstance(_events, Generator):
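
The reflowed docstring above describes a three-step grouping strategy. Purely as an illustration of that decision order (not the actual `preprocess_replay_events` implementation), here is a sketch; the greedy packing in step 1 is an assumption, and `FULL_SNAPSHOT = 2` follows rrweb's event-type numbering:

```python
import json
from typing import Dict, List

FULL_SNAPSHOT = 2  # rrweb's event type for a full DOM snapshot


def _estimated_size(events: List[Dict]) -> int:
    # Step 2's estimate: serialise the snapshot data and measure it.
    return len(json.dumps([e["properties"]["$snapshot_data"] for e in events]))


def group_events_sketch(events: List[Dict], max_size_bytes: int) -> List[List[Dict]]:
    # 1. If every event reports $snapshot_bytes, trust the client-side sizes
    #    and pack greedily so each batch stays under the Kafka message limit.
    if all("$snapshot_bytes" in e["properties"] for e in events):
        batches: List[List[Dict]] = [[]]
        used = 0
        for e in events:
            size = e["properties"]["$snapshot_bytes"]
            if batches[-1] and used + size > max_size_bytes:
                batches.append([])
                used = 0
            batches[-1].append(e)
            used += size
        return batches

    # 2. Otherwise estimate the size; if small enough, merge into one batch.
    if _estimated_size(events) < max_size_bytes:
        return [events]

    # 3. Otherwise send the (typically large) full snapshots individually and
    #    try once more to group the rest, falling back to one event per batch.
    full = [e for e in events if e["properties"]["$snapshot_data"]["type"] == FULL_SNAPSHOT]
    rest = [e for e in events if e["properties"]["$snapshot_data"]["type"] != FULL_SNAPSHOT]
    if rest and _estimated_size(rest) < max_size_bytes:
        return [[e] for e in full] + [rest]
    return [[e] for e in full] + [[e] for e in rest]
```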
@@ -133,7 +142,7 @@ def new_event(items: List[dict] | None = None) -> Event:
"properties": {
"distinct_id": distinct_id,
"$session_id": session_id,
"$window_id": window_id,
"$window_id": window_id or session_id,
# We instantiate here instead of in the arg to avoid mutable default args
"$snapshot_items": items or [],
"$snapshot_source": snapshot_source,
Expand Down Expand Up @@ -163,6 +172,8 @@ def new_event(items: List[dict] | None = None) -> Event:
if current_event:
yield current_event
else:
+        EVENTS_RECEIVED_WITHOUT_BYTES_COUNTER.labels(resource_type="recordings").inc()
+
snapshot_data_list = list(flatten([event["properties"]["$snapshot_data"] for event in events], max_depth=1))

# 2. Otherwise, try and group all the events if they are small enough
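
The `else:` branch above is the no-bytes path: when a batch arrives without `$snapshot_bytes`, the new counter is incremented before the size-estimation fallback runs. Not part of this PR, but one way such a metric could be checked in-process from `prometheus_client`'s default registry (assuming the counter has fired at least once):

```python
from prometheus_client import REGISTRY

# Counters are exported with a "_total" suffix; get_sample_value returns
# None if this labelled series has never been incremented.
value = REGISTRY.get_sample_value(
    "recording_events_received_without_bytes_total",
    labels={"resource_type": "recordings"},
)
assert value is not None and value >= 1
```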
@@ -200,7 +211,8 @@ def _process_windowed_events(
events: List[Event], fn: Callable[[List[Any]], Generator[Event, None, None]]
) -> List[Event]:
"""
-    Helper method to simplify grouping events by window_id and session_id, processing them with the given function, and then returning the flattened list
+    Helper method to simplify grouping events by window_id and session_id, processing them with the given function,
+    and then returning the flattened list
"""
result: List[Event] = []
snapshots_by_session_and_window_id = defaultdict(list)
30 changes: 30 additions & 0 deletions posthog/session_recordings/test/test_session_recording_helpers.py
@@ -183,6 +183,36 @@ def test_new_ingestion(raw_snapshot_events, mocker: MockerFixture):
]


+def test_absent_window_id_is_added(raw_snapshot_events, mocker: MockerFixture):
+    mocker.patch("time.time", return_value=0)
+
+    events = [
+        {
+            "event": "$snapshot",
+            "properties": {
+                "$session_id": "1234",
+                "$snapshot_data": {"type": 3, "timestamp": MILLISECOND_TIMESTAMP},
+                "distinct_id": "abc123",
+            },
+        },
+    ]
+
+    assert list(mock_capture_flow(events, max_size_bytes=2000)[1]) == [
+        {
+            "event": "$snapshot_items",
+            "properties": {
+                "distinct_id": "abc123",
+                "$session_id": "1234",
+                "$window_id": "1234",  # window_id is defaulted to session id
+                "$snapshot_items": [
+                    {"type": 3, "timestamp": 1546300800000},
+                ],
+                "$snapshot_source": "web",
+            },
+        }
+    ]
+
+
def test_received_snapshot_source_is_respected_for_first_event(raw_snapshot_events, mocker: MockerFixture):
mocker.patch("time.time", return_value=0)

