Skip to content

Commit

Permalink
refactor: Format all BatchExports datetimes as isoformat (#17468)
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias authored Sep 18, 2023
1 parent 683512e commit 8a7709d
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 18 deletions.
8 changes: 8 additions & 0 deletions posthog/temporal/tests/batch_exports/base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime as dt
import json
import typing

Expand Down Expand Up @@ -71,3 +72,10 @@ async def insert_events(client: ClickHouseClient, events: list[EventValues]):
def amaterialize(table: typing.Literal["events", "person", "groups"], column: str):
"""Materialize a column in a table."""
return materialize(table, column)


def to_isoformat(d: str | None) -> str | None:
"""Parse a string and return it as default isoformatted."""
if d is None:
return None
return dt.datetime.fromisoformat(d).replace(tzinfo=dt.timezone.utc).isoformat()
24 changes: 21 additions & 3 deletions posthog/temporal/tests/batch_exports/test_batch_exports.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import pytest_asyncio
from django.conf import settings

from posthog.temporal.tests.batch_exports.base import (
to_isoformat,
)
from posthog.temporal.workflows.batch_exports import (
BatchExportTemporaryFile,
get_data_interval,
Expand Down Expand Up @@ -288,8 +291,13 @@ async def test_get_results_iterator(client):

for expected, result in zip(all_expected, all_result):
for key, value in result.items():
if key in ("timestamp", "inserted_at", "created_at"):
expected_value = to_isoformat(expected[key])
else:
expected_value = expected[key]

# Some keys will be missing from result, so let's only check the ones we have.
assert value == expected[key], f"{key} value in {result} didn't match value in {expected}"
assert value == expected_value, f"{key} value in {result} didn't match value in {expected}"


@pytest.mark.django_db
Expand Down Expand Up @@ -343,8 +351,13 @@ async def test_get_results_iterator_handles_duplicates(client):

for expected, result in zip(all_expected, all_result):
for key, value in result.items():
if key in ("timestamp", "inserted_at", "created_at"):
expected_value = to_isoformat(expected[key])
else:
expected_value = expected[key]

# Some keys will be missing from result, so let's only check the ones we have.
assert value == expected[key], f"{key} value in {result} didn't match value in {expected}"
assert value == expected_value, f"{key} value in {result} didn't match value in {expected}"


@pytest.mark.django_db
Expand Down Expand Up @@ -400,8 +413,13 @@ async def test_get_results_iterator_can_exclude_events(client):

for expected, result in zip(all_expected, all_result):
for key, value in result.items():
if key in ("timestamp", "inserted_at", "created_at"):
expected_value = to_isoformat(expected[key])
else:
expected_value = expected[key]

# Some keys will be missing from result, so let's only check the ones we have.
assert value == expected[key], f"{key} value in {result} didn't match value in {expected}"
assert value == expected_value, f"{key} value in {result} didn't match value in {expected}"


@pytest.mark.parametrize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
EventValues,
amaterialize,
insert_events,
to_isoformat,
)
from posthog.temporal.tests.batch_exports.fixtures import (
acreate_batch_export,
Expand Down Expand Up @@ -124,12 +125,19 @@ def assert_events_in_s3(
if exclude_events is None:
exclude_events = []

expected_events = [
{k: v for k, v in event.items() if k not in ["team_id", "_timestamp"]}
for event in events
if event["event"] not in exclude_events
]
expected_events.sort(key=lambda x: x["timestamp"])
def to_expected_event(event):
    """Map a raw event dict to its expected exported form.

    Internal-only keys ("team_id", "_timestamp") are dropped, and datetime
    fields are normalized through to_isoformat; all other values pass
    through unchanged.
    """
    datetime_keys = ("timestamp", "inserted_at", "created_at")
    expected = {}
    for key, value in event.items():
        if key in ["team_id", "_timestamp"]:
            continue
        expected[key] = to_isoformat(value) if key in datetime_keys else value
    return expected

expected_events = list(map(to_expected_event, (event for event in events if event["event"] not in exclude_events)))

expected_events.sort(key=lambda x: x["timestamp"] if x["timestamp"] is not None else 0)

# First check one event, the first one, so that we can get a nice diff if
# the included data is different.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from posthog.temporal.tests.batch_exports.base import (
EventValues,
insert_events,
to_isoformat,
)
from posthog.temporal.tests.batch_exports.fixtures import (
acreate_batch_export,
Expand Down Expand Up @@ -369,14 +370,16 @@ async def test_snowflake_export_workflow_exports_events_in_the_last_hour_for_the
]
json_data.sort(key=lambda x: x["timestamp"])
# Drop _timestamp and team_id from events
expected_events = [
{
expected_events = []
for event in events:
expected_event = {
key: value
for key, value in event.items()
if key in ("uuid", "event", "timestamp", "properties", "person_id")
}
for event in events
]
expected_event["timestamp"] = to_isoformat(event["timestamp"])
expected_events.append(expected_event)

assert json_data[0] == expected_events[0]
assert json_data == expected_events

Expand Down
8 changes: 3 additions & 5 deletions posthog/temporal/workflows/batch_exports.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,12 @@ def iter_batch_records(batch) -> typing.Generator[dict[str, typing.Any], None, N
elements = json.dumps(record.get("elements_chain").decode())

record = {
"created_at": record.get("created_at").strftime("%Y-%m-%d %H:%M:%S.%f"),
"created_at": record.get("created_at").isoformat(),
"distinct_id": record.get("distinct_id").decode(),
"elements": elements,
"elements_chain": record.get("elements_chain").decode(),
"event": record.get("event").decode(),
"inserted_at": record.get("inserted_at").strftime("%Y-%m-%d %H:%M:%S.%f")
if record.get("inserted_at")
else None,
"inserted_at": record.get("inserted_at").isoformat() if record.get("inserted_at") else None,
"ip": properties.get("$ip", None) if properties else None,
"person_id": record.get("person_id").decode(),
"person_properties": json.loads(person_properties) if person_properties else None,
Expand All @@ -159,7 +157,7 @@ def iter_batch_records(batch) -> typing.Generator[dict[str, typing.Any], None, N
"properties": properties,
"site_url": properties.get("$current_url", None) if properties else None,
"team_id": record.get("team_id"),
"timestamp": record.get("timestamp").strftime("%Y-%m-%d %H:%M:%S.%f"),
"timestamp": record.get("timestamp").isoformat(),
"uuid": record.get("uuid").decode(),
}

Expand Down

0 comments on commit 8a7709d

Please sign in to comment.