From 9ad8dcad7d5b1aaa830b2363bd834540eb39ab62 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Tue, 19 Mar 2024 09:33:09 +0000 Subject: [PATCH] feat: send lib version to blobby in kafka headers (#20989) * feat: send lib version to blobby in kafka headers * less info passing around * rename refactor * Update query snapshots --------- Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com> --- posthog/api/capture.py | 28 +++++++++++++++++-- .../api/test/__snapshots__/test_cohort.ambr | 16 +++++------ .../test/__snapshots__/test_feature_flag.ambr | 2 +- .../api/test/__snapshots__/test_query.ambr | 2 +- posthog/api/test/test_capture.py | 28 +++++++++++++++++-- 5 files changed, 62 insertions(+), 14 deletions(-) diff --git a/posthog/api/capture.py b/posthog/api/capture.py index 9dfc61aa3979f..ee7e09c8ae154 100644 --- a/posthog/api/capture.py +++ b/posthog/api/capture.py @@ -297,6 +297,11 @@ def drop_events_over_quota(token: str, events: List[Any]) -> List[Any]: return results +def lib_version_from_query_params(request) -> str: + # url has a ver parameter from posthog-js + return request.GET.get("ver", "unknown") + + @csrf_exempt @timed("posthog_cloud_event_endpoint") def get_event(request): @@ -475,6 +480,8 @@ def get_event(request): try: if replay_events: + lib_version = lib_version_from_query_params(request) + alternative_replay_events = preprocess_replay_events_for_blob_ingestion( replay_events, settings.SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES ) @@ -496,6 +503,7 @@ def get_event(request): sent_at, event_uuid, token, + extra_headers=[("lib_version", lib_version)], ) ) @@ -546,10 +554,24 @@ def parse_event(event): return event -def capture_internal(event, distinct_id, ip, site_url, now, sent_at, event_uuid=None, token=None, historical=False): +def capture_internal( + event, + distinct_id, + ip, + site_url, + now, + sent_at, + event_uuid=None, + token=None, + historical=False, + extra_headers: List[Tuple[str, str]] | None = None, 
+): if event_uuid is None: event_uuid = UUIDT() + if extra_headers is None: + extra_headers = [] + parsed_event = build_kafka_event_data( distinct_id=distinct_id, ip=ip, @@ -568,9 +590,11 @@ def capture_internal(event, distinct_id, ip, site_url, now, sent_at, event_uuid= if event["event"] in SESSION_RECORDING_EVENT_NAMES: kafka_partition_key = event["properties"]["$session_id"] + headers = [ ("token", token), - ] + ] + extra_headers + return log_event(parsed_event, event["event"], partition_key=kafka_partition_key, headers=headers) candidate_partition_key = f"{token}:{distinct_id}" diff --git a/posthog/api/test/__snapshots__/test_cohort.ambr b/posthog/api/test/__snapshots__/test_cohort.ambr index e32cd65cb7367..9789176ddd21b 100644 --- a/posthog/api/test/__snapshots__/test_cohort.ambr +++ b/posthog/api/test/__snapshots__/test_cohort.ambr @@ -1,7 +1,7 @@ # serializer version: 1 # name: TestCohort.test_async_deletion_of_cohort ''' - /* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ SELECT count(DISTINCT person_id) FROM cohortpeople WHERE team_id = 2 @@ -11,7 +11,7 @@ # --- # name: TestCohort.test_async_deletion_of_cohort.1 ''' - /* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ INSERT INTO cohortpeople SELECT id, 2 as cohort_id, @@ -84,7 +84,7 @@ # --- # name: TestCohort.test_async_deletion_of_cohort.2 ''' - /* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ SELECT count(DISTINCT person_id) FROM cohortpeople WHERE team_id = 2 @@ -94,7 +94,7 @@ # --- # name: TestCohort.test_async_deletion_of_cohort.3 ''' - /* user_id:122 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */ + /* user_id:123 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */ SELECT 
count() FROM cohortpeople WHERE team_id = 2 @@ -104,7 +104,7 @@ # --- # name: TestCohort.test_async_deletion_of_cohort.4 ''' - /* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ SELECT count(DISTINCT person_id) FROM cohortpeople WHERE team_id = 2 @@ -114,7 +114,7 @@ # --- # name: TestCohort.test_async_deletion_of_cohort.5 ''' - /* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ INSERT INTO cohortpeople SELECT id, 2 as cohort_id, @@ -148,7 +148,7 @@ # --- # name: TestCohort.test_async_deletion_of_cohort.6 ''' - /* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ SELECT count(DISTINCT person_id) FROM cohortpeople WHERE team_id = 2 @@ -158,7 +158,7 @@ # --- # name: TestCohort.test_async_deletion_of_cohort.7 ''' - /* user_id:122 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */ + /* user_id:123 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */ SELECT count() FROM cohortpeople WHERE team_id = 2 diff --git a/posthog/api/test/__snapshots__/test_feature_flag.ambr b/posthog/api/test/__snapshots__/test_feature_flag.ambr index 2d11fc4500367..fbf8995d66e68 100644 --- a/posthog/api/test/__snapshots__/test_feature_flag.ambr +++ b/posthog/api/test/__snapshots__/test_feature_flag.ambr @@ -1739,7 +1739,7 @@ # --- # name: TestFeatureFlag.test_creating_static_cohort.14 ''' - /* user_id:196 celery:posthog.tasks.calculate_cohort.insert_cohort_from_feature_flag */ + /* user_id:197 celery:posthog.tasks.calculate_cohort.insert_cohort_from_feature_flag */ SELECT count(DISTINCT person_id) FROM person_static_cohort WHERE team_id = 2 diff --git a/posthog/api/test/__snapshots__/test_query.ambr b/posthog/api/test/__snapshots__/test_query.ambr index 
e52c9362b4398..628038e741728 100644 --- a/posthog/api/test/__snapshots__/test_query.ambr +++ b/posthog/api/test/__snapshots__/test_query.ambr @@ -157,7 +157,7 @@ # --- # name: TestQuery.test_full_hogql_query_async ''' - /* user_id:463 celery:posthog.tasks.tasks.process_query_task */ + /* user_id:464 celery:posthog.tasks.tasks.process_query_task */ SELECT events.uuid AS uuid, events.event AS event, events.properties AS properties, diff --git a/posthog/api/test/test_capture.py b/posthog/api/test/test_capture.py index 4696609ce94ad..70ea768f2677b 100644 --- a/posthog/api/test/test_capture.py +++ b/posthog/api/test/test_capture.py @@ -230,6 +230,7 @@ def _send_august_2023_version_session_recording_event( distinct_id="ghi789", timestamp=1658516991883, content_type: str | None = None, + query_params: str = "", ) -> HttpResponse: if event_data is None: # event_data is an array of RRWeb events @@ -262,7 +263,7 @@ def _send_august_2023_version_session_recording_event( post_data = {"api_key": self.team.api_token, "data": json.dumps([event for _ in range(number_of_events)])} return self.client.post( - "/s/", + "/s/" + "?" 
+ query_params if query_params else "/s/", data=post_data, content_type=content_type or MULTIPART_CONTENT, ) @@ -1604,7 +1605,30 @@ def test_recording_ingestion_can_write_headers_with_the_message(self, kafka_prod ): self._send_august_2023_version_session_recording_event() - assert kafka_produce.mock_calls[0].kwargs["headers"] == [("token", "token123")] + assert kafka_produce.mock_calls[0].kwargs["headers"] == [ + ("token", "token123"), + ( + # without setting a version in the URL the default is unknown + "lib_version", + "unknown", + ), + ] + + @patch("posthog.kafka_client.client._KafkaProducer.produce") + def test_recording_ingestion_can_read_version_from_request(self, kafka_produce: MagicMock) -> None: + with self.settings( + SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=20480, + ): + self._send_august_2023_version_session_recording_event(query_params="ver=1.123.4") + + assert kafka_produce.mock_calls[0].kwargs["headers"] == [ + ("token", "token123"), + ( + # the ver query parameter from the URL is passed through as the lib_version header + "lib_version", + "1.123.4", + ), + ] + + @patch("posthog.kafka_client.client.SessionRecordingKafkaProducer") + def test_create_session_recording_kafka_with_expected_hosts(