refactor: BatchExport tests for S3
tomasfarias committed Sep 22, 2023
1 parent c1b8e2d commit f860144
Showing 5 changed files with 459 additions and 773 deletions.
143 changes: 141 additions & 2 deletions posthog/temporal/tests/batch_exports/base.py
@@ -1,10 +1,19 @@
import datetime as dt
import json
import random
import typing
import uuid

from asgiref.sync import sync_to_async
from temporalio.client import Client

from ee.clickhouse.materialized_columns.columns import materialize
from posthog.batch_exports.models import (
BatchExport,
BatchExportDestination,
BatchExportRun,
)
from posthog.batch_exports.service import sync_batch_export
from posthog.temporal.workflows.clickhouse import ClickHouseClient


Expand All @@ -25,8 +34,99 @@ class EventValues(typing.TypedDict):
uuid: str


-async def insert_events(client: ClickHouseClient, events: list[EventValues]):
+def date_range(start: dt.datetime, stop: dt.datetime, step: dt.timedelta):
"""Generate a range of dates between two dates."""
current = start

while current < stop:
yield current
current += step
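
# Illustrative usage (not part of this diff): with a one-minute step,
# list(date_range(dt.datetime(2023, 9, 22, 0, 0), dt.datetime(2023, 9, 22, 0, 3), dt.timedelta(minutes=1)))
# yields 00:00, 00:01, and 00:02; stop is exclusive, which keeps the
# timestamps sampled by insert_events before end_time, inside the export window.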


async def insert_events(
client: ClickHouseClient,
team,
start_time,
end_time,
n: int = 100,
n_outside_range: int = 10,
n_other_team: int = 10,
override_values: dict | None = None,
duplicate: bool = False,
) -> tuple[list[EventValues], list[EventValues], list[EventValues]]:
"""Insert some events into the sharded_events table."""
possible_datetimes = list(date_range(start_time, end_time, dt.timedelta(minutes=1)))
if override_values is None:
override_dict = {}
else:
override_dict = override_values

properties = {"$browser": "Chrome", "$os": "Mac OS X", "super-property": "super-value"}

events: list[EventValues] = [
{
"uuid": str(uuid.uuid4()),
"event": f"test-{i}",
"timestamp": random.choice(possible_datetimes).strftime("%Y-%m-%d %H:%M:%S.%f"),
"created_at": random.choice(possible_datetimes).strftime("%Y-%m-%d %H:%M:%S.%f"),
"inserted_at": override_dict.get(
"inserted_at", random.choice(possible_datetimes).strftime("%Y-%m-%d %H:%M:%S.%f")
),
"_timestamp": random.choice(possible_datetimes).strftime("%Y-%m-%d %H:%M:%S"),
"person_id": str(uuid.uuid4()),
"person_properties": override_dict.get("properties", properties),
"team_id": team.pk,
"properties": override_dict.get("properties", properties),
"distinct_id": str(uuid.uuid4()),
"elements_chain": "this is a comman, separated, list, of css selectors(?)",
}
for i in range(n)
]

duplicate_events = []
    if duplicate:
duplicate_events = events

delta = (end_time - start_time) + dt.timedelta(hours=1)
events_outside_range: list[EventValues] = [
{
"uuid": str(uuid.uuid4()),
"event": f"test-{i}",
"timestamp": (random.choice(possible_datetimes) + delta).strftime("%Y-%m-%d %H:%M:%S.%f"),
"created_at": (random.choice(possible_datetimes) + delta).strftime("%Y-%m-%d %H:%M:%S.%f"),
"inserted_at": override_dict.get(
"inserted_at", (random.choice(possible_datetimes) + delta).strftime("%Y-%m-%d %H:%M:%S.%f")
),
"_timestamp": (random.choice(possible_datetimes) + delta).strftime("%Y-%m-%d %H:%M:%S"),
"person_id": str(uuid.uuid4()),
"person_properties": override_dict.get("properties", properties),
"team_id": team.pk,
"properties": override_dict.get("properties", properties),
"distinct_id": str(uuid.uuid4()),
"elements_chain": "this is a comman, separated, list, of css selectors(?)",
}
for i in range(n_outside_range)
]

events_from_other_team: list[EventValues] = [
{
"uuid": str(uuid.uuid4()),
"event": f"test-{i}",
"timestamp": random.choice(possible_datetimes).strftime("%Y-%m-%d %H:%M:%S.%f"),
"created_at": random.choice(possible_datetimes).strftime("%Y-%m-%d %H:%M:%S.%f"),
"inserted_at": override_dict.get(
"inserted_at", random.choice(possible_datetimes).strftime("%Y-%m-%d %H:%M:%S.%f")
),
"_timestamp": random.choice(possible_datetimes).strftime("%Y-%m-%d %H:%M:%S"),
"person_id": str(uuid.uuid4()),
"person_properties": override_dict.get("properties", properties),
"team_id": team.pk + 1,
"properties": override_dict.get("properties", properties),
"distinct_id": str(uuid.uuid4()),
"elements_chain": "this is a comman, separated, list, of css selectors(?)",
}
for i in range(n_other_team)
]

await client.execute_query(
f"""
@@ -63,10 +163,12 @@ async def insert_events(client: ClickHouseClient, events: list[EventValues]):
if isinstance(event["person_properties"], dict)
else event["person_properties"],
)
-            for event in events
+            for event in events + events_outside_range + events_from_other_team + duplicate_events
],
)

return (events, events_outside_range, events_from_other_team)


@sync_to_async
def amaterialize(table: typing.Literal["events", "person", "groups"], column: str):
@@ -79,3 +181,40 @@ def to_isoformat(d: str | None) -> str | None:
if d is None:
return None
return dt.datetime.fromisoformat(d).replace(tzinfo=dt.timezone.utc).isoformat()


def create_batch_export(team_id: int, interval: str, name: str, destination_data: dict) -> BatchExport:
"""Create a BatchExport and its underlying Schedule."""

destination = BatchExportDestination(**destination_data)
batch_export = BatchExport(team_id=team_id, destination=destination, interval=interval, name=name)

sync_batch_export(batch_export, created=True)

destination.save()
batch_export.save()

return batch_export


async def acreate_batch_export(team_id: int, interval: str, name: str, destination_data: dict) -> BatchExport:
"""Create a BatchExport and its underlying Schedule."""
return await sync_to_async(create_batch_export)(team_id, interval, name, destination_data) # type: ignore


def fetch_batch_export_runs(batch_export_id: uuid.UUID, limit: int = 100) -> list[BatchExportRun]:
"""Fetch the BatchExportRuns for a given BatchExport."""
return list(BatchExportRun.objects.filter(batch_export_id=batch_export_id).order_by("-created_at")[:limit])


async def afetch_batch_export_runs(batch_export_id: uuid.UUID, limit: int = 100) -> list[BatchExportRun]:
"""Fetch the BatchExportRuns for a given BatchExport."""
return await sync_to_async(fetch_batch_export_runs)(batch_export_id, limit) # type: ignore


async def adelete_batch_export(batch_export: BatchExport, temporal: Client) -> None:
"""Delete a BatchExport and its underlying Schedule."""
handle = temporal.get_schedule_handle(str(batch_export.id))
await handle.delete()

await sync_to_async(batch_export.delete)() # type: ignore
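
Taken together, these helpers cover the arrange/act/assert phases of an S3 batch export test. Below is a minimal sketch of how they might be combined; the client, team, and temporal fixtures, the destination_data shape, and the "Completed" status value are illustrative assumptions, not part of this commit.

import datetime as dt

from posthog.temporal.tests.batch_exports.base import (
    acreate_batch_export,
    adelete_batch_export,
    afetch_batch_export_runs,
    insert_events,
)


async def test_s3_export_only_includes_events_in_range(client, team, temporal):
    start_time = dt.datetime(2023, 9, 22, 0, 0, 0)
    end_time = dt.datetime(2023, 9, 22, 1, 0, 0)

    # Arrange: events inside the window, plus decoys outside it and for
    # another team. Only the first list should end up in the S3 object.
    events, _outside, _other_team = await insert_events(client, team, start_time, end_time)

    # The destination_data shape here is assumed for illustration.
    batch_export = await acreate_batch_export(
        team_id=team.pk,
        interval="hour",
        name="my-s3-export",
        destination_data={"type": "S3", "config": {"bucket_name": "my-bucket"}},
    )
    try:
        # Act: trigger the export workflow for [start_time, end_time) here.
        ...

        # Assert: a run was recorded and finished ("Completed" is assumed).
        runs = await afetch_batch_export_runs(batch_export.id)
        assert runs and all(run.status == "Completed" for run in runs)
    finally:
        # Clean up both the Temporal Schedule and the Django model.
        await adelete_batch_export(batch_export, temporal)

The try/finally mirrors adelete_batch_export's two-step teardown above: the Temporal Schedule is deleted first, then the model row.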
48 changes: 0 additions & 48 deletions posthog/temporal/tests/batch_exports/fixtures.py

This file was deleted.

Diffs for the 3 remaining changed files are not shown.
