Skip to content

Commit

Permalink
fix(batch-exports): Use existing schema in merge query (#24753)
Browse files — browse the repository at this point in the history
  • Loading branch information
tomasfarias authored Sep 4, 2024
1 parent dccb835 commit a9d32a4
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 2 deletions.
13 changes: 12 additions & 1 deletion posthog/temporal/batch_exports/bigquery_batch_export.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -205,6 +205,7 @@ async def amerge_person_tables(
final_table: bigquery.Table,
stage_table: bigquery.Table,
merge_key: collections.abc.Iterable[bigquery.SchemaField],
update_fields: collections.abc.Iterable[bigquery.SchemaField] | None = None,
person_version_key: str = "person_version",
person_distinct_id_version_key: str = "person_distinct_id_version",
):
Expand All @@ -221,7 +222,13 @@ async def amerge_person_tables(
update_clause = ""
values = ""
field_names = ""
for n, field in enumerate(final_table.schema):

if not update_fields:
update_clause_fields = final_table.schema
else:
update_clause_fields = update_fields

for n, field in enumerate(update_clause_fields):
if n > 0:
update_clause += ", "
values += ", "
Expand All @@ -231,6 +238,9 @@ async def amerge_person_tables(
field_names += f"`{field.name}`"
values += f"stage.`{field.name}`"

if not update_clause:
raise ValueError("Empty update clause")

merge_query = f"""
MERGE `{final_table.full_table_id.replace(":", ".", 1)}` final
USING `{stage_table.full_table_id.replace(":", ".", 1)}` stage
Expand Down Expand Up @@ -481,6 +491,7 @@ async def flush_to_bigquery(
final_table=bigquery_table,
stage_table=bigquery_stage_table,
merge_key=merge_key,
update_fields=schema,
)

return writer.records_total
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -5,6 +5,7 @@
import os
import typing
import uuid
import warnings

import pyarrow as pa
import pytest
Expand Down Expand Up @@ -213,7 +214,12 @@ def bigquery_dataset(bigquery_config, bigquery_client) -> typing.Generator[bigqu

yield dataset

# bigquery_client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True)
try:
bigquery_client.delete_dataset(dataset_id, delete_contents=True, not_found_ok=True)
except Exception as exc:
warnings.warn(
f"Failed to clean up dataset: {dataset_id} due to '{exc.__class__.__name__}': {str(exc)}", stacklevel=1
)


@pytest.fixture
Expand Down

0 comments on commit a9d32a4

Please sign in to comment.