Skip to content

Commit

Permalink
Merge branch 'main' into feat/asgi
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik authored Aug 4, 2024
2 parents cc14939 + d705076 commit 50655c0
Show file tree
Hide file tree
Showing 27 changed files with 477 additions and 141 deletions.
2 changes: 1 addition & 1 deletion .codespell-whitelist.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
dependant
unsecure
socio-economic
socio-economic
7 changes: 6 additions & 1 deletion .github/workflows/docs_update-references.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ jobs:
cache-dependency-path: pyproject.toml
- name: Install Dependencies
if: steps.cache.outputs.cache-hit != 'true'
run: pip install -e ".[dev]"
shell: bash
# should install with `-e`
run: |
set -ux
python -m pip install uv
uv pip install --system -e ".[dev]"
- name: Run build docs
run: bash scripts/build-docs.sh
- name: Commit
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/pr_tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:
run: |
set -ux
python -m pip install uv
uv pip install --system -e ".[lint]"
uv pip install --system ".[lint]"
- name: Run ruff
shell: bash
Expand Down
4 changes: 2 additions & 2 deletions .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@
"filename": "docs/docs/en/release.md",
"hashed_secret": "35675e68f4b5af7b995d9205ad0fc43842f16450",
"is_verified": false,
"line_number": 1325,
"line_number": 1423,
"is_secret": false
}
],
Expand Down Expand Up @@ -163,5 +163,5 @@
}
]
},
"generated_at": "2024-06-10T09:56:52Z"
"generated_at": "2024-07-23T21:38:30Z"
}
1 change: 1 addition & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,7 @@ search:
- security
- [BaseSecurity](api/faststream/security/BaseSecurity.md)
- [SASLGSSAPI](api/faststream/security/SASLGSSAPI.md)
- [SASLOAuthBearer](api/faststream/security/SASLOAuthBearer.md)
- [SASLPlaintext](api/faststream/security/SASLPlaintext.md)
- [SASLScram256](api/faststream/security/SASLScram256.md)
- [SASLScram512](api/faststream/security/SASLScram512.md)
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/security/SASLOAuthBearer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.security.SASLOAuthBearer
18 changes: 15 additions & 3 deletions docs/docs/en/confluent/security.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,19 +52,31 @@ This chapter discusses the security options available in **FastStream** and how
{!> docs_src/confluent/security/sasl_scram512.py [ln:1-6.25,7-] !}
```

### 4. Other security related usecases
### 4. SASLOAuthBearer Object with SSL/TLS

**Purpose:** The `SASLOAuthBearer` is used for authentication using the Oauth sasl.mechanism. While using it you additionaly need to provide necessary `sasl.oauthbearer.*` values in config and provide it to `KafkaBroker`, eg. `sasl.oauthbearer.client.id`, `sasl.oauthbearer.client.secret`. Full list is available in the [confluent doc](https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md){.external-link target="_blank"}

**Usage:**

=== "OauthBearer"
```python linenums="1"
{!> docs_src/confluent/security/sasl_oauthbearer.py [ln:1-8] !}
```


### 5. Other security related usecases

**Purpose**: If you want to pass additional values to `confluent-kafka-python`, you can pass a dictionary called `config` to `KafkaBroker`. For example, to pass your own certificate file:

**Usage:**

```python
```python linenums="1"
from faststream.confluent import KafkaBroker
from faststream.security import SASLPlaintext

security = SASLPlaintext(
username="admin",
password="password", # pragma: allowlist secret
password="password",
)

config = {"ssl.ca.location": "~/my_certs/CRT_cacerts.pem"}
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/en/kafka/Subscriber/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,4 @@ async def base_handler(
level: str = Path(),
):
...
```
```
8 changes: 8 additions & 0 deletions docs/docs_src/confluent/security/sasl_oauthbearer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from faststream.confluent import KafkaBroker
from faststream.security import SASLOAuthBearer

security = SASLOAuthBearer(
use_ssl=True,
)

broker = KafkaBroker("localhost:9092", security=security)
4 changes: 2 additions & 2 deletions faststream/confluent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def __init__(
}
self.config = {**self.config, **config_from_params}

if sasl_mechanism:
if sasl_mechanism in ["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"]:
self.config.update(
{
"sasl.mechanism": sasl_mechanism,
Expand Down Expand Up @@ -365,7 +365,7 @@ def __init__(
}
self.config = {**self.config, **config_from_params}

if sasl_mechanism:
if sasl_mechanism in ["PLAIN", "SCRAM-SHA-256", "SCRAM-SHA-512"]:
self.config.update(
{
"sasl.mechanism": sasl_mechanism,
Expand Down
10 changes: 10 additions & 0 deletions faststream/confluent/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from faststream.exceptions import SetupError
from faststream.security import (
BaseSecurity,
SASLOAuthBearer,
SASLPlaintext,
SASLScram256,
SASLScram512,
Expand All @@ -27,6 +28,8 @@ def parse_security(security: Optional[BaseSecurity]) -> "AnyDict":
return _parse_sasl_scram256(security)
elif isinstance(security, SASLScram512):
return _parse_sasl_scram512(security)
elif isinstance(security, SASLOAuthBearer):
return _parse_sasl_oauthbearer(security)
elif isinstance(security, BaseSecurity):
return _parse_base_security(security)
else:
Expand Down Expand Up @@ -64,3 +67,10 @@ def _parse_sasl_scram512(security: SASLScram512) -> "AnyDict":
"sasl_plain_username": security.username,
"sasl_plain_password": security.password,
}


def _parse_sasl_oauthbearer(security: SASLOAuthBearer) -> "AnyDict":
return {
"security_protocol": "SASL_SSL" if security.use_ssl else "SASL_PLAINTEXT",
"sasl_mechanism": "OAUTHBEARER",
}
27 changes: 27 additions & 0 deletions faststream/nats/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,9 @@ async def _create_subscription( # type: ignore[override]
connection: "Client",
) -> None:
"""Create NATS subscription and start consume task."""
if self.subscription:
return

self.subscription = await connection.subscribe(
subject=self.clear_subject,
queue=self.queue,
Expand Down Expand Up @@ -495,6 +498,9 @@ async def _create_subscription( # type: ignore[override]
connection: "Client",
) -> None:
"""Create NATS subscription and start consume task."""
if self.subscription:
return

self.start_consume_task()

self.subscription = await connection.subscribe(
Expand Down Expand Up @@ -576,6 +582,9 @@ async def _create_subscription( # type: ignore[override]
connection: "JetStreamContext",
) -> None:
"""Create NATS subscription and start consume task."""
if self.subscription:
return

self.subscription = await connection.subscribe(
subject=self.clear_subject,
queue=self.queue,
Expand Down Expand Up @@ -636,6 +645,9 @@ async def _create_subscription( # type: ignore[override]
connection: "JetStreamContext",
) -> None:
"""Create NATS subscription and start consume task."""
if self.subscription:
return

self.start_consume_task()

self.subscription = await connection.subscribe(
Expand Down Expand Up @@ -698,6 +710,9 @@ async def _create_subscription( # type: ignore[override]
connection: "JetStreamContext",
) -> None:
"""Create NATS subscription and start consume task."""
if self.subscription:
return

self.subscription = await connection.pull_subscribe(
subject=self.clear_subject,
config=self.config,
Expand Down Expand Up @@ -775,6 +790,9 @@ async def _create_subscription( # type: ignore[override]
connection: "JetStreamContext",
) -> None:
"""Create NATS subscription and start consume task."""
if self.subscription:
return

self.start_consume_task()

self.subscription = await connection.pull_subscribe(
Expand Down Expand Up @@ -841,6 +859,9 @@ async def _create_subscription( # type: ignore[override]
connection: "JetStreamContext",
) -> None:
"""Create NATS subscription and start consume task."""
if self.subscription:
return

self.subscription = await connection.pull_subscribe(
subject=self.clear_subject,
config=self.config,
Expand Down Expand Up @@ -905,6 +926,9 @@ async def _create_subscription( # type: ignore[override]
*,
connection: "KVBucketDeclarer",
) -> None:
if self.subscription:
return

bucket = await connection.create_key_value(
bucket=self.kv_watch.name,
declare=self.kv_watch.declare,
Expand Down Expand Up @@ -1012,6 +1036,9 @@ async def _create_subscription( # type: ignore[override]
*,
connection: "OSBucketDeclarer",
) -> None:
if self.subscription:
return

self.bucket = await connection.create_object_store(
bucket=self.subject,
declare=self.obj_watch.declare,
Expand Down
20 changes: 15 additions & 5 deletions faststream/rabbit/testing.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from typing import TYPE_CHECKING, Any, Optional, Union
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Generator, Optional, Union
from unittest import mock
from unittest.mock import AsyncMock

import aiormq
Expand Down Expand Up @@ -34,10 +36,18 @@ class TestRabbitBroker(TestBroker[RabbitBroker]):
"""A class to test RabbitMQ brokers."""

@classmethod
def _patch_test_broker(cls, broker: RabbitBroker) -> None:
broker._channel = AsyncMock()
broker.declarer = AsyncMock()
super()._patch_test_broker(broker)
@contextmanager
def _patch_broker(cls, broker: RabbitBroker) -> Generator[None, None, None]:
with mock.patch.object(
broker,
"_channel",
new_callable=AsyncMock,
), mock.patch.object(
broker,
"declarer",
new_callable=AsyncMock,
), super()._patch_broker(broker):
yield

@staticmethod
async def _fake_connect(broker: RabbitBroker, *args: Any, **kwargs: Any) -> None:
Expand Down
17 changes: 16 additions & 1 deletion faststream/redis/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,15 @@ async def start( # type: ignore[override]
self,
*args: Any,
) -> None:
if self.task:
return

await super().start()

start_signal = anyio.Event()
self.task = asyncio.create_task(self._consume(*args, start_signal=start_signal))
self.task = asyncio.create_task(
self._consume(*args, start_signal=start_signal)
)

with anyio.fail_after(3.0):
await start_signal.wait()
Expand Down Expand Up @@ -253,6 +258,9 @@ def get_log_context(

@override
async def start(self) -> None:
if self.subscription:
return

assert self._client, "You should setup subscriber at first." # nosec B101

self.subscription = psub = self._client.pubsub()
Expand Down Expand Up @@ -352,6 +360,9 @@ async def _consume( # type: ignore[override]

@override
async def start(self) -> None:
if self.task:
return

assert self._client, "You should setup subscriber at first." # nosec B101
await super().start(self._client)

Expand Down Expand Up @@ -512,7 +523,11 @@ def get_log_context(

@override
async def start(self) -> None:
if self.task:
return

assert self._client, "You should setup subscriber at first." # nosec B101

client = self._client

self.extra_watcher_options.update(
Expand Down
33 changes: 21 additions & 12 deletions faststream/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,28 +153,37 @@ def get_schema(self) -> Dict[str, Dict[str, str]]:
return {"scram512": {"type": "scramSha512"}}


class SASLOAuthBearer(BaseSecurity):
"""Security configuration for SASL/OAUTHBEARER authentication.
This class defines basic security configuration for SASL/OAUTHBEARER authentication.
"""

__slots__ = (
"use_ssl",
"ssl_context"
)

def get_requirement(self) -> List["AnyDict"]:
"""Get the security requirements for SASL/OAUTHBEARER authentication."""
return [{"oauthbearer": []}]

def get_schema(self) -> Dict[str, Dict[str, str]]:
"""Get the security schema for SASL/OAUTHBEARER authentication."""
return {"oauthbearer": {"type": "oauthBearer"}}


class SASLGSSAPI(BaseSecurity):
"""Security configuration for SASL/GSSAPI authentication.
This class defines security configuration for SASL/GSSAPI authentication.
"""

# TODO: mv to SecretStr
__slots__ = (
"use_ssl",
"ssl_context",
"ssl_context"
)

def __init__(
self,
ssl_context: Optional["SSLContext"] = None,
use_ssl: Optional[bool] = None,
) -> None:
super().__init__(
ssl_context=ssl_context,
use_ssl=use_ssl,
)

def get_requirement(self) -> List["AnyDict"]:
"""Get the security requirements for SASL/GSSAPI authentication."""
return [{"gssapi": []}]
Expand Down
Loading

0 comments on commit 50655c0

Please sign in to comment.