Skip to content

Commit

Permalink
Describe task queue
Browse files Browse the repository at this point in the history
  • Loading branch information
cretz committed Sep 30, 2024
1 parent a1b5d65 commit 5e57616
Show file tree
Hide file tree
Showing 2 changed files with 260 additions and 0 deletions.
204 changes: 204 additions & 0 deletions temporalio/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1108,6 +1108,45 @@ async def get_worker_task_reachability(
)
)

async def describe_task_queue(
self,
task_queue: str,
task_queue_types: Sequence[TaskQueueType] = [],
report_pollers: bool = False,
report_stats: bool = False,
rpc_metadata: Mapping[str, str] = {},
rpc_timeout: Optional[timedelta] = None,
) -> TaskQueueDescription:
"""
Describe task queue.
.. note::
This is only for unversioned workers. Worker versioning is not yet
supported for describing task queue.
Args:
task_queue: Name of the task queue. Sticky queues are not supported.
task_queue_types: Task queue types to report info about. If not
present or empty, all types are considered.
report_pollers: Include list of pollers for requested task queue types.
report_stats: Include task queue stats for requested task queue types.
"""

if not report_pollers and not report_stats:
raise ValueError(
"At least one of report_pollers or report_stats must be True"
)
return await self._impl.describe_task_queue(
DescribeTaskQueueInput(
task_queue,
task_queue_types,
report_pollers,
report_stats,
rpc_metadata,
rpc_timeout,
)
)


class ClientConfig(TypedDict, total=False):
"""TypedDict of config originally passed to :py:meth:`Client`."""
Expand Down Expand Up @@ -4814,6 +4853,18 @@ class GetWorkerTaskReachabilityInput:
rpc_timeout: Optional[timedelta]


@dataclass
class DescribeTaskQueueInput:
"""Input for :py:meth:`OutboundInterceptor.describe_task_queue`."""

task_queue: str
task_queue_types: Sequence[TaskQueueType]
report_pollers: bool
report_stats: bool
rpc_metadata: Mapping[str, str]
rpc_timeout: Optional[timedelta]


