From 5c4c1d5634e8d0b5f2f769c5fb0548c9a6620462 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 5 Jul 2024 07:28:19 -0400 Subject: [PATCH] Add failing test --- tests/worker/test_activity.py | 38 +++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/tests/worker/test_activity.py b/tests/worker/test_activity.py index c981a98c..463b0c29 100644 --- a/tests/worker/test_activity.py +++ b/tests/worker/test_activity.py @@ -41,6 +41,7 @@ Worker, WorkerConfig, ) +from tests.helpers import new_worker from tests.helpers.worker import ( ExternalWorker, KSAction, @@ -1351,3 +1352,40 @@ def assert_activity_application_error( ret = assert_activity_error(err) assert isinstance(ret, ApplicationError) return ret + + +@activity.defn +async def activity_with_retry_delay(retry_delay_seconds: float): + if activity.info().attempt == 1: + raise ApplicationError( + "Deliberately failing with next_retry_delay set", + next_retry_delay=timedelta(seconds=retry_delay_seconds), + ) + + +@workflow.defn +class ActivitiesWithRetryDelayWorkflow: + @workflow.run + async def run(self, retry_delay_seconds: float) -> float: + t0 = workflow.time() + await workflow.execute_activity( + activity_with_retry_delay, + retry_delay_seconds, + schedule_to_close_timeout=timedelta(seconds=retry_delay_seconds * 2), + ) + t1 = workflow.time() + return t1 - t0 + + +async def test_activity_retry_delay(client: Client): + retry_delay = timedelta(seconds=2) + async with new_worker( + client, ActivitiesWithRetryDelayWorkflow, activities=[activity_with_retry_delay] + ) as worker: + workflow_duration = await client.execute_workflow( + ActivitiesWithRetryDelayWorkflow.run, + retry_delay.total_seconds(), + id=str(uuid.uuid4()), + task_queue=worker.task_queue, + ) + assert workflow_duration > retry_delay.total_seconds()