diff --git a/frontend/src/scenes/pipeline/pipelineBatchExportConfigurationLogic.tsx b/frontend/src/scenes/pipeline/pipelineBatchExportConfigurationLogic.tsx
index 30b34ac6acb4e..7d0b3b405b43e 100644
--- a/frontend/src/scenes/pipeline/pipelineBatchExportConfigurationLogic.tsx
+++ b/frontend/src/scenes/pipeline/pipelineBatchExportConfigurationLogic.tsx
@@ -188,6 +188,12 @@ const personsTable: DatabaseSchemaBatchExportTable = {
             type: 'integer',
             schema_valid: true,
         },
+        created_at: {
+            name: 'created_at',
+            hogql_value: 'created_at',
+            type: 'datetime',
+            schema_valid: true,
+        },
     },
 }
 
diff --git a/posthog/batch_exports/sql.py b/posthog/batch_exports/sql.py
index 8c6318037f623..baa0216afdbbc 100644
--- a/posthog/batch_exports/sql.py
+++ b/posthog/batch_exports/sql.py
@@ -80,6 +80,7 @@
     p.properties AS properties,
     pd.version AS person_distinct_id_version,
     p.version AS person_version,
+    p.created_at AS created_at,
     multiIf(
         (
             pd._timestamp >= {{interval_start:DateTime64}}
@@ -140,6 +141,7 @@
     p.properties AS properties,
     pd.version AS person_distinct_id_version,
     p.version AS person_version,
+    p.created_at AS created_at,
     multiIf(
         pd._timestamp < {{interval_end:DateTime64}}
         AND NOT p._timestamp < {{interval_end:DateTime64}},
@@ -170,6 +172,7 @@
         id,
         max(version) AS version,
         argMax(properties, person.version) AS properties,
+        argMax(created_at, person.version) AS created_at,
         argMax(_timestamp, person.version) AS _timestamp
     FROM
         person
diff --git a/posthog/clickhouse/migrations/0093_persons_batch_export_add_created_at.py b/posthog/clickhouse/migrations/0093_persons_batch_export_add_created_at.py
new file mode 100644
index 0000000000000..448b67422e042
--- /dev/null
+++ b/posthog/clickhouse/migrations/0093_persons_batch_export_add_created_at.py
@@ -0,0 +1,13 @@
+from posthog.batch_exports.sql import (
+    CREATE_PERSONS_BATCH_EXPORT_VIEW,
+    CREATE_PERSONS_BATCH_EXPORT_VIEW_BACKFILL,
+)
+from posthog.clickhouse.client.migration_tools import run_sql_with_exceptions
+
+operations = map(
+    run_sql_with_exceptions,
+    [
+        CREATE_PERSONS_BATCH_EXPORT_VIEW,
+        CREATE_PERSONS_BATCH_EXPORT_VIEW_BACKFILL,
+    ],
+)
diff --git a/posthog/temporal/batch_exports/batch_exports.py b/posthog/temporal/batch_exports/batch_exports.py
index f8b4d34adc7ca..1b8faf19e482e 100644
--- a/posthog/temporal/batch_exports/batch_exports.py
+++ b/posthog/temporal/batch_exports/batch_exports.py
@@ -68,6 +68,32 @@
     optimize_aggregation_in_order=1
 """
 
+# This is an updated version of the view that we will use going forward
+# We will migrate each batch export destination over one at a time to mitigate
+# risk, and once this is done we can clean this up.
+SELECT_FROM_PERSONS_VIEW_NEW = """
+SELECT
+    persons.team_id AS team_id,
+    persons.distinct_id AS distinct_id,
+    persons.person_id AS person_id,
+    persons.properties AS properties,
+    persons.person_distinct_id_version AS person_distinct_id_version,
+    persons.person_version AS person_version,
+    persons.created_at AS created_at,
+    persons._inserted_at AS _inserted_at
+FROM
+    persons_batch_export(
+        team_id={team_id},
+        interval_start={interval_start},
+        interval_end={interval_end}
+    ) AS persons
+FORMAT ArrowStream
+SETTINGS
+    max_bytes_before_external_group_by=50000000000,
+    max_bytes_before_external_sort=50000000000,
+    optimize_aggregation_in_order=1
+"""
+
 SELECT_FROM_PERSONS_VIEW_BACKFILL = """
 SELECT
     persons.team_id AS team_id,
@@ -89,6 +115,31 @@
     optimize_aggregation_in_order=1
 """
 
+# This is an updated version of the view that we will use going forward
+# We will migrate each batch export destination over one at a time to mitigate
+# risk, and once this is done we can clean this up.
+SELECT_FROM_PERSONS_VIEW_BACKFILL_NEW = """
+SELECT
+    persons.team_id AS team_id,
+    persons.distinct_id AS distinct_id,
+    persons.person_id AS person_id,
+    persons.properties AS properties,
+    persons.person_distinct_id_version AS person_distinct_id_version,
+    persons.person_version AS person_version,
+    persons.created_at AS created_at,
+    persons._inserted_at AS _inserted_at
+FROM
+    persons_batch_export_backfill(
+        team_id={team_id},
+        interval_end={interval_end}
+    ) AS persons
+FORMAT ArrowStream
+SETTINGS
+    max_bytes_before_external_group_by=50000000000,
+    max_bytes_before_external_sort=50000000000,
+    optimize_aggregation_in_order=1
+"""
+
 SELECT_FROM_EVENTS_VIEW = Template(
     """
 SELECT
@@ -195,6 +246,8 @@ async def iter_model_records(
     interval_start: str | None,
     interval_end: str,
     destination_default_fields: list[BatchExportField] | None = None,
+    # TODO - remove this once all batch exports are using the latest schema
+    use_latest_schema: bool = False,
     **parameters,
 ) -> AsyncRecordsGenerator:
     if not is_backfill and interval_start is None:
@@ -215,6 +268,7 @@ async def iter_model_records(
             extra_query_parameters=model.schema["values"] if model.schema is not None else None,
             interval_start=interval_start,
             interval_end=interval_end,
+            use_latest_schema=use_latest_schema,
             **parameters,
         ):
             yield record
@@ -241,13 +295,21 @@ async def iter_records_from_model_view(
     interval_start: str | None,
     interval_end: str,
     fields: list[BatchExportField],
+    # TODO - remove this once all batch exports are using the latest schema
+    use_latest_schema: bool = False,
     **parameters,
 ) -> AsyncRecordsGenerator:
     if model_name == "persons":
         if is_backfill and interval_start is None:
-            view = SELECT_FROM_PERSONS_VIEW_BACKFILL
+            if use_latest_schema:
+                view = SELECT_FROM_PERSONS_VIEW_BACKFILL_NEW
+            else:
+                view = SELECT_FROM_PERSONS_VIEW_BACKFILL
         else:
-            view = SELECT_FROM_PERSONS_VIEW
+            if use_latest_schema:
+                view = SELECT_FROM_PERSONS_VIEW_NEW
+            else:
+                view = SELECT_FROM_PERSONS_VIEW
     elif str(team_id) not in settings.ASYNC_ARROW_STREAMING_TEAM_IDS:
         # TODO: Let this model be exported by `astream_query_as_arrow`.
         # Just to reduce risk, I don't want to change the function that runs 100% of the exports
@@ -396,6 +458,8 @@ def start_produce_batch_export_record_batches(
     done_ranges: list[tuple[dt.datetime, dt.datetime]],
     fields: list[BatchExportField] | None = None,
     destination_default_fields: list[BatchExportField] | None = None,
+    # TODO - remove this once all batch exports are using the latest schema
+    use_latest_schema: bool = False,
     **parameters,
 ):
     """Start producing batch export record batches from a model query.
@@ -418,9 +482,15 @@
 
     if model_name == "persons":
         if is_backfill and full_range[0] is None:
-            view = SELECT_FROM_PERSONS_VIEW_BACKFILL
+            if use_latest_schema:
+                view = SELECT_FROM_PERSONS_VIEW_BACKFILL_NEW
+            else:
+                view = SELECT_FROM_PERSONS_VIEW_BACKFILL
         else:
-            view = SELECT_FROM_PERSONS_VIEW
+            if use_latest_schema:
+                view = SELECT_FROM_PERSONS_VIEW_NEW
+            else:
+                view = SELECT_FROM_PERSONS_VIEW
 
     else:
         if parameters.get("exclude_events", None):
@@ -762,13 +832,10 @@ class StartBatchExportRunInputs:
 
 @activity.defn
 async def start_batch_export_run(inputs: StartBatchExportRunInputs) -> BatchExportRunId:
-    """Activity that creates an BatchExportRun and returns the count of records to export.
+    """Activity that creates a BatchExportRun and returns the run id.
 
     Intended to be used in all export workflows, usually at the start, to create
     a model instance to represent them in our database.
-
-    Upon seeing a count of 0 records to export, batch export workflows should finish early
-    (i.e. without running the insert activity), as there will be nothing to export.
     """
     logger = await bind_temporal_worker_logger(team_id=inputs.team_id)
     await logger.ainfo(
diff --git a/posthog/temporal/batch_exports/bigquery_batch_export.py b/posthog/temporal/batch_exports/bigquery_batch_export.py
index e4eea3625a7fc..e99ba77f3c1bf 100644
--- a/posthog/temporal/batch_exports/bigquery_batch_export.py
+++ b/posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -34,6 +34,11 @@
     start_batch_export_run,
     start_produce_batch_export_record_batches,
 )
+from posthog.temporal.batch_exports.heartbeat import (
+    BatchExportRangeHeartbeatDetails,
+    DateRange,
+    should_resume_from_activity_heartbeat,
+)
 from posthog.temporal.batch_exports.metrics import (
     get_bytes_exported_metric,
     get_rows_exported_metric,
@@ -49,12 +54,6 @@
     cast_record_batch_json_columns,
     set_status_to_running_task,
 )
-from posthog.temporal.batch_exports.heartbeat import (
-    BatchExportRangeHeartbeatDetails,
-    DateRange,
-    should_resume_from_activity_heartbeat,
-)
-
 from posthog.temporal.common.clickhouse import get_client
 from posthog.temporal.common.heartbeat import Heartbeater
 from posthog.temporal.common.logger import configure_temporal_worker_logger
@@ -174,17 +173,26 @@ async def adelete_table(
         project_id: str,
         dataset_id: str,
         table_id: str,
-        table_schema: list[bigquery.SchemaField],
         not_found_ok: bool = True,
     ) -> None:
         """Delete a table in BigQuery."""
         fully_qualified_name = f"{project_id}.{dataset_id}.{table_id}"
-        table = bigquery.Table(fully_qualified_name, schema=table_schema)
+        table = bigquery.Table(fully_qualified_name)
 
         await asyncio.to_thread(self.delete_table, table, not_found_ok=not_found_ok)
 
         return None
 
+    async def aget_table(
+        self,
+        project_id: str,
+        dataset_id: str,
+        table_id: str,
+    ) -> bigquery.Table:
+        """Get a table in BigQuery."""
+        fully_qualified_name = f"{project_id}.{dataset_id}.{table_id}"
+        return await asyncio.to_thread(self.get_table, fully_qualified_name)
+
     @contextlib.asynccontextmanager
     async def managed_table(
         self,
@@ -197,25 +205,23 @@ async def managed_table(
         delete: bool = True,
         create: bool = True,
     ) -> collections.abc.AsyncGenerator[bigquery.Table, None]:
-        """Manage a table in BigQuery by ensure it exists while in context."""
+        """Manage a table in BigQuery by ensuring it exists while in context."""
         if create is True:
             table = await self.acreate_table(project_id, dataset_id, table_id, table_schema, exists_ok)
         else:
-            fully_qualified_name = f"{project_id}.{dataset_id}.{table_id}"
-            table = bigquery.Table(fully_qualified_name, schema=table_schema)
+            table = await self.aget_table(project_id, dataset_id, table_id)
 
         try:
             yield table
         finally:
             if delete is True:
-                await self.adelete_table(project_id, dataset_id, table_id, table_schema, not_found_ok)
+                await self.adelete_table(project_id, dataset_id, table_id, not_found_ok)
 
     async def amerge_person_tables(
         self,
         final_table: bigquery.Table,
         stage_table: bigquery.Table,
         merge_key: collections.abc.Iterable[bigquery.SchemaField],
-        update_fields: collections.abc.Iterable[bigquery.SchemaField] | None = None,
         person_version_key: str = "person_version",
         person_distinct_id_version_key: str = "person_distinct_id_version",
     ):
@@ -233,12 +239,7 @@ async def amerge_person_tables(
         values = ""
         field_names = ""
 
-        if not update_fields:
-            update_clause_fields = final_table.schema
-        else:
-            update_clause_fields = update_fields
-
-        for n, field in enumerate(update_clause_fields):
+        for n, field in enumerate(final_table.schema):
             if n > 0:
                 update_clause += ", "
                 values += ", "
@@ -409,6 +410,7 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records
         include_events=inputs.include_events,
         fields=fields,
         destination_default_fields=bigquery_default_fields(),
+        use_latest_schema=True,
         extra_query_parameters=extra_query_parameters,
     )
 
@@ -475,17 +477,17 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records
         with bigquery_client(inputs) as bq_client:
             async with (
                 bq_client.managed_table(
-                    inputs.project_id,
-                    inputs.dataset_id,
-                    inputs.table_id,
-                    schema,
+                    project_id=inputs.project_id,
+                    dataset_id=inputs.dataset_id,
+                    table_id=inputs.table_id,
+                    table_schema=schema,
                     delete=False,
                 ) as bigquery_table,
                 bq_client.managed_table(
-                    inputs.project_id,
-                    inputs.dataset_id,
-                    stage_table_name,
-                    schema,
+                    project_id=inputs.project_id,
+                    dataset_id=inputs.dataset_id,
+                    table_id=stage_table_name,
+                    table_schema=schema,
                     create=requires_merge,
                     delete=requires_merge,
                 ) as bigquery_stage_table,
@@ -556,7 +558,6 @@ async def flush_to_bigquery(
                     final_table=bigquery_table,
                     stage_table=bigquery_stage_table,
                     merge_key=merge_key,
-                    update_fields=schema,
                 )
 
             return records_total
diff --git a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py
index bc76b062f4d64..b4ec82c7eacfc 100644
--- a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py
+++ b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py
@@ -18,8 +18,12 @@
 from temporalio.testing import WorkflowEnvironment
 from temporalio.worker import UnsandboxedWorkflowRunner, Worker
 
+from posthog.batch_exports.service import (
+    BatchExportModel,
+    BatchExportSchema,
+    BigQueryBatchExportInputs,
+)
 from posthog.constants import BATCH_EXPORTS_TASK_QUEUE
-from posthog.batch_exports.service import BatchExportModel, BatchExportSchema, BigQueryBatchExportInputs
 from posthog.temporal.batch_exports.batch_exports import (
     finish_batch_export_run,
     iter_model_records,
@@ -54,6 +58,17 @@
 
 TEST_TIME = dt.datetime.now(dt.UTC)
 
+EXPECTED_PERSONS_BATCH_EXPORT_FIELDS = [
+    "team_id",
+    "distinct_id",
+    "person_id",
+    "properties",
+    "person_version",
+    "person_distinct_id_version",
+    "created_at",
+    "_inserted_at",
+]
+
 
 @pytest.fixture
 def activity_environment(activity_environment):
@@ -76,6 +91,7 @@ async def assert_clickhouse_records_in_bigquery(
     sort_key: str = "event",
     is_backfill: bool = False,
     expect_duplicates: bool = False,
+    expected_fields: list[str] | None = None,
 ) -> None:
     """Assert ClickHouse records are written to a given BigQuery table.
 
@@ -92,6 +108,7 @@ async def assert_clickhouse_records_in_bigquery(
         batch_export_schema: Custom schema used in the batch export.
         use_json_type: Whether to use JSON type for known fields.
         expect_duplicates: Whether duplicates are expected (e.g. when testing retrying logic).
+        expected_fields: The expected fields to be exported.
     """
     if use_json_type is True:
         json_columns = ["properties", "set", "set_once", "person_properties"]
@@ -121,25 +138,20 @@ async def assert_clickhouse_records_in_bigquery(
 
         inserted_records.append(inserted_record)
 
-    schema_column_names = [field["alias"] for field in bigquery_default_fields()]
-    if batch_export_model is not None:
-        if isinstance(batch_export_model, BatchExportModel):
-            batch_export_schema = batch_export_model.schema
-        else:
-            batch_export_schema = batch_export_model
-
-        if batch_export_schema is not None:
-            schema_column_names = [field["alias"] for field in batch_export_schema["fields"]]
-        elif isinstance(batch_export_model, BatchExportModel) and batch_export_model.name == "persons":
-            schema_column_names = [
-                "team_id",
-                "distinct_id",
-                "person_id",
-                "properties",
-                "person_version",
-                "person_distinct_id_version",
-                "_inserted_at",
-            ]
+    if expected_fields is not None:
+        schema_column_names = expected_fields
+    else:
+        schema_column_names = [field["alias"] for field in bigquery_default_fields()]
+        if batch_export_model is not None:
+            if isinstance(batch_export_model, BatchExportModel):
+                batch_export_schema = batch_export_model.schema
+            else:
+                batch_export_schema = batch_export_model
+
+            if batch_export_schema is not None:
+                schema_column_names = [field["alias"] for field in batch_export_schema["fields"]]
+            elif isinstance(batch_export_model, BatchExportModel) and batch_export_model.name == "persons":
+                schema_column_names = EXPECTED_PERSONS_BATCH_EXPORT_FIELDS
 
     expected_records = []
     for data_interval_start, data_interval_end in date_ranges:
@@ -153,6 +165,7 @@ async def assert_clickhouse_records_in_bigquery(
             include_events=include_events,
             destination_default_fields=bigquery_default_fields(),
             is_backfill=is_backfill,
+            use_latest_schema=True,
         ):
             for record in record_batch.select(schema_column_names).to_pylist():
                 expected_record = {}
@@ -205,6 +218,15 @@ def is_record_seen(record) -> bool:
     assert all(ts >= min_ingested_timestamp for ts in inserted_bq_ingested_timestamp)
 
 
+def drop_column_from_bigquery_table(
+    bigquery_client: bigquery.Client, dataset_id: str, table_id: str, column_name: str
+) -> None:
+    """Drop a column from a BigQuery table."""
+
+    query_job = bigquery_client.query(f"ALTER TABLE {dataset_id}.{table_id} DROP COLUMN {column_name}")
+    _ = query_job.result()
+
+
 @pytest.fixture
 def bigquery_config() -> dict[str, str]:
     """Return a BigQuery configuration dictionary to use in tests."""
@@ -450,6 +472,109 @@ async def test_insert_into_bigquery_activity_merges_data_in_follow_up_runs(
     )
 
+
+async def test_insert_into_bigquery_activity_handles_person_schema_changes(
+    clickhouse_client,
+    activity_environment,
+    bigquery_client,
+    bigquery_config,
+    bigquery_dataset,
+    generate_test_data,
+    data_interval_start,
+    data_interval_end,
+    ateam,
+):
+    """Test that the `insert_into_bigquery_activity` handles changes to the
+    person schema.
+
+    If we update the schema of the persons model we export, we should still be
+    able to export the data without breaking existing exports. For example, any
+    new fields should not be added to the destination (in future we may want to
+    allow this but for now we don't).
+
+    To replicate this situation we first export the data with the original
+    schema, then delete a column in the destination and then rerun the export.
+    """
+    model = BatchExportModel(name="persons", schema=None)
+
+    insert_inputs = BigQueryInsertInputs(
+        team_id=ateam.pk,
+        table_id=f"test_insert_activity_migration_table_{ateam.pk}",
+        dataset_id=bigquery_dataset.dataset_id,
+        data_interval_start=data_interval_start.isoformat(),
+        data_interval_end=data_interval_end.isoformat(),
+        batch_export_model=model,
+        **bigquery_config,
+    )
+
+    with freeze_time(TEST_TIME) as frozen_time:
+        await activity_environment.run(insert_into_bigquery_activity, insert_inputs)
+
+        ingested_timestamp = frozen_time().replace(tzinfo=dt.UTC)
+
+        await assert_clickhouse_records_in_bigquery(
+            bigquery_client=bigquery_client,
+            clickhouse_client=clickhouse_client,
+            table_id=f"test_insert_activity_migration_table_{ateam.pk}",
+            dataset_id=bigquery_dataset.dataset_id,
+            team_id=ateam.pk,
+            date_ranges=[(data_interval_start, data_interval_end)],
+            batch_export_model=model,
+            min_ingested_timestamp=ingested_timestamp,
+            sort_key="person_id",
+        )
+
+    # drop the created_at column from the BigQuery table
+    drop_column_from_bigquery_table(
+        bigquery_client=bigquery_client,
+        dataset_id=bigquery_dataset.dataset_id,
+        table_id=f"test_insert_activity_migration_table_{ateam.pk}",
+        column_name="created_at",
+    )
+
+    _, persons_to_export_created = generate_test_data
+
+    for old_person in persons_to_export_created[: len(persons_to_export_created) // 2]:
+        new_person_id = uuid.uuid4()
+        new_person, _ = await generate_test_persons_in_clickhouse(
+            client=clickhouse_client,
+            team_id=ateam.pk,
+            start_time=data_interval_start,
+            end_time=data_interval_end,
+            person_id=new_person_id,
+            count=1,
+            properties={"utm_medium": "referral", "$initial_os": "Linux", "new_property": "Something"},
+        )
+
+        await generate_test_person_distinct_id2_in_clickhouse(
+            clickhouse_client,
+            ateam.pk,
+            person_id=uuid.UUID(new_person[0]["id"]),
+            distinct_id=old_person["distinct_id"],
+            version=old_person["version"] + 1,
+            timestamp=old_person["_timestamp"],
+        )
+
+    with freeze_time(TEST_TIME) as frozen_time:
+        await activity_environment.run(insert_into_bigquery_activity, insert_inputs)
+
+        ingested_timestamp = frozen_time().replace(tzinfo=dt.UTC)
+
+        # this time we don't expect there to be a created_at column
+        expected_fields = [field for field in EXPECTED_PERSONS_BATCH_EXPORT_FIELDS if field != "created_at"]
+        await assert_clickhouse_records_in_bigquery(
+            bigquery_client=bigquery_client,
+            clickhouse_client=clickhouse_client,
+            table_id=f"test_insert_activity_migration_table_{ateam.pk}",
+            dataset_id=bigquery_dataset.dataset_id,
+            team_id=ateam.pk,
+            date_ranges=[(data_interval_start, data_interval_end)],
+            batch_export_model=model,
+            min_ingested_timestamp=ingested_timestamp,
+            sort_key="person_id",
+            expected_fields=expected_fields,
+        )
+
+
 @pytest.mark.parametrize("interval", ["hour"], indirect=True)
 @pytest.mark.parametrize(
     "done_relative_ranges,expected_relative_ranges",
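
A note for reviewers on the `posthog/batch_exports/sql.py` change above: `argMax(created_at, person.version)` makes the persons views return `created_at` from the person row with the highest `version`, the same way `properties` is already collapsed. A minimal pure-Python sketch of that semantics (the sample rows and the `arg_max` helper are illustrative only, not part of the PR):

```python
def arg_max(rows: list[dict], value_key: str, by_key: str):
    """What ClickHouse's argMax(value, by) computes within a group:
    the value taken from the row with the largest `by`."""
    return max(rows, key=lambda row: row[by_key])[value_key]


# Several versions of the same person; the view keeps the latest version's values.
rows = [
    {"id": "p1", "version": 1, "created_at": "2024-01-01T00:00:00", "properties": '{"plan": "free"}'},
    {"id": "p1", "version": 3, "created_at": "2024-01-01T00:00:00", "properties": '{"plan": "paid"}'},
    {"id": "p1", "version": 2, "created_at": "2024-01-01T00:00:00", "properties": '{"plan": "trial"}'},
]

assert arg_max(rows, "properties", "version") == '{"plan": "paid"}'
assert arg_max(rows, "created_at", "version") == "2024-01-01T00:00:00"
```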
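On the `use_latest_schema` flag threaded through `iter_model_records`, `iter_records_from_model_view`, and `start_produce_batch_export_record_batches`: it defaults to `False`, so destinations keep reading from the old persons views until they explicitly opt in, which this PR does only for BigQuery via `use_latest_schema=True` in `insert_into_bigquery_activity`. A self-contained sketch of the view-selection branching for the `persons` model; the `select_persons_view` helper is hypothetical, and the strings stand in for the real view queries:

```python
SELECT_FROM_PERSONS_VIEW = "old persons view"
SELECT_FROM_PERSONS_VIEW_NEW = "persons view including created_at"
SELECT_FROM_PERSONS_VIEW_BACKFILL = "old persons backfill view"
SELECT_FROM_PERSONS_VIEW_BACKFILL_NEW = "persons backfill view including created_at"


def select_persons_view(is_backfill: bool, interval_start: str | None, use_latest_schema: bool = False) -> str:
    """Mirror of the branching added for the 'persons' model: backfills with no
    interval_start use the backfill view, and only opted-in destinations get the _NEW views."""
    if is_backfill and interval_start is None:
        return SELECT_FROM_PERSONS_VIEW_BACKFILL_NEW if use_latest_schema else SELECT_FROM_PERSONS_VIEW_BACKFILL
    return SELECT_FROM_PERSONS_VIEW_NEW if use_latest_schema else SELECT_FROM_PERSONS_VIEW


# Unmigrated destinations are unaffected by default; BigQuery opts in explicitly.
assert select_persons_view(False, "2024-01-01T00:00:00") == SELECT_FROM_PERSONS_VIEW
assert select_persons_view(False, "2024-01-01T00:00:00", use_latest_schema=True) == SELECT_FROM_PERSONS_VIEW_NEW
assert select_persons_view(True, None, use_latest_schema=True) == SELECT_FROM_PERSONS_VIEW_BACKFILL_NEW
```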
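On removing `update_fields` from `amerge_person_tables`: the MERGE statement is now assembled from `final_table.schema`, i.e. from whatever columns currently exist at the destination, so a column that is missing there (like `created_at` after the test drops it) is simply never referenced. That appears to be what `test_insert_into_bigquery_activity_handles_person_schema_changes` relies on when it re-runs the export after dropping the column. A rough sketch of the idea under simplified assumptions (plain column-name strings instead of `bigquery.SchemaField`, and only the clause fragments rather than the real merge SQL):

```python
def build_merge_fragments(destination_columns: list[str]) -> tuple[str, str, str]:
    """Build UPDATE SET / column list / VALUES fragments from the destination's
    current columns, so anything not present at the destination is left out."""
    update_clause = ", ".join(f"final.`{col}` = stage.`{col}`" for col in destination_columns)
    field_names = ", ".join(f"`{col}`" for col in destination_columns)
    values = ", ".join(f"stage.`{col}`" for col in destination_columns)
    return update_clause, field_names, values


# Destination where created_at was dropped: the merge never mentions it.
update_clause, field_names, values = build_merge_fragments(
    ["team_id", "distinct_id", "person_id", "properties", "person_version", "person_distinct_id_version"]
)
assert "created_at" not in update_clause
assert "created_at" not in field_names and "created_at" not in values
```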