Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Misc Python update / signal / child-workflow features #361

Merged
merged 1 commit into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions features/child_workflow/result/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,7 @@

A Workflow can get the result of a Child Workflow.

This feature executes a Workflow that:

- starts a Child Workflow
- gets the result of the Child
- returns the result

# Detailed spec

TODO
- Start a Child Workflow
- Get the result of the Child
3 changes: 0 additions & 3 deletions features/child_workflow/result/feature.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
from __future__ import annotations

from datetime import timedelta
from uuid import uuid4

from temporalio import workflow
from temporalio.client import WorkflowHandle

Expand Down
39 changes: 39 additions & 0 deletions features/signal/basic/feature.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from datetime import timedelta

from temporalio import workflow
from temporalio.client import WorkflowHandle

from harness.python.feature import Runner, register_feature


@workflow.defn
class Workflow:
def __init__(self) -> None:
self._state = ""

@workflow.run
async def run(self) -> str:
await workflow.wait_condition(lambda: self._state != "")
return self._state

@workflow.signal
async def my_signal(self, arg: str):
self._state = arg


async def start(runner: Runner) -> WorkflowHandle:
handle: WorkflowHandle = await runner.client.start_workflow(
Workflow,
id="workflow-id",
task_queue=runner.task_queue,
execution_timeout=timedelta(minutes=1),
)
await handle.signal(Workflow.my_signal, "arg")
return handle


register_feature(
workflows=[Workflow],
expect_run_result="arg",
start=start,
)
9 changes: 9 additions & 0 deletions features/signal/child_workflow/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Signal a child workflow

A signal can be sent from within a workflow to a child workflow.

# Detailed spec

- Start a child workflow that does not terminate until a signal is sent.
- Use its handle to send a signal.
- Confirm that the signal had its intended effect within the child workflow.
45 changes: 45 additions & 0 deletions features/signal/child_workflow/feature.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from __future__ import annotations

from typing import Optional

from temporalio import workflow
from temporalio.client import WorkflowHandle

from harness.python.feature import Runner, register_feature


@workflow.defn
class Workflow:
@workflow.run
async def run(self) -> str:
child_wf = await workflow.start_child_workflow(
ChildWorkflow.run, "child-wf-arg"
)
await child_wf.signal(ChildWorkflow.my_signal, "signal-arg")
return await child_wf


@workflow.defn
class ChildWorkflow:
def __init__(self) -> None:
self._state = ""

@workflow.run
async def run(self, input: str) -> str:
await workflow.wait_condition(lambda: self._state != "")
return f"{input} {self._state}"

@workflow.signal
async def my_signal(self, arg: str):
self._state = arg


async def start(runner: Runner) -> WorkflowHandle:
return await runner.start_parameterless_workflow(Workflow)


register_feature(
workflows=[Workflow, ChildWorkflow],
expect_run_result=f"child-wf-arg signal-arg",
start=start,
)
7 changes: 5 additions & 2 deletions features/update/basic/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# Update a workflow

A Workflow Update can be performed.
A Workflow Update can be validated and executed.

# Detailed spec

An Update can be defined, the handler can be set, and issuing the update results in both the expected mutation and the expected return value.
- An Update can be defined.
- The handler and validator can be set.
- Issuing the update results in the expected validation.
- If and only if the validation passes, then the expected mutation occurs and the expected value is returned.
9 changes: 9 additions & 0 deletions features/update/basic_async/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Update a workflow via a handle

A Workflow Update handle can be obtained and used

# Detailed spec

- Obtain an Update handle.
- Confirm that the Workflow has passed validation (A handle is only returned once it has passed validation)
- Use the handle to get a result.
52 changes: 52 additions & 0 deletions features/update/basic_async/feature.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from datetime import timedelta

from temporalio import workflow
from temporalio.client import WorkflowHandle, WorkflowUpdateFailedError

from harness.python.feature import Runner, register_feature


@workflow.defn
class Workflow:
"""
A workflow with a signal and signal validator.
If accepted, the signal makes a change to workflow state.
The workflow does not terminate until such a change occurs.
"""

def __init__(self) -> None:
self._state = ""

@workflow.run
async def run(self) -> str:
await workflow.wait_condition(lambda: self._state != "")
return self._state

@workflow.update
async def my_update(self, arg: str) -> str:
self._state = arg
return "update-result"

@my_update.validator
def my_validate(self, arg: str):
if arg == "bad-update-arg":
raise ValueError("Invalid Update argument")


async def checker(_: Runner, handle: WorkflowHandle):
bad_update_handle = await handle.start_update(Workflow.my_update, "bad-update-arg")
try:
await bad_update_handle.result()
except WorkflowUpdateFailedError:
pass
else:
assert False, "Expected Update to be rejected due to validation failure"

update_handle = await handle.start_update(Workflow.my_update, "update-arg")
update_result = await update_handle.result()
assert update_result == "update-result"
result = await handle.result()
assert result == "update-arg"


register_feature(workflows=[Workflow], check_result=checker)
Loading