diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 55153c3b..15b8d6da 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -21,7 +21,14 @@ repos: language: python pass_filenames: false - repo: https://github.com/asottile/pyupgrade - rev: v3.15.2 + rev: v3.17.0 hooks: - id: pyupgrade args: [--py39-plus] + - repo: https://github.com/charliermarsh/ruff-pre-commit + rev: v0.6.9 + hooks: + - id: ruff + args: + - --fix + exclude: ^(tests/.*|examples/.*|docs/.*)$ \ No newline at end of file diff --git a/examples/worker.py b/examples/worker.py index feb744a6..4873ead5 100644 --- a/examples/worker.py +++ b/examples/worker.py @@ -1,5 +1,4 @@ import asyncio -from typing import Dict from pyzeebe import ( Job, @@ -52,14 +51,14 @@ async def example_logging_task_decorator(job: Job) -> Job: # Create a task like this: @worker.task(task_type="test") -def example_task() -> Dict: - return {"output": f"Hello world, test!"} +def example_task() -> dict: + return {"output": "Hello world, test!"} # Or like this: @worker.task(task_type="test2") -async def second_example_task() -> Dict: - return {"output": f"Hello world, test2!"} +async def second_example_task() -> dict: + return {"output": "Hello world, test2!"} # Create a task that will return a single value (not a dict) like this: @@ -98,7 +97,7 @@ async def exception_task(): before=[example_logging_task_decorator], after=[example_logging_task_decorator], ) -async def decorator_task() -> Dict: +async def decorator_task() -> dict: return {"output": "Hello world, test!"} diff --git a/pyproject.toml b/pyproject.toml index 0c619b6c..d317a4da 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -69,6 +69,25 @@ profile = "black" [tool.pytest.ini_options] asyncio_mode = "auto" +[tool.ruff] +target-version = "py39" + +[tool.ruff.lint] +select = [ + "E", # pycodestyle errors + "W", # pycodestyle warnings + "F", # pyflakes + "C", # flake8-comprehensions + "B", # flake8-bugbear + "TID", # flake8-tidy-imports + "T20", # flake8-print + "ASYNC", # flake8-async + "FA", # flake8-future-annotations +] +ignore = [ + "E501", # line too long, handled by black +] + [build-system] requires = ["poetry-core>=1.0.0"] build-backend = "poetry.core.masonry.api" diff --git a/pyzeebe/__init__.py b/pyzeebe/__init__.py index 48368309..dfbea6b9 100644 --- a/pyzeebe/__init__.py +++ b/pyzeebe/__init__.py @@ -1,5 +1,10 @@ from pyzeebe import errors -from pyzeebe.channel import * +from pyzeebe.channel import ( + create_camunda_cloud_channel, + create_insecure_channel, + create_oauth2_client_credentials_channel, + create_secure_channel, +) from pyzeebe.client.client import ZeebeClient from pyzeebe.client.sync_client import SyncZeebeClient from pyzeebe.credentials.base import CredentialsABC @@ -18,6 +23,7 @@ "create_camunda_cloud_channel", "create_insecure_channel", "create_secure_channel", + "create_oauth2_client_credentials_channel", "ZeebeClient", "SyncZeebeClient", "Job", diff --git a/pyzeebe/channel/__init__.py b/pyzeebe/channel/__init__.py index 3df12b88..e84b704e 100644 --- a/pyzeebe/channel/__init__.py +++ b/pyzeebe/channel/__init__.py @@ -4,3 +4,10 @@ create_oauth2_client_credentials_channel, ) from pyzeebe.channel.secure_channel import create_secure_channel + +__all__ = ( + "create_insecure_channel", + "create_camunda_cloud_channel", + "create_oauth2_client_credentials_channel", + "create_secure_channel", +) diff --git a/pyzeebe/channel/camunda_cloud_channel.py b/pyzeebe/channel/camunda_cloud_channel.py index 666751d5..5c81344c 100644 --- a/pyzeebe/channel/camunda_cloud_channel.py +++ b/pyzeebe/channel/camunda_cloud_channel.py @@ -1,4 +1,4 @@ -from typing import Optional +from __future__ import annotations import grpc from oauthlib import oauth2 @@ -24,7 +24,7 @@ def create_camunda_cloud_channel( client_secret: str, cluster_id: str, region: str = "bru-2", - channel_options: Optional[ChannelArgumentType] = None, + channel_options: ChannelArgumentType | None = None, ) -> grpc.aio.Channel: """ Create channel connected to a Camunda Cloud cluster diff --git a/pyzeebe/channel/channel_options.py b/pyzeebe/channel/channel_options.py index e1803ffd..70bcfd57 100644 --- a/pyzeebe/channel/channel_options.py +++ b/pyzeebe/channel/channel_options.py @@ -8,14 +8,14 @@ https://docs.camunda.io/docs/product-manuals/zeebe/deployment-guide/operations/setting-up-a-cluster/#keep-alive-intervals """ -from typing import Optional +from __future__ import annotations from pyzeebe.types import ChannelArgumentType GRPC_CHANNEL_OPTIONS_DEFAULT: ChannelArgumentType = (("grpc.keepalive_time_ms", 45_000),) -def get_channel_options(options: Optional[ChannelArgumentType] = None) -> ChannelArgumentType: +def get_channel_options(options: ChannelArgumentType | None = None) -> ChannelArgumentType: """ Get default channel options for creating the gRPC channel. diff --git a/pyzeebe/channel/insecure_channel.py b/pyzeebe/channel/insecure_channel.py index 72b54831..f71c7def 100644 --- a/pyzeebe/channel/insecure_channel.py +++ b/pyzeebe/channel/insecure_channel.py @@ -1,4 +1,4 @@ -from typing import Optional +from __future__ import annotations import grpc @@ -8,7 +8,7 @@ def create_insecure_channel( - grpc_address: Optional[str] = None, channel_options: Optional[ChannelArgumentType] = None + grpc_address: str | None = None, channel_options: ChannelArgumentType | None = None ) -> grpc.aio.Channel: """ Create an insecure channel diff --git a/pyzeebe/channel/oauth_channel.py b/pyzeebe/channel/oauth_channel.py index c5ca2755..73fd42eb 100644 --- a/pyzeebe/channel/oauth_channel.py +++ b/pyzeebe/channel/oauth_channel.py @@ -1,5 +1,6 @@ +from __future__ import annotations + from functools import partial -from typing import Optional import grpc @@ -13,12 +14,12 @@ def create_oauth2_client_credentials_channel( client_id: str, client_secret: str, authorization_server: str, - scope: Optional[str] = None, - audience: Optional[str] = None, - channel_credentials: grpc.ChannelCredentials = grpc.ssl_channel_credentials(), - channel_options: Optional[ChannelArgumentType] = None, + scope: str | None = None, + audience: str | None = None, + channel_credentials: grpc.ChannelCredentials | None = None, + channel_options: ChannelArgumentType | None = None, leeway: int = 60, - expire_in: Optional[int] = None, + expire_in: int | None = None, ) -> grpc.aio.Channel: """Create a gRPC channel for connecting to Camunda 8 (Self-Managed) with OAuth2ClientCredentials. @@ -65,7 +66,7 @@ def create_oauth2_client_credentials_channel( call_credentials: grpc.CallCredentials = grpc.metadata_call_credentials(oauth2_client_credentials) composite_credentials: grpc.ChannelCredentials = grpc.composite_channel_credentials( - channel_credentials, call_credentials + channel_credentials or grpc.ssl_channel_credentials(), call_credentials ) channel: grpc.aio.Channel = grpc.aio.secure_channel( @@ -83,10 +84,10 @@ def create_camunda_cloud_channel( scope: str = "Zeebe", authorization_server: str = "https://login.cloud.camunda.io/oauth/token", audience: str = "zeebe.camunda.io", - channel_credentials: grpc.ChannelCredentials = grpc.ssl_channel_credentials(), - channel_options: Optional[ChannelArgumentType] = None, + channel_credentials: grpc.ChannelCredentials | None = None, + channel_options: ChannelArgumentType | None = None, leeway: int = 60, - expire_in: Optional[int] = None, + expire_in: int | None = None, ) -> grpc.aio.Channel: """Create a gRPC channel for connecting to Camunda 8 Cloud (SaaS). @@ -139,9 +140,8 @@ def create_camunda_cloud_channel( oauth2_client_credentials._func_retrieve_token = func call_credentials: grpc.CallCredentials = grpc.metadata_call_credentials(oauth2_client_credentials) - # channel_credentials: grpc.ChannelCredentials = channel_credentials or grpc.ssl_channel_credentials() composite_credentials: grpc.ChannelCredentials = grpc.composite_channel_credentials( - channel_credentials, call_credentials + channel_credentials or grpc.ssl_channel_credentials(), call_credentials ) channel: grpc.aio.Channel = grpc.aio.secure_channel( diff --git a/pyzeebe/channel/secure_channel.py b/pyzeebe/channel/secure_channel.py index 4a4a2124..bddfad9b 100644 --- a/pyzeebe/channel/secure_channel.py +++ b/pyzeebe/channel/secure_channel.py @@ -1,4 +1,4 @@ -from typing import Optional +from __future__ import annotations import grpc @@ -8,9 +8,9 @@ def create_secure_channel( - grpc_address: Optional[str] = None, - channel_options: Optional[ChannelArgumentType] = None, - channel_credentials: Optional[grpc.ChannelCredentials] = None, + grpc_address: str | None = None, + channel_options: ChannelArgumentType | None = None, + channel_credentials: grpc.ChannelCredentials | None = None, ) -> grpc.aio.Channel: """ Create a secure channel diff --git a/pyzeebe/channel/utils.py b/pyzeebe/channel/utils.py index fcea921d..7b81909d 100644 --- a/pyzeebe/channel/utils.py +++ b/pyzeebe/channel/utils.py @@ -1,11 +1,12 @@ +from __future__ import annotations + import os -from typing import Optional DEFAULT_ZEEBE_ADDRESS = "localhost:26500" def create_address( - grpc_address: Optional[str] = None, + grpc_address: str | None = None, ) -> str: if grpc_address: return grpc_address diff --git a/pyzeebe/client/client.py b/pyzeebe/client/client.py index f40e22d1..f51de6f8 100644 --- a/pyzeebe/client/client.py +++ b/pyzeebe/client/client.py @@ -1,4 +1,6 @@ -from typing import Iterable, Optional +from __future__ import annotations + +from collections.abc import Iterable import grpc @@ -28,9 +30,9 @@ def __init__(self, grpc_channel: grpc.aio.Channel, max_connection_retries: int = async def run_process( self, bpmn_process_id: str, - variables: Optional[Variables] = None, + variables: Variables | None = None, version: int = -1, - tenant_id: Optional[str] = None, + tenant_id: str | None = None, ) -> CreateProcessInstanceResponse: """ Run process @@ -61,11 +63,11 @@ async def run_process( async def run_process_with_result( self, bpmn_process_id: str, - variables: Optional[Variables] = None, + variables: Variables | None = None, version: int = -1, timeout: int = 0, - variables_to_fetch: Optional[Iterable[str]] = None, - tenant_id: Optional[str] = None, + variables_to_fetch: Iterable[str] | None = None, + tenant_id: str | None = None, ) -> CreateProcessInstanceWithResultResponse: """ Run process and wait for the result. @@ -121,9 +123,7 @@ async def cancel_process_instance(self, process_instance_key: int) -> CancelProc """ return await self.zeebe_adapter.cancel_process_instance(process_instance_key=process_instance_key) - async def deploy_resource( - self, *resource_file_path: str, tenant_id: Optional[str] = None - ) -> DeployResourceResponse: + async def deploy_resource(self, *resource_file_path: str, tenant_id: str | None = None) -> DeployResourceResponse: """ Deploy one or more processes @@ -150,10 +150,10 @@ async def publish_message( self, name: str, correlation_key: str, - variables: Optional[Variables] = None, + variables: Variables | None = None, time_to_live_in_milliseconds: int = 60000, - message_id: Optional[str] = None, - tenant_id: Optional[str] = None, + message_id: str | None = None, + tenant_id: str | None = None, ) -> PublishMessageResponse: """ Publish a message diff --git a/pyzeebe/client/sync_client.py b/pyzeebe/client/sync_client.py index 35fcd414..f23c2b47 100644 --- a/pyzeebe/client/sync_client.py +++ b/pyzeebe/client/sync_client.py @@ -1,5 +1,6 @@ +from __future__ import annotations + import asyncio -from typing import List, Optional import grpc @@ -22,20 +23,20 @@ def __init__(self, grpc_channel: grpc.aio.Channel, max_connection_retries: int = def run_process( self, bpmn_process_id: str, - variables: Optional[Variables] = None, + variables: Variables | None = None, version: int = -1, - tenant_id: Optional[str] = None, + tenant_id: str | None = None, ) -> CreateProcessInstanceResponse: return self.loop.run_until_complete(self.client.run_process(bpmn_process_id, variables, version, tenant_id)) def run_process_with_result( self, bpmn_process_id: str, - variables: Optional[Variables] = None, + variables: Variables | None = None, version: int = -1, timeout: int = 0, - variables_to_fetch: Optional[List[str]] = None, - tenant_id: Optional[str] = None, + variables_to_fetch: list[str] | None = None, + tenant_id: str | None = None, ) -> CreateProcessInstanceWithResultResponse: return self.loop.run_until_complete( self.client.run_process_with_result( @@ -46,17 +47,17 @@ def run_process_with_result( def cancel_process_instance(self, process_instance_key: int) -> CancelProcessInstanceResponse: return self.loop.run_until_complete(self.client.cancel_process_instance(process_instance_key)) - def deploy_resource(self, *resource_file_path: str, tenant_id: Optional[str] = None) -> DeployResourceResponse: + def deploy_resource(self, *resource_file_path: str, tenant_id: str | None = None) -> DeployResourceResponse: return self.loop.run_until_complete(self.client.deploy_resource(*resource_file_path, tenant_id=tenant_id)) def publish_message( self, name: str, correlation_key: str, - variables: Optional[Variables] = None, + variables: Variables | None = None, time_to_live_in_milliseconds: int = 60000, - message_id: Optional[str] = None, - tenant_id: Optional[str] = None, + message_id: str | None = None, + tenant_id: str | None = None, ) -> PublishMessageResponse: return self.loop.run_until_complete( self.client.publish_message( diff --git a/pyzeebe/credentials/__init__.py b/pyzeebe/credentials/__init__.py index 0b11026c..02c74df4 100644 --- a/pyzeebe/credentials/__init__.py +++ b/pyzeebe/credentials/__init__.py @@ -2,3 +2,11 @@ from .camunda_identity import CamundaIdentityCredentials from .oauth import Oauth2ClientCredentialsMetadataPlugin, OAuth2MetadataPlugin from .plugins import AuthMetadataPlugin + +__all__ = ( + "CredentialsABC", + "CamundaIdentityCredentials", + "Oauth2ClientCredentialsMetadataPlugin", + "OAuth2MetadataPlugin", + "AuthMetadataPlugin", +) diff --git a/pyzeebe/credentials/camunda_identity.py b/pyzeebe/credentials/camunda_identity.py index 44fc1e9c..d657d4d9 100644 --- a/pyzeebe/credentials/camunda_identity.py +++ b/pyzeebe/credentials/camunda_identity.py @@ -1,6 +1,8 @@ +from __future__ import annotations + import datetime import threading -from typing import Any, Dict, Optional +from typing import Any import requests @@ -27,7 +29,7 @@ def __init__( client_id: str, client_secret: str, audience: str = "zeebe-api", - refresh_threshold_seconds: int = 20 + refresh_threshold_seconds: int = 20, ) -> None: self.oauth_url = oauth_url self.client_id = client_id @@ -37,8 +39,8 @@ def __init__( self._lock = threading.Lock() self._refresh_threshold = datetime.timedelta(seconds=refresh_threshold_seconds) - self._token: Optional[Dict[str, Any]] = None - self._expires_in: Optional[datetime.datetime] = None + self._token: dict[str, Any] | None = None + self._expires_in: datetime.datetime | None = None def _expired(self) -> bool: return ( @@ -83,4 +85,4 @@ def get_auth_metadata(self, context: CallContext) -> AuthMetadata: with self._lock: if self._expired() is True: self._refresh() - return (("authorization", "Bearer {}".format(self._token)),) + return (("authorization", f"Bearer {self._token}"),) diff --git a/pyzeebe/credentials/oauth.py b/pyzeebe/credentials/oauth.py index 2065dedb..72cd2385 100644 --- a/pyzeebe/credentials/oauth.py +++ b/pyzeebe/credentials/oauth.py @@ -1,9 +1,11 @@ +from __future__ import annotations + import json import logging import time import timeit from functools import partial -from typing import Any, Dict, Optional +from typing import Any import grpc import requests @@ -15,10 +17,10 @@ def _sign_request( callback: grpc.AuthMetadataPluginCallback, - token: Optional[str], - error: Optional[Exception], + token: str | None, + error: Exception | None, ) -> None: - metadata = (("authorization", "Bearer {}".format(token)),) + metadata = (("authorization", f"Bearer {token}"),) callback(metadata, error) @@ -35,9 +37,9 @@ class OAuth2MetadataPlugin(grpc.AuthMetadataPlugin): # type: ignore[misc] def __init__( self, oauth2session: OAuth2Session, - func_retrieve_token: partial[Dict[str, Any]], + func_retrieve_token: partial[dict[str, Any]], leeway: int = 60, - expire_in: Optional[int] = None, + expire_in: int | None = None, ) -> None: """AuthMetadataPlugin for OAuth2 Authentication. @@ -52,10 +54,10 @@ def __init__( Should only be used if the token does not contain an "expires_in" attribute. """ self._oauth: OAuth2Session = oauth2session - self._func_retrieve_token: partial[Dict[str, Any]] = func_retrieve_token + self._func_retrieve_token: partial[dict[str, Any]] = func_retrieve_token self._leeway: int = leeway - self._expires_in: Optional[int] = expire_in + self._expires_in: int | None = expire_in if self._expires_in is not None: # NOTE: "expires_in" is only RECOMMENDED # https://datatracker.ietf.org/doc/html/rfc6749#section-5.1 @@ -143,10 +145,10 @@ def __init__( client_id: str, client_secret: str, authorization_server: str, - scope: Optional[str] = None, - audience: Optional[str] = None, + scope: str | None = None, + audience: str | None = None, leeway: int = 60, - expire_in: Optional[int] = None, + expire_in: int | None = None, ): """AuthMetadataPlugin for OAuth2 Client Credentials Authentication based on Oauth2MetadataPlugin. @@ -168,10 +170,10 @@ def __init__( self.client_id: str = client_id self.client_secret: str = client_secret self.authorization_server: str = authorization_server - self.scope: Optional[str] = scope - self.audience: Optional[str] = audience + self.scope: str | None = scope + self.audience: str | None = audience self.leeway: int = leeway - self.expire_in: Optional[int] = expire_in + self.expire_in: int | None = expire_in client = oauth2.BackendApplicationClient(client_id=self.client_id, scope=self.scope) oauth2session = OAuth2Session(client=client) diff --git a/pyzeebe/credentials/typing.py b/pyzeebe/credentials/typing.py index 9e4dd652..128e64e6 100644 --- a/pyzeebe/credentials/typing.py +++ b/pyzeebe/credentials/typing.py @@ -1,6 +1,6 @@ -from typing import Protocol, Tuple, Union +from typing import Protocol, Union -AuthMetadata = Tuple[Tuple[str, Union[str, bytes]], ...] +AuthMetadata = tuple[tuple[str, Union[str, bytes]], ...] class CallContext(Protocol): diff --git a/pyzeebe/errors/__init__.py b/pyzeebe/errors/__init__.py index f56a4dfa..d4aa3fdf 100644 --- a/pyzeebe/errors/__init__.py +++ b/pyzeebe/errors/__init__.py @@ -1,6 +1,57 @@ -from .credentials_errors import * -from .job_errors import * -from .message_errors import * -from .process_errors import * -from .pyzeebe_errors import * -from .zeebe_errors import * +from .credentials_errors import ( + InvalidCamundaCloudCredentialsError, + InvalidOAuthCredentialsError, +) +from .job_errors import ( + ActivateJobsRequestInvalidError, + JobAlreadyDeactivatedError, + JobNotFoundError, +) +from .message_errors import MessageAlreadyExistsError +from .process_errors import ( + InvalidJSONError, + ProcessDefinitionHasNoStartEventError, + ProcessDefinitionNotFoundError, + ProcessInstanceNotFoundError, + ProcessInvalidError, + ProcessTimeoutError, +) +from .pyzeebe_errors import ( + BusinessError, + DuplicateTaskTypeError, + NoVariableNameGivenError, + PyZeebeError, + TaskNotFoundError, +) +from .zeebe_errors import ( + UnknownGrpcStatusCodeError, + ZeebeBackPressureError, + ZeebeDeadlineExceeded, + ZeebeGatewayUnavailableError, + ZeebeInternalError, +) + +__all__ = ( + "InvalidCamundaCloudCredentialsError", + "InvalidOAuthCredentialsError", + "ActivateJobsRequestInvalidError", + "JobAlreadyDeactivatedError", + "JobNotFoundError", + "MessageAlreadyExistsError", + "InvalidJSONError", + "ProcessDefinitionHasNoStartEventError", + "ProcessDefinitionNotFoundError", + "ProcessInstanceNotFoundError", + "ProcessInvalidError", + "ProcessTimeoutError", + "BusinessError", + "DuplicateTaskTypeError", + "NoVariableNameGivenError", + "PyZeebeError", + "TaskNotFoundError", + "UnknownGrpcStatusCodeError", + "ZeebeBackPressureError", + "ZeebeDeadlineExceeded", + "ZeebeGatewayUnavailableError", + "ZeebeInternalError", +) diff --git a/pyzeebe/errors/pyzeebe_errors.py b/pyzeebe/errors/pyzeebe_errors.py index d3e5bc00..f7bba62b 100644 --- a/pyzeebe/errors/pyzeebe_errors.py +++ b/pyzeebe/errors/pyzeebe_errors.py @@ -1,4 +1,4 @@ -from typing import Optional +from __future__ import annotations class PyZeebeError(Exception): @@ -21,17 +21,13 @@ def __init__(self, task_type: str): self.task_type = task_type -class MaxConsecutiveTaskThreadError(PyZeebeError): - pass - - class BusinessError(PyZeebeError): """ Exception that can be raised with a user defined code, to be caught later by an error event in the Zeebe process """ - def __init__(self, error_code: str, msg: Optional[str] = None) -> None: + def __init__(self, error_code: str, msg: str | None = None) -> None: if msg is None: msg = f"Business error with code {error_code}" super().__init__(msg) diff --git a/pyzeebe/function_tools/__init__.py b/pyzeebe/function_tools/__init__.py index fd9d708a..a6d385d8 100644 --- a/pyzeebe/function_tools/__init__.py +++ b/pyzeebe/function_tools/__init__.py @@ -1,6 +1,7 @@ from __future__ import annotations -from typing import Any, Awaitable, Callable, Dict, Optional, TypeVar, Union +from collections.abc import Awaitable +from typing import Any, Callable, Optional, TypeVar, Union from typing_extensions import ParamSpec @@ -11,4 +12,4 @@ AsyncFunction = Callable[Parameters, Awaitable[ReturnType]] Function = Union[SyncFunction[Parameters, ReturnType], AsyncFunction[Parameters, ReturnType]] -DictFunction = Callable[Parameters, Awaitable[Optional[Dict[str, Any]]]] +DictFunction = Callable[Parameters, Awaitable[Optional[dict[str, Any]]]] diff --git a/pyzeebe/function_tools/async_tools.py b/pyzeebe/function_tools/async_tools.py index 80338503..574e1cf1 100644 --- a/pyzeebe/function_tools/async_tools.py +++ b/pyzeebe/function_tools/async_tools.py @@ -2,7 +2,8 @@ import asyncio import functools -from typing import Any, Iterable, List, TypeVar +from collections.abc import Iterable +from typing import Any, TypeVar from typing_extensions import ParamSpec, TypeIs @@ -12,7 +13,7 @@ R = TypeVar("R") -def asyncify_all_functions(functions: Iterable[Function[..., Any]]) -> List[AsyncFunction[..., Any]]: +def asyncify_all_functions(functions: Iterable[Function[..., Any]]) -> list[AsyncFunction[..., Any]]: async_functions: list[AsyncFunction[..., Any]] = [] for function in functions: if not is_async_function(function): diff --git a/pyzeebe/function_tools/dict_tools.py b/pyzeebe/function_tools/dict_tools.py index 4dd9f3b2..b978dfa5 100644 --- a/pyzeebe/function_tools/dict_tools.py +++ b/pyzeebe/function_tools/dict_tools.py @@ -1,5 +1,7 @@ +from __future__ import annotations + import functools -from typing import Any, Dict, TypeVar +from typing import Any, TypeVar from typing_extensions import ParamSpec @@ -11,7 +13,7 @@ def convert_to_dict_function(single_value_function: AsyncFunction[P, R], variable_name: str) -> DictFunction[P]: @functools.wraps(single_value_function) - async def inner_fn(*args: P.args, **kwargs: P.kwargs) -> Dict[str, Any]: + async def inner_fn(*args: P.args, **kwargs: P.kwargs) -> dict[str, Any]: return {variable_name: await single_value_function(*args, **kwargs)} return inner_fn diff --git a/pyzeebe/function_tools/parameter_tools.py b/pyzeebe/function_tools/parameter_tools.py index fb2fabf5..fc80598f 100644 --- a/pyzeebe/function_tools/parameter_tools.py +++ b/pyzeebe/function_tools/parameter_tools.py @@ -1,13 +1,13 @@ from __future__ import annotations import inspect -from typing import Any, List, Optional +from typing import Any from pyzeebe.function_tools import Function from pyzeebe.job.job import Job -def get_parameters_from_function(task_function: Function[..., Any]) -> Optional[List[str]]: +def get_parameters_from_function(task_function: Function[..., Any]) -> list[str] | None: function_signature = inspect.signature(task_function) for _, parameter in function_signature.parameters.items(): if parameter.kind in (inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD): @@ -22,7 +22,7 @@ def get_parameters_from_function(task_function: Function[..., Any]) -> Optional[ return [param.name for param in function_signature.parameters.values() if param.annotation != Job] -def get_job_parameter_name(function: Function[..., Any]) -> Optional[str]: +def get_job_parameter_name(function: Function[..., Any]) -> str | None: function_signature = inspect.signature(function) params = list(function_signature.parameters.values()) for param in params: diff --git a/pyzeebe/grpc_internals/types.py b/pyzeebe/grpc_internals/types.py index 25f386f0..08897b2e 100644 --- a/pyzeebe/grpc_internals/types.py +++ b/pyzeebe/grpc_internals/types.py @@ -1,5 +1,6 @@ +from __future__ import annotations + from dataclasses import dataclass -from typing import List, Optional, Union from pyzeebe.types import Variables @@ -17,7 +18,7 @@ class CreateProcessInstanceResponse: #: needs a process instance key (e.g. CancelProcessInstanceRequest) process_instance_key: int #: the tenant ID of the created process instance - tenant_id: Optional[str] + tenant_id: str | None @dataclass(frozen=True) @@ -35,7 +36,7 @@ class CreateProcessInstanceWithResultResponse: #: consisting of all visible variables to the root scope variables: Variables #: the tenant ID of the process definition - tenant_id: Optional[str] + tenant_id: str | None @dataclass(frozen=True) @@ -58,7 +59,7 @@ class ProcessMetadata: #: parsed resource_name: str #: the tenant ID of the deployed process - tenant_id: Optional[str] + tenant_id: str | None @dataclass(frozen=True) class DecisionMetadata: @@ -79,7 +80,7 @@ class DecisionMetadata: #: part of decision_requirements_key: int #: the tenant ID of the deployed decision - tenant_id: Optional[str] + tenant_id: str | None @dataclass(frozen=True) class DecisionRequirementsMetadata: @@ -97,7 +98,7 @@ class DecisionRequirementsMetadata: #: requirements was parsed resource_name: str #: the tenant ID of the deployed decision requirements - tenant_id: Optional[str] + tenant_id: str | None @dataclass(frozen=True) class FormMetadata: @@ -111,14 +112,14 @@ class FormMetadata: #: the resource name resource_name: str #: the tenant ID of the deployed form - tenant_id: Optional[str] + tenant_id: str | None #: the unique key identifying the deployment key: int #: a list of deployed resources, e.g. processes - deployments: List[Union[ProcessMetadata, DecisionMetadata, DecisionRequirementsMetadata, FormMetadata]] + deployments: list[ProcessMetadata | DecisionMetadata | DecisionRequirementsMetadata | FormMetadata] #: the tenant ID of the deployed resources - tenant_id: Optional[str] + tenant_id: str | None @dataclass(frozen=True) @@ -126,7 +127,7 @@ class PublishMessageResponse: #: the unique ID of the message that was published key: int #: the tenant ID of the message - tenant_id: Optional[str] + tenant_id: str | None @dataclass(frozen=True) diff --git a/pyzeebe/grpc_internals/zeebe_job_adapter.py b/pyzeebe/grpc_internals/zeebe_job_adapter.py index 3c4bd8b2..4b3cd0b8 100644 --- a/pyzeebe/grpc_internals/zeebe_job_adapter.py +++ b/pyzeebe/grpc_internals/zeebe_job_adapter.py @@ -1,7 +1,9 @@ +from __future__ import annotations + import json import logging import types -from typing import AsyncGenerator, Iterable, Optional +from collections.abc import AsyncGenerator, Iterable import grpc from zeebe_grpc.gateway_pb2 import ( @@ -38,8 +40,8 @@ async def activate_jobs( max_jobs_to_activate: int, variables_to_fetch: Iterable[str], request_timeout: int, - tenant_ids: Optional[Iterable[str]] = None, - ) -> AsyncGenerator[Job, None]: + tenant_ids: Iterable[str] | None = None, + ) -> AsyncGenerator[Job]: try: grpc_request_timeout = request_timeout / 1000 * 2 if request_timeout > 0 else DEFAULT_GRPC_REQUEST_TIMEOUT async for response in self._gateway_stub.ActivateJobs( diff --git a/pyzeebe/grpc_internals/zeebe_message_adapter.py b/pyzeebe/grpc_internals/zeebe_message_adapter.py index 6cd5018d..ba4389a9 100644 --- a/pyzeebe/grpc_internals/zeebe_message_adapter.py +++ b/pyzeebe/grpc_internals/zeebe_message_adapter.py @@ -1,5 +1,6 @@ +from __future__ import annotations + import json -from typing import Optional import grpc from zeebe_grpc.gateway_pb2 import PublishMessageRequest @@ -19,8 +20,8 @@ async def publish_message( correlation_key: str, time_to_live_in_milliseconds: int, variables: Variables, - message_id: Optional[str] = None, - tenant_id: Optional[str] = None, + message_id: str | None = None, + tenant_id: str | None = None, ) -> PublishMessageResponse: try: response = await self._gateway_stub.PublishMessage( diff --git a/pyzeebe/grpc_internals/zeebe_process_adapter.py b/pyzeebe/grpc_internals/zeebe_process_adapter.py index 664252e0..09a20d35 100644 --- a/pyzeebe/grpc_internals/zeebe_process_adapter.py +++ b/pyzeebe/grpc_internals/zeebe_process_adapter.py @@ -1,6 +1,9 @@ +from __future__ import annotations + import json import os -from typing import Callable, Dict, Iterable, List, NoReturn, Optional, Union +from collections.abc import Iterable +from typing import Callable, NoReturn import anyio import grpc @@ -42,7 +45,7 @@ async def create_process_instance( bpmn_process_id: str, version: int, variables: Variables, - tenant_id: Optional[str] = None, + tenant_id: str | None = None, ) -> CreateProcessInstanceResponse: try: response = await self._gateway_stub.CreateProcessInstance( @@ -69,9 +72,9 @@ async def create_process_instance_with_result( bpmn_process_id: str, version: int, variables: Variables, - timeout: int, + timeout: int, # noqa: ASYNC109 variables_to_fetch: Iterable[str], - tenant_id: Optional[str] = None, + tenant_id: str | None = None, ) -> CreateProcessInstanceWithResultResponse: try: response = await self._gateway_stub.CreateProcessInstanceWithResult( @@ -125,9 +128,7 @@ async def cancel_process_instance(self, process_instance_key: int) -> CancelProc return CancelProcessInstanceResponse() - async def deploy_resource( - self, *resource_file_path: str, tenant_id: Optional[str] = None - ) -> DeployResourceResponse: + async def deploy_resource(self, *resource_file_path: str, tenant_id: str | None = None) -> DeployResourceResponse: try: response = await self._gateway_stub.DeployResource( DeployResourceRequest( @@ -140,13 +141,13 @@ async def deploy_resource( raise ProcessInvalidError() from grpc_error await self._handle_grpc_error(grpc_error) - deployments: List[ - Union[ - DeployResourceResponse.ProcessMetadata, - DeployResourceResponse.DecisionMetadata, - DeployResourceResponse.DecisionRequirementsMetadata, - DeployResourceResponse.FormMetadata, - ] + deployments: list[ + ( + DeployResourceResponse.ProcessMetadata + | DeployResourceResponse.DecisionMetadata + | DeployResourceResponse.DecisionRequirementsMetadata + | DeployResourceResponse.FormMetadata + ) ] = [] for deployment in response.deployments: metadata_field = deployment.WhichOneof("Metadata") @@ -205,16 +206,16 @@ def _create_form_from_raw_form(response: FormMetadata) -> DeployResourceResponse ) -_METADATA_PARSERS: Dict[ +_METADATA_PARSERS: dict[ str, Callable[ - [Union[ProcessMetadata, DecisionMetadata, DecisionRequirementsMetadata, FormMetadata]], - Union[ - DeployResourceResponse.ProcessMetadata, - DeployResourceResponse.DecisionMetadata, - DeployResourceResponse.DecisionRequirementsMetadata, - DeployResourceResponse.FormMetadata, - ], + [ProcessMetadata | DecisionMetadata | DecisionRequirementsMetadata | FormMetadata], + ( + DeployResourceResponse.ProcessMetadata + | DeployResourceResponse.DecisionMetadata + | DeployResourceResponse.DecisionRequirementsMetadata + | DeployResourceResponse.FormMetadata + ), ], ] = { "process": ZeebeProcessAdapter._create_process_from_raw_process, diff --git a/pyzeebe/job/job.py b/pyzeebe/job/job.py index 1eafa388..f41f1aff 100644 --- a/pyzeebe/job/job.py +++ b/pyzeebe/job/job.py @@ -1,5 +1,7 @@ +from __future__ import annotations + from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Optional +from typing import TYPE_CHECKING, Any from pyzeebe.job.job_status import JobStatus from pyzeebe.types import Headers, Variables @@ -23,7 +25,7 @@ class Job: retries: int deadline: int variables: Variables - tenant_id: Optional[str] = None + tenant_id: str | None = None status: JobStatus = JobStatus.Running task_result = None @@ -40,7 +42,7 @@ def __eq__(self, other: object) -> bool: class JobController: - def __init__(self, job: Job, zeebe_adapter: "ZeebeAdapter") -> None: + def __init__(self, job: Job, zeebe_adapter: ZeebeAdapter) -> None: self._job = job self._zeebe_adapter = zeebe_adapter @@ -50,7 +52,7 @@ async def set_running_after_decorators_status(self) -> None: """ self._job._set_status(JobStatus.RunningAfterDecorators) - async def set_success_status(self, variables: Optional[Variables] = None) -> None: + async def set_success_status(self, variables: Variables | None = None) -> None: """ Success status means that the job has been completed as intended. @@ -67,7 +69,7 @@ async def set_failure_status( self, message: str, retry_back_off_ms: int = 0, - variables: Optional[Variables] = None, + variables: Variables | None = None, ) -> None: """ Failure status means a technical error has occurred. If retried the job may succeed. @@ -98,7 +100,7 @@ async def set_error_status( self, message: str, error_code: str = "", - variables: Optional[Variables] = None, + variables: Variables | None = None, ) -> None: """ Error status means that the job could not be completed because of a business error and won't ever be able to be completed. diff --git a/pyzeebe/task/exception_handler.py b/pyzeebe/task/exception_handler.py index 1faaead9..4a685eb0 100644 --- a/pyzeebe/task/exception_handler.py +++ b/pyzeebe/task/exception_handler.py @@ -1,5 +1,6 @@ import logging -from typing import Awaitable, Callable +from collections.abc import Awaitable +from typing import Callable from pyzeebe.errors.pyzeebe_errors import BusinessError from pyzeebe.job.job import Job, JobController diff --git a/pyzeebe/task/task_builder.py b/pyzeebe/task/task_builder.py index 420ec9de..a4a84cf4 100644 --- a/pyzeebe/task/task_builder.py +++ b/pyzeebe/task/task_builder.py @@ -3,8 +3,8 @@ import functools import inspect import logging -from inspect import Parameter -from typing import Any, Callable, Dict, Sequence, Tuple, TypeVar +from collections.abc import Sequence +from typing import Any, TypeVar from typing_extensions import ParamSpec @@ -65,10 +65,10 @@ def prepare_task_function(task_function: Function[P, R], task_config: TaskConfig async def run_original_task_function( task_function: DictFunction[...], task_config: TaskConfig, job: Job, job_controller: JobController -) -> Tuple[Variables, bool]: +) -> tuple[Variables, bool]: try: if task_config.variables_to_fetch is None: - variables: Dict[str, Any] = {} + variables: dict[str, Any] = {} elif task_wants_all_variables(task_config): if only_job_is_required_in_task_function(task_function): variables = {} diff --git a/pyzeebe/task/task_config.py b/pyzeebe/task/task_config.py index 41e3a40e..d2ebc189 100644 --- a/pyzeebe/task/task_config.py +++ b/pyzeebe/task/task_config.py @@ -1,4 +1,6 @@ -from typing import Iterable, List, Optional +from __future__ import annotations + +from collections.abc import Iterable from pyzeebe.errors import NoVariableNameGivenError from pyzeebe.function_tools import async_tools @@ -7,21 +9,21 @@ class TaskConfig: - before: List[AsyncTaskDecorator] - after: List[AsyncTaskDecorator] + before: list[AsyncTaskDecorator] + after: list[AsyncTaskDecorator] def __init__( self, type: str, - exception_handler: Optional[ExceptionHandler], + exception_handler: ExceptionHandler | None, timeout_ms: int, max_jobs_to_activate: int, max_running_jobs: int, - variables_to_fetch: Optional[Iterable[str]], + variables_to_fetch: Iterable[str] | None, single_value: bool, variable_name: str, - before: List[TaskDecorator], - after: List[TaskDecorator], + before: list[TaskDecorator], + after: list[TaskDecorator], ) -> None: if single_value and not variable_name: raise NoVariableNameGivenError(type) @@ -36,7 +38,7 @@ def __init__( self.variable_name = variable_name self.before = async_tools.asyncify_all_functions(before) self.after = async_tools.asyncify_all_functions(after) - self.job_parameter_name: Optional[str] = None + self.job_parameter_name: str | None = None def __repr__(self) -> str: return ( diff --git a/pyzeebe/task/types.py b/pyzeebe/task/types.py index 67c690e8..ea4c910d 100644 --- a/pyzeebe/task/types.py +++ b/pyzeebe/task/types.py @@ -1,4 +1,5 @@ -from typing import Awaitable, Callable, Union +from collections.abc import Awaitable +from typing import Callable, Union from pyzeebe import Job from pyzeebe.job.job import JobController diff --git a/pyzeebe/types.py b/pyzeebe/types.py index 3e973478..6191be46 100644 --- a/pyzeebe/types.py +++ b/pyzeebe/types.py @@ -1,8 +1,9 @@ -from typing import Any, Mapping, Sequence, Tuple +from collections.abc import Mapping, Sequence +from typing import Any from typing_extensions import TypeAlias Headers: TypeAlias = Mapping[str, Any] Variables: TypeAlias = Mapping[str, Any] -ChannelArgumentType: TypeAlias = Sequence[Tuple[str, Any]] +ChannelArgumentType: TypeAlias = Sequence[tuple[str, Any]] diff --git a/pyzeebe/worker/job_poller.py b/pyzeebe/worker/job_poller.py index 15417459..176e42fc 100644 --- a/pyzeebe/worker/job_poller.py +++ b/pyzeebe/worker/job_poller.py @@ -1,6 +1,7 @@ +from __future__ import annotations + import asyncio import logging -from typing import List, Optional from pyzeebe.errors import ( ActivateJobsRequestInvalidError, @@ -22,12 +23,12 @@ def __init__( self, zeebe_adapter: ZeebeJobAdapter, task: Task, - queue: "asyncio.Queue[Job]", + queue: asyncio.Queue[Job], worker_name: str, request_timeout: int, task_state: TaskState, poll_retry_delay: int, - tenant_ids: Optional[List[str]], + tenant_ids: list[str] | None, ) -> None: self.zeebe_adapter = zeebe_adapter self.task = task diff --git a/pyzeebe/worker/task_router.py b/pyzeebe/worker/task_router.py index 445d605a..df19f46e 100644 --- a/pyzeebe/worker/task_router.py +++ b/pyzeebe/worker/task_router.py @@ -1,16 +1,8 @@ +from __future__ import annotations + import logging -from typing import ( - Any, - Callable, - Dict, - Iterable, - List, - Literal, - Optional, - Tuple, - TypeVar, - overload, -) +from collections.abc import Iterable +from typing import Any, Callable, Literal, Optional, TypeVar, overload from typing_extensions import ParamSpec @@ -24,7 +16,7 @@ P = ParamSpec("P") R = TypeVar("R") -RD = TypeVar("RD", bound=Optional[Dict[str, Any]]) +RD = TypeVar("RD", bound=Optional[dict[str, Any]]) logger = logging.getLogger(__name__) @@ -32,9 +24,9 @@ class ZeebeTaskRouter: def __init__( self, - before: Optional[List[TaskDecorator]] = None, - after: Optional[List[TaskDecorator]] = None, - exception_handler: Optional[ExceptionHandler] = None, + before: list[TaskDecorator] | None = None, + after: list[TaskDecorator] | None = None, + exception_handler: ExceptionHandler | None = None, ): """ Args: @@ -43,21 +35,21 @@ def __init__( exception_handler (ExceptionHandler): Handler that will be called when a job fails. """ self._exception_handler = exception_handler - self._before: List[TaskDecorator] = before or [] - self._after: List[TaskDecorator] = after or [] - self.tasks: List[Task] = [] + self._before: list[TaskDecorator] = before or [] + self._after: list[TaskDecorator] = after or [] + self.tasks: list[Task] = [] @overload def task( self, task_type: str, - exception_handler: Optional[ExceptionHandler] = None, - variables_to_fetch: Optional[Iterable[str]] = None, + exception_handler: ExceptionHandler | None = None, + variables_to_fetch: Iterable[str] | None = None, timeout_ms: int = 10000, max_jobs_to_activate: int = 32, max_running_jobs: int = 32, - before: Optional[List[TaskDecorator]] = None, - after: Optional[List[TaskDecorator]] = None, + before: list[TaskDecorator] | None = None, + after: list[TaskDecorator] | None = None, *, single_value: Literal[False] = False, ) -> Callable[[Function[P, RD]], Function[P, RD]]: ... @@ -66,13 +58,13 @@ def task( def task( self, task_type: str, - exception_handler: Optional[ExceptionHandler] = None, - variables_to_fetch: Optional[Iterable[str]] = None, + exception_handler: ExceptionHandler | None = None, + variables_to_fetch: Iterable[str] | None = None, timeout_ms: int = 10000, max_jobs_to_activate: int = 32, max_running_jobs: int = 32, - before: Optional[List[TaskDecorator]] = None, - after: Optional[List[TaskDecorator]] = None, + before: list[TaskDecorator] | None = None, + after: list[TaskDecorator] | None = None, *, single_value: Literal[True], variable_name: str, @@ -81,15 +73,15 @@ def task( def task( self, task_type: str, - exception_handler: Optional[ExceptionHandler] = None, - variables_to_fetch: Optional[Iterable[str]] = None, + exception_handler: ExceptionHandler | None = None, + variables_to_fetch: Iterable[str] | None = None, timeout_ms: int = 10000, max_jobs_to_activate: int = 32, max_running_jobs: int = 32, - before: Optional[List[TaskDecorator]] = None, - after: Optional[List[TaskDecorator]] = None, + before: list[TaskDecorator] | None = None, + after: list[TaskDecorator] | None = None, single_value: bool = False, - variable_name: Optional[str] = None, + variable_name: str | None = None, ) -> Callable[[Function[P, R]], Function[P, R]]: """ Decorator to create a task @@ -225,7 +217,7 @@ def get_task(self, task_type: str) -> Task: def _get_task_index(self, task_type: str) -> int: return self._get_task_and_index(task_type)[1] - def _get_task_and_index(self, task_type: str) -> Tuple[Task, int]: + def _get_task_and_index(self, task_type: str) -> tuple[Task, int]: for index, task in enumerate(self.tasks): if task.type == task_type: return task, index diff --git a/pyzeebe/worker/task_state.py b/pyzeebe/worker/task_state.py index 7b0d2a1a..7eb546b8 100644 --- a/pyzeebe/worker/task_state.py +++ b/pyzeebe/worker/task_state.py @@ -1,5 +1,4 @@ import logging -from typing import List from pyzeebe import Job @@ -8,7 +7,7 @@ class TaskState: def __init__(self) -> None: - self._active_jobs: List[int] = [] + self._active_jobs: list[int] = [] def remove(self, job: Job) -> None: try: diff --git a/pyzeebe/worker/worker.py b/pyzeebe/worker/worker.py index a717eae4..f58e513e 100644 --- a/pyzeebe/worker/worker.py +++ b/pyzeebe/worker/worker.py @@ -1,7 +1,8 @@ +from __future__ import annotations + import asyncio import logging import socket -from typing import List, Optional import anyio import grpc @@ -25,15 +26,14 @@ class ZeebeWorker(ZeebeTaskRouter): def __init__( self, grpc_channel: grpc.aio.Channel, - name: Optional[str] = None, + name: str | None = None, request_timeout: int = 0, - before: Optional[List[TaskDecorator]] = None, - after: Optional[List[TaskDecorator]] = None, + before: list[TaskDecorator] | None = None, + after: list[TaskDecorator] | None = None, max_connection_retries: int = 10, - watcher_max_errors_factor: int = 3, poll_retry_delay: int = 5, - tenant_ids: Optional[List[str]] = None, - exception_handler: Optional[ExceptionHandler] = None, + tenant_ids: list[str] | None = None, + exception_handler: ExceptionHandler | None = None, ): """ Args: @@ -44,7 +44,6 @@ def __init__( after (List[TaskDecorator]): Decorators to be performed after each task exception_handler (ExceptionHandler): Handler that will be called when a job fails. max_connection_retries (int): Amount of connection retries before worker gives up on connecting to zeebe. To setup with infinite retries use -1 - watcher_max_errors_factor (int): Number of consecutive errors for a task watcher will accept before raising MaxConsecutiveTaskThreadError poll_retry_delay (int): The number of seconds to wait before attempting to poll again when reaching max amount of running jobs tenant_ids (List[str]): A list of tenant IDs for which to activate jobs. New in Zeebe 8.3. """ @@ -52,19 +51,17 @@ def __init__( self.zeebe_adapter = ZeebeAdapter(grpc_channel, max_connection_retries) self.name = name or socket.gethostname() self.request_timeout = request_timeout - self.watcher_max_errors_factor = watcher_max_errors_factor - self._watcher_thread = None self.poll_retry_delay = poll_retry_delay self.tenant_ids = tenant_ids - self._job_pollers: List[JobPoller] = [] - self._job_executors: List[JobExecutor] = [] + self._job_pollers: list[JobPoller] = [] + self._job_executors: list[JobExecutor] = [] self._stop_event = anyio.Event() def _init_tasks(self) -> None: self._job_executors, self._job_pollers = [], [] for task in self.tasks: - jobs_queue: "asyncio.Queue[Job]" = asyncio.Queue() + jobs_queue: asyncio.Queue[Job] = asyncio.Queue() task_state = TaskState() poller = JobPoller( diff --git a/tests/integration/cancel_process_test.py b/tests/integration/cancel_process_test.py index fbb7b645..b969222e 100644 --- a/tests/integration/cancel_process_test.py +++ b/tests/integration/cancel_process_test.py @@ -1,9 +1,7 @@ -from typing import Dict - from pyzeebe import ZeebeClient -async def test_cancel_process(zeebe_client: ZeebeClient, process_name: str, process_variables: Dict): +async def test_cancel_process(zeebe_client: ZeebeClient, process_name: str, process_variables: dict): response = await zeebe_client.run_process(process_name, process_variables) await zeebe_client.cancel_process_instance(response.process_instance_key) diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index a90d04f8..c64c02a4 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -1,6 +1,5 @@ import asyncio import os -from typing import Dict from uuid import uuid4 import grpc @@ -39,7 +38,7 @@ async def exception_handler(exc: Exception, job: Job, job_controller: JobControl await job_controller.set_error_status(job, f"Failed to run task {job.type}. Reason: {exc}") @zeebe_worker.task("test", exception_handler) - async def task_handler(should_throw: bool, input: str, job: Job) -> Dict: + async def task_handler(should_throw: bool, input: str, job: Job) -> dict: process_stats.add_process_run(ProcessRun(job.process_instance_key, job.variables)) if should_throw: raise Exception("Error thrown") @@ -69,7 +68,7 @@ def process_name() -> str: @pytest.fixture -def process_variables() -> Dict: +def process_variables() -> dict: return {"input": str(uuid4()), "should_throw": False} diff --git a/tests/integration/publish_message_test.py b/tests/integration/publish_message_test.py index c24945e7..29583a31 100644 --- a/tests/integration/publish_message_test.py +++ b/tests/integration/publish_message_test.py @@ -1,11 +1,9 @@ -from typing import Dict - from pyzeebe import ZeebeClient from tests.integration.utils import ProcessStats from tests.integration.utils.wait_for_process import wait_for_process_with_variables -async def test_publish_message(zeebe_client: ZeebeClient, process_stats: ProcessStats, process_variables: Dict): +async def test_publish_message(zeebe_client: ZeebeClient, process_stats: ProcessStats, process_variables: dict): initial_amount_of_processes = process_stats.get_process_runs() await zeebe_client.publish_message("start_test_process", correlation_key="", variables=process_variables) diff --git a/tests/integration/run_process_test.py b/tests/integration/run_process_test.py index 85cb9863..3c25e3f7 100644 --- a/tests/integration/run_process_test.py +++ b/tests/integration/run_process_test.py @@ -1,4 +1,3 @@ -from typing import Dict from uuid import uuid4 import pytest @@ -11,7 +10,7 @@ async def test_run_process( - zeebe_client: ZeebeClient, process_name: str, process_variables: Dict, process_stats: ProcessStats + zeebe_client: ZeebeClient, process_name: str, process_variables: dict, process_stats: ProcessStats ): initial_amount_of_processes = process_stats.get_process_runs() @@ -26,7 +25,7 @@ async def test_non_existent_process(zeebe_client: ZeebeClient): await zeebe_client.run_process(str(uuid4())) -async def test_run_process_with_result(zeebe_client: ZeebeClient, process_name: str, process_variables: Dict): +async def test_run_process_with_result(zeebe_client: ZeebeClient, process_name: str, process_variables: dict): response = await zeebe_client.run_process_with_result( process_name, process_variables, timeout=PROCESS_TIMEOUT_IN_MS ) diff --git a/tests/integration/utils/__init__.py b/tests/integration/utils/__init__.py index 5f052e68..0157d797 100644 --- a/tests/integration/utils/__init__.py +++ b/tests/integration/utils/__init__.py @@ -1,3 +1,10 @@ from .process_run import ProcessRun from .process_stats import ProcessStats from .wait_for_process import wait_for_process, wait_for_process_with_variables + +__all__ = ( + "ProcessRun", + "ProcessStats", + "wait_for_process", + "wait_for_process_with_variables", +) diff --git a/tests/integration/utils/process_run.py b/tests/integration/utils/process_run.py index 06324b22..de906b55 100644 --- a/tests/integration/utils/process_run.py +++ b/tests/integration/utils/process_run.py @@ -1,7 +1,4 @@ -from typing import Dict - - class ProcessRun: - def __init__(self, instance_key: int, variables: Dict): + def __init__(self, instance_key: int, variables: dict): self.instance_key = instance_key self.variables = variables diff --git a/tests/integration/utils/process_stats.py b/tests/integration/utils/process_stats.py index 331679be..bbe5358d 100644 --- a/tests/integration/utils/process_stats.py +++ b/tests/integration/utils/process_stats.py @@ -1,12 +1,10 @@ -from typing import Dict, List - from .process_run import ProcessRun class ProcessStats: def __init__(self, process_name: str): self.process_name = process_name - self.runs: List[ProcessRun] = [] + self.runs: list[ProcessRun] = [] def add_process_run(self, process: ProcessRun): self.runs.append(process) @@ -14,7 +12,7 @@ def add_process_run(self, process: ProcessRun): def has_process_been_run(self, process_instance_key: int) -> bool: return any(run.instance_key == process_instance_key for run in self.runs if run.instance_key) - def has_process_with_variables_been_run(self, variables: Dict) -> bool: + def has_process_with_variables_been_run(self, variables: dict) -> bool: return any(run.variables == variables for run in self.runs) def get_process_runs(self) -> int: diff --git a/tests/integration/utils/wait_for_process.py b/tests/integration/utils/wait_for_process.py index 5fc3200f..00f7601c 100644 --- a/tests/integration/utils/wait_for_process.py +++ b/tests/integration/utils/wait_for_process.py @@ -1,5 +1,4 @@ from asyncio import sleep -from typing import Dict from .process_stats import ProcessStats @@ -9,6 +8,6 @@ async def wait_for_process(process_instance_key: int, process_stats: ProcessStat await sleep(interval) -async def wait_for_process_with_variables(process_stats: ProcessStats, variables: Dict, interval: float = 0.2): +async def wait_for_process_with_variables(process_stats: ProcessStats, variables: dict, interval: float = 0.2): while not process_stats.has_process_with_variables_been_run(variables): await sleep(interval) diff --git a/tests/unit/credentials/camunda_identity_credentials_test.py b/tests/unit/credentials/camunda_identity_credentials_test.py index 2960e4d5..313831c3 100644 --- a/tests/unit/credentials/camunda_identity_credentials_test.py +++ b/tests/unit/credentials/camunda_identity_credentials_test.py @@ -113,6 +113,6 @@ def test_gets_access_token_refresh_threshold( oauth_url=url, client_id=client_id, client_secret=client_secret, refresh_threshold_seconds=10 ) - assert credentials.get_auth_metadata(Mock()) == (("authorization", f"Bearer test1"),) - assert credentials.get_auth_metadata(Mock()) == (("authorization", f"Bearer test2"),) + assert credentials.get_auth_metadata(Mock()) == (("authorization", "Bearer test1"),) + assert credentials.get_auth_metadata(Mock()) == (("authorization", "Bearer test2"),) assert mocked_responses.assert_call_count(url, 2) diff --git a/tests/unit/credentials/oauth_test.py b/tests/unit/credentials/oauth_test.py index a8ffa93f..3c381c8e 100644 --- a/tests/unit/credentials/oauth_test.py +++ b/tests/unit/credentials/oauth_test.py @@ -6,9 +6,7 @@ from unittest.mock import MagicMock, PropertyMock from uuid import uuid4 -import grpc import pytest -from grpc.aio._typing import ChannelArgumentType from oauthlib import oauth2 from oauthlib.oauth2 import OAuth2Error from requests import Response diff --git a/tests/unit/function_tools/parameter_tools_test.py b/tests/unit/function_tools/parameter_tools_test.py index 16fae394..4edb1d96 100644 --- a/tests/unit/function_tools/parameter_tools_test.py +++ b/tests/unit/function_tools/parameter_tools_test.py @@ -1,4 +1,6 @@ -from typing import Callable, List, Optional +from __future__ import annotations + +from typing import Callable import pytest @@ -30,7 +32,7 @@ class TestGetFunctionParameters: (dummy_functions.lambda_positional_and_keyword_params, ["x", "y"]), ], ) - def test_get_params(self, fn: Callable, expected: Optional[List[str]]): + def test_get_params(self, fn: Callable, expected: list[str] | None): assert parameter_tools.get_parameters_from_function(fn) == expected diff --git a/tests/unit/grpc_internals/zeebe_job_adapter_test.py b/tests/unit/grpc_internals/zeebe_job_adapter_test.py index 4d3510d5..f49feac2 100644 --- a/tests/unit/grpc_internals/zeebe_job_adapter_test.py +++ b/tests/unit/grpc_internals/zeebe_job_adapter_test.py @@ -1,5 +1,4 @@ from random import randint -from typing import Dict from uuid import uuid4 import pytest @@ -29,7 +28,7 @@ def random_message() -> str: return str(uuid4()) -def random_variables() -> Dict: +def random_variables() -> dict: return {str(uuid4()): str(uuid4())} diff --git a/tests/unit/task/task_builder_test.py b/tests/unit/task/task_builder_test.py index bc1477bc..4819361b 100644 --- a/tests/unit/task/task_builder_test.py +++ b/tests/unit/task/task_builder_test.py @@ -1,11 +1,9 @@ import copy from typing import Callable -from unittest.mock import MagicMock import pytest from pyzeebe import Job, JobController, TaskDecorator -from pyzeebe.function_tools.parameter_tools import get_parameters_from_function from pyzeebe.job.job_status import JobStatus from pyzeebe.task import task_builder from pyzeebe.task.task import Task diff --git a/tests/unit/utils/function_tools.py b/tests/unit/utils/function_tools.py index 9c9b424e..cadcbe61 100644 --- a/tests/unit/utils/function_tools.py +++ b/tests/unit/utils/function_tools.py @@ -1,7 +1,7 @@ -from typing import Callable, List +from typing import Callable from pyzeebe.function_tools import async_tools -def functions_are_all_async(functions: List[Callable]) -> bool: - return all([async_tools.is_async_function(function) for function in functions]) +def functions_are_all_async(functions: list[Callable]) -> bool: + return all(async_tools.is_async_function(function) for function in functions) diff --git a/tests/unit/utils/gateway_mock.py b/tests/unit/utils/gateway_mock.py index 116a3a92..8f217917 100644 --- a/tests/unit/utils/gateway_mock.py +++ b/tests/unit/utils/gateway_mock.py @@ -1,11 +1,24 @@ import json from random import randint -from typing import Dict, List from unittest.mock import patch from uuid import uuid4 import grpc -from zeebe_grpc.gateway_pb2 import * +from zeebe_grpc.gateway_pb2 import ( + ActivatedJob, + ActivateJobsResponse, + CancelProcessInstanceResponse, + CompleteJobResponse, + CreateProcessInstanceResponse, + CreateProcessInstanceWithResultResponse, + DecisionMetadata, + Deployment, + DeployResourceResponse, + FailJobResponse, + FormMetadata, + ProcessMetadata, + PublishMessageResponse, +) from zeebe_grpc.gateway_pb2_grpc import GatewayServicer from pyzeebe.job.job import Job @@ -25,7 +38,7 @@ class GatewayMock(GatewayServicer): def __init__(self): self.deployed_processes = {} self.active_processes = {} - self.active_jobs: Dict[int, Job] = {} + self.active_jobs: dict[int, Job] = {} self.messages = {} def ActivateJobs(self, request, context): @@ -195,7 +208,7 @@ def PublishMessage(self, request, context): self.messages[request.messageId] = request.correlationKey return PublishMessageResponse() - def mock_deploy_process(self, bpmn_process_id: str, version: int, tasks: List[Task]): + def mock_deploy_process(self, bpmn_process_id: str, version: int, tasks: list[Task]): self.deployed_processes[bpmn_process_id] = { "bpmn_process_id": bpmn_process_id, "version": version, diff --git a/tests/unit/utils/random_utils.py b/tests/unit/utils/random_utils.py index 9297c004..c6b91d4d 100644 --- a/tests/unit/utils/random_utils.py +++ b/tests/unit/utils/random_utils.py @@ -1,5 +1,6 @@ +from __future__ import annotations + from random import randint -from typing import Optional from uuid import uuid4 from pyzeebe.job.job import Job @@ -14,7 +15,7 @@ def random_job( task: Task = task_builder.build_task( lambda x: {"x": x}, TaskConfig("test", lambda: None, 10000, 32, 32, [], False, "", [], []) ), - variables: Optional[dict] = None, + variables: dict | None = None, ) -> Job: return Job( type=task.type, diff --git a/tests/unit/worker/worker_test.py b/tests/unit/worker/worker_test.py index 461a8668..ef77d820 100644 --- a/tests/unit/worker/worker_test.py +++ b/tests/unit/worker/worker_test.py @@ -1,5 +1,5 @@ -import asyncio -from typing import List +from __future__ import annotations + from unittest.mock import AsyncMock, Mock from uuid import uuid4 @@ -11,7 +11,6 @@ from pyzeebe.errors import DuplicateTaskTypeError from pyzeebe.job.job import Job, JobController from pyzeebe.task.task import Task -from pyzeebe.worker.job_executor import JobExecutor from pyzeebe.worker.job_poller import JobPoller from pyzeebe.worker.worker import ZeebeWorker @@ -87,7 +86,7 @@ def test_include_router_adds_task(self, zeebe_worker: ZeebeWorker, router: Zeebe assert zeebe_worker.get_task(task_type) is not None - def test_include_multiple_routers(self, zeebe_worker: ZeebeWorker, routers: List[ZeebeTaskRouter]): + def test_include_multiple_routers(self, zeebe_worker: ZeebeWorker, routers: list[ZeebeTaskRouter]): for router in routers: self.include_router_with_task(zeebe_worker, router) @@ -270,7 +269,7 @@ async def test_poller_failed(self, zeebe_worker: ZeebeWorker): poller_mock = AsyncMock(spec_set=JobPoller, poll=AsyncMock(side_effect=[Exception("test_exception")])) zeebe_worker._job_pollers = [poller_mock] - with pytest.raises(Exception, match=r"unhandled errors in a TaskGroup \(1 sub-exception\)") as err: + with pytest.raises(Exception, match=r"unhandled errors in a TaskGroup \(1 sub-exception\)"): await zeebe_worker.work() poller_mock.poll.assert_awaited_once() @@ -290,7 +289,7 @@ async def poll2(): poller2_mock = AsyncMock(spec_set=JobPoller, poll=AsyncMock(wraps=poll2)) zeebe_worker._job_pollers = [poller_mock, poller2_mock] - with pytest.raises(Exception, match=r"unhandled errors in a TaskGroup \(1 sub-exception\)") as err: + with pytest.raises(Exception, match=r"unhandled errors in a TaskGroup \(1 sub-exception\)"): await zeebe_worker.work() poller_mock.poll.assert_awaited_once()