Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fail backfill on schedule deletion #18575

Merged
merged 4 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 99 additions & 0 deletions posthog/temporal/tests/batch_exports/test_backfill_batch_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import temporalio
import temporalio.client
import temporalio.common
import temporalio.exceptions
import temporalio.testing
import temporalio.worker
from django.conf import settings
Expand All @@ -22,6 +23,7 @@
backfill_range,
backfill_schedule,
get_schedule_frequency,
wait_for_schedule_backfill_in_range,
)

pytestmark = [pytest.mark.asyncio]
Expand Down Expand Up @@ -207,3 +209,100 @@ async def test_backfill_batch_export_workflow(temporal_worker, temporal_schedule

backfill = backfills.pop()
assert backfill.status == "Completed"


@pytest.mark.django_db(transaction=True)
async def test_backfill_batch_export_workflow_fails_when_schedule_deleted(
temporal_worker, temporal_schedule, temporal_client, team
):
"""Test BackfillBatchExportWorkflow fails when its underlying Temporal Schedule is deleted."""
start_at = dt.datetime(2023, 1, 1, 0, 0, 0, tzinfo=dt.timezone.utc)
end_at = dt.datetime(2023, 1, 1, 0, 10, 0, tzinfo=dt.timezone.utc)

desc = await temporal_schedule.describe()

workflow_id = str(uuid.uuid4())
inputs = BackfillBatchExportInputs(
team_id=team.pk,
batch_export_id=desc.id,
start_at=start_at.isoformat(),
end_at=end_at.isoformat(),
buffer_limit=1,
wait_delay=2.0,
)

handle = await temporal_client.start_workflow(
BackfillBatchExportWorkflow.run,
inputs,
id=workflow_id,
task_queue=settings.TEMPORAL_TASK_QUEUE,
execution_timeout=dt.timedelta(seconds=20),
retry_policy=temporalio.common.RetryPolicy(maximum_attempts=1),
)
await temporal_schedule.delete()

with pytest.raises(temporalio.client.WorkflowFailureError) as exc_info:
await handle.result()

err = exc_info.value
assert isinstance(err.__cause__, temporalio.exceptions.ActivityError)
assert isinstance(err.__cause__.__cause__, temporalio.exceptions.ApplicationError)
assert err.__cause__.__cause__.type == "TemporalScheduleNotFoundError"


@pytest.mark.django_db(transaction=True)
async def test_backfill_batch_export_workflow_fails_when_schedule_deleted_after_running(
temporal_worker, temporal_schedule, temporal_client, team
):
"""Test BackfillBatchExportWorkflow fails when its underlying Temporal Schedule is deleted.

In this test, in contrats to the previous one, we wait until we have started running some
backfill runs before cancelling.
"""
start_at = dt.datetime(2023, 1, 1, 0, 0, 0, tzinfo=dt.timezone.utc)
end_at = dt.datetime(2023, 1, 1, 0, 10, 0, tzinfo=dt.timezone.utc)
now = dt.datetime.utcnow()

desc = await temporal_schedule.describe()

workflow_id = str(uuid.uuid4())
inputs = BackfillBatchExportInputs(
team_id=team.pk,
batch_export_id=desc.id,
start_at=start_at.isoformat(),
end_at=end_at.isoformat(),
buffer_limit=1,
wait_delay=2.0,
)

handle = await temporal_client.start_workflow(
BackfillBatchExportWorkflow.run,
inputs,
id=workflow_id,
task_queue=settings.TEMPORAL_TASK_QUEUE,
execution_timeout=dt.timedelta(seconds=20),
retry_policy=temporalio.common.RetryPolicy(maximum_attempts=1),
)
await wait_for_schedule_backfill_in_range(
client=temporal_client,
schedule_id=desc.id,
start_at=start_at,
end_at=dt.datetime(2023, 1, 1, 0, 1, 0, tzinfo=dt.timezone.utc),
now=now,
wait_delay=1.0,
)

desc = await temporal_schedule.describe()
result = desc.info.num_actions

assert result >= 1

await temporal_schedule.delete()

with pytest.raises(temporalio.client.WorkflowFailureError) as exc_info:
await handle.result()

err = exc_info.value
assert isinstance(err.__cause__, temporalio.exceptions.ActivityError)
assert isinstance(err.__cause__.__cause__, temporalio.exceptions.ApplicationError)
assert err.__cause__.__cause__.type == "TemporalScheduleNotFoundError"
7 changes: 6 additions & 1 deletion posthog/temporal/tests/utils/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ async def acreate_batch_export(team_id: int, interval: str, name: str, destinati
async def adelete_batch_export(batch_export: BatchExport, temporal_client: temporalio.client.Client) -> None:
"""Async delete a BatchExport and its underlying Schedule."""
handle = temporal_client.get_schedule_handle(str(batch_export.id))
await handle.delete()

try:
await handle.delete()
except temporalio.service.RPCError:
# This means the schedule was already deleted, so we can continue
tomasfarias marked this conversation as resolved.
Show resolved Hide resolved
pass

await sync_to_async(batch_export.delete)() # type: ignore

Expand Down
37 changes: 36 additions & 1 deletion posthog/temporal/workflows/backfill_batch_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@
from posthog.temporal.workflows.logger import bind_batch_exports_logger


class TemporalScheduleNotFoundError(Exception):
"""Exception raised when a Temporal Schedule is not found."""

def __init__(self, schedule_id: str):
super().__init__(f"The Temporal Schedule {schedule_id} was not found (maybe it was deleted?)")


class HeartbeatDetails(typing.NamedTuple):
"""Details sent over in a Temporal Activity heartbeat."""

Expand Down Expand Up @@ -66,6 +73,9 @@ async def get_schedule_frequency(schedule_id: str) -> float:
"""Return a Temporal Schedule's frequency.

This assumes that the Schedule has one interval set.

Raises:
TemporalScheduleNotFoundError: If the Temporal Schedule whose frequency we are trying to get doesn't exist.
"""
client = await connect(
settings.TEMPORAL_HOST,
Expand All @@ -77,7 +87,11 @@ async def get_schedule_frequency(schedule_id: str) -> float:
)

handle = client.get_schedule_handle(schedule_id)
desc = await handle.describe()

try:
desc = await handle.describe()
except temporalio.service.RPCError:
tomasfarias marked this conversation as resolved.
Show resolved Hide resolved
raise TemporalScheduleNotFoundError(schedule_id)

interval = desc.schedule.spec.intervals[0]
return interval.every.total_seconds()
Expand Down Expand Up @@ -208,7 +222,13 @@ async def wait_for_schedule_backfill_in_range(
execution start time, assuming that backfill runs will have started recently after 'now' whereas regularly
scheduled runs happened sometime in the past, before 'now'. This should hold true for historical backfills,
but the heuristic fails for "future backfills", which should not be allowed.

Raises:
TemporalScheduleNotFoundError: If we detect the Temporal Schedule we are waiting on doesn't exist.
"""
if await check_temporal_schedule_exists(client, schedule_id) is False:
raise TemporalScheduleNotFoundError(schedule_id)

query = (
f'TemporalScheduledById="{schedule_id}" '
f'AND TemporalScheduledStartTime >= "{start_at.isoformat()}" '
Expand All @@ -225,6 +245,9 @@ async def wait_for_schedule_backfill_in_range(
while not done:
await asyncio.sleep(wait_delay)

if await check_temporal_schedule_exists(client, schedule_id) is False:
raise TemporalScheduleNotFoundError(schedule_id)

workflows = [workflow async for workflow in client.list_workflows(query=query)]

if not workflows:
Expand All @@ -245,6 +268,17 @@ def check_workflow_executions_not_running(workflow_executions: list[temporalio.c
)


async def check_temporal_schedule_exists(client: temporalio.client.Client, schedule_id: str) -> bool:
"""Check if Temporal Schedule exists by trying to describe it."""
handle = client.get_schedule_handle(schedule_id)

try:
await handle.describe()
except temporalio.service.RPCError:
return False
return True


def backfill_range(
start_at: dt.datetime, end_at: dt.datetime, step: dt.timedelta
) -> typing.Generator[tuple[dt.datetime, dt.datetime], None, None]:
Expand Down Expand Up @@ -337,6 +371,7 @@ async def run(self, inputs: BackfillBatchExportInputs) -> None:
retry_policy=temporalio.common.RetryPolicy(
initial_interval=dt.timedelta(seconds=10),
maximum_interval=dt.timedelta(seconds=60),
non_retryable_error_types=["TemporalScheduleNotFoundError"],
),
# Temporal requires that we set a timeout.
# Allocate 5 minutes per expected number of runs to backfill as a timeout.
Expand Down
Loading