feat: Record S3 BatchExport errors (#17535)
tomasfarias authored Sep 22, 2023
1 parent 0427aee commit 8bf729f
Showing 9 changed files with 660 additions and 21 deletions.
16 changes: 15 additions & 1 deletion posthog/temporal/tests/batch_exports/fixtures.py
@@ -1,7 +1,13 @@
from uuid import UUID

from asgiref.sync import sync_to_async
from temporalio.client import Client

from posthog.batch_exports.models import (
    BatchExport,
    BatchExportDestination,
    BatchExportRun,
)
from posthog.batch_exports.service import sync_batch_export


@@ -32,3 +38,11 @@ def fetch_batch_export_runs(batch_export_id: UUID, limit: int = 100) -> list[BatchExportRun]:
async def afetch_batch_export_runs(batch_export_id: UUID, limit: int = 100) -> list[BatchExportRun]:
"""Fetch the BatchExportRuns for a given BatchExport."""
return await sync_to_async(fetch_batch_export_runs)(batch_export_id, limit) # type: ignore


async def adelete_batch_export(batch_export: BatchExport, temporal: Client) -> None:
"""Delete a BatchExport and its underlying Schedule."""
handle = temporal.get_schedule_handle(str(batch_export.id))
await handle.delete()

await sync_to_async(batch_export.delete)() # type: ignore
posthog/temporal/tests/batch_exports/test_bigquery_batch_export_workflow.py
@@ -1,36 +1,43 @@
import asyncio
import datetime as dt
import json
import os
from random import randint
from uuid import uuid4

import pytest
import pytest_asyncio
from asgiref.sync import sync_to_async
from django.conf import settings
from freezegun.api import freeze_time
from google.cloud import bigquery
from temporalio import activity
from temporalio.client import WorkflowFailureError
from temporalio.common import RetryPolicy
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import UnsandboxedWorkflowRunner, Worker

from posthog.api.test.test_organization import acreate_organization
from posthog.api.test.test_team import acreate_team
from posthog.temporal.client import connect
from posthog.temporal.tests.batch_exports.base import (
EventValues,
amaterialize,
insert_events,
)
from posthog.temporal.tests.batch_exports.fixtures import (
acreate_batch_export,
adelete_batch_export,
afetch_batch_export_runs,
)
from posthog.temporal.workflows.base import create_export_run, update_export_run_status
from posthog.temporal.workflows.bigquery_batch_export import (
BigQueryBatchExportInputs,
BigQueryBatchExportWorkflow,
BigQueryInsertInputs,
insert_into_bigquery_activity,
)
from posthog.temporal.workflows.clickhouse import ClickHouseClient

TEST_TIME = dt.datetime.utcnow()

@@ -406,3 +413,145 @@ async def test_bigquery_export_workflow(
events=events,
bq_ingested_timestamp=ingested_timestamp,
)


@pytest_asyncio.fixture
async def organization():
organization = await acreate_organization("test")
yield organization
await sync_to_async(organization.delete)() # type: ignore


@pytest_asyncio.fixture
async def team(organization):
team = await acreate_team(organization=organization)
yield team
await sync_to_async(team.delete)() # type: ignore


@pytest_asyncio.fixture
async def batch_export(team):
destination_data = {
"type": "BigQuery",
"config": {
"table_id": f"test_workflow_table_{team.pk}",
"project_id": "project_id",
"private_key": "private_key",
"private_key_id": "private_key_id",
"token_uri": "token_uri",
"client_email": "client_email",
"dataset_id": "BatchExports",
},
}
batch_export_data = {
"name": "my-production-bigquery-export",
"destination": destination_data,
"interval": "hour",
}

batch_export = await acreate_batch_export(
team_id=team.pk,
name=batch_export_data["name"],
destination_data=batch_export_data["destination"],
interval=batch_export_data["interval"],
)

yield batch_export

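    # Teardown: connect to Temporal and delete the BatchExport along with its underlying Schedule.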
client = await connect(
settings.TEMPORAL_HOST,
settings.TEMPORAL_PORT,
settings.TEMPORAL_NAMESPACE,
settings.TEMPORAL_CLIENT_ROOT_CA,
settings.TEMPORAL_CLIENT_CERT,
settings.TEMPORAL_CLIENT_KEY,
)
await adelete_batch_export(batch_export, client)


@pytest.mark.django_db
@pytest.mark.asyncio
async def test_bigquery_export_workflow_handles_insert_activity_errors(team, batch_export):
"""Test that BigQuery Export Workflow can gracefully handle errors when inserting BigQuery data."""
workflow_id = str(uuid4())
inputs = BigQueryBatchExportInputs(
team_id=team.pk,
batch_export_id=str(batch_export.id),
data_interval_end="2023-04-25 14:30:00.000000",
**batch_export.destination.config,
)

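    # Registering this mock under the real activity's name makes the workflow call it instead of the real insert.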
@activity.defn(name="insert_into_bigquery_activity")
async def insert_into_bigquery_activity_mocked(_: BigQueryInsertInputs) -> str:
raise ValueError("A useful error message")

async with await WorkflowEnvironment.start_time_skipping() as activity_environment:
async with Worker(
activity_environment.client,
task_queue=settings.TEMPORAL_TASK_QUEUE,
workflows=[BigQueryBatchExportWorkflow],
activities=[create_export_run, insert_into_bigquery_activity_mocked, update_export_run_status],
workflow_runner=UnsandboxedWorkflowRunner(),
):
with pytest.raises(WorkflowFailureError):
await activity_environment.client.execute_workflow(
BigQueryBatchExportWorkflow.run,
inputs,
id=workflow_id,
task_queue=settings.TEMPORAL_TASK_QUEUE,
retry_policy=RetryPolicy(maximum_attempts=1),
)

runs = await afetch_batch_export_runs(batch_export_id=batch_export.id)
assert len(runs) == 1

run = runs[0]
assert run.status == "Failed"
assert run.latest_error == "ValueError: A useful error message"


@pytest.mark.django_db
@pytest.mark.asyncio
async def test_bigquery_export_workflow_handles_cancellation(team, batch_export):
"""Test that BigQuery Export Workflow can gracefully handle cancellations when inserting BigQuery data."""
workflow_id = str(uuid4())
inputs = BigQueryBatchExportInputs(
team_id=team.pk,
batch_export_id=str(batch_export.id),
data_interval_end="2023-04-25 14:30:00.000000",
**batch_export.destination.config,
)

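    # A mocked activity that never finishes gives the test a long-running activity to cancel.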
@activity.defn(name="insert_into_s3_activity")
async def never_finish_activity(_: BigQueryInsertInputs) -> str:
while True:
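            # Heartbeating lets the Temporal server deliver the cancellation request to this activity.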
activity.heartbeat()
await asyncio.sleep(1)

async with await WorkflowEnvironment.start_time_skipping() as activity_environment:
async with Worker(
activity_environment.client,
task_queue=settings.TEMPORAL_TASK_QUEUE,
workflows=[BigQueryBatchExportWorkflow],
activities=[create_export_run, never_finish_activity, update_export_run_status],
workflow_runner=UnsandboxedWorkflowRunner(),
):
handle = await activity_environment.client.start_workflow(
BigQueryBatchExportWorkflow.run,
inputs,
id=workflow_id,
task_queue=settings.TEMPORAL_TASK_QUEUE,
retry_policy=RetryPolicy(maximum_attempts=1),
)
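            # Give the workflow time to start the activity before requesting cancellation.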
await asyncio.sleep(5)
await handle.cancel()

with pytest.raises(WorkflowFailureError):
await handle.result()

runs = await afetch_batch_export_runs(batch_export_id=batch_export.id)
assert len(runs) == 1

run = runs[0]
assert run.status == "Cancelled"
assert run.latest_error == "Cancelled"
posthog/temporal/tests/batch_exports/test_postgres_batch_export_workflow.py
@@ -1,26 +1,33 @@
import asyncio
import datetime as dt
import json
from random import randint
from uuid import uuid4

import psycopg2
import pytest
import pytest_asyncio
from asgiref.sync import sync_to_async
from django.conf import settings
from django.test import override_settings
from psycopg2 import sql
from temporalio import activity
from temporalio.client import WorkflowFailureError
from temporalio.common import RetryPolicy
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import UnsandboxedWorkflowRunner, Worker

from posthog.api.test.test_organization import acreate_organization
from posthog.api.test.test_team import acreate_team
from posthog.temporal.client import connect
from posthog.temporal.tests.batch_exports.base import (
EventValues,
amaterialize,
insert_events,
)
from posthog.temporal.tests.batch_exports.fixtures import (
acreate_batch_export,
adelete_batch_export,
afetch_batch_export_runs,
)
from posthog.temporal.workflows.base import create_export_run, update_export_run_status
@@ -439,3 +446,135 @@ async def test_postgres_export_workflow(
assert run.status == "Completed"

assert_events_in_postgres(postgres_connection, postgres_config["schema"], table_name, events)


@pytest_asyncio.fixture
async def organization():
organization = await acreate_organization("test")
yield organization
await sync_to_async(organization.delete)() # type: ignore


@pytest_asyncio.fixture
async def team(organization):
team = await acreate_team(organization=organization)
yield team
await sync_to_async(team.delete)() # type: ignore


@pytest_asyncio.fixture
async def batch_export(team, postgres_config):
table_name = "test_workflow_table"
destination_data = {"type": "Postgres", "config": {**postgres_config, "table_name": table_name}}
batch_export_data = {
"name": "my-production-postgres-export",
"destination": destination_data,
"interval": "hour",
}

batch_export = await acreate_batch_export(
team_id=team.pk,
name=batch_export_data["name"],
destination_data=batch_export_data["destination"],
interval=batch_export_data["interval"],
)

yield batch_export

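    # Teardown: delete the BatchExport and its underlying Temporal Schedule once the test is done.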
client = await connect(
settings.TEMPORAL_HOST,
settings.TEMPORAL_PORT,
settings.TEMPORAL_NAMESPACE,
settings.TEMPORAL_CLIENT_ROOT_CA,
settings.TEMPORAL_CLIENT_CERT,
settings.TEMPORAL_CLIENT_KEY,
)
await adelete_batch_export(batch_export, client)


@pytest.mark.django_db
@pytest.mark.asyncio
async def test_postgres_export_workflow_handles_insert_activity_errors(team, batch_export):
"""Test that Postgres Export Workflow can gracefully handle errors when inserting Postgres data."""
workflow_id = str(uuid4())
inputs = PostgresBatchExportInputs(
team_id=team.pk,
batch_export_id=str(batch_export.id),
data_interval_end="2023-04-25 14:30:00.000000",
**batch_export.destination.config,
)

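    # Mock the insert activity under its real name so the workflow invokes this failing stub.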
@activity.defn(name="insert_into_postgres_activity")
async def insert_into_postgres_activity_mocked(_: PostgresInsertInputs) -> str:
raise ValueError("A useful error message")

async with await WorkflowEnvironment.start_time_skipping() as activity_environment:
async with Worker(
activity_environment.client,
task_queue=settings.TEMPORAL_TASK_QUEUE,
workflows=[PostgresBatchExportWorkflow],
activities=[create_export_run, insert_into_postgres_activity_mocked, update_export_run_status],
workflow_runner=UnsandboxedWorkflowRunner(),
):
with pytest.raises(WorkflowFailureError):
await activity_environment.client.execute_workflow(
PostgresBatchExportWorkflow.run,
inputs,
id=workflow_id,
task_queue=settings.TEMPORAL_TASK_QUEUE,
retry_policy=RetryPolicy(maximum_attempts=1),
)

runs = await afetch_batch_export_runs(batch_export_id=batch_export.id)
assert len(runs) == 1

run = runs[0]
assert run.status == "Failed"
assert run.latest_error == "ValueError: A useful error message"


@pytest.mark.django_db
@pytest.mark.asyncio
async def test_postgres_export_workflow_handles_cancellation(team, batch_export):
"""Test that Postgres Export Workflow can gracefully handle cancellations when inserting Postgres data."""
workflow_id = str(uuid4())
inputs = PostgresBatchExportInputs(
team_id=team.pk,
batch_export_id=str(batch_export.id),
data_interval_end="2023-04-25 14:30:00.000000",
**batch_export.destination.config,
)

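    # A never-completing stub stands in for the insert activity so the test can cancel it mid-run.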
@activity.defn(name="insert_into_postgres_activity")
async def never_finish_activity(_: PostgresInsertInputs) -> str:
while True:
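            # Heartbeat so that the server can deliver the cancellation request to the activity.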
activity.heartbeat()
await asyncio.sleep(1)

async with await WorkflowEnvironment.start_time_skipping() as activity_environment:
async with Worker(
activity_environment.client,
task_queue=settings.TEMPORAL_TASK_QUEUE,
workflows=[PostgresBatchExportWorkflow],
activities=[create_export_run, never_finish_activity, update_export_run_status],
workflow_runner=UnsandboxedWorkflowRunner(),
):
handle = await activity_environment.client.start_workflow(
PostgresBatchExportWorkflow.run,
inputs,
id=workflow_id,
task_queue=settings.TEMPORAL_TASK_QUEUE,
retry_policy=RetryPolicy(maximum_attempts=1),
)
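            # Let the workflow start the activity before cancelling it.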
await asyncio.sleep(5)
await handle.cancel()

with pytest.raises(WorkflowFailureError):
await handle.result()

runs = await afetch_batch_export_runs(batch_export_id=batch_export.id)
assert len(runs) == 1

run = runs[0]
assert run.status == "Cancelled"
assert run.latest_error == "Cancelled"