diff --git a/posthog/temporal/tests/batch_exports/test_batch_exports.py b/posthog/temporal/tests/batch_exports/test_batch_exports.py
index 5e2bed9945d2b8..a760baed9d4b07 100644
--- a/posthog/temporal/tests/batch_exports/test_batch_exports.py
+++ b/posthog/temporal/tests/batch_exports/test_batch_exports.py
@@ -309,7 +309,8 @@ async def test_get_rows_count_can_include_events(client):
 
 @pytest.mark.django_db
 @pytest.mark.asyncio
-async def test_get_results_iterator(client):
+@pytest.mark.parametrize("include_person_properties", (False, True))
+async def test_get_results_iterator(client, include_person_properties):
     """Test the rows returned by get_results_iterator."""
     team_id = randint(1, 1000000)
 
@@ -345,7 +346,13 @@ async def test_get_results_iterator(client):
         events=events,
     )
 
-    iter_ = get_results_iterator(client, team_id, "2023-04-20 14:30:00", "2023-04-20 14:31:00")
+    iter_ = get_results_iterator(
+        client,
+        team_id,
+        "2023-04-20 14:30:00",
+        "2023-04-20 14:31:00",
+        include_person_properties=include_person_properties,
+    )
     rows = [row for row in iter_]
 
     all_expected = sorted(events, key=operator.itemgetter("event"))
@@ -355,6 +362,9 @@ async def test_get_results_iterator(client):
 
     for expected, result in zip(all_expected, all_result):
         for key, value in result.items():
+            if key == "person_properties" and not include_person_properties:
+                continue
+
             if key in ("timestamp", "inserted_at", "created_at"):
                 expected_value = to_isoformat(expected[key])
             else:
@@ -366,7 +376,8 @@ async def test_get_results_iterator(client):
 
 @pytest.mark.django_db
 @pytest.mark.asyncio
-async def test_get_results_iterator_handles_duplicates(client):
+@pytest.mark.parametrize("include_person_properties", (False, True))
+async def test_get_results_iterator_handles_duplicates(client, include_person_properties):
     """Test the rows returned by get_results_iterator are de-duplicated."""
     team_id = randint(1, 1000000)
 
@@ -404,7 +415,13 @@ async def test_get_results_iterator_handles_duplicates(client):
         events=duplicate_events,
     )
 
-    iter_ = get_results_iterator(client, team_id, "2023-04-20 14:30:00", "2023-04-20 14:31:00")
+    iter_ = get_results_iterator(
+        client,
+        team_id,
+        "2023-04-20 14:30:00",
+        "2023-04-20 14:31:00",
+        include_person_properties=include_person_properties,
+    )
     rows = [row for row in iter_]
 
     all_expected = sorted(events, key=operator.itemgetter("event"))
@@ -415,6 +432,9 @@ async def test_get_results_iterator_handles_duplicates(client):
 
     for expected, result in zip(all_expected, all_result):
         for key, value in result.items():
+            if key == "person_properties" and not include_person_properties:
+                continue
+
             if key in ("timestamp", "inserted_at", "created_at"):
                 expected_value = to_isoformat(expected[key])
             else:
@@ -426,7 +446,8 @@ async def test_get_results_iterator_handles_duplicates(client):
 
 @pytest.mark.django_db
 @pytest.mark.asyncio
-async def test_get_results_iterator_can_exclude_events(client):
+@pytest.mark.parametrize("include_person_properties", (False, True))
+async def test_get_results_iterator_can_exclude_events(client, include_person_properties):
     """Test the rows returned by get_results_iterator can exclude events."""
     team_id = randint(1, 1000000)
 
@@ -467,7 +488,12 @@ async def test_get_results_iterator_can_exclude_events(client):
     # Exclude the latter half of events.
     exclude_events = (f"test-{i}" for i in range(5000, 10000))
     iter_ = get_results_iterator(
-        client, team_id, "2023-04-20 14:30:00", "2023-04-20 14:31:00", exclude_events=exclude_events
+        client,
+        team_id,
+        "2023-04-20 14:30:00",
+        "2023-04-20 14:31:00",
+        exclude_events=exclude_events,
+        include_person_properties=include_person_properties,
     )
     rows = [row for row in iter_]
 
@@ -479,6 +505,9 @@ async def test_get_results_iterator_can_exclude_events(client):
 
     for expected, result in zip(all_expected, all_result):
         for key, value in result.items():
+            if key == "person_properties" and not include_person_properties:
+                continue
+
             if key in ("timestamp", "inserted_at", "created_at"):
                 expected_value = to_isoformat(expected[key])
             else:
@@ -490,7 +519,8 @@ async def test_get_results_iterator_can_exclude_events(client):
 
 @pytest.mark.django_db
 @pytest.mark.asyncio
-async def test_get_results_iterator_can_include_events(client):
+@pytest.mark.parametrize("include_person_properties", (False, True))
+async def test_get_results_iterator_can_include_events(client, include_person_properties):
     """Test the rows returned by get_results_iterator can include events."""
     team_id = randint(1, 1000000)
 
@@ -531,7 +561,12 @@ async def test_get_results_iterator_can_include_events(client):
     # Include the latter half of events.
     include_events = (f"test-{i}" for i in range(5000, 10000))
     iter_ = get_results_iterator(
-        client, team_id, "2023-04-20 14:30:00", "2023-04-20 14:31:00", include_events=include_events
+        client,
+        team_id,
+        "2023-04-20 14:30:00",
+        "2023-04-20 14:31:00",
+        include_events=include_events,
+        include_person_properties=include_person_properties,
    )
     rows = [row for row in iter_]
 
@@ -543,6 +578,9 @@ async def test_get_results_iterator_can_include_events(client):
 
    for expected, result in zip(all_expected, all_result):
         for key, value in result.items():
+            if key == "person_properties" and not include_person_properties:
+                continue
+
             if key in ("timestamp", "inserted_at", "created_at"):
                 expected_value = to_isoformat(expected[key])
             else:
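Note on the test changes above: `@pytest.mark.parametrize` makes pytest collect each of these tests twice, once per flag value, so both field sets are exercised by the same test body, and the comparison loop skips the `person_properties` key when the flag is off. A minimal standalone sketch of that pattern (the test name here is illustrative, not from this PR):

```python
import pytest


@pytest.mark.parametrize("include_person_properties", (False, True))
def test_flag_produces_two_cases(include_person_properties):
    # Collected as test_flag_produces_two_cases[False] and
    # test_flag_produces_two_cases[True]; each run receives one flag value.
    assert isinstance(include_person_properties, bool)
```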
diff --git a/posthog/temporal/workflows/batch_exports.py b/posthog/temporal/workflows/batch_exports.py
index 4144940e471a89..c0d253852417fb 100644
--- a/posthog/temporal/workflows/batch_exports.py
+++ b/posthog/temporal/workflows/batch_exports.py
@@ -109,6 +109,22 @@ async def get_rows_count(
 -- Point in time identity fields
 toString(distinct_id) as distinct_id,
 toString(person_id) as person_id,
+-- Autocapture fields
+elements_chain
+"""
+
+S3_FIELDS = """
+DISTINCT ON (event, cityHash64(distinct_id), cityHash64(uuid))
+toString(uuid) as uuid,
+team_id,
+timestamp,
+inserted_at,
+created_at,
+event,
+properties,
+-- Point in time identity fields
+toString(distinct_id) as distinct_id,
+toString(person_id) as person_id,
 person_properties,
 -- Autocapture fields
 elements_chain
@@ -122,6 +138,7 @@ def get_results_iterator(
     interval_end: str,
     exclude_events: collections.abc.Iterable[str] | None = None,
     include_events: collections.abc.Iterable[str] | None = None,
+    include_person_properties: bool = False,
 ) -> typing.Generator[dict[str, typing.Any], None, None]:
     data_interval_start_ch = dt.datetime.fromisoformat(interval_start).strftime("%Y-%m-%d %H:%M:%S")
     data_interval_end_ch = dt.datetime.fromisoformat(interval_end).strftime("%Y-%m-%d %H:%M:%S")
@@ -141,7 +158,7 @@ def get_results_iterator(
         events_to_include_tuple = ()
 
     query = SELECT_QUERY_TEMPLATE.substitute(
-        fields=FIELDS,
+        fields=S3_FIELDS if include_person_properties else FIELDS,
         order_by="ORDER BY inserted_at",
         format="FORMAT ArrowStream",
         exclude_events=exclude_events_statement,
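The change above splits the previous single field list in two: `FIELDS` now closes at `elements_chain` without `person_properties`, while the new `S3_FIELDS` keeps it, and `get_results_iterator` picks between them via the new keyword. A hedged usage sketch, assuming `client` and `team_id` are bound as in the tests:

```python
# Default: FIELDS is substituted, so rows carry no "person_properties" key.
rows = get_results_iterator(client, team_id, "2023-04-20 14:30:00", "2023-04-20 14:31:00")

# Opt in: S3_FIELDS is substituted, adding "person_properties" to every row.
rows_with_persons = get_results_iterator(
    client,
    team_id,
    "2023-04-20 14:30:00",
    "2023-04-20 14:31:00",
    include_person_properties=True,
)
```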
ArrowStream", exclude_events=exclude_events_statement, diff --git a/posthog/temporal/workflows/s3_batch_export.py b/posthog/temporal/workflows/s3_batch_export.py index 18fe92f98e3cec..2c7545fecfb3e3 100644 --- a/posthog/temporal/workflows/s3_batch_export.py +++ b/posthog/temporal/workflows/s3_batch_export.py @@ -379,6 +379,7 @@ async def insert_into_s3_activity(inputs: S3InsertInputs): interval_end=inputs.data_interval_end, exclude_events=inputs.exclude_events, include_events=inputs.include_events, + include_person_properties=True, ) result = None