chore: Clean-up unused function and tests
tomasfarias committed Jun 18, 2024
1 parent f1db285 commit 022a633
Showing 2 changed files with 0 additions and 208 deletions.
58 changes: 0 additions & 58 deletions posthog/temporal/batch_exports/batch_exports.py
@@ -77,64 +77,6 @@ def get_timestamp_field(is_backfill: bool) -> str:
    return timestamp_field


async def get_rows_count(
    client: ClickHouseClient,
    team_id: int,
    interval_start: str,
    interval_end: str,
    exclude_events: collections.abc.Iterable[str] | None = None,
    include_events: collections.abc.Iterable[str] | None = None,
    is_backfill: bool = False,
) -> int:
    """Return a count of rows to be batch exported."""
    data_interval_start_ch = dt.datetime.fromisoformat(interval_start).strftime("%Y-%m-%d %H:%M:%S")
    data_interval_end_ch = dt.datetime.fromisoformat(interval_end).strftime("%Y-%m-%d %H:%M:%S")

    if exclude_events:
        exclude_events_statement = "AND event NOT IN {exclude_events}"
        events_to_exclude_tuple = tuple(exclude_events)
    else:
        exclude_events_statement = ""
        events_to_exclude_tuple = ()

    if include_events:
        include_events_statement = "AND event IN {include_events}"
        events_to_include_tuple = tuple(include_events)
    else:
        include_events_statement = ""
        events_to_include_tuple = ()

    timestamp_field = get_timestamp_field(is_backfill)
    timestamp_predicates = get_timestamp_predicates_for_team(team_id, is_backfill)

    query = SELECT_QUERY_TEMPLATE.substitute(
        fields="count(DISTINCT event, cityHash64(distinct_id), cityHash64(uuid)) as count",
        order_by="",
        format="",
        distinct="",
        timestamp_field=timestamp_field,
        timestamp=timestamp_predicates,
        exclude_events=exclude_events_statement,
        include_events=include_events_statement,
    )

    count = await client.read_query(
        query,
        query_parameters={
            "team_id": team_id,
            "data_interval_start": data_interval_start_ch,
            "data_interval_end": data_interval_end_ch,
            "exclude_events": events_to_exclude_tuple,
            "include_events": events_to_include_tuple,
        },
    )

    if count is None or len(count) == 0:
        raise ValueError("Unexpected result from ClickHouse: `None` returned for count query")

    return int(count)


def default_fields() -> list[BatchExportField]:
"""Return list of default batch export Fields."""
return [
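For reference, the deleted get_rows_count built its SQL from a shared SELECT_QUERY_TEMPLATE that this diff does not show. Below is a minimal, self-contained sketch of the same idea, assuming a plain string.Template, an events table, and the {placeholder} query-parameter style seen above; it illustrates only the deduplicating count expression and the optional event filters, not the actual PostHog template.

import string

# Illustrative sketch only, not part of this commit. The table name, predicate
# layout, and {placeholder} parameter style are assumptions mirrored from the
# removed code above.
COUNT_QUERY_TEMPLATE = string.Template(
    """
    SELECT count(DISTINCT event, cityHash64(distinct_id), cityHash64(uuid)) AS count
    FROM events
    WHERE team_id = {team_id}
    AND $timestamp_field >= {data_interval_start}
    AND $timestamp_field < {data_interval_end}
    $exclude_events
    $include_events
    """
)


def build_count_query(
    timestamp_field: str = "timestamp",
    exclude_events: bool = False,
    include_events: bool = False,
) -> str:
    """Render the count query, filling optional filters the way the removed helper did."""
    return COUNT_QUERY_TEMPLATE.substitute(
        timestamp_field=timestamp_field,
        exclude_events="AND event NOT IN {exclude_events}" if exclude_events else "",
        include_events="AND event IN {include_events}" if include_events else "",
    )

A caller would then supply team_id, data_interval_start, and data_interval_end as query parameters, as the removed function did via client.read_query.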
150 changes: 0 additions & 150 deletions posthog/temporal/tests/batch_exports/test_batch_exports.py
@@ -8,163 +8,13 @@

from posthog.temporal.batch_exports.batch_exports import (
    get_data_interval,
    get_rows_count,
    iter_records,
)
from posthog.temporal.tests.utils.events import generate_test_events_in_clickhouse

pytestmark = [pytest.mark.asyncio, pytest.mark.django_db]


async def test_get_rows_count(clickhouse_client):
    """Test the count of rows returned by get_rows_count."""
    team_id = randint(1, 1000000)
    data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:31:00.000000+00:00")
    data_interval_start = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00")

    _ = await generate_test_events_in_clickhouse(
        client=clickhouse_client,
        team_id=team_id,
        start_time=data_interval_start,
        end_time=data_interval_end,
        count=10000,
        count_outside_range=0,
        count_other_team=0,
        duplicate=False,
    )

    row_count = await get_rows_count(
        clickhouse_client, team_id, data_interval_start.isoformat(), data_interval_end.isoformat()
    )
    assert row_count == 10000


async def test_get_rows_count_handles_duplicates(clickhouse_client):
    """Test the count of rows returned by get_rows_count are de-duplicated."""
    team_id = randint(1, 1000000)

    data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:31:00.000000+00:00")
    data_interval_start = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00")

    _ = await generate_test_events_in_clickhouse(
        client=clickhouse_client,
        team_id=team_id,
        start_time=data_interval_start,
        end_time=data_interval_end,
        count=10,
        count_outside_range=0,
        count_other_team=0,
        duplicate=True,
    )

    row_count = await get_rows_count(
        clickhouse_client, team_id, data_interval_start.isoformat(), data_interval_end.isoformat()
    )
    assert row_count == 10


async def test_get_rows_count_can_exclude_events(clickhouse_client):
    """Test the count of rows returned by get_rows_count can exclude events."""
    team_id = randint(1, 1000000)

    data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:31:00.000000+00:00")
    data_interval_start = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00")

    (events, _, _) = await generate_test_events_in_clickhouse(
        client=clickhouse_client,
        team_id=team_id,
        start_time=data_interval_start,
        end_time=data_interval_end,
        count=10000,
        count_outside_range=0,
        count_other_team=0,
        duplicate=False,
    )

    # Exclude the latter half of events.
    exclude_events = (event["event"] for event in events[5000:])
    row_count = await get_rows_count(
        clickhouse_client,
        team_id,
        data_interval_start.isoformat(),
        data_interval_end.isoformat(),
        exclude_events=exclude_events,
    )
    assert row_count == 5000


async def test_get_rows_count_can_include_events(clickhouse_client):
    """Test the count of rows returned by get_rows_count can include events."""
    team_id = randint(1, 1000000)

    data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:31:00.000000+00:00")
    data_interval_start = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00")

    (events, _, _) = await generate_test_events_in_clickhouse(
        client=clickhouse_client,
        team_id=team_id,
        start_time=data_interval_start,
        end_time=data_interval_end,
        count=5000,
        count_outside_range=0,
        count_other_team=0,
        duplicate=False,
    )

    # Include the latter half of events.
    include_events = (event["event"] for event in events[2500:])
    row_count = await get_rows_count(
        clickhouse_client,
        team_id,
        data_interval_start.isoformat(),
        data_interval_end.isoformat(),
        include_events=include_events,
    )
    assert row_count == 2500


async def test_get_rows_count_ignores_timestamp_predicates(clickhouse_client):
    """Test the count of rows returned by get_rows_count can ignore timestamp predicates."""
    team_id = randint(1, 1000000)

    inserted_at = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00")
    data_interval_end = inserted_at + dt.timedelta(hours=1)

    # Insert some data with timestamps a couple of years before inserted_at
    timestamp_start = inserted_at - dt.timedelta(hours=24 * 365 * 2)
    timestamp_end = inserted_at - dt.timedelta(hours=24 * 365)

    await generate_test_events_in_clickhouse(
        client=clickhouse_client,
        team_id=team_id,
        start_time=timestamp_start,
        end_time=timestamp_end,
        count=10,
        count_outside_range=0,
        count_other_team=0,
        duplicate=False,
        inserted_at=inserted_at,
    )

    row_count = await get_rows_count(
        clickhouse_client,
        team_id,
        inserted_at.isoformat(),
        data_interval_end.isoformat(),
    )
    # All events are outside timestamp bounds (a year difference with inserted_at)
    assert row_count == 0

    with override_settings(UNCONSTRAINED_TIMESTAMP_TEAM_IDS=[str(team_id)]):
        row_count = await get_rows_count(
            clickhouse_client,
            team_id,
            inserted_at.isoformat(),
            data_interval_end.isoformat(),
        )
        assert row_count == 10


def assert_records_match_events(records, events):
"""Compare records returned from ClickHouse to events inserted into ClickHouse.
