Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Format all BatchExports datetimes as isoformat #17468

Merged
merged 2 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions posthog/temporal/tests/batch_exports/base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime as dt
import json
import typing

Expand Down Expand Up @@ -71,3 +72,10 @@ async def insert_events(client: ClickHouseClient, events: list[EventValues]):
def amaterialize(table: typing.Literal["events", "person", "groups"], column: str):
    """Materialize a column in a table.

    Thin pass-through to ``materialize`` for use in these tests; the ``a``
    prefix suggests an async wrapper — NOTE(review): any decorator sits above
    this diff hunk, confirm against the full file.
    """
    return materialize(table, column)


def to_isoformat(d: str | None) -> str | None:
"""Parse a string and return it as default isoformatted."""
if d is None:
return None
return dt.datetime.fromisoformat(d).replace(tzinfo=dt.timezone.utc).isoformat()
24 changes: 21 additions & 3 deletions posthog/temporal/tests/batch_exports/test_batch_exports.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
import pytest_asyncio
from django.conf import settings

from posthog.temporal.tests.batch_exports.base import (
to_isoformat,
)
from posthog.temporal.workflows.batch_exports import (
BatchExportTemporaryFile,
get_data_interval,
Expand Down Expand Up @@ -288,8 +291,13 @@ async def test_get_results_iterator(client):

for expected, result in zip(all_expected, all_result):
for key, value in result.items():
if key in ("timestamp", "inserted_at", "created_at"):
expected_value = to_isoformat(expected[key])
else:
expected_value = expected[key]

# Some keys will be missing from result, so let's only check the ones we have.
assert value == expected[key], f"{key} value in {result} didn't match value in {expected}"
assert value == expected_value, f"{key} value in {result} didn't match value in {expected}"


@pytest.mark.django_db
Expand Down Expand Up @@ -343,8 +351,13 @@ async def test_get_results_iterator_handles_duplicates(client):

for expected, result in zip(all_expected, all_result):
for key, value in result.items():
if key in ("timestamp", "inserted_at", "created_at"):
expected_value = to_isoformat(expected[key])
else:
expected_value = expected[key]

# Some keys will be missing from result, so let's only check the ones we have.
assert value == expected[key], f"{key} value in {result} didn't match value in {expected}"
assert value == expected_value, f"{key} value in {result} didn't match value in {expected}"


@pytest.mark.django_db
Expand Down Expand Up @@ -400,8 +413,13 @@ async def test_get_results_iterator_can_exclude_events(client):

for expected, result in zip(all_expected, all_result):
for key, value in result.items():
if key in ("timestamp", "inserted_at", "created_at"):
expected_value = to_isoformat(expected[key])
else:
expected_value = expected[key]

# Some keys will be missing from result, so let's only check the ones we have.
assert value == expected[key], f"{key} value in {result} didn't match value in {expected}"
assert value == expected_value, f"{key} value in {result} didn't match value in {expected}"


@pytest.mark.parametrize(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
EventValues,
amaterialize,
insert_events,
to_isoformat,
)
from posthog.temporal.tests.batch_exports.fixtures import (
acreate_batch_export,
Expand Down Expand Up @@ -124,12 +125,19 @@ def assert_events_in_s3(
if exclude_events is None:
exclude_events = []

expected_events = [
{k: v for k, v in event.items() if k not in ["team_id", "_timestamp"]}
for event in events
if event["event"] not in exclude_events
]
expected_events.sort(key=lambda x: x["timestamp"])
def to_expected_event(event):
mapping_functions = {
"timestamp": to_isoformat,
"inserted_at": to_isoformat,
"created_at": to_isoformat,
}
return {
k: mapping_functions.get(k, lambda x: x)(v) for k, v in event.items() if k not in ["team_id", "_timestamp"]
}

expected_events = list(map(to_expected_event, (event for event in events if event["event"] not in exclude_events)))

expected_events.sort(key=lambda x: x["timestamp"] if x["timestamp"] is not None else 0)

# First check one event, the first one, so that we can get a nice diff if
# the included data is different.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from posthog.temporal.tests.batch_exports.base import (
EventValues,
insert_events,
to_isoformat,
)
from posthog.temporal.tests.batch_exports.fixtures import (
acreate_batch_export,
Expand Down Expand Up @@ -369,14 +370,16 @@ async def test_snowflake_export_workflow_exports_events_in_the_last_hour_for_the
]
json_data.sort(key=lambda x: x["timestamp"])
# Drop _timestamp and team_id from events
expected_events = [
{
expected_events = []
for event in events:
expected_event = {
key: value
for key, value in event.items()
if key in ("uuid", "event", "timestamp", "properties", "person_id")
}
for event in events
]
expected_event["timestamp"] = to_isoformat(event["timestamp"])
expected_events.append(expected_event)

assert json_data[0] == expected_events[0]
assert json_data == expected_events

Expand Down
8 changes: 3 additions & 5 deletions posthog/temporal/workflows/batch_exports.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,14 +143,12 @@ def iter_batch_records(batch) -> typing.Generator[dict[str, typing.Any], None, N
elements = json.dumps(record.get("elements_chain").decode())

record = {
"created_at": record.get("created_at").strftime("%Y-%m-%d %H:%M:%S.%f"),
"created_at": record.get("created_at").isoformat(),
"distinct_id": record.get("distinct_id").decode(),
"elements": elements,
"elements_chain": record.get("elements_chain").decode(),
"event": record.get("event").decode(),
"inserted_at": record.get("inserted_at").strftime("%Y-%m-%d %H:%M:%S.%f")
if record.get("inserted_at")
else None,
"inserted_at": record.get("inserted_at").isoformat() if record.get("inserted_at") else None,
"ip": properties.get("$ip", None) if properties else None,
"person_id": record.get("person_id").decode(),
"person_properties": json.loads(person_properties) if person_properties else None,
Expand All @@ -159,7 +157,7 @@ def iter_batch_records(batch) -> typing.Generator[dict[str, typing.Any], None, N
"properties": properties,
"site_url": properties.get("$current_url", None) if properties else None,
"team_id": record.get("team_id"),
"timestamp": record.get("timestamp").strftime("%Y-%m-%d %H:%M:%S.%f"),
"timestamp": record.get("timestamp").isoformat(),
"uuid": record.get("uuid").decode(),
}

Expand Down