feat(batch-exports): add FailedRetryable status (and metric label) (#20467)

* cleanup: use status references rather than string literals

* feat(batch-exports): add FailedRetryable status (and metric label)

* Treat 'FailedRetryable' as 'Failed' in the front-end for right now

* Fix oopsie in HTTP Exports

* Fix oopsie in postgres non_retryable_error_types

* Update query snapshots

---------

Co-authored-by: github-actions <41898282+github-actions[bot]@users.noreply.github.com>
bretthoerner and github-actions[bot] authored Feb 26, 2024
1 parent df93f26 commit 96ac19e
Showing 18 changed files with 370 additions and 46 deletions.
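The gist of the change, as applied in execute_batch_export_insert_activity in batch_exports.py below: when an insert activity fails with an error type that is not in the destination's non_retryable_error_types list, the run is now recorded as FailedRetryable instead of Failed (the front-end renders both as 'danger' for now). A minimal sketch of that classification, assuming only what the diff shows; classify_failure is a hypothetical helper added here for illustration, not part of the commit:

from temporalio import exceptions

from posthog.batch_exports.models import BatchExportRun


def classify_failure(e: exceptions.ActivityError, non_retryable_error_types: list[str]) -> BatchExportRun.Status:
    # Hypothetical helper mirroring the logic in execute_batch_export_insert_activity:
    # cancellations stay Cancelled, errors outside the non-retryable list become
    # FailedRetryable, and everything else is a terminal Failed.
    if isinstance(e.cause, exceptions.CancelledError):
        return BatchExportRun.Status.CANCELLED
    if isinstance(e.cause, exceptions.ApplicationError) and e.cause.type not in non_retryable_error_types:
        return BatchExportRun.Status.FAILED_RETRYABLE
    return BatchExportRun.Status.FAILED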
1 change: 1 addition & 0 deletions frontend/src/scenes/batch_exports/components.tsx
@@ -39,6 +39,7 @@ export const colorForStatus = (
case 'TimedOut':
return 'warning'
case 'Failed':
case 'FailedRetryable':
return 'danger'
default:
return 'default'
11 changes: 10 additions & 1 deletion frontend/src/types.ts
@@ -3640,7 +3640,16 @@ export type BatchExportConfiguration = {

export type BatchExportRun = {
id: string
status: 'Cancelled' | 'Completed' | 'ContinuedAsNew' | 'Failed' | 'Terminated' | 'TimedOut' | 'Running' | 'Starting'
status:
| 'Cancelled'
| 'Completed'
| 'ContinuedAsNew'
| 'Failed'
| 'FailedRetryable'
| 'Terminated'
| 'TimedOut'
| 'Running'
| 'Starting'
created_at: Dayjs
data_interval_start: Dayjs
data_interval_end: Dayjs
2 changes: 1 addition & 1 deletion latest_migrations.manifest
@@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
ee: 0015_add_verified_properties
otp_static: 0002_throttling
otp_totp: 0002_auto_20190420_0723
posthog: 0390_personalapikey_scopes
posthog: 0391_alter_batchexportbackfill_status_and_more
sessions: 0001_initial
social_django: 0010_uid_db_index
two_factor: 0007_auto_20201201_1019
6 changes: 4 additions & 2 deletions posthog/batch_exports/models.py
@@ -74,8 +74,9 @@ class Status(models.TextChoices):

CANCELLED = "Cancelled"
COMPLETED = "Completed"
CONTINUEDASNEW = "ContinuedAsNew"
CONTINUED_AS_NEW = "ContinuedAsNew"
FAILED = "Failed"
FAILED_RETRYABLE = "FailedRetryable"
TERMINATED = "Terminated"
TIMEDOUT = "TimedOut"
RUNNING = "Running"
@@ -269,8 +270,9 @@ class Status(models.TextChoices):

CANCELLED = "Cancelled"
COMPLETED = "Completed"
CONTINUEDASNEW = "ContinuedAsNew"
CONTINUED_AS_NEW = "ContinuedAsNew"
FAILED = "Failed"
FAILED_RETRYABLE = "FailedRetryable"
TERMINATED = "Terminated"
TIMEDOUT = "TimedOut"
RUNNING = "Running"
50 changes: 50 additions & 0 deletions posthog/migrations/0391_alter_batchexportbackfill_status_and_more.py
@@ -0,0 +1,50 @@
# Generated by Django 4.1.13 on 2024-02-20 19:21

from django.db import migrations, models


class Migration(migrations.Migration):
dependencies = [
("posthog", "0390_personalapikey_scopes"),
]

operations = [
migrations.AlterField(
model_name="batchexportbackfill",
name="status",
field=models.CharField(
choices=[
("Cancelled", "Cancelled"),
("Completed", "Completed"),
("ContinuedAsNew", "Continued As New"),
("Failed", "Failed"),
("FailedRetryable", "Failed Retryable"),
("Terminated", "Terminated"),
("TimedOut", "Timedout"),
("Running", "Running"),
("Starting", "Starting"),
],
help_text="The status of this backfill.",
max_length=64,
),
),
migrations.AlterField(
model_name="batchexportrun",
name="status",
field=models.CharField(
choices=[
("Cancelled", "Cancelled"),
("Completed", "Completed"),
("ContinuedAsNew", "Continued As New"),
("Failed", "Failed"),
("FailedRetryable", "Failed Retryable"),
("Terminated", "Terminated"),
("TimedOut", "Timedout"),
("Running", "Running"),
("Starting", "Starting"),
],
help_text="The status of this run.",
max_length=64,
),
),
]
13 changes: 8 additions & 5 deletions posthog/temporal/batch_exports/backfill_batch_export.py
@@ -13,6 +13,7 @@
import temporalio.workflow
from django.conf import settings

from posthog.batch_exports.models import BatchExportBackfill
from posthog.batch_exports.service import BackfillBatchExportInputs
from posthog.temporal.batch_exports.base import PostHogWorkflow
from posthog.temporal.batch_exports.batch_exports import (
@@ -328,7 +329,7 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None:
batch_export_id=inputs.batch_export_id,
start_at=inputs.start_at,
end_at=inputs.end_at,
status="Running",
status=BatchExportBackfill.Status.RUNNING,
)

backfill_id = await temporalio.workflow.execute_activity(
@@ -342,7 +343,9 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None:
non_retryable_error_types=["NotNullViolation", "IntegrityError"],
),
)
update_inputs = UpdateBatchExportBackfillStatusInputs(id=backfill_id, status="Completed")
update_inputs = UpdateBatchExportBackfillStatusInputs(
id=backfill_id, status=BatchExportBackfill.Status.COMPLETED
)

frequency_seconds = await temporalio.workflow.execute_activity(
get_schedule_frequency,
@@ -380,14 +383,14 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None:

except temporalio.exceptions.ActivityError as e:
if isinstance(e.cause, temporalio.exceptions.CancelledError):
update_inputs.status = "Cancelled"
update_inputs.status = BatchExportBackfill.Status.CANCELLED
else:
update_inputs.status = "Failed"
update_inputs.status = BatchExportBackfill.Status.FAILED

raise

except Exception:
update_inputs.status = "Failed"
update_inputs.status = BatchExportBackfill.Status.FAILED
raise

finally:
20 changes: 12 additions & 8 deletions posthog/temporal/batch_exports/batch_exports.py
@@ -16,6 +16,7 @@
from temporalio import activity, exceptions, workflow
from temporalio.common import RetryPolicy

from posthog.batch_exports.models import BatchExportBackfill, BatchExportRun
from posthog.batch_exports.service import (
BatchExportField,
create_batch_export_backfill,
@@ -496,7 +497,7 @@ class CreateBatchExportRunInputs:
batch_export_id: str
data_interval_start: str
data_interval_end: str
status: str = "Starting"
status: str = BatchExportRun.Status.STARTING


@activity.defn
@@ -546,10 +547,10 @@ async def update_export_run_status(inputs: UpdateBatchExportRunStatusInputs) ->
latest_error=inputs.latest_error,
)

if batch_export_run.status == "Failed":
if batch_export_run.status == BatchExportRun.Status.FAILED:
logger.error("BatchExport failed with error: %s", batch_export_run.latest_error)

elif batch_export_run.status == "Cancelled":
elif batch_export_run.status == BatchExportRun.Status.CANCELLED:
logger.warning("BatchExport was cancelled.")

else:
@@ -612,10 +613,10 @@ async def update_batch_export_backfill_model_status(inputs: UpdateBatchExportBac
)
logger = await bind_temporal_worker_logger(team_id=backfill.team_id)

if backfill.status == "Failed":
if backfill.status == BatchExportBackfill.Status.FAILED:
logger.error("Historical export failed")

elif backfill.status == "Cancelled":
elif backfill.status == BatchExportBackfill.Status.CANCELLED:
logger.warning("Historical export was cancelled.")

else:
@@ -661,6 +662,7 @@ async def execute_batch_export_insert_activity(
maximum_attempts=maximum_attempts,
non_retryable_error_types=non_retryable_error_types,
)

try:
await workflow.execute_activity(
activity,
Expand All @@ -672,15 +674,17 @@ async def execute_batch_export_insert_activity(

except exceptions.ActivityError as e:
if isinstance(e.cause, exceptions.CancelledError):
update_inputs.status = "Cancelled"
update_inputs.status = BatchExportRun.Status.CANCELLED
elif isinstance(e.cause, exceptions.ApplicationError) and e.cause.type not in non_retryable_error_types:
update_inputs.status = BatchExportRun.Status.FAILED_RETRYABLE
else:
update_inputs.status = "Failed"
update_inputs.status = BatchExportRun.Status.FAILED

update_inputs.latest_error = str(e.cause)
raise

except Exception:
update_inputs.status = "Failed"
update_inputs.status = BatchExportRun.Status.FAILED
update_inputs.latest_error = "An unexpected error has ocurred"
raise

5 changes: 4 additions & 1 deletion posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -11,6 +11,7 @@
from temporalio import activity, workflow
from temporalio.common import RetryPolicy

from posthog.batch_exports.models import BatchExportRun
from posthog.batch_exports.service import BatchExportField, BatchExportSchema, BigQueryBatchExportInputs
from posthog.temporal.batch_exports.base import PostHogWorkflow
from posthog.temporal.batch_exports.batch_exports import (
@@ -393,7 +394,7 @@ async def run(self, inputs: BigQueryBatchExportInputs):
),
)

update_inputs = UpdateBatchExportRunStatusInputs(id=run_id, status="Completed", team_id=inputs.team_id)
update_inputs = UpdateBatchExportRunStatusInputs(
id=run_id, status=BatchExportRun.Status.COMPLETED, team_id=inputs.team_id
)

insert_inputs = BigQueryInsertInputs(
team_id=inputs.team_id,
7 changes: 3 additions & 4 deletions posthog/temporal/batch_exports/http_batch_export.py
@@ -9,6 +9,7 @@
from temporalio import activity, workflow
from temporalio.common import RetryPolicy

from posthog.models import BatchExportRun
from posthog.batch_exports.service import BatchExportField, BatchExportSchema, HttpBatchExportInputs
from posthog.temporal.batch_exports.base import PostHogWorkflow
from posthog.temporal.batch_exports.batch_exports import (
@@ -338,15 +339,13 @@ async def run(self, inputs: HttpBatchExportInputs):
initial_interval=dt.timedelta(seconds=10),
maximum_interval=dt.timedelta(seconds=60),
maximum_attempts=0,
non_retryable_error_types=[
"NonRetryableResponseError",
],
non_retryable_error_types=["NotNullViolation", "IntegrityError"],
),
)

update_inputs = UpdateBatchExportRunStatusInputs(
id=run_id,
status="Completed",
status=BatchExportRun.Status.COMPLETED,
team_id=inputs.team_id,
)

9 changes: 5 additions & 4 deletions posthog/temporal/batch_exports/postgres_batch_export.py
@@ -13,6 +13,7 @@
from temporalio import activity, workflow
from temporalio.common import RetryPolicy

from posthog.batch_exports.models import BatchExportRun
from posthog.batch_exports.service import BatchExportField, BatchExportSchema, PostgresBatchExportInputs
from posthog.temporal.batch_exports.base import PostHogWorkflow
from posthog.temporal.batch_exports.batch_exports import (
@@ -400,7 +401,7 @@ async def run(self, inputs: PostgresBatchExportInputs):

update_inputs = UpdateBatchExportRunStatusInputs(
id=run_id,
status="Completed",
status=BatchExportRun.Status.COMPLETED,
team_id=inputs.team_id,
)

@@ -427,11 +428,11 @@ async def run(self, inputs: PostgresBatchExportInputs):
non_retryable_error_types=[
# Raised on errors that are related to database operation.
# For example: unexpected disconnect, database or other object not found.
"OperationalError"
"OperationalError",
# The schema name provided is invalid (usually because it doesn't exist).
"InvalidSchemaName"
"InvalidSchemaName",
# Missing permissions to, e.g., insert into table.
"InsufficientPrivilege"
"InsufficientPrivilege",
],
update_inputs=update_inputs,
# Disable heartbeat timeout until we add heartbeat support.
3 changes: 2 additions & 1 deletion posthog/temporal/batch_exports/redshift_batch_export.py
@@ -12,6 +12,7 @@
from temporalio import activity, workflow
from temporalio.common import RetryPolicy

from posthog.batch_exports.models import BatchExportRun
from posthog.batch_exports.service import BatchExportField, RedshiftBatchExportInputs
from posthog.temporal.batch_exports.base import PostHogWorkflow
from posthog.temporal.batch_exports.batch_exports import (
@@ -431,7 +432,7 @@ async def run(self, inputs: RedshiftBatchExportInputs):

update_inputs = UpdateBatchExportRunStatusInputs(
id=run_id,
status="Completed",
status=BatchExportRun.Status.COMPLETED,
team_id=inputs.team_id,
)

3 changes: 2 additions & 1 deletion posthog/temporal/batch_exports/s3_batch_export.py
@@ -12,6 +12,7 @@
from temporalio import activity, workflow
from temporalio.common import RetryPolicy

from posthog.batch_exports.models import BatchExportRun
from posthog.batch_exports.service import BatchExportField, BatchExportSchema, S3BatchExportInputs
from posthog.temporal.batch_exports.base import PostHogWorkflow
from posthog.temporal.batch_exports.batch_exports import (
@@ -543,7 +544,7 @@ async def run(self, inputs: S3BatchExportInputs):

update_inputs = UpdateBatchExportRunStatusInputs(
id=run_id,
status="Completed",
status=BatchExportRun.Status.COMPLETED,
team_id=inputs.team_id,
)

3 changes: 2 additions & 1 deletion posthog/temporal/batch_exports/snowflake_batch_export.py
@@ -14,6 +14,7 @@
from temporalio import activity, workflow
from temporalio.common import RetryPolicy

from posthog.batch_exports.models import BatchExportRun
from posthog.batch_exports.service import BatchExportField, BatchExportSchema, SnowflakeBatchExportInputs
from posthog.temporal.batch_exports.base import PostHogWorkflow
from posthog.temporal.batch_exports.batch_exports import (
@@ -594,7 +595,7 @@ async def run(self, inputs: SnowflakeBatchExportInputs):

update_inputs = UpdateBatchExportRunStatusInputs(
id=run_id,
status="Completed",
status=BatchExportRun.Status.COMPLETED,
team_id=inputs.team_id,
)
