Skip to content

Commit

Permalink
Adds SASLOAuthBearer flow to AIO Kafka's Faststream Security Parsing (#…
Browse files Browse the repository at this point in the history
…1761)

* Added same OAUTHBEARER security parsing as Confluent Kafka to AIO Kafka

* Added same OAUTHBEARER security parsing as Confluent Kafka to AIO Kafka

* Added SSL Context to Kafka AIO Oauth flow

* Added more docs for Kafka OAuth Security Flow.

* linting

* linting

---------

Co-authored-by: Alex Sinnott <[email protected]>
Co-authored-by: Pastukhov Nikita <[email protected]>
  • Loading branch information
3 people authored Sep 5, 2024
1 parent 80ee72b commit 9492712
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 1 deletion.
13 changes: 12 additions & 1 deletion docs/docs/en/kafka/security.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,18 @@ This chapter discusses the security options available in **FastStream** and how
{!> docs_src/kafka/security/sasl_scram512.py [ln:1-10.25,11-] !}
```

### 4. SASLGSSAPI Object with SSL/TLS
### 4. SASLOAuthBearer Object with SSL/TLS

**Purpose:** The `SASLOAuthBearer` is used for authentication using the `OAUTHBEARER` sasl_mechanism. You'll likely need to provide your own `sasl_oauth_token_provider` to the KafkaBroker object in order to complete the authentication flow, such as AWS's [`aws-msk-iam-sasl-signer-python`](https://github.com/aws/aws-msk-iam-sasl-signer-python). For more information see AIOKafka's documentation on [AbstractTokenProvider](https://aiokafka.readthedocs.io/en/stable/api.html#aiokafka.abc.AbstractTokenProvider).

**Usage:**

=== "OauthBearer"
```python linenums="1"
{!> docs_src/kafka/security/sasl_oauthbearer.py [ln:1-16] !}
```

### 5. SASLGSSAPI Object with SSL/TLS

**Purpose:** The `SASLGSSAPI` object is used for authentication using Kerberos.

Expand Down
16 changes: 16 additions & 0 deletions docs/docs_src/kafka/security/sasl_oauthbearer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import ssl

from faststream.kafka import KafkaBroker
from faststream.security import SASLOAuthBearer

ssl_context = ssl.create_default_context()
security = SASLOAuthBearer(
use_ssl=True,
ssl_context=ssl_context
)

broker = KafkaBroker(
"localhost:9092",
security=security,
sasl_oauth_token_provider=...
)
11 changes: 11 additions & 0 deletions faststream/kafka/security.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from faststream.security import (
SASLGSSAPI,
BaseSecurity,
SASLOAuthBearer,
SASLPlaintext,
SASLScram256,
SASLScram512,
Expand All @@ -21,6 +22,8 @@ def parse_security(security: Optional[BaseSecurity]) -> "AnyDict":
return _parse_sasl_scram256(security)
elif isinstance(security, SASLScram512):
return _parse_sasl_scram512(security)
elif isinstance(security, SASLOAuthBearer):
return _parse_sasl_oauthbearer(security)
elif isinstance(security, SASLGSSAPI):
return _parse_sasl_gssapi(security)
elif isinstance(security, BaseSecurity):
Expand Down Expand Up @@ -66,6 +69,14 @@ def _parse_sasl_scram512(security: SASLScram512) -> "AnyDict":
}


def _parse_sasl_oauthbearer(security: SASLOAuthBearer) -> "AnyDict":
return {
"security_protocol": "SASL_SSL" if security.use_ssl else "SASL_PLAINTEXT",
"ssl_context": security.ssl_context,
"sasl_mechanism": "OAUTHBEARER",
}


def _parse_sasl_gssapi(security: SASLGSSAPI) -> "AnyDict":
return {
"security_protocol": "SASL_SSL" if security.use_ssl else "SASL_PLAINTEXT",
Expand Down
28 changes: 28 additions & 0 deletions tests/asyncapi/kafka/test_security.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from faststream.security import (
SASLGSSAPI,
BaseSecurity,
SASLOAuthBearer,
SASLPlaintext,
SASLScram256,
SASLScram512,
Expand Down Expand Up @@ -170,6 +171,33 @@ async def test_topic(msg: str) -> str:
assert schema == sasl512_security_schema


def test_oauthbearer_security_schema():
ssl_context = ssl.create_default_context()
security = SASLOAuthBearer(
ssl_context=ssl_context,
)

broker = KafkaBroker("localhost:9092", security=security)
app = FastStream(broker)

@broker.publisher("test_2")
@broker.subscriber("test_1")
async def test_topic(msg: str) -> str:
pass

schema = get_app_schema(app).to_jsonable()

sasl_oauthbearer_security_schema = deepcopy(basic_schema)
sasl_oauthbearer_security_schema["servers"]["development"]["security"] = [
{"oauthbearer": []}
]
sasl_oauthbearer_security_schema["components"]["securitySchemes"] = {
"oauthbearer": {"type": "oauthBearer"}
}

assert schema == sasl_oauthbearer_security_schema


def test_gssapi_security_schema():
ssl_context = ssl.create_default_context()
security = SASLGSSAPI(
Expand Down

0 comments on commit 9492712

Please sign in to comment.