From 2e68dad33bc93d345e1e99588fb0cdc9180dd024 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Fri, 5 Jul 2024 04:46:21 -0400 Subject: [PATCH] WIP: cancellation --- temporalio/worker/_workflow_instance.py | 2 -- tests/worker/test_workflow.py | 47 +++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 004faa70..11834778 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -422,8 +422,6 @@ def activate( continue i += 1 - if seen_completion: - self._warn_if_unfinished_handlers() return self._current_completion def _apply( diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py index 246a83e3..d57e6b72 100644 --- a/tests/worker/test_workflow.py +++ b/tests/worker/test_workflow.py @@ -5632,6 +5632,53 @@ async def finish(self): self.workflow_may_exit = True +@workflow.defn +class UpdateCancellationWorkflow(CoroutinesUseLockWorkflow): + def __init__(self) -> None: + self.non_terminating_operation_has_started = False + + @workflow.run + async def run(self) -> str: + await workflow.wait_condition(lambda: False) + return "unreachable" + + @workflow.update + async def wait_until_non_terminating_operation_has_started(self) -> None: + await workflow.wait_condition( + lambda: self.non_terminating_operation_has_started + ) + + @workflow.update + async def non_terminating_operation(self) -> str: + self.non_terminating_operation_has_started = True + await workflow.wait_condition(lambda: False) + return "unreachable" + + +async def test_update_cancellation(client: Client): + async with new_worker(client, UpdateCancellationWorkflow) as worker: + wf_handle = await client.start_workflow( + UpdateCancellationWorkflow.run, + id=str(uuid.uuid4()), + task_queue=worker.task_queue, + ) + # Asynchronously run an update that will never complete + non_terminating_update_task = asyncio.create_task( + wf_handle.execute_update( + UpdateCancellationWorkflow.non_terminating_operation + ) + ) + print("wait until handler started...") + # Wait until we know the update handler has started executing + await wf_handle.execute_update( + UpdateCancellationWorkflow.wait_until_non_terminating_operation_has_started + ) + print("cancel the workflow") + await wf_handle.cancel() + print("await non_terminating_update_task...") + await non_terminating_update_task + + async def _do_workflow_coroutines_lock_or_semaphore_test( client: Client, params: UseLockOrSemaphoreWorkflowParameters,