From 1f8485303b3607521586eac205b3d9c6112fc460 Mon Sep 17 00:00:00 2001 From: Ross Gray Date: Thu, 16 Jan 2025 14:06:04 +0000 Subject: [PATCH] Use distributed_events_recent table --- posthog/batch_exports/sql.py | 32 +++++++- ...h_exports_distributed_recent_event_view.py | 9 +++ posthog/settings/temporal.py | 7 +- .../temporal/batch_exports/batch_exports.py | 57 ++++++++++++- posthog/temporal/batch_exports/spmc.py | 30 ++++--- posthog/temporal/batch_exports/sql.py | 22 +++++ .../temporal/tests/batch_exports/conftest.py | 17 +++- .../test_s3_batch_export_workflow.py | 80 +++++++++++++++++++ .../temporal/tests/batch_exports/test_spmc.py | 36 +-------- 9 files changed, 240 insertions(+), 50 deletions(-) create mode 100644 posthog/clickhouse/migrations/0096_create_batch_exports_distributed_recent_event_view.py diff --git a/posthog/batch_exports/sql.py b/posthog/batch_exports/sql.py index 9a7fd0cea95aa..1f60846cafc0b 100644 --- a/posthog/batch_exports/sql.py +++ b/posthog/batch_exports/sql.py @@ -289,6 +289,37 @@ ) """ +CREATE_EVENTS_BATCH_EXPORT_VIEW_RECENT_DISTRIBUTED = f""" +CREATE OR REPLACE VIEW events_batch_export_recent_distributed ON CLUSTER {settings.CLICKHOUSE_CLUSTER} AS ( + SELECT DISTINCT ON (team_id, event, cityHash64(distributed_events_recent.distinct_id), cityHash64(distributed_events_recent.uuid)) + team_id AS team_id, + timestamp AS timestamp, + event AS event, + distinct_id AS distinct_id, + toString(uuid) AS uuid, + inserted_at AS _inserted_at, + created_at AS created_at, + elements_chain AS elements_chain, + toString(person_id) AS person_id, + nullIf(properties, '') AS properties, + nullIf(person_properties, '') AS person_properties, + nullIf(JSONExtractString(properties, '$set'), '') AS set, + nullIf(JSONExtractString(properties, '$set_once'), '') AS set_once + FROM + distributed_events_recent + PREWHERE + distributed_events_recent.inserted_at >= {{interval_start:DateTime64}} + AND distributed_events_recent.inserted_at < {{interval_end:DateTime64}} + WHERE + team_id = {{team_id:Int64}} + AND (length({{include_events:Array(String)}}) = 0 OR event IN {{include_events:Array(String)}}) + AND (length({{exclude_events:Array(String)}}) = 0 OR event NOT IN {{exclude_events:Array(String)}}) + ORDER BY + _inserted_at, event + SETTINGS optimize_aggregation_in_order=1 +) +""" + CREATE_EVENTS_BATCH_EXPORT_VIEW_BACKFILL = f""" CREATE OR REPLACE VIEW events_batch_export_backfill ON CLUSTER {settings.CLICKHOUSE_CLUSTER} AS ( SELECT DISTINCT ON (team_id, event, cityHash64(events.distinct_id), cityHash64(events.uuid)) @@ -319,7 +350,6 @@ ) """ -# TODO: is this the best query to use? 
EVENT_COUNT_BY_INTERVAL = """ SELECT toStartOfInterval(_inserted_at, INTERVAL {interval}) AS interval_start, diff --git a/posthog/clickhouse/migrations/0096_create_batch_exports_distributed_recent_event_view.py b/posthog/clickhouse/migrations/0096_create_batch_exports_distributed_recent_event_view.py new file mode 100644 index 0000000000000..416f9e51904cf --- /dev/null +++ b/posthog/clickhouse/migrations/0096_create_batch_exports_distributed_recent_event_view.py @@ -0,0 +1,9 @@ +from posthog.batch_exports.sql import CREATE_EVENTS_BATCH_EXPORT_VIEW_RECENT_DISTRIBUTED +from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions + +operations = map( + run_sql_with_exceptions, + [ + CREATE_EVENTS_BATCH_EXPORT_VIEW_RECENT_DISTRIBUTED, + ], +) diff --git a/posthog/settings/temporal.py b/posthog/settings/temporal.py index 60740693eac88..4157b2ce24edd 100644 --- a/posthog/settings/temporal.py +++ b/posthog/settings/temporal.py @@ -66,6 +66,9 @@ ) CLICKHOUSE_OFFLINE_5MIN_CLUSTER_HOST: str | None = os.getenv("CLICKHOUSE_OFFLINE_5MIN_CLUSTER_HOST", None) -# What percentage of teams should use events_recent for batch exports (should be a value from 0 to 1 and we only support increments of 0.1) +# What percentage of teams should use distributed_events_recent for batch exports (should be a value from 0 to 1 and we +# only support increments of 0.1) # TODO - remove this once migration is complete -BATCH_EXPORT_EVENTS_RECENT_ROLLOUT: float = get_from_env("BATCH_EXPORT_EVENTS_RECENT_ROLLOUT", 0.0, type_cast=float) +BATCH_EXPORT_DISTRIBUTED_EVENTS_RECENT_ROLLOUT: float = get_from_env( + "BATCH_EXPORT_DISTRIBUTED_EVENTS_RECENT_ROLLOUT", 0.0, type_cast=float +) diff --git a/posthog/temporal/batch_exports/batch_exports.py b/posthog/temporal/batch_exports/batch_exports.py index 0458363a9a394..233132db996dc 100644 --- a/posthog/temporal/batch_exports/batch_exports.py +++ b/posthog/temporal/batch_exports/batch_exports.py @@ -199,6 +199,27 @@ """ ) +SELECT_FROM_EVENTS_VIEW_RECENT_DISTRIBUTED = Template( + """ +SELECT + $fields +FROM + events_batch_export_recent_distributed( + team_id={team_id}, + interval_start={interval_start}, + interval_end={interval_end}, + include_events={include_events}::Array(String), + exclude_events={exclude_events}::Array(String) + ) AS events +FORMAT ArrowStream +SETTINGS + -- This is half of configured MAX_MEMORY_USAGE for batch exports. 
+ max_bytes_before_external_sort=50000000000, + max_replica_delay_for_distributed_queries=60, + fallback_to_stale_replicas_for_distributed_queries=0 +""" +) + SELECT_FROM_EVENTS_VIEW_BACKFILL = Template( """ SELECT @@ -348,8 +369,14 @@ async def iter_records_from_model_view( else: is_5_min_batch_export = False + # for 5 min batch exports we query the events_recent table, which is known to have zero replication lag, but + # may not be able to handle the load from all batch exports if is_5_min_batch_export and not is_backfill: query_template = SELECT_FROM_EVENTS_VIEW_RECENT + # for other batch exports that should use `events_recent` we use the `distributed_events_recent` table + # which is a distributed table that sits in front of the `events_recent` table + elif use_distributed_events_recent_table(is_backfill=is_backfill, team_id=team_id): + query_template = SELECT_FROM_EVENTS_VIEW_RECENT_DISTRIBUTED elif str(team_id) in settings.UNCONSTRAINED_TIMESTAMP_TEAM_IDS: query_template = SELECT_FROM_EVENTS_VIEW_UNBOUNDED elif is_backfill: @@ -449,6 +476,7 @@ def __init__(self, task: str): super().__init__(f"Expected task '{task}' to be done by now") +# TODO - not sure this is being used anymore? def start_produce_batch_export_record_batches( client: ClickHouseClient, model_name: str, @@ -510,9 +538,15 @@ def start_produce_batch_export_record_batches( else: is_5_min_batch_export = False + # for 5 min batch exports we query the events_recent table, which is known to have zero replication lag, but + # may not be able to handle the load from all batch exports if is_5_min_batch_export and not is_backfill: query_template = SELECT_FROM_EVENTS_VIEW_RECENT - if str(team_id) in settings.UNCONSTRAINED_TIMESTAMP_TEAM_IDS: + # for other batch exports that should use `events_recent` we use the `distributed_events_recent` table + # which is a distributed table that sits in front of the `events_recent` table + elif use_distributed_events_recent_table(is_backfill=is_backfill, team_id=team_id): + query_template = SELECT_FROM_EVENTS_VIEW_RECENT_DISTRIBUTED + elif str(team_id) in settings.UNCONSTRAINED_TIMESTAMP_TEAM_IDS: query_template = SELECT_FROM_EVENTS_VIEW_UNBOUNDED elif is_backfill: query_template = SELECT_FROM_EVENTS_VIEW_BACKFILL @@ -653,6 +687,21 @@ async def raise_on_produce_task_failure(produce_task: asyncio.Task) -> None: raise RecordBatchProducerError() from exc +def use_distributed_events_recent_table(is_backfill: bool, team_id: int) -> bool: + if is_backfill: + return False + + events_recent_rollout: float = settings.BATCH_EXPORT_DISTRIBUTED_EVENTS_RECENT_ROLLOUT + # sanity check + if events_recent_rollout < 0: + events_recent_rollout = 0 + elif events_recent_rollout > 1: + events_recent_rollout = 1 + + bucket = team_id % 10 + return bucket < events_recent_rollout * 10 + + def iter_records( client: ClickHouseClient, team_id: int, @@ -723,8 +772,14 @@ def iter_records( else: is_5_min_batch_export = False + # for 5 min batch exports we query the events_recent table, which is known to have zero replication lag, but + # may not be able to handle the load from all batch exports if is_5_min_batch_export and not is_backfill: query = SELECT_FROM_EVENTS_VIEW_RECENT + # for other batch exports that should use `events_recent` we use the `distributed_events_recent` table + # which is a distributed table that sits in front of the `events_recent` table + elif use_distributed_events_recent_table(is_backfill=is_backfill, team_id=team_id): + query = SELECT_FROM_EVENTS_VIEW_RECENT_DISTRIBUTED elif str(team_id) in 
settings.UNCONSTRAINED_TIMESTAMP_TEAM_IDS: query = SELECT_FROM_EVENTS_VIEW_UNBOUNDED elif is_backfill: diff --git a/posthog/temporal/batch_exports/spmc.py b/posthog/temporal/batch_exports/spmc.py index cd2383a9c9d66..02491a403f038 100644 --- a/posthog/temporal/batch_exports/spmc.py +++ b/posthog/temporal/batch_exports/spmc.py @@ -22,6 +22,7 @@ SELECT_FROM_EVENTS_VIEW, SELECT_FROM_EVENTS_VIEW_BACKFILL, SELECT_FROM_EVENTS_VIEW_RECENT, + SELECT_FROM_EVENTS_VIEW_RECENT_DISTRIBUTED, SELECT_FROM_EVENTS_VIEW_UNBOUNDED, SELECT_FROM_PERSONS_VIEW, SELECT_FROM_PERSONS_VIEW_BACKFILL, @@ -507,13 +508,11 @@ def is_5_min_batch_export(full_range: tuple[dt.datetime | None, dt.datetime]) -> return False -def use_events_recent(full_range: tuple[dt.datetime | None, dt.datetime], is_backfill: bool, team_id: int) -> bool: +def use_distributed_events_recent_table(is_backfill: bool, team_id: int) -> bool: if is_backfill: return False - if is_5_min_batch_export(full_range): - return True - events_recent_rollout: float = settings.BATCH_EXPORT_EVENTS_RECENT_ROLLOUT + events_recent_rollout: float = settings.BATCH_EXPORT_DISTRIBUTED_EVENTS_RECENT_ROLLOUT # sanity check if events_recent_rollout < 0: events_recent_rollout = 0 @@ -585,8 +584,14 @@ def start( else: parameters["include_events"] = [] - if use_events_recent(full_range=full_range, is_backfill=is_backfill, team_id=team_id): + # for 5 min batch exports we query the events_recent table, which is known to have zero replication lag, but + # may not be able to handle the load from all batch exports + if is_5_min_batch_export(full_range=full_range) and not is_backfill: query_template = SELECT_FROM_EVENTS_VIEW_RECENT + # for other batch exports that should use `events_recent` we use the `distributed_events_recent` table + # which is a distributed table that sits in front of the `events_recent` table + elif use_distributed_events_recent_table(is_backfill=is_backfill, team_id=team_id): + query_template = SELECT_FROM_EVENTS_VIEW_RECENT_DISTRIBUTED elif str(team_id) in settings.UNCONSTRAINED_TIMESTAMP_TEAM_IDS: query_template = SELECT_FROM_EVENTS_VIEW_UNBOUNDED elif is_backfill: @@ -660,14 +665,19 @@ async def produce_batch_export_record_batches_from_range( this number of records. """ clickhouse_url = None - if use_events_recent(full_range=full_range, is_backfill=is_backfill, team_id=team_id): + # 5 min batch exports should query a single node, which is known to have zero replication lag + if is_5_min_batch_export(full_range=full_range): clickhouse_url = settings.CLICKHOUSE_OFFLINE_5MIN_CLUSTER_HOST - # data can sometimes take a while to settle, so for 5 min batch exports - # we wait several seconds just to be safe + # Data can sometimes take a while to settle, so for 5 min batch exports we wait several seconds just to be safe. 
+ # For all other batch exports we wait for 1 minute since we're querying the events_recent table using a + # distributed table and setting `max_replica_delay_for_distributed_queries` to 1 minute if is_5_min_batch_export(full_range): - end_at = full_range[1] - await wait_for_delta_past_data_interval_end(end_at) + delta = dt.timedelta(seconds=30) + else: + delta = dt.timedelta(minutes=1) + end_at = full_range[1] + await wait_for_delta_past_data_interval_end(end_at, delta) async with get_client(team_id=team_id, clickhouse_url=clickhouse_url) as client: if not await client.is_alive(): diff --git a/posthog/temporal/batch_exports/sql.py b/posthog/temporal/batch_exports/sql.py index 7cb3922268ead..fadbdbdc6cd7f 100644 --- a/posthog/temporal/batch_exports/sql.py +++ b/posthog/temporal/batch_exports/sql.py @@ -134,6 +134,28 @@ """ ) +SELECT_FROM_EVENTS_VIEW_RECENT_DISTRIBUTED = Template( + """ +SELECT + $fields +FROM + events_batch_export_recent_distributed( + team_id={team_id}, + interval_start={interval_start}, + interval_end={interval_end}, + include_events={include_events}::Array(String), + exclude_events={exclude_events}::Array(String) + ) AS events +FORMAT ArrowStream +SETTINGS + -- This is half of configured MAX_MEMORY_USAGE for batch exports. + max_bytes_before_external_sort=50000000000, + max_replica_delay_for_distributed_queries=60, + fallback_to_stale_replicas_for_distributed_queries=0 +""" +) + + SELECT_FROM_EVENTS_VIEW_UNBOUNDED = Template( """ SELECT diff --git a/posthog/temporal/tests/batch_exports/conftest.py b/posthog/temporal/tests/batch_exports/conftest.py index c95385a761888..99de7715ed28c 100644 --- a/posthog/temporal/tests/batch_exports/conftest.py +++ b/posthog/temporal/tests/batch_exports/conftest.py @@ -173,6 +173,7 @@ async def create_clickhouse_tables_and_views(clickhouse_client, django_db_setup) CREATE_EVENTS_BATCH_EXPORT_VIEW, CREATE_EVENTS_BATCH_EXPORT_VIEW_BACKFILL, CREATE_EVENTS_BATCH_EXPORT_VIEW_RECENT, + CREATE_EVENTS_BATCH_EXPORT_VIEW_RECENT_DISTRIBUTED, CREATE_EVENTS_BATCH_EXPORT_VIEW_UNBOUNDED, CREATE_PERSONS_BATCH_EXPORT_VIEW, CREATE_PERSONS_BATCH_EXPORT_VIEW_BACKFILL, @@ -184,6 +185,7 @@ async def create_clickhouse_tables_and_views(clickhouse_client, django_db_setup) CREATE_EVENTS_BATCH_EXPORT_VIEW_BACKFILL, CREATE_EVENTS_BATCH_EXPORT_VIEW_UNBOUNDED, CREATE_EVENTS_BATCH_EXPORT_VIEW_RECENT, + CREATE_EVENTS_BATCH_EXPORT_VIEW_RECENT_DISTRIBUTED, CREATE_PERSONS_BATCH_EXPORT_VIEW, CREATE_PERSONS_BATCH_EXPORT_VIEW_BACKFILL, ) @@ -260,6 +262,14 @@ def test_person_properties(request): return {"utm_medium": "referral", "$initial_os": "Linux"} +@pytest.fixture +def use_distributed_events_recent_table(request): + try: + return request.param + except AttributeError: + return False + + @pytest_asyncio.fixture async def generate_test_data( ateam, @@ -270,12 +280,13 @@ async def generate_test_data( interval, test_properties, test_person_properties, + use_distributed_events_recent_table, ): """Generate test data in ClickHouse.""" - if interval != "every 5 minutes": - table = "sharded_events" - else: + if interval == "every 5 minutes" or use_distributed_events_recent_table: table = "events_recent" + else: + table = "sharded_events" events_to_export_created, _, _ = await generate_test_events_in_clickhouse( client=clickhouse_client, diff --git a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py index 2195725a5d8ed..fe19f26ce47cd 100644 --- 
a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py @@ -2108,3 +2108,83 @@ def __init__(self, *args, **kwargs): data_interval_end=data_interval_end, batch_export_model=model, ) + + +# TODO - this can be removed once we've fully migrated to using distributed events_recent +# for all teams +@pytest.mark.parametrize("use_distributed_events_recent_table", [True, False]) +async def test_insert_into_s3_activity_when_using_distributed_events_recent_table( + clickhouse_client, + bucket_name, + minio_client, + activity_environment, + compression, + exclude_events, + file_format, + data_interval_start, + data_interval_end, + generate_test_data, + ateam, + use_distributed_events_recent_table, +): + """We're migrating to using distributed events_recent for all realtime batch exports (except for 5 minute exports). + + This test ensures that the insert_into_s3_activity function works as expected when using the + distributed events_recent table. + + It can be removed once we've fully migrated to using distributed events_recent for all teams and the tests always + use this new table. + """ + + model = BatchExportModel(name="events", schema=None) + + prefix = str(uuid.uuid4()) + + batch_export_schema: BatchExportSchema | None = None + batch_export_model: BatchExportModel | None = None + if isinstance(model, BatchExportModel): + batch_export_model = model + elif model is not None: + batch_export_schema = model + + insert_inputs = S3InsertInputs( + bucket_name=bucket_name, + region="us-east-1", + prefix=prefix, + team_id=ateam.pk, + data_interval_start=data_interval_start.isoformat(), + data_interval_end=data_interval_end.isoformat(), + aws_access_key_id="object_storage_root_user", + aws_secret_access_key="object_storage_root_password", + endpoint_url=settings.OBJECT_STORAGE_ENDPOINT, + compression=compression, + exclude_events=exclude_events, + file_format=file_format, + batch_export_schema=batch_export_schema, + batch_export_model=batch_export_model, + ) + + with override_settings( + BATCH_EXPORT_S3_UPLOAD_CHUNK_SIZE_BYTES=5 * 1024**2, + BATCH_EXPORT_DISTRIBUTED_EVENTS_RECENT_ROLLOUT=1 if use_distributed_events_recent_table else 0, + ): # 5MB, the minimum for Multipart uploads + records_exported = await activity_environment.run(insert_into_s3_activity, insert_inputs) + + events_to_export_created, persons_to_export_created = generate_test_data + assert records_exported == len(events_to_export_created) or records_exported == len(persons_to_export_created) + + await assert_clickhouse_records_in_s3( + s3_compatible_client=minio_client, + clickhouse_client=clickhouse_client, + bucket_name=bucket_name, + key_prefix=prefix, + team_id=ateam.pk, + data_interval_start=data_interval_start, + data_interval_end=data_interval_end, + batch_export_model=model, + exclude_events=exclude_events, + include_events=None, + compression=compression, + file_format=file_format, + is_backfill=False, + ) diff --git a/posthog/temporal/tests/batch_exports/test_spmc.py b/posthog/temporal/tests/batch_exports/test_spmc.py index 9bc7108d9ebed..eff5a2f2cebc3 100644 --- a/posthog/temporal/tests/batch_exports/test_spmc.py +++ b/posthog/temporal/tests/batch_exports/test_spmc.py @@ -11,7 +11,7 @@ Producer, RecordBatchQueue, slice_record_batch, - use_events_recent, + use_distributed_events_recent_table, ) from posthog.temporal.tests.utils.events import generate_test_events_in_clickhouse @@ -176,7 +176,6 @@ def test_slice_record_batch_in_half(): [ # is 
backfill so shouldn't use events recent
         {
-            "full_range": (None, dt.datetime.fromisoformat("2023-04-25T14:00:00+00:00")),
             "is_backfill": True,
             "team_id": 1,
             "rollout": 1.0,
         },
         # rollout is 0 so shouldn't use events recent
         {
-            "full_range": (
-                dt.datetime.fromisoformat("2023-04-25T13:00:00+00:00"),
-                dt.datetime.fromisoformat("2023-04-25T14:00:00+00:00"),
-            ),
             "is_backfill": False,
             "team_id": 1,
             "rollout": 0.0,
         },
         # rollout is 1 so should use events recent
         {
-            "full_range": (
-                dt.datetime.fromisoformat("2023-04-25T13:00:00+00:00"),
-                dt.datetime.fromisoformat("2023-04-25T14:00:00+00:00"),
-            ),
             "is_backfill": False,
             "team_id": 1,
             "rollout": 1.0,
             "use_events_recent": True,
         },
-        # is 5 min batch export so should always use events recent
-        {
-            "full_range": (
-                dt.datetime.fromisoformat("2023-04-25T14:00:00+00:00"),
-                dt.datetime.fromisoformat("2023-04-25T14:05:00+00:00"),
-            ),
-            "is_backfill": False,
-            "team_id": 10,
-            "rollout": 0.0,
-            "use_events_recent": True,
-        },
         # rollout is 0.4 but team_id mod 10 is 7 so should not use events recent
         {
-            "full_range": (
-                dt.datetime.fromisoformat("2023-04-25T13:00:00+00:00"),
-                dt.datetime.fromisoformat("2023-04-25T14:00:00+00:00"),
-            ),
             "is_backfill": False,
             "team_id": 17,
             "rollout": 0.4,
         },
         # rollout is 0.4 and team_id mod 10 is 3 so should use events recent
         {
-            "full_range": (
-                dt.datetime.fromisoformat("2023-04-25T13:00:00+00:00"),
-                dt.datetime.fromisoformat("2023-04-25T14:00:00+00:00"),
-            ),
             "is_backfill": False,
             "team_id": 13,
             "rollout": 0.4,
         },
     ],
 )
 def test_use_events_recent(test_data: dict[str, t.Any]):
-    with override_settings(BATCH_EXPORT_EVENTS_RECENT_ROLLOUT=test_data["rollout"]):
+    with override_settings(BATCH_EXPORT_DISTRIBUTED_EVENTS_RECENT_ROLLOUT=test_data["rollout"]):
         assert (
-            use_events_recent(
-                full_range=test_data["full_range"], is_backfill=test_data["is_backfill"], team_id=test_data["team_id"]
-            )
+            use_distributed_events_recent_table(is_backfill=test_data["is_backfill"], team_id=test_data["team_id"])
             == test_data["use_events_recent"]
         )
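
Note on the rollout gate introduced above: use_distributed_events_recent_table() buckets teams by team_id % 10 and
compares the bucket against BATCH_EXPORT_DISTRIBUTED_EVENTS_RECENT_ROLLOUT * 10, so each 0.1 increment of the setting
moves roughly another tenth of teams onto the distributed_events_recent view. A minimal standalone sketch of that
check (plain Python; the explicit rollout argument is illustrative and stands in for the Django setting):

    def use_distributed_events_recent_table(is_backfill: bool, team_id: int, rollout: float) -> bool:
        """Sketch of the rollout gate: backfills never use the distributed view."""
        if is_backfill:
            return False
        # Clamp the rollout fraction to [0, 1], mirroring the sanity check in the patch.
        rollout = min(max(rollout, 0.0), 1.0)
        # Teams fall into ten stable buckets; e.g. a rollout of 0.4 enables buckets 0-3.
        return (team_id % 10) < rollout * 10

    # With rollout=0.4, team 13 (bucket 3) is gated in while team 17 (bucket 7) is not,
    # matching the parametrized cases in test_spmc.py.
    assert use_distributed_events_recent_table(is_backfill=False, team_id=13, rollout=0.4)
    assert not use_distributed_events_recent_table(is_backfill=False, team_id=17, rollout=0.4)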