From 50914c40dc08ac84ddf328b0f97d7df71bf239cc Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Mon, 5 Aug 2024 14:19:37 -0400 Subject: [PATCH] Honor all non-completion commands (#569) * Honor commands generated after the first completion command --- temporalio/bridge/Cargo.lock | 18 +- temporalio/bridge/sdk-core | 2 +- temporalio/worker/_workflow_instance.py | 28 +- tests/worker/test_replayer.py | 75 ++++++ ...and_reordering_backward_compatibility.json | 113 +++++++++ tests/worker/test_workflow.py | 239 ++++++++++++++++++ 6 files changed, 453 insertions(+), 22 deletions(-) create mode 100644 tests/worker/test_replayer_command_reordering_backward_compatibility.json diff --git a/temporalio/bridge/Cargo.lock b/temporalio/bridge/Cargo.lock index b96f4958..5dd15c44 100644 --- a/temporalio/bridge/Cargo.lock +++ b/temporalio/bridge/Cargo.lock @@ -463,6 +463,20 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "dashmap" +version = "6.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "804c8821570c3f8b70230c2ba75ffa5c0f9a4189b9a432b6656c536712acae28" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "deflate64" version = "0.1.8" @@ -826,7 +840,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68a7f542ee6b35af73b06abc0dad1c1bae89964e4e253bc4b587b91c9637867b" dependencies = [ "cfg-if", - "dashmap", + "dashmap 5.5.3", "futures", "futures-timer", "no-std-compat", @@ -2597,7 +2611,7 @@ dependencies = [ "crossbeam-channel", "crossbeam-queue", "crossbeam-utils", - "dashmap", + "dashmap 6.0.1", "derive_builder", "derive_more", "enum-iterator", diff --git a/temporalio/bridge/sdk-core b/temporalio/bridge/sdk-core index f14a3b81..8ae8054d 160000 --- a/temporalio/bridge/sdk-core +++ b/temporalio/bridge/sdk-core @@ -1 +1 @@ -Subproject commit f14a3b81b63bf78d0c7c2d9cfe40cd469c069df6 +Subproject commit 8ae8054d3e813d082c4cd9ceaf9b02a3961016f9 diff --git a/temporalio/worker/_workflow_instance.py b/temporalio/worker/_workflow_instance.py index 378cfc01..d04e0f54 100644 --- a/temporalio/worker/_workflow_instance.py +++ b/temporalio/worker/_workflow_instance.py @@ -403,27 +403,17 @@ def activate( f"Failed converting activation exception: {inner_err}" ) - # If there are successful commands, we must remove all - # non-query-responses after terminal workflow commands. We must do this - # in place to avoid the copy-on-write that occurs when you reassign. 
-        seen_completion = False
-        i = 0
-        while i < len(self._current_completion.successful.commands):
-            command = self._current_completion.successful.commands[i]
-            if not seen_completion:
-                seen_completion = (
-                    command.HasField("complete_workflow_execution")
-                    or command.HasField("continue_as_new_workflow_execution")
-                    or command.HasField("fail_workflow_execution")
-                    or command.HasField("cancel_workflow_execution")
-                )
-            elif not command.HasField("respond_to_query"):
-                del self._current_completion.successful.commands[i]
-                continue
-            i += 1
+        def is_completion(command):
+            return (
+                command.HasField("complete_workflow_execution")
+                or command.HasField("continue_as_new_workflow_execution")
+                or command.HasField("fail_workflow_execution")
+                or command.HasField("cancel_workflow_execution")
+            )
 
-        if seen_completion:
+        if any(map(is_completion, self._current_completion.successful.commands)):
             self._warn_if_unfinished_handlers()
+
         return self._current_completion
 
     def _apply(
diff --git a/tests/worker/test_replayer.py b/tests/worker/test_replayer.py
index 08ae2cfe..9f6e6ac5 100644
--- a/tests/worker/test_replayer.py
+++ b/tests/worker/test_replayer.py
@@ -310,3 +310,78 @@ def new_say_hello_worker(client: Client) -> Worker:
         workflows=[SayHelloWorkflow],
         activities=[say_hello],
     )
+
+
+@workflow.defn
+class UpdateCompletionAfterWorkflowReturn:
+    def __init__(self) -> None:
+        self.workflow_returned = False
+
+    @workflow.run
+    async def run(self) -> str:
+        self.workflow_returned = True
+        return "workflow-result"
+
+    @workflow.update
+    async def my_update(self) -> str:
+        await workflow.wait_condition(lambda: self.workflow_returned)
+        return "update-result"
+
+
+async def test_replayer_command_reordering_backward_compatibility() -> None:
+    """
+    The UpdateCompletionAfterWorkflowReturn workflow above features an update handler that returns
+    after the main workflow coroutine has exited. It will (if an update is sent in the first WFT)
+    generate a raw command sequence (before sending to Core) of
+
+    [UpdateAccepted, CompleteWorkflowExecution, UpdateCompleted].
+
+    Prior to https://github.com/temporalio/sdk-python/pull/569, Python truncated this command
+    sequence to
+
+    [UpdateAccepted, CompleteWorkflowExecution].
+
+    With #569, Python performs no truncation, and Core changes it to
+
+    [UpdateAccepted, UpdateCompleted, CompleteWorkflowExecution].
+
+    This test replays a history that was generated by pre-#569 SDK code; the replay succeeds.
+    The history is
+
+    1 WorkflowExecutionStarted
+    2 WorkflowTaskScheduled
+    3 WorkflowTaskStarted
+    4 WorkflowTaskCompleted
+    5 WorkflowExecutionUpdateAccepted
+    6 WorkflowExecutionCompleted
+
+    Note that the history lacks a WorkflowExecutionUpdateCompleted event.
+
+    If Core's logic (which involves a flag) incorrectly allowed this history to be replayed
+    using Core's post-#569 implementation, then a non-determinism error would result. Specifically,
+    Core would, at some point during replay, do the following:
+
+    Receive [UpdateAccepted, CompleteWorkflowExecution, UpdateCompleted] from lang,
+    change that to [UpdateAccepted, UpdateCompleted, CompleteWorkflowExecution]
+    and create an UpdateMachine instance (the WorkflowTaskMachine instance already exists).
+    Then continue to consume history events.
+
+    Event 5 WorkflowExecutionUpdateAccepted would apply to the UpdateMachine associated with
+    the UpdateAccepted command, but event 6 WorkflowExecutionCompleted would not, since
+    Core is expecting an event that can be applied to the UpdateMachine corresponding to
+    UpdateCompleted. If we modify Core to incorrectly apply its new logic, then we do see that:
+
+    [TMPRL1100] Nondeterminism error: Update machine does not handle this event: HistoryEvent(id: 6, WorkflowExecutionCompleted)
+
+    The test passes because Core in fact uses its old logic (since the history lacks the flag)
+    and changes the command sequence from [UpdateAccepted, CompleteWorkflowExecution, UpdateCompleted]
+    to [UpdateAccepted, CompleteWorkflowExecution], so events 5 and 6 can be applied to the
+    corresponding state machines.
+    """
+    with Path(__file__).with_name(
+        "test_replayer_command_reordering_backward_compatibility.json"
+    ).open() as f:
+        history = f.read()
+    await Replayer(workflows=[UpdateCompletionAfterWorkflowReturn]).replay_workflow(
+        WorkflowHistory.from_json("fake", history)
+    )
diff --git a/tests/worker/test_replayer_command_reordering_backward_compatibility.json b/tests/worker/test_replayer_command_reordering_backward_compatibility.json
new file mode 100644
index 00000000..454e9b33
--- /dev/null
+++ b/tests/worker/test_replayer_command_reordering_backward_compatibility.json
@@ -0,0 +1,113 @@
+{
+  "events": [
+    {
+      "eventId": "1",
+      "eventTime": "2024-08-02T23:35:00.061520Z",
+      "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED",
+      "taskId": "1049558",
+      "workflowExecutionStartedEventAttributes": {
+        "workflowType": {
+          "name": "UpdateCompletionAfterWorkflowReturn"
+        },
+        "taskQueue": {
+          "name": "tq",
+          "kind": "TASK_QUEUE_KIND_NORMAL"
+        },
+        "workflowTaskTimeout": "10s",
+        "originalExecutionRunId": "a32ce0cb-b50e-4734-b003-784dda811861",
+        "identity": "7638@dan-2.local",
+        "firstExecutionRunId": "a32ce0cb-b50e-4734-b003-784dda811861",
+        "attempt": 1,
+        "firstWorkflowTaskBackoff": "0s",
+        "workflowId": "wf-dd1e2267-d1bf-4822-be38-2a97a499331e"
+      }
+    },
+    {
+      "eventId": "2",
+      "eventTime": "2024-08-02T23:35:00.070867Z",
+      "eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
+      "taskId": "1049559",
+      "workflowTaskScheduledEventAttributes": {
+        "taskQueue": {
+          "name": "tq",
+          "kind": "TASK_QUEUE_KIND_NORMAL"
+        },
+        "startToCloseTimeout": "10s",
+        "attempt": 1
+      }
+    },
+    {
+      "eventId": "3",
+      "eventTime": "2024-08-02T23:35:00.155562Z",
+      "eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
+      "taskId": "1049564",
+      "workflowTaskStartedEventAttributes": {
+        "scheduledEventId": "2",
+        "identity": "7638@dan-2.local",
+        "requestId": "b03f25fb-b2ab-4b93-b2ad-0f6899f6e2e2",
+        "historySizeBytes": "260"
+      }
+    },
+    {
+      "eventId": "4",
+      "eventTime": "2024-08-02T23:35:00.224744Z",
+      "eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
+      "taskId": "1049568",
+      "workflowTaskCompletedEventAttributes": {
+        "scheduledEventId": "2",
+        "startedEventId": "3",
+        "identity": "7638@dan-2.local",
+        "workerVersion": {
+          "buildId": "17647b02191ec9e4e58b623a9c71f20a"
+        },
+        "sdkMetadata": {
+          "coreUsedFlags": [
+            1,
+            2
+          ]
+        },
+        "meteringMetadata": {}
+      }
+    },
+    {
+      "eventId": "5",
+      "eventTime": "2024-08-02T23:35:00.242507Z",
+      "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED",
+      "taskId": "1049569",
+      "workflowExecutionUpdateAcceptedEventAttributes": {
+        "protocolInstanceId": "my-update",
+        "acceptedRequestMessageId": "my-update/request",
+        "acceptedRequestSequencingEventId": "2",
+        "acceptedRequest": {
+          "meta": {
+            "updateId": "my-update",
+            "identity": "7638@dan-2.local"
+          },
+          "input": {
+            "name": "my_update"
+          }
+        }
+      }
+    },
+    {
+      "eventId": "6",
+      "eventTime": "2024-08-02T23:35:00.258465Z",
+      "eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED",
+      "taskId": "1049570",
+      "workflowExecutionCompletedEventAttributes": {
+        "result": {
+          "payloads": [
+            {
+              "metadata": {
+                "encoding": "anNvbi9wbGFpbg==",
+                "encodingDecoded": "json/plain"
+              },
+              "data": "workflow-result"
+            }
+          ]
+        },
+        "workflowTaskCompletedEventId": "4"
+      }
+    }
+  ]
+}
\ No newline at end of file
diff --git a/tests/worker/test_workflow.py b/tests/worker/test_workflow.py
index 6ce26553..23399137 100644
--- a/tests/worker/test_workflow.py
+++ b/tests/worker/test_workflow.py
@@ -19,6 +19,7 @@
 from typing import (
     Any,
     Awaitable,
+    Callable,
     Dict,
     List,
     Mapping,
@@ -5576,3 +5577,241 @@ async def test_workflow_id_conflict(client: Client):
     assert new_handle.run_id != handle.run_id
     assert (await handle.describe()).status == WorkflowExecutionStatus.TERMINATED
     assert (await new_handle.describe()).status == WorkflowExecutionStatus.RUNNING
+
+
+@workflow.defn
+class TestUpdateCompletionIsHonoredWhenAfterWorkflowReturn1:
+    def __init__(self) -> None:
+        self.workflow_returned = False
+
+    @workflow.run
+    async def run(self) -> str:
+        self.workflow_returned = True
+        return "workflow-result"
+
+    @workflow.update
+    async def my_update(self) -> str:
+        await workflow.wait_condition(lambda: self.workflow_returned)
+        return "update-result"
+
+
+async def test_update_completion_is_honored_when_after_workflow_return_1(
+    client: Client,
+    env: WorkflowEnvironment,
+):
+    if env.supports_time_skipping:
+        pytest.skip(
+            "Java test server: https://github.com/temporalio/sdk-java/issues/1903"
+        )
+    update_id = "my-update"
+    task_queue = "tq"
+    wf_handle = await client.start_workflow(
+        TestUpdateCompletionIsHonoredWhenAfterWorkflowReturn1.run,
+        id=f"wf-{uuid.uuid4()}",
+        task_queue=task_queue,
+    )
+    update_result_task = asyncio.create_task(
+        wf_handle.execute_update(
+            TestUpdateCompletionIsHonoredWhenAfterWorkflowReturn1.my_update,
+            id=update_id,
+        )
+    )
+    await workflow_update_exists(client, wf_handle.id, update_id)
+
+    async with Worker(
+        client,
+        task_queue=task_queue,
+        workflows=[TestUpdateCompletionIsHonoredWhenAfterWorkflowReturn1],
+    ):
+        assert await wf_handle.result() == "workflow-result"
+        assert await update_result_task == "update-result"
+
+
+@workflow.defn
+class TestUpdateCompletionIsHonoredWhenAfterWorkflowReturnWorkflow2:
+    def __init__(self):
+        self.received_update = False
+        self.update_result: asyncio.Future[str] = asyncio.Future()
+
+    @workflow.run
+    async def run(self) -> str:
+        await workflow.wait_condition(lambda: self.received_update)
+        self.update_result.set_result("update-result")
+        # Prior to https://github.com/temporalio/features/issues/481, the client
+        # waiting on the update got a "Workflow execution already completed"
+        # error instead of the update result, because the main workflow
+        # coroutine completion command is emitted before the update completion
+        # command.
+        return "workflow-result"
+
+    @workflow.update
+    async def my_update(self) -> str:
+        self.received_update = True
+        return await self.update_result
+
+
+async def test_update_completion_is_honored_when_after_workflow_return_2(
+    client: Client,
+    env: WorkflowEnvironment,
+):
+    if env.supports_time_skipping:
+        pytest.skip(
+            "Java test server: https://github.com/temporalio/sdk-java/issues/1903"
+        )
+    async with Worker(
+        client,
+        task_queue="tq",
+        workflows=[TestUpdateCompletionIsHonoredWhenAfterWorkflowReturnWorkflow2],
+    ) as worker:
+        handle = await client.start_workflow(
+            TestUpdateCompletionIsHonoredWhenAfterWorkflowReturnWorkflow2.run,
+            id=f"wf-{uuid.uuid4()}",
+            task_queue=worker.task_queue,
+        )
+        update_result = await handle.execute_update(
+            TestUpdateCompletionIsHonoredWhenAfterWorkflowReturnWorkflow2.my_update
+        )
+        assert update_result == "update-result"
+        assert await handle.result() == "workflow-result"
+
+
+@workflow.defn
+class FirstCompletionCommandIsHonoredWorkflow:
+    def __init__(self, main_workflow_returns_before_signal_completions=False) -> None:
+        self.seen_first_signal = False
+        self.seen_second_signal = False
+        self.main_workflow_returns_before_signal_completions = (
+            main_workflow_returns_before_signal_completions
+        )
+        self.ping_pong_val = 1
+        self.ping_pong_counter = 0
+        self.ping_pong_max_count = 4
+
+    @workflow.run
+    async def run(self) -> str:
+        await workflow.wait_condition(
+            lambda: self.seen_first_signal and self.seen_second_signal
+        )
+        return "workflow-result"
+
+    @workflow.signal
+    async def this_signal_executes_first(self):
+        self.seen_first_signal = True
+        if self.main_workflow_returns_before_signal_completions:
+            await self.ping_pong(lambda: self.ping_pong_val > 0)
+        raise ApplicationError(
+            "Client should see this error unless doing ping-pong "
+            "(in which case main coroutine returns first)"
+        )
+
+    @workflow.signal
+    async def this_signal_executes_second(self):
+        await workflow.wait_condition(lambda: self.seen_first_signal)
+        self.seen_second_signal = True
+        if self.main_workflow_returns_before_signal_completions:
+            await self.ping_pong(lambda: self.ping_pong_val < 0)
+        raise ApplicationError("Client should never see this error!")
+
+    async def ping_pong(self, cond: Callable[[], bool]):
+        while self.ping_pong_counter < self.ping_pong_max_count:
+            await workflow.wait_condition(cond)
+            self.ping_pong_val = -self.ping_pong_val
+            self.ping_pong_counter += 1
+
+
+@workflow.defn
+class FirstCompletionCommandIsHonoredPingPongWorkflow(
+    FirstCompletionCommandIsHonoredWorkflow
+):
+    def __init__(self) -> None:
+        super().__init__(main_workflow_returns_before_signal_completions=True)
+
+    @workflow.run
+    async def run(self) -> str:
+        return await super().run()
+
+
+async def test_first_of_two_signal_completion_commands_is_honored(client: Client):
+    await _do_first_completion_command_is_honored_test(
+        client, main_workflow_returns_before_signal_completions=False
+    )
+
+
+async def test_workflow_return_is_honored_when_it_precedes_signal_completion_command(
+    client: Client,
+):
+    await _do_first_completion_command_is_honored_test(
+        client, main_workflow_returns_before_signal_completions=True
+    )
+
+
+async def _do_first_completion_command_is_honored_test(
+    client: Client, main_workflow_returns_before_signal_completions: bool
+):
+    workflow_cls: Union[
+        Type[FirstCompletionCommandIsHonoredPingPongWorkflow],
+        Type[FirstCompletionCommandIsHonoredWorkflow],
+    ] = (
+        FirstCompletionCommandIsHonoredPingPongWorkflow
+        if main_workflow_returns_before_signal_completions
+        else FirstCompletionCommandIsHonoredWorkflow
+    )
+    async with Worker(
+        client,
+        task_queue="tq",
+        workflows=[workflow_cls],
+    ) as worker:
+        handle = await client.start_workflow(
+            workflow_cls.run,
+            id=f"wf-{uuid.uuid4()}",
+            task_queue=worker.task_queue,
+        )
+        await handle.signal(workflow_cls.this_signal_executes_second)
+        await handle.signal(workflow_cls.this_signal_executes_first)
+        try:
+            result = await handle.result()
+        except WorkflowFailureError as err:
+            if main_workflow_returns_before_signal_completions:
+                assert (
+                    False
+                ), "Expected no error due to main workflow coroutine returning first"
+            else:
+                assert str(err.cause).startswith("Client should see this error")
+        else:
+            assert (
+                main_workflow_returns_before_signal_completions
+                and result == "workflow-result"
+            )
+
+
+@workflow.defn
+class TimerStartedAfterWorkflowCompletionWorkflow:
+    def __init__(self) -> None:
+        self.received_signal = False
+        self.main_workflow_coroutine_finished = False
+
+    @workflow.run
+    async def run(self) -> str:
+        await workflow.wait_condition(lambda: self.received_signal)
+        self.main_workflow_coroutine_finished = True
+        return "workflow-result"
+
+    @workflow.signal(unfinished_policy=workflow.HandlerUnfinishedPolicy.ABANDON)
+    async def my_signal(self):
+        self.received_signal = True
+        await workflow.wait_condition(lambda: self.main_workflow_coroutine_finished)
+        await asyncio.sleep(7777777)
+
+
+async def test_timer_started_after_workflow_completion(client: Client):
+    async with new_worker(
+        client, TimerStartedAfterWorkflowCompletionWorkflow
+    ) as worker:
+        handle = await client.start_workflow(
+            TimerStartedAfterWorkflowCompletionWorkflow.run,
+            id=f"wf-{uuid.uuid4()}",
+            task_queue=worker.task_queue,
+        )
+        await handle.signal(TimerStartedAfterWorkflowCompletionWorkflow.my_signal)
+        assert await handle.result() == "workflow-result"
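
For reference, the truncation removed from _workflow_instance.py above can be
summarized by the following minimal, runnable sketch. It is not part of the
patch: plain strings stand in for the protobuf command objects, and the helper
name is illustrative only.

COMPLETION_COMMANDS = {
    "complete_workflow_execution",
    "continue_as_new_workflow_execution",
    "fail_workflow_execution",
    "cancel_workflow_execution",
}

def truncate_after_completion(commands):
    # Pre-#569 behavior: keep everything up to and including the first
    # completion command, then drop all later commands except query responses.
    kept, seen_completion = [], False
    for command in commands:
        if seen_completion and command != "respond_to_query":
            continue
        kept.append(command)
        seen_completion = seen_completion or command in COMPLETION_COMMANDS
    return kept

# The update completion generated after the workflow returned was dropped:
assert truncate_after_completion(
    ["update_accepted", "complete_workflow_execution", "update_completed"]
) == ["update_accepted", "complete_workflow_execution"]

With this patch the SDK sends the full command sequence unmodified, and Core
(as described in the replayer test docstring above) reorders it to
[UpdateAccepted, UpdateCompleted, CompleteWorkflowExecution].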