Skip to content

Commit

Permalink
fix: Fail backfill on schedule deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias committed Nov 13, 2023
1 parent 413ea11 commit 5e8a7d2
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import temporalio
import temporalio.client
import temporalio.common
import temporalio.exceptions
import temporalio.testing
import temporalio.worker
from django.conf import settings
Expand All @@ -22,6 +23,7 @@
backfill_range,
backfill_schedule,
get_schedule_frequency,
wait_for_schedule_backfill_in_range,
)

pytestmark = [pytest.mark.asyncio]
Expand Down Expand Up @@ -169,7 +171,7 @@ async def test_backfill_schedule_activity(activity_environment, temporal_worker,

@pytest.mark.django_db(transaction=True)
async def test_backfill_batch_export_workflow(temporal_worker, temporal_schedule, temporal_client, team):
"""Test BackfillBatchExportWorkflow executes all backfill runs and updates model."""
"""Test BackfillBatchExportWorkflow fails when its underlying Temporal Schedule is deleted."""
start_at = dt.datetime(2023, 1, 1, 0, 0, 0, tzinfo=dt.timezone.utc)
end_at = dt.datetime(2023, 1, 1, 0, 10, 0, tzinfo=dt.timezone.utc)

Expand Down Expand Up @@ -207,3 +209,61 @@ async def test_backfill_batch_export_workflow(temporal_worker, temporal_schedule

backfill = backfills.pop()
assert backfill.status == "Completed"


@pytest.mark.django_db(transaction=True)
async def test_backfill_batch_export_workflow_fails_when_schedule_deleted_after_running(
temporal_worker, temporal_schedule, temporal_client, team
):
"""Test BackfillBatchExportWorkflow fails when its underlying Temporal Schedule is deleted.
In this test, in contrats to the previous one, we wait until we have started running some
backfill runs before cancelling.
"""
start_at = dt.datetime(2023, 1, 1, 0, 0, 0, tzinfo=dt.timezone.utc)
end_at = dt.datetime(2023, 1, 1, 0, 10, 0, tzinfo=dt.timezone.utc)
now = dt.datetime.utcnow()

desc = await temporal_schedule.describe()

workflow_id = str(uuid.uuid4())
inputs = BackfillBatchExportInputs(
team_id=team.pk,
batch_export_id=desc.id,
start_at=start_at.isoformat(),
end_at=end_at.isoformat(),
buffer_limit=1,
wait_delay=2.0,
)

handle = await temporal_client.start_workflow(
BackfillBatchExportWorkflow.run,
inputs,
id=workflow_id,
task_queue=settings.TEMPORAL_TASK_QUEUE,
execution_timeout=dt.timedelta(seconds=20),
retry_policy=temporalio.common.RetryPolicy(maximum_attempts=1),
)
await wait_for_schedule_backfill_in_range(
client=temporal_client,
schedule_id=desc.id,
start_at=start_at,
end_at=dt.datetime(2023, 1, 1, 0, 1, 0, tzinfo=dt.timezone.utc),
now=now,
wait_delay=1.0,
)

desc = await temporal_schedule.describe()
result = desc.info.num_actions

assert result >= 1

await temporal_schedule.delete()

with pytest.raises(temporalio.client.WorkflowFailureError) as exc_info:
await handle.result()

err = exc_info.value
assert isinstance(err.__cause__, temporalio.exceptions.ActivityError)
assert isinstance(err.__cause__.__cause__, temporalio.exceptions.ApplicationError)
assert err.__cause__.__cause__.type == "TemporalScheduleDeletedError"
7 changes: 6 additions & 1 deletion posthog/temporal/tests/utils/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ async def acreate_batch_export(team_id: int, interval: str, name: str, destinati
async def adelete_batch_export(batch_export: BatchExport, temporal_client: temporalio.client.Client) -> None:
"""Async delete a BatchExport and its underlying Schedule."""
handle = temporal_client.get_schedule_handle(str(batch_export.id))
await handle.delete()

try:
await handle.delete()
except temporalio.service.RPCError:
# This means the schedule was already deleted, so we can continue
pass

await sync_to_async(batch_export.delete)() # type: ignore

Expand Down
34 changes: 33 additions & 1 deletion posthog/temporal/workflows/backfill_batch_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@
from posthog.temporal.workflows.logger import bind_batch_exports_logger


class TemporalScheduleDeletedError(Exception):
"""Exception raised when a Temporal Schedule was deleted during backfill."""

def __init__(self, schedule_id: str):
super().__init__(f"The Temporal Schedule {schedule_id} was deleted during backfill")


class HeartbeatDetails(typing.NamedTuple):
"""Details sent over in a Temporal Activity heartbeat."""

Expand Down Expand Up @@ -77,7 +84,11 @@ async def get_schedule_frequency(schedule_id: str) -> float:
)

handle = client.get_schedule_handle(schedule_id)
desc = await handle.describe()

try:
desc = await handle.describe()
except temporalio.service.RPCError:
raise TemporalScheduleDeletedError(schedule_id)

interval = desc.schedule.spec.intervals[0]
return interval.every.total_seconds()
Expand Down Expand Up @@ -208,7 +219,13 @@ async def wait_for_schedule_backfill_in_range(
execution start time, assuming that backfill runs will have started recently after 'now' whereas regularly
scheduled runs happened sometime in the past, before 'now'. This should hold true for historical backfills,
but the heuristic fails for "future backfills", which should not be allowed.
Raises:
TemporalScheduleDeletedError: If we detect the Temporal Schedule we are waiting on doesn't exist anymore.
"""
if await check_temporal_schedule_exists(client, schedule_id) is False:
raise TemporalScheduleDeletedError(schedule_id)

query = (
f'TemporalScheduledById="{schedule_id}" '
f'AND TemporalScheduledStartTime >= "{start_at.isoformat()}" '
Expand All @@ -234,6 +251,9 @@ async def wait_for_schedule_backfill_in_range(
if check_workflow_executions_not_running(workflows) is False:
continue

if await check_temporal_schedule_exists(client, schedule_id) is False:
raise TemporalScheduleDeletedError(schedule_id)

done = True


Expand All @@ -245,6 +265,17 @@ def check_workflow_executions_not_running(workflow_executions: list[temporalio.c
)


async def check_temporal_schedule_exists(client: temporalio.client.Client, schedule_id: str) -> bool:
"""Check if Temporal Schedule exists by trying to describe it."""
handle = client.get_schedule_handle(schedule_id)

try:
await handle.describe()
except temporalio.service.RPCError:
return False
return True


def backfill_range(
start_at: dt.datetime, end_at: dt.datetime, step: dt.timedelta
) -> typing.Generator[tuple[dt.datetime, dt.datetime], None, None]:
Expand Down Expand Up @@ -337,6 +368,7 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None:
retry_policy=temporalio.common.RetryPolicy(
initial_interval=dt.timedelta(seconds=10),
maximum_interval=dt.timedelta(seconds=60),
non_retryable_error_types=["TemporalScheduleDeletedError"],
),
# Temporal requires that we set a timeout.
# Allocate 5 minutes per expected number of runs to backfill as a timeout.
Expand Down

0 comments on commit 5e8a7d2

Please sign in to comment.