From 1057c4e6edbcc7d470bcdf2518bf331d0f15d0d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Tue, 19 Sep 2023 16:23:42 +0200 Subject: [PATCH 1/7] feat: Record S3 BatchExport errors --- .../test_s3_batch_export_workflow.py | 150 ++++++++++++++++++ posthog/temporal/workflows/s3_batch_export.py | 17 +- 2 files changed, 164 insertions(+), 3 deletions(-) diff --git a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py index 08f0d285a944c..03b1025794ca5 100644 --- a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py @@ -1,3 +1,4 @@ +import asyncio import datetime as dt import functools import gzip @@ -15,6 +16,8 @@ from django.conf import settings from django.test import Client as HttpClient from django.test import override_settings +from temporalio import activity +from temporalio.client import WorkflowFailureError from temporalio.common import RetryPolicy from temporalio.testing import WorkflowEnvironment from temporalio.worker import UnsandboxedWorkflowRunner, Worker @@ -1072,6 +1075,153 @@ async def test_s3_export_workflow_with_minio_bucket_produces_no_duplicates( assert_events_in_s3(s3_client, bucket_name, prefix, events, compression) +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_s3_export_workflow_handles_insert_activity_errors(): + """Test that S3 Export Workflow can gracefully handle errors when inserting S3 data.""" + prefix = f"posthog-events-{str(uuid4())}" + destination_data = { + "type": "S3", + "config": { + "bucket_name": "test-bucket", + "region": "us-east-1", + "prefix": prefix, + "aws_access_key_id": "object_storage_root_user", + "aws_secret_access_key": "object_storage_root_password", + "compression": "gzip", + }, + } + + batch_export_data = { + "name": "my-production-s3-bucket-destination", + "destination": 
destination_data, + "interval": "hour", + } + + organization = await acreate_organization("test") + team = await acreate_team(organization=organization) + batch_export = await acreate_batch_export( + team_id=team.pk, + name=batch_export_data["name"], + destination_data=batch_export_data["destination"], + interval=batch_export_data["interval"], + ) + + workflow_id = str(uuid4()) + inputs = S3BatchExportInputs( + team_id=team.pk, + batch_export_id=str(batch_export.id), + data_interval_end="2023-04-25 14:30:00.000000", + **batch_export.destination.config, + ) + + @activity.defn(name="insert_into_s3_activity") + async def insert_into_s3_activity_mocked(_: S3InsertInputs) -> str: + raise ValueError("A useful error message") + + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[S3BatchExportWorkflow], + activities=[create_export_run, insert_into_s3_activity_mocked, update_export_run_status], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + with pytest.raises(WorkflowFailureError): + await activity_environment.client.execute_workflow( + S3BatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + ) + + runs = await afetch_batch_export_runs(batch_export_id=batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Failed" + assert run.latest_error == "ValueError: A useful error message" + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_s3_export_workflow_handles_cancellation(): + """Test that S3 Export Workflow end-to-end by using a local MinIO bucket instead of S3. + + In this particular instance of the test, we assert no duplicates are exported to S3. 
+ """ + prefix = f"posthog-events-{str(uuid4())}" + destination_data = { + "type": "S3", + "config": { + "bucket_name": "test-bucket", + "region": "us-east-1", + "prefix": prefix, + "aws_access_key_id": "object_storage_root_user", + "aws_secret_access_key": "object_storage_root_password", + "compression": "gzip", + }, + } + + batch_export_data = { + "name": "my-production-s3-bucket-destination", + "destination": destination_data, + "interval": "hour", + } + + organization = await acreate_organization("test") + team = await acreate_team(organization=organization) + batch_export = await acreate_batch_export( + team_id=team.pk, + name=batch_export_data["name"], + destination_data=batch_export_data["destination"], + interval=batch_export_data["interval"], + ) + + workflow_id = str(uuid4()) + inputs = S3BatchExportInputs( + team_id=team.pk, + batch_export_id=str(batch_export.id), + data_interval_end="2023-04-25 14:30:00.000000", + **batch_export.destination.config, + ) + + @activity.defn(name="insert_into_s3_activity") + async def never_finish_activity(_: S3InsertInputs) -> str: + while True: + activity.heartbeat() + await asyncio.sleep(1) + + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[S3BatchExportWorkflow], + activities=[create_export_run, never_finish_activity, update_export_run_status], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + handle = await activity_environment.client.start_workflow( + S3BatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + ) + await asyncio.sleep(5) + await handle.cancel() + + with pytest.raises(WorkflowFailureError): + await handle.result() + + runs = await afetch_batch_export_runs(batch_export_id=batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Cancelled" + 
assert run.latest_error == "Cancelled" + + # We don't care about these for the next test, just need something to be defined. base_inputs = { "bucket_name": "test", diff --git a/posthog/temporal/workflows/s3_batch_export.py b/posthog/temporal/workflows/s3_batch_export.py index 13bbf183e5d06..a4987b6024338 100644 --- a/posthog/temporal/workflows/s3_batch_export.py +++ b/posthog/temporal/workflows/s3_batch_export.py @@ -7,7 +7,7 @@ import boto3 from django.conf import settings -from temporalio import activity, workflow +from temporalio import activity, exceptions, workflow from temporalio.common import RetryPolicy from posthog.batch_exports.service import S3BatchExportInputs @@ -481,10 +481,21 @@ async def run(self, inputs: S3BatchExportInputs): ), ) + except exceptions.ActivityError as e: + if isinstance(e.cause, exceptions.CancelledError): + workflow.logger.exception("S3 BatchExport was cancelled.") + update_inputs.status = "Cancelled" + else: + workflow.logger.exception("S3 BatchExport failed.", exc_info=e) + update_inputs.status = "Failed" + + update_inputs.latest_error = str(e.cause) + raise + except Exception as e: - workflow.logger.exception("S3 BatchExport failed.", exc_info=e) + workflow.logger.exception("S3 BatchExport failed with an unexpected exception.", exc_info=e) update_inputs.status = "Failed" - update_inputs.latest_error = str(e) + update_inputs.latest_error = "An unexpected error has ocurred" raise finally: From 07c5cf2a38e3320dfb8d5d6a49d326e7d68aaf6f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Tue, 19 Sep 2023 20:12:44 +0200 Subject: [PATCH 2/7] feat: Display batch export run latest error --- frontend/src/scenes/batch_exports/BatchExportScene.tsx | 8 ++++++++ frontend/src/scenes/batch_exports/batchExportLogic.ts | 4 ++++ frontend/src/types.ts | 3 +++ 3 files changed, 15 insertions(+) diff --git a/frontend/src/scenes/batch_exports/BatchExportScene.tsx b/frontend/src/scenes/batch_exports/BatchExportScene.tsx 
index 609d1103bec3c..e65e1f1bfe849 100644 --- a/frontend/src/scenes/batch_exports/BatchExportScene.tsx +++ b/frontend/src/scenes/batch_exports/BatchExportScene.tsx @@ -262,6 +262,13 @@ export function BatchExportScene(): JSX.Element { tooltip: 'Date and time when this BatchExport run started', render: (_, run) => , }, + + { + title: 'Error message', + tooltip: 'Error message if this run failed', + key: 'message', + dataIndex: 'latest_error', + }, ]} /> ) @@ -312,6 +319,7 @@ export function BatchExportScene(): JSX.Element { return }, }, + { // title: 'Actions', key: 'actions', diff --git a/frontend/src/scenes/batch_exports/batchExportLogic.ts b/frontend/src/scenes/batch_exports/batchExportLogic.ts index d8a07ffaacb2e..5e06601acf8dc 100644 --- a/frontend/src/scenes/batch_exports/batchExportLogic.ts +++ b/frontend/src/scenes/batch_exports/batchExportLogic.ts @@ -201,12 +201,16 @@ export const batchExportLogic = kea([ data_interval_end: run.data_interval_end, runs: [], last_run_at: run.created_at, + latest_error: run.latest_error, + status: run.status, } } groupedRuns[key].runs.push(run) groupedRuns[key].runs.sort((a, b) => b.created_at.diff(a.created_at)) groupedRuns[key].last_run_at = groupedRuns[key].runs[0].created_at + groupedRuns[key].latest_error = groupedRuns[key].runs[0].latest_error + groupedRuns[key].status = groupedRuns[key].runs[0].status }) return Object.values(groupedRuns).sort((a, b) => b.data_interval_end.diff(a.data_interval_end)) diff --git a/frontend/src/types.ts b/frontend/src/types.ts index 32e0e2862ca3f..b8f2c220f56e0 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -3171,6 +3171,7 @@ export type BatchExportRun = { data_interval_start: Dayjs data_interval_end: Dayjs last_updated_at?: Dayjs + latest_error: string | null } export type GroupedBatchExportRuns = { @@ -3178,6 +3179,8 @@ export type GroupedBatchExportRuns = { data_interval_start: Dayjs data_interval_end: Dayjs runs: BatchExportRun[] + latest_error: string | null + 
status: string } export type SDK = { From 438cc1c0c6222028bffdff20152b6ed4a69bc903 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Tue, 19 Sep 2023 20:59:43 +0200 Subject: [PATCH 3/7] refactor: Abstract some S3 batch export test setup --- .../temporal/tests/batch_exports/fixtures.py | 16 +++- .../test_s3_batch_export_workflow.py | 78 +++++++++---------- 2 files changed, 54 insertions(+), 40 deletions(-) diff --git a/posthog/temporal/tests/batch_exports/fixtures.py b/posthog/temporal/tests/batch_exports/fixtures.py index 65de3fd4910c3..d54db02304cc5 100644 --- a/posthog/temporal/tests/batch_exports/fixtures.py +++ b/posthog/temporal/tests/batch_exports/fixtures.py @@ -1,7 +1,13 @@ from uuid import UUID + from asgiref.sync import sync_to_async +from temporalio.client import Client -from posthog.batch_exports.models import BatchExport, BatchExportDestination, BatchExportRun +from posthog.batch_exports.models import ( + BatchExport, + BatchExportDestination, + BatchExportRun, +) from posthog.batch_exports.service import sync_batch_export @@ -32,3 +38,11 @@ def fetch_batch_export_runs(batch_export_id: UUID, limit: int = 100) -> list[Bat async def afetch_batch_export_runs(batch_export_id: UUID, limit: int = 100) -> list[BatchExportRun]: """Fetch the BatchExportRuns for a given BatchExport.""" return await sync_to_async(fetch_batch_export_runs)(batch_export_id, limit) # type: ignore + + +async def adelete_batch_export(batch_export: BatchExport, temporal: Client) -> None: + """Delete a BatchExport and its underlying Schedule.""" + handle = temporal.get_schedule_handle(str(batch_export.id)) + await handle.delete() + + await sync_to_async(batch_export.delete)() # type: ignore diff --git a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py index 03b1025794ca5..2511580358e72 100644 --- 
a/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_s3_batch_export_workflow.py @@ -13,6 +13,8 @@ import botocore.exceptions import brotli import pytest +import pytest_asyncio +from asgiref.sync import sync_to_async from django.conf import settings from django.test import Client as HttpClient from django.test import override_settings @@ -24,6 +26,7 @@ from posthog.api.test.test_organization import acreate_organization from posthog.api.test.test_team import acreate_team +from posthog.temporal.client import connect from posthog.temporal.tests.batch_exports.base import ( EventValues, amaterialize, @@ -32,6 +35,7 @@ ) from posthog.temporal.tests.batch_exports.fixtures import ( acreate_batch_export, + adelete_batch_export, afetch_batch_export_runs, ) from posthog.temporal.workflows.base import create_export_run, update_export_run_status @@ -1075,10 +1079,22 @@ async def test_s3_export_workflow_with_minio_bucket_produces_no_duplicates( assert_events_in_s3(s3_client, bucket_name, prefix, events, compression) -@pytest.mark.django_db -@pytest.mark.asyncio -async def test_s3_export_workflow_handles_insert_activity_errors(): - """Test that S3 Export Workflow can gracefully handle errors when inserting S3 data.""" +@pytest_asyncio.fixture +async def organization(): + organization = await acreate_organization("test") + yield organization + await sync_to_async(organization.delete)() # type: ignore + + +@pytest_asyncio.fixture +async def team(organization): + team = await acreate_team(organization=organization) + yield team + await sync_to_async(team.delete)() # type: ignore + + +@pytest_asyncio.fixture +async def batch_export(team): prefix = f"posthog-events-{str(uuid4())}" destination_data = { "type": "S3", @@ -1098,8 +1114,6 @@ async def test_s3_export_workflow_handles_insert_activity_errors(): "interval": "hour", } - organization = await acreate_organization("test") - team = await 
acreate_team(organization=organization) batch_export = await acreate_batch_export( team_id=team.pk, name=batch_export_data["name"], @@ -1107,6 +1121,23 @@ async def test_s3_export_workflow_handles_insert_activity_errors(): interval=batch_export_data["interval"], ) + yield batch_export + + client = await connect( + settings.TEMPORAL_HOST, + settings.TEMPORAL_PORT, + settings.TEMPORAL_NAMESPACE, + settings.TEMPORAL_CLIENT_ROOT_CA, + settings.TEMPORAL_CLIENT_CERT, + settings.TEMPORAL_CLIENT_KEY, + ) + await adelete_batch_export(batch_export, client) + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_s3_export_workflow_handles_insert_activity_errors(team, batch_export): + """Test that S3 Export Workflow can gracefully handle errors when inserting S3 data.""" workflow_id = str(uuid4()) inputs = S3BatchExportInputs( team_id=team.pk, @@ -1146,39 +1177,8 @@ async def insert_into_s3_activity_mocked(_: S3InsertInputs) -> str: @pytest.mark.django_db @pytest.mark.asyncio -async def test_s3_export_workflow_handles_cancellation(): - """Test that S3 Export Workflow end-to-end by using a local MinIO bucket instead of S3. - - In this particular instance of the test, we assert no duplicates are exported to S3. 
- """ - prefix = f"posthog-events-{str(uuid4())}" - destination_data = { - "type": "S3", - "config": { - "bucket_name": "test-bucket", - "region": "us-east-1", - "prefix": prefix, - "aws_access_key_id": "object_storage_root_user", - "aws_secret_access_key": "object_storage_root_password", - "compression": "gzip", - }, - } - - batch_export_data = { - "name": "my-production-s3-bucket-destination", - "destination": destination_data, - "interval": "hour", - } - - organization = await acreate_organization("test") - team = await acreate_team(organization=organization) - batch_export = await acreate_batch_export( - team_id=team.pk, - name=batch_export_data["name"], - destination_data=batch_export_data["destination"], - interval=batch_export_data["interval"], - ) - +async def test_s3_export_workflow_handles_cancellation(team, batch_export): + """Test that S3 Export Workflow can gracefully handle cancellations when inserting S3 data.""" workflow_id = str(uuid4()) inputs = S3BatchExportInputs( team_id=team.pk, From 8bfd7dbcc3774d6841c9722ebeb7deda5e1455cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Tue, 19 Sep 2023 21:00:06 +0200 Subject: [PATCH 4/7] feat: Handle BigQuery BatchExport errors --- .../test_bigquery_batch_export_workflow.py | 153 +++++++++++++++++- .../workflows/bigquery_batch_export.py | 19 ++- 2 files changed, 165 insertions(+), 7 deletions(-) diff --git a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py index b0d45d55f4b45..ad6e511577f85 100644 --- a/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py @@ -1,19 +1,25 @@ +import asyncio import datetime as dt import json +import os from random import randint from uuid import uuid4 -import os import pytest +import pytest_asyncio +from asgiref.sync import 
sync_to_async from django.conf import settings from freezegun.api import freeze_time from google.cloud import bigquery +from temporalio import activity +from temporalio.client import WorkflowFailureError from temporalio.common import RetryPolicy from temporalio.testing import WorkflowEnvironment from temporalio.worker import UnsandboxedWorkflowRunner, Worker from posthog.api.test.test_organization import acreate_organization from posthog.api.test.test_team import acreate_team +from posthog.temporal.client import connect from posthog.temporal.tests.batch_exports.base import ( EventValues, amaterialize, @@ -21,16 +27,17 @@ ) from posthog.temporal.tests.batch_exports.fixtures import ( acreate_batch_export, + adelete_batch_export, afetch_batch_export_runs, ) from posthog.temporal.workflows.base import create_export_run, update_export_run_status -from posthog.temporal.workflows.clickhouse import ClickHouseClient from posthog.temporal.workflows.bigquery_batch_export import ( BigQueryBatchExportInputs, BigQueryBatchExportWorkflow, BigQueryInsertInputs, insert_into_bigquery_activity, ) +from posthog.temporal.workflows.clickhouse import ClickHouseClient TEST_TIME = dt.datetime.utcnow() @@ -406,3 +413,145 @@ async def test_bigquery_export_workflow( events=events, bq_ingested_timestamp=ingested_timestamp, ) + + +@pytest_asyncio.fixture +async def organization(): + organization = await acreate_organization("test") + yield organization + await sync_to_async(organization.delete)() # type: ignore + + +@pytest_asyncio.fixture +async def team(organization): + team = await acreate_team(organization=organization) + yield team + await sync_to_async(team.delete)() # type: ignore + + +@pytest_asyncio.fixture +async def batch_export(team): + destination_data = { + "type": "BigQuery", + "config": { + "table_id": f"test_workflow_table_{team.pk}", + "project_id": "project_id", + "private_key": "private_key", + "private_key_id": "private_key_id", + "token_uri": "token_uri", + "client_email": 
"client_email", + "dataset_id": "BatchExports", + }, + } + batch_export_data = { + "name": "my-production-bigquery-export", + "destination": destination_data, + "interval": "hour", + } + + batch_export = await acreate_batch_export( + team_id=team.pk, + name=batch_export_data["name"], + destination_data=batch_export_data["destination"], + interval=batch_export_data["interval"], + ) + + yield batch_export + + client = await connect( + settings.TEMPORAL_HOST, + settings.TEMPORAL_PORT, + settings.TEMPORAL_NAMESPACE, + settings.TEMPORAL_CLIENT_ROOT_CA, + settings.TEMPORAL_CLIENT_CERT, + settings.TEMPORAL_CLIENT_KEY, + ) + await adelete_batch_export(batch_export, client) + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_bigquery_export_workflow_handles_insert_activity_errors(team, batch_export): + """Test that BigQuery Export Workflow can gracefully handle errors when inserting BigQuery data.""" + workflow_id = str(uuid4()) + inputs = BigQueryBatchExportInputs( + team_id=team.pk, + batch_export_id=str(batch_export.id), + data_interval_end="2023-04-25 14:30:00.000000", + **batch_export.destination.config, + ) + + @activity.defn(name="insert_into_bigquery_activity") + async def insert_into_bigquery_activity_mocked(_: BigQueryInsertInputs) -> str: + raise ValueError("A useful error message") + + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[BigQueryBatchExportWorkflow], + activities=[create_export_run, insert_into_bigquery_activity_mocked, update_export_run_status], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + with pytest.raises(WorkflowFailureError): + await activity_environment.client.execute_workflow( + BigQueryBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + ) + + runs = await 
afetch_batch_export_runs(batch_export_id=batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Failed" + assert run.latest_error == "ValueError: A useful error message" + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_bigquery_export_workflow_handles_cancellation(team, batch_export): + """Test that BigQuery Export Workflow can gracefully handle cancellations when inserting BigQuery data.""" + workflow_id = str(uuid4()) + inputs = BigQueryBatchExportInputs( + team_id=team.pk, + batch_export_id=str(batch_export.id), + data_interval_end="2023-04-25 14:30:00.000000", + **batch_export.destination.config, + ) + + @activity.defn(name="insert_into_s3_activity") + async def never_finish_activity(_: BigQueryInsertInputs) -> str: + while True: + activity.heartbeat() + await asyncio.sleep(1) + + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[BigQueryBatchExportWorkflow], + activities=[create_export_run, never_finish_activity, update_export_run_status], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + handle = await activity_environment.client.start_workflow( + BigQueryBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + ) + await asyncio.sleep(5) + await handle.cancel() + + with pytest.raises(WorkflowFailureError): + await handle.result() + + runs = await afetch_batch_export_runs(batch_export_id=batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Cancelled" + assert run.latest_error == "Cancelled" diff --git a/posthog/temporal/workflows/bigquery_batch_export.py b/posthog/temporal/workflows/bigquery_batch_export.py index f1f247a0672fc..4be09632ff12f 100644 --- a/posthog/temporal/workflows/bigquery_batch_export.py +++ 
b/posthog/temporal/workflows/bigquery_batch_export.py @@ -6,7 +6,7 @@ from django.conf import settings from google.cloud import bigquery from google.oauth2 import service_account -from temporalio import activity, workflow +from temporalio import activity, exceptions, workflow from temporalio.common import RetryPolicy from posthog.batch_exports.service import BigQueryBatchExportInputs @@ -253,12 +253,21 @@ async def run(self, inputs: BigQueryBatchExportInputs): ), ) + except exceptions.ActivityError as e: + if isinstance(e.cause, exceptions.CancelledError): + workflow.logger.exception("BigQuery BatchExport was cancelled.") + update_inputs.status = "Cancelled" + else: + workflow.logger.exception("BigQuery BatchExport failed.", exc_info=e) + update_inputs.status = "Failed" + + update_inputs.latest_error = str(e.cause) + raise + except Exception as e: - workflow.logger.exception("Bigquery BatchExport failed.", exc_info=e) + workflow.logger.exception("BigQuery BatchExport failed with an unexpected exception.", exc_info=e) update_inputs.status = "Failed" - # Note: This shallows the exception type, but the message should be enough. 
- # If not, swap to repr(e) - update_inputs.latest_error = str(e) + update_inputs.latest_error = "An unexpected error has ocurred" raise finally: From b9e95675baa10e361a3255c46220f10274e02470 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Tue, 19 Sep 2023 21:09:53 +0200 Subject: [PATCH 5/7] feat: Handle Postgres export errors --- .../test_postgres_batch_export_workflow.py | 139 ++++++++++++++++++ .../workflows/postgres_batch_export.py | 19 ++- 2 files changed, 153 insertions(+), 5 deletions(-) diff --git a/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py index 499acbd29502d..831a7e9308ba1 100644 --- a/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py @@ -1,3 +1,4 @@ +import asyncio import datetime as dt import json from random import randint @@ -5,15 +6,20 @@ import psycopg2 import pytest +import pytest_asyncio +from asgiref.sync import sync_to_async from django.conf import settings from django.test import override_settings from psycopg2 import sql +from temporalio import activity +from temporalio.client import WorkflowFailureError from temporalio.common import RetryPolicy from temporalio.testing import WorkflowEnvironment from temporalio.worker import UnsandboxedWorkflowRunner, Worker from posthog.api.test.test_organization import acreate_organization from posthog.api.test.test_team import acreate_team +from posthog.temporal.client import connect from posthog.temporal.tests.batch_exports.base import ( EventValues, amaterialize, @@ -21,6 +27,7 @@ ) from posthog.temporal.tests.batch_exports.fixtures import ( acreate_batch_export, + adelete_batch_export, afetch_batch_export_runs, ) from posthog.temporal.workflows.base import create_export_run, update_export_run_status @@ -439,3 +446,135 @@ async def 
test_postgres_export_workflow( assert run.status == "Completed" assert_events_in_postgres(postgres_connection, postgres_config["schema"], table_name, events) + + +@pytest_asyncio.fixture +async def organization(): + organization = await acreate_organization("test") + yield organization + await sync_to_async(organization.delete)() # type: ignore + + +@pytest_asyncio.fixture +async def team(organization): + team = await acreate_team(organization=organization) + yield team + await sync_to_async(team.delete)() # type: ignore + + +@pytest_asyncio.fixture +async def batch_export(team, postgres_config): + table_name = "test_workflow_table" + destination_data = {"type": "Postgres", "config": {**postgres_config, "table_name": table_name}} + batch_export_data = { + "name": "my-production-postgres-export", + "destination": destination_data, + "interval": "hour", + } + + batch_export = await acreate_batch_export( + team_id=team.pk, + name=batch_export_data["name"], + destination_data=batch_export_data["destination"], + interval=batch_export_data["interval"], + ) + + yield batch_export + + client = await connect( + settings.TEMPORAL_HOST, + settings.TEMPORAL_PORT, + settings.TEMPORAL_NAMESPACE, + settings.TEMPORAL_CLIENT_ROOT_CA, + settings.TEMPORAL_CLIENT_CERT, + settings.TEMPORAL_CLIENT_KEY, + ) + await adelete_batch_export(batch_export, client) + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_postgres_export_workflow_handles_insert_activity_errors(team, batch_export): + """Test that Postgres Export Workflow can gracefully handle errors when inserting Postgres data.""" + workflow_id = str(uuid4()) + inputs = PostgresBatchExportInputs( + team_id=team.pk, + batch_export_id=str(batch_export.id), + data_interval_end="2023-04-25 14:30:00.000000", + **batch_export.destination.config, + ) + + @activity.defn(name="insert_into_postgres_activity") + async def insert_into_postgres_activity_mocked(_: PostgresInsertInputs) -> str: + raise ValueError("A useful error 
message") + + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[PostgresBatchExportWorkflow], + activities=[create_export_run, insert_into_postgres_activity_mocked, update_export_run_status], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + with pytest.raises(WorkflowFailureError): + await activity_environment.client.execute_workflow( + PostgresBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + ) + + runs = await afetch_batch_export_runs(batch_export_id=batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Failed" + assert run.latest_error == "ValueError: A useful error message" + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_postgres_export_workflow_handles_cancellation(team, batch_export): + """Test that Postgres Export Workflow can gracefully handle cancellations when inserting Postgres data.""" + workflow_id = str(uuid4()) + inputs = PostgresBatchExportInputs( + team_id=team.pk, + batch_export_id=str(batch_export.id), + data_interval_end="2023-04-25 14:30:00.000000", + **batch_export.destination.config, + ) + + @activity.defn(name="insert_into_postgres_activity") + async def never_finish_activity(_: PostgresInsertInputs) -> str: + while True: + activity.heartbeat() + await asyncio.sleep(1) + + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[PostgresBatchExportWorkflow], + activities=[create_export_run, never_finish_activity, update_export_run_status], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + handle = await activity_environment.client.start_workflow( + PostgresBatchExportWorkflow.run, + inputs, + id=workflow_id, + 
task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + ) + await asyncio.sleep(5) + await handle.cancel() + + with pytest.raises(WorkflowFailureError): + await handle.result() + + runs = await afetch_batch_export_runs(batch_export_id=batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Cancelled" + assert run.latest_error == "Cancelled" diff --git a/posthog/temporal/workflows/postgres_batch_export.py b/posthog/temporal/workflows/postgres_batch_export.py index b81c7496b3adb..b077ac892d698 100644 --- a/posthog/temporal/workflows/postgres_batch_export.py +++ b/posthog/temporal/workflows/postgres_batch_export.py @@ -6,7 +6,7 @@ import psycopg2 from django.conf import settings from psycopg2 import sql -from temporalio import activity, workflow +from temporalio import activity, exceptions, workflow from temporalio.common import RetryPolicy from posthog.batch_exports.service import PostgresBatchExportInputs @@ -254,12 +254,21 @@ async def run(self, inputs: PostgresBatchExportInputs): ), ) + except exceptions.ActivityError as e: + if isinstance(e.cause, exceptions.CancelledError): + workflow.logger.exception("Postgres BatchExport was cancelled.") + update_inputs.status = "Cancelled" + else: + workflow.logger.exception("Postgres BatchExport failed.", exc_info=e) + update_inputs.status = "Failed" + + update_inputs.latest_error = str(e.cause) + raise + except Exception as e: - workflow.logger.exception("Postgres BatchExport failed.", exc_info=e) + workflow.logger.exception("Postgres BatchExport failed with an unexpected exception.", exc_info=e) update_inputs.status = "Failed" - # Note: This shallows the exception type, but the message should be enough. 
- # If not, swap to repr(e) - update_inputs.latest_error = str(e) + update_inputs.latest_error = "An unexpected error has ocurred" raise finally: From d1f709f67182093380f8c761d1f3282634e7f5fc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Tue, 19 Sep 2023 21:19:36 +0200 Subject: [PATCH 6/7] feat: Handle Snowflake export errors --- .../test_snowflake_batch_export_workflow.py | 149 ++++++++++++++++++ .../workflows/snowflake_batch_export.py | 19 ++- 2 files changed, 163 insertions(+), 5 deletions(-) diff --git a/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py b/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py index 979929d1ce205..af82608baa8a9 100644 --- a/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py +++ b/posthog/temporal/tests/batch_exports/test_snowflake_batch_export_workflow.py @@ -1,3 +1,4 @@ +import asyncio import datetime as dt import gzip import json @@ -6,10 +7,13 @@ from uuid import uuid4 import pytest +import pytest_asyncio import responses +from asgiref.sync import sync_to_async from django.conf import settings from django.test import override_settings from requests.models import PreparedRequest +from temporalio import activity from temporalio.client import WorkflowFailureError from temporalio.common import RetryPolicy from temporalio.exceptions import ActivityError, ApplicationError @@ -18,6 +22,7 @@ from posthog.api.test.test_organization import acreate_organization from posthog.api.test.test_team import acreate_team +from posthog.temporal.client import connect from posthog.temporal.tests.batch_exports.base import ( EventValues, insert_events, @@ -25,6 +30,7 @@ ) from posthog.temporal.tests.batch_exports.fixtures import ( acreate_batch_export, + adelete_batch_export, afetch_batch_export_runs, ) from posthog.temporal.workflows.base import create_export_run, update_export_run_status @@ -32,6 +38,7 @@ from 
posthog.temporal.workflows.snowflake_batch_export import ( SnowflakeBatchExportInputs, SnowflakeBatchExportWorkflow, + SnowflakeInsertInputs, insert_into_snowflake_activity, ) @@ -645,3 +652,145 @@ async def test_snowflake_export_workflow_raises_error_on_copy_fail(): assert isinstance(err.__cause__, ActivityError) assert isinstance(err.__cause__.__cause__, ApplicationError) assert err.__cause__.__cause__.type == "SnowflakeFileNotLoadedError" + + +@pytest_asyncio.fixture +async def organization(): + organization = await acreate_organization("test") + yield organization + await sync_to_async(organization.delete)() # type: ignore + + +@pytest_asyncio.fixture +async def team(organization): + team = await acreate_team(organization=organization) + yield team + await sync_to_async(team.delete)() # type: ignore + + +@pytest_asyncio.fixture +async def batch_export(team): + destination_data = { + "type": "Snowflake", + "config": { + "user": "hazzadous", + "password": "password", + "account": "account", + "database": "PostHog", + "schema": "test", + "warehouse": "COMPUTE_WH", + "table_name": "events", + }, + } + batch_export_data = { + "name": "my-production-snowflake-export", + "destination": destination_data, + "interval": "hour", + } + + batch_export = await acreate_batch_export( + team_id=team.pk, + name=batch_export_data["name"], + destination_data=batch_export_data["destination"], + interval=batch_export_data["interval"], + ) + + yield batch_export + + client = await connect( + settings.TEMPORAL_HOST, + settings.TEMPORAL_PORT, + settings.TEMPORAL_NAMESPACE, + settings.TEMPORAL_CLIENT_ROOT_CA, + settings.TEMPORAL_CLIENT_CERT, + settings.TEMPORAL_CLIENT_KEY, + ) + await adelete_batch_export(batch_export, client) + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_snowflake_export_workflow_handles_insert_activity_errors(team, batch_export): + """Test that Snowflake Export Workflow can gracefully handle errors when inserting Snowflake data.""" + workflow_id = 
str(uuid4()) + inputs = SnowflakeBatchExportInputs( + team_id=team.pk, + batch_export_id=str(batch_export.id), + data_interval_end="2023-04-25 14:30:00.000000", + **batch_export.destination.config, + ) + + @activity.defn(name="insert_into_snowflake_activity") + async def insert_into_snowflake_activity_mocked(_: SnowflakeInsertInputs) -> str: + raise ValueError("A useful error message") + + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( + activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[SnowflakeBatchExportWorkflow], + activities=[create_export_run, insert_into_snowflake_activity_mocked, update_export_run_status], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + with pytest.raises(WorkflowFailureError): + await activity_environment.client.execute_workflow( + SnowflakeBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + ) + + runs = await afetch_batch_export_runs(batch_export_id=batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Failed" + assert run.latest_error == "ValueError: A useful error message" + + +@pytest.mark.django_db +@pytest.mark.asyncio +async def test_snowflake_export_workflow_handles_cancellation(team, batch_export): + """Test that Snowflake Export Workflow can gracefully handle cancellations when inserting Snowflake data.""" + workflow_id = str(uuid4()) + inputs = SnowflakeBatchExportInputs( + team_id=team.pk, + batch_export_id=str(batch_export.id), + data_interval_end="2023-04-25 14:30:00.000000", + **batch_export.destination.config, + ) + + @activity.defn(name="insert_into_snowflake_activity") + async def never_finish_activity(_: SnowflakeInsertInputs) -> str: + while True: + activity.heartbeat() + await asyncio.sleep(1) + + async with await WorkflowEnvironment.start_time_skipping() as activity_environment: + async with Worker( 
+ activity_environment.client, + task_queue=settings.TEMPORAL_TASK_QUEUE, + workflows=[SnowflakeBatchExportWorkflow], + activities=[create_export_run, never_finish_activity, update_export_run_status], + workflow_runner=UnsandboxedWorkflowRunner(), + ): + handle = await activity_environment.client.start_workflow( + SnowflakeBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + retry_policy=RetryPolicy(maximum_attempts=1), + ) + await asyncio.sleep(5) + await handle.cancel() + + with pytest.raises(WorkflowFailureError): + await handle.result() + + runs = await afetch_batch_export_runs(batch_export_id=batch_export.id) + assert len(runs) == 1 + + run = runs[0] + assert run.status == "Cancelled" + assert run.latest_error == "Cancelled" diff --git a/posthog/temporal/workflows/snowflake_batch_export.py b/posthog/temporal/workflows/snowflake_batch_export.py index 558e4bda7df75..a38f15d7aab73 100644 --- a/posthog/temporal/workflows/snowflake_batch_export.py +++ b/posthog/temporal/workflows/snowflake_batch_export.py @@ -6,7 +6,7 @@ import snowflake.connector from django.conf import settings from snowflake.connector.cursor import SnowflakeCursor -from temporalio import activity, workflow +from temporalio import activity, exceptions, workflow from temporalio.common import RetryPolicy from posthog.batch_exports.service import SnowflakeBatchExportInputs @@ -342,12 +342,21 @@ async def run(self, inputs: SnowflakeBatchExportInputs): ), ) + except exceptions.ActivityError as e: + if isinstance(e.cause, exceptions.CancelledError): + workflow.logger.exception("Snowflake BatchExport was cancelled.") + update_inputs.status = "Cancelled" + else: + workflow.logger.exception("Snowflake BatchExport failed.", exc_info=e) + update_inputs.status = "Failed" + + update_inputs.latest_error = str(e.cause) + raise + except Exception as e: - workflow.logger.exception("Snowflake BatchExport failed.", exc_info=e) + workflow.logger.exception("Snowflake 
BatchExport failed with an unexpected exception.", exc_info=e) update_inputs.status = "Failed" - # Note: This shallows the exception type, but the message should be enough. - # If not, swap to repr(e) - update_inputs.latest_error = str(e) + update_inputs.latest_error = "An unexpected error has occurred" raise finally: From bd0561d48fc7dffc6aab9221c265136a869b2f74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Wed, 20 Sep 2023 15:57:33 +0200 Subject: [PATCH 7/7] fix: Revert frontend changes to show single error --- frontend/src/scenes/batch_exports/BatchExportScene.tsx | 8 -------- frontend/src/scenes/batch_exports/batchExportLogic.ts | 4 ---- frontend/src/types.ts | 3 --- 3 files changed, 15 deletions(-) diff --git a/frontend/src/scenes/batch_exports/BatchExportScene.tsx b/frontend/src/scenes/batch_exports/BatchExportScene.tsx index e65e1f1bfe849..609d1103bec3c 100644 --- a/frontend/src/scenes/batch_exports/BatchExportScene.tsx +++ b/frontend/src/scenes/batch_exports/BatchExportScene.tsx @@ -262,13 +262,6 @@ export function BatchExportScene(): JSX.Element { tooltip: 'Date and time when this BatchExport run started', render: (_, run) => , }, - - { - title: 'Error message', - tooltip: 'Error message if this run failed', - key: 'message', - dataIndex: 'latest_error', - }, ]} /> ) @@ -319,7 +312,6 @@ export function BatchExportScene(): JSX.Element { return }, }, - { // title: 'Actions', key: 'actions', diff --git a/frontend/src/scenes/batch_exports/batchExportLogic.ts b/frontend/src/scenes/batch_exports/batchExportLogic.ts index 5e06601acf8dc..d8a07ffaacb2e 100644 --- a/frontend/src/scenes/batch_exports/batchExportLogic.ts +++ b/frontend/src/scenes/batch_exports/batchExportLogic.ts @@ -201,16 +201,12 @@ export const batchExportLogic = kea([ data_interval_end: run.data_interval_end, runs: [], last_run_at: run.created_at, - latest_error: run.latest_error, - status: run.status, } } groupedRuns[key].runs.push(run) 
groupedRuns[key].runs.sort((a, b) => b.created_at.diff(a.created_at)) groupedRuns[key].last_run_at = groupedRuns[key].runs[0].created_at - groupedRuns[key].latest_error = groupedRuns[key].runs[0].latest_error - groupedRuns[key].status = groupedRuns[key].runs[0].status }) return Object.values(groupedRuns).sort((a, b) => b.data_interval_end.diff(a.data_interval_end)) diff --git a/frontend/src/types.ts b/frontend/src/types.ts index b8f2c220f56e0..32e0e2862ca3f 100644 --- a/frontend/src/types.ts +++ b/frontend/src/types.ts @@ -3171,7 +3171,6 @@ export type BatchExportRun = { data_interval_start: Dayjs data_interval_end: Dayjs last_updated_at?: Dayjs - latest_error: string | null } export type GroupedBatchExportRuns = { @@ -3179,8 +3178,6 @@ export type GroupedBatchExportRuns = { data_interval_start: Dayjs data_interval_end: Dayjs runs: BatchExportRun[] - latest_error: string | null - status: string } export type SDK = {