Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add grpc auth plugin for Camunda Identity #412

Merged
merged 8 commits into from
Jun 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions docs/channels.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,22 @@ Example:
channel = create_secure_channel(channel_credentials=credentials)


Example with oauth2 (like Camunda Identity):

.. code-block:: python

import grpc
from pyzeebe import create_secure_channel
from pyzeebe import AuthMetadataPlugin, CamundaIdentityCredentials


credentials = CamundaIdentityCredentials(oauth_url=<...>, client_id=<...>, client_secret=<...>, audience=<...>)
call_credentials = grpc.metadata_call_credentials(AuthMetadataPlugin(credentials=credentials))
ssl_credentials = grpc.ssl_channel_credentials(root_certificates="<root_certificate>", private_key="<private_key>")
channel_credentials = grpc.composite_channel_credentials(ssl_credentials, call_credentials)
channel = create_secure_channel(channel_credentials=channel_credentials)


Camunda Cloud
-------------

Expand All @@ -59,3 +75,19 @@ Example:


channel = create_camunda_cloud_channel("client_id", "client_secret", "cluster_id")


Credentials
-----------

.. autoclass:: pyzeebe.AuthMetadataPlugin
:members:
:undoc-members:

.. autoclass:: pyzeebe.CredentialsABC
:members:
:undoc-members:

.. autoclass:: pyzeebe.CamundaIdentityCredentials
:members:
:undoc-members:
13 changes: 12 additions & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,13 @@
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = ["sphinx_rtd_theme", "sphinx.ext.autodoc", "sphinx.ext.napoleon", "sphinx.ext.autosectionlabel"]
extensions = [
"sphinx_rtd_theme",
"sphinx.ext.autodoc",
"sphinx.ext.napoleon",
"sphinx.ext.autosectionlabel",
"sphinx.ext.intersphinx",
]

# Add any paths that contain templates here, relative to this directory.
templates_path = ["_templates"]
Expand All @@ -58,3 +64,8 @@
version = "3.0.4"

master_doc = "index"

# Looks for objects in external projects
intersphinx_mapping = {
"grpc": ("https://grpc.github.io/grpc/python/", None),
}
2 changes: 2 additions & 0 deletions docs/errors.rst
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ All ``pyzeebe`` errors inherit from :py:class:`PyZeebeError`

.. autoexception:: pyzeebe.errors.InvalidCamundaCloudCredentialsError

.. autoexception:: pyzeebe.errors.UnkownGrpcStatusCodeError


=================
Exception Handler
Expand Down
3 changes: 3 additions & 0 deletions pyzeebe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
from pyzeebe.channel import *
from pyzeebe.client.client import ZeebeClient
from pyzeebe.client.sync_client import SyncZeebeClient # type: ignore
from pyzeebe.credentials.base import CredentialsABC
from pyzeebe.credentials.camunda_identity import CamundaIdentityCredentials
from pyzeebe.credentials.plugins import AuthMetadataPlugin
from pyzeebe.job.job import Job
from pyzeebe.job.job_status import JobStatus
from pyzeebe.task.exception_handler import ExceptionHandler, default_exception_handler
Expand Down
3 changes: 3 additions & 0 deletions pyzeebe/credentials/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .base import CredentialsABC
from .camunda_identity import CamundaIdentityCredentials
from .plugins import AuthMetadataPlugin
18 changes: 18 additions & 0 deletions pyzeebe/credentials/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import abc

from pyzeebe.credentials.typing import AuthMetadata, CallContext


class CredentialsABC(abc.ABC):
"""A specification for credentials manager. Passed to :py:class:`AuthMetadataPlugin`."""

@abc.abstractmethod
def get_auth_metadata(self, context: CallContext) -> AuthMetadata:
"""
Args:
context (grpc.AuthMetadataContext): Provides information to call credentials metadata plugins.

Returns:
Tuple[Tuple[str, Union[str, bytes]], ...]: The `metadata` used to construct the :py:class:`grpc.CallCredentials`.
"""
raise NotImplementedError
86 changes: 86 additions & 0 deletions pyzeebe/credentials/camunda_identity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import datetime
import threading
from typing import Any, Dict, Optional

import requests

from pyzeebe.credentials.base import CredentialsABC
from pyzeebe.credentials.typing import AuthMetadata, CallContext
from pyzeebe.errors import InvalidOAuthCredentialsError


class CamundaIdentityCredentials(CredentialsABC):
"""Credentials client for Camunda Platform.

Args:
oauth_url (str): The Keycloak auth endpoint url.
client_id (str): The client id provided by Camunda Platform
client_secret (str): The client secret provided by Camunda Platform
audience (str): Audience for Zeebe. Default: zeebe-api
refresh_threshold_seconds (int): Will try to refresh token if it expires in this number of seconds or less. Default: 20
"""

