diff --git a/README.md b/README.md index 5e58cb54..ca826624 100644 --- a/README.md +++ b/README.md @@ -121,16 +121,6 @@ Please make sure to update tests as appropriate. We use [SemVer](semver.org) for versioning. For the versions available, see the tags on this repository. -In order to bump the current version run: - -```shell -$ bump2version -``` - -where part is the part that will be bumped (major/minor/patch/rc). - -This will bump the version in all relevant files as well as create a git commit. - ## License We use the MIT license, see [LICENSE.md](LICENSE.md) for details diff --git a/docs/channels.rst b/docs/channels.rst index b9ed5fac..7a5cbbf4 100644 --- a/docs/channels.rst +++ b/docs/channels.rst @@ -2,184 +2,8 @@ Channels ======== -In order to instantiate a ZeebeWorker or ZeebeClient you will need to provide an instance of a `grpc.aio.Channel`. -This Channel can be configured with the parameters `channel_credentials` and `channel_options`. +.. toctree:: + :name: channels -.. seealso:: - - `Python Channel Options `_ - Documentation of the available Python `grpc.aio.Channel` `options` (channel_arguments). - - -.. note:: - - By default, channel_options is defined so that the grpc.keepalive_time_ms option is always set to 45_000 (45 seconds). - Reference Camunda Docs `keep alive intervals `_. - - You can override the default `channel_options` by passing - e.g. `channel_options = (("grpc.keepalive_time_ms", 60_000),)` - for a keepalive time of 60 seconds. - - -Pyzeebe provides a couple standard ways to achieve this: - - -Insecure --------- - -Create a grpc channel connected to a Zeebe Gateway with tls disabled - - -.. autofunction:: pyzeebe.create_insecure_channel - - -Example: - -.. code-block:: python - - from pyzeebe import create_insecure_channel - - channel = create_insecure_channel(grpc_address="localhost:26500") - - -Secure ------- - -Create a grpc channel with a secure connection to a Zeebe Gateway with tls - -.. autofunction:: pyzeebe.create_secure_channel - -Example: - -.. code-block:: python - - import grpc - from pyzeebe import create_secure_channel - - - grpc.ssl_channel_credentials(root_certificates="", private_key="") - channel = create_secure_channel(grpc_address="host:port", channel_credentials=credentials) - - -Example with oauth2 (like Camunda Identity): - -.. code-block:: python - - import grpc - from pyzeebe import create_secure_channel - from pyzeebe import AuthMetadataPlugin, CamundaIdentityCredentials - - - credentials = CamundaIdentityCredentials(oauth_url=<...>, client_id=<...>, client_secret=<...>, audience=<...>) - call_credentials = grpc.metadata_call_credentials(AuthMetadataPlugin(credentials=credentials)) - ssl_credentials = grpc.ssl_channel_credentials(root_certificates="", private_key="") - channel_credentials = grpc.composite_channel_credentials(ssl_credentials, call_credentials) - channel = create_secure_channel(grpc_address="host:port", channel_credentials=channel_credentials) - - -Oauth2 Client Credentials Channel ---------------------------------- - -.. autofunction:: pyzeebe.channel.oauth_channel.create_oauth2_client_credentials_channel - -.. warning:: - Some arguments are Optional and are highly dependent on your Authentication Server configuration, - `scope` is usually required and is often optional `audience` . - -Example: - -.. code-block:: python - - import grpc - from pyzeebe.channel.oauth_channel import create_oauth2_client_credentials_channel - - channel: grpc.aio.Channel = create_oauth2_client_credentials_channel( - grpc_address=ZEEBE_ADDRESS, - client_id=ZEEBE_CLIENT_ID, - client_secret=ZEEBE_CLIENT_SECRET, - authorization_server=ZEEBE_AUTHORIZATION_SERVER_URL, - scope="profile email", - audience="zeebe-api", # NOTE: Can be omitted in some cases. - ) - -Example with custom `channel_options`: - -.. code-block:: python - - import grpc - from pyzeebe.channel.oauth_channel import create_oauth2_client_credentials_channel - from pyzeebe.types import ChannelArgumentType - - channel_options: ChannelArgumentType = (("grpc.so_reuseport", 0),) - - channel: grpc.aio.Channel = create_oauth2_client_credentials_channel( - grpc_address=ZEEBE_ADDRESS, - client_id=ZEEBE_CLIENT_ID, - client_secret=ZEEBE_CLIENT_SECRET, - authorization_server=ZEEBE_AUTHORIZATION_SERVER_URL, - scope="profile email", - audience="zeebe-api", - channel_options=channel_options, - ) - -Example with custom `channel_credentials`: - -Useful for self-signed certificates with `grpc.ssl_channel_credentials`. - -.. code-block:: python - - import grpc - from pyzeebe.channel.oauth_channel import create_oauth2_client_credentials_channel - from pyzeebe.types import ChannelArgumentType - - channel_credentials: grpc.ChannelCredentials = grpc.ssl_channel_credentials( - certificate_chain=None, private_key=None, root_certificates=None - ) - channel_options: ChannelArgumentType = (("grpc.so_reuseport", 0),) - - channel: grpc.aio.Channel = create_oauth2_client_credentials_channel( - grpc_address=ZEEBE_ADDRESS, - client_id=ZEEBE_CLIENT_ID, - client_secret=ZEEBE_CLIENT_SECRET, - authorization_server=ZEEBE_AUTHORIZATION_SERVER_URL, - scope="profile email", - audience="zeebe-api", - channel_credentials=channel_credentials, - channel_options=channel_options, - ) - -Camunda Cloud (Oauth2 Client Credentials Channel) -------------------------------------------------- - -.. autofunction:: pyzeebe.channel.oauth_channel.create_camunda_cloud_channel - -.. note:: - This is a convenience function for creating a channel with the correct parameters for Camunda Cloud. - It is equivalent to calling `create_oauth2_client_credentials_channel` with the correct parameters. - -Example: - -.. code-block:: python - - from pyzeebe.channel.oauth_channel import create_camunda_cloud_channel - - channel: grpc.aio.Channel = create_camunda_cloud_channel( - client_id=ZEEBE_CLIENT_ID, - client_secret=ZEEBE_CLIENT_SECRET, - cluster_id=CAMUNDA_CLUSTER_ID, - ) - -Camunda Cloud (Deprecated) --------------------------- - -Create a grpc channel connected to a Zeebe Gateway running in camunda cloud - -.. autofunction:: pyzeebe.channel.camunda_cloud_channel.create_camunda_cloud_channel - -Example: - -.. code-block:: python - - from pyzeebe.channel.camunda_cloud_channel import create_camunda_cloud_channel - - - channel = create_camunda_cloud_channel("client_id", "client_secret", "cluster_id") + Quickstart + Reference diff --git a/docs/channels_quickstart.rst b/docs/channels_quickstart.rst new file mode 100644 index 00000000..e725c85c --- /dev/null +++ b/docs/channels_quickstart.rst @@ -0,0 +1,157 @@ +=================== +Channels QuickStart +=================== + +In order to instantiate a ZeebeWorker or ZeebeClient you will need to provide an instance of a `grpc.aio.Channel`. +This Channel can be configured with the parameters `channel_credentials` and `channel_options`. + +.. seealso:: + + `Python Channel Options `_ + Documentation of the available Python `grpc.aio.Channel` `options` (channel_arguments). + + +.. note:: + + By default, channel_options is defined so that the grpc.keepalive_time_ms option is always set to 45_000 (45 seconds). + Reference Camunda Docs `keep alive intervals `_. + + You can override the default `channel_options` by passing + e.g. `channel_options = (("grpc.keepalive_time_ms", 60_000),)` - for a keepalive time of 60 seconds. + + +Pyzeebe provides a couple standard ways to achieve this: + + +Insecure +-------- + +For creating a grpc channel connected to a Zeebe Gateway with tls disabled, your can use the :py:func:`.create_insecure_channel`. + +Example: + +.. code-block:: python + + from pyzeebe import create_insecure_channel + + channel = create_insecure_channel(grpc_address="localhost:26500") + + +Secure +------ + +Create a grpc channel with a secure connection to a Zeebe Gateway with tls used the :py:func:`.create_secure_channel`. + +Example: + +.. code-block:: python + + import grpc + from pyzeebe import create_secure_channel + + + credentials = grpc.ssl_channel_credentials(root_certificates="", private_key="") + channel = create_secure_channel(grpc_address="host:port", channel_credentials=credentials) + + +Oauth2 Client Credentials Channel +--------------------------------- + +Create a grpc channel with a secure connection to a Zeebe Gateway with authorization via O2Auth +(Camunda Self-Hosted with Identity, for example) used the :py:func:`.create_oauth2_client_credentials_channel`. + +.. note:: + Some arguments are Optional and are highly dependent on your Authentication Server configuration, + `scope` is usually required and is often optional `audience` . + +Example: + +.. code-block:: python + + import grpc + from pyzeebe import create_oauth2_client_credentials_channel + + channel = create_oauth2_client_credentials_channel( + grpc_address=ZEEBE_ADDRESS, + client_id=ZEEBE_CLIENT_ID, + client_secret=ZEEBE_CLIENT_SECRET, + authorization_server=ZEEBE_AUTHORIZATION_SERVER_URL, + scope="profile email", + audience="zeebe-api", # NOTE: Can be omitted in some cases. + ) + +Example with custom `channel_options`: + +.. code-block:: python + + import grpc + from pyzeebe import create_oauth2_client_credentials_channel + from pyzeebe.types import ChannelArgumentType + + channel_options: ChannelArgumentType = (("grpc.so_reuseport", 0),) + + channel = create_oauth2_client_credentials_channel( + grpc_address=ZEEBE_ADDRESS, + client_id=ZEEBE_CLIENT_ID, + client_secret=ZEEBE_CLIENT_SECRET, + authorization_server=ZEEBE_AUTHORIZATION_SERVER_URL, + scope="profile email", + audience="zeebe-api", + channel_options=channel_options, + ) + +Example with custom `channel_credentials`: + +Useful for self-signed certificates with :py:func:`grpc.ssl_channel_credentials`. + +.. code-block:: python + + import grpc + from pyzeebe import create_oauth2_client_credentials_channel + from pyzeebe.types import ChannelArgumentType + + channel_credentials = grpc.ssl_channel_credentials( + root_certificates="", private_key="" + ) + channel_options: ChannelArgumentType = (("grpc.so_reuseport", 0),) + + channel = create_oauth2_client_credentials_channel( + grpc_address=ZEEBE_ADDRESS, + client_id=ZEEBE_CLIENT_ID, + client_secret=ZEEBE_CLIENT_SECRET, + authorization_server=ZEEBE_AUTHORIZATION_SERVER_URL, + scope="profile email", + audience="zeebe-api", + channel_credentials=channel_credentials, + channel_options=channel_options, + ) + +This method use the :py:class:`.Oauth2ClientCredentialsMetadataPlugin` under the hood. + +Camunda Cloud (Oauth2 Client Credentials Channel) +------------------------------------------------- + +Create a grpc channel with a secure connection to a Camunda SaaS used the :py:func:`.create_camunda_cloud_channel`. + +.. note:: + This is a convenience function for creating a channel with the correct parameters for Camunda Cloud. + It is equivalent to calling `create_oauth2_client_credentials_channel` with the correct parameters. + +Example: + +.. code-block:: python + + from pyzeebe import create_camunda_cloud_channel + + channel = create_camunda_cloud_channel( + client_id=ZEEBE_CLIENT_ID, + client_secret=ZEEBE_CLIENT_SECRET, + cluster_id=CAMUNDA_CLUSTER_ID, + ) + +This method use the :py:class:`.Oauth2ClientCredentialsMetadataPlugin` under the hood. + +Custom Oauth2 Authorization Flow +--------------------------------- + +If your need another authorization flow, your can create custom plugin used :py:class:`.OAuth2MetadataPlugin`. diff --git a/docs/channels_reference.rst b/docs/channels_reference.rst new file mode 100644 index 00000000..f9d26799 --- /dev/null +++ b/docs/channels_reference.rst @@ -0,0 +1,28 @@ +================== +Channels Reference +================== + +Channels +-------- + +.. autofunction:: pyzeebe.create_insecure_channel + +.. autofunction:: pyzeebe.create_secure_channel + +.. autofunction:: pyzeebe.create_oauth2_client_credentials_channel + +.. autofunction:: pyzeebe.create_camunda_cloud_channel + + +Credentials +----------- + +.. autoclass:: pyzeebe.credentials.OAuth2MetadataPlugin + :members: + :special-members: + :private-members: + +.. autoclass:: pyzeebe.credentials.Oauth2ClientCredentialsMetadataPlugin + :members: + :special-members: + :private-members: diff --git a/docs/client_reference.rst b/docs/client_reference.rst index 4e5b6f80..8cb66bd5 100644 --- a/docs/client_reference.rst +++ b/docs/client_reference.rst @@ -5,7 +5,9 @@ Client Reference .. autoclass:: pyzeebe.ZeebeClient :members: :undoc-members: + :special-members: __init__ .. autoclass:: pyzeebe.SyncZeebeClient :members: :undoc-members: + :special-members: __init__ diff --git a/docs/conf.py b/docs/conf.py index 27e5e8fa..9e4f9d42 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -70,4 +70,7 @@ # Looks for objects in external projects intersphinx_mapping = { "grpc": ("https://grpc.github.io/grpc/python/", None), + "requests": ("https://requests.readthedocs.io/en/latest/", None), + "requests_oauthlib": ("https://requests-oauthlib.readthedocs.io/en/latest/", None), + "python": ("https://docs.python.org/3/", None), } diff --git a/docs/credentials.rst b/docs/credentials.rst deleted file mode 100644 index 0265bbbd..00000000 --- a/docs/credentials.rst +++ /dev/null @@ -1,57 +0,0 @@ -=========== -Credentials -=========== - -Oauth2 Client Credentials Plugin --------------------------------- - -.. autoclass:: pyzeebe.credentials.Oauth2ClientCredentialsMetadataPlugin - :members: - :special-members: - :private-members: - -Example: - -.. code-block:: python - - oauth2_client_credentials = Oauth2ClientCredentialsMetadataPlugin( - client_id=ZEEBE_CLIENT_ID, - client_secret=ZEEBE_CLIENT_SECRET, - authorization_server=ZEEBE_AUTHORIZATION_SERVER_URL, - scope="profile email", - audience="zeebe-api", - ) - call_credentials: grpc.CallCredentials = grpc.metadata_call_credentials(oauth2_client_credentials) - channel_credentials: grpc.ChannelCredentials = grpc.ssl_channel_credentials( - certificate_chain=None, private_key=None, root_certificates=None - ) - composite_credentials: grpc.ChannelCredentials = grpc.composite_channel_credentials( - channel_credentials, call_credentials - ) - options: ChannelArgumentType = (("grpc.so_reuseport", 0),) - channel: grpc.aio.Channel = grpc.aio.secure_channel( - target=ZEEBE_ADDRESS, credentials=composite_credentials, options=options - ) - client = ZeebeClient(channel) - -Oauth2 Plugin -------------- -.. autoclass:: pyzeebe.credentials.OAuth2MetadataPlugin - :members: - :special-members: - :private-members: - -Internal (Deprecated) ---------------------- - -.. autoclass:: pyzeebe.AuthMetadataPlugin - :members: - :undoc-members: - -.. autoclass:: pyzeebe.CredentialsABC - :members: - :undoc-members: - -.. autoclass:: pyzeebe.CamundaIdentityCredentials - :members: - :undoc-members: diff --git a/docs/errors.rst b/docs/errors.rst index 864eb349..15ebafd1 100644 --- a/docs/errors.rst +++ b/docs/errors.rst @@ -42,8 +42,6 @@ All ``pyzeebe`` errors inherit from :py:class:`PyZeebeError` .. autoexception:: pyzeebe.errors.InvalidOAuthCredentialsError -.. autoexception:: pyzeebe.errors.InvalidCamundaCloudCredentialsError - .. autoexception:: pyzeebe.errors.UnknownGrpcStatusCodeError diff --git a/docs/index.rst b/docs/index.rst index e512ca17..9958cf85 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -64,7 +64,6 @@ Table Of Contents Client Worker Channels - Credentials Decorators Exceptions Zeebe Adapter diff --git a/docs/worker_quickstart.rst b/docs/worker_quickstart.rst index 264134af..9e861f3a 100644 --- a/docs/worker_quickstart.rst +++ b/docs/worker_quickstart.rst @@ -26,9 +26,9 @@ Run using event loop .. warning:: - Calling ``worker.work`` directly using ``asyncio.run`` will not work. When you create an async grpc channel a new event loop will automatically be created, which causes problems when running the worker (see: https://github.com/camunda-community-hub/pyzeebe/issues/198). + Calling ``worker.work`` directly using ``asyncio.run`` will not work. When you create an async grpc channel a new event loop will automatically be created, which causes problems when running the worker (see: https://github.com/camunda-community-hub/pyzeebe/issues/198). - An easy workaround: + An easy workaround: .. code-block:: python diff --git a/docs/worker_reference.rst b/docs/worker_reference.rst index 3df71c4e..63e66b86 100644 --- a/docs/worker_reference.rst +++ b/docs/worker_reference.rst @@ -8,11 +8,13 @@ This means that all methods that :py:class:`ZeebeTaskRouter` has will also appea .. autoclass:: pyzeebe.ZeebeTaskRouter :members: :undoc-members: + :special-members: __init__ .. autoclass:: pyzeebe.ZeebeWorker :members: :undoc-members: + :special-members: __init__ .. autoclass:: pyzeebe.Job diff --git a/docs/worker_taskrouter.rst b/docs/worker_taskrouter.rst index ca869b16..aaa00b4b 100644 --- a/docs/worker_taskrouter.rst +++ b/docs/worker_taskrouter.rst @@ -17,7 +17,7 @@ Create a Router Create a task with a Router --------------------------- -Creating a task with a router is the exact same process as wiht a :py:class:`ZeebeWorker` instance. +Creating a task with a router is the exact same process as wiht a :py:class:`.ZeebeWorker` instance. .. code-block:: python @@ -28,7 +28,7 @@ Creating a task with a router is the exact same process as wiht a :py:class:`Zee .. note:: - The :py:class:`ZeebeTaskRouter` :py:func:`task` decorator has all the capabities of the :py:class:`ZeebeWorker` class. + The :py:meth:`.ZeebeTaskRouter.task` decorator has all the capabities of the :py:class:`.ZeebeWorker` class. Merge Router tasks to a worker ------------------------------ diff --git a/docs/worker_tasks.rst b/docs/worker_tasks.rst index 5098499b..afce8bb3 100644 --- a/docs/worker_tasks.rst +++ b/docs/worker_tasks.rst @@ -26,7 +26,7 @@ This is a task that does nothing. It receives no parameters and also doesn't ret Async/Sync Tasks ---------------- -Tasks can be regular or async functions. If given a regular function, pyzeebe will convert it into an async one by running `asyncio.run_in_executor` +Tasks can be regular or async functions. If given a regular function, pyzeebe will convert it into an async one by running :py:meth:`asyncio.loop.run_in_executor` .. note:: @@ -57,9 +57,10 @@ An exception handler's signature: .. code-block:: python - Callable[[Exception, Job], Awaitable[None]] + Callable[[Exception, Job, JobController], Awaitable[None]] -In other words: an exception handler is a function that receives an :class:`Exception` and :py:class:`Job` instance (a pyzeebe class). +In other words: an exception handler is a function that receives an :class:`Exception`, +:py:class:`.Job` instance and :py:class:`.JobController` (a pyzeebe class). The exception handler is called when the task has failed. diff --git a/docs/zeebe_adapter_reference.rst b/docs/zeebe_adapter_reference.rst index 0a1fb7f3..b016073e 100644 --- a/docs/zeebe_adapter_reference.rst +++ b/docs/zeebe_adapter_reference.rst @@ -5,6 +5,8 @@ Zeebe Adapter Reference .. autoclass:: pyzeebe.grpc_internals.zeebe_adapter.ZeebeAdapter :members: :undoc-members: + :special-members: __init__ + :inherited-members: ========================== Zeebe GRPC Responses diff --git a/pyzeebe/__init__.py b/pyzeebe/__init__.py index dfbea6b9..db581068 100644 --- a/pyzeebe/__init__.py +++ b/pyzeebe/__init__.py @@ -7,9 +7,6 @@ ) from pyzeebe.client.client import ZeebeClient from pyzeebe.client.sync_client import SyncZeebeClient -from pyzeebe.credentials.base import CredentialsABC -from pyzeebe.credentials.camunda_identity import CamundaIdentityCredentials -from pyzeebe.credentials.plugins import AuthMetadataPlugin from pyzeebe.job.job import Job, JobController from pyzeebe.job.job_status import JobStatus from pyzeebe.task.exception_handler import ExceptionHandler, default_exception_handler @@ -35,7 +32,4 @@ "ZeebeTaskRouter", "default_exception_handler", "ZeebeWorker", - "CredentialsABC", - "CamundaIdentityCredentials", - "AuthMetadataPlugin", ) diff --git a/pyzeebe/channel/camunda_cloud_channel.py b/pyzeebe/channel/camunda_cloud_channel.py deleted file mode 100644 index 864bb0c0..00000000 --- a/pyzeebe/channel/camunda_cloud_channel.py +++ /dev/null @@ -1,93 +0,0 @@ -from __future__ import annotations - -import grpc -from oauthlib import oauth2 -from requests import HTTPError -from requests_oauthlib import OAuth2Session -from typing_extensions import deprecated - -from pyzeebe.channel.channel_options import get_channel_options -from pyzeebe.errors import ( - InvalidCamundaCloudCredentialsError, - InvalidOAuthCredentialsError, -) -from pyzeebe.types import ChannelArgumentType - - -@deprecated( - "Use pyzeebe.channel.oauth_channel.create_camunda_cloud_channel function instead", - category=DeprecationWarning, - stacklevel=1, -) -def create_camunda_cloud_channel( - client_id: str, - client_secret: str, - cluster_id: str, - region: str = "bru-2", - channel_options: ChannelArgumentType | None = None, -) -> grpc.aio.Channel: - """ - Create channel connected to a Camunda Cloud cluster - - Args: - client_id (str): The client id provided by Camunda Cloud - client_secret (str): The client secret provided by Camunda Cloud - cluster_id (str): The zeebe cluster id to connect to - region (str): The cluster's region. Defaults to bru-2 - channel_options (Optional[ChannelArgumentType]): GRPC channel options. See https://grpc.github.io/grpc/python/glossary.html - - Returns: - grpc.aio.Channel: A GRPC Channel connected to the Zeebe gateway. - - Raises: - InvalidCamundaCloudCredentialsError: One of the provided camunda credentials is not correct - """ - channel_credentials = _create_camunda_cloud_credentials(client_id, client_secret, cluster_id, region) - - return grpc.aio.secure_channel( - f"{cluster_id}.{region}.zeebe.camunda.io:443", - channel_credentials, - options=get_channel_options(channel_options), - ) - - -def _create_camunda_cloud_credentials( - client_id: str, client_secret: str, cluster_id: str, region: str -) -> grpc.ChannelCredentials: - try: - access_token = _get_access_token( - url="https://login.cloud.camunda.io/oauth/token", - client_id=client_id, - client_secret=client_secret, - audience="zeebe.camunda.io", - ) - return _create_oauth_credentials(access_token) - except InvalidOAuthCredentialsError as oauth_error: - raise InvalidCamundaCloudCredentialsError(client_id, cluster_id) from oauth_error - - -def _get_access_token(url: str, client_id: str, client_secret: str, audience: str) -> str: - try: - client = oauth2.BackendApplicationClient(client_id) - client.prepare_request_body(include_client_id=True) - with OAuth2Session(client=client) as session: - response = session.post( - url, - data={ - "client_id": client_id, - "client_secret": client_secret, - "audience": audience, - }, - ) - response.raise_for_status() - access_token = response.json()["access_token"] - assert isinstance(access_token, str) - return access_token - except HTTPError as http_error: - raise InvalidOAuthCredentialsError(url=url, client_id=client_id, audience=audience) from http_error - - -def _create_oauth_credentials(access_token: str) -> grpc.ChannelCredentials: - token_credentials = grpc.access_token_call_credentials(access_token) - ssl_credentials = grpc.ssl_channel_credentials() - return grpc.composite_channel_credentials(ssl_credentials, token_credentials) diff --git a/pyzeebe/channel/insecure_channel.py b/pyzeebe/channel/insecure_channel.py index f71c7def..846cff72 100644 --- a/pyzeebe/channel/insecure_channel.py +++ b/pyzeebe/channel/insecure_channel.py @@ -14,9 +14,9 @@ def create_insecure_channel( Create an insecure channel Args: - grpc_address (Optional[str], optional): Zeebe Gateway Address + grpc_address (Optional[str]): Zeebe Gateway Address Default: None, alias the ZEEBE_ADDRESS environment variable or "localhost:26500" - channel_options (Optional[Dict], optional): GRPC channel options. + channel_options (Optional[ChannelArgumentType]): GRPC channel options. See https://grpc.github.io/grpc/python/glossary.html#term-channel_arguments Returns: diff --git a/pyzeebe/channel/oauth_channel.py b/pyzeebe/channel/oauth_channel.py index 57f3c267..d54746a4 100644 --- a/pyzeebe/channel/oauth_channel.py +++ b/pyzeebe/channel/oauth_channel.py @@ -23,8 +23,8 @@ def create_oauth2_client_credentials_channel( ) -> grpc.aio.Channel: """Create a gRPC channel for connecting to Camunda 8 (Self-Managed) with OAuth2ClientCredentials. - https://oauth.net/2/grant-types/client-credentials/ - https://datatracker.ietf.org/doc/html/rfc6749#section-11.2.2 + - https://oauth.net/2/grant-types/client-credentials/ + - https://datatracker.ietf.org/doc/html/rfc6749#section-11.2.2 Args: grpc_address (str): Zeebe Gateway Address. @@ -49,9 +49,6 @@ def create_oauth2_client_credentials_channel( Returns: grpc.aio.Channel: A gRPC channel connected to the Zeebe Gateway. - - Raises: - InvalidOAuthCredentialsError: One of the provided camunda credentials is not correct """ oauth2_client_credentials = Oauth2ClientCredentialsMetadataPlugin( diff --git a/pyzeebe/channel/secure_channel.py b/pyzeebe/channel/secure_channel.py index bddfad9b..e28b4f91 100644 --- a/pyzeebe/channel/secure_channel.py +++ b/pyzeebe/channel/secure_channel.py @@ -16,9 +16,9 @@ def create_secure_channel( Create a secure channel Args: - grpc_address (Optional[str], optional): Zeebe Gateway Address + grpc_address (Optional[str]): Zeebe Gateway Address Default: None, alias the ZEEBE_ADDRESS environment variable or "localhost:26500" - channel_options (Optional[Dict], optional): GRPC channel options. + channel_options (Optional[ChannelArgumentType]): GRPC channel options. See https://grpc.github.io/grpc/python/glossary.html#term-channel_arguments channel_credentials (Optional[grpc.ChannelCredentials]): Channel credentials to use. Will use grpc.ssl_channel_credentials() if not provided. diff --git a/pyzeebe/client/client.py b/pyzeebe/client/client.py index f51de6f8..e5e0d16d 100644 --- a/pyzeebe/client/client.py +++ b/pyzeebe/client/client.py @@ -77,7 +77,7 @@ async def run_process_with_result( variables (dict): A dictionary containing all the starting variables the process needs. Must be JSONable. version (int): The version of the process. Default: -1 (latest) timeout (int): How long to wait until a timeout occurs. Default: 0 (Zeebe default timeout) - variables_to_fetch (List[str]): Which variables to get from the finished process + variables_to_fetch (list[str]): Which variables to get from the finished process tenant_id (str): The tenant ID of the process definition. New in Zeebe 8.3. Returns: diff --git a/pyzeebe/client/sync_client.py b/pyzeebe/client/sync_client.py index f23c2b47..478a22ca 100644 --- a/pyzeebe/client/sync_client.py +++ b/pyzeebe/client/sync_client.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +from functools import partial, wraps import grpc @@ -14,12 +15,15 @@ ) from pyzeebe.types import Variables +copy_docstring = partial(wraps, assigned=["__doc__"], updated=[]) + class SyncZeebeClient: def __init__(self, grpc_channel: grpc.aio.Channel, max_connection_retries: int = 10) -> None: self.loop = asyncio.get_event_loop() self.client = ZeebeClient(grpc_channel, max_connection_retries) + @copy_docstring(ZeebeClient.run_process) def run_process( self, bpmn_process_id: str, @@ -29,6 +33,7 @@ def run_process( ) -> CreateProcessInstanceResponse: return self.loop.run_until_complete(self.client.run_process(bpmn_process_id, variables, version, tenant_id)) + @copy_docstring(ZeebeClient.run_process_with_result) def run_process_with_result( self, bpmn_process_id: str, @@ -44,12 +49,15 @@ def run_process_with_result( ) ) + @copy_docstring(ZeebeClient.cancel_process_instance) def cancel_process_instance(self, process_instance_key: int) -> CancelProcessInstanceResponse: return self.loop.run_until_complete(self.client.cancel_process_instance(process_instance_key)) + @copy_docstring(ZeebeClient.deploy_resource) def deploy_resource(self, *resource_file_path: str, tenant_id: str | None = None) -> DeployResourceResponse: return self.loop.run_until_complete(self.client.deploy_resource(*resource_file_path, tenant_id=tenant_id)) + @copy_docstring(ZeebeClient.publish_message) def publish_message( self, name: str, diff --git a/pyzeebe/credentials/__init__.py b/pyzeebe/credentials/__init__.py index 02c74df4..f23a71fc 100644 --- a/pyzeebe/credentials/__init__.py +++ b/pyzeebe/credentials/__init__.py @@ -1,12 +1,6 @@ -from .base import CredentialsABC -from .camunda_identity import CamundaIdentityCredentials from .oauth import Oauth2ClientCredentialsMetadataPlugin, OAuth2MetadataPlugin -from .plugins import AuthMetadataPlugin __all__ = ( - "CredentialsABC", - "CamundaIdentityCredentials", "Oauth2ClientCredentialsMetadataPlugin", "OAuth2MetadataPlugin", - "AuthMetadataPlugin", ) diff --git a/pyzeebe/credentials/base.py b/pyzeebe/credentials/base.py deleted file mode 100644 index 2aac4a4e..00000000 --- a/pyzeebe/credentials/base.py +++ /dev/null @@ -1,18 +0,0 @@ -import abc - -from pyzeebe.credentials.typing import AuthMetadata, CallContext - - -class CredentialsABC(abc.ABC): - """A specification for credentials manager. Passed to :py:class:`AuthMetadataPlugin`.""" - - @abc.abstractmethod - def get_auth_metadata(self, context: CallContext) -> AuthMetadata: - """ - Args: - context (grpc.AuthMetadataContext): Provides information to call credentials metadata plugins. - - Returns: - Tuple[Tuple[str, Union[str, bytes]], ...]: The `metadata` used to construct the :py:class:`grpc.CallCredentials`. - """ - raise NotImplementedError diff --git a/pyzeebe/credentials/camunda_identity.py b/pyzeebe/credentials/camunda_identity.py deleted file mode 100644 index d657d4d9..00000000 --- a/pyzeebe/credentials/camunda_identity.py +++ /dev/null @@ -1,88 +0,0 @@ -from __future__ import annotations - -import datetime -import threading -from typing import Any - -import requests - -from pyzeebe.credentials.base import CredentialsABC -from pyzeebe.credentials.typing import AuthMetadata, CallContext -from pyzeebe.errors import InvalidOAuthCredentialsError - - -class CamundaIdentityCredentials(CredentialsABC): - """Credentials client for Camunda Platform. - - Args: - oauth_url (str): The Keycloak auth endpoint url. - client_id (str): The client id provided by Camunda Platform - client_secret (str): The client secret provided by Camunda Platform - audience (str): Audience for Zeebe. Default: zeebe-api - refresh_threshold_seconds (int): Will try to refresh token if it expires in this number of seconds or less. Default: 20 - """ - - def __init__( - self, - *, - oauth_url: str, - client_id: str, - client_secret: str, - audience: str = "zeebe-api", - refresh_threshold_seconds: int = 20, - ) -> None: - self.oauth_url = oauth_url - self.client_id = client_id - self.client_secret = client_secret - self.audience = audience - - self._lock = threading.Lock() - self._refresh_threshold = datetime.timedelta(seconds=refresh_threshold_seconds) - - self._token: dict[str, Any] | None = None - self._expires_in: datetime.datetime | None = None - - def _expired(self) -> bool: - return ( - self._token is None - or self._expires_in is None - or (self._expires_in - self._refresh_threshold) < datetime.datetime.now(datetime.timezone.utc) - ) - - def _refresh(self) -> None: - try: - response = requests.post( - self.oauth_url, - data={ - "client_id": self.client_id, - "client_secret": self.client_secret, - "audience": self.audience, - "grant_type": "client_credentials", - }, - ) - response.raise_for_status() - data = response.json() - self._token = data["access_token"] - self._expires_in = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta( - seconds=int(data["expires_in"]) - ) - except requests.HTTPError as http_error: - raise InvalidOAuthCredentialsError( - url=self.oauth_url, client_id=self.client_id, audience=self.audience - ) from http_error - - def get_auth_metadata(self, context: CallContext) -> AuthMetadata: - """ - Args: - context (grpc.AuthMetadataContext): Provides information to call credentials metadata plugins. - - Returns: - Tuple[Tuple[str, Union[str, bytes]], ...]: The `metadata` used to construct the :py:class:`grpc.CallCredentials`. - - Raises: - InvalidOAuthCredentialsError: One of the provided camunda credentials is not correct - """ - with self._lock: - if self._expired() is True: - self._refresh() - return (("authorization", f"Bearer {self._token}"),) diff --git a/pyzeebe/credentials/oauth.py b/pyzeebe/credentials/oauth.py index 72cd2385..58eef7b1 100644 --- a/pyzeebe/credentials/oauth.py +++ b/pyzeebe/credentials/oauth.py @@ -12,6 +12,8 @@ from oauthlib import oauth2 from requests_oauthlib import OAuth2Session +from pyzeebe.errors import InvalidOAuthCredentialsError + logger = logging.getLogger(__name__) @@ -29,9 +31,9 @@ class OAuth2MetadataPlugin(grpc.AuthMetadataPlugin): # type: ignore[misc] Implements the AuthMetadataPlugin interface for OAuth2 Authentication based on oauthlib and requests_oauthlib. - https://datatracker.ietf.org/doc/html/rfc6749 - https://oauthlib.readthedocs.io/en/latest/oauth2/oauth2.html - https://requests-oauthlib.readthedocs.io/en/latest/oauth2_workflow.html + - https://datatracker.ietf.org/doc/html/rfc6749 + - https://oauthlib.readthedocs.io/en/latest/oauth2/oauth2.html + - https://requests-oauthlib.readthedocs.io/en/latest/oauth2_workflow.html """ def __init__( @@ -44,7 +46,7 @@ def __init__( """AuthMetadataPlugin for OAuth2 Authentication. Args: - oauth2session (OAuth2Session): The OAuth2Session object. + oauth2session (requests_oauthlib.OAuth2Session): The OAuth2Session object. func_fetch_token (Callable): The function to fetch the token. leeway (int): The number of seconds to consider the token as expired before the actual expiration time. @@ -111,7 +113,7 @@ def retrieve_token(self) -> None: except oauth2.OAuth2Error as e: logger.exception(str(e)) - raise e + raise InvalidOAuthCredentialsError(str(e)) from e def _no_expiration(self, r: requests.Response) -> requests.Response: """ @@ -136,8 +138,8 @@ def _no_expiration(self, r: requests.Response) -> requests.Response: class Oauth2ClientCredentialsMetadataPlugin(OAuth2MetadataPlugin): """AuthMetadataPlugin for OAuth2 Client Credentials Authentication based on Oauth2MetadataPlugin. - https://oauth.net/2/grant-types/client-credentials/ - https://datatracker.ietf.org/doc/html/rfc6749#section-11.2.2 + - https://oauth.net/2/grant-types/client-credentials/ + - https://datatracker.ietf.org/doc/html/rfc6749#section-11.2.2 """ def __init__( diff --git a/pyzeebe/credentials/plugins.py b/pyzeebe/credentials/plugins.py deleted file mode 100644 index af62e34e..00000000 --- a/pyzeebe/credentials/plugins.py +++ /dev/null @@ -1,32 +0,0 @@ -import grpc - -from pyzeebe.credentials.base import CredentialsABC - - -class AuthMetadataPlugin(grpc.AuthMetadataPlugin): # type: ignore[misc] - """Custom authentication plugin with exception catching. - - Args: - credentials (CredentialsABC): A credentials manager. - """ - - def __init__(self, *, credentials: CredentialsABC) -> None: - self._credentials = credentials - - def __call__(self, context: grpc.AuthMetadataContext, callback: grpc.AuthMetadataPluginCallback) -> None: - """Implements authentication by passing metadata to a callback. - - This method will be invoked asynchronously in a separate thread. - - Args: - context: An AuthMetadataContext providing information on the RPC that - the plugin is being called to authenticate. - callback: An AuthMetadataPluginCallback to be invoked either - synchronously or asynchronously. - """ - try: - metadata = self._credentials.get_auth_metadata(context) - except Exception as e: - callback((), e) - else: - callback(metadata, None) diff --git a/pyzeebe/credentials/typing.py b/pyzeebe/credentials/typing.py deleted file mode 100644 index 128e64e6..00000000 --- a/pyzeebe/credentials/typing.py +++ /dev/null @@ -1,8 +0,0 @@ -from typing import Protocol, Union - -AuthMetadata = tuple[tuple[str, Union[str, bytes]], ...] - - -class CallContext(Protocol): - service_url: str - method_name: str diff --git a/pyzeebe/errors/__init__.py b/pyzeebe/errors/__init__.py index d4aa3fdf..d6294791 100644 --- a/pyzeebe/errors/__init__.py +++ b/pyzeebe/errors/__init__.py @@ -1,7 +1,4 @@ -from .credentials_errors import ( - InvalidCamundaCloudCredentialsError, - InvalidOAuthCredentialsError, -) +from .credentials_errors import InvalidOAuthCredentialsError from .job_errors import ( ActivateJobsRequestInvalidError, JobAlreadyDeactivatedError, @@ -32,7 +29,6 @@ ) __all__ = ( - "InvalidCamundaCloudCredentialsError", "InvalidOAuthCredentialsError", "ActivateJobsRequestInvalidError", "JobAlreadyDeactivatedError", diff --git a/pyzeebe/errors/credentials_errors.py b/pyzeebe/errors/credentials_errors.py index b557c009..b0b9ffc9 100644 --- a/pyzeebe/errors/credentials_errors.py +++ b/pyzeebe/errors/credentials_errors.py @@ -2,12 +2,4 @@ class InvalidOAuthCredentialsError(PyZeebeError): - def __init__(self, url: str, client_id: str, audience: str) -> None: - super().__init__( - f"Invalid OAuth credentials supplied for {url} with audience {audience} and client id {client_id}" - ) - - -class InvalidCamundaCloudCredentialsError(PyZeebeError): - def __init__(self, client_id: str, cluster_id: str) -> None: - super().__init__(f"Invalid credentials supplied for cluster {cluster_id} with client {client_id}") + pass diff --git a/pyzeebe/grpc_internals/types.py b/pyzeebe/grpc_internals/types.py index 08897b2e..55e9f8d7 100644 --- a/pyzeebe/grpc_internals/types.py +++ b/pyzeebe/grpc_internals/types.py @@ -55,8 +55,7 @@ class ProcessMetadata: version: int #: the assigned key, which acts as a unique identifier for this process process_definition_key: int - #: the resource name (see: ProcessRequestObject.name) from which this process was - #: parsed + #: the resource name from which this process was parsed resource_name: str #: the tenant ID of the deployed process tenant_id: str | None @@ -94,7 +93,7 @@ class DecisionRequirementsMetadata: #: the assigned decision requirements key, which acts as a unique identifier #: for this decision requirements decision_requirements_key: int - #: the resource name (see: Resource.name) from which this decision + #: the resource name from which this decision #: requirements was parsed resource_name: str #: the tenant ID of the deployed decision requirements diff --git a/pyzeebe/worker/task_router.py b/pyzeebe/worker/task_router.py index df19f46e..85a42f27 100644 --- a/pyzeebe/worker/task_router.py +++ b/pyzeebe/worker/task_router.py @@ -30,8 +30,8 @@ def __init__( ): """ Args: - before (List[TaskDecorator]): Decorators to be performed before each task - after (List[TaskDecorator]): Decorators to be performed after each task + before (list[TaskDecorator]): Decorators to be performed before each task + after (list[TaskDecorator]): Decorators to be performed after each task exception_handler (ExceptionHandler): Handler that will be called when a job fails. """ self._exception_handler = exception_handler @@ -94,8 +94,8 @@ def task( on the worker and retry it. Default: 10000 (10 seconds). max_jobs_to_activate (int): Maximum amount of jobs the worker will activate in one request to the Zeebe gateway. Default: 32 max_running_jobs (int): Maximum amount of jobs that will run simultaneously. Default: 32 - before (List[TaskDecorator]): All decorators which should be performed before the task. - after (List[TaskDecorator]): All decorators which should be performed after the task. + before (list[TaskDecorator]): All decorators which should be performed before the task. + after (list[TaskDecorator]): All decorators which should be performed after the task. single_value (bool): If the function returns a single value (int, string, list) and not a dictionary set this to True. Default: False variable_name (str): If single_value then this will be the variable name given to zeebe: diff --git a/pyzeebe/worker/worker.py b/pyzeebe/worker/worker.py index f58e513e..aabaca1e 100644 --- a/pyzeebe/worker/worker.py +++ b/pyzeebe/worker/worker.py @@ -40,12 +40,12 @@ def __init__( grpc_channel (grpc.aio.Channel): GRPC Channel connected to a Zeebe gateway name (str): Name of zeebe worker request_timeout (int): Longpolling timeout for getting tasks from zeebe. If 0 default value is used - before (List[TaskDecorator]): Decorators to be performed before each task - after (List[TaskDecorator]): Decorators to be performed after each task + before (list[TaskDecorator]): Decorators to be performed before each task + after (list[TaskDecorator]): Decorators to be performed after each task exception_handler (ExceptionHandler): Handler that will be called when a job fails. max_connection_retries (int): Amount of connection retries before worker gives up on connecting to zeebe. To setup with infinite retries use -1 poll_retry_delay (int): The number of seconds to wait before attempting to poll again when reaching max amount of running jobs - tenant_ids (List[str]): A list of tenant IDs for which to activate jobs. New in Zeebe 8.3. + tenant_ids (list[str]): A list of tenant IDs for which to activate jobs. New in Zeebe 8.3. """ super().__init__(before, after, exception_handler) self.zeebe_adapter = ZeebeAdapter(grpc_channel, max_connection_retries) @@ -61,7 +61,7 @@ def _init_tasks(self) -> None: self._job_executors, self._job_pollers = [], [] for task in self.tasks: - jobs_queue: asyncio.Queue[Job] = asyncio.Queue() + jobs_queue = asyncio.Queue[Job]() task_state = TaskState() poller = JobPoller( @@ -81,7 +81,7 @@ def _init_tasks(self) -> None: async def work(self) -> None: """ - Start the worker. The worker will poll zeebe for jobs of each task in a different thread. + Start the worker. The worker will poll zeebe for jobs of each task in a different asyncio task. Raises: ActivateJobsRequestInvalidError: If one of the worker's task has invalid types diff --git a/tests/unit/channel/camunda_cloud_channel_test.py b/tests/unit/channel/camunda_cloud_channel_test.py deleted file mode 100644 index ba6508f2..00000000 --- a/tests/unit/channel/camunda_cloud_channel_test.py +++ /dev/null @@ -1,128 +0,0 @@ -from unittest.mock import Mock, patch -from uuid import uuid4 - -import grpc -import pytest -import responses - -from pyzeebe.channel.camunda_cloud_channel import create_camunda_cloud_channel -from pyzeebe.errors import InvalidCamundaCloudCredentialsError - - -@pytest.fixture -def mocked_responses(): - with responses.RequestsMock() as response_mock: - yield response_mock - - -@pytest.fixture -def client_id() -> str: - return str(uuid4()) - - -@pytest.fixture -def client_secret() -> str: - return str(uuid4()) - - -@pytest.fixture -def cluster_id() -> str: - return str(uuid4()) - - -@pytest.fixture -def region() -> str: - return str(uuid4()) - - -@pytest.fixture -def url() -> str: - return "https://login.cloud.camunda.io/oauth/token" - - -@pytest.fixture -def access_token() -> str: - return str(uuid4()) - - -@pytest.fixture -def mock_access_token_response(mocked_responses: responses.RequestsMock, url: str, access_token: str): - mocked_responses.add(responses.POST, url, json={"access_token": access_token}, status=200) - - -class TestCamundaCloudChannel: - @pytest.fixture(autouse=True) - def secure_channel_mock(self, aio_grpc_channel: grpc.aio.Channel): - with patch("grpc.aio.secure_channel", return_value=aio_grpc_channel) as mock: - yield mock - - @pytest.mark.usefixtures("mock_access_token_response") - def test_returns_grpc_channel( - self, - mocked_responses: responses.RequestsMock, - client_id: str, - client_secret: str, - cluster_id: str, - ): - channel = create_camunda_cloud_channel(client_id, client_secret, cluster_id) - - assert isinstance(channel, grpc.aio.Channel) - - @pytest.mark.usefixtures("mock_access_token_response") - def test_gets_access_token_from_camunda_cloud( - self, - mocked_responses: responses.RequestsMock, - client_id: str, - client_secret: str, - cluster_id: str, - ): - create_camunda_cloud_channel(client_id, client_secret, cluster_id) - - assert len(mocked_responses.calls) == 1 - - @pytest.mark.usefixtures("mock_access_token_response") - def test_gets_access_token_using_correct_parameters( - self, - mocked_responses: responses.RequestsMock, - client_id: str, - client_secret: str, - cluster_id: str, - region: str, - ): - expected_request_body = f"client_id={client_id}&client_secret={client_secret}&audience=zeebe.camunda.io" - - create_camunda_cloud_channel(client_id, client_secret, cluster_id, region) - - request = mocked_responses.calls[0].request - assert request.body == expected_request_body - - def test_raises_on_invalid_credentials( - self, - mocked_responses: responses.RequestsMock, - url: str, - client_id: str, - client_secret: str, - cluster_id: str, - ): - mocked_responses.add( - responses.POST, url, status=400 - ) # Camunda cloud returns 400 when invalid credentials are provided - - with pytest.raises(InvalidCamundaCloudCredentialsError): - create_camunda_cloud_channel(client_id, client_secret, cluster_id) - - @pytest.mark.usefixtures("mock_access_token_response") - def test_creates_channel_using_grpc_credentials( - self, - client_id: str, - client_secret: str, - cluster_id: str, - secure_channel_mock: Mock, - ): - create_camunda_cloud_channel(client_id, client_secret, cluster_id) - - secure_channel_call = secure_channel_mock.mock_calls[0] - arguments = [arg for arg in secure_channel_call.args] - assert any( - isinstance(arg, grpc.ChannelCredentials) for arg in arguments - ), "None of the arguments to grpc.aio.create_secure_channel were of type grpc.ChannelCredentials" diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index c4b74ad6..619161d1 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -1,6 +1,5 @@ from random import randint -from threading import Event -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import AsyncMock, MagicMock from uuid import uuid4 import grpc @@ -104,31 +103,6 @@ def original_function(): return mock -@pytest.fixture -def stop_after_test(): - stop_test = Event() - yield stop_test - stop_test.set() - - -@pytest.fixture -def handle_task_mock(): - with patch("pyzeebe.worker.worker.ZeebeWorker._handle_task") as mock: - yield mock - - -@pytest.fixture -def stop_event_mock(zeebe_worker): - with patch.object(zeebe_worker, "stop_event") as mock: - yield mock - - -@pytest.fixture -def handle_not_alive_thread_spy(mocker): - spy = mocker.spy(ZeebeWorker, "_handle_not_alive_thread") - yield spy - - @pytest.fixture def router(): return ZeebeTaskRouter() diff --git a/tests/unit/credentials/auth_metadata_plugin_test.py b/tests/unit/credentials/auth_metadata_plugin_test.py deleted file mode 100644 index 8bcb18f7..00000000 --- a/tests/unit/credentials/auth_metadata_plugin_test.py +++ /dev/null @@ -1,50 +0,0 @@ -from unittest.mock import Mock - -import grpc -import pytest - -from pyzeebe import AuthMetadataPlugin -from pyzeebe.credentials.base import CredentialsABC -from pyzeebe.credentials.typing import CallContext -from pyzeebe.errors.credentials_errors import InvalidOAuthCredentialsError - - -class TestAuthMetadataPlugin: - @pytest.fixture() - def credentials_mock(self) -> Mock: - return Mock(spec_set=CredentialsABC) - - @pytest.fixture() - def callback_mock(self) -> Mock: - return Mock(spec_set=grpc.AuthMetadataPluginCallback) - - @pytest.fixture() - def context_mock(self) -> Mock: - return Mock(spec_set=CallContext) - - def test_auth_plugin_metadata_success( - self, context_mock: Mock, credentials_mock: Mock, callback_mock: Mock - ) -> None: - metadata_mock = Mock() - - credentials_mock.get_auth_metadata.return_value = metadata_mock - - plugin = AuthMetadataPlugin(credentials=credentials_mock) - - plugin(context_mock, callback_mock) - - callback_mock.assert_called_once_with(metadata_mock, None) - credentials_mock.get_auth_metadata.assert_called_once_with(context_mock) - - def test_auth_plugin_metadata_exception( - self, context_mock: Mock, credentials_mock: Mock, callback_mock: Mock - ) -> None: - exception = InvalidOAuthCredentialsError(url=Mock(), client_id=Mock(), audience=Mock()) - credentials_mock.get_auth_metadata.side_effect = [exception] - - plugin = AuthMetadataPlugin(credentials=credentials_mock) - - plugin(context_mock, callback_mock) - - callback_mock.assert_called_once_with((), exception) - credentials_mock.get_auth_metadata.assert_called_once_with(context_mock) diff --git a/tests/unit/credentials/camunda_identity_credentials_test.py b/tests/unit/credentials/camunda_identity_credentials_test.py deleted file mode 100644 index 313831c3..00000000 --- a/tests/unit/credentials/camunda_identity_credentials_test.py +++ /dev/null @@ -1,118 +0,0 @@ -from unittest.mock import Mock -from uuid import uuid4 - -import pytest -import responses - -from pyzeebe import CamundaIdentityCredentials -from pyzeebe.errors import InvalidOAuthCredentialsError - - -@pytest.fixture -def mocked_responses(): - with responses.RequestsMock() as response_mock: - yield response_mock - - -@pytest.fixture -def client_id() -> str: - return str(uuid4()) - - -@pytest.fixture -def client_secret() -> str: - return str(uuid4()) - - -@pytest.fixture -def url() -> str: - return "https://login.cloud.camunda.io/oauth/token" - - -@pytest.fixture -def access_token() -> str: - return str(uuid4()) - - -@pytest.fixture -def mock_access_token_response(mocked_responses: responses.RequestsMock, url: str, access_token: str): - mocked_responses.add(responses.POST, url, json={"access_token": access_token, "expires_in": 30}, status=200) - - -class TestCamundaIdentityCredentials: - @pytest.mark.usefixtures("mock_access_token_response") - def test_gets_access_token( - self, - mocked_responses: responses.RequestsMock, - url: str, - client_id: str, - client_secret: str, - access_token: str, - ): - credentials = CamundaIdentityCredentials(oauth_url=url, client_id=client_id, client_secret=client_secret) - - assert credentials.get_auth_metadata(Mock()) == (("authorization", f"Bearer {access_token}"),) - - @pytest.mark.usefixtures("mock_access_token_response") - def test_gets_access_token_using_correct_parameters( - self, - mocked_responses: responses.RequestsMock, - url: str, - client_id: str, - client_secret: str, - ): - expected_request_body = ( - f"client_id={client_id}&client_secret={client_secret}&audience=zeebe-api&grant_type=client_credentials" - ) - - credentials = CamundaIdentityCredentials(oauth_url=url, client_id=client_id, client_secret=client_secret) - credentials.get_auth_metadata(Mock()) - - request = mocked_responses.calls[0].request - assert request.url == url - assert request.body == expected_request_body - - def test_raises_on_invalid_credentials( - self, - mocked_responses: responses.RequestsMock, - url: str, - client_id: str, - client_secret: str, - ): - mocked_responses.add(responses.POST, url, status=400) - credentials = CamundaIdentityCredentials(oauth_url=url, client_id=client_id, client_secret=client_secret) - - with pytest.raises(InvalidOAuthCredentialsError): - credentials.get_auth_metadata(Mock()) - - @pytest.mark.usefixtures("mock_access_token_response") - def test_gets_access_token_use_cache( - self, - mocked_responses: responses.RequestsMock, - url: str, - client_id: str, - client_secret: str, - access_token: str, - ): - credentials = CamundaIdentityCredentials(oauth_url=url, client_id=client_id, client_secret=client_secret) - - assert credentials.get_auth_metadata(Mock()) == (("authorization", f"Bearer {access_token}"),) - assert credentials.get_auth_metadata(Mock()) == (("authorization", f"Bearer {access_token}"),) - assert mocked_responses.assert_call_count(url, 1) - - def test_gets_access_token_refresh_threshold( - self, - mocked_responses: responses.RequestsMock, - url: str, - client_id: str, - client_secret: str, - ): - mocked_responses.add(responses.POST, url, json={"access_token": "test1", "expires_in": 9}, status=200) - mocked_responses.add(responses.POST, url, json={"access_token": "test2", "expires_in": 9}, status=200) - credentials = CamundaIdentityCredentials( - oauth_url=url, client_id=client_id, client_secret=client_secret, refresh_threshold_seconds=10 - ) - - assert credentials.get_auth_metadata(Mock()) == (("authorization", "Bearer test1"),) - assert credentials.get_auth_metadata(Mock()) == (("authorization", "Bearer test2"),) - assert mocked_responses.assert_call_count(url, 2) diff --git a/tests/unit/credentials/oauth_test.py b/tests/unit/credentials/oauth_test.py index 3c381c8e..26bc2fa8 100644 --- a/tests/unit/credentials/oauth_test.py +++ b/tests/unit/credentials/oauth_test.py @@ -16,6 +16,7 @@ Oauth2ClientCredentialsMetadataPlugin, OAuth2MetadataPlugin, ) +from pyzeebe.errors.credentials_errors import InvalidOAuthCredentialsError @pytest.fixture @@ -222,7 +223,7 @@ def test_authorized_error( oauth2mp.__call__(mock_context, mock_callback) - with pytest.raises(OAuth2Error, match="Error fetching token"): + with pytest.raises(InvalidOAuthCredentialsError, match="Error fetching token"): oauth2mp.retrieve_token()