diff --git a/posthog/temporal/workflows/bigquery_batch_export.py b/posthog/temporal/workflows/bigquery_batch_export.py
index a680ff9102d58..f45b67b02264e 100644
--- a/posthog/temporal/workflows/bigquery_batch_export.py
+++ b/posthog/temporal/workflows/bigquery_batch_export.py
@@ -1,5 +1,5 @@
-import contextlib
 import asyncio
+import contextlib
 import dataclasses
 import datetime as dt
 import json
@@ -31,7 +31,7 @@
 from posthog.temporal.workflows.metrics import get_bytes_exported_metric, get_rows_exported_metric
 
 
-def load_jsonl_file_to_bigquery_table(jsonl_file, table, table_schema, bigquery_client):
+async def load_jsonl_file_to_bigquery_table(jsonl_file, table, table_schema, bigquery_client):
     """Execute a COPY FROM query with given connection to copy contents of jsonl_file."""
     job_config = bigquery.LoadJobConfig(
         source_format="NEWLINE_DELIMITED_JSON",
@@ -39,10 +39,10 @@ def load_jsonl_file_to_bigquery_table(jsonl_file, table, table_schema, bigquery_
     )
 
     load_job = bigquery_client.load_table_from_file(jsonl_file, table, job_config=job_config, rewind=True)
-    load_job.result()
+    await asyncio.to_thread(load_job.result)
 
 
-def create_table_in_bigquery(
+async def create_table_in_bigquery(
     project_id: str,
     dataset_id: str,
     table_id: str,
@@ -54,7 +54,7 @@ def create_table_in_bigquery(
     fully_qualified_name = f"{project_id}.{dataset_id}.{table_id}"
     table = bigquery.Table(fully_qualified_name, schema=table_schema)
     table.time_partitioning = bigquery.TimePartitioning(type_=bigquery.TimePartitioningType.DAY, field="timestamp")
-    table = bigquery_client.create_table(table, exists_ok=exists_ok)
+    table = await asyncio.to_thread(bigquery_client.create_table, table, exists_ok=exists_ok)
 
     return table
 
@@ -191,7 +191,7 @@ async def worker_shutdown_handler():
     asyncio.create_task(worker_shutdown_handler())
 
     with bigquery_client(inputs) as bq_client:
-        bigquery_table = create_table_in_bigquery(
+        bigquery_table = await create_table_in_bigquery(
             inputs.project_id,
             inputs.dataset_id,
             inputs.table_id,
@@ -203,13 +203,13 @@ async def worker_shutdown_handler():
         rows_exported = get_rows_exported_metric()
         bytes_exported = get_bytes_exported_metric()
 
-        def flush_to_bigquery():
+        async def flush_to_bigquery():
             logger.debug(
                 "Loading %s records of size %s bytes",
                 jsonl_file.records_since_last_reset,
                 jsonl_file.bytes_since_last_reset,
            )
-            load_jsonl_file_to_bigquery_table(jsonl_file, bigquery_table, table_schema, bq_client)
+            await load_jsonl_file_to_bigquery_table(jsonl_file, bigquery_table, table_schema, bq_client)
 
             rows_exported.add(jsonl_file.records_since_last_reset)
             bytes_exported.add(jsonl_file.bytes_since_last_reset)
@@ -238,6 +238,8 @@ def flush_to_bigquery():
                 last_inserted_at = result["inserted_at"]
                 activity.heartbeat(last_inserted_at)
 
+                jsonl_file.reset()
+
 
 @workflow.defn(name="bigquery-export")
 class BigQueryBatchExportWorkflow(PostHogWorkflow):
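
For context, the core change is to stop calling the synchronous google-cloud-bigquery client directly on the Temporal activity's event loop and instead push the blocking calls (create_table, LoadJob.result) onto a worker thread with asyncio.to_thread, so heartbeats and shutdown handling keep running during long loads; the diff also adds a jsonl_file.reset() after the heartbeat, presumably to clear the already-flushed batch. Below is a minimal sketch of that offloading pattern, not code from the PR; the helper name, file path, and table ID are illustrative.

import asyncio

from google.cloud import bigquery


async def load_jsonl(client: bigquery.Client, path: str, table_id: str) -> None:
    """Load a newline-delimited JSON file into BigQuery without blocking the event loop."""
    job_config = bigquery.LoadJobConfig(source_format="NEWLINE_DELIMITED_JSON")

    with open(path, "rb") as jsonl_file:
        # Uploads the file contents and returns a LoadJob handle.
        load_job = client.load_table_from_file(jsonl_file, table_id, job_config=job_config)

    # LoadJob.result() blocks until the job finishes, so run it in a thread.
    await asyncio.to_thread(load_job.result)


if __name__ == "__main__":
    asyncio.run(load_jsonl(bigquery.Client(), "events.jsonl", "my-project.my_dataset.events"))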