def __init__(
self,
*,
oauth_url: str,
client_id: str,
client_secret: str,
audience: str = "zeebe-api",
refresh_threshold_seconds: int = 20
) -> None:
self.oauth_url = oauth_url
self.client_id = client_id
self.client_secret = client_secret
self.audience = audience

self._lock = threading.Lock()
self._refresh_threshold = datetime.timedelta(seconds=refresh_threshold_seconds)

self._token: Optional[Dict[str, Any]] = None
self._expires_in: Optional[datetime.datetime] = None

def _expired(self) -> bool:
return (
self._token is None
or self._expires_in is None
or (self._expires_in - self._refresh_threshold) < datetime.datetime.now(datetime.timezone.utc)
)

def _refresh(self) -> None:
try:
response = requests.post(
self.oauth_url,
data={
"client_id": self.client_id,
"client_secret": self.client_secret,
"audience": self.audience,
"grant_type": "client_credentials",
},
)
response.raise_for_status()
data = response.json()
self._token = data["access_token"]
self._expires_in = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
seconds=int(data["expires_in"])
)
except requests.HTTPError as http_error:
raise InvalidOAuthCredentialsError(
url=self.oauth_url, client_id=self.client_id, audience=self.audience
) from http_error

def get_auth_metadata(self, context: CallContext) -> AuthMetadata:
"""
Args:
context (grpc.AuthMetadataContext): Provides information to call credentials metadata plugins.

Returns:
Tuple[Tuple[str, Union[str, bytes]], ...]: The `metadata` used to construct the :py:class:`grpc.CallCredentials`.

Raises:
InvalidOAuthCredentialsError: One of the provided camunda credentials is not correct
"""
with self._lock:
if self._expired() is True:
self._refresh()
return (("authorization", "Bearer {}".format(self._token)),)
32 changes: 32 additions & 0 deletions pyzeebe/credentials/plugins.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import grpc

from pyzeebe.credentials.base import CredentialsABC


class AuthMetadataPlugin(grpc.AuthMetadataPlugin):
"""Custom authentication plugin with exception catching.

Args:
credentials (CredentialsABC): A credentials manager.
"""

def __init__(self, *, credentials: CredentialsABC) -> None:
self._credentials = credentials

def __call__(self, context: grpc.AuthMetadataContext, callback: grpc.AuthMetadataPluginCallback) -> None:
"""Implements authentication by passing metadata to a callback.

This method will be invoked asynchronously in a separate thread.

Args:
context: An AuthMetadataContext providing information on the RPC that
the plugin is being called to authenticate.
callback: An AuthMetadataPluginCallback to be invoked either
synchronously or asynchronously.
"""
try:
metadata = self._credentials.get_auth_metadata(context)
except Exception as e:
callback((), e)
else:
callback(metadata, None)
8 changes: 8 additions & 0 deletions pyzeebe/credentials/typing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from typing import Protocol, Tuple, Union

AuthMetadata = Tuple[Tuple[str, Union[str, bytes]], ...]


class CallContext(Protocol):
service_url: str
method_name: str
Empty file.
50 changes: 50 additions & 0 deletions tests/unit/credentials/auth_metadata_plugin_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from unittest.mock import Mock

import grpc
import pytest

from pyzeebe import AuthMetadataPlugin
from pyzeebe.credentials.base import CredentialsABC
from pyzeebe.credentials.typing import CallContext
from pyzeebe.errors.credentials_errors import InvalidOAuthCredentialsError


class TestAuthMetadataPlugin:
@pytest.fixture()
def credentials_mock(self) -> Mock:
return Mock(spec_set=CredentialsABC)

@pytest.fixture()
def callback_mock(self) -> Mock:
return Mock(spec_set=grpc.AuthMetadataPluginCallback)

@pytest.fixture()
def context_mock(self) -> Mock:
return Mock(spec_set=CallContext)

def test_auth_plugin_metadata_success(
self, context_mock: Mock, credentials_mock: Mock, callback_mock: Mock
) -> None:
metadata_mock = Mock()

credentials_mock.get_auth_metadata.return_value = metadata_mock

plugin = AuthMetadataPlugin(credentials=credentials_mock)

plugin(context_mock, callback_mock)

callback_mock.assert_called_once_with(metadata_mock, None)
credentials_mock.get_auth_metadata.assert_called_once_with(context_mock)

def test_auth_plugin_metadata_exception(
self, context_mock: Mock, credentials_mock: Mock, callback_mock: Mock
) -> None:
exception = InvalidOAuthCredentialsError(url=Mock(), client_id=Mock(), audience=Mock())
credentials_mock.get_auth_metadata.side_effect = [exception]

plugin = AuthMetadataPlugin(credentials=credentials_mock)

plugin(context_mock, callback_mock)

callback_mock.assert_called_once_with((), exception)
credentials_mock.get_auth_metadata.assert_called_once_with(context_mock)
Loading
Loading