feat(capture): Add historical_migration flag to batch capture (#20350)
* feat(capture): Add historical_migration flag to batch capture

* Update query snapshots

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
bretthoerner and github-actions[bot] authored Feb 20, 2024
1 parent 3ab8cc1 commit 7d6fc16
Showing 4 changed files with 73 additions and 24 deletions.
33 changes: 18 additions & 15 deletions posthog/api/capture.py
@@ -137,7 +137,7 @@ def build_kafka_event_data(
}


def _kafka_topic(event_name: str, data: Dict) -> str:
def _kafka_topic(event_name: str, data: Dict, historical: bool = False) -> str:
# To allow for different quality of service on session recordings
# and other events, we push to a different topic.

@@ -149,15 +149,19 @@ def _kafka_topic(event_name: str, data: Dict) -> str:
case _:
# If the token is in the TOKENS_HISTORICAL_DATA list, we push to the
# historical data topic.
if data.get("token") in settings.TOKENS_HISTORICAL_DATA:
if historical:
return KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL
return settings.KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC


def log_event(
data: Dict, event_name: str, partition_key: Optional[str], headers: Optional[List] = None
data: Dict,
event_name: str,
partition_key: Optional[str],
headers: Optional[List] = None,
historical: bool = False,
) -> FutureRecordMetadata:
kafka_topic = _kafka_topic(event_name, data)
kafka_topic = _kafka_topic(event_name, data, historical=historical)

logger.debug("logging_event", event_name=event_name, kafka_topic=kafka_topic)

@@ -356,11 +360,17 @@ def get_event(request):

replay_events: List[Any] = []

historical = token in settings.TOKENS_HISTORICAL_DATA
with start_span(op="request.process"):
if isinstance(data, dict):
if data.get("batch"): # posthog-python and posthog-ruby
if not historical:
# If they're not forced into historical by token, they can still opt into it
# for batches via `historical_migration=true`
historical = bool(data.get("historical_migration", False))
data = data["batch"]
assert data is not None

KLUDGES_COUNTER.labels(kludge="data_is_batch_field").inc()
elif "engage" in request.path_info: # JS identify call
data["event"] = "$identify" # make sure it has an event name
@@ -412,14 +422,7 @@ def get_event(request):
try:
futures.append(
capture_internal(
event,
distinct_id,
ip,
site_url,
now,
sent_at,
event_uuid,
token,
event, distinct_id, ip, site_url, now, sent_at, event_uuid, token, historical=historical
)
)
except Exception as exc:
@@ -541,7 +544,7 @@ def parse_event(event):
return event


def capture_internal(event, distinct_id, ip, site_url, now, sent_at, event_uuid=None, token=None):
def capture_internal(event, distinct_id, ip, site_url, now, sent_at, event_uuid=None, token=None, historical=False):
if event_uuid is None:
event_uuid = UUIDT()

@@ -573,11 +576,11 @@ def capture_internal(event, distinct_id, ip, site_url, now, sent_at, event_uuid=
if (
distinct_id.lower() not in LIKELY_ANONYMOUS_IDS
and is_randomly_partitioned(candidate_partition_key) is False
or token in settings.TOKENS_HISTORICAL_DATA
or historical
):
kafka_partition_key = hashlib.sha256(candidate_partition_key.encode()).hexdigest()

return log_event(parsed_event, event["event"], partition_key=kafka_partition_key)
return log_event(parsed_event, event["event"], partition_key=kafka_partition_key, historical=historical)


def is_randomly_partitioned(candidate_partition_key: str) -> bool:
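
Taken together, the capture.py changes boil down to a single routing decision: a request goes to the historical ingestion topic when the project token is on the TOKENS_HISTORICAL_DATA allowlist, or when a batch payload opts in with `historical_migration=true`. Below is a minimal standalone sketch of that decision; the topic names and allowlisted token are illustrative stand-ins, and the real code reads these from Django settings and also special-cases session recordings.

from typing import Any, Dict, Optional

# Illustrative stand-ins; the real values come from Django settings.
KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC = "events_plugin_ingestion"
KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL = "events_plugin_ingestion_historical"
TOKENS_HISTORICAL_DATA = {"phc_legacy_backfill_token"}  # hypothetical allowlisted token


def choose_topic(token: Optional[str], payload: Dict[str, Any]) -> str:
    # Forced into historical ingestion when the token is allowlisted server-side...
    historical = token in TOKENS_HISTORICAL_DATA
    # ...otherwise a batch can still opt in via the new `historical_migration` flag.
    if not historical and payload.get("batch"):
        historical = bool(payload.get("historical_migration", False))
    return KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL if historical else KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC

For example, choose_topic("phc_other_token", {"batch": [...], "historical_migration": True}) resolves to the historical topic, while the same payload without the flag falls through to the regular ingestion topic.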
16 changes: 8 additions & 8 deletions posthog/api/test/__snapshots__/test_cohort.ambr
@@ -1,7 +1,7 @@
# serializer version: 1
# name: TestCohort.test_async_deletion_of_cohort
'''
/* user_id:121 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
/* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
SELECT count(DISTINCT person_id)
FROM cohortpeople
WHERE team_id = 2
@@ -11,7 +11,7 @@
# ---
# name: TestCohort.test_async_deletion_of_cohort.1
'''
/* user_id:121 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
/* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
INSERT INTO cohortpeople
SELECT id,
2 as cohort_id,
@@ -84,7 +84,7 @@
# ---
# name: TestCohort.test_async_deletion_of_cohort.2
'''
/* user_id:121 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
/* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
SELECT count(DISTINCT person_id)
FROM cohortpeople
WHERE team_id = 2
@@ -94,7 +94,7 @@
# ---
# name: TestCohort.test_async_deletion_of_cohort.3
'''
/* user_id:121 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
/* user_id:122 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
SELECT count()
FROM cohortpeople
WHERE team_id = 2
@@ -104,7 +104,7 @@
# ---
# name: TestCohort.test_async_deletion_of_cohort.4
'''
/* user_id:121 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
/* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
SELECT count(DISTINCT person_id)
FROM cohortpeople
WHERE team_id = 2
@@ -114,7 +114,7 @@
# ---
# name: TestCohort.test_async_deletion_of_cohort.5
'''
/* user_id:121 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
/* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
INSERT INTO cohortpeople
SELECT id,
2 as cohort_id,
@@ -148,7 +148,7 @@
# ---
# name: TestCohort.test_async_deletion_of_cohort.6
'''
/* user_id:121 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
/* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
SELECT count(DISTINCT person_id)
FROM cohortpeople
WHERE team_id = 2
@@ -158,7 +158,7 @@
# ---
# name: TestCohort.test_async_deletion_of_cohort.7
'''
/* user_id:121 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
/* user_id:122 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
SELECT count()
FROM cohortpeople
WHERE team_id = 2
2 changes: 1 addition & 1 deletion posthog/api/test/__snapshots__/test_feature_flag.ambr
@@ -1731,7 +1731,7 @@
# ---
# name: TestFeatureFlag.test_creating_static_cohort.14
'''
/* user_id:195 celery:posthog.tasks.calculate_cohort.insert_cohort_from_feature_flag */
/* user_id:196 celery:posthog.tasks.calculate_cohort.insert_cohort_from_feature_flag */
SELECT count(DISTINCT person_id)
FROM person_static_cohort
WHERE team_id = 2
46 changes: 46 additions & 0 deletions posthog/api/test/test_capture.py
@@ -1838,3 +1838,49 @@ def test_capture_historical_analytics_events(self, kafka_produce) -> None:
kafka_produce.call_args_list[0][1]["topic"],
KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL,
)

@patch("posthog.kafka_client.client._KafkaProducer.produce")
def test_capture_historical_analytics_events_opt_in(self, kafka_produce) -> None:
"""
Based on `historical_migration` flag in the payload, we send data
to the KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL topic.
"""
resp = self.client.post(
"/batch/",
data={
"data": json.dumps(
{
"api_key": self.team.api_token,
"historical_migration": True,
"batch": [
{
"event": "$autocapture",
"properties": {
"distinct_id": 2,
"$elements": [
{
"tag_name": "a",
"nth_child": 1,
"nth_of_type": 2,
"attr__class": "btn btn-sm",
},
{
"tag_name": "div",
"nth_child": 1,
"nth_of_type": 2,
"$el_text": "💻",
},
],
},
}
],
}
)
},
)
self.assertEqual(resp.status_code, 200)
self.assertEqual(kafka_produce.call_count, 1)
self.assertEqual(
kafka_produce.call_args_list[0][1]["topic"],
KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL,
)
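
The new test exercises the flag through the Django test client; from a real client the opt-in is just one extra top-level field in the `/batch/` payload. A hedged sketch using `requests` follows, with a placeholder host and API key, and an event shape mirroring the test above.

import requests

POSTHOG_HOST = "https://posthog.example.com"  # placeholder self-hosted instance
API_KEY = "phc_your_project_api_key"          # placeholder project API key

payload = {
    "api_key": API_KEY,
    "historical_migration": True,  # ask capture to route this batch to the historical topic
    "batch": [
        {
            "event": "$pageview",
            "properties": {
                "distinct_id": "user-123",
                "$current_url": "https://example.com/pricing",
            },
            "timestamp": "2023-01-15T12:00:00Z",
        }
    ],
}

resp = requests.post(f"{POSTHOG_HOST}/batch/", json=payload, timeout=10)
resp.raise_for_status()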
