Skip to content

Commit

Permalink
Workflow start delay (#406)
Browse files Browse the repository at this point in the history
Fixes #404
  • Loading branch information
cretz authored Oct 24, 2023
1 parent 4242dfb commit 97814c2
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 0 deletions.
18 changes: 18 additions & 0 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ async def start_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
start_delay: Optional[timedelta] = None,
start_signal: Optional[str] = None,
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
Expand All @@ -299,6 +300,7 @@ async def start_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
start_delay: Optional[timedelta] = None,
start_signal: Optional[str] = None,
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
Expand All @@ -325,6 +327,7 @@ async def start_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
start_delay: Optional[timedelta] = None,
start_signal: Optional[str] = None,
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
Expand All @@ -351,6 +354,7 @@ async def start_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
start_delay: Optional[timedelta] = None,
start_signal: Optional[str] = None,
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
Expand All @@ -375,6 +379,7 @@ async def start_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
start_delay: Optional[timedelta] = None,
start_signal: Optional[str] = None,
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
Expand All @@ -400,6 +405,9 @@ async def start_workflow(
cron_schedule: See https://docs.temporal.io/docs/content/what-is-a-temporal-cron-job/
memo: Memo for the workflow.
search_attributes: Search attributes for the workflow.
start_delay: Amount of time to wait before starting the workflow.
This does not work with ``cron_schedule``. This is currently
experimental.
start_signal: If present, this signal is sent as signal-with-start
instead of traditional workflow start.
start_signal_args: Arguments for start_signal if start_signal
Expand Down Expand Up @@ -444,6 +452,7 @@ async def start_workflow(
cron_schedule=cron_schedule,
memo=memo,
search_attributes=search_attributes,
start_delay=start_delay,
headers={},
start_signal=start_signal,
start_signal_args=start_signal_args,
Expand All @@ -469,6 +478,7 @@ async def execute_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
start_delay: Optional[timedelta] = None,
start_signal: Optional[str] = None,
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
Expand All @@ -493,6 +503,7 @@ async def execute_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
start_delay: Optional[timedelta] = None,
start_signal: Optional[str] = None,
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
Expand All @@ -519,6 +530,7 @@ async def execute_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
start_delay: Optional[timedelta] = None,
start_signal: Optional[str] = None,
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
Expand All @@ -545,6 +557,7 @@ async def execute_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
start_delay: Optional[timedelta] = None,
start_signal: Optional[str] = None,
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
Expand All @@ -569,6 +582,7 @@ async def execute_workflow(
cron_schedule: str = "",
memo: Optional[Mapping[str, Any]] = None,
search_attributes: Optional[temporalio.common.SearchAttributes] = None,
start_delay: Optional[timedelta] = None,
start_signal: Optional[str] = None,
start_signal_args: Sequence[Any] = [],
rpc_metadata: Mapping[str, str] = {},
Expand Down Expand Up @@ -597,6 +611,7 @@ async def execute_workflow(
cron_schedule=cron_schedule,
memo=memo,
search_attributes=search_attributes,
start_delay=start_delay,
start_signal=start_signal,
start_signal_args=start_signal_args,
rpc_metadata=rpc_metadata,
Expand Down Expand Up @@ -3753,6 +3768,7 @@ class StartWorkflowInput:
cron_schedule: str
memo: Optional[Mapping[str, Any]]
search_attributes: Optional[temporalio.common.SearchAttributes]
start_delay: Optional[timedelta]
headers: Mapping[str, temporalio.api.common.v1.Payload]
start_signal: Optional[str]
start_signal_args: Sequence[Any]
Expand Down Expand Up @@ -4233,6 +4249,8 @@ async def start_workflow(
temporalio.converter.encode_search_attributes(
input.search_attributes, req.search_attributes
)
if input.start_delay is not None:
req.workflow_start_delay.FromTimedelta(input.start_delay)
if input.headers is not None:
temporalio.common._apply_headers(input.headers, req.header.fields)

Expand Down
47 changes: 47 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,53 @@ async def test_start_with_signal(client: Client, worker: ExternalWorker):
assert "some signal arg" == await handle.result()


async def test_start_delay(
client: Client, worker: ExternalWorker, env: WorkflowEnvironment
):
if env.supports_time_skipping:
pytest.skip("Java test server does not support start delay")
start_delay = timedelta(hours=1, minutes=20, seconds=30)
handle = await client.start_workflow(
"kitchen_sink",
KSWorkflowParams(
actions=[KSAction(result=KSResultAction(value="some result"))]
),
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
start_delay=start_delay,
)
# Check that first event has start delay
first_event = [e async for e in handle.fetch_history_events()][0]
assert (
start_delay
== first_event.workflow_execution_started_event_attributes.first_workflow_task_backoff.ToTimedelta()
)


async def test_signal_with_start_delay(
client: Client, worker: ExternalWorker, env: WorkflowEnvironment
):
if env.supports_time_skipping:
pytest.skip("Java test server does not support start delay")
start_delay = timedelta(hours=1, minutes=20, seconds=30)
handle = await client.start_workflow(
"kitchen_sink",
KSWorkflowParams(
actions=[KSAction(result=KSResultAction(value="some result"))]
),
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
start_delay=start_delay,
start_signal="some-signal",
)
# Check that first event has start delay
first_event = [e async for e in handle.fetch_history_events()][0]
assert (
start_delay
== first_event.workflow_execution_started_event_attributes.first_workflow_task_backoff.ToTimedelta()
)


async def test_result_follow_continue_as_new(
client: Client, worker: ExternalWorker, env: WorkflowEnvironment
):
Expand Down

0 comments on commit 97814c2

Please sign in to comment.