diff --git a/posthog/temporal/tests/batch_exports/test_backfill_batch_export.py b/posthog/temporal/tests/batch_exports/test_backfill_batch_export.py index 105f331995d2f0..0625ef2586b49c 100644 --- a/posthog/temporal/tests/batch_exports/test_backfill_batch_export.py +++ b/posthog/temporal/tests/batch_exports/test_backfill_batch_export.py @@ -6,6 +6,7 @@ import temporalio import temporalio.client import temporalio.common +import temporalio.exceptions import temporalio.testing import temporalio.worker from django.conf import settings @@ -22,6 +23,7 @@ backfill_range, backfill_schedule, get_schedule_frequency, + wait_for_schedule_backfill_in_range, ) pytestmark = [pytest.mark.asyncio] @@ -169,7 +171,7 @@ async def test_backfill_schedule_activity(activity_environment, temporal_worker, @pytest.mark.django_db(transaction=True) async def test_backfill_batch_export_workflow(temporal_worker, temporal_schedule, temporal_client, team): - """Test BackfillBatchExportWorkflow executes all backfill runs and updates model.""" + """Test BackfillBatchExportWorkflow fails when its underlying Temporal Schedule is deleted.""" start_at = dt.datetime(2023, 1, 1, 0, 0, 0, tzinfo=dt.timezone.utc) end_at = dt.datetime(2023, 1, 1, 0, 10, 0, tzinfo=dt.timezone.utc) @@ -207,3 +209,61 @@ async def test_backfill_batch_export_workflow(temporal_worker, temporal_schedule backfill = backfills.pop() assert backfill.status == "Completed" + + +@pytest.mark.django_db(transaction=True) +async def test_backfill_batch_export_workflow_fails_when_schedule_deleted_after_running( + temporal_worker, temporal_schedule, temporal_client, team +): + """Test BackfillBatchExportWorkflow fails when its underlying Temporal Schedule is deleted. + + In this test, in contrats to the previous one, we wait until we have started running some + backfill runs before cancelling. + """ + start_at = dt.datetime(2023, 1, 1, 0, 0, 0, tzinfo=dt.timezone.utc) + end_at = dt.datetime(2023, 1, 1, 0, 10, 0, tzinfo=dt.timezone.utc) + now = dt.datetime.utcnow() + + desc = await temporal_schedule.describe() + + workflow_id = str(uuid.uuid4()) + inputs = BackfillBatchExportInputs( + team_id=team.pk, + batch_export_id=desc.id, + start_at=start_at.isoformat(), + end_at=end_at.isoformat(), + buffer_limit=1, + wait_delay=2.0, + ) + + handle = await temporal_client.start_workflow( + BackfillBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + execution_timeout=dt.timedelta(seconds=20), + retry_policy=temporalio.common.RetryPolicy(maximum_attempts=1), + ) + await wait_for_schedule_backfill_in_range( + client=temporal_client, + schedule_id=desc.id, + start_at=start_at, + end_at=dt.datetime(2023, 1, 1, 0, 1, 0, tzinfo=dt.timezone.utc), + now=now, + wait_delay=1.0, + ) + + desc = await temporal_schedule.describe() + result = desc.info.num_actions + + assert result >= 1 + + await temporal_schedule.delete() + + with pytest.raises(temporalio.client.WorkflowFailureError) as exc_info: + await handle.result() + + err = exc_info.value + assert isinstance(err.__cause__, temporalio.exceptions.ActivityError) + assert isinstance(err.__cause__.__cause__, temporalio.exceptions.ApplicationError) + assert err.__cause__.__cause__.type == "TemporalScheduleDeletedError" diff --git a/posthog/temporal/tests/utils/models.py b/posthog/temporal/tests/utils/models.py index 01286c7c373f33..1ceb4f2cb9bb01 100644 --- a/posthog/temporal/tests/utils/models.py +++ b/posthog/temporal/tests/utils/models.py @@ -30,7 +30,12 @@ async def acreate_batch_export(team_id: int, interval: str, name: str, destinati async def adelete_batch_export(batch_export: BatchExport, temporal_client: temporalio.client.Client) -> None: """Async delete a BatchExport and its underlying Schedule.""" handle = temporal_client.get_schedule_handle(str(batch_export.id)) - await handle.delete() + + try: + await handle.delete() + except temporalio.service.RPCError: + # This means the schedule was already deleted, so we can continue + pass await sync_to_async(batch_export.delete)() # type: ignore diff --git a/posthog/temporal/workflows/backfill_batch_export.py b/posthog/temporal/workflows/backfill_batch_export.py index 53201e3bf9bc0b..e94d5e0c074088 100644 --- a/posthog/temporal/workflows/backfill_batch_export.py +++ b/posthog/temporal/workflows/backfill_batch_export.py @@ -25,6 +25,13 @@ from posthog.temporal.workflows.logger import bind_batch_exports_logger +class TemporalScheduleDeletedError(Exception): + """Exception raised when a Temporal Schedule was deleted during backfill.""" + + def __init__(self, schedule_id: str): + super().__init__(f"The Temporal Schedule {schedule_id} was deleted during backfill") + + class HeartbeatDetails(typing.NamedTuple): """Details sent over in a Temporal Activity heartbeat.""" @@ -77,7 +84,11 @@ async def get_schedule_frequency(schedule_id: str) -> float: ) handle = client.get_schedule_handle(schedule_id) - desc = await handle.describe() + + try: + desc = await handle.describe() + except temporalio.service.RPCError: + raise TemporalScheduleDeletedError(schedule_id) interval = desc.schedule.spec.intervals[0] return interval.every.total_seconds() @@ -208,7 +219,13 @@ async def wait_for_schedule_backfill_in_range( execution start time, assuming that backfill runs will have started recently after 'now' whereas regularly scheduled runs happened sometime in the past, before 'now'. This should hold true for historical backfills, but the heuristic fails for "future backfills", which should not be allowed. + + Raises: + TemporalScheduleDeletedError: If we detect the Temporal Schedule we are waiting on doesn't exist anymore. """ + if await check_temporal_schedule_exists(client, schedule_id) is False: + raise TemporalScheduleDeletedError(schedule_id) + query = ( f'TemporalScheduledById="{schedule_id}" ' f'AND TemporalScheduledStartTime >= "{start_at.isoformat()}" ' @@ -234,6 +251,9 @@ async def wait_for_schedule_backfill_in_range( if check_workflow_executions_not_running(workflows) is False: continue + if await check_temporal_schedule_exists(client, schedule_id) is False: + raise TemporalScheduleDeletedError(schedule_id) + done = True @@ -245,6 +265,17 @@ def check_workflow_executions_not_running(workflow_executions: list[temporalio.c ) +async def check_temporal_schedule_exists(client: temporalio.client.Client, schedule_id: str) -> bool: + """Check if Temporal Schedule exists by trying to describe it.""" + handle = client.get_schedule_handle(schedule_id) + + try: + await handle.describe() + except temporalio.service.RPCError: + return False + return True + + def backfill_range( start_at: dt.datetime, end_at: dt.datetime, step: dt.timedelta ) -> typing.Generator[tuple[dt.datetime, dt.datetime], None, None]: @@ -337,6 +368,7 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None: retry_policy=temporalio.common.RetryPolicy( initial_interval=dt.timedelta(seconds=10), maximum_interval=dt.timedelta(seconds=60), + non_retryable_error_types=["TemporalScheduleDeletedError"], ), # Temporal requires that we set a timeout. # Allocate 5 minutes per expected number of runs to backfill as a timeout.