From ae3f5a00538461fe391d16d9affda2eac1db5fa7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Tue, 25 Jun 2024 11:14:30 +0200 Subject: [PATCH] feat: Support multiple models in S3 batch export (#23105) * refactor: Update metrics to fetch counts at request time * fix: Move import to method * fix: Add function * feat: Custom schemas for batch exports * feat: Frontend support for model field * fix: Clean-up * fix: Add missing migration * fix: Make new field nullable * Update UI snapshots for `chromium` (1) * Update UI snapshots for `chromium` (1) * Update UI snapshots for `chromium` (1) * Update UI snapshots for `chromium` (1) * Update UI snapshots for `chromium` (1) * Update UI snapshots for `chromium` (1) * fix: Bump migration number * fix: Bump migration number * refactor: Update metrics to fetch counts at request time * refactor: Switch to counting runs * refactor: Support batch export models as views * fix: Merge conflict * fix: Quality check fixes * refactor: Update metrics to fetch counts at request time * fix: Move import to method * fix: Add function * fix: Typing fixes * feat: Custom schemas for batch exports * feat: Frontend support for model field * fix: Clean-up * fix: Add missing migration * fix: Make new field nullable * Update UI snapshots for `chromium` (1) * Update UI snapshots for `chromium` (1) * Update UI snapshots for `chromium` (1) * Update UI snapshots for `chromium` (1) * Update UI snapshots for `chromium` (1) * Update UI snapshots for `chromium` (1) * fix: Bump migration number * fix: Clean-up unused code * fix: Clean-up unused function * fix: Only run extra clickhouse queries in batch exports tests * feat: Support multiple models in all batch export destinations This commit introduces a new `batch_export_model` input for all batch export destinations. This input is of type `BatchExportModel` and indicates which model a batch export targets. This opens the door to "persons" model exports and, eventually, to further models. The change is backwards compatible: `batch_export_schema` was not removed, and existing export configurations continue to work (as evidenced by unit tests passing without changes). However, moving forward, all batch exports will be created with `batch_export_schema` set to `None` and `batch_export_model` defined in its place. Once existing exports have been updated, `batch_export_schema` and the rest of the backwards-compatibility code can be deleted.
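For illustration, a minimal sketch of the new input, using only the `BatchExportModel` dataclass added in this diff; the custom schema dict is hypothetical and simply mirrors the shape exercised by the tests further down:

from posthog.batch_exports.service import BatchExportModel

# Default "events" model: with schema=None the destination's default fields are used.
events_model = BatchExportModel(name="events", schema=None)

# The new "persons" model enabled by this change.
persons_model = BatchExportModel(name="persons", schema=None)

# A custom events schema, carrying what `batch_export_schema` used to carry.
custom_model = BatchExportModel(
    name="events",
    schema={
        "fields": [{"expression": "event", "alias": "my_event_name"}],
        "values": {},
    },
)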
* fix: Add additional type hints and update tests * chore: Add test utilities for multi-model batch exports * fix: Pass properties included as keyword arguments * fix: Support custom key prefixes for multi-model * feat: Unit testing for S3 batch exports with multi-model support * fix: Add and correct type hints * fix: Typo in parameter * fix: API tests now work with model parameter * fix: Re-add hosts to docker-compose * fix: Use UTC timezone alias * fix: Add missing test column * revert: Python 3.11 alias --------- Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com> --- posthog/api/test/batch_exports/test_create.py | 2 +- posthog/api/test/batch_exports/test_update.py | 37 +- posthog/batch_exports/http.py | 2 +- posthog/batch_exports/service.py | 37 +- .../temporal/batch_exports/batch_exports.py | 59 ++- .../batch_exports/bigquery_batch_export.py | 26 +- .../batch_exports/http_batch_export.py | 9 +- .../batch_exports/postgres_batch_export.py | 28 +- .../batch_exports/redshift_batch_export.py | 30 +- .../temporal/batch_exports/s3_batch_export.py | 32 +- .../batch_exports/snowflake_batch_export.py | 24 +- .../temporal/tests/batch_exports/conftest.py | 24 +- .../tests/batch_exports/test_batch_exports.py | 4 +- .../test_s3_batch_export_workflow.py | 478 +++++++++--------- posthog/temporal/tests/utils/persons.py | 239 +++++++++ 15 files changed, 680 insertions(+), 351 deletions(-) create mode 100644 posthog/temporal/tests/utils/persons.py diff --git a/posthog/api/test/batch_exports/test_create.py b/posthog/api/test/batch_exports/test_create.py index 2bc22b56d0b25..0728ac6bdfa4e 100644 --- a/posthog/api/test/batch_exports/test_create.py +++ b/posthog/api/test/batch_exports/test_create.py @@ -294,7 +294,7 @@ def test_create_batch_export_with_custom_schema(client: HttpClient): } assert batch_export.schema == expected_schema - assert args["batch_export_schema"] == expected_schema + assert args["batch_export_model"] == {"name": "events", "schema": expected_schema} @pytest.mark.parametrize( diff --git a/posthog/api/test/batch_exports/test_update.py b/posthog/api/test/batch_exports/test_update.py index 30333c206f953..7b749c62dc24f 100644 --- a/posthog/api/test/batch_exports/test_update.py +++ b/posthog/api/test/batch_exports/test_update.py @@ -315,23 +315,26 @@ def test_can_patch_hogql_query(client: HttpClient): args = json.loads(decoded_payload[0].data) assert args["bucket_name"] == "my-production-s3-bucket" assert args["interval"] == "hour" - assert args["batch_export_schema"] == { - "fields": [ - { - "alias": "uuid", - "expression": "toString(events.uuid)", - }, - { - "alias": "test", - "expression": "%(hogql_val_0)s", - }, - { - "alias": "n", - "expression": "accurateCastOrNull(plus(1, 1), %(hogql_val_1)s)", - }, - ], - "values": {"hogql_val_0": "test", "hogql_val_1": "Int64"}, - "hogql_query": "SELECT toString(uuid) AS uuid, 'test' AS test, toInt(plus(1, 1)) AS n FROM events", + assert args["batch_export_model"] == { + "name": "events", + "schema": { + "fields": [ + { + "alias": "uuid", + "expression": "toString(events.uuid)", + }, + { + "alias": "test", + "expression": "%(hogql_val_0)s", + }, + { + "alias": "n", + "expression": "accurateCastOrNull(plus(1, 1), %(hogql_val_1)s)", + }, + ], + "values": {"hogql_val_0": "test", "hogql_val_1": "Int64"}, + "hogql_query": "SELECT toString(uuid) AS uuid, 'test' AS test, toInt(plus(1, 1)) AS n FROM events", + }, } diff --git a/posthog/batch_exports/http.py b/posthog/batch_exports/http.py index 
9977b3bc75729..98a97a74b3f4a 100644 --- a/posthog/batch_exports/http.py +++ b/posthog/batch_exports/http.py @@ -5,7 +5,7 @@ import structlog from django.db import transaction from django.utils.timezone import now -from rest_framework import request, response, serializers, viewsets, filters +from rest_framework import filters, request, response, serializers, viewsets from rest_framework.decorators import action from rest_framework.exceptions import ( NotAuthenticated, diff --git a/posthog/batch_exports/service.py b/posthog/batch_exports/service.py index 89992d2228f75..2483738cefbc0 100644 --- a/posthog/batch_exports/service.py +++ b/posthog/batch_exports/service.py @@ -54,9 +54,15 @@ class BatchExportSchema(typing.TypedDict): values: dict[str, str] +@dataclass +class BatchExportModel: + name: str + schema: BatchExportSchema | None + + class BatchExportsInputsProtocol(typing.Protocol): team_id: int - batch_export_schema: BatchExportSchema | None = None + batch_export_model: BatchExportModel | None = None is_backfill: bool = False @@ -90,10 +96,11 @@ class S3BatchExportInputs: include_events: list[str] | None = None encryption: str | None = None kms_key_id: str | None = None - batch_export_schema: BatchExportSchema | None = None endpoint_url: str | None = None file_format: str = "JSONLines" is_backfill: bool = False + batch_export_model: BatchExportModel | None = None + batch_export_schema: BatchExportSchema | None = None @dataclass @@ -114,8 +121,9 @@ class SnowflakeBatchExportInputs: role: str | None = None exclude_events: list[str] | None = None include_events: list[str] | None = None - batch_export_schema: BatchExportSchema | None = None is_backfill: bool = False + batch_export_model: BatchExportModel | None = None + batch_export_schema: BatchExportSchema | None = None @dataclass @@ -136,8 +144,9 @@ class PostgresBatchExportInputs: data_interval_end: str | None = None exclude_events: list[str] | None = None include_events: list[str] | None = None - batch_export_schema: BatchExportSchema | None = None is_backfill: bool = False + batch_export_model: BatchExportModel | None = None + batch_export_schema: BatchExportSchema | None = None @dataclass @@ -165,8 +174,9 @@ class BigQueryBatchExportInputs: exclude_events: list[str] | None = None include_events: list[str] | None = None use_json_type: bool = False - batch_export_schema: BatchExportSchema | None = None is_backfill: bool = False + batch_export_model: BatchExportModel | None = None + batch_export_schema: BatchExportSchema | None = None @dataclass @@ -181,8 +191,9 @@ class HttpBatchExportInputs: data_interval_end: str | None = None exclude_events: list[str] | None = None include_events: list[str] | None = None - batch_export_schema: BatchExportSchema | None = None is_backfill: bool = False + batch_export_model: BatchExportModel | None = None + batch_export_schema: BatchExportSchema | None = None @dataclass @@ -193,8 +204,9 @@ class NoOpInputs: team_id: int interval: str = "hour" arg: str = "" - batch_export_schema: BatchExportSchema | None = None is_backfill: bool = False + batch_export_model: BatchExportModel | None = None + batch_export_schema: BatchExportSchema | None = None DESTINATION_WORKFLOWS = { @@ -609,7 +621,16 @@ def sync_batch_export(batch_export: BatchExport, created: bool): team_id=batch_export.team.id, batch_export_id=str(batch_export.id), interval=str(batch_export.interval), - batch_export_schema=batch_export.schema, + batch_export_model=BatchExportModel( + name=batch_export.model or "events", + schema=batch_export.schema, 
+ ), + # TODO: This field is deprecated, but we still set it for backwards compatibility. + # New exports created will always have `batch_export_schema` set to `None`, but existing + # batch exports may still be using it. + # This assignment should be removed after updating all existing exports to use + # `batch_export_model` instead. + batch_export_schema=None, **destination_config, ) ), diff --git a/posthog/temporal/batch_exports/batch_exports.py b/posthog/temporal/batch_exports/batch_exports.py index 04e9a7fa000f0..3b4e5e0b435c9 100644 --- a/posthog/temporal/batch_exports/batch_exports.py +++ b/posthog/temporal/batch_exports/batch_exports.py @@ -13,6 +13,8 @@ from posthog.batch_exports.models import BatchExportBackfill, BatchExportRun from posthog.batch_exports.service import ( BatchExportField, + BatchExportModel, + BatchExportSchema, acount_failed_batch_export_runs, acreate_batch_export_backfill, acreate_batch_export_run, @@ -111,26 +113,53 @@ def default_fields() -> list[BatchExportField]: ] -DEFAULT_MODELS = {"events", "persons"} - - async def iter_model_records( - client: ClickHouseClient, model: str, team_id: int, is_backfill: bool, **parameters + client: ClickHouseClient, + model: BatchExportModel | BatchExportSchema | None, + team_id: int, + is_backfill: bool, + destination_default_fields: list[BatchExportField] | None = None, + **parameters, ) -> AsyncRecordsGenerator: - if model in DEFAULT_MODELS: + if destination_default_fields is None: + batch_export_default_fields = default_fields() + else: + batch_export_default_fields = destination_default_fields + + if isinstance(model, BatchExportModel): async for record in iter_records_from_model_view( - client=client, model=model, team_id=team_id, is_backfill=is_backfill, **parameters + client=client, + model_name=model.name, + team_id=team_id, + is_backfill=is_backfill, + fields=model.schema["fields"] if model.schema is not None else batch_export_default_fields, + extra_query_parameters=model.schema["values"] if model.schema is not None else None, + **parameters, ): yield record + else: - for record in iter_records(client, team_id=team_id, is_backfill=is_backfill, **parameters): + for record in iter_records( + client, + team_id=team_id, + is_backfill=is_backfill, + fields=model["fields"] if model is not None else batch_export_default_fields, + extra_query_parameters=model["values"] if model is not None else None, + **parameters, + ): yield record async def iter_records_from_model_view( - client: ClickHouseClient, model: str, is_backfill: bool, team_id: int, **parameters + client: ClickHouseClient, + model_name: str, + is_backfill: bool, + team_id: int, + interval_start: str, + interval_end: str, + **parameters, ) -> AsyncRecordsGenerator: - if model == "persons": + if model_name == "persons": view = SELECT_FROM_PERSONS_VIEW else: # TODO: Let this model be exported by `astream_query_as_arrow`. @@ -138,10 +167,20 @@ async def iter_records_from_model_view( # without battle testing it first. # There are already changes going out to the queries themselves that will impact events in a # positive way. So, we can come back later and drop this block. 
- for record_batch in iter_records(client, team_id=team_id, is_backfill=is_backfill, **parameters): + for record_batch in iter_records( + client, + team_id=team_id, + is_backfill=is_backfill, + interval_start=interval_start, + interval_end=interval_end, + **parameters, + ): yield record_batch return + parameters["team_id"] = team_id + parameters["interval_start"] = dt.datetime.fromisoformat(interval_start).strftime("%Y-%m-%d %H:%M:%S") + parameters["interval_end"] = dt.datetime.fromisoformat(interval_end).strftime("%Y-%m-%d %H:%M:%S") async for record_batch in client.astream_query_as_arrow(view, query_parameters=parameters): yield record_batch diff --git a/posthog/temporal/batch_exports/bigquery_batch_export.py b/posthog/temporal/batch_exports/bigquery_batch_export.py index 85385b1c80108..af46bef7e4576 100644 --- a/posthog/temporal/batch_exports/bigquery_batch_export.py +++ b/posthog/temporal/batch_exports/bigquery_batch_export.py @@ -14,6 +14,7 @@ from posthog.batch_exports.models import BatchExportRun from posthog.batch_exports.service import ( BatchExportField, + BatchExportModel, BatchExportSchema, BigQueryBatchExportInputs, ) @@ -150,9 +151,11 @@ class BigQueryInsertInputs: exclude_events: list[str] | None = None include_events: list[str] | None = None use_json_type: bool = False - batch_export_schema: BatchExportSchema | None = None run_id: str | None = None is_backfill: bool = False + batch_export_model: BatchExportModel | None = None + # TODO: Remove after updating existing batch exports + batch_export_schema: BatchExportSchema | None = None @contextlib.contextmanager @@ -230,24 +233,23 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records if not await client.is_alive(): raise ConnectionError("Cannot establish connection to ClickHouse") - if inputs.batch_export_schema is None: - fields = bigquery_default_fields() - query_parameters = None - + model: BatchExportModel | BatchExportSchema | None = None + if inputs.batch_export_schema is None and "batch_export_model" in { + field.name for field in dataclasses.fields(inputs) + }: + model = inputs.batch_export_model else: - fields = inputs.batch_export_schema["fields"] - query_parameters = inputs.batch_export_schema["values"] + model = inputs.batch_export_schema records_iterator = iter_model_records( client=client, - model="events", + model=model, team_id=inputs.team_id, interval_start=data_interval_start, interval_end=inputs.data_interval_end, exclude_events=inputs.exclude_events, include_events=inputs.include_events, - fields=fields, - extra_query_parameters=query_parameters, + destination_default_fields=bigquery_default_fields(), is_backfill=inputs.is_backfill, ) @@ -408,9 +410,11 @@ async def run(self, inputs: BigQueryBatchExportInputs): exclude_events=inputs.exclude_events, include_events=inputs.include_events, use_json_type=inputs.use_json_type, - batch_export_schema=inputs.batch_export_schema, run_id=run_id, is_backfill=inputs.is_backfill, + batch_export_model=inputs.batch_export_model, + # TODO: Remove after updating existing batch exports. 
+ batch_export_schema=inputs.batch_export_schema, ) await execute_batch_export_insert_activity( diff --git a/posthog/temporal/batch_exports/http_batch_export.py b/posthog/temporal/batch_exports/http_batch_export.py index cf0e9b485f376..38e54c836653d 100644 --- a/posthog/temporal/batch_exports/http_batch_export.py +++ b/posthog/temporal/batch_exports/http_batch_export.py @@ -1,8 +1,8 @@ import asyncio +import dataclasses import datetime as dt import io import json -from dataclasses import dataclass import aiohttp from django.conf import settings @@ -11,6 +11,7 @@ from posthog.batch_exports.service import ( BatchExportField, + BatchExportModel, BatchExportSchema, HttpBatchExportInputs, ) @@ -92,7 +93,7 @@ def from_activity_details(cls, details) -> "HeartbeatDetails": return HeartbeatDetails(last_uploaded_timestamp) -@dataclass +@dataclasses.dataclass class HttpInsertInputs: """Inputs for HTTP insert activity.""" @@ -104,8 +105,9 @@ class HttpInsertInputs: exclude_events: list[str] | None = None include_events: list[str] | None = None run_id: str | None = None - batch_export_schema: BatchExportSchema | None = None is_backfill: bool = False + batch_export_model: BatchExportModel | None = None + batch_export_schema: BatchExportSchema | None = None async def maybe_resume_from_heartbeat(inputs: HttpInsertInputs) -> str: @@ -357,6 +359,7 @@ async def run(self, inputs: HttpBatchExportInputs): batch_export_schema=inputs.batch_export_schema, run_id=run_id, is_backfill=inputs.is_backfill, + batch_export_model=inputs.batch_export_model, ) await execute_batch_export_insert_activity( diff --git a/posthog/temporal/batch_exports/postgres_batch_export.py b/posthog/temporal/batch_exports/postgres_batch_export.py index 54eb667062fbc..02f0634bd1540 100644 --- a/posthog/temporal/batch_exports/postgres_batch_export.py +++ b/posthog/temporal/batch_exports/postgres_batch_export.py @@ -1,10 +1,10 @@ import collections.abc import contextlib import csv +import dataclasses import datetime as dt import json import typing -from dataclasses import dataclass import psycopg import pyarrow as pa @@ -16,6 +16,7 @@ from posthog.batch_exports.models import BatchExportRun from posthog.batch_exports.service import ( BatchExportField, + BatchExportModel, BatchExportSchema, PostgresBatchExportInputs, ) @@ -220,7 +221,7 @@ def get_postgres_fields_from_record_schema( return pg_schema -@dataclass +@dataclasses.dataclass class PostgresInsertInputs: """Inputs for Postgres insert activity.""" @@ -237,9 +238,10 @@ class PostgresInsertInputs: port: int = 5432 exclude_events: list[str] | None = None include_events: list[str] | None = None - batch_export_schema: BatchExportSchema | None = None run_id: str | None = None is_backfill: bool = False + batch_export_model: BatchExportModel | None = None + batch_export_schema: BatchExportSchema | None = None @activity.defn @@ -262,24 +264,23 @@ async def insert_into_postgres_activity(inputs: PostgresInsertInputs) -> Records if not await client.is_alive(): raise ConnectionError("Cannot establish connection to ClickHouse") - if inputs.batch_export_schema is None: - fields = postgres_default_fields() - query_parameters = None - + model: BatchExportModel | BatchExportSchema | None = None + if inputs.batch_export_schema is None and "batch_export_model" in { + field.name for field in dataclasses.fields(inputs) + }: + model = inputs.batch_export_model else: - fields = inputs.batch_export_schema["fields"] - query_parameters = inputs.batch_export_schema["values"] + model = inputs.batch_export_schema 
record_iterator = iter_model_records( client=client, - model="events", + model=model, team_id=inputs.team_id, interval_start=inputs.data_interval_start, interval_end=inputs.data_interval_end, exclude_events=inputs.exclude_events, include_events=inputs.include_events, - fields=fields, - extra_query_parameters=query_parameters, + destination_default_fields=postgres_default_fields(), is_backfill=inputs.is_backfill, ) first_record_batch, record_iterator = await apeek_first_and_rewind(record_iterator) @@ -424,8 +425,9 @@ async def run(self, inputs: PostgresBatchExportInputs): data_interval_end=data_interval_end.isoformat(), exclude_events=inputs.exclude_events, include_events=inputs.include_events, - batch_export_schema=inputs.batch_export_schema, run_id=run_id, + batch_export_model=inputs.batch_export_model, + batch_export_schema=inputs.batch_export_schema, ) await execute_batch_export_insert_activity( diff --git a/posthog/temporal/batch_exports/redshift_batch_export.py b/posthog/temporal/batch_exports/redshift_batch_export.py index 52ce4e9db32cc..180e9fc18fd1e 100644 --- a/posthog/temporal/batch_exports/redshift_batch_export.py +++ b/posthog/temporal/batch_exports/redshift_batch_export.py @@ -1,9 +1,9 @@ import collections.abc import contextlib +import dataclasses import datetime as dt import json import typing -from dataclasses import dataclass import psycopg import pyarrow as pa @@ -12,7 +12,12 @@ from temporalio.common import RetryPolicy from posthog.batch_exports.models import BatchExportRun -from posthog.batch_exports.service import BatchExportField, RedshiftBatchExportInputs +from posthog.batch_exports.service import ( + BatchExportField, + BatchExportModel, + BatchExportSchema, + RedshiftBatchExportInputs, +) from posthog.temporal.batch_exports.base import PostHogWorkflow from posthog.temporal.batch_exports.batch_exports import ( FinishBatchExportRunInputs, @@ -262,7 +267,7 @@ async def async_client_cursor_from_connection( psycopg_connection.cursor_factory = current_factory -@dataclass +@dataclasses.dataclass class RedshiftInsertInputs(PostgresInsertInputs): """Inputs for Redshift insert activity. 
@@ -307,24 +312,24 @@ async def insert_into_redshift_activity(inputs: RedshiftInsertInputs) -> Records if not await client.is_alive(): raise ConnectionError("Cannot establish connection to ClickHouse") - if inputs.batch_export_schema is None: - fields = redshift_default_fields() - query_parameters = None + model: BatchExportModel | BatchExportSchema | None = None + if inputs.batch_export_schema is None and "batch_export_model" in { + field.name for field in dataclasses.fields(inputs) + }: + model = inputs.batch_export_model else: - fields = inputs.batch_export_schema["fields"] - query_parameters = inputs.batch_export_schema["values"] + model = inputs.batch_export_schema record_iterator = iter_model_records( client=client, - model="events", + model=model, team_id=inputs.team_id, interval_start=inputs.data_interval_start, interval_end=inputs.data_interval_end, exclude_events=inputs.exclude_events, include_events=inputs.include_events, - fields=fields, - extra_query_parameters=query_parameters, + destination_default_fields=redshift_default_fields(), is_backfill=inputs.is_backfill, ) first_record_batch, record_iterator = await apeek_first_and_rewind(record_iterator) @@ -462,9 +467,10 @@ async def run(self, inputs: RedshiftBatchExportInputs): exclude_events=inputs.exclude_events, include_events=inputs.include_events, properties_data_type=inputs.properties_data_type, - batch_export_schema=inputs.batch_export_schema, run_id=run_id, is_backfill=inputs.is_backfill, + batch_export_model=inputs.batch_export_model, + batch_export_schema=inputs.batch_export_schema, ) await execute_batch_export_insert_activity( diff --git a/posthog/temporal/batch_exports/s3_batch_export.py b/posthog/temporal/batch_exports/s3_batch_export.py index 7f460cb12fa7b..fa629f705abc9 100644 --- a/posthog/temporal/batch_exports/s3_batch_export.py +++ b/posthog/temporal/batch_exports/s3_batch_export.py @@ -1,11 +1,11 @@ import collections.abc import contextlib +import dataclasses import datetime as dt import io import json import posixpath import typing -from dataclasses import dataclass import aioboto3 import orjson @@ -17,6 +17,7 @@ from posthog.batch_exports.models import BatchExportRun from posthog.batch_exports.service import ( BatchExportField, + BatchExportModel, BatchExportSchema, S3BatchExportInputs, ) @@ -61,7 +62,7 @@ def get_allowed_template_variables(inputs) -> dict[str, str]: "year": f"{export_datetime:%Y}", "data_interval_start": inputs.data_interval_start, "data_interval_end": inputs.data_interval_end, - "table": "events", + "table": inputs.batch_export_model.name if inputs.batch_export_model is not None else "events", } @@ -315,7 +316,7 @@ def from_activity_details(cls, details): return cls(last_uploaded_part_timestamp, upload_state) -@dataclass +@dataclasses.dataclass class S3InsertInputs: """Inputs for S3 exports.""" @@ -336,12 +337,14 @@ class S3InsertInputs: include_events: list[str] | None = None encryption: str | None = None kms_key_id: str | None = None - batch_export_schema: BatchExportSchema | None = None endpoint_url: str | None = None # TODO: In Python 3.11, this could be a enum.StrEnum. 
file_format: str = "JSONLines" run_id: str | None = None is_backfill: bool = False + batch_export_model: BatchExportModel | None = None + # TODO: Remove after updating existing batch exports + batch_export_schema: BatchExportSchema | None = None async def initialize_and_resume_multipart_upload(inputs: S3InsertInputs) -> tuple[S3MultiPartUpload, str]: @@ -444,25 +447,24 @@ async def insert_into_s3_activity(inputs: S3InsertInputs) -> RecordsCompleted: s3_upload, interval_start = await initialize_and_resume_multipart_upload(inputs) - if inputs.batch_export_schema is None: - fields = s3_default_fields() - query_parameters = None - + model: BatchExportModel | BatchExportSchema | None = None + if inputs.batch_export_schema is None and "batch_export_model" in { + field.name for field in dataclasses.fields(inputs) + }: + model = inputs.batch_export_model else: - fields = inputs.batch_export_schema["fields"] - query_parameters = inputs.batch_export_schema["values"] + model = inputs.batch_export_schema record_iterator = iter_model_records( - model="events", + model=model, client=client, team_id=inputs.team_id, interval_start=interval_start, interval_end=inputs.data_interval_end, exclude_events=inputs.exclude_events, include_events=inputs.include_events, - fields=fields, - extra_query_parameters=query_parameters, is_backfill=inputs.is_backfill, + destination_default_fields=s3_default_fields(), ) first_record_batch, record_iterator = await apeek_first_and_rewind(record_iterator) @@ -674,10 +676,12 @@ async def run(self, inputs: S3BatchExportInputs): include_events=inputs.include_events, encryption=inputs.encryption, kms_key_id=inputs.kms_key_id, - batch_export_schema=inputs.batch_export_schema, file_format=inputs.file_format, run_id=run_id, is_backfill=inputs.is_backfill, + batch_export_model=inputs.batch_export_model, + # TODO: Remove after updating existing batch exports. 
+ batch_export_schema=inputs.batch_export_schema, ) await execute_batch_export_insert_activity( diff --git a/posthog/temporal/batch_exports/snowflake_batch_export.py b/posthog/temporal/batch_exports/snowflake_batch_export.py index 374248e83aff5..4d966fc4e00a4 100644 --- a/posthog/temporal/batch_exports/snowflake_batch_export.py +++ b/posthog/temporal/batch_exports/snowflake_batch_export.py @@ -17,6 +17,7 @@ from posthog.batch_exports.models import BatchExportRun from posthog.batch_exports.service import ( BatchExportField, + BatchExportModel, BatchExportSchema, SnowflakeBatchExportInputs, ) @@ -114,9 +115,10 @@ class SnowflakeInsertInputs: role: str | None = None exclude_events: list[str] | None = None include_events: list[str] | None = None - batch_export_schema: BatchExportSchema | None = None run_id: str | None = None is_backfill: bool = False + batch_export_model: BatchExportModel | None = None + batch_export_schema: BatchExportSchema | None = None def use_namespace(connection: SnowflakeConnection, database: str, schema: str) -> None: @@ -448,24 +450,23 @@ async def flush_to_snowflake( rows_exported.add(file.records_since_last_reset) bytes_exported.add(file.bytes_since_last_reset) - if inputs.batch_export_schema is None: - fields = snowflake_default_fields() - query_parameters = None - + model: BatchExportModel | BatchExportSchema | None = None + if inputs.batch_export_schema is None and "batch_export_model" in { + field.name for field in dataclasses.fields(inputs) + }: + model = inputs.batch_export_model else: - fields = inputs.batch_export_schema["fields"] - query_parameters = inputs.batch_export_schema["values"] + model = inputs.batch_export_schema record_iterator = iter_model_records( client=client, - model="events", + model=model, team_id=inputs.team_id, interval_start=data_interval_start, interval_end=inputs.data_interval_end, exclude_events=inputs.exclude_events, include_events=inputs.include_events, - fields=fields, - extra_query_parameters=query_parameters, + destination_default_fields=snowflake_default_fields(), is_backfill=inputs.is_backfill, ) first_record_batch, record_iterator = await apeek_first_and_rewind(record_iterator) @@ -601,9 +602,10 @@ async def run(self, inputs: SnowflakeBatchExportInputs): role=inputs.role, exclude_events=inputs.exclude_events, include_events=inputs.include_events, - batch_export_schema=inputs.batch_export_schema, run_id=run_id, is_backfill=inputs.is_backfill, + batch_export_model=inputs.batch_export_model, + batch_export_schema=inputs.batch_export_schema, ) await execute_batch_export_insert_activity( diff --git a/posthog/temporal/tests/batch_exports/conftest.py b/posthog/temporal/tests/batch_exports/conftest.py index deebf15349e3a..da5598f9d8ca0 100644 --- a/posthog/temporal/tests/batch_exports/conftest.py +++ b/posthog/temporal/tests/batch_exports/conftest.py @@ -46,6 +46,17 @@ async def truncate_events(clickhouse_client): await clickhouse_client.execute_query("TRUNCATE TABLE IF EXISTS `sharded_events`") +@pytest_asyncio.fixture(autouse=True) +async def truncate_persons(clickhouse_client): + """Fixture to automatically truncate person and person_distinct_id2 after a test. + + This is useful if during the test setup we insert a lot of persons we wish to clean-up. 
+ """ + yield + await clickhouse_client.execute_query("TRUNCATE TABLE IF EXISTS `person`") + await clickhouse_client.execute_query("TRUNCATE TABLE IF EXISTS `person_distinct_id2`") + + @pytest.fixture def batch_export_schema(request) -> dict | None: """A parametrizable fixture to configure a batch export schema. @@ -135,7 +146,7 @@ async def create_clickhouse_tables_and_views(clickhouse_client, django_db_setup) CREATE_EVENTS_BATCH_EXPORT_VIEW_UNBOUNDED, CREATE_PERSONS_BATCH_EXPORT_VIEW, ) - from posthog.clickhouse.schema import CREATE_KAFKA_TABLE_QUERIES + from posthog.clickhouse.schema import CREATE_KAFKA_TABLE_QUERIES, build_query create_view_queries = ( CREATE_EVENTS_BATCH_EXPORT_VIEW, @@ -145,11 +156,18 @@ async def create_clickhouse_tables_and_views(clickhouse_client, django_db_setup) ) clickhouse_tasks = set() - for query in create_view_queries + CREATE_KAFKA_TABLE_QUERIES: + for query in create_view_queries + tuple(map(build_query, CREATE_KAFKA_TABLE_QUERIES)): task = asyncio.create_task(clickhouse_client.execute_query(query)) clickhouse_tasks.add(task) task.add_done_callback(clickhouse_tasks.discard) - await asyncio.wait(clickhouse_tasks) + done, pending = await asyncio.wait(clickhouse_tasks) + + if len(pending) > 0: + raise ValueError("Not all required tables and views were created in time") + + for task in done: + if exc := task.exception(): + raise exc return diff --git a/posthog/temporal/tests/batch_exports/test_batch_exports.py b/posthog/temporal/tests/batch_exports/test_batch_exports.py index b146352c74958..2634da9c1dff9 100644 --- a/posthog/temporal/tests/batch_exports/test_batch_exports.py +++ b/posthog/temporal/tests/batch_exports/test_batch_exports.py @@ -6,6 +6,7 @@ import pytest from django.test import override_settings +from posthog.batch_exports.service import BatchExportModel from posthog.temporal.batch_exports.batch_exports import ( get_data_interval, iter_model_records, @@ -266,12 +267,11 @@ async def test_iter_records_with_single_field_and_alias(clickhouse_client, field record async for record_batch in iter_model_records( client=clickhouse_client, - model="events", + model=BatchExportModel(name="events", schema={"fields": [field], "values": {}}), team_id=team_id, is_backfill=False, interval_start=data_interval_start.isoformat(), interval_end=data_interval_end.isoformat(), - fields=[field], ) for record in record_batch.to_pylist() ] diff --git a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py index 9467b294b7107..ad33a48cd6149 100644 --- a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py @@ -3,9 +3,7 @@ import functools import json import os -from random import randint -from unittest import skip -from uuid import uuid4 +import uuid import aioboto3 import botocore.exceptions @@ -19,10 +17,10 @@ from temporalio.testing import WorkflowEnvironment from temporalio.worker import UnsandboxedWorkflowRunner, Worker -from posthog.batch_exports.service import BatchExportSchema +from posthog.batch_exports.service import BatchExportModel, BatchExportSchema from posthog.temporal.batch_exports.batch_exports import ( finish_batch_export_run, - iter_records, + iter_model_records, start_batch_export_run, ) from posthog.temporal.batch_exports.s3_batch_export import ( @@ -45,6 +43,10 @@ adelete_batch_export, afetch_batch_export_runs, ) +from posthog.temporal.tests.utils.persons import ( + 
generate_test_person_distinct_id2_in_clickhouse, + generate_test_persons_in_clickhouse, +) from posthog.temporal.tests.utils.s3 import read_parquet_from_s3, read_s3_data_as_json pytestmark = [pytest.mark.asyncio, pytest.mark.django_db] @@ -100,13 +102,16 @@ def bucket_name(request) -> str: try: return request.param except AttributeError: - return f"{TEST_ROOT_BUCKET}-{str(uuid4())}" + return f"{TEST_ROOT_BUCKET}-{str(uuid.uuid4())}" @pytest.fixture -def s3_key_prefix(): +def s3_key_prefix(request): """An S3 key prefix to use when putting files in a bucket.""" - return f"posthog-events-{str(uuid4())}" + try: + return request.param + except AttributeError: + return f"posthog-data-{str(uuid.uuid4())}" @pytest.fixture @@ -181,9 +186,10 @@ async def assert_clickhouse_records_in_s3( data_interval_end: dt.datetime, exclude_events: list[str] | None = None, include_events: list[str] | None = None, - batch_export_schema: BatchExportSchema | None = None, + batch_export_model: BatchExportModel | BatchExportSchema | None = None, compression: str | None = None, file_format: str = "JSONLines", + is_backfill: bool = False, ): """Assert ClickHouse records are written to JSON in key_prefix in S3 bucket_name. @@ -210,21 +216,29 @@ async def assert_clickhouse_records_in_s3( json_columns=json_columns, ) - if batch_export_schema is not None: - schema_column_names = [field["alias"] for field in batch_export_schema["fields"]] - else: - schema_column_names = [field["alias"] for field in s3_default_fields()] + schema_column_names = [field["alias"] for field in s3_default_fields()] + if batch_export_model is not None: + if isinstance(batch_export_model, BatchExportModel): + batch_export_schema = batch_export_model.schema + else: + batch_export_schema = batch_export_model + + if batch_export_schema is not None: + schema_column_names = [field["alias"] for field in batch_export_schema["fields"]] + elif isinstance(batch_export_model, BatchExportModel) and batch_export_model.name == "persons": + schema_column_names = ["team_id", "distinct_id", "person_id", "properties", "version", "_inserted_at"] expected_records = [] - for record_batch in iter_records( + async for record_batch in iter_model_records( client=clickhouse_client, + model=batch_export_model, team_id=team_id, interval_start=data_interval_start.isoformat(), interval_end=data_interval_end.isoformat(), exclude_events=exclude_events, include_events=include_events, - fields=batch_export_schema["fields"] if batch_export_schema is not None else s3_default_fields(), - extra_query_parameters=batch_export_schema["values"] if batch_export_schema is not None else None, + destination_default_fields=s3_default_fields(), + is_backfill=is_backfill, ): for record in record_batch.to_pylist(): expected_record = {} @@ -243,12 +257,29 @@ async def assert_clickhouse_records_in_s3( expected_records.append(expected_record) - assert len(s3_data) == len(expected_records) - assert s3_data[0] == expected_records[0] - assert s3_data == expected_records, f"Not all s3 records match expected records. Not printing due to large size." 
- + if "team_id" in schema_column_names: + assert all(record["team_id"] == team_id for record in s3_data) -TEST_S3_SCHEMAS: list[BatchExportSchema | None] = [ + assert s3_data[0] == expected_records[0] + assert len(s3_data) == len(expected_records) + assert s3_data == expected_records + + +TEST_S3_MODELS: list[BatchExportModel | BatchExportSchema | None] = [ + BatchExportModel( + name="a-custom-model", + schema={ + "fields": [ + {"expression": "event", "alias": "my_event_name"}, + {"expression": "nullIf(JSONExtractString(properties, %(hogql_val_0)s), '')", "alias": "browser"}, + {"expression": "nullIf(JSONExtractString(properties, %(hogql_val_1)s), '')", "alias": "os"}, + {"expression": "nullIf(properties, '')", "alias": "all_properties"}, + ], + "values": {"hogql_val_0": "$browser", "hogql_val_1": "$os"}, + }, + ), + BatchExportModel(name="events", schema=None), + BatchExportModel(name="persons", schema=None), { "fields": [ {"expression": "event", "alias": "my_event_name"}, @@ -258,57 +289,42 @@ async def assert_clickhouse_records_in_s3( ], "values": {"hogql_val_0": "$browser", "hogql_val_1": "$os"}, }, - { - "fields": [ - {"expression": "event", "alias": "my_event_name"}, - {"expression": "1 + 1", "alias": "two"}, - ], - "values": {}, - }, None, ] -@pytest.mark.parametrize("compression", [None, "gzip", "brotli"], indirect=True) -@pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True) -@pytest.mark.parametrize("batch_export_schema", TEST_S3_SCHEMAS) -@pytest.mark.parametrize("file_format", FILE_FORMAT_EXTENSIONS.keys()) -async def test_insert_into_s3_activity_puts_data_into_s3( - clickhouse_client, - bucket_name, - minio_client, - activity_environment, - compression, - exclude_events, - file_format, - batch_export_schema: BatchExportSchema | None, -): - """Test that the insert_into_s3_activity function ends up with data into S3. +@pytest.fixture +def data_interval_start(data_interval_end, interval): + if interval == "hour": + interval_time_delta = dt.timedelta(hours=1) + elif interval == "day": + interval_time_delta = dt.timedelta(days=1) + elif interval == "week": + interval_time_delta = dt.timedelta(weeks=1) + elif interval.startswith("every"): + _, value, unit = interval.split(" ") + kwargs = {unit: int(value)} + interval_time_delta = dt.timedelta(**kwargs) + else: + raise ValueError(f"Invalid interval: '{interval}'") - We use the generate_test_events_in_clickhouse function to generate several sets - of events. Some of these sets are expected to be exported, and others not. Expected - events are those that: - * Are created for the team_id of the batch export. - * Are created in the date range of the batch export. - * Are not duplicates of other events that are in the same batch. - * Do not have an event name contained in the batch export's exclude_events. + return data_interval_end - interval_time_delta - Once we have these events, we pass them to the assert_clickhouse_records_in_s3 function to check - that they appear in the expected S3 bucket and key. - """ - data_interval_start = dt.datetime(2023, 4, 20, 14, 0, 0, tzinfo=dt.timezone.utc) - data_interval_end = dt.datetime(2023, 4, 25, 15, 0, 0, tzinfo=dt.timezone.utc) - # Generate a random team id integer. There's still a chance of a collision, - # but it's very small. 
- team_id = randint(1, 1000000) +@pytest.fixture +def data_interval_end(interval): + return dt.datetime(2023, 4, 25, 15, 0, 0, tzinfo=dt.timezone.utc) + +@pytest_asyncio.fixture +async def generate_test_data(ateam, clickhouse_client, exclude_events, data_interval_start, data_interval_end): + """Generate test data in ClickHouse.""" await generate_test_events_in_clickhouse( client=clickhouse_client, - team_id=team_id, + team_id=ateam.pk, start_time=data_interval_start, end_time=data_interval_end, - count=10000, + count=1000, count_outside_range=10, count_other_team=10, duplicate=True, @@ -318,7 +334,7 @@ async def test_insert_into_s3_activity_puts_data_into_s3( await generate_test_events_in_clickhouse( client=clickhouse_client, - team_id=team_id, + team_id=ateam.pk, start_time=data_interval_start, end_time=data_interval_end, count=5, @@ -327,12 +343,13 @@ async def test_insert_into_s3_activity_puts_data_into_s3( properties=None, person_properties=None, ) + events_to_export_created = 1005 if exclude_events: for event_name in exclude_events: await generate_test_events_in_clickhouse( client=clickhouse_client, - team_id=team_id, + team_id=ateam.pk, start_time=data_interval_start, end_time=data_interval_end, count=5, @@ -341,15 +358,78 @@ async def test_insert_into_s3_activity_puts_data_into_s3( event_name=event_name, ) - # Make a random string to prefix the S3 keys with. This allows us to ensure - # isolation of the test, and also to check that the data is being written. - prefix = str(uuid4()) + persons, _ = await generate_test_persons_in_clickhouse( + client=clickhouse_client, + team_id=ateam.pk, + start_time=data_interval_start, + end_time=data_interval_end, + count=100, + count_other_team=10, + properties={"utm_medium": "referral", "$initial_os": "Linux"}, + ) + + for person in persons: + await generate_test_person_distinct_id2_in_clickhouse( + client=clickhouse_client, + team_id=ateam.pk, + person_id=uuid.UUID(person["id"]), + distinct_id=f"distinct-id-{uuid.UUID(person['id'])}", + timestamp=dt.datetime.fromisoformat(person["_timestamp"]), + ) + + persons_to_export_created = 100 + + return (events_to_export_created, persons_to_export_created) + + +@pytest.mark.parametrize("compression", [None, "gzip", "brotli"], indirect=True) +@pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True) +@pytest.mark.parametrize("model", TEST_S3_MODELS) +@pytest.mark.parametrize("file_format", FILE_FORMAT_EXTENSIONS.keys()) +async def test_insert_into_s3_activity_puts_data_into_s3( + clickhouse_client, + bucket_name, + minio_client, + activity_environment, + compression, + exclude_events, + file_format, + data_interval_start, + data_interval_end, + model: BatchExportModel | BatchExportSchema | None, + generate_test_data, + ateam, +): + """Test that the insert_into_s3_activity function ends up with data into S3. + + We use the generate_test_events_in_clickhouse function to generate several sets + of events. Some of these sets are expected to be exported, and others not. Expected + events are those that: + * Are created for the team_id of the batch export. + * Are created in the date range of the batch export. + * Are not duplicates of other events that are in the same batch. + * Do not have an event name contained in the batch export's exclude_events. + + Once we have these events, we pass them to the assert_clickhouse_records_in_s3 function to check + that they appear in the expected S3 bucket and key. 
+ """ + data_interval_start = dt.datetime(2023, 4, 20, 14, 0, 0, tzinfo=dt.timezone.utc) + data_interval_end = dt.datetime(2023, 4, 25, 15, 0, 0, tzinfo=dt.timezone.utc) + + prefix = str(uuid.uuid4()) + + batch_export_schema: BatchExportSchema | None = None + batch_export_model: BatchExportModel | None = None + if isinstance(model, BatchExportModel): + batch_export_model = model + elif model is not None: + batch_export_schema = model insert_inputs = S3InsertInputs( bucket_name=bucket_name, region="us-east-1", prefix=prefix, - team_id=team_id, + team_id=ateam.pk, data_interval_start=data_interval_start.isoformat(), data_interval_end=data_interval_end.isoformat(), aws_access_key_id="object_storage_root_user", @@ -357,8 +437,9 @@ async def test_insert_into_s3_activity_puts_data_into_s3( endpoint_url=settings.OBJECT_STORAGE_ENDPOINT, compression=compression, exclude_events=exclude_events, - batch_export_schema=batch_export_schema, file_format=file_format, + batch_export_schema=batch_export_schema, + batch_export_model=batch_export_model, ) with override_settings( @@ -366,21 +447,23 @@ async def test_insert_into_s3_activity_puts_data_into_s3( ): # 5MB, the minimum for Multipart uploads records_exported = await activity_environment.run(insert_into_s3_activity, insert_inputs) - assert records_exported == 10005 + (events_to_export_created, persons_to_export_created) = generate_test_data + assert records_exported == events_to_export_created or records_exported == persons_to_export_created await assert_clickhouse_records_in_s3( s3_compatible_client=minio_client, clickhouse_client=clickhouse_client, bucket_name=bucket_name, key_prefix=prefix, - team_id=team_id, + team_id=ateam.pk, data_interval_start=data_interval_start, data_interval_end=data_interval_end, - batch_export_schema=batch_export_schema, + batch_export_model=model, exclude_events=exclude_events, include_events=None, compression=compression, file_format=file_format, + is_backfill=False, ) @@ -434,7 +517,7 @@ async def s3_batch_export( @pytest.mark.parametrize("interval", ["hour", "day"], indirect=True) @pytest.mark.parametrize("compression", [None, "gzip", "brotli"], indirect=True) @pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True) -@pytest.mark.parametrize("batch_export_schema", TEST_S3_SCHEMAS) +@pytest.mark.parametrize("model", TEST_S3_MODELS) @pytest.mark.parametrize("file_format", FILE_FORMAT_EXTENSIONS.keys(), indirect=True) async def test_s3_export_workflow_with_minio_bucket( clickhouse_client, @@ -446,8 +529,11 @@ async def test_s3_export_workflow_with_minio_bucket( compression, exclude_events, s3_key_prefix, - batch_export_schema, file_format, + data_interval_start, + data_interval_end, + model: BatchExportModel | BatchExportSchema | None, + generate_test_data, ): """Test S3BatchExport Workflow end-to-end by using a local MinIO bucket instead of S3. @@ -458,41 +544,24 @@ async def test_s3_export_workflow_with_minio_bucket( will require its prescense in the database when running. This model is indirectly parametrized by several fixtures. Refer to them for more information. 
""" - data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00") - data_interval_start = data_interval_end - s3_batch_export.interval_time_delta - - await generate_test_events_in_clickhouse( - client=clickhouse_client, - team_id=ateam.pk, - start_time=data_interval_start, - end_time=data_interval_end, - count=100, - count_outside_range=10, - count_other_team=10, - duplicate=True, - properties={"$browser": "Chrome", "$os": "Mac OS X"}, - person_properties={"utm_medium": "referral", "$initial_os": "Linux"}, - ) - - if exclude_events: - for event_name in exclude_events: - await generate_test_events_in_clickhouse( - client=clickhouse_client, - team_id=ateam.pk, - start_time=data_interval_start, - end_time=data_interval_end, - count=5, - count_outside_range=0, - count_other_team=0, - event_name=event_name, - ) - - workflow_id = str(uuid4()) + if isinstance(model, BatchExportModel) and model.name == "person" and exclude_events is not None: + # Eventually, this setting should be part of the model via some "filters" attribute. + pytest.skip("Unnecessary test case as person batch export is not affected by 'exclude_events'") + + batch_export_schema: BatchExportSchema | None = None + batch_export_model: BatchExportModel | None = None + if isinstance(model, BatchExportModel): + batch_export_model = model + elif model is not None: + batch_export_schema = model + + workflow_id = str(uuid.uuid4()) inputs = S3BatchExportInputs( team_id=ateam.pk, batch_export_id=str(s3_batch_export.id), data_interval_end=data_interval_end.isoformat(), interval=interval, + batch_export_model=batch_export_model, batch_export_schema=batch_export_schema, **s3_batch_export.destination.config, ) @@ -532,7 +601,7 @@ async def test_s3_export_workflow_with_minio_bucket( team_id=ateam.pk, data_interval_start=data_interval_start, data_interval_end=data_interval_end, - batch_export_schema=batch_export_schema, + batch_export_model=model, exclude_events=exclude_events, compression=compression, file_format=file_format, @@ -542,7 +611,7 @@ async def test_s3_export_workflow_with_minio_bucket( @pytest.mark.parametrize("interval", ["hour"], indirect=True) @pytest.mark.parametrize("compression", [None], indirect=True) @pytest.mark.parametrize("exclude_events", [None], indirect=True) -@pytest.mark.parametrize("batch_export_schema", TEST_S3_SCHEMAS) +@pytest.mark.parametrize("model", TEST_S3_MODELS) async def test_s3_export_workflow_with_minio_bucket_without_events( clickhouse_client, minio_client, @@ -553,21 +622,29 @@ async def test_s3_export_workflow_with_minio_bucket_without_events( compression, exclude_events, s3_key_prefix, - batch_export_schema, + model, + data_interval_start, + data_interval_end, ): """Test S3BatchExport Workflow end-to-end without any events to export. The workflow should update the batch export run status to completed and set 0 as `records_completed`. 
""" - data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00") - - workflow_id = str(uuid4()) + batch_export_schema: BatchExportSchema | None = None + batch_export_model: BatchExportModel | None = None + if isinstance(model, BatchExportModel): + batch_export_model = model + elif model is not None: + batch_export_schema = model + + workflow_id = str(uuid.uuid4()) inputs = S3BatchExportInputs( team_id=ateam.pk, batch_export_id=str(s3_batch_export.id), data_interval_end=data_interval_end.isoformat(), interval=interval, batch_export_schema=batch_export_schema, + batch_export_model=batch_export_model, **s3_batch_export.destination.config, ) @@ -628,7 +705,7 @@ async def s3_client(bucket_name, s3_key_prefix): @pytest.mark.parametrize("exclude_events", [None, ["test-exclude"]], indirect=True) @pytest.mark.parametrize("encryption", [None, "AES256", "aws:kms"], indirect=True) @pytest.mark.parametrize("bucket_name", [os.getenv("S3_TEST_BUCKET")], indirect=True) -@pytest.mark.parametrize("batch_export_schema", TEST_S3_SCHEMAS) +@pytest.mark.parametrize("model", TEST_S3_MODELS) @pytest.mark.parametrize("file_format", FILE_FORMAT_EXTENSIONS.keys(), indirect=True) async def test_s3_export_workflow_with_s3_bucket( s3_client, @@ -641,8 +718,11 @@ async def test_s3_export_workflow_with_s3_bucket( encryption, exclude_events, ateam, - batch_export_schema, file_format, + data_interval_start, + data_interval_end, + model: BatchExportModel | BatchExportSchema | None, + generate_test_data, ): """Test S3 Export Workflow end-to-end by using an S3 bucket. @@ -657,36 +737,17 @@ async def test_s3_export_workflow_with_s3_bucket( will require its prescense in the database when running. This model is indirectly parametrized by several fixtures. Refer to them for more information. 
""" - data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000") - data_interval_start = data_interval_end - s3_batch_export.interval_time_delta + if isinstance(model, BatchExportModel) and model.name == "person" and exclude_events is not None: + pytest.skip("Unnecessary test case as person batch export is not affected by 'exclude_events'") - await generate_test_events_in_clickhouse( - client=clickhouse_client, - team_id=ateam.pk, - start_time=data_interval_start, - end_time=data_interval_end, - count=100, - count_outside_range=10, - count_other_team=10, - duplicate=True, - properties={"$browser": "Chrome", "$os": "Mac OS X"}, - person_properties={"utm_medium": "referral", "$initial_os": "Linux"}, - ) - - if exclude_events: - for event_name in exclude_events: - await generate_test_events_in_clickhouse( - client=clickhouse_client, - team_id=ateam.pk, - start_time=data_interval_start, - end_time=data_interval_end, - count=5, - count_outside_range=0, - count_other_team=0, - event_name=event_name, - ) + batch_export_schema: BatchExportSchema | None = None + batch_export_model: BatchExportModel | None = None + if isinstance(model, BatchExportModel): + batch_export_model = model + elif model is not None: + batch_export_schema = model - workflow_id = str(uuid4()) + workflow_id = str(uuid.uuid4()) destination_config = s3_batch_export.destination.config | { "endpoint_url": None, "aws_access_key_id": os.getenv("AWS_ACCESS_KEY_ID"), @@ -698,6 +759,7 @@ async def test_s3_export_workflow_with_s3_bucket( data_interval_end=data_interval_end.isoformat(), interval=interval, batch_export_schema=batch_export_schema, + batch_export_model=batch_export_model, **destination_config, ) @@ -736,7 +798,7 @@ async def test_s3_export_workflow_with_s3_bucket( team_id=ateam.pk, data_interval_start=data_interval_start, data_interval_end=data_interval_end, - batch_export_schema=batch_export_schema, + batch_export_model=model, exclude_events=exclude_events, include_events=None, compression=compression, @@ -744,88 +806,6 @@ async def test_s3_export_workflow_with_s3_bucket( ) -@skip("Failing in CI, skip for now") -async def test_s3_export_workflow_with_minio_bucket_and_a_lot_of_data( - clickhouse_client, - minio_client, - bucket_name, - ateam, - compression, - exclude_events, - s3_key_prefix, - interval, - s3_batch_export, -): - """Test the S3BatchExport Workflow end-to-end by using a local MinIO bucket instead of S3. - - This test is the same as test_s3_export_workflow_with_minio_bucket, but we significantly increase the number - of rows generated to export to provide some guidance on whether we can perform under load. 
- """ - data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00") - data_interval_start = data_interval_end - s3_batch_export.interval_time_delta - - await generate_test_events_in_clickhouse( - client=clickhouse_client, - team_id=ateam.pk, - start_time=data_interval_start, - end_time=data_interval_end, - count=100000, - count_outside_range=1000, - count_other_team=1000, - duplicate=True, - properties={"$browser": "Chrome", "$os": "Mac OS X"}, - person_properties={"utm_medium": "referral", "$initial_os": "Linux"}, - ) - - workflow_id = str(uuid4()) - inputs = S3BatchExportInputs( - team_id=ateam.pk, - batch_export_id=str(s3_batch_export.id), - data_interval_end=data_interval_end.isoformat(), - interval=interval, - **s3_batch_export.destination.config, - ) - - async with await WorkflowEnvironment.start_time_skipping() as activity_environment: - async with Worker( - activity_environment.client, - task_queue=settings.TEMPORAL_TASK_QUEUE, - workflows=[S3BatchExportWorkflow], - activities=[ - start_batch_export_run, - insert_into_s3_activity, - finish_batch_export_run, - ], - workflow_runner=UnsandboxedWorkflowRunner(), - ): - await activity_environment.client.execute_workflow( - S3BatchExportWorkflow.run, - inputs, - id=workflow_id, - task_queue=settings.TEMPORAL_TASK_QUEUE, - retry_policy=RetryPolicy(maximum_attempts=1), - execution_timeout=dt.timedelta(seconds=360), - ) - - runs = await afetch_batch_export_runs(batch_export_id=s3_batch_export.id) - assert len(runs) == 1 - - run = runs[0] - assert run.status == "Completed" - - await assert_clickhouse_records_in_s3( - s3_compatible_client=minio_client, - clickhouse_client=clickhouse_client, - bucket_name=bucket_name, - key_prefix=s3_key_prefix, - team_id=ateam.pk, - data_interval_start=data_interval_start, - data_interval_end=data_interval_end, - exclude_events=exclude_events, - compression=compression, - ) - - @pytest.mark.parametrize( "s3_key_prefix", [ @@ -837,8 +817,20 @@ async def test_s3_export_workflow_with_minio_bucket_and_a_lot_of_data( ], indirect=True, ) +@pytest.mark.parametrize("model", [TEST_S3_MODELS[1], TEST_S3_MODELS[2], None]) async def test_s3_export_workflow_with_minio_bucket_and_custom_key_prefix( - clickhouse_client, ateam, minio_client, bucket_name, compression, interval, s3_batch_export, s3_key_prefix + clickhouse_client, + ateam, + minio_client, + bucket_name, + compression, + interval, + s3_batch_export, + s3_key_prefix, + data_interval_end, + data_interval_start, + model: BatchExportModel | BatchExportSchema | None, + generate_test_data, ): """Test the S3BatchExport Workflow end-to-end by specifying a custom key prefix. @@ -846,27 +838,20 @@ async def test_s3_export_workflow_with_minio_bucket_and_custom_key_prefix( inserted_at to assert we properly default to _timestamp. This is relevant for rows inserted before inserted_at was added. 
""" - data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00") - data_interval_start = data_interval_end - s3_batch_export.interval_time_delta - - await generate_test_events_in_clickhouse( - client=clickhouse_client, - team_id=ateam.pk, - start_time=data_interval_start, - end_time=data_interval_end, - count=100, - count_outside_range=10, - count_other_team=10, - duplicate=True, - properties={"$browser": "Chrome", "$os": "Mac OS X"}, - person_properties={"utm_medium": "referral", "$initial_os": "Linux"}, - ) - - workflow_id = str(uuid4()) + batch_export_schema: BatchExportSchema | None = None + batch_export_model: BatchExportModel | None = None + if isinstance(model, BatchExportModel): + batch_export_model = model + elif model is not None: + batch_export_schema = model + + workflow_id = str(uuid.uuid4()) inputs = S3BatchExportInputs( team_id=ateam.pk, batch_export_id=str(s3_batch_export.id), data_interval_end=data_interval_end.isoformat(), + batch_export_model=batch_export_model, + batch_export_schema=batch_export_schema, interval=interval, **s3_batch_export.destination.config, ) @@ -896,11 +881,12 @@ async def test_s3_export_workflow_with_minio_bucket_and_custom_key_prefix( assert len(runs) == 1 run = runs[0] + (events_to_export_created, persons_to_export_created) = generate_test_data assert run.status == "Completed" - assert run.records_completed == 100 + assert run.records_completed == events_to_export_created or run.records_completed == persons_to_export_created expected_key_prefix = s3_key_prefix.format( - table="events", + table=batch_export_model.name if batch_export_model is not None else "events", year=data_interval_end.year, # All of these must include leading 0s. month=data_interval_end.strftime("%m"), @@ -909,6 +895,7 @@ async def test_s3_export_workflow_with_minio_bucket_and_custom_key_prefix( minute=data_interval_end.strftime("%M"), second=data_interval_end.strftime("%S"), ) + objects = await minio_client.list_objects_v2(Bucket=bucket_name, Prefix=expected_key_prefix) key = objects["Contents"][0].get("Key") assert len(objects.get("Contents", [])) == 1 @@ -918,11 +905,12 @@ async def test_s3_export_workflow_with_minio_bucket_and_custom_key_prefix( s3_compatible_client=minio_client, clickhouse_client=clickhouse_client, bucket_name=bucket_name, - key_prefix=s3_key_prefix, + key_prefix=expected_key_prefix, team_id=ateam.pk, data_interval_start=data_interval_start, data_interval_end=data_interval_end, compression=compression, + batch_export_model=model, ) @@ -933,7 +921,7 @@ async def test_s3_export_workflow_handles_insert_activity_errors(ateam, s3_batch """ data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00") - workflow_id = str(uuid4()) + workflow_id = str(uuid.uuid4()) inputs = S3BatchExportInputs( team_id=ateam.pk, batch_export_id=str(s3_batch_export.id), @@ -983,7 +971,7 @@ async def test_s3_export_workflow_handles_insert_activity_non_retryable_errors(a """ data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00") - workflow_id = str(uuid4()) + workflow_id = str(uuid.uuid4()) inputs = S3BatchExportInputs( team_id=ateam.pk, batch_export_id=str(s3_batch_export.id), @@ -1036,7 +1024,7 @@ async def test_s3_export_workflow_handles_cancellation(ateam, s3_batch_export, i """ data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00") - workflow_id = str(uuid4()) + workflow_id = str(uuid.uuid4()) inputs = S3BatchExportInputs( team_id=ateam.pk, batch_export_id=str(s3_batch_export.id), 
diff --git a/posthog/temporal/tests/utils/persons.py b/posthog/temporal/tests/utils/persons.py new file mode 100644 index 0000000000000..0a63a96d98fe5 --- /dev/null +++ b/posthog/temporal/tests/utils/persons.py @@ -0,0 +1,239 @@ +"""Test utilities that deal with test person generation.""" + +import datetime as dt +import json +import random +import typing +import uuid + +from posthog.temporal.common.clickhouse import ClickHouseClient +from posthog.temporal.tests.utils.datetimes import date_range + + +class PersonValues(typing.TypedDict): + """Person values to be inserted for testing.""" + + id: str + created_at: str + team_id: int + properties: dict | None + is_identified: bool + is_deleted: bool + version: int + _timestamp: str + + +def generate_test_persons( + count: int, + team_id: int, + timestamp_start: dt.datetime, + timestamp_end: dt.datetime, + person_id: uuid.UUID | None = None, + version: int = 1, + properties: dict | None = None, + is_identified: bool = True, + is_deleted: bool = False, + start: int = 0, +) -> list[PersonValues]: + """Generate a list of persons for testing.""" + timestamps = random.sample( + list(date_range(timestamp_start + dt.timedelta(seconds=1) * start, timestamp_end, dt.timedelta(seconds=1))), + k=count, + ) + + persons: list[PersonValues] = [] + for _ in range(start, count + start): + timestamp = timestamps.pop() + person: PersonValues = { + "id": str(person_id) if person_id else str(uuid.uuid4()), + "created_at": timestamp.strftime("%Y-%m-%d %H:%M:%S.%f"), + "team_id": team_id, + "properties": properties, + "is_identified": is_identified, + "is_deleted": is_deleted, + "version": version, + "_timestamp": timestamp.strftime("%Y-%m-%d %H:%M:%S"), + } + persons.append(person) + + return persons + + +async def insert_person_values_in_clickhouse(client: ClickHouseClient, persons: list[PersonValues]): + """Execute an insert query to insert provided PersonValues into person.""" + await client.execute_query( + f""" + INSERT INTO `person` ( + id, + team_id, + created_at, + properties, + is_identified, + is_deleted, + version, + _timestamp + ) + VALUES + """, + *[ + ( + person["id"], + person["team_id"], + person["created_at"], + json.dumps(person["properties"]) if isinstance(person["properties"], dict) else person["properties"], + person["is_identified"], + person["is_deleted"], + person["version"], + person["_timestamp"], + ) + for person in persons + ], + ) + + +async def generate_test_persons_in_clickhouse( + client: ClickHouseClient, + team_id: int, + start_time: dt.datetime, + end_time: dt.datetime, + count: int = 100, + count_other_team: int = 0, + person_id: uuid.UUID | None = None, + version: int = 1, + properties: dict | None = None, + is_identified: bool = True, + is_deleted: bool = False, + batch_size: int = 10000, +) -> tuple[list[PersonValues], list[PersonValues]]: + persons: list[PersonValues] = [] + while len(persons) < count: + persons_to_insert = generate_test_persons( + count=min(count - len(persons), batch_size), + team_id=team_id, + timestamp_start=start_time, + timestamp_end=end_time, + person_id=person_id, + properties=properties, + is_identified=is_identified, + is_deleted=is_deleted, + version=version, + start=len(persons), + ) + + await insert_person_values_in_clickhouse(client=client, persons=persons_to_insert) + + persons.extend(persons_to_insert) + + persons_from_other_team = generate_test_persons( + count=count_other_team, + team_id=team_id + random.randint(1, 1000), + timestamp_start=start_time, + timestamp_end=end_time, + 
+        person_id=person_id,
+        properties=properties,
+        is_identified=is_identified,
+        is_deleted=is_deleted,
+        version=version,
+        start=len(persons),
+    )
+
+    await insert_person_values_in_clickhouse(client=client, persons=persons_from_other_team)
+    return (persons, persons_from_other_team)
+
+
+class PersonDistinctId2Values(typing.TypedDict):
+    """Values to be inserted in person_distinct_id2 for testing."""
+
+    team_id: int
+    distinct_id: str
+    person_id: str
+    is_deleted: bool
+    version: int
+    _timestamp: str
+
+
+def generate_test_person_distinct_id2(
+    count: int,
+    team_id: int,
+    timestamp: dt.datetime,
+    distinct_id: str,
+    person_id: uuid.UUID | None = None,
+    version: int = 1,
+    is_deleted: bool = False,
+) -> PersonDistinctId2Values:
+    """Generate a row of person_distinct_id2 values for testing."""
+    person: PersonDistinctId2Values = {
+        "team_id": team_id,
+        "distinct_id": distinct_id,
+        "person_id": str(person_id) if person_id else str(uuid.uuid4()),
+        "is_deleted": is_deleted,
+        "version": version,
+        "_timestamp": timestamp.strftime("%Y-%m-%d %H:%M:%S"),
+    }
+
+    return person
+
+
+async def insert_person_distinct_id2_values_in_clickhouse(
+    client: ClickHouseClient, persons: list[PersonDistinctId2Values]
+):
+    """Execute an insert query to insert provided PersonDistinctId2Values into person_distinct_id2."""
+    await client.execute_query(
+        f"""
+        INSERT INTO `person_distinct_id2` (
+            team_id,
+            distinct_id,
+            person_id,
+            is_deleted,
+            version,
+            _timestamp
+        )
+        VALUES
+        """,
+        *[
+            (
+                person["team_id"],
+                person["distinct_id"],
+                person["person_id"],
+                person["is_deleted"],
+                person["version"],
+                person["_timestamp"],
+            )
+            for person in persons
+        ],
+    )
+
+
+async def generate_test_person_distinct_id2_in_clickhouse(
+    client: ClickHouseClient,
+    team_id: int,
+    distinct_id: str,
+    timestamp: dt.datetime,
+    person_id: uuid.UUID | None = None,
+    version: int = 1,
+    is_deleted: bool = False,
+) -> tuple[PersonDistinctId2Values, PersonDistinctId2Values]:
+    person = generate_test_person_distinct_id2(
+        count=1,
+        team_id=team_id,
+        timestamp=timestamp,
+        distinct_id=distinct_id,
+        person_id=person_id,
+        is_deleted=is_deleted,
+        version=version,
+    )
+
+    await insert_person_distinct_id2_values_in_clickhouse(client=client, persons=[person])
+
+    person_from_other_team = generate_test_person_distinct_id2(
+        count=1,
+        team_id=team_id + random.randint(1, 1000),
+        timestamp=timestamp,
+        distinct_id=distinct_id,
+        person_id=person_id,
+        is_deleted=is_deleted,
+        version=version,
+    )
+
+    await insert_person_distinct_id2_values_in_clickhouse(client=client, persons=[person_from_other_team])
+    return (person, person_from_other_team)
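The new utilities in posthog/temporal/tests/utils/persons.py are meant to be combined: seed `person` rows for the team under test (plus rows for another team as negative cases), then register a distinct_id mapping per person in `person_distinct_id2`. A rough usage sketch, assuming an async test context with a configured ClickHouseClient; the `seed_persons` helper and the distinct_id format are illustrative, not part of the PR.

import datetime as dt
import uuid

from posthog.temporal.tests.utils.persons import (
    generate_test_person_distinct_id2_in_clickhouse,
    generate_test_persons_in_clickhouse,
)


async def seed_persons(clickhouse_client, team_id: int) -> int:
    """Seed a small persons dataset for one export interval and return the expected row count."""
    data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00")
    data_interval_start = data_interval_end - dt.timedelta(hours=1)

    # Insert persons for the team under test, plus a couple for a random other team.
    persons, _ = await generate_test_persons_in_clickhouse(
        client=clickhouse_client,
        team_id=team_id,
        start_time=data_interval_start,
        end_time=data_interval_end,
        count=10,
        count_other_team=2,
        properties={"utm_medium": "referral", "$initial_os": "Linux"},
    )

    # Pair each person with a distinct_id mapping in person_distinct_id2.
    for person in persons:
        await generate_test_person_distinct_id2_in_clickhouse(
            client=clickhouse_client,
            team_id=team_id,
            distinct_id=f"distinct-id-{person['id']}",
            timestamp=dt.datetime.fromisoformat(person["_timestamp"]),
            person_id=uuid.UUID(person["id"]),
        )

    return len(persons)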