fix: Use JSONL for bigquery always (#23321)
* fix: Use JSONL for bigquery always

* Update query snapshots

* fix: Do not require delete permissions

* Update query snapshots

* Update query snapshots

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
tomasfarias and github-actions[bot] authored Jun 28, 2024
1 parent d3c247c commit 8fc2b15
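At a high level, the change makes the BigQuery batch export load every flushed file as newline-delimited JSON, rather than choosing between Parquet and JSONL based on the `use_json_type` input. A minimal sketch of that kind of load at the google-cloud-bigquery client level, where the project, dataset, table, and file names are placeholders rather than values from this commit:

```python
# Minimal sketch of a JSONL load into BigQuery, assuming google-cloud-bigquery
# is installed. Destination table, schema, and file name are placeholders.
from google.cloud import bigquery

client = bigquery.Client()

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    schema=[
        bigquery.SchemaField("uuid", "STRING"),
        bigquery.SchemaField("properties", "STRING"),
    ],
)

with open("export.jsonl", "rb") as jsonl_file:
    load_job = client.load_table_from_file(
        jsonl_file,
        "my-project.my_dataset.my_table",  # placeholder destination
        job_config=job_config,
    )

load_job.result()  # block until the load job finishes
```

The per-file diffs below remove the Parquet branch, make the staging table conditional on whether a merge is required, and stop deleting the destination table.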
Showing 2 changed files with 19 additions and 16 deletions.
posthog/api/test/__snapshots__/test_properties_timeline.ambr (8 changes: 4 additions & 4 deletions)
@@ -446,7 +446,7 @@
ORDER BY timestamp ASC ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING) AS end_event_number
FROM
(SELECT timestamp, person_properties AS properties,
array(replaceRegexpAll(JSONExtractRaw(person_properties, 'foo'), '^"|"$', ''), replaceRegexpAll(JSONExtractRaw(person_properties, 'bar'), '^"|"$', '')) AS relevant_property_values,
array(replaceRegexpAll(JSONExtractRaw(person_properties, 'bar'), '^"|"$', ''), replaceRegexpAll(JSONExtractRaw(person_properties, 'foo'), '^"|"$', '')) AS relevant_property_values,
lagInFrame(relevant_property_values) OVER (
ORDER BY timestamp ASC ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS previous_relevant_property_values,
row_number() OVER (
@@ -482,7 +482,7 @@
ORDER BY timestamp ASC ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING) AS end_event_number
FROM
(SELECT timestamp, person_properties AS properties,
array("mat_pp_foo", "mat_pp_bar") AS relevant_property_values,
array("mat_pp_bar", "mat_pp_foo") AS relevant_property_values,
lagInFrame(relevant_property_values) OVER (
ORDER BY timestamp ASC ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS previous_relevant_property_values,
row_number() OVER (
@@ -522,7 +522,7 @@
ORDER BY timestamp ASC ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING) AS end_event_number
FROM
(SELECT timestamp, person_properties AS properties,
array(replaceRegexpAll(JSONExtractRaw(person_properties, 'foo'), '^"|"$', ''), replaceRegexpAll(JSONExtractRaw(person_properties, 'bar'), '^"|"$', '')) AS relevant_property_values,
array(replaceRegexpAll(JSONExtractRaw(person_properties, 'bar'), '^"|"$', ''), replaceRegexpAll(JSONExtractRaw(person_properties, 'foo'), '^"|"$', '')) AS relevant_property_values,
lagInFrame(relevant_property_values) OVER (
ORDER BY timestamp ASC ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS previous_relevant_property_values,
row_number() OVER (
@@ -558,7 +558,7 @@
ORDER BY timestamp ASC ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING) AS end_event_number
FROM
(SELECT timestamp, person_properties AS properties,
array("mat_pp_foo", "mat_pp_bar") AS relevant_property_values,
array("mat_pp_bar", "mat_pp_foo") AS relevant_property_values,
lagInFrame(relevant_property_values) OVER (
ORDER BY timestamp ASC ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS previous_relevant_property_values,
row_number() OVER (
posthog/temporal/batch_exports/bigquery_batch_export.py (27 changes: 15 additions & 12 deletions)
@@ -184,9 +184,14 @@ async def managed_table(
exists_ok: bool = True,
not_found_ok: bool = True,
delete: bool = True,
create: bool = True,
) -> collections.abc.AsyncGenerator[bigquery.Table, None]:
"""Manage a table in BigQuery by ensure it exists while in context."""
table = await self.acreate_table(project_id, dataset_id, table_id, table_schema, exists_ok)
if create is True:
table = await self.acreate_table(project_id, dataset_id, table_id, table_schema, exists_ok)
else:
fully_qualified_name = f"{project_id}.{dataset_id}.{table_id}"
table = bigquery.Table(fully_qualified_name, schema=table_schema)

try:
yield table
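Read as a whole, `managed_table` now creates the table only when `create` is true, and otherwise just builds a handle from the fully qualified name; deletion on exit is likewise gated on `delete`. A standalone sketch of that behaviour under stated assumptions: it uses a plain `google.cloud.bigquery.Client` via `asyncio.to_thread` instead of PostHog's own `acreate_table`/`adelete_table` helpers, which are not reproduced here.

```python
import asyncio
import collections.abc
import contextlib

from google.cloud import bigquery


@contextlib.asynccontextmanager
async def managed_table(
    client: bigquery.Client,
    project_id: str,
    dataset_id: str,
    table_id: str,
    table_schema: list[bigquery.SchemaField],
    exists_ok: bool = True,
    not_found_ok: bool = True,
    delete: bool = True,
    create: bool = True,
) -> collections.abc.AsyncGenerator[bigquery.Table, None]:
    """Yield a BigQuery table, optionally creating it first and dropping it afterwards."""
    fully_qualified_name = f"{project_id}.{dataset_id}.{table_id}"
    table = bigquery.Table(fully_qualified_name, schema=table_schema)

    if create is True:
        # Create the table (tolerating an existing one) before yielding it.
        table = await asyncio.to_thread(client.create_table, table, exists_ok=exists_ok)

    try:
        yield table
    finally:
        if delete is True:
            # Only clean up when asked to; with delete=False no delete permission is needed.
            await asyncio.to_thread(client.delete_table, table, not_found_ok=not_found_ok)
```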
@@ -398,21 +403,24 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records
requires_merge = (
isinstance(inputs.batch_export_model, BatchExportModel) and inputs.batch_export_model.name == "persons"
)
stage_table_name = f"stage_{inputs.table_id}" if requires_merge else inputs.table_id

with bigquery_client(inputs) as bq_client:
async with (
bq_client.managed_table(
inputs.project_id,
inputs.dataset_id,
f"{inputs.table_id}",
inputs.table_id,
schema,
delete=False,
) as bigquery_table,
bq_client.managed_table(
inputs.project_id,
inputs.dataset_id,
f"stage_{inputs.table_id}",
stage_table_name,
schema,
create=requires_merge,
delete=requires_merge,
) as bigquery_stage_table,
):
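Per the commit message, the destination table is now opened with `delete=False`, so the export no longer needs delete permissions on it, and the staging table is only created (and later dropped) when a merge is actually required. A hedged illustration of how the two context managers compose, reusing the `managed_table` sketch above; `run_export`, `inputs`, `schema`, and `requires_merge` are stand-ins rather than the activity's real signature:

```python
async def run_export(client: bigquery.Client, inputs, schema, requires_merge: bool) -> None:
    # When no merge is needed, the "stage" handle points at the destination
    # table itself: nothing extra is created and nothing is deleted on exit.
    stage_table_name = f"stage_{inputs.table_id}" if requires_merge else inputs.table_id

    async with (
        managed_table(
            client,
            inputs.project_id,
            inputs.dataset_id,
            inputs.table_id,
            schema,
            delete=False,  # never drop the destination table
        ) as bigquery_table,
        managed_table(
            client,
            inputs.project_id,
            inputs.dataset_id,
            stage_table_name,
            schema,
            create=requires_merge,  # only create a real staging table for merges
            delete=requires_merge,  # and only then clean it up afterwards
        ) as bigquery_stage_table,
    ):
        ...  # load batches into the stage table, then merge into the destination if required
```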

@@ -426,10 +434,7 @@ async def flush_to_bigquery(
)
table = bigquery_stage_table if requires_merge else bigquery_table

if inputs.use_json_type is True:
await bq_client.load_jsonl_file(local_results_file, table, schema)
else:
await bq_client.load_parquet_file(local_results_file, table, schema)
await bq_client.load_jsonl_file(local_results_file, table, schema)

rows_exported.add(records_since_last_flush)
bytes_exported.add(bytes_since_last_flush)
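The flush hunk above is where "always JSONL" lands: the `use_json_type` branch that previously picked `load_parquet_file` is gone, and every flushed temporary file is loaded with `load_jsonl_file`. A condensed sketch of that callback; the real `flush_to_bigquery` is a closure inside the activity and its full signature is not visible in this diff, so `make_flush_callback` and its parameters are illustrative only:

```python
def make_flush_callback(
    bq_client,
    bigquery_table,
    bigquery_stage_table,
    schema,
    requires_merge,
    rows_exported,
    bytes_exported,
):
    async def flush_to_bigquery(local_results_file, records_since_last_flush: int, bytes_since_last_flush: int) -> None:
        # Write into the staging table only when a merge is required.
        table = bigquery_stage_table if requires_merge else bigquery_table

        # Always JSONL now; `use_json_type` no longer selects Parquet vs JSONL.
        await bq_client.load_jsonl_file(local_results_file, table, schema)

        rows_exported.add(records_since_last_flush)
        bytes_exported.add(bytes_since_last_flush)

    return flush_to_bigquery
```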
@@ -447,13 +452,11 @@ async def flush_to_bigquery(
for field in first_record_batch.select([field.name for field in schema]).schema
]
)

writer = get_batch_export_writer(
inputs,
flush_callable=flush_to_bigquery,
writer = JSONLBatchExportWriter(
max_bytes=settings.BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES,
schema=record_schema,
flush_callable=flush_to_bigquery,
)

async with writer.open_temporary_file():
async for record_batch in records_iterator:
record_batch = cast_record_batch_json_columns(record_batch, json_columns=json_columns)
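Finally, the writer hunk swaps the generic `get_batch_export_writer` factory for a directly constructed `JSONLBatchExportWriter`, matching the JSONL-only load. A sketch pulling that together: `JSONLBatchExportWriter` and `cast_record_batch_json_columns` are the module helpers referenced in the diff, the other parameters are passed in so the excerpt reads on its own, and the body of the loop beyond the cast is outside this hunk.

```python
async def write_all_batches(records_iterator, record_schema, max_bytes, flush_to_bigquery, json_columns):
    writer = JSONLBatchExportWriter(
        max_bytes=max_bytes,  # settings.BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES in the diff
        schema=record_schema,
        flush_callable=flush_to_bigquery,
    )

    async with writer.open_temporary_file():
        async for record_batch in records_iterator:
            record_batch = cast_record_batch_json_columns(record_batch, json_columns=json_columns)
            ...  # hand the batch to the writer (the rest of the loop is not shown in this hunk)
```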