@dataclass
class Interceptor:
"""Interceptor for clients.
Expand Down Expand Up @@ -4983,6 +5034,13 @@ async def get_worker_task_reachability(
"""Called for every :py:meth:`Client.get_worker_task_reachability` call."""
return await self.next.get_worker_task_reachability(input)

### Other

async def describe_task_queue(
self, input: DescribeTaskQueueInput
) -> TaskQueueDescription:
return await self.next.describe_task_queue(input)


class _ClientImpl(OutboundInterceptor):
def __init__(self, client: Client) -> None:
Expand Down Expand Up @@ -5726,6 +5784,27 @@ async def get_worker_task_reachability(
)
return WorkerTaskReachability._from_proto(resp)

### Other calls

async def describe_task_queue(
self, input: DescribeTaskQueueInput
) -> TaskQueueDescription:
req = temporalio.api.workflowservice.v1.DescribeTaskQueueRequest(
namespace=self._client.namespace,
task_queue=temporalio.api.taskqueue.v1.TaskQueue(name=input.task_queue),
api_mode=temporalio.api.enums.v1.DescribeTaskQueueMode.DESCRIBE_TASK_QUEUE_MODE_ENHANCED,
task_queue_types=[
temporalio.api.enums.v1.TaskQueueType.ValueType(t)
for t in input.task_queue_types
],
report_pollers=input.report_pollers,
report_stats=input.report_stats,
)
resp = await self._client.workflow_service.describe_task_queue(
req, retry=True, metadata=input.rpc_metadata, timeout=input.rpc_timeout
)
return TaskQueueDescription._from_proto(resp)


def _history_from_json(
history: Union[str, Dict[str, Any]],
Expand Down Expand Up @@ -6114,6 +6193,131 @@ def _to_proto(self) -> temporalio.api.enums.v1.TaskReachability.ValueType:
)


class TaskQueueType(IntEnum):
WORKFLOW = int(temporalio.api.enums.v1.TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW)
ACTIVITY = int(temporalio.api.enums.v1.TaskQueueType.TASK_QUEUE_TYPE_ACTIVITY)
NEXUS = int(temporalio.api.enums.v1.TaskQueueType.TASK_QUEUE_TYPE_NEXUS)


@dataclass
class TaskQueueDescription:
types: Mapping[TaskQueueType, TaskQueueTypeInfo]
"""
Task queue type information, keyed by task queue type.
.. note::
This is only for unversioned workers. Worker versioning is not yet
supported for task queue description.
"""

@staticmethod
def _from_proto(
resp: temporalio.api.workflowservice.v1.DescribeTaskQueueResponse,
) -> TaskQueueDescription:
return TaskQueueDescription(
types={
TaskQueueType(type): TaskQueueTypeInfo._from_proto(info)
for type, info in resp.versions_info[""].types_info.items()
}
)


@dataclass
class TaskQueueTypeInfo:
pollers: Sequence[TaskQueuePollerInfo]
"""
Information about recent pollers, or empty if not requested or none
recently.
"""

stats: Optional[TaskQueueStats]
"""
Task queue stats, or none if not requested.
"""

@staticmethod
def _from_proto(
info: temporalio.api.taskqueue.v1.TaskQueueTypeInfo,
) -> TaskQueueTypeInfo:
return TaskQueueTypeInfo(
pollers=[
TaskQueuePollerInfo._from_proto(poller_info)
for poller_info in info.pollers
],
stats=TaskQueueStats._from_proto(info.stats)
if info.HasField("stats")
else None,
)


@dataclass
class TaskQueuePollerInfo:
last_access_time: Optional[datetime]
# Time of the last poll if any.

identity: str
# Identity of the worker/client who is polling this task queue.

rate_per_second: Optional[float]
# Polling rate.

@staticmethod
def _from_proto(
info: temporalio.api.taskqueue.v1.PollerInfo,
) -> TaskQueuePollerInfo:
return TaskQueuePollerInfo(
last_access_time=info.last_access_time.ToDatetime().replace(
tzinfo=timezone.utc
)
if info.HasField("last_access_time")
else None,
identity=info.identity,
rate_per_second=info.rate_per_second if info.rate_per_second != 0 else None,
)


@dataclass
class TaskQueueStats:
approximate_backlog_count: int
"""
The approximate number of tasks backlogged in this task queue. May count
expired tasks but eventually converges to the right value.
"""

approximate_backlog_age: timedelta
"""
Approximate age of the oldest task in the backlog based on the create
timestamp of the task at the head of the queue.
"""

backlog_increase_rate: float
""":py:attr:`tasks_add_rate` - :py:attr:`tasks_dispatch_rate`"""

tasks_add_rate: float
"""
Approximate tasks per second added to the task queue based on activity
within a fixed window. This includes both backlogged and sync-matched tasks.
"""

tasks_dispatch_rate: float
"""
Approximate tasks per second dispatched to workers based on activity within
a fixed window. This includes both backlogged and sync-matched tasks.
"""

@staticmethod
def _from_proto(
stats: temporalio.api.taskqueue.v1.TaskQueueStats,
) -> TaskQueueStats:
return TaskQueueStats(
approximate_backlog_count=stats.approximate_backlog_count,
approximate_backlog_age=stats.approximate_backlog_age.ToTimedelta(),
backlog_increase_rate=stats.tasks_add_rate - stats.tasks_dispatch_rate,
tasks_add_rate=stats.tasks_add_rate,
tasks_dispatch_rate=stats.tasks_dispatch_rate,
)


class CloudOperationsClient:
"""Client for accessing Temporal Cloud Operations API.
Expand Down
56 changes: 56 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
SignalWorkflowInput,
StartWorkflowInput,
StartWorkflowUpdateInput,
TaskQueueType,
TaskReachabilityType,
TerminateWorkflowInput,
WorkflowContinuedAsNewError,
Expand Down Expand Up @@ -1323,3 +1324,58 @@ async def test_cloud_client_simple():
GetNamespaceRequest(namespace=os.environ["TEMPORAL_CLIENT_CLOUD_NAMESPACE"])
)
assert os.environ["TEMPORAL_CLIENT_CLOUD_NAMESPACE"] == result.namespace.namespace


@workflow.defn
class TaskQueueDescribeWorkflow:
@workflow.run
async def run(self, name: str) -> str:
return f"Hello, {name}!"


async def test_describe_task_queue(client: Client):
task_queue = f"tq-{uuid.uuid4()}"
# Simple describe when nothing present
desc = await client.describe_task_queue(
task_queue, report_pollers=True, report_stats=True
)
# Confirm activity and workflow have no pollers
assert not desc.types[TaskQueueType.ACTIVITY].pollers
assert not desc.types[TaskQueueType.WORKFLOW].pollers

# Confirm no add rate
stats = desc.types[TaskQueueType.ACTIVITY].stats
assert stats and stats.tasks_add_rate == 0.0
stats = desc.types[TaskQueueType.WORKFLOW].stats
assert stats and stats.tasks_add_rate == 0.0

# Run some workflows
async with new_worker(
client, TaskQueueDescribeWorkflow, task_queue=task_queue
) as worker:
for i in range(10):
await client.execute_workflow(
TaskQueueDescribeWorkflow.run,
f"user{i}",
id=f"tq-{uuid.uuid4()}",
task_queue=task_queue,
)

# Describe again (while poller still running)
desc = await client.describe_task_queue(
task_queue, report_pollers=True, report_stats=True
)

# Confirm activity still has no pollers, but workflow has this one
assert not desc.types[TaskQueueType.ACTIVITY].pollers
assert len(desc.types[TaskQueueType.WORKFLOW].pollers) == 1
assert (
desc.types[TaskQueueType.WORKFLOW].pollers[0].identity
== client.service_client.config.identity
)

# Confirm activity still has no stats, but workflow does
stats = desc.types[TaskQueueType.ACTIVITY].stats
assert stats and stats.tasks_add_rate == 0.0
stats = desc.types[TaskQueueType.WORKFLOW].stats
assert stats and stats.tasks_add_rate != 0.0

0 comments on commit 5e57616

Please sign in to comment.