
Commit 5ce7eea
Merge branch 'main' into gh-arm-runner
cretz authored Jul 18, 2024
2 parents 2137826 + 913b4b6 commit 5ce7eea
Showing 3 changed files with 91 additions and 59 deletions.
7 changes: 7 additions & 0 deletions temporalio/client.py
@@ -966,6 +966,7 @@ def get_schedule_handle(self, id: str) -> ScheduleHandle:

async def list_schedules(
self,
query: Optional[str] = None,
*,
page_size: int = 1000,
next_page_token: Optional[bytes] = None,
@@ -982,6 +983,9 @@ async def list_schedules(
Args:
page_size: Maximum number of results for each page.
query: A Temporal visibility list filter. See Temporal documentation
concerning visibility list filters including behavior when left
unset.
next_page_token: A previously obtained next page token if doing
pagination. Usually not needed as the iterator automatically
starts from the beginning.
@@ -998,6 +1002,7 @@ async def list_schedules(
next_page_token=next_page_token,
rpc_metadata=rpc_metadata,
rpc_timeout=rpc_timeout,
query=query,
)
)

@@ -4195,6 +4200,7 @@ async def fetch_next_page(self, *, page_size: Optional[int] = None) -> None:
namespace=self._client.namespace,
maximum_page_size=page_size or self._input.page_size,
next_page_token=self._next_page_token or b"",
query=self._input.query or "",
),
retry=True,
metadata=self._input.rpc_metadata,
@@ -4704,6 +4710,7 @@ class ListSchedulesInput:
next_page_token: Optional[bytes]
rpc_metadata: Mapping[str, str]
rpc_timeout: Optional[timedelta]
query: Optional[str] = None


@dataclass
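For context on the new parameter: the client.py hunks above add an optional `query` argument to `Client.list_schedules`, threading it through `ListSchedulesInput` and into the underlying list-schedules RPC. Below is a minimal usage sketch, not part of this commit's diff, assuming a Temporal server reachable at localhost:7233 and an already-registered keyword search attribute named `MyScheduleKeyword` (both assumptions for illustration):

import asyncio

from temporalio.client import Client


async def main() -> None:
    # Assumed: a locally running Temporal dev server.
    client = await Client.connect("localhost:7233")

    # The new optional `query` argument is a Temporal visibility list filter;
    # list_schedules returns an async iterator over matching schedule
    # descriptions, paging transparently.
    async for schedule in await client.list_schedules(
        "`MyScheduleKeyword` = 'some-schedule-attr'"
    ):
        print(schedule.id)


asyncio.run(main())

Leaving `query` unset (the default) keeps the prior unfiltered behavior of listing every schedule in the namespace.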
25 changes: 23 additions & 2 deletions tests/test_client.py
@@ -909,10 +909,20 @@ async def update_desc_get_action_count() -> int:
)

# Create 4 more schedules of the same type and confirm they are in list
# eventually
# eventually. Two of them we will create with search attributes.
keyword_attr_key = SearchAttributeKey.for_keyword("python-test-schedule-keyword")
await ensure_search_attributes_present(client, keyword_attr_key)
expected_ids = [handle.id]
for i in range(4):
new_handle = await client.create_schedule(f"{handle.id}-{i + 1}", desc.schedule)
new_handle = await client.create_schedule(
f"{handle.id}-{i + 1}",
desc.schedule,
search_attributes=TypedSearchAttributes(
[SearchAttributePair(keyword_attr_key, "some-schedule-attr")]
)
if i >= 2
else None,
)
expected_ids.append(new_handle.id)

async def list_ids() -> List[str]:
@@ -925,6 +935,17 @@ async def list_ids() -> List[str]:

await assert_eq_eventually(expected_ids, list_ids)

# Now do a list w/ query for certain search attributes and confirm
list_descs = [
d
async for d in await client.list_schedules(
"`python-test-schedule-keyword` = 'some-schedule-attr'"
)
]
assert len(list_descs) == 2
assert list_descs[0].id in [f"{handle.id}-3", f"{handle.id}-4"]
assert list_descs[1].id in [f"{handle.id}-3", f"{handle.id}-4"]

# Delete all of the schedules
for id in await list_ids():
await client.get_schedule_handle(id).delete()
118 changes: 61 additions & 57 deletions tests/worker/test_workflow.py
@@ -1113,37 +1113,37 @@ async def cancel_child(self) -> None:
@pytest.mark.parametrize("use_execute", [True, False])
async def test_workflow_cancel_child_started(client: Client, use_execute: bool):
async with new_worker(client, CancelChildWorkflow, LongSleepWorkflow) as worker:
with pytest.raises(WorkflowFailureError) as err:
# Start workflow
handle = await client.start_workflow(
CancelChildWorkflow.run,
use_execute,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
# Start workflow
handle = await client.start_workflow(
CancelChildWorkflow.run,
use_execute,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)

# Wait until child started
async def child_started() -> bool:
try:
return await handle.query(
CancelChildWorkflow.ready
) and await client.get_workflow_handle_for(
LongSleepWorkflow.run, # type: ignore[arg-type]
workflow_id=f"{handle.id}_child",
).query(LongSleepWorkflow.started)
except RPCError as err:
# Ignore not-found or failed precondition because child may
# not have started yet
if (
err.status == RPCStatusCode.NOT_FOUND
or err.status == RPCStatusCode.FAILED_PRECONDITION
):
return False
raise
# Wait until child started
async def child_started() -> bool:
try:
return await handle.query(
CancelChildWorkflow.ready
) and await client.get_workflow_handle_for(
LongSleepWorkflow.run, # type: ignore[arg-type]
workflow_id=f"{handle.id}_child",
).query(LongSleepWorkflow.started)
except RPCError as err:
# Ignore not-found or failed precondition because child may
# not have started yet
if (
err.status == RPCStatusCode.NOT_FOUND
or err.status == RPCStatusCode.FAILED_PRECONDITION
):
return False
raise

await assert_eq_eventually(True, child_started)
# Send cancel signal and wait on the handle
await handle.signal(CancelChildWorkflow.cancel_child)
await assert_eq_eventually(True, child_started)
# Send cancel signal and wait on the handle
await handle.signal(CancelChildWorkflow.cancel_child)
with pytest.raises(WorkflowFailureError) as err:
await handle.result()
assert isinstance(err.value.cause, ChildWorkflowError)
assert isinstance(err.value.cause.cause, CancelledError)
@@ -2374,17 +2374,17 @@ async def test_workflow_already_started(client: Client, env: WorkflowEnvironment
async with new_worker(client, LongSleepWorkflow) as worker:
id = f"workflow-{uuid.uuid4()}"
# Try to start it twice
await client.start_workflow(
LongSleepWorkflow.run,
id=id,
task_queue=worker.task_queue,
)
with pytest.raises(WorkflowAlreadyStartedError):
await client.start_workflow(
LongSleepWorkflow.run,
id=id,
task_queue=worker.task_queue,
)
await client.start_workflow(
LongSleepWorkflow.run,
id=id,
task_queue=worker.task_queue,
)


@workflow.defn
@@ -3263,15 +3263,15 @@ async def test_workflow_custom_failure_converter(client: Client):
client = Client(**config)

# Run workflow and confirm error
with pytest.raises(WorkflowFailureError) as err:
async with new_worker(
client, CustomErrorWorkflow, activities=[custom_error_activity]
) as worker:
handle = await client.start_workflow(
CustomErrorWorkflow.run,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
async with new_worker(
client, CustomErrorWorkflow, activities=[custom_error_activity]
) as worker:
handle = await client.start_workflow(
CustomErrorWorkflow.run,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
with pytest.raises(WorkflowFailureError) as err:
await handle.result()

# Check error is as expected
@@ -4606,13 +4606,13 @@ async def test_workflow_timeout_support(client: Client, approach: str):
client, TimeoutSupportWorkflow, activities=[wait_cancel]
) as worker:
# Run and confirm activity gets cancelled
handle = await client.start_workflow(
TimeoutSupportWorkflow.run,
approach,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
with pytest.raises(WorkflowFailureError) as err:
handle = await client.start_workflow(
TimeoutSupportWorkflow.run,
approach,
id=f"workflow-{uuid.uuid4()}",
task_queue=worker.task_queue,
)
await handle.result()
assert isinstance(err.value.cause, ActivityError)
assert isinstance(err.value.cause.cause, CancelledError)
@@ -4946,8 +4946,8 @@ async def run(self, param: str) -> None:


async def test_workflow_fail_on_bad_input(client: Client):
with pytest.raises(WorkflowFailureError) as err:
async with new_worker(client, FailOnBadInputWorkflow) as worker:
async with new_worker(client, FailOnBadInputWorkflow) as worker:
with pytest.raises(WorkflowFailureError) as err:
await client.execute_workflow(
"FailOnBadInputWorkflow",
123,
@@ -5484,15 +5484,19 @@ async def _run_workflow_and_get_warning(self) -> bool:
assert update_task
with pytest.raises(RPCError) as update_err:
await update_task
assert (
update_err.value.status == RPCStatusCode.NOT_FOUND
and "workflow execution already completed"
in str(update_err.value).lower()
)
assert update_err.value.status == RPCStatusCode.NOT_FOUND and (
str(update_err.value).lower()
== "workflow execution already completed"
)

with pytest.raises(WorkflowFailureError) as err:
await handle.result()
assert "workflow execution failed" in str(err.value).lower()
assert isinstance(
err.value.cause,
{"cancellation": CancelledError, "failure": ApplicationError}[
self.workflow_termination_type
],
)

unfinished_handler_warning_emitted = any(
issubclass(w.category, self._unfinished_handler_warning_cls)