From 8fc2b1572462c60642f9944e3e45ef475eaeb97c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?=
Date: Fri, 28 Jun 2024 14:52:34 +0200
Subject: [PATCH] fix: Use JSONL for bigquery always (#23321)

* fix: Use JSONL for bigquery always

* Update query snapshots

* fix: Do not require delete permissions

* Update query snapshots

* Update query snapshots

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
---
 .../test_properties_timeline.ambr            |  8 +++---
 .../batch_exports/bigquery_batch_export.py   | 27 ++++++++++---------
 2 files changed, 19 insertions(+), 16 deletions(-)

diff --git a/posthog/api/test/__snapshots__/test_properties_timeline.ambr b/posthog/api/test/__snapshots__/test_properties_timeline.ambr
index 670ac0cd349e4..ac8cf04120ea6 100644
--- a/posthog/api/test/__snapshots__/test_properties_timeline.ambr
+++ b/posthog/api/test/__snapshots__/test_properties_timeline.ambr
@@ -446,7 +446,7 @@
            ORDER BY timestamp ASC ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING) AS end_event_number
     FROM (SELECT timestamp,
                  person_properties AS properties,
-                 array(replaceRegexpAll(JSONExtractRaw(person_properties, 'foo'), '^"|"$', ''), replaceRegexpAll(JSONExtractRaw(person_properties, 'bar'), '^"|"$', '')) AS relevant_property_values,
+                 array(replaceRegexpAll(JSONExtractRaw(person_properties, 'bar'), '^"|"$', ''), replaceRegexpAll(JSONExtractRaw(person_properties, 'foo'), '^"|"$', '')) AS relevant_property_values,
                  lagInFrame(relevant_property_values) OVER (
                    ORDER BY timestamp ASC ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS previous_relevant_property_values,
                  row_number() OVER (
@@ -482,7 +482,7 @@
            ORDER BY timestamp ASC ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING) AS end_event_number
     FROM (SELECT timestamp,
                  person_properties AS properties,
-                 array("mat_pp_foo", "mat_pp_bar") AS relevant_property_values,
+                 array("mat_pp_bar", "mat_pp_foo") AS relevant_property_values,
                  lagInFrame(relevant_property_values) OVER (
                    ORDER BY timestamp ASC ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS previous_relevant_property_values,
                  row_number() OVER (
@@ -522,7 +522,7 @@
            ORDER BY timestamp ASC ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING) AS end_event_number
     FROM (SELECT timestamp,
                  person_properties AS properties,
-                 array(replaceRegexpAll(JSONExtractRaw(person_properties, 'foo'), '^"|"$', ''), replaceRegexpAll(JSONExtractRaw(person_properties, 'bar'), '^"|"$', '')) AS relevant_property_values,
+                 array(replaceRegexpAll(JSONExtractRaw(person_properties, 'bar'), '^"|"$', ''), replaceRegexpAll(JSONExtractRaw(person_properties, 'foo'), '^"|"$', '')) AS relevant_property_values,
                  lagInFrame(relevant_property_values) OVER (
                    ORDER BY timestamp ASC ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS previous_relevant_property_values,
                  row_number() OVER (
@@ -558,7 +558,7 @@
            ORDER BY timestamp ASC ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING) AS end_event_number
     FROM (SELECT timestamp,
                  person_properties AS properties,
-                 array("mat_pp_foo", "mat_pp_bar") AS relevant_property_values,
+                 array("mat_pp_bar", "mat_pp_foo") AS relevant_property_values,
                  lagInFrame(relevant_property_values) OVER (
                    ORDER BY timestamp ASC ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS previous_relevant_property_values,
                  row_number() OVER (
diff --git a/posthog/temporal/batch_exports/bigquery_batch_export.py b/posthog/temporal/batch_exports/bigquery_batch_export.py
index da4f8101cbf8a..7d439e4ccca6f 100644
--- a/posthog/temporal/batch_exports/bigquery_batch_export.py
+++ b/posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -184,9 +184,14 @@ async def managed_table(
         exists_ok: bool = True,
         not_found_ok: bool = True,
         delete: bool = True,
+        create: bool = True,
     ) -> collections.abc.AsyncGenerator[bigquery.Table, None]:
         """Manage a table in BigQuery by ensuring it exists while in context."""
-        table = await self.acreate_table(project_id, dataset_id, table_id, table_schema, exists_ok)
+        if create is True:
+            table = await self.acreate_table(project_id, dataset_id, table_id, table_schema, exists_ok)
+        else:
+            fully_qualified_name = f"{project_id}.{dataset_id}.{table_id}"
+            table = bigquery.Table(fully_qualified_name, schema=table_schema)
 
         try:
             yield table
@@ -398,21 +403,24 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records
     requires_merge = (
         isinstance(inputs.batch_export_model, BatchExportModel) and inputs.batch_export_model.name == "persons"
     )
+    stage_table_name = f"stage_{inputs.table_id}" if requires_merge else inputs.table_id
 
     with bigquery_client(inputs) as bq_client:
         async with (
             bq_client.managed_table(
                 inputs.project_id,
                 inputs.dataset_id,
-                f"{inputs.table_id}",
+                inputs.table_id,
                 schema,
                 delete=False,
             ) as bigquery_table,
             bq_client.managed_table(
                 inputs.project_id,
                 inputs.dataset_id,
-                f"stage_{inputs.table_id}",
+                stage_table_name,
                 schema,
+                create=requires_merge,
+                delete=requires_merge,
             ) as bigquery_stage_table,
         ):
 
@@ -426,10 +434,7 @@ async def flush_to_bigquery(
             )
 
             table = bigquery_stage_table if requires_merge else bigquery_table
-            if inputs.use_json_type is True:
-                await bq_client.load_jsonl_file(local_results_file, table, schema)
-            else:
-                await bq_client.load_parquet_file(local_results_file, table, schema)
+            await bq_client.load_jsonl_file(local_results_file, table, schema)
 
             rows_exported.add(records_since_last_flush)
             bytes_exported.add(bytes_since_last_flush)
@@ -447,13 +452,11 @@ async def flush_to_bigquery(
                 for field in first_record_batch.select([field.name for field in schema]).schema
             ]
         )
-
-        writer = get_batch_export_writer(
-            inputs,
-            flush_callable=flush_to_bigquery,
+        writer = JSONLBatchExportWriter(
             max_bytes=settings.BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES,
-            schema=record_schema,
+            flush_callable=flush_to_bigquery,
         )
+
         async with writer.open_temporary_file():
             async for record_batch in records_iterator:
                 record_batch = cast_record_batch_json_columns(record_batch, json_columns=json_columns)
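
Editor's note on the staging-table change: the new `create` flag (paired with the existing `delete` flag) means the stage table is only created and torn down when a merge is actually required, i.e. for the "persons" model; for every other model `managed_table` just builds a local table reference, so the export no longer needs table-delete permissions. Below is a minimal sketch of that pattern using the public synchronous google-cloud-bigquery client; the real class wraps these calls in async helpers such as `acreate_table`, and the standalone-function shape, the injected `client` parameter, and the cleanup call here are illustrative assumptions, not PostHog's actual code.

import collections.abc
import contextlib

from google.cloud import bigquery


@contextlib.asynccontextmanager
async def managed_table(
    client: bigquery.Client,  # assumption: injected here; the real method lives on a client class
    project_id: str,
    dataset_id: str,
    table_id: str,
    table_schema: list[bigquery.SchemaField],
    create: bool = True,
    delete: bool = True,
) -> collections.abc.AsyncGenerator[bigquery.Table, None]:
    """Yield a BigQuery table, creating and deleting it only when asked to."""
    fully_qualified_name = f"{project_id}.{dataset_id}.{table_id}"
    if create:
        # Mirrors the patch's create=True branch: create the table for real,
        # tolerating one that already exists (exists_ok).
        table = client.create_table(
            bigquery.Table(fully_qualified_name, schema=table_schema), exists_ok=True
        )
    else:
        # Mirrors create=False: no API call at all, just a reference to a
        # table assumed to already exist.
        table = bigquery.Table(fully_qualified_name, schema=table_schema)

    try:
        yield table
    finally:
        if delete:
            # Only the merge path cleans up its stage table; not_found_ok
            # tolerates a table that was never created.
            client.delete_table(table, not_found_ok=True)

In the activity above, both flags are wired to `requires_merge`, so for non-persons exports the "stage table" is simply the destination table reference and nothing extra is ever created or deleted.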
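On the load path: since the writer now always produces JSONL, the activity can call `load_jsonl_file` unconditionally and drop the Parquet branch that previously handled the `use_json_type is False` case. For reference, a newline-delimited JSON load with the public BigQuery client looks roughly like the sketch below; the function name and parameters mimic the patch's `bq_client.load_jsonl_file(local_results_file, table, schema)` call, but the body is an assumption about how such a helper works, not the actual implementation.

import typing

from google.cloud import bigquery


def load_jsonl_file(
    client: bigquery.Client,
    jsonl_file: typing.BinaryIO,
    table: bigquery.Table,
    schema: list[bigquery.SchemaField],
) -> bigquery.LoadJob:
    """Load a file of newline-delimited JSON rows into a BigQuery table."""
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        schema=schema,
    )
    # rewind=True seeks back to the start of the just-flushed temporary file
    # before streaming its contents to the load job.
    load_job = client.load_table_from_file(
        jsonl_file, table, rewind=True, job_config=job_config
    )
    # result() blocks until the load job completes and raises on failure.
    load_job.result()
    return load_job

A single serialization format covering both settings is presumably what lets the `use_json_type` branch disappear: BigQuery parses JSON-typed columns directly from JSONL values, so the same file loads cleanly whether or not the destination schema uses the JSON type.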