diff --git a/frontend/src/scenes/batch_exports/components.tsx b/frontend/src/scenes/batch_exports/components.tsx
index 7191de9f40685..2e0143765b0ef 100644
--- a/frontend/src/scenes/batch_exports/components.tsx
+++ b/frontend/src/scenes/batch_exports/components.tsx
@@ -39,6 +39,7 @@ export const colorForStatus = (
         case 'TimedOut':
             return 'warning'
         case 'Failed':
+        case 'FailedRetryable':
            return 'danger'
        default:
            return 'default'
diff --git a/frontend/src/types.ts b/frontend/src/types.ts
index 134a9097e82aa..4b3709121fb00 100644
--- a/frontend/src/types.ts
+++ b/frontend/src/types.ts
@@ -3640,7 +3640,16 @@ export type BatchExportConfiguration = {
 
 export type BatchExportRun = {
     id: string
-    status: 'Cancelled' | 'Completed' | 'ContinuedAsNew' | 'Failed' | 'Terminated' | 'TimedOut' | 'Running' | 'Starting'
+    status:
+        | 'Cancelled'
+        | 'Completed'
+        | 'ContinuedAsNew'
+        | 'Failed'
+        | 'FailedRetryable'
+        | 'Terminated'
+        | 'TimedOut'
+        | 'Running'
+        | 'Starting'
     created_at: Dayjs
     data_interval_start: Dayjs
     data_interval_end: Dayjs
diff --git a/latest_migrations.manifest b/latest_migrations.manifest
index 12689bf871206..08970c7aa2899 100644
--- a/latest_migrations.manifest
+++ b/latest_migrations.manifest
@@ -5,7 +5,7 @@ contenttypes: 0002_remove_content_type_name
 ee: 0015_add_verified_properties
 otp_static: 0002_throttling
 otp_totp: 0002_auto_20190420_0723
-posthog: 0390_personalapikey_scopes
+posthog: 0391_alter_batchexportbackfill_status_and_more
 sessions: 0001_initial
 social_django: 0010_uid_db_index
 two_factor: 0007_auto_20201201_1019
diff --git a/posthog/batch_exports/models.py b/posthog/batch_exports/models.py
index 8cb95e44827ac..9af64f1c2733d 100644
--- a/posthog/batch_exports/models.py
+++ b/posthog/batch_exports/models.py
@@ -74,8 +74,9 @@ class Status(models.TextChoices):
 
         CANCELLED = "Cancelled"
         COMPLETED = "Completed"
-        CONTINUEDASNEW = "ContinuedAsNew"
+        CONTINUED_AS_NEW = "ContinuedAsNew"
         FAILED = "Failed"
+        FAILED_RETRYABLE = "FailedRetryable"
         TERMINATED = "Terminated"
         TIMEDOUT = "TimedOut"
         RUNNING = "Running"
@@ -269,8 +270,9 @@ class Status(models.TextChoices):
 
         CANCELLED = "Cancelled"
         COMPLETED = "Completed"
-        CONTINUEDASNEW = "ContinuedAsNew"
+        CONTINUED_AS_NEW = "ContinuedAsNew"
         FAILED = "Failed"
+        FAILED_RETRYABLE = "FailedRetryable"
         TERMINATED = "Terminated"
         TIMEDOUT = "TimedOut"
         RUNNING = "Running"
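The model change above is safe to ship without touching stored data: Django `TextChoices` members persist their *value* (the second element of each generated choice tuple), so renaming the Python attribute `CONTINUEDASNEW` to `CONTINUED_AS_NEW` leaves existing rows untouched, and the new `FAILED_RETRYABLE` member only widens the set of accepted values. A minimal sketch of that behavior (illustrative only, not PostHog code; requires Django installed):

```python
from django.db import models


class Status(models.TextChoices):
    # The attribute name is Python-side only; the string value is what gets
    # stored in the database, and the human-readable label is derived from
    # the attribute name ("FAILED_RETRYABLE" -> "Failed Retryable").
    CONTINUED_AS_NEW = "ContinuedAsNew"
    FAILED_RETRYABLE = "FailedRetryable"


# TextChoices members are str subclasses, so they compare equal to the raw
# database value; the attribute rename changes neither value nor storage.
assert Status.CONTINUED_AS_NEW == "ContinuedAsNew"
assert Status.FAILED_RETRYABLE.label == "Failed Retryable"
```

The migration that follows is correspondingly state-only: `AlterField` rewrites the `choices` and `help_text` metadata, which Django tracks in migration state but which produces no SQL against the underlying `varchar(64)` column.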
diff --git a/posthog/migrations/0391_alter_batchexportbackfill_status_and_more.py b/posthog/migrations/0391_alter_batchexportbackfill_status_and_more.py
new file mode 100644
index 0000000000000..b3133f90d41c3
--- /dev/null
+++ b/posthog/migrations/0391_alter_batchexportbackfill_status_and_more.py
@@ -0,0 +1,50 @@
+# Generated by Django 4.1.13 on 2024-02-20 19:21
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+    dependencies = [
+        ("posthog", "0390_personalapikey_scopes"),
+    ]
+
+    operations = [
+        migrations.AlterField(
+            model_name="batchexportbackfill",
+            name="status",
+            field=models.CharField(
+                choices=[
+                    ("Cancelled", "Cancelled"),
+                    ("Completed", "Completed"),
+                    ("ContinuedAsNew", "Continued As New"),
+                    ("Failed", "Failed"),
+                    ("FailedRetryable", "Failed Retryable"),
+                    ("Terminated", "Terminated"),
+                    ("TimedOut", "Timedout"),
+                    ("Running", "Running"),
+                    ("Starting", "Starting"),
+                ],
+                help_text="The status of this backfill.",
+                max_length=64,
+            ),
+        ),
+        migrations.AlterField(
+            model_name="batchexportrun",
+            name="status",
+            field=models.CharField(
+                choices=[
+                    ("Cancelled", "Cancelled"),
+                    ("Completed", "Completed"),
+                    ("ContinuedAsNew", "Continued As New"),
+                    ("Failed", "Failed"),
+                    ("FailedRetryable", "Failed Retryable"),
+                    ("Terminated", "Terminated"),
+                    ("TimedOut", "Timedout"),
+                    ("Running", "Running"),
+                    ("Starting", "Starting"),
+                ],
+                help_text="The status of this run.",
+                max_length=64,
+            ),
+        ),
+    ]
diff --git a/posthog/temporal/batch_exports/backfill_batch_export.py b/posthog/temporal/batch_exports/backfill_batch_export.py
index 838427780af5d..d6cbf2b553338 100644
--- a/posthog/temporal/batch_exports/backfill_batch_export.py
+++ b/posthog/temporal/batch_exports/backfill_batch_export.py
@@ -13,6 +13,7 @@
 import temporalio.workflow
 from django.conf import settings
 
+from posthog.batch_exports.models import BatchExportBackfill
 from posthog.batch_exports.service import BackfillBatchExportInputs
 from posthog.temporal.batch_exports.base import PostHogWorkflow
 from posthog.temporal.batch_exports.batch_exports import (
@@ -328,7 +329,7 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None:
             batch_export_id=inputs.batch_export_id,
             start_at=inputs.start_at,
             end_at=inputs.end_at,
-            status="Running",
+            status=BatchExportBackfill.Status.RUNNING,
         )
 
         backfill_id = await temporalio.workflow.execute_activity(
@@ -342,7 +343,9 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None:
                 non_retryable_error_types=["NotNullViolation", "IntegrityError"],
             ),
         )
-        update_inputs = UpdateBatchExportBackfillStatusInputs(id=backfill_id, status="Completed")
+        update_inputs = UpdateBatchExportBackfillStatusInputs(
+            id=backfill_id, status=BatchExportBackfill.Status.COMPLETED
+        )
 
         frequency_seconds = await temporalio.workflow.execute_activity(
             get_schedule_frequency,
@@ -380,14 +383,14 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None:
 
         except temporalio.exceptions.ActivityError as e:
             if isinstance(e.cause, temporalio.exceptions.CancelledError):
-                update_inputs.status = "Cancelled"
+                update_inputs.status = BatchExportBackfill.Status.CANCELLED
             else:
-                update_inputs.status = "Failed"
+                update_inputs.status = BatchExportBackfill.Status.FAILED
 
             raise
 
         except Exception:
-            update_inputs.status = "Failed"
+            update_inputs.status = BatchExportBackfill.Status.FAILED
             raise
 
         finally:
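Swapping the bare string literals (`"Running"`, `"Completed"`) for enum members in the backfill workflow above is behavior-preserving: because `TextChoices` members are `str` subclasses, they compare equal to the old literals and, assuming Temporal's default JSON-based payload converter, serialize into activity inputs as the same plain strings. A small sketch of why the wire format does not change (the `Status` enum here is a stand-in for the real model's inner class):

```python
import json
from enum import Enum


class Status(str, Enum):  # django.db.models.TextChoices behaves the same way
    RUNNING = "Running"
    COMPLETED = "Completed"


# str-based enum members JSON-encode by value, so an activity payload built
# from Status.RUNNING is byte-identical to one built from the literal "Running".
assert json.dumps({"status": Status.RUNNING}) == '{"status": "Running"}'
assert Status.COMPLETED == "Completed"
```

The win is purely static: a typo like `Status.COMPLETE` fails loudly at the call site instead of silently writing an invalid status string.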
diff --git a/posthog/temporal/batch_exports/batch_exports.py b/posthog/temporal/batch_exports/batch_exports.py
index c5b70f6af8d24..65f4522285972 100644
--- a/posthog/temporal/batch_exports/batch_exports.py
+++ b/posthog/temporal/batch_exports/batch_exports.py
@@ -16,6 +16,7 @@
 from temporalio import activity, exceptions, workflow
 from temporalio.common import RetryPolicy
 
+from posthog.batch_exports.models import BatchExportBackfill, BatchExportRun
 from posthog.batch_exports.service import (
     BatchExportField,
     create_batch_export_backfill,
@@ -496,7 +497,7 @@ class CreateBatchExportRunInputs:
     batch_export_id: str
     data_interval_start: str
     data_interval_end: str
-    status: str = "Starting"
+    status: str = BatchExportRun.Status.STARTING
 
 
 @activity.defn
@@ -546,10 +547,10 @@ async def update_export_run_status(inputs: UpdateBatchExportRunStatusInputs) ->
         latest_error=inputs.latest_error,
     )
 
-    if batch_export_run.status == "Failed":
+    if batch_export_run.status == BatchExportRun.Status.FAILED:
         logger.error("BatchExport failed with error: %s", batch_export_run.latest_error)
 
-    elif batch_export_run.status == "Cancelled":
+    elif batch_export_run.status == BatchExportRun.Status.CANCELLED:
         logger.warning("BatchExport was cancelled.")
 
     else:
@@ -612,10 +613,10 @@ async def update_batch_export_backfill_model_status(inputs: UpdateBatchExportBac
     )
     logger = await bind_temporal_worker_logger(team_id=backfill.team_id)
 
-    if backfill.status == "Failed":
+    if backfill.status == BatchExportBackfill.Status.FAILED:
         logger.error("Historical export failed")
 
-    elif backfill.status == "Cancelled":
+    elif backfill.status == BatchExportBackfill.Status.CANCELLED:
         logger.warning("Historical export was cancelled.")
 
     else:
@@ -661,6 +662,7 @@ async def execute_batch_export_insert_activity(
         maximum_attempts=maximum_attempts,
         non_retryable_error_types=non_retryable_error_types,
     )
+
     try:
         await workflow.execute_activity(
             activity,
@@ -672,15 +674,17 @@ async def execute_batch_export_insert_activity(
 
     except exceptions.ActivityError as e:
         if isinstance(e.cause, exceptions.CancelledError):
-            update_inputs.status = "Cancelled"
+            update_inputs.status = BatchExportRun.Status.CANCELLED
+        elif isinstance(e.cause, exceptions.ApplicationError) and e.cause.type not in non_retryable_error_types:
+            update_inputs.status = BatchExportRun.Status.FAILED_RETRYABLE
         else:
-            update_inputs.status = "Failed"
+            update_inputs.status = BatchExportRun.Status.FAILED
 
         update_inputs.latest_error = str(e.cause)
         raise
 
     except Exception:
-        update_inputs.status = "Failed"
+        update_inputs.status = BatchExportRun.Status.FAILED
         update_inputs.latest_error = "An unexpected error has ocurred"
         raise
 
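The behavioral core of the patch is the new `elif` in `execute_batch_export_insert_activity`: a failure is now recorded as `FailedRetryable` when the activity's `ApplicationError` carries a `type` that is *not* in the destination's `non_retryable_error_types` list, i.e. when the error was one Temporal would have kept retrying had the retry policy allowed it. Temporal's Python SDK wraps an exception raised inside an activity in an `ApplicationError` whose `type` is the original exception's class name, which is what the membership test keys on. Restated as a standalone function (a simplified sketch of the branch above, returning raw status values to stay self-contained, not the shipped code):

```python
from temporalio import exceptions


def classify_failure(error: exceptions.ActivityError, non_retryable_error_types: list[str]) -> str:
    """Mirror of the status decision added above."""
    if isinstance(error.cause, exceptions.CancelledError):
        return "Cancelled"
    if isinstance(error.cause, exceptions.ApplicationError) and error.cause.type not in non_retryable_error_types:
        # Transient failure: a retry of the same run could still succeed.
        return "FailedRetryable"
    # Either a known-permanent error type or something we cannot classify.
    return "Failed"
```

This splits what used to be a single `Failed` bucket into failures worth retrying and failures that are permanent by construction, without changing which exceptions propagate.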
diff --git a/posthog/temporal/batch_exports/bigquery_batch_export.py b/posthog/temporal/batch_exports/bigquery_batch_export.py
index 3dcbbda66bed5..202b08639844a 100644
--- a/posthog/temporal/batch_exports/bigquery_batch_export.py
+++ b/posthog/temporal/batch_exports/bigquery_batch_export.py
@@ -11,6 +11,7 @@
 from temporalio import activity, workflow
 from temporalio.common import RetryPolicy
 
+from posthog.batch_exports.models import BatchExportRun
 from posthog.batch_exports.service import BatchExportField, BatchExportSchema, BigQueryBatchExportInputs
 from posthog.temporal.batch_exports.base import PostHogWorkflow
 from posthog.temporal.batch_exports.batch_exports import (
@@ -393,7 +394,9 @@ async def run(self, inputs: BigQueryBatchExportInputs):
             ),
         )
 
-        update_inputs = UpdateBatchExportRunStatusInputs(id=run_id, status="Completed", team_id=inputs.team_id)
+        update_inputs = UpdateBatchExportRunStatusInputs(
+            id=run_id, status=BatchExportRun.Status.COMPLETED, team_id=inputs.team_id
+        )
 
         insert_inputs = BigQueryInsertInputs(
             team_id=inputs.team_id,
diff --git a/posthog/temporal/batch_exports/http_batch_export.py b/posthog/temporal/batch_exports/http_batch_export.py
index a9d6dd5d5f87f..f1fb72eb1c8e8 100644
--- a/posthog/temporal/batch_exports/http_batch_export.py
+++ b/posthog/temporal/batch_exports/http_batch_export.py
@@ -9,6 +9,7 @@
 from temporalio import activity, workflow
 from temporalio.common import RetryPolicy
 
+from posthog.models import BatchExportRun
 from posthog.batch_exports.service import BatchExportField, BatchExportSchema, HttpBatchExportInputs
 from posthog.temporal.batch_exports.base import PostHogWorkflow
 from posthog.temporal.batch_exports.batch_exports import (
@@ -338,15 +339,13 @@ async def run(self, inputs: HttpBatchExportInputs):
                 initial_interval=dt.timedelta(seconds=10),
                 maximum_interval=dt.timedelta(seconds=60),
                 maximum_attempts=0,
-                non_retryable_error_types=[
-                    "NonRetryableResponseError",
-                ],
+                non_retryable_error_types=["NotNullViolation", "IntegrityError"],
             ),
         )
 
         update_inputs = UpdateBatchExportRunStatusInputs(
             id=run_id,
-            status="Completed",
+            status=BatchExportRun.Status.COMPLETED,
             team_id=inputs.team_id,
         )
 
diff --git a/posthog/temporal/batch_exports/postgres_batch_export.py b/posthog/temporal/batch_exports/postgres_batch_export.py
index 806bc9e531a05..94ff5fc4d5af4 100644
--- a/posthog/temporal/batch_exports/postgres_batch_export.py
+++ b/posthog/temporal/batch_exports/postgres_batch_export.py
@@ -13,6 +13,7 @@
 from temporalio import activity, workflow
 from temporalio.common import RetryPolicy
 
+from posthog.batch_exports.models import BatchExportRun
 from posthog.batch_exports.service import BatchExportField, BatchExportSchema, PostgresBatchExportInputs
 from posthog.temporal.batch_exports.base import PostHogWorkflow
 from posthog.temporal.batch_exports.batch_exports import (
@@ -400,7 +401,7 @@ async def run(self, inputs: PostgresBatchExportInputs):
 
         update_inputs = UpdateBatchExportRunStatusInputs(
             id=run_id,
-            status="Completed",
+            status=BatchExportRun.Status.COMPLETED,
             team_id=inputs.team_id,
         )
 
@@ -427,11 +428,11 @@ async def run(self, inputs: PostgresBatchExportInputs):
             non_retryable_error_types=[
                 # Raised on errors that are related to database operation.
                 # For example: unexpected disconnect, database or other object not found.
-                "OperationalError"
+                "OperationalError",
                 # The schema name provided is invalid (usually because it doesn't exist).
-                "InvalidSchemaName"
+                "InvalidSchemaName",
                 # Missing permissions to, e.g., insert into table.
-                "InsufficientPrivilege"
+                "InsufficientPrivilege",
             ],
             update_inputs=update_inputs,
             # Disable heartbeat timeout until we add heartbeat support.
diff --git a/posthog/temporal/batch_exports/redshift_batch_export.py b/posthog/temporal/batch_exports/redshift_batch_export.py
index 7f5e44fffd08a..1b6af0fa82092 100644
--- a/posthog/temporal/batch_exports/redshift_batch_export.py
+++ b/posthog/temporal/batch_exports/redshift_batch_export.py
@@ -12,6 +12,7 @@
 from temporalio import activity, workflow
 from temporalio.common import RetryPolicy
 
+from posthog.batch_exports.models import BatchExportRun
 from posthog.batch_exports.service import BatchExportField, RedshiftBatchExportInputs
 from posthog.temporal.batch_exports.base import PostHogWorkflow
 from posthog.temporal.batch_exports.batch_exports import (
@@ -431,7 +432,7 @@ async def run(self, inputs: RedshiftBatchExportInputs):
 
         update_inputs = UpdateBatchExportRunStatusInputs(
             id=run_id,
-            status="Completed",
+            status=BatchExportRun.Status.COMPLETED,
             team_id=inputs.team_id,
         )
 
diff --git a/posthog/temporal/batch_exports/s3_batch_export.py b/posthog/temporal/batch_exports/s3_batch_export.py
index 6a6f4a8404995..095ebfd15cd92 100644
--- a/posthog/temporal/batch_exports/s3_batch_export.py
+++ b/posthog/temporal/batch_exports/s3_batch_export.py
@@ -12,6 +12,7 @@
 from temporalio import activity, workflow
 from temporalio.common import RetryPolicy
 
+from posthog.batch_exports.models import BatchExportRun
 from posthog.batch_exports.service import BatchExportField, BatchExportSchema, S3BatchExportInputs
 from posthog.temporal.batch_exports.base import PostHogWorkflow
 from posthog.temporal.batch_exports.batch_exports import (
@@ -543,7 +544,7 @@ async def run(self, inputs: S3BatchExportInputs):
 
         update_inputs = UpdateBatchExportRunStatusInputs(
             id=run_id,
-            status="Completed",
+            status=BatchExportRun.Status.COMPLETED,
             team_id=inputs.team_id,
         )
 
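All five destination workflows change the same way: the optimistic `UpdateBatchExportRunStatusInputs(status=Status.COMPLETED, ...)` built before the insert activity runs is downgraded by the shared helper on failure. The Postgres hunk also fixes a subtle pre-existing bug worth calling out: without the trailing commas, Python concatenates the adjacent string literals, so the old `non_retryable_error_types` list contained a single entry that could never match an exception type name:

```python
# The old list, as Python actually parsed it (adjacent literals concatenate):
before = [
    "OperationalError"
    "InvalidSchemaName"
    "InsufficientPrivilege"
]
assert before == ["OperationalErrorInvalidSchemaNameInsufficientPrivilege"]

# The fixed list from the diff:
after = ["OperationalError", "InvalidSchemaName", "InsufficientPrivilege"]
assert len(after) == 3
```

With the `FailedRetryable` classification now depending on membership in this list, the concatenation bug would have mislabeled permanent Postgres failures as retryable, so the comma fix is load-bearing rather than cosmetic.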
diff --git a/posthog/temporal/batch_exports/snowflake_batch_export.py b/posthog/temporal/batch_exports/snowflake_batch_export.py
index 425f71767eb76..417fa157084d6 100644
--- a/posthog/temporal/batch_exports/snowflake_batch_export.py
+++ b/posthog/temporal/batch_exports/snowflake_batch_export.py
@@ -14,6 +14,7 @@
 from temporalio import activity, workflow
 from temporalio.common import RetryPolicy
 
+from posthog.batch_exports.models import BatchExportRun
 from posthog.batch_exports.service import BatchExportField, BatchExportSchema, SnowflakeBatchExportInputs
 from posthog.temporal.batch_exports.base import PostHogWorkflow
 from posthog.temporal.batch_exports.batch_exports import (
@@ -594,7 +595,7 @@ async def run(self, inputs: SnowflakeBatchExportInputs):
 
         update_inputs = UpdateBatchExportRunStatusInputs(
             id=run_id,
-            status="Completed",
+            status=BatchExportRun.Status.COMPLETED,
             team_id=inputs.team_id,
         )
 
diff --git a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py
index e8b05e9a2bd71..8ad4dae13eaaf 100644
--- a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py
+++ b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py
@@ -513,10 +513,61 @@ async def insert_into_bigquery_activity_mocked(_: BigQueryInsertInputs) -> str:
     assert len(runs) == 1
 
     run = runs[0]
-    assert run.status == "Failed"
+    assert run.status == "FailedRetryable"
     assert run.latest_error == "ValueError: A useful error message"
 
 
+async def test_bigquery_export_workflow_handles_insert_activity_non_retryable_errors(
+    ateam, bigquery_batch_export, interval
+):
+    """Test that BigQuery Export Workflow can gracefully handle non-retryable errors when inserting BigQuery data."""
+    data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00")
+
+    workflow_id = str(uuid4())
+    inputs = BigQueryBatchExportInputs(
+        team_id=ateam.pk,
+        batch_export_id=str(bigquery_batch_export.id),
+        data_interval_end=data_interval_end.isoformat(),
+        interval=interval,
+        **bigquery_batch_export.destination.config,
+    )
+
+    @activity.defn(name="insert_into_bigquery_activity")
+    async def insert_into_bigquery_activity_mocked(_: BigQueryInsertInputs) -> str:
+        class RefreshError(Exception):
+            pass
+
+        raise RefreshError("A useful error message")
+
+    async with await WorkflowEnvironment.start_time_skipping() as activity_environment:
+        async with Worker(
+            activity_environment.client,
+            task_queue=settings.TEMPORAL_TASK_QUEUE,
+            workflows=[BigQueryBatchExportWorkflow],
+            activities=[
+                create_export_run,
+                insert_into_bigquery_activity_mocked,
+                update_export_run_status,
+            ],
+            workflow_runner=UnsandboxedWorkflowRunner(),
+        ):
+            with pytest.raises(WorkflowFailureError):
+                await activity_environment.client.execute_workflow(
+                    BigQueryBatchExportWorkflow.run,
+                    inputs,
+                    id=workflow_id,
+                    task_queue=settings.TEMPORAL_TASK_QUEUE,
+                    retry_policy=RetryPolicy(maximum_attempts=1),
+                )
+
+        runs = await afetch_batch_export_runs(batch_export_id=bigquery_batch_export.id)
+        assert len(runs) == 1
+
+        run = runs[0]
+        assert run.status == "Failed"
+        assert run.latest_error == "RefreshError: A useful error message"
+
+
 async def test_bigquery_export_workflow_handles_cancellation(ateam, bigquery_batch_export, interval):
     """Test that BigQuery Export Workflow can gracefully handle cancellations when inserting BigQuery data."""
     data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00")
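The new tests all follow the pattern above: redefine the insert activity under its registered name via `@activity.defn(name=...)` so the worker binds the mock, raise a locally defined exception whose class name either misses (retryable case) or matches (non-retryable case) the destination's `non_retryable_error_types`, then assert on the persisted run. The `latest_error` assertions work because the workflow stores `str(e.cause)` and the wrapped error renders as `"ClassName: message"` — a format inferred here from the assertions themselves rather than from SDK documentation:

```python
# Shape of the latest_error strings the tests assert on (assumed format):
class RefreshError(Exception):
    pass

err = RefreshError("A useful error message")
assert f"{type(err).__name__}: {err}" == "RefreshError: A useful error message"
```

Note the asymmetry in expectations: `ValueError` is not in any destination's non-retryable list, so those runs now end as `"FailedRetryable"`, while listed types such as `RefreshError` and `InsufficientPrivilege` still end as plain `"Failed"`.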
diff --git a/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py
index 95547342cd77d..90d80325b02a6 100644
--- a/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py
+++ b/posthog/temporal/tests/batch_exports/test_http_batch_export_workflow.py
@@ -418,12 +418,61 @@ async def insert_into_http_activity_mocked(_: HttpInsertInputs) -> str:
                     retry_policy=RetryPolicy(maximum_attempts=1),
                 )
 
-    runs = await afetch_batch_export_runs(batch_export_id=http_batch_export.id)
-    assert len(runs) == 1
+        runs = await afetch_batch_export_runs(batch_export_id=http_batch_export.id)
+        assert len(runs) == 1
+
+        run = runs[0]
+        assert run.status == "FailedRetryable"
+        assert run.latest_error == "ValueError: A useful error message"
+
+
+async def test_http_export_workflow_handles_insert_activity_non_retryable_errors(ateam, http_batch_export, interval):
+    """Test that HTTP Export Workflow can gracefully handle non-retryable errors when POSTing to HTTP Endpoint."""
+    data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00")
 
-    run = runs[0]
-    assert run.status == "Failed"
-    assert run.latest_error == "ValueError: A useful error message"
+    workflow_id = str(uuid4())
+    inputs = HttpBatchExportInputs(
+        team_id=ateam.pk,
+        batch_export_id=str(http_batch_export.id),
+        data_interval_end=data_interval_end.isoformat(),
+        interval=interval,
+        **http_batch_export.destination.config,
+    )
+
+    @activity.defn(name="insert_into_http_activity")
+    async def insert_into_http_activity_mocked(_: HttpInsertInputs) -> str:
+        class NonRetryableResponseError(Exception):
+            pass
+
+        raise NonRetryableResponseError("A useful error message")
+
+    async with await WorkflowEnvironment.start_time_skipping() as activity_environment:
+        async with Worker(
+            activity_environment.client,
+            task_queue=settings.TEMPORAL_TASK_QUEUE,
+            workflows=[HttpBatchExportWorkflow],
+            activities=[
+                create_export_run,
+                insert_into_http_activity_mocked,
+                update_export_run_status,
+            ],
+            workflow_runner=UnsandboxedWorkflowRunner(),
+        ):
+            with pytest.raises(WorkflowFailureError):
+                await activity_environment.client.execute_workflow(
+                    HttpBatchExportWorkflow.run,
+                    inputs,
+                    id=workflow_id,
+                    task_queue=settings.TEMPORAL_TASK_QUEUE,
+                    retry_policy=RetryPolicy(maximum_attempts=1),
+                )
+
+        runs = await afetch_batch_export_runs(batch_export_id=http_batch_export.id)
+        assert len(runs) == 1
+
+        run = runs[0]
+        assert run.status == "Failed"
+        assert run.latest_error == "NonRetryableResponseError: A useful error message"
 
 
 async def test_http_export_workflow_handles_cancellation(ateam, http_batch_export, interval):
diff --git a/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py
index f63751d593c33..28f52389b9f97 100644
--- a/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py
+++ b/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py
@@ -437,12 +437,63 @@ async def insert_into_postgres_activity_mocked(_: PostgresInsertInputs) -> str:
                     retry_policy=RetryPolicy(maximum_attempts=1),
                 )
 
-    runs = await afetch_batch_export_runs(batch_export_id=postgres_batch_export.id)
-    assert len(runs) == 1
+        runs = await afetch_batch_export_runs(batch_export_id=postgres_batch_export.id)
+        assert len(runs) == 1
+
+        run = runs[0]
+        assert run.status == "FailedRetryable"
+        assert run.latest_error == "ValueError: A useful error message"
+
+
+async def test_postgres_export_workflow_handles_insert_activity_non_retryable_errors(
+    ateam, postgres_batch_export, interval
+):
+    """Test that Postgres Export Workflow can gracefully handle non-retryable errors when inserting Postgres data."""
+    data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00")
+
+    workflow_id = str(uuid4())
+    inputs = PostgresBatchExportInputs(
+        team_id=ateam.pk,
+        batch_export_id=str(postgres_batch_export.id),
+        data_interval_end=data_interval_end.isoformat(),
+        interval=interval,
+        **postgres_batch_export.destination.config,
+    )
+
+    @activity.defn(name="insert_into_postgres_activity")
+    async def insert_into_postgres_activity_mocked(_: PostgresInsertInputs) -> str:
+        class InsufficientPrivilege(Exception):
+            pass
+
+        raise InsufficientPrivilege("A useful error message")
+
+    async with await WorkflowEnvironment.start_time_skipping() as activity_environment:
+        async with Worker(
+            activity_environment.client,
+            task_queue=settings.TEMPORAL_TASK_QUEUE,
+            workflows=[PostgresBatchExportWorkflow],
+            activities=[
+                create_export_run,
+                insert_into_postgres_activity_mocked,
+                update_export_run_status,
+            ],
+            workflow_runner=UnsandboxedWorkflowRunner(),
+        ):
+            with pytest.raises(WorkflowFailureError):
+                await activity_environment.client.execute_workflow(
+                    PostgresBatchExportWorkflow.run,
+                    inputs,
+                    id=workflow_id,
+                    task_queue=settings.TEMPORAL_TASK_QUEUE,
+                    retry_policy=RetryPolicy(maximum_attempts=1),
+                )
 
-    run = runs[0]
-    assert run.status == "Failed"
-    assert run.latest_error == "ValueError: A useful error message"
+        runs = await afetch_batch_export_runs(batch_export_id=postgres_batch_export.id)
+        assert len(runs) == 1
+
+        run = runs[0]
+        assert run.status == "Failed"
+        assert run.latest_error == "InsufficientPrivilege: A useful error message"
 
 
 async def test_postgres_export_workflow_handles_cancellation(ateam, postgres_batch_export, interval):
diff --git a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py
index e8392b5e4fb5b..92215352bf1fe 100644
--- a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py
+++ b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py
@@ -962,12 +962,64 @@ async def insert_into_s3_activity_mocked(_: S3InsertInputs) -> str:
                     retry_policy=RetryPolicy(maximum_attempts=1),
                 )
 
-    runs = await afetch_batch_export_runs(batch_export_id=s3_batch_export.id)
-    assert len(runs) == 1
+        runs = await afetch_batch_export_runs(batch_export_id=s3_batch_export.id)
+        assert len(runs) == 1
 
-    run = runs[0]
-    assert run.status == "Failed"
-    assert run.latest_error == "ValueError: A useful error message"
+        run = runs[0]
+        assert run.status == "FailedRetryable"
+        assert run.latest_error == "ValueError: A useful error message"
+
+
+async def test_s3_export_workflow_handles_insert_activity_non_retryable_errors(ateam, s3_batch_export, interval):
+    """Test S3BatchExport Workflow can handle non-retryable errors from executing the insert into S3 activity.
+
+    Currently, this only means we do the right updates to the BatchExportRun model.
+    """
+    data_interval_end = dt.datetime.fromisoformat("2023-04-25T14:30:00.000000+00:00")
+
+    workflow_id = str(uuid4())
+    inputs = S3BatchExportInputs(
+        team_id=ateam.pk,
+        batch_export_id=str(s3_batch_export.id),
+        data_interval_end=data_interval_end.isoformat(),
+        interval=interval,
+        **s3_batch_export.destination.config,
+    )
+
+    @activity.defn(name="insert_into_s3_activity")
+    async def insert_into_s3_activity_mocked(_: S3InsertInputs) -> str:
+        class ParamValidationError(Exception):
+            pass
+
+        raise ParamValidationError("A useful error message")
+
+    async with await WorkflowEnvironment.start_time_skipping() as activity_environment:
+        async with Worker(
+            activity_environment.client,
+            task_queue=settings.TEMPORAL_TASK_QUEUE,
+            workflows=[S3BatchExportWorkflow],
+            activities=[
+                create_export_run,
+                insert_into_s3_activity_mocked,
+                update_export_run_status,
+            ],
+            workflow_runner=UnsandboxedWorkflowRunner(),
+        ):
+            with pytest.raises(WorkflowFailureError):
+                await activity_environment.client.execute_workflow(
+                    S3BatchExportWorkflow.run,
+                    inputs,
+                    id=workflow_id,
+                    task_queue=settings.TEMPORAL_TASK_QUEUE,
+                    retry_policy=RetryPolicy(maximum_attempts=1),
+                )
+
+        runs = await afetch_batch_export_runs(batch_export_id=s3_batch_export.id)
+        assert len(runs) == 1
+
+        run = runs[0]
+        assert run.status == "Failed"
+        assert run.latest_error == "ParamValidationError: A useful error message"
 
 
 @pytest.mark.asyncio
diff --git a/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py
index 3ddec5bbd969e..8a0d6b32bb3a9 100644
--- a/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py
+++ b/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py
@@ -692,10 +692,56 @@ async def insert_into_snowflake_activity_mocked(_: SnowflakeInsertInputs) -> str
     assert len(runs) == 1
 
     run = runs[0]
-    assert run.status == "Failed"
+    assert run.status == "FailedRetryable"
     assert run.latest_error == "ValueError: A useful error message"
 
 
+async def test_snowflake_export_workflow_handles_insert_activity_non_retryable_errors(ateam, snowflake_batch_export):
+    """Test that Snowflake Export Workflow can gracefully handle non-retryable errors when inserting Snowflake data."""
+    workflow_id = str(uuid4())
+    inputs = SnowflakeBatchExportInputs(
+        team_id=ateam.pk,
+        batch_export_id=str(snowflake_batch_export.id),
+        data_interval_end="2023-04-25 14:30:00.000000",
+        **snowflake_batch_export.destination.config,
+    )
+
+    @activity.defn(name="insert_into_snowflake_activity")
+    async def insert_into_snowflake_activity_mocked(_: SnowflakeInsertInputs) -> str:
+        class ForbiddenError(Exception):
+            pass
+
+        raise ForbiddenError("A useful error message")
+
+    async with await WorkflowEnvironment.start_time_skipping() as activity_environment:
+        async with Worker(
+            activity_environment.client,
+            task_queue=settings.TEMPORAL_TASK_QUEUE,
+            workflows=[SnowflakeBatchExportWorkflow],
+            activities=[
+                create_export_run,
+                insert_into_snowflake_activity_mocked,
+                update_export_run_status,
+            ],
+            workflow_runner=UnsandboxedWorkflowRunner(),
+        ):
+            with pytest.raises(WorkflowFailureError):
+                await activity_environment.client.execute_workflow(
+                    SnowflakeBatchExportWorkflow.run,
+                    inputs,
+                    id=workflow_id,
+                    task_queue=settings.TEMPORAL_TASK_QUEUE,
+                    retry_policy=RetryPolicy(maximum_attempts=1),
+                )
+
+    runs = await afetch_batch_export_runs(batch_export_id=snowflake_batch_export.id)
+    assert len(runs) == 1
+
+    run = runs[0]
+    assert run.status == "Failed"
+    assert run.latest_error == "ForbiddenError: A useful error message"
+
+
 async def test_snowflake_export_workflow_handles_cancellation_mocked(ateam, snowflake_batch_export):
     """Test that Snowflake Export Workflow can gracefully handle cancellations when inserting Snowflake data.