From a21983be08286eb74c8a98ae1c10629977f54845 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Mon, 18 Mar 2024 20:41:15 +0000 Subject: [PATCH] feat: send lib version to blobby in kafka headers --- posthog/api/capture.py | 30 ++++++++++++++++++++++++++++-- posthog/api/test/test_capture.py | 28 ++++++++++++++++++++++++++-- 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/posthog/api/capture.py b/posthog/api/capture.py index 9dfc61aa3979f..0a4ed672551a6 100644 --- a/posthog/api/capture.py +++ b/posthog/api/capture.py @@ -297,6 +297,13 @@ def drop_events_over_quota(token: str, events: List[Any]) -> List[Any]: return results +def lib_from_query_params(request) -> Tuple[str, str]: + # url has a ver parameter from posthog-js + # for now we know mobile is newer than web, so we can ignore unknown + lib = request.GET.get("ver", "unknown") + return "web", lib + + @csrf_exempt @timed("posthog_cloud_event_endpoint") def get_event(request): @@ -475,6 +482,8 @@ def get_event(request): try: if replay_events: + lib, lib_version = lib_from_query_params(request) + alternative_replay_events = preprocess_replay_events_for_blob_ingestion( replay_events, settings.SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES ) @@ -496,6 +505,7 @@ def get_event(request): sent_at, event_uuid, token, + extra_headers=[("lib_version", lib_version)], ) ) @@ -546,10 +556,24 @@ def parse_event(event): return event -def capture_internal(event, distinct_id, ip, site_url, now, sent_at, event_uuid=None, token=None, historical=False): +def capture_internal( + event, + distinct_id, + ip, + site_url, + now, + sent_at, + event_uuid=None, + token=None, + historical=False, + extra_headers: List[Tuple[str, str]] | None = None, +): if event_uuid is None: event_uuid = UUIDT() + if extra_headers is None: + extra_headers = [] + parsed_event = build_kafka_event_data( distinct_id=distinct_id, ip=ip, @@ -568,9 +592,11 @@ def capture_internal(event, distinct_id, ip, site_url, now, sent_at, event_uuid= if 
event["event"] in SESSION_RECORDING_EVENT_NAMES: kafka_partition_key = event["properties"]["$session_id"] + headers = [ ("token", token), - ] + ] + extra_headers + return log_event(parsed_event, event["event"], partition_key=kafka_partition_key, headers=headers) candidate_partition_key = f"{token}:{distinct_id}" diff --git a/posthog/api/test/test_capture.py b/posthog/api/test/test_capture.py index 4696609ce94ad..70ea768f2677b 100644 --- a/posthog/api/test/test_capture.py +++ b/posthog/api/test/test_capture.py @@ -230,6 +230,7 @@ def _send_august_2023_version_session_recording_event( distinct_id="ghi789", timestamp=1658516991883, content_type: str | None = None, + query_params: str = "", ) -> HttpResponse: if event_data is None: # event_data is an array of RRWeb events @@ -262,7 +263,7 @@ def _send_august_2023_version_session_recording_event( post_data = {"api_key": self.team.api_token, "data": json.dumps([event for _ in range(number_of_events)])} return self.client.post( - "/s/", + "/s/" + "?" 
+ query_params if query_params else "/s/", data=post_data, content_type=content_type or MULTIPART_CONTENT, ) @@ -1604,7 +1605,30 @@ def test_recording_ingestion_can_write_headers_with_the_message(self, kafka_prod ): self._send_august_2023_version_session_recording_event() - assert kafka_produce.mock_calls[0].kwargs["headers"] == [("token", "token123")] + assert kafka_produce.mock_calls[0].kwargs["headers"] == [ + ("token", "token123"), + ( + # without setting a version in the URL the default is unknown + "lib_version", + "unknown", + ), + ] + + @patch("posthog.kafka_client.client._KafkaProducer.produce") + def test_recording_ingestion_can_read_version_from_request(self, kafka_produce: MagicMock) -> None: + with self.settings( + SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=20480, + ): + self._send_august_2023_version_session_recording_event(query_params="ver=1.123.4") + + assert kafka_produce.mock_calls[0].kwargs["headers"] == [ + ("token", "token123"), + ( + # the ver value set in the URL query string is sent as the lib version + "lib_version", + "1.123.4", + ), + ] + + @patch("posthog.kafka_client.client.SessionRecordingKafkaProducer") def test_create_session_recording_kafka_with_expected_hosts(