Skip to content

Commit

Permalink
Honor all non-completion commands (#569)
Browse files Browse the repository at this point in the history
* Honor commands generated after the first completion command
  • Loading branch information
dandavison authored Aug 5, 2024
1 parent a839196 commit 50914c4
Show file tree
Hide file tree
Showing 6 changed files with 453 additions and 22 deletions.
18 changes: 16 additions & 2 deletions temporalio/bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 9 additions & 19 deletions temporalio/worker/_workflow_instance.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,27 +403,17 @@ def activate(
f"Failed converting activation exception: {inner_err}"
)

# If there are successful commands, we must remove all
# non-query-responses after terminal workflow commands. We must do this
# in place to avoid the copy-on-write that occurs when you reassign.
seen_completion = False
i = 0
while i < len(self._current_completion.successful.commands):
command = self._current_completion.successful.commands[i]
if not seen_completion:
seen_completion = (
command.HasField("complete_workflow_execution")
or command.HasField("continue_as_new_workflow_execution")
or command.HasField("fail_workflow_execution")
or command.HasField("cancel_workflow_execution")
)
elif not command.HasField("respond_to_query"):
del self._current_completion.successful.commands[i]
continue
i += 1
def is_completion(command):
    """Return whether ``command`` is one of the terminal workflow commands.

    A command counts as a completion when any of the complete,
    continue-as-new, fail or cancel workflow-execution fields is set on it.
    """
    terminal_fields = (
        "complete_workflow_execution",
        "continue_as_new_workflow_execution",
        "fail_workflow_execution",
        "cancel_workflow_execution",
    )
    # Checked in the listed order; stops at the first field that is set.
    return any(command.HasField(field) for field in terminal_fields)

if seen_completion:
if any(map(is_completion, self._current_completion.successful.commands)):
self._warn_if_unfinished_handlers()

return self._current_completion

def _apply(
Expand Down
75 changes: 75 additions & 0 deletions tests/worker/test_replayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,3 +310,78 @@ def new_say_hello_worker(client: Client) -> Worker:
workflows=[SayHelloWorkflow],
activities=[say_hello],
)


@workflow.defn
class UpdateCompletionAfterWorkflowReturn:
    """Workflow whose update handler waits on a flag that ``run`` sets just before returning."""

    def __init__(self) -> None:
        # Flipped to True by run(); my_update() blocks until it is set.
        self.workflow_returned = False

    @workflow.run
    async def run(self) -> str:
        """Mark the workflow as returned and produce the workflow result."""
        self.workflow_returned = True
        return "workflow-result"

    @workflow.update
    async def my_update(self) -> str:
        """Wait until ``run`` has set ``workflow_returned``, then produce the update result."""

        def has_workflow_returned() -> bool:
            return self.workflow_returned

        await workflow.wait_condition(has_workflow_returned)
        return "update-result"


async def test_replayer_command_reordering_backward_compatibility() -> None:
    """
    Replay a history recorded with pre-#569 SDK code and check that it still succeeds.

    The UpdateCompletionAfterWorkflowReturn workflow above features an update handler
    that returns after the main workflow coroutine has exited. If an update is sent in
    the first WFT, it generates a raw command sequence (before sending to core) of

        [UpdateAccepted, CompleteWorkflowExecution, UpdateCompleted].

    Prior to https://github.com/temporalio/sdk-python/pull/569, Python truncated this
    command sequence to

        [UpdateAccepted, CompleteWorkflowExecution].

    With #569, Python performs no truncation, and Core changes it to

        [UpdateAccepted, UpdateCompleted, CompleteWorkflowExecution].

    This test takes a history generated using pre-#569 SDK code and replays it. This
    succeeds. The history is

        1 WorkflowExecutionStarted
        2 WorkflowTaskScheduled
        3 WorkflowTaskStarted
        4 WorkflowTaskCompleted
        5 WorkflowExecutionUpdateAccepted
        6 WorkflowExecutionCompleted

    Note that the history lacks a WorkflowExecutionUpdateCompleted event.

    If Core's logic (which involves a flag) incorrectly allowed this history to be
    replayed using Core's post-#569 implementation, then a non-determinism error would
    result. Specifically, Core would, at some point during replay, do the following:

        Receive [UpdateAccepted, CompleteWorkflowExecution, UpdateCompleted] from lang,
        change that to [UpdateAccepted, UpdateCompleted, CompleteWorkflowExecution],
        and create an UpdateMachine instance (the WorkflowTaskMachine instance already
        exists). Then continue to consume history events.

        Event 5 WorkflowExecutionUpdateAccepted would apply to the UpdateMachine
        associated with the UpdateAccepted command, but event 6
        WorkflowExecutionCompleted would not, since core is expecting an event that can
        be applied to the UpdateMachine corresponding to UpdateCompleted.

    If we modify core to incorrectly apply its new logic then we do see that:

        [TMPRL1100] Nondeterminism error: Update machine does not handle this event: HistoryEvent(id: 6, WorkflowExecutionCompleted)

    The test passes because core in fact (because the history lacks the flag) uses its
    old logic and changes the command sequence from
    [UpdateAccepted, CompleteWorkflowExecution, UpdateCompleted] to
    [UpdateAccepted, CompleteWorkflowExecution], and events 5 and 6 can be applied to
    the corresponding state machines.
    """
    # The recorded history lives next to this test module.
    history_path = Path(__file__).with_name(
        "test_replayer_command_reordering_backward_compatibility.json"
    )
    history_json = history_path.read_text()
    replayer = Replayer(workflows=[UpdateCompletionAfterWorkflowReturn])
    await replayer.replay_workflow(WorkflowHistory.from_json("fake", history_json))
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
{
"events": [
{
"eventId": "1",
"eventTime": "2024-08-02T23:35:00.061520Z",
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_STARTED",
"taskId": "1049558",
"workflowExecutionStartedEventAttributes": {
"workflowType": {
"name": "UpdateCompletionAfterWorkflowReturn"
},
"taskQueue": {
"name": "tq",
"kind": "TASK_QUEUE_KIND_NORMAL"
},
"workflowTaskTimeout": "10s",
"originalExecutionRunId": "a32ce0cb-b50e-4734-b003-784dda811861",
"identity": "[email protected]",
"firstExecutionRunId": "a32ce0cb-b50e-4734-b003-784dda811861",
"attempt": 1,
"firstWorkflowTaskBackoff": "0s",
"workflowId": "wf-dd1e2267-d1bf-4822-be38-2a97a499331e"
}
},
{
"eventId": "2",
"eventTime": "2024-08-02T23:35:00.070867Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_SCHEDULED",
"taskId": "1049559",
"workflowTaskScheduledEventAttributes": {
"taskQueue": {
"name": "tq",
"kind": "TASK_QUEUE_KIND_NORMAL"
},
"startToCloseTimeout": "10s",
"attempt": 1
}
},
{
"eventId": "3",
"eventTime": "2024-08-02T23:35:00.155562Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_STARTED",
"taskId": "1049564",
"workflowTaskStartedEventAttributes": {
"scheduledEventId": "2",
"identity": "[email protected]",
"requestId": "b03f25fb-b2ab-4b93-b2ad-0f6899f6e2e2",
"historySizeBytes": "260"
}
},
{
"eventId": "4",
"eventTime": "2024-08-02T23:35:00.224744Z",
"eventType": "EVENT_TYPE_WORKFLOW_TASK_COMPLETED",
"taskId": "1049568",
"workflowTaskCompletedEventAttributes": {
"scheduledEventId": "2",
"startedEventId": "3",
"identity": "[email protected]",
"workerVersion": {
"buildId": "17647b02191ec9e4e58b623a9c71f20a"
},
"sdkMetadata": {
"coreUsedFlags": [
1,
2
]
},
"meteringMetadata": {}
}
},
{
"eventId": "5",
"eventTime": "2024-08-02T23:35:00.242507Z",
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED",
"taskId": "1049569",
"workflowExecutionUpdateAcceptedEventAttributes": {
"protocolInstanceId": "my-update",
"acceptedRequestMessageId": "my-update/request",
"acceptedRequestSequencingEventId": "2",
"acceptedRequest": {
"meta": {
"updateId": "my-update",
"identity": "[email protected]"
},
"input": {
"name": "my_update"
}
}
}
},
{
"eventId": "6",
"eventTime": "2024-08-02T23:35:00.258465Z",
"eventType": "EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED",
"taskId": "1049570",
"workflowExecutionCompletedEventAttributes": {
"result": {
"payloads": [
{
"metadata": {
"encoding": "anNvbi9wbGFpbg==",
"encodingDecoded": "json/plain"
},
"data": "workflow-result"
}
]
},
"workflowTaskCompletedEventId": "4"
}
}
]
}
Loading

0 comments on commit 50914c4

Please sign in to comment.