Commit
feat: Use a background task to set batch export status to running
This allows us to run the task concurrently instead of timing out on
it. It also has the additional benefit of removing a lot of nesting levels!
tomasfarias committed Jun 20, 2024
1 parent 5d44b12 commit 9b4c15f
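
The set_status_to_running_task helper referenced by this commit is used as an async context manager, so the status update runs as a background asyncio task while the activity body proceeds. Its actual implementation is not shown on this page; the following is only a minimal sketch of the pattern under that assumption, with mark_run_as_running standing in for whatever database update PostHog really performs:

import asyncio
from contextlib import asynccontextmanager


async def mark_run_as_running(run_id: str) -> None:
    """Hypothetical stand-in for the real 'update run status' database call."""
    ...


@asynccontextmanager
async def set_status_to_running_task(run_id: str, logger):
    # Schedule the update and yield immediately: the caller's code runs
    # concurrently with it instead of blocking (and possibly timing out) on it.
    task = asyncio.create_task(mark_run_as_running(run_id))
    try:
        yield task
    finally:
        # Ensure the update finished (or its failure is at least logged) before
        # leaving the block.
        try:
            await task
        except Exception:
            logger.exception(f"Failed to set batch export run {run_id} to running")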
Showing 6 changed files with 489 additions and 478 deletions.
212 changes: 105 additions & 107 deletions posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -35,7 +35,7 @@
 from posthog.temporal.batch_exports.temporary_file import (
     BatchExportTemporaryFile,
 )
-from posthog.temporal.batch_exports.utils import peek_first_and_rewind, try_set_batch_export_run_to_running
+from posthog.temporal.batch_exports.utils import peek_first_and_rewind, set_status_to_running_task
 from posthog.temporal.common.clickhouse import get_client
 from posthog.temporal.common.heartbeat import Heartbeater
 from posthog.temporal.common.logger import bind_temporal_worker_logger
@@ -214,9 +214,11 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records
         inputs.table_id,
     )

-    async with Heartbeater() as heartbeater:
-        await try_set_batch_export_run_to_running(run_id=inputs.run_id, logger=logger)
-
+    async with (
+        Heartbeater() as heartbeater,
+        get_client(team_id=inputs.team_id) as client,
+        set_status_to_running_task(run_id=inputs.run_id, logger=logger),
+    ):
         should_resume, details = await should_resume_from_activity_heartbeat(activity, BigQueryHeartbeatDetails, logger)

         if should_resume is True and details is not None:
@@ -226,122 +228,118 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records
             data_interval_start = inputs.data_interval_start
             last_inserted_at = None

-        async with get_client(team_id=inputs.team_id) as client:
-            if not await client.is_alive():
-                raise ConnectionError("Cannot establish connection to ClickHouse")
+        if not await client.is_alive():
+            raise ConnectionError("Cannot establish connection to ClickHouse")

-            if inputs.batch_export_schema is None:
-                fields = bigquery_default_fields()
-                query_parameters = None
+        if inputs.batch_export_schema is None:
+            fields = bigquery_default_fields()
+            query_parameters = None

-            else:
-                fields = inputs.batch_export_schema["fields"]
-                query_parameters = inputs.batch_export_schema["values"]
+        else:
+            fields = inputs.batch_export_schema["fields"]
+            query_parameters = inputs.batch_export_schema["values"]

-            records_iterator = iter_records(
-                client=client,
-                team_id=inputs.team_id,
-                interval_start=data_interval_start,
-                interval_end=inputs.data_interval_end,
-                exclude_events=inputs.exclude_events,
-                include_events=inputs.include_events,
-                fields=fields,
-                extra_query_parameters=query_parameters,
-                is_backfill=inputs.is_backfill,
-            )
+        records_iterator = iter_records(
+            client=client,
+            team_id=inputs.team_id,
+            interval_start=data_interval_start,
+            interval_end=inputs.data_interval_end,
+            exclude_events=inputs.exclude_events,
+            include_events=inputs.include_events,
+            fields=fields,
+            extra_query_parameters=query_parameters,
+            is_backfill=inputs.is_backfill,
+        )

-            first_record_batch, records_iterator = peek_first_and_rewind(records_iterator)
-            if first_record_batch is None:
-                return 0
+        first_record_batch, records_iterator = peek_first_and_rewind(records_iterator)
+        if first_record_batch is None:
+            return 0

-            bigquery_table = None
-            inserted_at = None
+        bigquery_table = None
+        inserted_at = None

-            with bigquery_client(inputs) as bq_client:
-                with BatchExportTemporaryFile() as jsonl_file:
-                    rows_exported = get_rows_exported_metric()
-                    bytes_exported = get_bytes_exported_metric()
+        with bigquery_client(inputs) as bq_client, BatchExportTemporaryFile() as jsonl_file:
+            rows_exported = get_rows_exported_metric()
+            bytes_exported = get_bytes_exported_metric()

-                    async def flush_to_bigquery(bigquery_table, table_schema):
-                        logger.debug(
-                            "Loading %s records of size %s bytes",
-                            jsonl_file.records_since_last_reset,
-                            jsonl_file.bytes_since_last_reset,
-                        )
-                        await load_jsonl_file_to_bigquery_table(jsonl_file, bigquery_table, table_schema, bq_client)
+            async def flush_to_bigquery(bigquery_table, table_schema):
+                logger.debug(
+                    "Loading %s records of size %s bytes",
+                    jsonl_file.records_since_last_reset,
+                    jsonl_file.bytes_since_last_reset,
+                )
+                await load_jsonl_file_to_bigquery_table(jsonl_file, bigquery_table, table_schema, bq_client)

-                        rows_exported.add(jsonl_file.records_since_last_reset)
-                        bytes_exported.add(jsonl_file.bytes_since_last_reset)
+                rows_exported.add(jsonl_file.records_since_last_reset)
+                bytes_exported.add(jsonl_file.bytes_since_last_reset)

-                    if inputs.use_json_type is True:
-                        json_type = "JSON"
-                        json_columns = ["properties", "set", "set_once", "person_properties"]
-                    else:
-                        json_type = "STRING"
-                        json_columns = []
+            if inputs.use_json_type is True:
+                json_type = "JSON"
+                json_columns = ["properties", "set", "set_once", "person_properties"]
+            else:
+                json_type = "STRING"
+                json_columns = []

-                    if inputs.batch_export_schema is None:
-                        schema = [
-                            bigquery.SchemaField("uuid", "STRING"),
-                            bigquery.SchemaField("event", "STRING"),
-                            bigquery.SchemaField("properties", json_type),
-                            bigquery.SchemaField("elements", "STRING"),
-                            bigquery.SchemaField("set", json_type),
-                            bigquery.SchemaField("set_once", json_type),
-                            bigquery.SchemaField("distinct_id", "STRING"),
-                            bigquery.SchemaField("team_id", "INT64"),
-                            bigquery.SchemaField("ip", "STRING"),
-                            bigquery.SchemaField("site_url", "STRING"),
-                            bigquery.SchemaField("timestamp", "TIMESTAMP"),
-                            bigquery.SchemaField("bq_ingested_timestamp", "TIMESTAMP"),
-                        ]
+            if inputs.batch_export_schema is None:
+                schema = [
+                    bigquery.SchemaField("uuid", "STRING"),
+                    bigquery.SchemaField("event", "STRING"),
+                    bigquery.SchemaField("properties", json_type),
+                    bigquery.SchemaField("elements", "STRING"),
+                    bigquery.SchemaField("set", json_type),
+                    bigquery.SchemaField("set_once", json_type),
+                    bigquery.SchemaField("distinct_id", "STRING"),
+                    bigquery.SchemaField("team_id", "INT64"),
+                    bigquery.SchemaField("ip", "STRING"),
+                    bigquery.SchemaField("site_url", "STRING"),
+                    bigquery.SchemaField("timestamp", "TIMESTAMP"),
+                    bigquery.SchemaField("bq_ingested_timestamp", "TIMESTAMP"),
+                ]

-                    else:
-                        column_names = [
-                            column for column in first_record_batch.schema.names if column != "_inserted_at"
-                        ]
-                        record_schema = first_record_batch.select(column_names).schema
-                        schema = get_bigquery_fields_from_record_schema(record_schema, known_json_columns=json_columns)
+            else:
+                column_names = [column for column in first_record_batch.schema.names if column != "_inserted_at"]
+                record_schema = first_record_batch.select(column_names).schema
+                schema = get_bigquery_fields_from_record_schema(record_schema, known_json_columns=json_columns)

-                    bigquery_table = await create_table_in_bigquery(
-                        inputs.project_id,
-                        inputs.dataset_id,
-                        inputs.table_id,
-                        schema,
-                        bq_client,
-                    )
+            bigquery_table = await create_table_in_bigquery(
+                inputs.project_id,
+                inputs.dataset_id,
+                inputs.table_id,
+                schema,
+                bq_client,
+            )

-                    # Columns need to be sorted according to BigQuery schema.
-                    record_columns = [field.name for field in schema] + ["_inserted_at"]
+            # Columns need to be sorted according to BigQuery schema.
+            record_columns = [field.name for field in schema] + ["_inserted_at"]

-                    for record_batch in records_iterator:
-                        for record in record_batch.select(record_columns).to_pylist():
-                            inserted_at = record.pop("_inserted_at")
+            for record_batch in records_iterator:
+                for record in record_batch.select(record_columns).to_pylist():
+                    inserted_at = record.pop("_inserted_at")

-                            for json_column in json_columns:
-                                if json_column in record and (json_str := record.get(json_column, None)) is not None:
-                                    record[json_column] = json.loads(json_str)
+                    for json_column in json_columns:
+                        if json_column in record and (json_str := record.get(json_column, None)) is not None:
+                            record[json_column] = json.loads(json_str)

-                            # TODO: Parquet is a much more efficient format to send data to BigQuery.
-                            jsonl_file.write_records_to_jsonl([record])
+                    # TODO: Parquet is a much more efficient format to send data to BigQuery.
+                    jsonl_file.write_records_to_jsonl([record])

-                            if jsonl_file.tell() > settings.BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES:
-                                await flush_to_bigquery(bigquery_table, schema)
+                    if jsonl_file.tell() > settings.BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES:
+                        await flush_to_bigquery(bigquery_table, schema)

-                                last_inserted_at = inserted_at.isoformat()
-                                heartbeater.details = (str(last_inserted_at),)
+                        last_inserted_at = inserted_at.isoformat()
+                        heartbeater.details = (str(last_inserted_at),)

-                                jsonl_file.reset()
+                        jsonl_file.reset()

-                    if jsonl_file.tell() > 0 and inserted_at is not None:
-                        await flush_to_bigquery(bigquery_table, schema)
+            if jsonl_file.tell() > 0 and inserted_at is not None:
+                await flush_to_bigquery(bigquery_table, schema)

-                        last_inserted_at = inserted_at.isoformat()
-                        heartbeater.details = (str(last_inserted_at),)
+                last_inserted_at = inserted_at.isoformat()
+                heartbeater.details = (str(last_inserted_at),)

-                        jsonl_file.reset()
+                jsonl_file.reset()

-                    return jsonl_file.records_total
+            return jsonl_file.records_total


 @workflow.defn(name="bigquery-export")
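
The drop in nesting comes from Python's ability to enter several context managers in a single with / async with statement; the parenthesized, multi-line form used in the new code requires Python 3.10 or newer. A schematic comparison with placeholder managers (not PostHog code):

from contextlib import asynccontextmanager


@asynccontextmanager
async def managed(name: str):
    # Trivial placeholder context manager used only for this illustration.
    yield name


async def nested_style():
    # Before: each context manager adds an indentation level.
    async with managed("heartbeater") as heartbeater:
        async with managed("clickhouse client") as client:
            async with managed("status task"):
                print(heartbeater, client)


async def grouped_style():
    # After: one statement, one indentation level for all three (Python 3.10+).
    async with (
        managed("heartbeater") as heartbeater,
        managed("clickhouse client") as client,
        managed("status task"),
    ):
        print(heartbeater, client)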