fix: Add setting to unset timestamp predicates (#18611)
* fix: Add setting to unset timestamp predicates

* test: Add tests

* fix: Formatting

* fix: Check for str team_id

* fix: Test against string team_ids
tomasfarias authored Nov 14, 2023
1 parent 96dd0d2 commit 0fcd386
Showing 3 changed files with 135 additions and 6 deletions.
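In brief (an editorial sketch condensed from the diff below, not the code itself): the export query gains a $timestamp placeholder for the sort-key heuristic, and teams listed in the new setting have it substituted with an empty string. The helper name predicates_for is hypothetical:

# Condensed sketch; constant names follow the diff, values are stand-ins.
TIMESTAMP_PREDICATES = "AND timestamp >= ... AND timestamp < ..."  # stand-in text
UNCONSTRAINED_TIMESTAMP_TEAM_IDS = ["42"]  # parsed from an env var, hence strings

def predicates_for(team_id: int) -> str:
    # Allow-listed teams skip the timestamp heuristic entirely.
    return "" if str(team_id) in UNCONSTRAINED_TIMESTAMP_TEAM_IDS else TIMESTAMP_PREDICATES

assert predicates_for(42) == ""
assert predicates_for(7) == TIMESTAMP_PREDICATES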
5 changes: 5 additions & 0 deletions posthog/settings/temporal.py
@@ -1,5 +1,7 @@
import os

from posthog.settings.utils import get_list

TEMPORAL_NAMESPACE = os.getenv("TEMPORAL_NAMESPACE", "default")
TEMPORAL_TASK_QUEUE = os.getenv("TEMPORAL_TASK_QUEUE", "no-sandbox-python-django")
TEMPORAL_HOST = os.getenv("TEMPORAL_HOST", "127.0.0.1")
@@ -14,3 +16,6 @@
BATCH_EXPORT_SNOWFLAKE_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 100 # 100MB
BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 50 # 50MB
BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 100 # 100MB


UNCONSTRAINED_TIMESTAMP_TEAM_IDS = get_list(os.getenv("UNCONSTRAINED_TIMESTAMP_TEAM_IDS", ""))
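For context, a minimal sketch of how this setting would be configured; the team IDs are hypothetical, and it assumes get_list splits a comma-separated string:

import os

os.environ["UNCONSTRAINED_TIMESTAMP_TEAM_IDS"] = "1,42"  # hypothetical team IDs
# Assuming get_list splits on commas, the setting then evaluates to:
# UNCONSTRAINED_TIMESTAMP_TEAM_IDS == ["1", "42"]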
109 changes: 109 additions & 0 deletions posthog/temporal/tests/batch_exports/test_batch_exports.py
@@ -6,6 +6,7 @@
from random import randint

import pytest
from django.test import override_settings

from posthog.temporal.tests.utils.datetimes import (
to_isoformat,
@@ -129,6 +130,48 @@ async def test_get_rows_count_can_include_events(clickhouse_client):
assert row_count == 2500


async def test_get_rows_count_ignores_timestamp_predicates(clickhouse_client):
"""Test the count of rows returned by get_rows_count can ignore timestamp predicates."""
team_id = randint(1, 1000000)

inserted_at = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00")
data_interval_end = inserted_at + dt.timedelta(hours=1)

# Insert some data with timestamps a couple of years before inserted_at
timestamp_start = inserted_at - dt.timedelta(hours=24 * 365 * 2)
timestamp_end = inserted_at - dt.timedelta(hours=24 * 365)

await generate_test_events_in_clickhouse(
client=clickhouse_client,
team_id=team_id,
start_time=timestamp_start,
end_time=timestamp_end,
count=10,
count_outside_range=0,
count_other_team=0,
duplicate=False,
inserted_at=inserted_at,
)

row_count = await get_rows_count(
clickhouse_client,
team_id,
inserted_at.isoformat(),
data_interval_end.isoformat(),
)
# All events are outside timestamp bounds (a year difference with inserted_at)
assert row_count == 0

with override_settings(UNCONSTRAINED_TIMESTAMP_TEAM_IDS=[str(team_id)]):
row_count = await get_rows_count(
clickhouse_client,
team_id,
inserted_at.isoformat(),
data_interval_end.isoformat(),
)
assert row_count == 10


@pytest.mark.parametrize("include_person_properties", (False, True))
async def test_get_results_iterator(clickhouse_client, include_person_properties):
"""Test the rows returned by get_results_iterator."""
@@ -326,6 +369,72 @@ async def test_get_results_iterator_can_include_events(clickhouse_client, include_person_properties):
assert value == expected_value, f"{key} value in {result} didn't match value in {expected}"


@pytest.mark.parametrize("include_person_properties", (False, True))
async def test_get_results_iterator_ignores_timestamp_predicates(clickhouse_client, include_person_properties):
"""Test the rows returned by get_results_iterator ignores timestamp predicates when configured."""
team_id = randint(1, 1000000)

inserted_at = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00")
data_interval_end = inserted_at + dt.timedelta(hours=1)

# Insert some data with timestamps a couple of years before inserted_at
timestamp_start = inserted_at - dt.timedelta(hours=24 * 365 * 2)
timestamp_end = inserted_at - dt.timedelta(hours=24 * 365)

(events, _, _) = await generate_test_events_in_clickhouse(
client=clickhouse_client,
team_id=team_id,
start_time=timestamp_start,
end_time=timestamp_end,
count=10,
count_outside_range=0,
count_other_team=0,
duplicate=True,
person_properties={"$browser": "Chrome", "$os": "Mac OS X"},
inserted_at=inserted_at,
)

iter_ = get_results_iterator(
clickhouse_client,
team_id,
inserted_at.isoformat(),
data_interval_end.isoformat(),
include_person_properties=include_person_properties,
)
rows = [row for row in iter_]

assert len(rows) == 0

with override_settings(UNCONSTRAINED_TIMESTAMP_TEAM_IDS=[str(team_id)]):
iter_ = get_results_iterator(
clickhouse_client,
team_id,
inserted_at.isoformat(),
data_interval_end.isoformat(),
include_person_properties=include_person_properties,
)
rows = [row for row in iter_]

all_expected = sorted(events, key=operator.itemgetter("event"))
all_result = sorted(rows, key=operator.itemgetter("event"))

assert len(all_expected) == len(all_result)
assert len([row["uuid"] for row in all_result]) == len(set(row["uuid"] for row in all_result))

for expected, result in zip(all_expected, all_result):
for key, value in result.items():
if key == "person_properties" and not include_person_properties:
continue

if key in ("timestamp", "inserted_at", "created_at"):
expected_value = to_isoformat(expected[key])
else:
expected_value = expected[key]

# Some keys will be missing from result, so let's only check the ones we have.
assert value == expected_value, f"{key} value in {result} didn't match value in {expected}"


@pytest.mark.parametrize(
"interval,data_interval_end,expected",
[
27 changes: 21 additions & 6 deletions posthog/temporal/workflows/batch_exports.py
@@ -11,6 +11,7 @@

import brotli
from asgiref.sync import sync_to_async
from django.conf import settings
from temporalio import activity, exceptions, workflow
from temporalio.common import RetryPolicy

@@ -28,21 +29,25 @@
SELECT $fields
FROM events
WHERE
-- These 'timestamp' checks are a heuristic to exploit the sort key.
-- Ideally, we need a schema that serves our needs, i.e. with a sort key on the _timestamp field used for batch exports.
-- As a side-effect, this heuristic will discard historical loads older than 2 days.
timestamp >= toDateTime64({data_interval_start}, 6, 'UTC') - INTERVAL 2 DAY
AND timestamp < toDateTime64({data_interval_end}, 6, 'UTC') + INTERVAL 1 DAY
AND COALESCE(inserted_at, _timestamp) >= toDateTime64({data_interval_start}, 6, 'UTC')
COALESCE(inserted_at, _timestamp) >= toDateTime64({data_interval_start}, 6, 'UTC')
AND COALESCE(inserted_at, _timestamp) < toDateTime64({data_interval_end}, 6, 'UTC')
AND team_id = {team_id}
$timestamp
$exclude_events
$include_events
$order_by
$format
"""
)

TIMESTAMP_PREDICATES = """
-- These 'timestamp' checks are a heuristic to exploit the sort key.
-- Ideally, we need a schema that serves our needs, i.e. with a sort key on the _timestamp field used for batch exports.
-- As a side-effect, this heuristic will discard historical loads older than a day.
AND timestamp >= toDateTime64({data_interval_start}, 6, 'UTC') - INTERVAL 2 DAY
AND timestamp < toDateTime64({data_interval_end}, 6, 'UTC') + INTERVAL 1 DAY
"""


async def get_rows_count(
client,
@@ -69,10 +74,15 @@ async def get_rows_count(
include_events_statement = ""
events_to_include_tuple = ()

timestamp_predicates = TIMESTAMP_PREDICATES
if str(team_id) in settings.UNCONSTRAINED_TIMESTAMP_TEAM_IDS:
timestamp_predicates = ""

query = SELECT_QUERY_TEMPLATE.substitute(
fields="count(DISTINCT event, cityHash64(distinct_id), cityHash64(uuid)) as count",
order_by="",
format="",
timestamp=timestamp_predicates,
exclude_events=exclude_events_statement,
include_events=include_events_statement,
)
@@ -154,10 +164,15 @@ def get_results_iterator(
include_events_statement = ""
events_to_include_tuple = ()

timestamp_predicates = TIMESTAMP_PREDICATES
if str(team_id) in settings.UNCONSTRAINED_TIMESTAMP_TEAM_IDS:
timestamp_predicates = ""

query = SELECT_QUERY_TEMPLATE.substitute(
fields=S3_FIELDS if include_person_properties else FIELDS,
order_by="ORDER BY inserted_at",
format="FORMAT ArrowStream",
timestamp=timestamp_predicates,
exclude_events=exclude_events_statement,
include_events=include_events_statement,
)
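To illustrate the new $timestamp placeholder (a sketch, assuming SELECT_QUERY_TEMPLATE is a string.Template, as the substitute calls above suggest): an empty substitution drops the heuristic, while the default re-adds the timestamp window.

from string import Template

# Toy stand-in for SELECT_QUERY_TEMPLATE; only the placeholder mechanics matter.
demo = Template("SELECT count() FROM events WHERE team_id = 2 $timestamp")
constrained = demo.substitute(timestamp="AND timestamp >= now() - INTERVAL 2 DAY")
unconstrained = demo.substitute(timestamp="")
assert "INTERVAL 2 DAY" in constrained
assert "INTERVAL" not in unconstrained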
