diff --git a/temporalio/client.py b/temporalio/client.py index 008cdbcc..aceb3301 100644 --- a/temporalio/client.py +++ b/temporalio/client.py @@ -275,6 +275,7 @@ async def start_workflow( cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None, + start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], rpc_metadata: Mapping[str, str] = {}, @@ -299,6 +300,7 @@ async def start_workflow( cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None, + start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], rpc_metadata: Mapping[str, str] = {}, @@ -325,6 +327,7 @@ async def start_workflow( cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None, + start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], rpc_metadata: Mapping[str, str] = {}, @@ -351,6 +354,7 @@ async def start_workflow( cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None, + start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], rpc_metadata: Mapping[str, str] = {}, @@ -375,6 +379,7 @@ async def start_workflow( cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None, + start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], rpc_metadata: Mapping[str, str] = {}, @@ -400,6 +405,9 @@ async def start_workflow( cron_schedule: See https://docs.temporal.io/docs/content/what-is-a-temporal-cron-job/ memo: Memo for the workflow. search_attributes: Search attributes for the workflow. + start_delay: Amount of time to wait before starting the workflow. + This does not work with ``cron_schedule``. This is currently + experimental. start_signal: If present, this signal is sent as signal-with-start instead of traditional workflow start. start_signal_args: Arguments for start_signal if start_signal @@ -444,6 +452,7 @@ async def start_workflow( cron_schedule=cron_schedule, memo=memo, search_attributes=search_attributes, + start_delay=start_delay, headers={}, start_signal=start_signal, start_signal_args=start_signal_args, @@ -469,6 +478,7 @@ async def execute_workflow( cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None, + start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], rpc_metadata: Mapping[str, str] = {}, @@ -493,6 +503,7 @@ async def execute_workflow( cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None, + start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], rpc_metadata: Mapping[str, str] = {}, @@ -519,6 +530,7 @@ async def execute_workflow( cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None, + start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], rpc_metadata: Mapping[str, str] = {}, @@ -545,6 +557,7 @@ async def execute_workflow( cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None, + start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], rpc_metadata: Mapping[str, str] = {}, @@ -569,6 +582,7 @@ async def execute_workflow( cron_schedule: str = "", memo: Optional[Mapping[str, Any]] = None, search_attributes: Optional[temporalio.common.SearchAttributes] = None, + start_delay: Optional[timedelta] = None, start_signal: Optional[str] = None, start_signal_args: Sequence[Any] = [], rpc_metadata: Mapping[str, str] = {}, @@ -597,6 +611,7 @@ async def execute_workflow( cron_schedule=cron_schedule, memo=memo, search_attributes=search_attributes, + start_delay=start_delay, start_signal=start_signal, start_signal_args=start_signal_args, rpc_metadata=rpc_metadata, @@ -3753,6 +3768,7 @@ class StartWorkflowInput: cron_schedule: str memo: Optional[Mapping[str, Any]] search_attributes: Optional[temporalio.common.SearchAttributes] + start_delay: Optional[timedelta] headers: Mapping[str, temporalio.api.common.v1.Payload] start_signal: Optional[str] start_signal_args: Sequence[Any] @@ -4233,6 +4249,8 @@ async def start_workflow( temporalio.converter.encode_search_attributes( input.search_attributes, req.search_attributes ) + if input.start_delay is not None: + req.workflow_start_delay.FromTimedelta(input.start_delay) if input.headers is not None: temporalio.common._apply_headers(input.headers, req.header.fields) diff --git a/tests/test_client.py b/tests/test_client.py index 7ce9d307..5760f7a6 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -140,6 +140,53 @@ async def test_start_with_signal(client: Client, worker: ExternalWorker): assert "some signal arg" == await handle.result() +async def test_start_delay( + client: Client, worker: ExternalWorker, env: WorkflowEnvironment +): + if env.supports_time_skipping: + pytest.skip("Java test server does not support start delay") + start_delay = timedelta(hours=1, minutes=20, seconds=30) + handle = await client.start_workflow( + "kitchen_sink", + KSWorkflowParams( + actions=[KSAction(result=KSResultAction(value="some result"))] + ), + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + start_delay=start_delay, + ) + # Check that first event has start delay + first_event = [e async for e in handle.fetch_history_events()][0] + assert ( + start_delay + == first_event.workflow_execution_started_event_attributes.first_workflow_task_backoff.ToTimedelta() + ) + + +async def test_signal_with_start_delay( + client: Client, worker: ExternalWorker, env: WorkflowEnvironment +): + if env.supports_time_skipping: + pytest.skip("Java test server does not support start delay") + start_delay = timedelta(hours=1, minutes=20, seconds=30) + handle = await client.start_workflow( + "kitchen_sink", + KSWorkflowParams( + actions=[KSAction(result=KSResultAction(value="some result"))] + ), + id=f"workflow-{uuid.uuid4()}", + task_queue=worker.task_queue, + start_delay=start_delay, + start_signal="some-signal", + ) + # Check that first event has start delay + first_event = [e async for e in handle.fetch_history_events()][0] + assert ( + start_delay + == first_event.workflow_execution_started_event_attributes.first_workflow_task_backoff.ToTimedelta() + ) + + async def test_result_follow_continue_as_new( client: Client, worker: ExternalWorker, env: WorkflowEnvironment ):