diff --git a/posthog/api/test/batch_exports/test_delete.py b/posthog/api/test/batch_exports/test_delete.py
index 20375cecbb768..cc07ed4675151 100644
--- a/posthog/api/test/batch_exports/test_delete.py
+++ b/posthog/api/test/batch_exports/test_delete.py
@@ -241,3 +241,48 @@ def test_deletes_are_partitioned_by_team_id(client: HttpClient):
     # Make sure we can still get the export with the right user
     response = get_batch_export(client, team.pk, batch_export_id)
     assert response.status_code == status.HTTP_200_OK
+
+
+@pytest.mark.django_db(transaction=True)
+def test_delete_batch_export_even_without_underlying_schedule(client: HttpClient):
+    """Test that deleting a BatchExport completes even if the underlying Schedule was already deleted."""
+    temporal = sync_connect()
+
+    destination_data = {
+        "type": "S3",
+        "config": {
+            "bucket_name": "my-production-s3-bucket",
+            "region": "us-east-1",
+            "prefix": "posthog-events/",
+            "aws_access_key_id": "abc123",
+            "aws_secret_access_key": "secret",
+        },
+    }
+    batch_export_data = {
+        "name": "my-production-s3-bucket-destination",
+        "destination": destination_data,
+        "interval": "hour",
+    }
+
+    organization = create_organization("Test Org")
+    team = create_team(organization)
+    user = create_user("test@user.com", "Test User", organization)
+    client.force_login(user)
+
+    with start_test_worker(temporal):
+        batch_export = create_batch_export_ok(client, team.pk, batch_export_data)
+        batch_export_id = batch_export["id"]
+
+        handle = temporal.get_schedule_handle(batch_export_id)
+        async_to_sync(handle.delete)()
+
+        with pytest.raises(RPCError):
+            describe_schedule(temporal, batch_export_id)
+
+        delete_batch_export_ok(client, team.pk, batch_export_id)
+
+        response = get_batch_export(client, team.pk, batch_export_id)
+        assert response.status_code == status.HTTP_404_NOT_FOUND
+
+        with pytest.raises(RPCError):
+            describe_schedule(temporal, batch_export_id)
diff --git a/posthog/batch_exports/http.py b/posthog/batch_exports/http.py
index 8d6005ec663f8..cef17ab628f32 100644
--- a/posthog/batch_exports/http.py
+++ b/posthog/batch_exports/http.py
@@ -2,6 +2,7 @@
 from typing import Any
 
 import posthoganalytics
+import structlog
 from django.db import transaction
 from django.utils.timezone import now
 from rest_framework import mixins, request, response, serializers, viewsets
@@ -27,6 +28,7 @@
     BatchExportIdError,
     BatchExportServiceError,
     BatchExportServiceRPCError,
+    BatchExportServiceScheduleNotFound,
     backfill_export,
     cancel_running_batch_export_backfill,
     delete_schedule,
@@ -49,6 +51,8 @@
 from posthog.temporal.client import sync_connect
 from posthog.utils import relative_date_parse
 
+logger = structlog.get_logger(__name__)
+
 
 def validate_date_input(date_input: Any) -> dt.datetime:
     """Parse any datetime input as a proper dt.datetime.
@@ -320,10 +324,22 @@ def unpause(self, request: request.Request, *args, **kwargs) -> response.Respons
         return response.Response({"paused": False})
 
     def perform_destroy(self, instance: BatchExport):
-        """Perform a BatchExport destroy by clearing Temporal and Django state."""
-        instance.deleted = True
+        """Perform a BatchExport destroy by clearing Temporal and Django state.
+
+        If the underlying Temporal Schedule doesn't exist, we ignore the error and proceed with the delete anyway.
+        The Schedule could have been deleted manually, leaving Django and Temporal out of sync. Since we are
+        deleting regardless, we assume we can recover from this state by finishing the delete operation, i.e. by
+        calling instance.save().
+        """
         temporal = sync_connect()
-        delete_schedule(temporal, str(instance.pk))
+
+        instance.deleted = True
+
+        try:
+            delete_schedule(temporal, str(instance.pk))
+        except BatchExportServiceScheduleNotFound as e:
+            logger.warning("The Schedule %s could not be deleted as it was not found", e.schedule_id)
+
         instance.save()
 
         for backfill in BatchExportBackfill.objects.filter(batch_export=instance):
diff --git a/posthog/batch_exports/service.py b/posthog/batch_exports/service.py
index fc74d6f51f253..38cecda263aaa 100644
--- a/posthog/batch_exports/service.py
+++ b/posthog/batch_exports/service.py
@@ -3,6 +3,7 @@
 from dataclasses import asdict, dataclass, fields
 from uuid import UUID
 
+import temporalio
 from asgiref.sync import async_to_sync
 from temporalio.client import (
     Client,
@@ -163,6 +164,14 @@ class BatchExportServiceRPCError(BatchExportServiceError):
     """Exception raised when the underlying Temporal RPC fails."""
 
 
+class BatchExportServiceScheduleNotFound(BatchExportServiceRPCError):
+    """Exception raised when the underlying Temporal RPC fails because a schedule was not found."""
+
+    def __init__(self, schedule_id: str):
+        self.schedule_id = schedule_id
+        super().__init__(f"The Temporal Schedule {schedule_id} was not found (maybe it was deleted?)")
+
+
 def pause_batch_export(temporal: Client, batch_export_id: str, note: str | None = None) -> None:
     """Pause this BatchExport.
 
@@ -250,7 +259,14 @@ async def unpause_schedule(temporal: Client, schedule_id: str, note: str | None
 async def delete_schedule(temporal: Client, schedule_id: str) -> None:
     """Delete a Temporal Schedule."""
     handle = temporal.get_schedule_handle(schedule_id)
-    await handle.delete()
+
+    try:
+        await handle.delete()
+    except temporalio.service.RPCError as e:
+        if e.status == temporalio.service.RPCStatusCode.NOT_FOUND:
+            raise BatchExportServiceScheduleNotFound(schedule_id) from e
+        else:
+            raise BatchExportServiceRPCError() from e
 
 
 @async_to_sync