diff --git a/posthog/api/capture.py b/posthog/api/capture.py
index b00c363e6de16..237348ef71d67 100644
--- a/posthog/api/capture.py
+++ b/posthog/api/capture.py
@@ -137,7 +137,7 @@ def build_kafka_event_data(
     }


-def _kafka_topic(event_name: str, data: Dict) -> str:
+def _kafka_topic(event_name: str, data: Dict, historical: bool = False) -> str:
     # To allow for different quality of service on session recordings
     # and other events, we push to a different topic.

@@ -149,15 +149,19 @@ def _kafka_topic(event_name: str, data: Dict) -> str:
         case _:
-            # If the token is in the TOKENS_HISTORICAL_DATA list, we push to the
-            # historical data topic.
-            if data.get("token") in settings.TOKENS_HISTORICAL_DATA:
+            # Historical data (token in TOKENS_HISTORICAL_DATA, or a batch that
+            # opted in via `historical_migration`) goes to the historical topic.
+            if historical:
                 return KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL
             return settings.KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC


 def log_event(
-    data: Dict, event_name: str, partition_key: Optional[str], headers: Optional[List] = None
+    data: Dict,
+    event_name: str,
+    partition_key: Optional[str],
+    headers: Optional[List] = None,
+    historical: bool = False,
 ) -> FutureRecordMetadata:
-    kafka_topic = _kafka_topic(event_name, data)
+    kafka_topic = _kafka_topic(event_name, data, historical=historical)

     logger.debug("logging_event", event_name=event_name, kafka_topic=kafka_topic)

@@ -356,11 +360,17 @@ def get_event(request):

     replay_events: List[Any] = []

+    historical = token in settings.TOKENS_HISTORICAL_DATA
     with start_span(op="request.process"):
         if isinstance(data, dict):
             if data.get("batch"):  # posthog-python and posthog-ruby
+                if not historical:
+                    # If they're not forced into historical by token, they can still opt into it
+                    # for batches via `historical_migration=true`
+                    historical = bool(data.get("historical_migration", False))
                 data = data["batch"]
                 assert data is not None
+                KLUDGES_COUNTER.labels(kludge="data_is_batch_field").inc()
             elif "engage" in request.path_info:  # JS identify call
                 data["event"] = "$identify"  # make sure it has an event name

@@ -412,14 +422,7 @@ def get_event(request):
             try:
                 futures.append(
                     capture_internal(
-                        event,
-                        distinct_id,
-                        ip,
-                        site_url,
-                        now,
-                        sent_at,
-                        event_uuid,
-                        token,
+                        event, distinct_id, ip, site_url, now, sent_at, event_uuid, token, historical=historical
                     )
                 )
             except Exception as exc:
@@ -541,7 +544,7 @@ def parse_event(event):

     return event


-def capture_internal(event, distinct_id, ip, site_url, now, sent_at, event_uuid=None, token=None):
+def capture_internal(event, distinct_id, ip, site_url, now, sent_at, event_uuid=None, token=None, historical=False):
     if event_uuid is None:
         event_uuid = UUIDT()

@@ -573,11 +576,11 @@ def capture_internal(event, distinct_id, ip, site_url, now, sent_at, event_uuid=
     if (
         distinct_id.lower() not in LIKELY_ANONYMOUS_IDS
         and is_randomly_partitioned(candidate_partition_key) is False
-        or token in settings.TOKENS_HISTORICAL_DATA
+        or historical
     ):
         kafka_partition_key = hashlib.sha256(candidate_partition_key.encode()).hexdigest()

-    return log_event(parsed_event, event["event"], partition_key=kafka_partition_key)
+    return log_event(parsed_event, event["event"], partition_key=kafka_partition_key, historical=historical)


 def is_randomly_partitioned(candidate_partition_key: str) -> bool:
diff --git a/posthog/api/test/__snapshots__/test_cohort.ambr b/posthog/api/test/__snapshots__/test_cohort.ambr
index db80c80c4ae19..e32cd65cb7367 100644
--- a/posthog/api/test/__snapshots__/test_cohort.ambr
+++ b/posthog/api/test/__snapshots__/test_cohort.ambr
@@ -1,7 +1,7 @@
 # serializer version: 1
 # name: TestCohort.test_async_deletion_of_cohort
   '''
-  /* user_id:121 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
+  /* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
   SELECT count(DISTINCT person_id)
   FROM cohortpeople
   WHERE team_id = 2
@@ -11,7 +11,7 @@
 # ---
 # name: TestCohort.test_async_deletion_of_cohort.1
   '''
-  /* user_id:121 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
+  /* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
   INSERT INTO cohortpeople
   SELECT id,
          2 as cohort_id,
@@ -84,7 +84,7 @@
 # ---
 # name: TestCohort.test_async_deletion_of_cohort.2
   '''
-  /* user_id:121 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
+  /* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
   SELECT count(DISTINCT person_id)
   FROM cohortpeople
   WHERE team_id = 2
@@ -94,7 +94,7 @@
 # ---
 # name: TestCohort.test_async_deletion_of_cohort.3
   '''
-  /* user_id:121 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
+  /* user_id:122 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
   SELECT count()
   FROM cohortpeople
   WHERE team_id = 2
@@ -104,7 +104,7 @@
 # ---
 # name: TestCohort.test_async_deletion_of_cohort.4
   '''
-  /* user_id:121 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
+  /* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
   SELECT count(DISTINCT person_id)
   FROM cohortpeople
   WHERE team_id = 2
@@ -114,7 +114,7 @@
 # ---
 # name: TestCohort.test_async_deletion_of_cohort.5
   '''
-  /* user_id:121 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
+  /* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
   INSERT INTO cohortpeople
   SELECT id,
          2 as cohort_id,
@@ -148,7 +148,7 @@
 # ---
 # name: TestCohort.test_async_deletion_of_cohort.6
   '''
-  /* user_id:121 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
+  /* user_id:122 celery:posthog.tasks.calculate_cohort.calculate_cohort_ch */
   SELECT count(DISTINCT person_id)
   FROM cohortpeople
   WHERE team_id = 2
@@ -158,7 +158,7 @@
 # ---
 # name: TestCohort.test_async_deletion_of_cohort.7
   '''
-  /* user_id:121 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
+  /* user_id:122 celery:posthog.tasks.calculate_cohort.clear_stale_cohort */
   SELECT count()
   FROM cohortpeople
   WHERE team_id = 2
diff --git a/posthog/api/test/__snapshots__/test_feature_flag.ambr b/posthog/api/test/__snapshots__/test_feature_flag.ambr
index 45994e5494910..fbcb087f7d371 100644
--- a/posthog/api/test/__snapshots__/test_feature_flag.ambr
+++ b/posthog/api/test/__snapshots__/test_feature_flag.ambr
@@ -1731,7 +1731,7 @@
 # ---
 # name: TestFeatureFlag.test_creating_static_cohort.14
   '''
-  /* user_id:195 celery:posthog.tasks.calculate_cohort.insert_cohort_from_feature_flag */
+  /* user_id:196 celery:posthog.tasks.calculate_cohort.insert_cohort_from_feature_flag */
   SELECT count(DISTINCT person_id)
   FROM person_static_cohort
   WHERE team_id = 2
diff --git a/posthog/api/test/test_capture.py b/posthog/api/test/test_capture.py
index 73654abf7a269..67b6560e6723a 100644
--- a/posthog/api/test/test_capture.py
+++ b/posthog/api/test/test_capture.py
@@ -1838,3 +1838,49 @@ def test_capture_historical_analytics_events(self, kafka_produce) -> None:
             kafka_produce.call_args_list[0][1]["topic"],
             KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL,
         )
+
+    @patch("posthog.kafka_client.client._KafkaProducer.produce")
+    def test_capture_historical_analytics_events_opt_in(self, kafka_produce) -> None:
+        """
+        Based on the `historical_migration` flag in the payload, we send data
+        to the KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL topic.
+        """
+        resp = self.client.post(
+            "/batch/",
+            data={
+                "data": json.dumps(
+                    {
+                        "api_key": self.team.api_token,
+                        "historical_migration": True,
+                        "batch": [
+                            {
+                                "event": "$autocapture",
+                                "properties": {
+                                    "distinct_id": 2,
+                                    "$elements": [
+                                        {
+                                            "tag_name": "a",
+                                            "nth_child": 1,
+                                            "nth_of_type": 2,
+                                            "attr__class": "btn btn-sm",
+                                        },
+                                        {
+                                            "tag_name": "div",
+                                            "nth_child": 1,
+                                            "nth_of_type": 2,
+                                            "$el_text": "💻",
+                                        },
+                                    ],
+                                },
+                            }
+                        ],
+                    }
+                )
+            },
+        )
+        self.assertEqual(resp.status_code, 200)
+        self.assertEqual(kafka_produce.call_count, 1)
+        self.assertEqual(
+            kafka_produce.call_args_list[0][1]["topic"],
+            KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL,
+        )
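For anyone trying the change by hand, here is a minimal sketch of the opt-in path the new test exercises: POSTing a batch with a top-level `historical_migration: true` should route every event in it to `KAFKA_EVENTS_PLUGIN_INGESTION_HISTORICAL`, even when the project token is not in `settings.TOKENS_HISTORICAL_DATA`. The `/batch/` endpoint and the flag name come from the diff above; `POSTHOG_HOST`, `PROJECT_API_KEY`, and the sample event are placeholders.

```python
# Sketch of a historical-migration batch, assuming a reachable PostHog
# instance and a valid project API key (both are placeholders below).
import requests

POSTHOG_HOST = "https://app.posthog.com"  # placeholder: your PostHog host
PROJECT_API_KEY = "phc_..."  # placeholder: your project API key

payload = {
    "api_key": PROJECT_API_KEY,
    # Opt this batch into the historical ingestion topic (see get_event above);
    # without it, events go to the default KAFKA_EVENTS_PLUGIN_INGESTION_TOPIC.
    "historical_migration": True,
    "batch": [
        {
            "event": "signed_up",
            "distinct_id": "user-42",
            # Historical migrations typically carry backdated timestamps.
            "timestamp": "2020-01-01T00:00:00Z",
        }
    ],
}

resp = requests.post(f"{POSTHOG_HOST}/batch/", json=payload, timeout=10)
resp.raise_for_status()  # capture answers 200 when the batch is accepted
```

Note that the flag is only read on the batch code path (`data.get("batch")`), and that `historical` also short-circuits the random-partitioning check in `capture_internal`, so historical events are always produced with a stable partition key.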