Skip to content

Commit

Permalink
feat(capture): Add historical_migration flag to batch capture
Browse files Browse the repository at this point in the history
  • Loading branch information
bretthoerner committed Feb 14, 2024
1 parent 8d467d2 commit 2fdce2d
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 15 deletions.
33 changes: 18 additions & 15 deletions posthog/api/capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def build_kafka_event_data(
}


def _kafka_topic(event_name: str, data: Dict) -> str:
def _kafka_topic(event_name: str, data: Dict, historical: bool = False) -> str:
# To allow for different quality of service on session recordings
# and other events, we push to a different topic.

Expand All @@ -149,15 +149,19 @@ def _kafka_topic(event_name: str, data: Dict) -> str:
case _:
# If the token is in the TOKENS_HISTORICAL_DATA list, we push to the
# historical data topic.
if data.get("token") in settings.TOKENS_HISTORICAL_DATA:
if historical:
return KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL
return settings.KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC


def log_event(
data: Dict, event_name: str, partition_key: Optional[str], headers: Optional[List] = None
data: Dict,
event_name: str,
partition_key: Optional[str],
headers: Optional[List] = None,
historical: bool = False,
) -> FutureRecordMetadata:
kafka_topic = _kafka_topic(event_name, data)
kafka_topic = _kafka_topic(event_name, data, historical=historical)

logger.debug("logging_event", event_name=event_name, kafka_topic=kafka_topic)

Expand Down Expand Up @@ -356,11 +360,17 @@ def get_event(request):

replay_events: List[Any] = []

historical = token in settings.TOKENS_HISTORICAL_DATA
with start_span(op="request.process"):
if isinstance(data, dict):
if data.get("batch"): # posthog-python and posthog-ruby
if not historical:
# If they're not forced into historical by token, they can still opt into it
# for batches via `historical_migration=true`
historical = bool(data.get("historical_migration", False))
data = data["batch"]
assert data is not None

KLUDGES_COUNTER.labels(kludge="data_is_batch_field").inc()
elif "engage" in request.path_info: # JS identify call
data["event"] = "$identify" # make sure it has an event name
Expand Down Expand Up @@ -412,14 +422,7 @@ def get_event(request):
try:
futures.append(
capture_internal(
event,
distinct_id,
ip,
site_url,
now,
sent_at,
event_uuid,
token,
event, distinct_id, ip, site_url, now, sent_at, event_uuid, token, historical=historical
)
)
except Exception as exc:
Expand Down Expand Up @@ -541,7 +544,7 @@ def parse_event(event):
return event


def capture_internal(event, distinct_id, ip, site_url, now, sent_at, event_uuid=None, token=None):
def capture_internal(event, distinct_id, ip, site_url, now, sent_at, event_uuid=None, token=None, historical=False):
if event_uuid is None:
event_uuid = UUIDT()

Expand Down Expand Up @@ -573,11 +576,11 @@ def capture_internal(event, distinct_id, ip, site_url, now, sent_at, event_uuid=
if (
distinct_id.lower() not in LIKELY_ANONYMOUS_IDS
and is_randomly_partitioned(candidate_partition_key) is False
or token in settings.TOKENS_HISTORICAL_DATA
or historical
):
kafka_partition_key = hashlib.sha256(candidate_partition_key.encode()).hexdigest()

return log_event(parsed_event, event["event"], partition_key=kafka_partition_key)
return log_event(parsed_event, event["event"], partition_key=kafka_partition_key, historical=historical)


def is_randomly_partitioned(candidate_partition_key: str) -> bool:
Expand Down
46 changes: 46 additions & 0 deletions posthog/api/test/test_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -1838,3 +1838,49 @@ def test_capture_historical_analytics_events(self, kafka_produce) -> None:
kafka_produce.call_args_list[0][1]["topic"],
KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL,
)

@patch("posthog.kafka_client.client._KafkaProducer.produce")
def test_capture_historical_analytics_events_opt_in(self, kafka_produce) -> None:
"""
Based on `historical_migration` flag in the payload, we send data
to the KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL topic.
"""
resp = self.client.post(
"/batch/",
data={
"data": json.dumps(
{
"api_key": self.team.api_token,
"historical_migration": True,
"batch": [
{
"event": "$autocapture",
"properties": {
"distinct_id": 2,
"$elements": [
{
"tag_name": "a",
"nth_child": 1,
"nth_of_type": 2,
"attr__class": "btn btn-sm",
},
{
"tag_name": "div",
"nth_child": 1,
"nth_of_type": 2,
"$el_text": "💻",
},
],
},
}
],
}
)
},
)
self.assertEqual(resp.status_code, 200)
self.assertEqual(kafka_produce.call_count, 1)
self.assertEqual(
kafka_produce.call_args_list[0][1]["topic"],
KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL,
)

0 comments on commit 2fdce2d

Please sign in to comment.