fix: Check for query permissions to decide what to do
tomasfarias committed Dec 10, 2024
1 parent b085d67 commit c0dc947
Showing 1 changed file with 61 additions and 43 deletions.
104 changes: 61 additions & 43 deletions posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -279,6 +279,23 @@ async def amerge_tables(
             final_table, stage_table, merge_key=merge_key, stage_fields_cast_to_json=stage_fields_cast_to_json
         )
 
+    async def acheck_for_query_permissions_on_table(
+        self,
+        table: bigquery.Table,
+    ):
+        """Attempt to SELECT from table to check for query permissions."""
+        job_config = bigquery.QueryJobConfig()
+        query = f"""
+        SELECT 1 FROM `{table.full_table_id.replace(":", ".", 1)}`
+        """
+
+        try:
+            query_job = self.query(query, job_config=job_config)
+            await asyncio.to_thread(query_job.result)
+        except Forbidden:
+            return False
+        return True
+
     async def ainsert_into_from_stage_table(
         self,
         into_table: bigquery.Table,
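
The new acheck_for_query_permissions_on_table method is essentially a probe query: run a trivial SELECT against the table and treat a Forbidden response as "no query permission". A minimal standalone sketch of the same idea against the public google-cloud-bigquery client (the client subclass and surrounding plumbing in the actual file differ):

    import asyncio

    from google.api_core.exceptions import Forbidden
    from google.cloud import bigquery


    async def can_query_table(client: bigquery.Client, table: bigquery.Table) -> bool:
        """Probe SELECT permissions by running a trivial query against `table`."""
        # full_table_id is "project:dataset.table"; query syntax needs
        # "project.dataset.table", so replace the first ":" with ".".
        query = f'SELECT 1 FROM `{table.full_table_id.replace(":", ".", 1)}`'
        try:
            query_job = client.query(query, job_config=bigquery.QueryJobConfig())
            # result() blocks until the job finishes, so run it off the event loop.
            await asyncio.to_thread(query_job.result)
        except Forbidden:
            return False
        return True

Note that only Forbidden is caught, mirroring the committed method: any other error (bad table name, network failure) still propagates.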
@@ -629,53 +646,54 @@ async def insert_into_bigquery_activity(inputs: BigQueryInsertInputs) -> Records
     stage_table_name = f"stage_{inputs.table_id}_{data_interval_end_str}"
 
     with bigquery_client(inputs) as bq_client:
-        async with (
-            bq_client.managed_table(
-                project_id=inputs.project_id,
-                dataset_id=inputs.dataset_id,
-                table_id=inputs.table_id,
-                table_schema=schema,
-                delete=False,
-            ) as bigquery_table,
-            bq_client.managed_table(
-                project_id=inputs.project_id,
-                dataset_id=inputs.dataset_id,
-                table_id=stage_table_name,
-                table_schema=stage_schema,
-                create=True,
-                delete=True,
-            ) as bigquery_stage_table,
-        ):
-            records_completed = await run_consumer_loop(
-                queue=queue,
-                consumer_cls=BigQueryConsumer,
-                producer_task=producer_task,
-                heartbeater=heartbeater,
-                heartbeat_details=details,
-                data_interval_end=data_interval_end,
-                data_interval_start=data_interval_start,
-                schema=record_batch_schema,
-                writer_format=WriterFormat.PARQUET,
-                max_bytes=settings.BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES,
-                json_columns=(),
-                bigquery_client=bq_client,
-                bigquery_table=bigquery_stage_table,
-                table_schema=stage_schema,
-                writer_file_kwargs={"compression": "zstd"},
-                multiple_files=True,
-            )
-
-            merge_key = (
-                bigquery.SchemaField("team_id", "INT64"),
-                bigquery.SchemaField("distinct_id", "STRING"),
-            )
-            await bq_client.amerge_tables(
-                final_table=bigquery_table,
-                stage_table=bigquery_stage_table,
-                mutable=True if model_name == "persons" else False,
-                merge_key=merge_key,
-                stage_fields_cast_to_json=json_columns,
-            )
+        async with bq_client.managed_table(
+            project_id=inputs.project_id,
+            dataset_id=inputs.dataset_id,
+            table_id=inputs.table_id,
+            table_schema=schema,
+            delete=False,
+        ) as bigquery_table:
+            has_query_permissions = await bq_client.acheck_for_query_permissions_on_table(bigquery_table)
+
+            async with bq_client.managed_table(
+                project_id=inputs.project_id,
+                dataset_id=inputs.dataset_id,
+                table_id=stage_table_name,
+                table_schema=stage_schema,
+                create=True and has_query_permissions,
+                delete=True and has_query_permissions,
+            ) as bigquery_stage_table:
+                records_completed = await run_consumer_loop(
+                    queue=queue,
+                    consumer_cls=BigQueryConsumer,
+                    producer_task=producer_task,
+                    heartbeater=heartbeater,
+                    heartbeat_details=details,
+                    data_interval_end=data_interval_end,
+                    data_interval_start=data_interval_start,
+                    schema=record_batch_schema,
+                    writer_format=WriterFormat.PARQUET if has_query_permissions else WriterFormat.JSONL,
+                    max_bytes=settings.BATCH_EXPORT_BIGQUERY_UPLOAD_CHUNK_SIZE_BYTES,
+                    json_columns=() if has_query_permissions else json_columns,
+                    bigquery_client=bq_client,
+                    bigquery_table=bigquery_stage_table if has_query_permissions else bigquery_table,
+                    table_schema=stage_schema if has_query_permissions else schema,
+                    writer_file_kwargs={"compression": "zstd"} if has_query_permissions else {},
+                    multiple_files=True and has_query_permissions,
+                )
+
+                if has_query_permissions:
+                    merge_key = (
+                        bigquery.SchemaField("team_id", "INT64"),
+                        bigquery.SchemaField("distinct_id", "STRING"),
+                    )
+                    await bq_client.amerge_tables(
+                        final_table=bigquery_table,
+                        stage_table=bigquery_stage_table,
+                        mutable=True if model_name == "persons" else False,
+                        merge_key=merge_key,
+                        stage_fields_cast_to_json=json_columns,
+                    )
 
     return records_completed
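
Taken together, this hunk gates the whole stage-and-merge flow on the probe: with query permissions the consumer writes Parquet into a temporary stage table and merges it into the final table; without them it writes JSONL straight into the final table and skips creating, deleting, and merging the stage table (create=True and has_query_permissions simply evaluates to has_query_permissions, keeping the original keyword literals visible in the diff). A condensed, hypothetical sketch of the branching, with stage_and_merge and insert_jsonl_directly standing in for the real consumer-loop and merge calls:

    # Hypothetical condensation of the hunk above; `stage_and_merge` and
    # `insert_jsonl_directly` stand in for run_consumer_loop plus amerge_tables.
    async def export_with_fallback(bq_client, final_table, stage_table) -> int:
        has_query_permissions = await bq_client.acheck_for_query_permissions_on_table(final_table)
        if has_query_permissions:
            # Preferred path: Parquet into a stage table, then MERGE into the final table.
            return await stage_and_merge(bq_client, final_table, stage_table)
        # Fallback: without SELECT permission the MERGE would fail anyway,
        # so stream JSONL straight into the final table instead.
        return await insert_jsonl_directly(bq_client, final_table)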
