diff --git a/posthog/temporal/tests/batch_exports/base.py b/posthog/temporal/tests/batch_exports/base.py index b1ab1caf90909..88a52fe798426 100644 --- a/posthog/temporal/tests/batch_exports/base.py +++ b/posthog/temporal/tests/batch_exports/base.py @@ -1,3 +1,4 @@ +import datetime as dt import json import typing @@ -71,3 +72,10 @@ async def insert_events(client: ClickHouseClient, events: list[EventValues]): def amaterialize(table: typing.Literal["events", "person", "groups"], column: str): """Materialize a column in a table.""" return materialize(table, column) + + +def to_isoformat(d: str | None) -> str | None: + """Parse a string and return it as default isoformatted.""" + if d is None: + return None + return dt.datetime.fromisoformat(d).replace(tzinfo=dt.timezone.utc).isoformat() diff --git a/posthog/temporal/tests/batch_exports/test_batch_exports.py b/posthog/temporal/tests/batch_exports/test_batch_exports.py index 913cd5b45d2c3..50ee763b5d4d9 100644 --- a/posthog/temporal/tests/batch_exports/test_batch_exports.py +++ b/posthog/temporal/tests/batch_exports/test_batch_exports.py @@ -12,6 +12,9 @@ import pytest_asyncio from django.conf import settings +from posthog.temporal.tests.batch_exports.base import ( + to_isoformat, +) from posthog.temporal.workflows.batch_exports import ( BatchExportTemporaryFile, get_data_interval, @@ -288,8 +291,13 @@ async def test_get_results_iterator(client): for expected, result in zip(all_expected, all_result): for key, value in result.items(): + if key in ("timestamp", "inserted_at", "created_at"): + expected_value = to_isoformat(expected[key]) + else: + expected_value = expected[key] + # Some keys will be missing from result, so let's only check the ones we have. - assert value == expected[key], f"{key} value in {result} didn't match value in {expected}" + assert value == expected_value, f"{key} value in {result} didn't match value in {expected}" @pytest.mark.django_db @@ -343,8 +351,13 @@ async def test_get_results_iterator_handles_duplicates(client): for expected, result in zip(all_expected, all_result): for key, value in result.items(): + if key in ("timestamp", "inserted_at", "created_at"): + expected_value = to_isoformat(expected[key]) + else: + expected_value = expected[key] + # Some keys will be missing from result, so let's only check the ones we have. - assert value == expected[key], f"{key} value in {result} didn't match value in {expected}" + assert value == expected_value, f"{key} value in {result} didn't match value in {expected}" @pytest.mark.django_db @@ -400,8 +413,13 @@ async def test_get_results_iterator_can_exclude_events(client): for expected, result in zip(all_expected, all_result): for key, value in result.items(): + if key in ("timestamp", "inserted_at", "created_at"): + expected_value = to_isoformat(expected[key]) + else: + expected_value = expected[key] + # Some keys will be missing from result, so let's only check the ones we have. - assert value == expected[key], f"{key} value in {result} didn't match value in {expected}" + assert value == expected_value, f"{key} value in {result} didn't match value in {expected}" @pytest.mark.parametrize( diff --git a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py index b4e51bc9f8b8e..08f0d285a944c 100644 --- a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py @@ -25,6 +25,7 @@ EventValues, amaterialize, insert_events, + to_isoformat, ) from posthog.temporal.tests.batch_exports.fixtures import ( acreate_batch_export, @@ -124,12 +125,19 @@ def assert_events_in_s3( if exclude_events is None: exclude_events = [] - expected_events = [ - {k: v for k, v in event.items() if k not in ["team_id", "_timestamp"]} - for event in events - if event["event"] not in exclude_events - ] - expected_events.sort(key=lambda x: x["timestamp"]) + def to_expected_event(event): + mapping_functions = { + "timestamp": to_isoformat, + "inserted_at": to_isoformat, + "created_at": to_isoformat, + } + return { + k: mapping_functions.get(k, lambda x: x)(v) for k, v in event.items() if k not in ["team_id", "_timestamp"] + } + + expected_events = list(map(to_expected_event, (event for event in events if event["event"] not in exclude_events))) + + expected_events.sort(key=lambda x: x["timestamp"] if x["timestamp"] is not None else 0) # First check one event, the first one, so that we can get a nice diff if # the included data is different. diff --git a/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py index 3b153668d5549..979929d1ce205 100644 --- a/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py @@ -21,6 +21,7 @@ from posthog.temporal.tests.batch_exports.base import ( EventValues, insert_events, + to_isoformat, ) from posthog.temporal.tests.batch_exports.fixtures import ( acreate_batch_export, @@ -369,14 +370,16 @@ async def test_snowflake_export_workflow_exports_events_in_the_last_hour_for_the ] json_data.sort(key=lambda x: x["timestamp"]) # Drop _timestamp and team_id from events - expected_events = [ - { + expected_events = [] + for event in events: + expected_event = { key: value for key, value in event.items() if key in ("uuid", "event", "timestamp", "properties", "person_id") } - for event in events - ] + expected_event["timestamp"] = to_isoformat(event["timestamp"]) + expected_events.append(expected_event) + assert json_data[0] == expected_events[0] assert json_data == expected_events diff --git a/posthog/temporal/workflows/batch_exports.py b/posthog/temporal/workflows/batch_exports.py index 5fe3cde5e123a..c79262a0fe86a 100644 --- a/posthog/temporal/workflows/batch_exports.py +++ b/posthog/temporal/workflows/batch_exports.py @@ -143,14 +143,12 @@ def iter_batch_records(batch) -> typing.Generator[dict[str, typing.Any], None, N elements = json.dumps(record.get("elements_chain").decode()) record = { - "created_at": record.get("created_at").strftime("%Y-%m-%d %H:%M:%S.%f"), + "created_at": record.get("created_at").isoformat(), "distinct_id": record.get("distinct_id").decode(), "elements": elements, "elements_chain": record.get("elements_chain").decode(), "event": record.get("event").decode(), - "inserted_at": record.get("inserted_at").strftime("%Y-%m-%d %H:%M:%S.%f") - if record.get("inserted_at") - else None, + "inserted_at": record.get("inserted_at").isoformat() if record.get("inserted_at") else None, "ip": properties.get("$ip", None) if properties else None, "person_id": record.get("person_id").decode(), "person_properties": json.loads(person_properties) if person_properties else None, @@ -159,7 +157,7 @@ def iter_batch_records(batch) -> typing.Generator[dict[str, typing.Any], None, N "properties": properties, "site_url": properties.get("$current_url", None) if properties else None, "team_id": record.get("team_id"), - "timestamp": record.get("timestamp").strftime("%Y-%m-%d %H:%M:%S.%f"), + "timestamp": record.get("timestamp").isoformat(), "uuid": record.get("uuid").decode(), }