From a21983be08286eb74c8a98ae1c10629977f54845 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Mon, 18 Mar 2024 20:41:15 +0000 Subject: [PATCH 1/4] feat: send lib version to blobby in kafka headers --- posthog/api/capture.py | 30 ++++++++++++++++++++++++++++-- posthog/api/test/test_capture.py | 28 ++++++++++++++++++++++++++-- 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/posthog/api/capture.py b/posthog/api/capture.py index 9dfc61aa3979f..0a4ed672551a6 100644 --- a/posthog/api/capture.py +++ b/posthog/api/capture.py @@ -297,6 +297,13 @@ def drop_events_over_quota(token: str, events: List[Any]) -> List[Any]: return results +def lib_from_query_params(request) -> Tuple[str, str]: + # url has a ver parameter from posthog-js + # for now we know mobile is newer than web, so we can ignore unknown + lib = request.GET.get("ver", "unknown") + return "web", lib + + @csrf_exempt @timed("posthog_cloud_event_endpoint") def get_event(request): @@ -475,6 +482,8 @@ def get_event(request): try: if replay_events: + lib, lib_version = lib_from_query_params(request) + alternative_replay_events = preprocess_replay_events_for_blob_ingestion( replay_events, settings.SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES ) @@ -496,6 +505,7 @@ def get_event(request): sent_at, event_uuid, token, + extra_headers=[("lib_version", lib_version)], ) ) @@ -546,10 +556,24 @@ def parse_event(event): return event -def capture_internal(event, distinct_id, ip, site_url, now, sent_at, event_uuid=None, token=None, historical=False): +def capture_internal( + event, + distinct_id, + ip, + site_url, + now, + sent_at, + event_uuid=None, + token=None, + historical=False, + extra_headers: List[Tuple[str, str]] | None = None, +): if event_uuid is None: event_uuid = UUIDT() + if extra_headers is None: + extra_headers = [] + parsed_event = build_kafka_event_data( distinct_id=distinct_id, ip=ip, @@ -568,9 +592,11 @@ def capture_internal(event, distinct_id, ip, site_url, now, sent_at, event_uuid= if event["event"] in SESSION_RECORDING_EVENT_NAMES: kafka_partition_key = event["properties"]["$session_id"] + headers = [ ("token", token), - ] + ] + extra_headers + return log_event(parsed_event, event["event"], partition_key=kafka_partition_key, headers=headers) candidate_partition_key = f"{token}:{distinct_id}" diff --git a/posthog/api/test/test_capture.py b/posthog/api/test/test_capture.py index 4696609ce94ad..70ea768f2677b 100644 --- a/posthog/api/test/test_capture.py +++ b/posthog/api/test/test_capture.py @@ -230,6 +230,7 @@ def _send_august_2023_version_session_recording_event( distinct_id="ghi789", timestamp=1658516991883, content_type: str | None = None, + query_params: str = "", ) -> HttpResponse: if event_data is None: # event_data is an array of RRWeb events @@ -262,7 +263,7 @@ def _send_august_2023_version_session_recording_event( post_data = {"api_key": self.team.api_token, "data": json.dumps([event for _ in range(number_of_events)])} return self.client.post( - "/s/", + "/s/" + "?" + query_params if query_params else "/s/", data=post_data, content_type=content_type or MULTIPART_CONTENT, ) @@ -1604,7 +1605,30 @@ def test_recording_ingestion_can_write_headers_with_the_message(self, kafka_prod ): self._send_august_2023_version_session_recording_event() - assert kafka_produce.mock_calls[0].kwargs["headers"] == [("token", "token123")] + assert kafka_produce.mock_calls[0].kwargs["headers"] == [ + ("token", "token123"), + ( + # without setting a version in the URL the default is unknown + "lib_version", + "unknown", + ), + ] + + @patch("posthog.kafka_client.client._KafkaProducer.produce") + def test_recording_ingestion_can_read_version_from_request(self, kafka_produce: MagicMock) -> None: + with self.settings( + SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=20480, + ): + self._send_august_2023_version_session_recording_event(query_params="ver=1.123.4") + + assert kafka_produce.mock_calls[0].kwargs["headers"] == [ + ("token", "token123"), + ( + # without setting a version in the URL the default is unknown + "lib_version", + "1.123.4", + ), + ] @patch("posthog.kafka_client.client.SessionRecordingKafkaProducer") def test_create_session_recording_kafka_with_expected_hosts( From 47b300e40bb47e97b94af095b8abc5dd2cc2e7d4 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Mon, 18 Mar 2024 20:53:35 +0000 Subject: [PATCH 2/4] less info passing around --- posthog/api/capture.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/posthog/api/capture.py b/posthog/api/capture.py index 0a4ed672551a6..10ecb3e6c8fc0 100644 --- a/posthog/api/capture.py +++ b/posthog/api/capture.py @@ -297,11 +297,9 @@ def drop_events_over_quota(token: str, events: List[Any]) -> List[Any]: return results -def lib_from_query_params(request) -> Tuple[str, str]: +def lib_from_query_params(request) -> str: # url has a ver parameter from posthog-js - # for now we know mobile is newer than web, so we can ignore unknown - lib = request.GET.get("ver", "unknown") - return "web", lib + return request.GET.get("ver", "unknown") @csrf_exempt @@ -482,7 +480,7 @@ def get_event(request): try: if replay_events: - lib, lib_version = lib_from_query_params(request) + lib_version = lib_from_query_params(request) alternative_replay_events = preprocess_replay_events_for_blob_ingestion( replay_events, settings.SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES From 7e4feae3b9c9cfd65602d75f0c108ac287111011 Mon Sep 17 00:00:00 2001 From: Paul D'Ambra Date: Mon, 18 Mar 2024 20:57:05 +0000 Subject: [PATCH 3/4] rename refactor --- posthog/api/capture.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/posthog/api/capture.py b/posthog/api/capture.py index 10ecb3e6c8fc0..ee7e09c8ae154 100644 --- a/posthog/api/capture.py +++ b/posthog/api/capture.py @@ -297,7 +297,7 @@ def drop_events_over_quota(token: str, events: List[Any]) -> List[Any]: return results -def lib_from_query_params(request) -> str: +def lib_version_from_query_params(request) -> str: # url has a ver parameter from posthog-js return request.GET.get("ver", "unknown") @@ -480,7 +480,7 @@ def get_event(request): try: if replay_events: - lib_version = lib_from_query_params(request) + lib_version = lib_version_from_query_params(request) alternative_replay_events = preprocess_replay_events_for_blob_ingestion( replay_events, settings.SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES From fcd73cb4bacb95c88dda260d0799b01da205ac64 Mon Sep 17 00:00:00 2001 From: github-actions <41898282+github-actions[bot]@users.noreply.github.com> Date: Mon, 18 Mar 2024 21:00:53 +0000 Subject: [PATCH 4/4] Update query snapshots --- posthog/api/test/__snapshots__/test_cohort.ambr | 16 ++++++++-------- .../test/__snapshots__/test_feature_flag.ambr | 2 +- posthog/api/test/__snapshots__/test_query.ambr | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/posthog/api/test/__snapshots__/test_cohort.ambr b/posthog/api/test/__snapshots__/test_cohort.ambr index e32cd65cb7367..9789176ddd21b 100644 --- a/posthog/api/test/__snapshots__/test_cohort.ambr +++ b/posthog/api/test/__snapshots__/test_cohort.ambr @@ -1,7 +1,7 @@ # serializer version: 1 # name: TestCohort.test_async_deletion_of_cohort ''' - /* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ SELECT count(DISTINCT person_id) FROM cohortpeople WHERE team_id = 2 @@ -11,7 +11,7 @@ # --- # name: TestCohort.test_async_deletion_of_cohort.1 ''' - /* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ INSERT INTO cohortpeople SELECT id, 2 as cohort_id, @@ -84,7 +84,7 @@ # --- # name: TestCohort.test_async_deletion_of_cohort.2 ''' - /* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ SELECT count(DISTINCT person_id) FROM cohortpeople WHERE team_id = 2 @@ -94,7 +94,7 @@ # --- # name: TestCohort.test_async_deletion_of_cohort.3 ''' - /* user_id:122 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */ + /* user_id:123 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */ SELECT count() FROM cohortpeople WHERE team_id = 2 @@ -104,7 +104,7 @@ # --- # name: TestCohort.test_async_deletion_of_cohort.4 ''' - /* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ SELECT count(DISTINCT person_id) FROM cohortpeople WHERE team_id = 2 @@ -114,7 +114,7 @@ # --- # name: TestCohort.test_async_deletion_of_cohort.5 ''' - /* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ INSERT INTO cohortpeople SELECT id, 2 as cohort_id, @@ -148,7 +148,7 @@ # --- # name: TestCohort.test_async_deletion_of_cohort.6 ''' - /* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ + /* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */ SELECT count(DISTINCT person_id) FROM cohortpeople WHERE team_id = 2 @@ -158,7 +158,7 @@ # --- # name: TestCohort.test_async_deletion_of_cohort.7 ''' - /* user_id:122 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */ + /* user_id:123 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */ SELECT count() FROM cohortpeople WHERE team_id = 2 diff --git a/posthog/api/test/__snapshots__/test_feature_flag.ambr b/posthog/api/test/__snapshots__/test_feature_flag.ambr index 2d11fc4500367..fbf8995d66e68 100644 --- a/posthog/api/test/__snapshots__/test_feature_flag.ambr +++ b/posthog/api/test/__snapshots__/test_feature_flag.ambr @@ -1739,7 +1739,7 @@ # --- # name: TestFeatureFlag.test_creating_static_cohort.14 ''' - /* user_id:196 celery:posthog.tasks.calculate_cohort.insert_cohort_from_feature_flag */ + /* user_id:197 celery:posthog.tasks.calculate_cohort.insert_cohort_from_feature_flag */ SELECT count(DISTINCT person_id) FROM person_static_cohort WHERE team_id = 2 diff --git a/posthog/api/test/__snapshots__/test_query.ambr b/posthog/api/test/__snapshots__/test_query.ambr index e52c9362b4398..628038e741728 100644 --- a/posthog/api/test/__snapshots__/test_query.ambr +++ b/posthog/api/test/__snapshots__/test_query.ambr @@ -157,7 +157,7 @@ # --- # name: TestQuery.test_full_hogql_query_async ''' - /* user_id:463 celery:posthog.tasks.tasks.process_query_task */ + /* user_id:464 celery:posthog.tasks.tasks.process_query_task */ SELECT events.uuid AS uuid, events.event AS event, events.properties AS properties,