From 80ad7b24c3482be5739f2dc154b7d64ac7588c78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Mon, 13 Nov 2023 17:10:38 +0100 Subject: [PATCH 1/4] fix: Fail backfill on schedule deletion --- .../test_backfill_batch_export.py | 99 +++++++++++++++++++ posthog/temporal/tests/utils/models.py | 7 +- .../workflows/backfill_batch_export.py | 37 ++++++- 3 files changed, 141 insertions(+), 2 deletions(-) diff --git a/posthog/temporal/tests/batch_exports/test_backfill_batch_export.py b/posthog/temporal/tests/batch_exports/test_backfill_batch_export.py index 105f331995d2f..37790bcd52711 100644 --- a/posthog/temporal/tests/batch_exports/test_backfill_batch_export.py +++ b/posthog/temporal/tests/batch_exports/test_backfill_batch_export.py @@ -6,6 +6,7 @@ import temporalio import temporalio.client import temporalio.common +import temporalio.exceptions import temporalio.testing import temporalio.worker from django.conf import settings @@ -22,6 +23,7 @@ backfill_range, backfill_schedule, get_schedule_frequency, + wait_for_schedule_backfill_in_range, ) pytestmark = [pytest.mark.asyncio] @@ -207,3 +209,100 @@ async def test_backfill_batch_export_workflow(temporal_worker, temporal_schedule backfill = backfills.pop() assert backfill.status == "Completed" + + +@pytest.mark.django_db(transaction=True) +async def test_backfill_batch_export_workflow_fails_when_schedule_deleted( + temporal_worker, temporal_schedule, temporal_client, team +): + """Test BackfillBatchExportWorkflow fails when its underlying Temporal Schedule is deleted.""" + start_at = dt.datetime(2023, 1, 1, 0, 0, 0, tzinfo=dt.timezone.utc) + end_at = dt.datetime(2023, 1, 1, 0, 10, 0, tzinfo=dt.timezone.utc) + + desc = await temporal_schedule.describe() + + workflow_id = str(uuid.uuid4()) + inputs = BackfillBatchExportInputs( + team_id=team.pk, + batch_export_id=desc.id, + start_at=start_at.isoformat(), + end_at=end_at.isoformat(), + buffer_limit=1, + wait_delay=2.0, + ) + + handle = await temporal_client.start_workflow( + BackfillBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + execution_timeout=dt.timedelta(seconds=20), + retry_policy=temporalio.common.RetryPolicy(maximum_attempts=1), + ) + await temporal_schedule.delete() + + with pytest.raises(temporalio.client.WorkflowFailureError) as exc_info: + await handle.result() + + err = exc_info.value + assert isinstance(err.__cause__, temporalio.exceptions.ActivityError) + assert isinstance(err.__cause__.__cause__, temporalio.exceptions.ApplicationError) + assert err.__cause__.__cause__.type == "TemporalScheduleNotFoundError" + + +@pytest.mark.django_db(transaction=True) +async def test_backfill_batch_export_workflow_fails_when_schedule_deleted_after_running( + temporal_worker, temporal_schedule, temporal_client, team +): + """Test BackfillBatchExportWorkflow fails when its underlying Temporal Schedule is deleted. + + In this test, in contrats to the previous one, we wait until we have started running some + backfill runs before cancelling. + """ + start_at = dt.datetime(2023, 1, 1, 0, 0, 0, tzinfo=dt.timezone.utc) + end_at = dt.datetime(2023, 1, 1, 0, 10, 0, tzinfo=dt.timezone.utc) + now = dt.datetime.utcnow() + + desc = await temporal_schedule.describe() + + workflow_id = str(uuid.uuid4()) + inputs = BackfillBatchExportInputs( + team_id=team.pk, + batch_export_id=desc.id, + start_at=start_at.isoformat(), + end_at=end_at.isoformat(), + buffer_limit=1, + wait_delay=2.0, + ) + + handle = await temporal_client.start_workflow( + BackfillBatchExportWorkflow.run, + inputs, + id=workflow_id, + task_queue=settings.TEMPORAL_TASK_QUEUE, + execution_timeout=dt.timedelta(seconds=20), + retry_policy=temporalio.common.RetryPolicy(maximum_attempts=1), + ) + await wait_for_schedule_backfill_in_range( + client=temporal_client, + schedule_id=desc.id, + start_at=start_at, + end_at=dt.datetime(2023, 1, 1, 0, 1, 0, tzinfo=dt.timezone.utc), + now=now, + wait_delay=1.0, + ) + + desc = await temporal_schedule.describe() + result = desc.info.num_actions + + assert result >= 1 + + await temporal_schedule.delete() + + with pytest.raises(temporalio.client.WorkflowFailureError) as exc_info: + await handle.result() + + err = exc_info.value + assert isinstance(err.__cause__, temporalio.exceptions.ActivityError) + assert isinstance(err.__cause__.__cause__, temporalio.exceptions.ApplicationError) + assert err.__cause__.__cause__.type == "TemporalScheduleNotFoundError" diff --git a/posthog/temporal/tests/utils/models.py b/posthog/temporal/tests/utils/models.py index 01286c7c373f3..1ceb4f2cb9bb0 100644 --- a/posthog/temporal/tests/utils/models.py +++ b/posthog/temporal/tests/utils/models.py @@ -30,7 +30,12 @@ async def acreate_batch_export(team_id: int, interval: str, name: str, destinati async def adelete_batch_export(batch_export: BatchExport, temporal_client: temporalio.client.Client) -> None: """Async delete a BatchExport and its underlying Schedule.""" handle = temporal_client.get_schedule_handle(str(batch_export.id)) - await handle.delete() + + try: + await handle.delete() + except temporalio.service.RPCError: + # This means the schedule was already deleted, so we can continue + pass await sync_to_async(batch_export.delete)() # type: ignore diff --git a/posthog/temporal/workflows/backfill_batch_export.py b/posthog/temporal/workflows/backfill_batch_export.py index 53201e3bf9bc0..fc1790a21c27a 100644 --- a/posthog/temporal/workflows/backfill_batch_export.py +++ b/posthog/temporal/workflows/backfill_batch_export.py @@ -25,6 +25,13 @@ from posthog.temporal.workflows.logger import bind_batch_exports_logger +class TemporalScheduleNotFoundError(Exception): + """Exception raised when a Temporal Schedule is not found.""" + + def __init__(self, schedule_id: str): + super().__init__(f"The Temporal Schedule {schedule_id} was not found (maybe it was deleted?)") + + class HeartbeatDetails(typing.NamedTuple): """Details sent over in a Temporal Activity heartbeat.""" @@ -66,6 +73,9 @@ async def get_schedule_frequency(schedule_id: str) -> float: """Return a Temporal Schedule's frequency. This assumes that the Schedule has one interval set. + + Raises: + TemporalScheduleNotFoundError: If the Temporal Schedule whose frequency we are trying to get doesn't exist. """ client = await connect( settings.TEMPORAL_HOST, @@ -77,7 +87,11 @@ async def get_schedule_frequency(schedule_id: str) -> float: ) handle = client.get_schedule_handle(schedule_id) - desc = await handle.describe() + + try: + desc = await handle.describe() + except temporalio.service.RPCError: + raise TemporalScheduleNotFoundError(schedule_id) interval = desc.schedule.spec.intervals[0] return interval.every.total_seconds() @@ -208,7 +222,13 @@ async def wait_for_schedule_backfill_in_range( execution start time, assuming that backfill runs will have started recently after 'now' whereas regularly scheduled runs happened sometime in the past, before 'now'. This should hold true for historical backfills, but the heuristic fails for "future backfills", which should not be allowed. + + Raises: + TemporalScheduleNotFoundError: If we detect the Temporal Schedule we are waiting on doesn't exist. """ + if await check_temporal_schedule_exists(client, schedule_id) is False: + raise TemporalScheduleNotFoundError(schedule_id) + query = ( f'TemporalScheduledById="{schedule_id}" ' f'AND TemporalScheduledStartTime >= "{start_at.isoformat()}" ' @@ -234,6 +254,9 @@ async def wait_for_schedule_backfill_in_range( if check_workflow_executions_not_running(workflows) is False: continue + if await check_temporal_schedule_exists(client, schedule_id) is False: + raise TemporalScheduleNotFoundError(schedule_id) + done = True @@ -245,6 +268,17 @@ def check_workflow_executions_not_running(workflow_executions: list[temporalio.c ) +async def check_temporal_schedule_exists(client: temporalio.client.Client, schedule_id: str) -> bool: + """Check if Temporal Schedule exists by trying to describe it.""" + handle = client.get_schedule_handle(schedule_id) + + try: + await handle.describe() + except temporalio.service.RPCError: + return False + return True + + def backfill_range( start_at: dt.datetime, end_at: dt.datetime, step: dt.timedelta ) -> typing.Generator[tuple[dt.datetime, dt.datetime], None, None]: @@ -337,6 +371,7 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None: retry_policy=temporalio.common.RetryPolicy( initial_interval=dt.timedelta(seconds=10), maximum_interval=dt.timedelta(seconds=60), + non_retryable_error_types=["TemporalScheduleDeletedError"], ), # Temporal requires that we set a timeout. # Allocate 5 minutes per expected number of runs to backfill as a timeout. From 796e3360638e30ef735f633ada22621ab9ba4c73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Mon, 13 Nov 2023 18:13:06 +0100 Subject: [PATCH 2/4] fix: Check first that a schedule exists --- posthog/temporal/workflows/backfill_batch_export.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/posthog/temporal/workflows/backfill_batch_export.py b/posthog/temporal/workflows/backfill_batch_export.py index fc1790a21c27a..06757d6448a97 100644 --- a/posthog/temporal/workflows/backfill_batch_export.py +++ b/posthog/temporal/workflows/backfill_batch_export.py @@ -245,6 +245,9 @@ async def wait_for_schedule_backfill_in_range( while not done: await asyncio.sleep(wait_delay) + if await check_temporal_schedule_exists(client, schedule_id) is False: + raise TemporalScheduleNotFoundError(schedule_id) + workflows = [workflow async for workflow in client.list_workflows(query=query)] if not workflows: @@ -254,9 +257,6 @@ async def wait_for_schedule_backfill_in_range( if check_workflow_executions_not_running(workflows) is False: continue - if await check_temporal_schedule_exists(client, schedule_id) is False: - raise TemporalScheduleNotFoundError(schedule_id) - done = True From 0c35d7b1286d65a9376ad979fa9508b39eace191 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Mon, 13 Nov 2023 18:31:26 +0100 Subject: [PATCH 3/4] fix: Do not retry on TemporalScheduleNotFoundError --- posthog/temporal/workflows/backfill_batch_export.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/posthog/temporal/workflows/backfill_batch_export.py b/posthog/temporal/workflows/backfill_batch_export.py index 06757d6448a97..effe79b7c063a 100644 --- a/posthog/temporal/workflows/backfill_batch_export.py +++ b/posthog/temporal/workflows/backfill_batch_export.py @@ -371,7 +371,7 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None: retry_policy=temporalio.common.RetryPolicy( initial_interval=dt.timedelta(seconds=10), maximum_interval=dt.timedelta(seconds=60), - non_retryable_error_types=["TemporalScheduleDeletedError"], + non_retryable_error_types=["TemporalScheduleNotFoundError"], ), # Temporal requires that we set a timeout. # Allocate 5 minutes per expected number of runs to backfill as a timeout. From c968b9d86b3bd6fdfeeacf32753ad70e143e316b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Tue, 14 Nov 2023 16:08:37 +0100 Subject: [PATCH 4/4] fix: Assume schedule deleted only with NOT_FOUND status --- .../temporal/workflows/backfill_batch_export.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/posthog/temporal/workflows/backfill_batch_export.py b/posthog/temporal/workflows/backfill_batch_export.py index effe79b7c063a..8542320cb7ffc 100644 --- a/posthog/temporal/workflows/backfill_batch_export.py +++ b/posthog/temporal/workflows/backfill_batch_export.py @@ -90,8 +90,11 @@ async def get_schedule_frequency(schedule_id: str) -> float: try: desc = await handle.describe() - except temporalio.service.RPCError: - raise TemporalScheduleNotFoundError(schedule_id) + except temporalio.service.RPCError as e: + if e.status == temporalio.service.RPCStatusCode.NOT_FOUND: + raise TemporalScheduleNotFoundError(schedule_id) + else: + raise interval = desc.schedule.spec.intervals[0] return interval.every.total_seconds() @@ -274,8 +277,11 @@ async def check_temporal_schedule_exists(client: temporalio.client.Client, sched try: await handle.describe() - except temporalio.service.RPCError: - return False + except temporalio.service.RPCError as e: + if e.status == temporalio.service.RPCStatusCode.NOT_FOUND: + return False + else: + raise return True