From 8bf729febe2879990e94a60eb9acf6bbb397246f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Fri, 22 Sep 2023 15:35:36 +0200 Subject: [PATCH] feat: Record S3 BatchExport errors (#17535) --- .../temporal/tests/batch_exports/fixtures.py | 16 +- .../test_bigquery_batch_export_workflow.py | 153 +++++++++++++++++- .../test_postgres_batch_export_workflow.py | 139 ++++++++++++++++ .../test_s3_batch_export_workflow.py | 150 +++++++++++++++++ .../test_snowflake_batch_export_workflow.py | 149 +++++++++++++++++ .../workflows/bigquery_batch_export.py | 19 ++- .../workflows/postgres_batch_export.py | 19 ++- posthog/temporal/workflows/s3_batch_export.py | 17 +- .../workflows/snowflake_batch_export.py | 19 ++- 9 files changed, 660 insertions(+), 21 deletions(-) diff --git a/posthog/temporal/tests/batch_exports/fixtures.py b/posthog/temporal/tests/batch_exports/fixtures.py index 65de3fd4910c3..d54db02304cc5 100644 --- a/posthog/temporal/tests/batch_exports/fixtures.py +++ b/posthog/temporal/tests/batch_exports/fixtures.py @@ -1,7 +1,13 @@ from uuid import UUID + from asgiref.sync import sync_to_async +from temporalio.client import Client -from posthog.batch_exports.models import BatchExport, BatchExportDestination, BatchExportRun +from posthog.batch_exports.models import ( + BatchExport, + BatchExportDestination, + BatchExportRun, +) from posthog.batch_exports.service import sync_batch_export @@ -32,3 +38,11 @@ def fetch_batch_export_runs(batch_export_id: UUID, limit: int = 100) -> list[Bat async def afetch_batch_export_runs(batch_export_id: UUID, limit: int = 100) -> list[BatchExportRun]: """Fetch the BatchExportRuns for a given BatchExport.""" return await sync_to_async(fetch_batch_export_runs)(batch_export_id, limit) # type: ignore + + +async def adelete_batch_export(batch_export: BatchExport, temporal: Client) -> None: + """Delete a BatchExport and its underlying Schedule.""" + handle = temporal.get_schedule_handle(str(batch_export.id)) + await handle.delete() + + await sync_to_async(batch_export.delete)() # type: ignore diff --git a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py index b0d45d55f4b45..ad6e511577f85 100644 --- a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py @@ -1,19 +1,25 @@ +import asyncio import datetime as dt import json +import os from random import randint from uuid import uuid4 -import os import pytest +import pytest_asyncio +from asgiref.sync import sync_to_async from django.conf import settings from freezegun.api import freeze_time from google.cloud import bigquery +from temporalio import activity +from temporalio.client import WorkflowFailureError from temporalio.common import RetryPolicy from temporalio.testing import WorkflowEnvironment from temporalio.worker import UnsandboxedWorkflowRunner, Worker from posthog.api.test.test_organization import acreate_organization from posthog.api.test.test_team import acreate_team +from posthog.temporal.client import connect from posthog.temporal.tests.batch_exports.base import ( EventValues, amaterialize, @@ -21,16 +27,17 @@ ) from posthog.temporal.tests.batch_exports.fixtures import ( acreate_batch_export, + adelete_batch_export, afetch_batch_export_runs, ) from posthog.temporal.workflows.base import create_export_run, update_export_run_status -from posthog.temporal.workflows.clickhouse 
import ClickHouseClient from posthog.temporal.workflows.bigquery_batch_export import ( BigQueryBatchExportInputs, BigQueryBatchExportWorkflow, BigQueryInsertInputs, insert_into_bigquery_activity, ) +from posthog.temporal.workflows.clickhouse import ClickHouseClient TEST_TIME = dt.datetime.utcnow() @@ -406,3 +413,145 @@ async def test_bigquery_export_workflow( events=events, bq_ingested_timestamp=ingested_timestamp, ) + + +@pytest_asyncio.fixture +async def organization(): + organization = await acreate_organization("test") + yield organization + await sync_to_async(organization.delete)() # type: ignore + + +@pytest_asyncio.fixture +async def team(organization): + team = await acreate_team(organization=organization) + yield team + await sync_to_async(team.delete)() # type: ignore + + +@pytest_asyncio.fixture +async def batch_export(team): + destination_data = { + "type": "BigQuery", + "config": { + "table_id": f"test_workflow_table_{team.pk}", + "project_id": "project_id", + "private_key": "private_key", + "private_key_id": "private_key_id", + "token_uri": "token_uri", + "client_email": "client_email", + "dataset_id": "BatchExports", + }, + } + batch_export_data = { + "name": "my-production-bigquery-export", + "destination": destination_data, + "interval": "hour", + } + + batch_export = await acreate_batch_export( + team_id=team.pk, + name=batch_export_data["name"], + destination_data=batch_export_data["destination"], + interval=batch_export_data["interval"], + ) + + yield batch_export + + client = await connect( + settings.TEMPORAL_HOST, + settings.TEMPORAL_PORT, + settings.TEMPORAL_NAMESPACE, + settings.TEMPORAL_CLIENT_ROOT_CA, + settings.TEMPORAL_CLIENT_CERT, + settings.TEMPORAL_CLIENT_KEY, + ) + await adelete_batch_export(batch_export, client) + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_bigquery_export_workflow_handles_insert_activity_errors(team, batch_export): + """Test that BigQuery Export Workflow can gracefully handle errors when inserting BigQuery data.""" + workflow_id = str(uuid4()) + inputs = BigQueryBatchExportInputs( + team_id=team.pk, + batch_export_id=str(batch_export.id), + data_interval_end="2023-04-25 14:30:00.000000", + **batch_export.destination.config, + ) + + @activity.defn(name="insert_into_bigquery_activity") + async def insert_into_bigquery_activity_mocked(_: BigQueryInsertInputs) -> str: + raise ValueError("A useful error message") + + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[BigQueryBatchExportWorkflow], + activities=[create_export_run, insert_into_bigquery_activity_mocked, update_export_run_status], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + with pytest.raises(WorkflowFailureError): + await activity_environment.client.execute_workflow( + BigQueryBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + ) + + runs = await afetch_batch_export_runs(batch_export_id=batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Failed" + assert run.latest_error == "ValueError: A useful error message" + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_bigquery_export_workflow_handles_cancellation(team, batch_export): + """Test that BigQuery Export Workflow can gracefully handle cancellations when inserting BigQuery data.""" + workflow_id = str(uuid4()) + inputs = 
BigQueryBatchExportInputs( + team_id=team.pk, + batch_export_id=str(batch_export.id), + data_interval_end="2023-04-25 14:30:00.000000", + **batch_export.destination.config, + ) + + @activity.defn(name="insert_into_s3_activity") + async def never_finish_activity(_: BigQueryInsertInputs) -> str: + while True: + activity.heartbeat() + await asyncio.sleep(1) + + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[BigQueryBatchExportWorkflow], + activities=[create_export_run, never_finish_activity, update_export_run_status], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + handle = await activity_environment.client.start_workflow( + BigQueryBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + ) + await asyncio.sleep(5) + await handle.cancel() + + with pytest.raises(WorkflowFailureError): + await handle.result() + + runs = await afetch_batch_export_runs(batch_export_id=batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Cancelled" + assert run.latest_error == "Cancelled" diff --git a/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py index 499acbd29502d..831a7e9308ba1 100644 --- a/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py @@ -1,3 +1,4 @@ +import asyncio import datetime as dt import json from random import randint @@ -5,15 +6,20 @@ import psycopg2 import pytest +import pytest_asyncio +from asgiref.sync import sync_to_async from django.conf import settings from django.test import override_settings from psycopg2 import sql +from temporalio import activity +from temporalio.client import WorkflowFailureError from temporalio.common import RetryPolicy from temporalio.testing import WorkflowEnvironment from temporalio.worker import UnsandboxedWorkflowRunner, Worker from posthog.api.test.test_organization import acreate_organization from posthog.api.test.test_team import acreate_team +from posthog.temporal.client import connect from posthog.temporal.tests.batch_exports.base import ( EventValues, amaterialize, @@ -21,6 +27,7 @@ ) from posthog.temporal.tests.batch_exports.fixtures import ( acreate_batch_export, + adelete_batch_export, afetch_batch_export_runs, ) from posthog.temporal.workflows.base import create_export_run, update_export_run_status @@ -439,3 +446,135 @@ async def test_postgres_export_workflow( assert run.status == "Completed" assert_events_in_postgres(postgres_connection, postgres_config["schema"], table_name, events) + + +@pytest_asyncio.fixture +async def organization(): + organization = await acreate_organization("test") + yield organization + await sync_to_async(organization.delete)() # type: ignore + + +@pytest_asyncio.fixture +async def team(organization): + team = await acreate_team(organization=organization) + yield team + await sync_to_async(team.delete)() # type: ignore + + +@pytest_asyncio.fixture +async def batch_export(team, postgres_config): + table_name = "test_workflow_table" + destination_data = {"type": "Postgres", "config": {**postgres_config, "table_name": table_name}} + batch_export_data = { + "name": "my-production-postgres-export", + "destination": destination_data, + "interval": "hour", 
+ } + + batch_export = await acreate_batch_export( + team_id=team.pk, + name=batch_export_data["name"], + destination_data=batch_export_data["destination"], + interval=batch_export_data["interval"], + ) + + yield batch_export + + client = await connect( + settings.TEMPORAL_HOST, + settings.TEMPORAL_PORT, + settings.TEMPORAL_NAMESPACE, + settings.TEMPORAL_CLIENT_ROOT_CA, + settings.TEMPORAL_CLIENT_CERT, + settings.TEMPORAL_CLIENT_KEY, + ) + await adelete_batch_export(batch_export, client) + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_postgres_export_workflow_handles_insert_activity_errors(team, batch_export): + """Test that Postgres Export Workflow can gracefully handle errors when inserting Postgres data.""" + workflow_id = str(uuid4()) + inputs = PostgresBatchExportInputs( + team_id=team.pk, + batch_export_id=str(batch_export.id), + data_interval_end="2023-04-25 14:30:00.000000", + **batch_export.destination.config, + ) + + @activity.defn(name="insert_into_postgres_activity") + async def insert_into_postgres_activity_mocked(_: PostgresInsertInputs) -> str: + raise ValueError("A useful error message") + + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[PostgresBatchExportWorkflow], + activities=[create_export_run, insert_into_postgres_activity_mocked, update_export_run_status], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + with pytest.raises(WorkflowFailureError): + await activity_environment.client.execute_workflow( + PostgresBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + ) + + runs = await afetch_batch_export_runs(batch_export_id=batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Failed" + assert run.latest_error == "ValueError: A useful error message" + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_postgres_export_workflow_handles_cancellation(team, batch_export): + """Test that Postgres Export Workflow can gracefully handle cancellations when inserting Postgres data.""" + workflow_id = str(uuid4()) + inputs = PostgresBatchExportInputs( + team_id=team.pk, + batch_export_id=str(batch_export.id), + data_interval_end="2023-04-25 14:30:00.000000", + **batch_export.destination.config, + ) + + @activity.defn(name="insert_into_postgres_activity") + async def never_finish_activity(_: PostgresInsertInputs) -> str: + while True: + activity.heartbeat() + await asyncio.sleep(1) + + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[PostgresBatchExportWorkflow], + activities=[create_export_run, never_finish_activity, update_export_run_status], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + handle = await activity_environment.client.start_workflow( + PostgresBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + ) + await asyncio.sleep(5) + await handle.cancel() + + with pytest.raises(WorkflowFailureError): + await handle.result() + + runs = await afetch_batch_export_runs(batch_export_id=batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Cancelled" + assert run.latest_error == "Cancelled" diff --git 
a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py index 08f0d285a944c..2511580358e72 100644 --- a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py @@ -1,3 +1,4 @@ +import asyncio import datetime as dt import functools import gzip @@ -12,15 +13,20 @@ import botocore.exceptions import brotli import pytest +import pytest_asyncio +from asgiref.sync import sync_to_async from django.conf import settings from django.test import Client as HttpClient from django.test import override_settings +from temporalio import activity +from temporalio.client import WorkflowFailureError from temporalio.common import RetryPolicy from temporalio.testing import WorkflowEnvironment from temporalio.worker import UnsandboxedWorkflowRunner, Worker from posthog.api.test.test_organization import acreate_organization from posthog.api.test.test_team import acreate_team +from posthog.temporal.client import connect from posthog.temporal.tests.batch_exports.base import ( EventValues, amaterialize, @@ -29,6 +35,7 @@ ) from posthog.temporal.tests.batch_exports.fixtures import ( acreate_batch_export, + adelete_batch_export, afetch_batch_export_runs, ) from posthog.temporal.workflows.base import create_export_run, update_export_run_status @@ -1072,6 +1079,149 @@ async def test_s3_export_workflow_with_minio_bucket_produces_no_duplicates( assert_events_in_s3(s3_client, bucket_name, prefix, events, compression) +@pytest_asyncio.fixture +async def organization(): + organization = await acreate_organization("test") + yield organization + await sync_to_async(organization.delete)() # type: ignore + + +@pytest_asyncio.fixture +async def team(organization): + team = await acreate_team(organization=organization) + yield team + await sync_to_async(team.delete)() # type: ignore + + +@pytest_asyncio.fixture +async def batch_export(team): + prefix = f"posthog-events-{str(uuid4())}" + destination_data = { + "type": "S3", + "config": { + "bucket_name": "test-bucket", + "region": "us-east-1", + "prefix": prefix, + "aws_access_key_id": "object_storage_root_user", + "aws_secret_access_key": "object_storage_root_password", + "compression": "gzip", + }, + } + + batch_export_data = { + "name": "my-production-s3-bucket-destination", + "destination": destination_data, + "interval": "hour", + } + + batch_export = await acreate_batch_export( + team_id=team.pk, + name=batch_export_data["name"], + destination_data=batch_export_data["destination"], + interval=batch_export_data["interval"], + ) + + yield batch_export + + client = await connect( + settings.TEMPORAL_HOST, + settings.TEMPORAL_PORT, + settings.TEMPORAL_NAMESPACE, + settings.TEMPORAL_CLIENT_ROOT_CA, + settings.TEMPORAL_CLIENT_CERT, + settings.TEMPORAL_CLIENT_KEY, + ) + await adelete_batch_export(batch_export, client) + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_s3_export_workflow_handles_insert_activity_errors(team, batch_export): + """Test that S3 Export Workflow can gracefully handle errors when inserting S3 data.""" + workflow_id = str(uuid4()) + inputs = S3BatchExportInputs( + team_id=team.pk, + batch_export_id=str(batch_export.id), + data_interval_end="2023-04-25 14:30:00.000000", + **batch_export.destination.config, + ) + + @activity.defn(name="insert_into_s3_activity") + async def insert_into_s3_activity_mocked(_: S3InsertInputs) -> str: + raise ValueError("A useful error 
message") + + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[S3BatchExportWorkflow], + activities=[create_export_run, insert_into_s3_activity_mocked, update_export_run_status], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + with pytest.raises(WorkflowFailureError): + await activity_environment.client.execute_workflow( + S3BatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + ) + + runs = await afetch_batch_export_runs(batch_export_id=batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Failed" + assert run.latest_error == "ValueError: A useful error message" + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_s3_export_workflow_handles_cancellation(team, batch_export): + """Test that S3 Export Workflow can gracefully handle cancellations when inserting S3 data.""" + workflow_id = str(uuid4()) + inputs = S3BatchExportInputs( + team_id=team.pk, + batch_export_id=str(batch_export.id), + data_interval_end="2023-04-25 14:30:00.000000", + **batch_export.destination.config, + ) + + @activity.defn(name="insert_into_s3_activity") + async def never_finish_activity(_: S3InsertInputs) -> str: + while True: + activity.heartbeat() + await asyncio.sleep(1) + + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[S3BatchExportWorkflow], + activities=[create_export_run, never_finish_activity, update_export_run_status], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + handle = await activity_environment.client.start_workflow( + S3BatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + ) + await asyncio.sleep(5) + await handle.cancel() + + with pytest.raises(WorkflowFailureError): + await handle.result() + + runs = await afetch_batch_export_runs(batch_export_id=batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Cancelled" + assert run.latest_error == "Cancelled" + + # We don't care about these for the next test, just need something to be defined. 
base_inputs = { "bucket_name": "test", diff --git a/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py index 979929d1ce205..af82608baa8a9 100644 --- a/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py @@ -1,3 +1,4 @@ +import asyncio import datetime as dt import gzip import json @@ -6,10 +7,13 @@ from uuid import uuid4 import pytest +import pytest_asyncio import responses +from asgiref.sync import sync_to_async from django.conf import settings from django.test import override_settings from requests.models import PreparedRequest +from temporalio import activity from temporalio.client import WorkflowFailureError from temporalio.common import RetryPolicy from temporalio.exceptions import ActivityError, ApplicationError @@ -18,6 +22,7 @@ from posthog.api.test.test_organization import acreate_organization from posthog.api.test.test_team import acreate_team +from posthog.temporal.client import connect from posthog.temporal.tests.batch_exports.base import ( EventValues, insert_events, @@ -25,6 +30,7 @@ ) from posthog.temporal.tests.batch_exports.fixtures import ( acreate_batch_export, + adelete_batch_export, afetch_batch_export_runs, ) from posthog.temporal.workflows.base import create_export_run, update_export_run_status @@ -32,6 +38,7 @@ from posthog.temporal.workflows.snowflake_batch_export import ( SnowflakeBatchExportInputs, SnowflakeBatchExportWorkflow, + SnowflakeInsertInputs, insert_into_snowflake_activity, ) @@ -645,3 +652,145 @@ async def test_snowflake_export_workflow_raises_error_on_copy_fail(): assert isinstance(err.__cause__, ActivityError) assert isinstance(err.__cause__.__cause__, ApplicationError) assert err.__cause__.__cause__.type == "SnowflakeFileNotLoadedError" + + +@pytest_asyncio.fixture +async def organization(): + organization = await acreate_organization("test") + yield organization + await sync_to_async(organization.delete)() # type: ignore + + +@pytest_asyncio.fixture +async def team(organization): + team = await acreate_team(organization=organization) + yield team + await sync_to_async(team.delete)() # type: ignore + + +@pytest_asyncio.fixture +async def batch_export(team): + destination_data = { + "type": "Snowflake", + "config": { + "user": "hazzadous", + "password": "password", + "account": "account", + "database": "PostHog", + "schema": "test", + "warehouse": "COMPUTE_WH", + "table_name": "events", + }, + } + batch_export_data = { + "name": "my-production-snowflake-export", + "destination": destination_data, + "interval": "hour", + } + + batch_export = await acreate_batch_export( + team_id=team.pk, + name=batch_export_data["name"], + destination_data=batch_export_data["destination"], + interval=batch_export_data["interval"], + ) + + yield batch_export + + client = await connect( + settings.TEMPORAL_HOST, + settings.TEMPORAL_PORT, + settings.TEMPORAL_NAMESPACE, + settings.TEMPORAL_CLIENT_ROOT_CA, + settings.TEMPORAL_CLIENT_CERT, + settings.TEMPORAL_CLIENT_KEY, + ) + await adelete_batch_export(batch_export, client) + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_snowflake_export_workflow_handles_insert_activity_errors(team, batch_export): + """Test that Snowflake Export Workflow can gracefully handle errors when inserting Snowflake data.""" + workflow_id = str(uuid4()) + inputs = SnowflakeBatchExportInputs( + team_id=team.pk, + 
batch_export_id=str(batch_export.id), + data_interval_end="2023-04-25 14:30:00.000000", + **batch_export.destination.config, + ) + + @activity.defn(name="insert_into_snowflake_activity") + async def insert_into_snowflake_activity_mocked(_: SnowflakeInsertInputs) -> str: + raise ValueError("A useful error message") + + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[SnowflakeBatchExportWorkflow], + activities=[create_export_run, insert_into_snowflake_activity_mocked, update_export_run_status], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + with pytest.raises(WorkflowFailureError): + await activity_environment.client.execute_workflow( + SnowflakeBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + ) + + runs = await afetch_batch_export_runs(batch_export_id=batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Failed" + assert run.latest_error == "ValueError: A useful error message" + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_snowflake_export_workflow_handles_cancellation(team, batch_export): + """Test that Snowflake Export Workflow can gracefully handle cancellations when inserting Snowflake data.""" + workflow_id = str(uuid4()) + inputs = SnowflakeBatchExportInputs( + team_id=team.pk, + batch_export_id=str(batch_export.id), + data_interval_end="2023-04-25 14:30:00.000000", + **batch_export.destination.config, + ) + + @activity.defn(name="insert_into_snowflake_activity") + async def never_finish_activity(_: SnowflakeInsertInputs) -> str: + while True: + activity.heartbeat() + await asyncio.sleep(1) + + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[SnowflakeBatchExportWorkflow], + activities=[create_export_run, never_finish_activity, update_export_run_status], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + handle = await activity_environment.client.start_workflow( + SnowflakeBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + ) + await asyncio.sleep(5) + await handle.cancel() + + with pytest.raises(WorkflowFailureError): + await handle.result() + + runs = await afetch_batch_export_runs(batch_export_id=batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Cancelled" + assert run.latest_error == "Cancelled" diff --git a/posthog/temporal/workflows/bigquery_batch_export.py b/posthog/temporal/workflows/bigquery_batch_export.py index f1f247a0672fc..4be09632ff12f 100644 --- a/posthog/temporal/workflows/bigquery_batch_export.py +++ b/posthog/temporal/workflows/bigquery_batch_export.py @@ -6,7 +6,7 @@ from django.conf import settings from google.cloud import bigquery from google.oauth2 import service_account -from temporalio import activity, workflow +from temporalio import activity, exceptions, workflow from temporalio.common import RetryPolicy from posthog.batch_exports.service import BigQueryBatchExportInputs @@ -253,12 +253,21 @@ async def run(self, inputs: BigQueryBatchExportInputs): ), ) + except exceptions.ActivityError as e: + if isinstance(e.cause, exceptions.CancelledError): + workflow.logger.exception("BigQuery BatchExport was 
cancelled.") + update_inputs.status = "Cancelled" + else: + workflow.logger.exception("BigQuery BatchExport failed.", exc_info=e) + update_inputs.status = "Failed" + + update_inputs.latest_error = str(e.cause) + raise + except Exception as e: - workflow.logger.exception("Bigquery BatchExport failed.", exc_info=e) + workflow.logger.exception("BigQuery BatchExport failed with an unexpected exception.", exc_info=e) update_inputs.status = "Failed" - # Note: This shallows the exception type, but the message should be enough. - # If not, swap to repr(e) - update_inputs.latest_error = str(e) + update_inputs.latest_error = "An unexpected error has ocurred" raise finally: diff --git a/posthog/temporal/workflows/postgres_batch_export.py b/posthog/temporal/workflows/postgres_batch_export.py index b81c7496b3adb..b077ac892d698 100644 --- a/posthog/temporal/workflows/postgres_batch_export.py +++ b/posthog/temporal/workflows/postgres_batch_export.py @@ -6,7 +6,7 @@ import psycopg2 from django.conf import settings from psycopg2 import sql -from temporalio import activity, workflow +from temporalio import activity, exceptions, workflow from temporalio.common import RetryPolicy from posthog.batch_exports.service import PostgresBatchExportInputs @@ -254,12 +254,21 @@ async def run(self, inputs: PostgresBatchExportInputs): ), ) + except exceptions.ActivityError as e: + if isinstance(e.cause, exceptions.CancelledError): + workflow.logger.exception("Postgres BatchExport was cancelled.") + update_inputs.status = "Cancelled" + else: + workflow.logger.exception("Postgres BatchExport failed.", exc_info=e) + update_inputs.status = "Failed" + + update_inputs.latest_error = str(e.cause) + raise + except Exception as e: - workflow.logger.exception("Postgres BatchExport failed.", exc_info=e) + workflow.logger.exception("Postgres BatchExport failed with an unexpected exception.", exc_info=e) update_inputs.status = "Failed" - # Note: This shallows the exception type, but the message should be enough. 
-            # If not, swap to repr(e)
-            update_inputs.latest_error = str(e)
+            update_inputs.latest_error = "An unexpected error has occurred"
             raise

         finally:
diff --git a/posthog/temporal/workflows/s3_batch_export.py b/posthog/temporal/workflows/s3_batch_export.py
index 13bbf183e5d06..a4987b6024338 100644
--- a/posthog/temporal/workflows/s3_batch_export.py
+++ b/posthog/temporal/workflows/s3_batch_export.py
@@ -7,7 +7,7 @@

 import boto3
 from django.conf import settings
-from temporalio import activity, workflow
+from temporalio import activity, exceptions, workflow
 from temporalio.common import RetryPolicy

 from posthog.batch_exports.service import S3BatchExportInputs
@@ -481,10 +481,21 @@ async def run(self, inputs: S3BatchExportInputs):
                 ),
             )

+        except exceptions.ActivityError as e:
+            if isinstance(e.cause, exceptions.CancelledError):
+                workflow.logger.exception("S3 BatchExport was cancelled.")
+                update_inputs.status = "Cancelled"
+            else:
+                workflow.logger.exception("S3 BatchExport failed.", exc_info=e)
+                update_inputs.status = "Failed"
+
+            update_inputs.latest_error = str(e.cause)
+            raise
+
         except Exception as e:
-            workflow.logger.exception("S3 BatchExport failed.", exc_info=e)
+            workflow.logger.exception("S3 BatchExport failed with an unexpected exception.", exc_info=e)
             update_inputs.status = "Failed"
-            update_inputs.latest_error = str(e)
+            update_inputs.latest_error = "An unexpected error has occurred"
             raise

         finally:
diff --git a/posthog/temporal/workflows/snowflake_batch_export.py b/posthog/temporal/workflows/snowflake_batch_export.py
index 558e4bda7df75..a38f15d7aab73 100644
--- a/posthog/temporal/workflows/snowflake_batch_export.py
+++ b/posthog/temporal/workflows/snowflake_batch_export.py
@@ -6,7 +6,7 @@
 import snowflake.connector
 from django.conf import settings
 from snowflake.connector.cursor import SnowflakeCursor
-from temporalio import activity, workflow
+from temporalio import activity, exceptions, workflow
 from temporalio.common import RetryPolicy

 from posthog.batch_exports.service import SnowflakeBatchExportInputs
@@ -342,12 +342,21 @@ async def run(self, inputs: SnowflakeBatchExportInputs):
                 ),
             )

+        except exceptions.ActivityError as e:
+            if isinstance(e.cause, exceptions.CancelledError):
+                workflow.logger.exception("Snowflake BatchExport was cancelled.")
+                update_inputs.status = "Cancelled"
+            else:
+                workflow.logger.exception("Snowflake BatchExport failed.", exc_info=e)
+                update_inputs.status = "Failed"
+
+            update_inputs.latest_error = str(e.cause)
+            raise
+
         except Exception as e:
-            workflow.logger.exception("Snowflake BatchExport failed.", exc_info=e)
+            workflow.logger.exception("Snowflake BatchExport failed with an unexpected exception.", exc_info=e)
             update_inputs.status = "Failed"
-            # Note: This shallows the exception type, but the message should be enough.
-            # If not, swap to repr(e)
-            update_inputs.latest_error = str(e)
+            update_inputs.latest_error = "An unexpected error has occurred"
             raise

         finally:
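
A minimal standalone sketch (not part of the patch) of the error-classification pattern the four workflows above now share. The helper name and the timeout value are illustrative assumptions; the temporalio pieces it uses (exceptions.ActivityError, exceptions.CancelledError, and the .cause attribute) are the ones the patch itself relies on.

import datetime as dt

from temporalio import exceptions, workflow


async def run_insert_activity_recording_errors(insert_activity, insert_inputs, update_inputs) -> None:
    """Hypothetical helper: execute an insert activity and record why the export run ended.

    In the patch this logic is inlined in each workflow's run() method rather than factored out.
    """
    try:
        await workflow.execute_activity(
            insert_activity,
            insert_inputs,
            start_to_close_timeout=dt.timedelta(minutes=20),  # assumed value for the sketch
        )
    except exceptions.ActivityError as e:
        # A cancelled activity reaches the workflow as an ActivityError whose cause is a
        # CancelledError; anything else raised by the activity is a genuine failure.
        if isinstance(e.cause, exceptions.CancelledError):
            update_inputs.status = "Cancelled"
        else:
            update_inputs.status = "Failed"
        # str(e.cause) carries the underlying message, which the tests above expect to land
        # in BatchExportRun.latest_error (e.g. "ValueError: A useful error message").
        update_inputs.latest_error = str(e.cause)
        raise
    except Exception:
        # Errors outside the activity are unexpected; record a generic message instead of
        # leaking internal details into the user-facing run status.
        update_inputs.status = "Failed"
        update_inputs.latest_error = "An unexpected error has occurred"
        raise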