Skip to content

Commit

Permalink
chore: Also set InvalidFullyQualifiedIdError as non-retryable
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias committed Mar 22, 2024
1 parent 925e739 commit 103856f
Showing 1 changed file with 21 additions and 2 deletions.
23 changes: 21 additions & 2 deletions posthog/temporal/batch_exports/bigquery_batch_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,17 @@ async def load_jsonl_file_to_bigquery_table(jsonl_file, table, table_schema, big
await asyncio.to_thread(load_job.result)


class InvalidFullyQualifiedIdError(Exception):
    """Exception raised on an invalid fully qualified table id.

    The offending id is kept on the instance so that handlers can inspect it
    without having to parse the human-readable message.

    Attributes:
        fully_qualified_table_id: The id string that failed validation.
    """

    def __init__(self, fully_qualified_table_id: str):
        """Build the error message from the rejected id.

        Args:
            fully_qualified_table_id: The id assembled from the project,
                dataset and table ids that was not accepted as valid.
        """
        msg = (
            "The project id, dataset id, and table id provided did not generate a valid "
            f"(e.g. 'project.dataset.table') fully qualified ID, but instead: '{fully_qualified_table_id}'"
        )
        super().__init__(msg)
        self.fully_qualified_table_id = fully_qualified_table_id

    def __reduce__(self):
        """Rebuild from the raw id when the exception is copied or pickled.

        ``self.args`` holds the already formatted message, so the default
        reduction would feed that message back into ``__init__`` and wrap it
        in the message template a second time.
        """
        return (type(self), (self.fully_qualified_table_id,))


async def create_table_in_bigquery(
project_id: str,
dataset_id: str,
Expand All @@ -61,7 +72,11 @@ async def create_table_in_bigquery(
) -> bigquery.Table:
"""Create a table in BigQuery."""
fully_qualified_name = f"{project_id}.{dataset_id}.{table_id}"
table = bigquery.Table(fully_qualified_name, schema=table_schema)

try:
table = bigquery.Table(fully_qualified_name, schema=table_schema)
except ValueError:
raise InvalidFullyQualifiedIdError(fully_qualified_name)

if "timestamp" in [field.name for field in table_schema]:
# TODO: Maybe choosing which column to use as partitioning should be a configuration parameter.
Expand Down Expand Up @@ -394,7 +409,7 @@ async def run(self, inputs: BigQueryBatchExportInputs):
initial_interval=dt.timedelta(seconds=10),
maximum_interval=dt.timedelta(seconds=60),
maximum_attempts=0,
non_retryable_error_types=["NotNullViolation", "IntegrityError", "BadRequest"],
non_retryable_error_types=["NotNullViolation", "IntegrityError"],
),
)

Expand Down Expand Up @@ -429,6 +444,10 @@ async def run(self, inputs: BigQueryBatchExportInputs):
"RefreshError",
# Usually means the dataset or project doesn't exist.
"NotFound",
# Raised when BigQuery detects a schema change.
"BadRequest",
# Raised when BigQuery rejects the table ID.
"InvalidFullyQualifiedIdError",
],
update_inputs=update_inputs,
)

0 comments on commit 103856f

Please sign in to comment.