-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Misc Python update, signal, and child workflow features (#361)
- Loading branch information
1 parent
a5935fa
commit 8946823
Showing
8 changed files
with
161 additions
and
12 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
from datetime import timedelta | ||
|
||
from temporalio import workflow | ||
from temporalio.client import WorkflowHandle | ||
|
||
from harness.python.feature import Runner, register_feature | ||
|
||
|
||
@workflow.defn | ||
class Workflow: | ||
def __init__(self) -> None: | ||
self._state = "" | ||
|
||
@workflow.run | ||
async def run(self) -> str: | ||
await workflow.wait_condition(lambda: self._state != "") | ||
return self._state | ||
|
||
@workflow.signal | ||
async def my_signal(self, arg: str): | ||
self._state = arg | ||
|
||
|
||
async def start(runner: Runner) -> WorkflowHandle: | ||
handle: WorkflowHandle = await runner.client.start_workflow( | ||
Workflow, | ||
id="workflow-id", | ||
task_queue=runner.task_queue, | ||
execution_timeout=timedelta(minutes=1), | ||
) | ||
await handle.signal(Workflow.my_signal, "arg") | ||
return handle | ||
|
||
|
||
register_feature( | ||
workflows=[Workflow], | ||
expect_run_result="arg", | ||
start=start, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
# Signal a child workflow | ||
|
||
A signal can be sent from within a workflow to a child workflow. | ||
|
||
# Detailed spec | ||
|
||
- Start a child workflow that does not terminate until a signal is sent. | ||
- Use its handle to send a signal. | ||
- Confirm that the signal had its intended effect within the child workflow. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
from __future__ import annotations | ||
|
||
from typing import Optional | ||
|
||
from temporalio import workflow | ||
from temporalio.client import WorkflowHandle | ||
|
||
from harness.python.feature import Runner, register_feature | ||
|
||
|
||
@workflow.defn | ||
class Workflow: | ||
@workflow.run | ||
async def run(self) -> str: | ||
child_wf = await workflow.start_child_workflow( | ||
ChildWorkflow.run, "child-wf-arg" | ||
) | ||
await child_wf.signal(ChildWorkflow.my_signal, "signal-arg") | ||
return await child_wf | ||
|
||
|
||
@workflow.defn | ||
class ChildWorkflow: | ||
def __init__(self) -> None: | ||
self._state = "" | ||
|
||
@workflow.run | ||
async def run(self, input: str) -> str: | ||
await workflow.wait_condition(lambda: self._state != "") | ||
return f"{input} {self._state}" | ||
|
||
@workflow.signal | ||
async def my_signal(self, arg: str): | ||
self._state = arg | ||
|
||
|
||
async def start(runner: Runner) -> WorkflowHandle: | ||
return await runner.start_parameterless_workflow(Workflow) | ||
|
||
|
||
register_feature( | ||
workflows=[Workflow, ChildWorkflow], | ||
expect_run_result=f"child-wf-arg signal-arg", | ||
start=start, | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,10 @@ | ||
# Update a workflow | ||
|
||
A Workflow Update can be performed. | ||
A Workflow Update can be validated and executed. | ||
|
||
# Detailed spec | ||
|
||
An Update can be defined, the handler can be set, and issuing the update results in both the expected mutation and the expected return value. | ||
- An Update can be defined. | ||
- The handler and validator can be set. | ||
- Issuing the update results in the expected validation. | ||
- If and only if the validation passes, then the expected mutation occurs and the expected value is returned. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
# Update a workflow via a handle | ||
|
||
A Workflow Update handle can be obtained and used | ||
|
||
# Detailed spec | ||
|
||
- Obtain an Update handle. | ||
- Confirm that the Workflow has passed validation (A handle is only returned once it has passed validation) | ||
- Use the handle to get a result. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
from datetime import timedelta | ||
|
||
from temporalio import workflow | ||
from temporalio.client import WorkflowHandle, WorkflowUpdateFailedError | ||
|
||
from harness.python.feature import Runner, register_feature | ||
|
||
|
||
@workflow.defn | ||
class Workflow: | ||
""" | ||
A workflow with a signal and signal validator. | ||
If accepted, the signal makes a change to workflow state. | ||
The workflow does not terminate until such a change occurs. | ||
""" | ||
|
||
def __init__(self) -> None: | ||
self._state = "" | ||
|
||
@workflow.run | ||
async def run(self) -> str: | ||
await workflow.wait_condition(lambda: self._state != "") | ||
return self._state | ||
|
||
@workflow.update | ||
async def my_update(self, arg: str) -> str: | ||
self._state = arg | ||
return "update-result" | ||
|
||
@my_update.validator | ||
def my_validate(self, arg: str): | ||
if arg == "bad-update-arg": | ||
raise ValueError("Invalid Update argument") | ||
|
||
|
||
async def checker(_: Runner, handle: WorkflowHandle): | ||
bad_update_handle = await handle.start_update(Workflow.my_update, "bad-update-arg") | ||
try: | ||
await bad_update_handle.result() | ||
except WorkflowUpdateFailedError: | ||
pass | ||
else: | ||
assert False, "Expected Update to be rejected due to validation failure" | ||
|
||
update_handle = await handle.start_update(Workflow.my_update, "update-arg") | ||
update_result = await update_handle.result() | ||
assert update_result == "update-result" | ||
result = await handle.result() | ||
assert result == "update-arg" | ||
|
||
|
||
register_feature(workflows=[Workflow], check_result=checker) |