diff --git a/posthog/settings/temporal.py b/posthog/settings/temporal.py
index 83a01906cc609..744949c52e4f1 100644
--- a/posthog/settings/temporal.py
+++ b/posthog/settings/temporal.py
@@ -1,5 +1,7 @@
 import os
 
+from posthog.settings.utils import get_list
+
 TEMPORAL_NAMESPACE = os.getenv("TEMPORAL_NAMESPACE", "default")
 TEMPORAL_TASK_QUEUE = os.getenv("TEMPORAL_TASK_QUEUE", "no-sandbox-python-django")
 TEMPORAL_HOST = os.getenv("TEMPORAL_HOST", "127.0.0.1")
@@ -14,3 +16,6 @@
 BATCH_EXPORT_SNOWFLAKE_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 100  # 100MB
 BATCH_EXPORT_POSTGRES_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 50  # 50MB
 BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES = 1024 * 1024 * 100  # 100MB
+
+
+UNCONSTRAINED_TIMESTAMP_TEAM_IDS = get_list(os.getenv("UNCONSTRAINED_TIMESTAMP_TEAM_IDS", ""))
diff --git a/posthog/temporal/tests/batch_exports/test_batch_exports.py b/posthog/temporal/tests/batch_exports/test_batch_exports.py
index 097f13869d2eb..2f31ac871f60e 100644
--- a/posthog/temporal/tests/batch_exports/test_batch_exports.py
+++ b/posthog/temporal/tests/batch_exports/test_batch_exports.py
@@ -6,6 +6,7 @@
 from random import randint
 
 import pytest
+from django.test import override_settings
 
 from posthog.temporal.tests.utils.datetimes import (
     to_isoformat,
@@ -129,6 +130,48 @@ async def test_get_rows_count_can_include_events(clickhouse_client):
     assert row_count == 2500
 
 
+async def test_get_rows_count_ignores_timestamp_predicates(clickhouse_client):
+    """Test the count of rows returned by get_rows_count can ignore timestamp predicates."""
+    team_id = randint(1, 1000000)
+
+    inserted_at = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00")
+    data_interval_end = inserted_at + dt.timedelta(hours=1)
+
+    # Insert some data with timestamps a couple of years before inserted_at
+    timestamp_start = inserted_at - dt.timedelta(hours=24 * 365 * 2)
+    timestamp_end = inserted_at - dt.timedelta(hours=24 * 365)
+
+    await generate_test_events_in_clickhouse(
+        client=clickhouse_client,
+        team_id=team_id,
+        start_time=timestamp_start,
+        end_time=timestamp_end,
+        count=10,
+        count_outside_range=0,
+        count_other_team=0,
+        duplicate=False,
+        inserted_at=inserted_at,
+    )
+
+    row_count = await get_rows_count(
+        clickhouse_client,
+        team_id,
+        inserted_at.isoformat(),
+        data_interval_end.isoformat(),
+    )
+    # All events are outside timestamp bounds (a year difference with inserted_at)
+    assert row_count == 0
+
+    with override_settings(UNCONSTRAINED_TIMESTAMP_TEAM_IDS=[str(team_id)]):
+        row_count = await get_rows_count(
+            clickhouse_client,
+            team_id,
+            inserted_at.isoformat(),
+            data_interval_end.isoformat(),
+        )
+    assert row_count == 10
+
+
 @pytest.mark.parametrize("include_person_properties", (False, True))
 async def test_get_results_iterator(clickhouse_client, include_person_properties):
     """Test the rows returned by get_results_iterator."""
@@ -326,6 +369,72 @@ async def test_get_results_iterator_can_include_events(clickhouse_client, includ
             assert value == expected_value, f"{key} value in {result} didn't match value in {expected}"
 
 
+@pytest.mark.parametrize("include_person_properties", (False, True))
+async def test_get_results_iterator_ignores_timestamp_predicates(clickhouse_client, include_person_properties):
+    """Test the rows returned by get_results_iterator ignores timestamp predicates when configured."""
+    team_id = randint(1, 1000000)
+
+    inserted_at = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00")
+    data_interval_end = inserted_at + dt.timedelta(hours=1)
+
+    # Insert some data with timestamps a couple of years before inserted_at
+    timestamp_start = inserted_at - dt.timedelta(hours=24 * 365 * 2)
+    timestamp_end = inserted_at - dt.timedelta(hours=24 * 365)
+
+    (events, _, _) = await generate_test_events_in_clickhouse(
+        client=clickhouse_client,
+        team_id=team_id,
+        start_time=timestamp_start,
+        end_time=timestamp_end,
+        count=10,
+        count_outside_range=0,
+        count_other_team=0,
+        duplicate=True,
+        person_properties={"$browser": "Chrome", "$os": "Mac OS X"},
+        inserted_at=inserted_at,
+    )
+
+    iter_ = get_results_iterator(
+        clickhouse_client,
+        team_id,
+        inserted_at.isoformat(),
+        data_interval_end.isoformat(),
+        include_person_properties=include_person_properties,
+    )
+    rows = [row for row in iter_]
+
+    assert len(rows) == 0
+
+    with override_settings(UNCONSTRAINED_TIMESTAMP_TEAM_IDS=[str(team_id)]):
+        iter_ = get_results_iterator(
+            clickhouse_client,
+            team_id,
+            inserted_at.isoformat(),
+            data_interval_end.isoformat(),
+            include_person_properties=include_person_properties,
+        )
+        rows = [row for row in iter_]
+
+    all_expected = sorted(events, key=operator.itemgetter("event"))
+    all_result = sorted(rows, key=operator.itemgetter("event"))
+
+    assert len(all_expected) == len(all_result)
+    assert len([row["uuid"] for row in all_result]) == len(set(row["uuid"] for row in all_result))
+
+    for expected, result in zip(all_expected, all_result):
+        for key, value in result.items():
+            if key == "person_properties" and not include_person_properties:
+                continue
+
+            if key in ("timestamp", "inserted_at", "created_at"):
+                expected_value = to_isoformat(expected[key])
+            else:
+                expected_value = expected[key]
+
+            # Some keys will be missing from result, so let's only check the ones we have.
+            assert value == expected_value, f"{key} value in {result} didn't match value in {expected}"
+
+
 @pytest.mark.parametrize(
     "interval,data_interval_end,expected",
     [
diff --git a/posthog/temporal/workflows/batch_exports.py b/posthog/temporal/workflows/batch_exports.py
index b003578a575ce..5b71b87550892 100644
--- a/posthog/temporal/workflows/batch_exports.py
+++ b/posthog/temporal/workflows/batch_exports.py
@@ -11,6 +11,7 @@
 
 import brotli
 from asgiref.sync import sync_to_async
+from django.conf import settings
 from temporalio import activity, exceptions, workflow
 from temporalio.common import RetryPolicy
@@ -28,14 +29,10 @@
     SELECT $fields
     FROM events
     WHERE
-        -- These 'timestamp' checks are a heuristic to exploit the sort key.
-        -- Ideally, we need a schema that serves our needs, i.e. with a sort key on the _timestamp field used for batch exports.
-        -- As a side-effect, this heuristic will discard historical loads older than 2 days.
-        timestamp >= toDateTime64({data_interval_start}, 6, 'UTC') - INTERVAL 2 DAY
-        AND timestamp < toDateTime64({data_interval_end}, 6, 'UTC') + INTERVAL 1 DAY
-        AND COALESCE(inserted_at, _timestamp) >= toDateTime64({data_interval_start}, 6, 'UTC')
+        COALESCE(inserted_at, _timestamp) >= toDateTime64({data_interval_start}, 6, 'UTC')
         AND COALESCE(inserted_at, _timestamp) < toDateTime64({data_interval_end}, 6, 'UTC')
         AND team_id = {team_id}
+        $timestamp
         $exclude_events
         $include_events
     $order_by
@@ -43,6 +40,14 @@
     """
 )
 
+TIMESTAMP_PREDICATES = """
+-- These 'timestamp' checks are a heuristic to exploit the sort key.
+-- Ideally, we need a schema that serves our needs, i.e. with a sort key on the _timestamp field used for batch exports.
+-- As a side-effect, this heuristic will discard historical loads older than a day.
+AND timestamp >= toDateTime64({data_interval_start}, 6, 'UTC') - INTERVAL 2 DAY
+AND timestamp < toDateTime64({data_interval_end}, 6, 'UTC') + INTERVAL 1 DAY
+"""
+
 
 async def get_rows_count(
     client,
@@ -69,10 +74,15 @@
         include_events_statement = ""
         events_to_include_tuple = ()
 
+    timestamp_predicates = TIMESTAMP_PREDICATES
+    if str(team_id) in settings.UNCONSTRAINED_TIMESTAMP_TEAM_IDS:
+        timestamp_predicates = ""
+
     query = SELECT_QUERY_TEMPLATE.substitute(
         fields="count(DISTINCT event, cityHash64(distinct_id), cityHash64(uuid)) as count",
         order_by="",
         format="",
+        timestamp=timestamp_predicates,
         exclude_events=exclude_events_statement,
         include_events=include_events_statement,
     )
@@ -154,10 +164,15 @@
         include_events_statement = ""
         events_to_include_tuple = ()
 
+    timestamp_predicates = TIMESTAMP_PREDICATES
+    if str(team_id) in settings.UNCONSTRAINED_TIMESTAMP_TEAM_IDS:
+        timestamp_predicates = ""
+
     query = SELECT_QUERY_TEMPLATE.substitute(
         fields=S3_FIELDS if include_person_properties else FIELDS,
         order_by="ORDER BY inserted_at",
        format="FORMAT ArrowStream",
+        timestamp=timestamp_predicates,
         exclude_events=exclude_events_statement,
         include_events=include_events_statement,
     )
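
Note on the mechanics (not part of the diff above): the query template is a standard string.Template, so the new $timestamp slot is filled either with the sort-key heuristic predicates or with an empty string for teams listed in UNCONSTRAINED_TIMESTAMP_TEAM_IDS. Below is a minimal, self-contained sketch of that substitution; SELECT_SKETCH, TIMESTAMP_SKETCH and render() are illustrative names, not code from this PR, and the curly-brace parameters are assumed to be bound later by the ClickHouse client as in the surrounding code.

# A minimal sketch of how the $timestamp placeholder composes with string.Template,
# mirroring what get_rows_count() and get_results_iterator() now do.
from string import Template

SELECT_SKETCH = Template(
    """
    SELECT $fields
    FROM events
    WHERE
        COALESCE(inserted_at, _timestamp) >= toDateTime64({data_interval_start}, 6, 'UTC')
        AND team_id = {team_id}
        $timestamp
    """
)

# Same shape as TIMESTAMP_PREDICATES: extra sort-key predicates, or nothing at all.
TIMESTAMP_SKETCH = """
AND timestamp >= toDateTime64({data_interval_start}, 6, 'UTC') - INTERVAL 2 DAY
AND timestamp < toDateTime64({data_interval_end}, 6, 'UTC') + INTERVAL 1 DAY
"""


def render(team_id: int, unconstrained_team_ids: list[str]) -> str:
    """Drop the timestamp heuristic for exempted teams, keep it for everyone else."""
    timestamp_predicates = TIMESTAMP_SKETCH
    if str(team_id) in unconstrained_team_ids:
        timestamp_predicates = ""
    # substitute() only touches the $-placeholders; the {curly} parameters stay in
    # place, to be bound by the ClickHouse client when the query is executed.
    return SELECT_SKETCH.substitute(fields="count(*) AS count", timestamp=timestamp_predicates)


# A team listed in the setting gets no timestamp predicates; everyone else keeps them.
assert "INTERVAL 2 DAY" not in render(1, ["1"])
assert "INTERVAL 2 DAY" in render(2, ["1"])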