
feat: send lib version to blobby in kafka headers (#20989)
* feat: send lib version to blobby in kafka headers

* less info passing around

* rename refactor

* Update query snapshots

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
pauldambra and github-actions[bot] authored Mar 19, 2024
1 parent f28d3b1 commit 9ad8dca
Showing 5 changed files with 62 additions and 14 deletions.
28 changes: 26 additions & 2 deletions posthog/api/capture.py
@@ -297,6 +297,11 @@ def drop_events_over_quota(token: str, events: List[Any]) -> List[Any]:
     return results
 
 
+def lib_version_from_query_params(request) -> str:
+    # url has a ver parameter from posthog-js
+    return request.GET.get("ver", "unknown")
+
+
 @csrf_exempt
 @timed("posthog_cloud_event_endpoint")
 def get_event(request):
@@ -475,6 +480,8 @@ def get_event(request):
 
     try:
         if replay_events:
+            lib_version = lib_version_from_query_params(request)
+
             alternative_replay_events = preprocess_replay_events_for_blob_ingestion(
                 replay_events, settings.SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES
             )
@@ -496,6 +503,7 @@ def get_event(request):
                     sent_at,
                     event_uuid,
                     token,
+                    extra_headers=[("lib_version", lib_version)],
                 )
             )
 
@@ -546,10 +554,24 @@ def parse_event(event):
     return event
 
 
-def capture_internal(event, distinct_id, ip, site_url, now, sent_at, event_uuid=None, token=None, historical=False):
+def capture_internal(
+    event,
+    distinct_id,
+    ip,
+    site_url,
+    now,
+    sent_at,
+    event_uuid=None,
+    token=None,
+    historical=False,
+    extra_headers: List[Tuple[str, str]] | None = None,
+):
     if event_uuid is None:
         event_uuid = UUIDT()
 
+    if extra_headers is None:
+        extra_headers = []
+
     parsed_event = build_kafka_event_data(
         distinct_id=distinct_id,
         ip=ip,
@@ -568,9 +590,11 @@ def capture_internal(event, distinct_id, ip, site_url, now, sent_at, event_uuid=
 
     if event["event"] in SESSION_RECORDING_EVENT_NAMES:
         kafka_partition_key = event["properties"]["$session_id"]
+
         headers = [
             ("token", token),
-        ]
+        ] + extra_headers
+
         return log_event(parsed_event, event["event"], partition_key=kafka_partition_key, headers=headers)
 
     candidate_partition_key = f"{token}:{distinct_id}"
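Taken together, the capture.py changes read the ver query parameter that posthog-js appends to /s/ requests and forward it as a lib_version Kafka header alongside the existing token header. A minimal, framework-free sketch of that flow, assuming nothing beyond the standard library: FakeRequest and build_headers are hypothetical stand-ins for Django's HttpRequest and the header assembly inside capture_internal, not PostHog APIs.

from typing import List, Optional, Tuple


class FakeRequest:
    """Hypothetical stand-in for django.http.HttpRequest; only the GET mapping matters here."""

    def __init__(self, query: dict):
        self.GET = query


def lib_version_from_query_params(request) -> str:
    # posthog-js appends ?ver=<its version>; anything else falls back to "unknown"
    return request.GET.get("ver", "unknown")


def build_headers(token: str, extra_headers: Optional[List[Tuple[str, str]]] = None) -> List[Tuple[str, str]]:
    # mirrors capture_internal: the token header comes first, then whatever the caller added
    if extra_headers is None:
        extra_headers = []
    return [("token", token)] + extra_headers


lib_version = lib_version_from_query_params(FakeRequest({"ver": "1.123.4"}))
headers = build_headers("token123", extra_headers=[("lib_version", lib_version)])
assert headers == [("token", "token123"), ("lib_version", "1.123.4")]

# with no ver parameter the default is recorded instead
assert lib_version_from_query_params(FakeRequest({})) == "unknown"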
16 changes: 8 additions & 8 deletions posthog/api/test/__snapshots__/test_cohort.ambr
@@ -1,7 +1,7 @@
 # serializer version: 1
 # name: TestCohort.test_async_deletion_of_cohort
   '''
-  /* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
+  /* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
   SELECT count(DISTINCT person_id)
   FROM cohortpeople
   WHERE team_id = 2
@@ -11,7 +11,7 @@
 # ---
 # name: TestCohort.test_async_deletion_of_cohort.1
   '''
-  /* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
+  /* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
   INSERT INTO cohortpeople
   SELECT id,
          2 as cohort_id,
@@ -84,7 +84,7 @@
 # ---
 # name: TestCohort.test_async_deletion_of_cohort.2
   '''
-  /* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
+  /* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
   SELECT count(DISTINCT person_id)
   FROM cohortpeople
   WHERE team_id = 2
@@ -94,7 +94,7 @@
 # ---
 # name: TestCohort.test_async_deletion_of_cohort.3
   '''
-  /* user_id:122 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
+  /* user_id:123 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
   SELECT count()
   FROM cohortpeople
   WHERE team_id = 2
@@ -104,7 +104,7 @@
 # ---
 # name: TestCohort.test_async_deletion_of_cohort.4
   '''
-  /* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
+  /* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
   SELECT count(DISTINCT person_id)
   FROM cohortpeople
   WHERE team_id = 2
@@ -114,7 +114,7 @@
 # ---
 # name: TestCohort.test_async_deletion_of_cohort.5
   '''
-  /* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
+  /* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
   INSERT INTO cohortpeople
   SELECT id,
          2 as cohort_id,
@@ -148,7 +148,7 @@
 # ---
 # name: TestCohort.test_async_deletion_of_cohort.6
   '''
-  /* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
+  /* user_id:123 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
   SELECT count(DISTINCT person_id)
   FROM cohortpeople
   WHERE team_id = 2
@@ -158,7 +158,7 @@
 # ---
 # name: TestCohort.test_async_deletion_of_cohort.7
   '''
-  /* user_id:122 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
+  /* user_id:123 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
   SELECT count()
   FROM cohortpeople
   WHERE team_id = 2
2 changes: 1 addition & 1 deletion posthog/api/test/__snapshots__/test_feature_flag.ambr
@@ -1739,7 +1739,7 @@
 # ---
 # name: TestFeatureFlag.test_creating_static_cohort.14
   '''
-  /* user_id:196 celery:posthog.tasks.calculate_cohort.insert_cohort_from_feature_flag */
+  /* user_id:197 celery:posthog.tasks.calculate_cohort.insert_cohort_from_feature_flag */
   SELECT count(DISTINCT person_id)
   FROM person_static_cohort
   WHERE team_id = 2
2 changes: 1 addition & 1 deletion posthog/api/test/__snapshots__/test_query.ambr
@@ -157,7 +157,7 @@
 # ---
 # name: TestQuery.test_full_hogql_query_async
   '''
-  /* user_id:463 celery:posthog.tasks.tasks.process_query_task */
+  /* user_id:464 celery:posthog.tasks.tasks.process_query_task */
   SELECT events.uuid AS uuid,
          events.event AS event,
          events.properties AS properties,
28 changes: 26 additions & 2 deletions posthog/api/test/test_capture.py
@@ -230,6 +230,7 @@ def _send_august_2023_version_session_recording_event(
         distinct_id="ghi789",
         timestamp=1658516991883,
         content_type: str | None = None,
+        query_params: str = "",
     ) -> HttpResponse:
         if event_data is None:
             # event_data is an array of RRWeb events
@@ -262,7 +263,7 @@ def _send_august_2023_version_session_recording_event(
         post_data = {"api_key": self.team.api_token, "data": json.dumps([event for _ in range(number_of_events)])}
 
         return self.client.post(
-            "/s/",
+            "/s/" + "?" + query_params if query_params else "/s/",
             data=post_data,
             content_type=content_type or MULTIPART_CONTENT,
         )
@@ -1604,7 +1605,30 @@ def test_recording_ingestion_can_write_headers_with_the_message(self, kafka_produc
         ):
             self._send_august_2023_version_session_recording_event()
 
-        assert kafka_produce.mock_calls[0].kwargs["headers"] == [("token", "token123")]
+        assert kafka_produce.mock_calls[0].kwargs["headers"] == [
+            ("token", "token123"),
+            (
+                # without setting a version in the URL the default is unknown
+                "lib_version",
+                "unknown",
+            ),
+        ]
+
+    @patch("posthog.kafka_client.client._KafkaProducer.produce")
+    def test_recording_ingestion_can_read_version_from_request(self, kafka_produce: MagicMock) -> None:
+        with self.settings(
+            SESSION_RECORDING_KAFKA_MAX_REQUEST_SIZE_BYTES=20480,
+        ):
+            self._send_august_2023_version_session_recording_event(query_params="ver=1.123.4")
+
+        assert kafka_produce.mock_calls[0].kwargs["headers"] == [
+            ("token", "token123"),
+            (
+                # the ver value from the request URL is passed through
+                "lib_version",
+                "1.123.4",
+            ),
+        ]
 
     @patch("posthog.kafka_client.client.SessionRecordingKafkaProducer")
     def test_create_session_recording_kafka_with_expected_hosts(
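The assertions above inspect the headers kwarg handed to PostHog's internal Kafka producer wrapper. For readers outside the codebase, a rough equivalent of the eventual produce call using kafka-python: the topic name and payload are placeholders, a broker on localhost:9092 is assumed, and kafka-python requires header values as bytes.

from kafka import KafkaProducer  # pip install kafka-python

producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send(
    "session_recording_snapshots",  # placeholder topic name, not PostHog's real one
    key=b"some-session-id",  # recording messages are partitioned by $session_id
    value=b'{"snapshot_items": "..."}',  # placeholder payload
    headers=[("token", b"token123"), ("lib_version", b"1.123.4")],
)
producer.flush()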

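On the consuming side, the session recording ingester ("blobby") can now read the SDK version off each message without parsing the payload. A sketch of that read path with kafka-python, under the same placeholder-topic and local-broker assumptions, reusing the capture-side "unknown" default:

from kafka import KafkaConsumer  # pip install kafka-python

consumer = KafkaConsumer("session_recording_snapshots", bootstrap_servers="localhost:9092")

for message in consumer:
    # kafka-python exposes headers as a list of (str, bytes) tuples
    header_map = {key: value.decode("utf-8") for key, value in (message.headers or [])}
    lib_version = header_map.get("lib_version", "unknown")
    print(f"snapshot written by posthog-js {lib_version}")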