refactor: Wrap blocking IO in asyncio.to_thread
tomasfarias committed Nov 20, 2023
1 parent 3ad6132 commit 1cac927
Showing 1 changed file with 10 additions and 8 deletions.
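
Every change in this commit applies the same pattern: the google-cloud-bigquery client is synchronous, so calls like load_job.result() and create_table() block the thread they run on, and inside an async Temporal activity that would stall the event loop that drives heartbeats, the worker shutdown handler, and any other pending task. Wrapping each blocking call in asyncio.to_thread moves it onto a worker thread while the activity awaits the result. A minimal, self-contained sketch of the idea (an illustration, not code from this commit):

import asyncio
import time


def blocking_load() -> str:
    # Stand-in for a synchronous BigQuery call such as load_job.result().
    time.sleep(2)
    return "load finished"


async def heartbeat() -> None:
    # Keeps running while the blocking call executes on a worker thread.
    for _ in range(4):
        print("heartbeat")
        await asyncio.sleep(0.5)


async def main() -> None:
    result, _ = await asyncio.gather(asyncio.to_thread(blocking_load), heartbeat())
    print(result)


asyncio.run(main())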
18 changes: 10 additions & 8 deletions posthog/temporal/workflows/bigquery_batch_export.py
@@ -1,5 +1,5 @@
-import contextlib
 import asyncio
+import contextlib
 import dataclasses
 import datetime as dt
 import json
@@ -31,18 +31,18 @@
 from posthog.temporal.workflows.metrics import get_bytes_exported_metric, get_rows_exported_metric


-def load_jsonl_file_to_bigquery_table(jsonl_file, table, table_schema, bigquery_client):
+async def load_jsonl_file_to_bigquery_table(jsonl_file, table, table_schema, bigquery_client):
     """Execute a COPY FROM query with given connection to copy contents of jsonl_file."""
     job_config = bigquery.LoadJobConfig(
         source_format="NEWLINE_DELIMITED_JSON",
         schema=table_schema,
     )

     load_job = bigquery_client.load_table_from_file(jsonl_file, table, job_config=job_config, rewind=True)
-    load_job.result()
+    await asyncio.to_thread(load_job.result)


-def create_table_in_bigquery(
+async def create_table_in_bigquery(
     project_id: str,
     dataset_id: str,
     table_id: str,
@@ -54,7 +54,7 @@ def create_table_in_bigquery(
     fully_qualified_name = f"{project_id}.{dataset_id}.{table_id}"
     table = bigquery.Table(fully_qualified_name, schema=table_schema)
     table.time_partitioning = bigquery.TimePartitioning(type_=bigquery.TimePartitioningType.DAY, field="timestamp")
-    table = bigquery_client.create_table(table, exists_ok=exists_ok)
+    table = await asyncio.to_thread(bigquery_client.create_table, table, exists_ok=exists_ok)

     return table

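asyncio.to_thread(func, /, *args, **kwargs) forwards positional and keyword arguments to the callable, which is why exists_ok=exists_ok can be passed straight through in the hunk above. A small sketch with a hypothetical blocking function standing in for bigquery_client.create_table:

import asyncio


def create_table(name: str, exists_ok: bool = False) -> str:
    # Hypothetical blocking call; only illustrates argument forwarding.
    return f"created {name} (exists_ok={exists_ok})"


async def main() -> None:
    # Both the positional and the keyword argument reach create_table.
    print(await asyncio.to_thread(create_table, "events", exists_ok=True))


asyncio.run(main())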
@@ -191,7 +191,7 @@ async def worker_shutdown_handler():
         asyncio.create_task(worker_shutdown_handler())

         with bigquery_client(inputs) as bq_client:
-            bigquery_table = create_table_in_bigquery(
+            bigquery_table = await create_table_in_bigquery(
                 inputs.project_id,
                 inputs.dataset_id,
                 inputs.table_id,
@@ -203,13 +203,13 @@ async def worker_shutdown_handler():
             rows_exported = get_rows_exported_metric()
             bytes_exported = get_bytes_exported_metric()

-            def flush_to_bigquery():
+            async def flush_to_bigquery():
                 logger.debug(
                     "Loading %s records of size %s bytes",
                     jsonl_file.records_since_last_reset,
                     jsonl_file.bytes_since_last_reset,
                 )
-                load_jsonl_file_to_bigquery_table(jsonl_file, bigquery_table, table_schema, bq_client)
+                await load_jsonl_file_to_bigquery_table(jsonl_file, bigquery_table, table_schema, bq_client)

                 rows_exported.add(jsonl_file.records_since_last_reset)
                 bytes_exported.add(jsonl_file.bytes_since_last_reset)
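
Since flush_to_bigquery is now a coroutine, its callers in the export loop (not shown in this diff) have to await it; calling it without await would only create a coroutine object and never run the load. A minimal sketch of that rule, independent of the code above:

import asyncio


async def flush() -> None:
    # Stand-in for the now-async flush_to_bigquery.
    await asyncio.sleep(0)
    print("flushed")


async def main() -> None:
    # flush()  # would only create a coroutine object; nothing would execute
    await flush()  # awaiting actually runs the flush


asyncio.run(main())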
@@ -238,6 +238,8 @@ def flush_to_bigquery():
                     last_inserted_at = result["inserted_at"]
                     activity.heartbeat(last_inserted_at)

+                    jsonl_file.reset()
+


 @workflow.defn(name="bigquery-export")
 class BigQueryBatchExportWorkflow(PostHogWorkflow):
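
For reference, asyncio.to_thread was added in Python 3.9 and is roughly equivalent to submitting the callable to the event loop's default ThreadPoolExecutor; on older interpreters the same effect can be achieved with run_in_executor. A minimal sketch of that equivalence:

import asyncio
import functools
import time


def blocking(x: int) -> int:
    time.sleep(0.1)  # stands in for a blocking BigQuery call
    return x * 2


async def main() -> None:
    loop = asyncio.get_running_loop()
    # Roughly what asyncio.to_thread does: run the callable in the default
    # executor (a ThreadPoolExecutor) and await the result.
    print(await loop.run_in_executor(None, functools.partial(blocking, 21)))


asyncio.run(main())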
