From a9d32a4b1b34619fececb292a2a1ee592151e247 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?=
Date: Wed, 4 Sep 2024 12:20:27 +0200
Subject: [PATCH] fix(batch-exports): Use existing schema in merge query
 (#24753)

---
 .../temporal/batch_exports/bigquery_batch_export.py | 13 ++++++++++++-
 .../test_bigquery_batch_export_workflow.py          |  8 +++++++-
 2 files changed, 19 insertions(+), 2 deletions(-)

diff --git a/posthog/temporal/batch_exports/bigquery_batch_export.py b/posthog/temporal/batch_exports/bigquery_batch_export.py
index df8a1dfb5c387..3e80dd51bdf3d 100644
--- a/posthog/temporal/batch_exports/bigquery_batch_export.py
+++ b/posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -205,6 +205,7 @@ async def amerge_person_tables(
         final_table: bigquery.Table,
         stage_table: bigquery.Table,
         merge_key: collections.abc.Iterable[bigquery.SchemaField],
+        update_fields: collections.abc.Iterable[bigquery.SchemaField] | None = None,
         person_version_key: str = "person_version",
         person_distinct_id_version_key: str = "person_distinct_id_version",
     ):
@@ -221,7 +222,13 @@ async def amerge_person_tables(
         update_clause = ""
         values = ""
         field_names = ""
-        for n, field in enumerate(final_table.schema):
+
+        if not update_fields:
+            update_clause_fields = final_table.schema
+        else:
+            update_clause_fields = update_fields
+
+        for n, field in enumerate(update_clause_fields):
             if n > 0:
                 update_clause += ", "
                 values += ", "
@@ -231,6 +238,9 @@ async def amerge_person_tables(
             field_names += f"`{field.name}`"
             values += f"stage.`{field.name}`"
 
+        if not update_clause:
+            raise ValueError("Empty update clause")
+
         merge_query = f"""
         MERGE `{final_table.full_table_id.replace(":", ".", 1)}` final
         USING `{stage_table.full_table_id.replace(":", ".", 1)}` stage
@@ -481,6 +491,7 @@ async def flush_to_bigquery(
                 final_table=bigquery_table,
                 stage_table=bigquery_stage_table,
                 merge_key=merge_key,
+                update_fields=schema,
             )
 
         return writer.records_total
diff --git a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py
index bfaffcc1b4cd2..0f184b79356a1 100644
--- a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py
+++ b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py
@@ -5,6 +5,7 @@
 import os
 import typing
 import uuid
+import warnings
 
 import pyarrow as pa
 import pytest
@@ -213,7 +214,12 @@ def bigquery_dataset(bigquery_config, bigquery_client) -> typing.Generator[bigqu
 
     yield dataset
 
-    # bigquery_client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True)
+    try:
+        bigquery_client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True)
+    except Exception as exc:
+        warnings.warn(
+            f"Failed to clean up dataset: {dataset_id} due to '{exc.__class__.__name__}': {str(exc)}", stacklevel=1
+        )
 
 
 @pytest.fixture
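
Context for the change above: the fix routes the caller's export `schema` into the clause-building loop, so the generated MERGE statement only references columns that actually exist in the stage table, instead of every column on the final table. Below is a minimal standalone sketch of that loop, assuming google-cloud-bigquery is installed. `build_merge_clauses` is a hypothetical helper name, and the `final.`col` = stage.`col`` shape of the update clause is inferred from the surrounding context, since the hunk itself only shows the field_names/values assembly.

# Standalone sketch of the clause-building logic the patch parameterizes.
# Assumptions: google-cloud-bigquery is installed; the update-clause shape
# (final.`col` = stage.`col`) is inferred, not shown in the patch hunk.
from google.cloud import bigquery


def build_merge_clauses(
    update_fields: list[bigquery.SchemaField],
) -> tuple[str, str, str]:
    """Build the SET, column-list, and VALUES fragments of a MERGE query."""
    update_clause = ""
    values = ""
    field_names = ""
    for n, field in enumerate(update_fields):
        if n > 0:
            update_clause += ", "
            values += ", "
            field_names += ", "
        update_clause += f"final.`{field.name}` = stage.`{field.name}`"
        field_names += f"`{field.name}`"
        values += f"stage.`{field.name}`"

    # Mirrors the patch's new guard: an empty field list would otherwise
    # render an invalid MERGE statement.
    if not update_clause:
        raise ValueError("Empty update clause")

    return update_clause, field_names, values


# Example: building the clauses from the export schema (rather than
# final_table.schema) means a final table with extra columns no longer
# yields a MERGE that references stage columns that do not exist.
schema = [
    bigquery.SchemaField("distinct_id", "STRING"),
    bigquery.SchemaField("person_id", "STRING"),
    bigquery.SchemaField("version", "INT64"),
]
print(build_merge_clauses(schema))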