fix: Ignore schedule not found when deleting batch export (#18779)
* fix: Ignore schedule not found when deleting batch export

* test: Add a test case
tomasfarias authored Nov 22, 2023
1 parent 5adc82f · commit a8df13e
Showing 3 changed files with 81 additions and 4 deletions.
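
In short, the change works in two layers: delete_schedule in service.py translates Temporal's NOT_FOUND RPC error into a new BatchExportServiceScheduleNotFound exception, and perform_destroy in http.py logs and ignores that exception so the BatchExport row can still be soft-deleted. Below is a condensed sketch of that flow for orientation; it is illustrative only (the real signatures, logging, and module layout are in the diff that follows) and assumes nothing beyond the temporalio client API.

# Condensed, illustrative sketch of the flow this commit introduces; not code from the commit.
import temporalio.service
from temporalio.client import Client


class BatchExportServiceScheduleNotFound(Exception):
    """Raised when the Temporal Schedule backing a BatchExport no longer exists."""


async def delete_schedule(temporal: Client, schedule_id: str) -> None:
    # service.py side: translate Temporal's NOT_FOUND RPC error into a domain exception.
    handle = temporal.get_schedule_handle(schedule_id)
    try:
        await handle.delete()
    except temporalio.service.RPCError as e:
        if e.status == temporalio.service.RPCStatusCode.NOT_FOUND:
            raise BatchExportServiceScheduleNotFound(schedule_id) from e
        raise


async def destroy_batch_export(temporal: Client, schedule_id: str) -> None:
    # http.py side: a missing Schedule is ignored (the real code logs a warning) so the
    # Django-side soft delete can still complete.
    try:
        await delete_schedule(temporal, schedule_id)
    except BatchExportServiceScheduleNotFound:
        pass  # The Schedule was already gone; nothing left to remove in Temporal.
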
45 changes: 45 additions & 0 deletions posthog/api/test/batch_exports/test_delete.py
@@ -241,3 +241,48 @@ def test_deletes_are_partitioned_by_team_id(client: HttpClient):
         # Make sure we can still get the export with the right user
         response = get_batch_export(client, team.pk, batch_export_id)
         assert response.status_code == status.HTTP_200_OK
+
+
+@pytest.mark.django_db(transaction=True)
+def test_delete_batch_export_even_without_underlying_schedule(client: HttpClient):
+    """Test deleting a BatchExport completes even if underlying Schedule was already deleted."""
+    temporal = sync_connect()
+
+    destination_data = {
+        "type": "S3",
+        "config": {
+            "bucket_name": "my-production-s3-bucket",
+            "region": "us-east-1",
+            "prefix": "posthog-events/",
+            "aws_access_key_id": "abc123",
+            "aws_secret_access_key": "secret",
+        },
+    }
+    batch_export_data = {
+        "name": "my-production-s3-bucket-destination",
+        "destination": destination_data,
+        "interval": "hour",
+    }
+
+    organization = create_organization("Test Org")
+    team = create_team(organization)
+    user = create_user("[email protected]", "Test User", organization)
+    client.force_login(user)
+
+    with start_test_worker(temporal):
+        batch_export = create_batch_export_ok(client, team.pk, batch_export_data)
+        batch_export_id = batch_export["id"]
+
+        handle = temporal.get_schedule_handle(batch_export_id)
+        async_to_sync(handle.delete)()
+
+        with pytest.raises(RPCError):
+            describe_schedule(temporal, batch_export_id)
+
+        delete_batch_export_ok(client, team.pk, batch_export_id)
+
+        response = get_batch_export(client, team.pk, batch_export_id)
+        assert response.status_code == status.HTTP_404_NOT_FOUND
+
+        with pytest.raises(RPCError):
+            describe_schedule(temporal, batch_export_id)
22 changes: 19 additions & 3 deletions posthog/batch_exports/http.py
@@ -2,6 +2,7 @@
 from typing import Any
 
 import posthoganalytics
+import structlog
 from django.db import transaction
 from django.utils.timezone import now
 from rest_framework import mixins, request, response, serializers, viewsets
@@ -27,6 +28,7 @@
     BatchExportIdError,
     BatchExportServiceError,
     BatchExportServiceRPCError,
+    BatchExportServiceScheduleNotFound,
     backfill_export,
     cancel_running_batch_export_backfill,
     delete_schedule,
@@ -49,6 +51,8 @@
 from posthog.temporal.client import sync_connect
 from posthog.utils import relative_date_parse
 
+logger = structlog.get_logger(__name__)
+
 
 def validate_date_input(date_input: Any) -> dt.datetime:
     """Parse any datetime input as a proper dt.datetime.
@@ -320,10 +324,22 @@ def unpause(self, request: request.Request, *args, **kwargs) -> response.Response:
         return response.Response({"paused": False})
 
     def perform_destroy(self, instance: BatchExport):
-        """Perform a BatchExport destroy by clearing Temporal and Django state."""
-        instance.deleted = True
+        """Perform a BatchExport destroy by clearing Temporal and Django state.
+
+        If the underlying Temporal Schedule doesn't exist, we ignore the error and proceed with the delete anyways.
+        The Schedule could have been manually deleted causing Django and Temporal to go out of sync. For whatever reason,
+        since we are deleting, we assume that we can recover from this state by finishing the delete operation by calling
+        instance.save().
+        """
         temporal = sync_connect()
-        delete_schedule(temporal, str(instance.pk))
+
+        instance.deleted = True
+
+        try:
+            delete_schedule(temporal, str(instance.pk))
+        except BatchExportServiceScheduleNotFound as e:
+            logger.warning("The Schedule %s could not be deleted as it was not found", e.schedule_id)
+
         instance.save()
 
         for backfill in BatchExportBackfill.objects.filter(batch_export=instance):
18 changes: 17 additions & 1 deletion posthog/batch_exports/service.py
@@ -3,6 +3,7 @@
 from dataclasses import asdict, dataclass, fields
 from uuid import UUID
 
+import temporalio
 from asgiref.sync import async_to_sync
 from temporalio.client import (
     Client,
@@ -163,6 +164,14 @@ class BatchExportServiceRPCError(BatchExportServiceError):
     """Exception raised when the underlying Temporal RPC fails."""
 
 
+class BatchExportServiceScheduleNotFound(BatchExportServiceRPCError):
+    """Exception raised when the underlying Temporal RPC fails because a schedule was not found."""
+
+    def __init__(self, schedule_id: str):
+        self.schedule_id = schedule_id
+        super().__init__(f"The Temporal Schedule {schedule_id} was not found (maybe it was deleted?)")
+
+
 def pause_batch_export(temporal: Client, batch_export_id: str, note: str | None = None) -> None:
     """Pause this BatchExport.
@@ -250,7 +259,14 @@ async def unpause_schedule(temporal: Client, schedule_id: str, note: str | None
 async def delete_schedule(temporal: Client, schedule_id: str) -> None:
     """Delete a Temporal Schedule."""
     handle = temporal.get_schedule_handle(schedule_id)
-    await handle.delete()
+
+    try:
+        await handle.delete()
+    except temporalio.service.RPCError as e:
+        if e.status == temporalio.service.RPCStatusCode.NOT_FOUND:
+            raise BatchExportServiceScheduleNotFound(schedule_id)
+        else:
+            raise BatchExportServiceRPCError() from e
 
 
 @async_to_sync
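
As a usage note, other callers of delete_schedule can now treat an already-deleted Schedule as a non-fatal condition instead of catching temporalio's RPCError themselves; anything other than NOT_FOUND is still surfaced as BatchExportServiceRPCError. The helper below is hypothetical (it is not part of this commit), and the import path for the service module is assumed from the files shown above.

# Hypothetical helper, not part of this commit: delete a Schedule but tolerate it
# already being gone, mirroring what perform_destroy now does.
from posthog.batch_exports.service import (
    BatchExportServiceScheduleNotFound,
    delete_schedule,
)
from posthog.temporal.client import sync_connect


def try_delete_schedule(schedule_id: str) -> bool:
    """Return True if the Schedule was deleted, False if it was already gone."""
    temporal = sync_connect()
    try:
        # Called synchronously here, just as perform_destroy calls it in http.py.
        delete_schedule(temporal, schedule_id)
    except BatchExportServiceScheduleNotFound:
        return False
    return True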
