fix: message size too large (#23304)
pauldambra authored Jun 28, 2024
1 parent a02e9c6 commit 38cf761
Showing 2 changed files with 35 additions and 59 deletions.
84 changes: 27 additions & 57 deletions posthog/api/capture.py
@@ -547,31 +547,43 @@ def get_event(request):
             replay_events, settings.SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES
         )
 
-        replay_futures: list[FutureRecordMetadata | None] = []
+        replay_futures: list[tuple[FutureRecordMetadata, tuple, dict]] = []
 
         # We want to be super careful with our new ingestion flow for now so the whole thing is separated
         # This is mostly a copy of above except we only log, we don't error out
         if alternative_replay_events:
             processed_events = list(preprocess_events(alternative_replay_events))
             for event, event_uuid, distinct_id in processed_events:
-                replay_futures.append(
-                    capture_internal_with_message_replacement(
-                        event,
-                        distinct_id,
-                        ip,
-                        site_url,
-                        now,
-                        sent_at,
-                        event_uuid,
-                        token,
-                        lib_version,
-                    )
-                )
+                capture_args = (
+                    event,
+                    distinct_id,
+                    ip,
+                    site_url,
+                    now,
+                    sent_at,
+                    event_uuid,
+                    token,
+                )
+                capture_kwargs = {
+                    "extra_headers": [("lib_version", lib_version)],
+                }
+                this_future = capture_internal(*capture_args, **capture_kwargs)
+                replay_futures.append((this_future, capture_args, capture_kwargs))
 
             start_time = time.monotonic()
-            for future in replay_futures:
+            for future, args, kwargs in replay_futures:
                 if future is not None:
-                    future.get(timeout=settings.KAFKA_PRODUCE_ACK_TIMEOUT_SECONDS - (time.monotonic() - start_time))
+                    try:
+                        future.get(
+                            timeout=settings.KAFKA_PRODUCE_ACK_TIMEOUT_SECONDS - (time.monotonic() - start_time)
+                        )
+                    except MessageSizeTooLargeError:
+                        REPLAY_MESSAGE_SIZE_TOO_LARGE_COUNTER.inc()
+                        warning_event = replace_with_warning(args[0])
+                        if warning_event:
+                            warning_future = capture_internal(warning_event, *args[1:], **kwargs)
+                            warning_future.get(timeout=settings.KAFKA_PRODUCE_ACK_TIMEOUT_SECONDS)
 
     except ValueError as e:
         with sentry_sdk.push_scope() as scope:
             scope.set_tag("capture-pathway", "replay")
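
The hunk above stores each produce future together with the exact args/kwargs that created it, so a record the broker rejects as too large can be replaced with a small warning event and re-sent using the same arguments. A minimal standalone sketch of that pattern with kafka-python, assuming a local broker; shrink() is a hypothetical stand-in for replace_with_warning:

import json

from kafka import KafkaProducer
from kafka.errors import MessageSizeTooLargeError

producer = KafkaProducer(bootstrap_servers="localhost:9092")  # assumed broker address


def shrink(event: dict) -> dict:
    # hypothetical stand-in for replace_with_warning: keep metadata, drop the oversized body
    return {"warning": "replay event too large", "keys": list(event)}


events = [{"data": "x" * 100}, {"data": "y" * 100}]

# send everything first, remembering the input next to each future
pending = [(producer.send("session_replay", json.dumps(e).encode()), e) for e in events]

# then resolve the futures, replacing any record the broker refused
for future, original in pending:
    try:
        future.get(timeout=10)
    except MessageSizeTooLargeError:
        replacement = json.dumps(shrink(original)).encode()
        producer.send("session_replay", replacement).get(timeout=10)
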
@@ -660,48 +672,6 @@ def replace_with_warning(event: dict[str, Any]) -> dict[str, Any] | None:
     return None
 
 
-def capture_internal_with_message_replacement(
-    event: dict[str, Any],
-    distinct_id: str,
-    ip: str,
-    site_url: str,
-    now: datetime,
-    sent_at: datetime | None,
-    event_uuid: UUIDT,
-    token: str | None,
-    lib_version: str,
-) -> FutureRecordMetadata | None:
-    try:
-        return capture_internal(
-            event,
-            distinct_id,
-            ip,
-            site_url,
-            now,
-            sent_at,
-            event_uuid,
-            token,
-            extra_headers=[("lib_version", lib_version)],
-        )
-    except MessageSizeTooLargeError:
-        REPLAY_MESSAGE_SIZE_TOO_LARGE_COUNTER.inc()
-        warning_event = replace_with_warning(event)
-        if not warning_event:
-            return None
-
-        return capture_internal(
-            warning_event,
-            distinct_id,
-            ip,
-            site_url,
-            now,
-            sent_at,
-            event_uuid,
-            token,
-            extra_headers=[("lib_version", lib_version)],
-        )
-
-
 def preprocess_events(events: list[dict[str, Any]]) -> Iterator[tuple[dict[str, Any], UUIDT, str]]:
     for event in events:
         event_uuid = UUIDT()
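
The deleted helper wrapped capture_internal in try/except, which only catches a MessageSizeTooLargeError raised synchronously by the produce call. In kafka-python, send() performs a client-side size check against max_request_size, while a rejection by the broker itself (message.max.bytes) is attached to the returned FutureRecordMetadata and only raised by .get() — presumably why this commit moves the handling into the .get() loop. A hedged sketch of the two failure points (broker address and sizes are illustrative assumptions):

from kafka import KafkaProducer
from kafka.errors import MessageSizeTooLargeError

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    max_request_size=20_000_000,  # raise the client-side limit so send() itself passes
)

payload = b"x" * 5_000_000  # may still exceed the broker's message.max.bytes

try:
    future = producer.send("session_replay", payload)  # client-side size check happens here
except MessageSizeTooLargeError:
    print("caught at send(): client-side max_request_size check")
else:
    try:
        future.get(timeout=10)
    except MessageSizeTooLargeError:
        print("caught at get(): the broker rejected the record")
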
10 changes: 8 additions & 2 deletions posthog/api/test/test_capture.py
@@ -444,10 +444,16 @@ def test_capture_snapshot_event(self, _kafka_produce: MagicMock) -> None:
 
     @patch("posthog.kafka_client.client._KafkaProducer.produce")
     def test_capture_snapshot_event_too_large(self, kafka_produce: MagicMock) -> None:
-        kafka_produce.side_effect = [
+        mock_future = MagicMock()
+
+        mock_future.get.side_effect = [
             MessageSizeTooLargeError("Message size too large"),
-            None,  # Return None for successful calls
+            None,
         ]
+
+        # kafka_produce returns this future, so that when capture calls `.get` on it, we can control the behavior
+        kafka_produce.return_value = mock_future
 
         response = self._send_august_2023_version_session_recording_event(
             event_data=[
                 {"type": 2, "data": {"lots": "of data"}, "$window_id": "the window id", "timestamp": 1234567890}
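
The updated test moves the simulated failure from the produce call to the future, matching the production change: the mocked producer always hands back the same future, whose first .get() raises and whose second succeeds, which exercises the new replace-and-retry path. The same pattern in isolation (topic name and payloads are illustrative):

from unittest.mock import MagicMock

from kafka.errors import MessageSizeTooLargeError

mock_future = MagicMock()
mock_future.get.side_effect = [MessageSizeTooLargeError("Message size too large"), None]

kafka_produce = MagicMock(return_value=mock_future)

# the first produce "succeeds", but resolving its future raises
future = kafka_produce(topic="session_replay", data=b"too big")
try:
    future.get(timeout=1)
except MessageSizeTooLargeError:
    # retry with a smaller payload; the second .get() returns None
    retry = kafka_produce(topic="session_replay", data=b"warning")
    assert retry.get(timeout=1) is None
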
