Skip to content

Commit

Permalink
Confluent AsyncAPI security tests
Browse files Browse the repository at this point in the history
  • Loading branch information
KrySeyt committed Aug 3, 2024
1 parent a058310 commit 04aa26a
Showing 1 changed file with 229 additions and 0 deletions.
229 changes: 229 additions & 0 deletions tests/asyncapi/confluent/v3_0_0/test_security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
import ssl
from copy import deepcopy

from faststream.app import FastStream
from faststream.asyncapi.generate import get_app_schema
from faststream.asyncapi.version import AsyncAPIVersion
from faststream.confluent import KafkaBroker
from faststream.security import (
BaseSecurity,
SASLPlaintext,
SASLScram256,
SASLScram512,
)

basic_schema = {
"info": {
"title": "FastStream",
"version": "0.1.0",
"description": ""
},
"asyncapi": "3.0.0",
"defaultContentType": "application/json",
"servers": {
"development": {
"host": "",
"pathname": "9092",
"protocol": "kafka-secure",
"protocolVersion": "auto",
"security": []
}
},
"channels": {
"test_1:TestTopic": {
"address": "test_1:TestTopic",
"servers": [
{
"$ref": "#/servers/development"
}
],
"messages": {
"SubscribeMessage": {
"$ref": "#/components/messages/test_1:TestTopic:SubscribeMessage"
}
},
"bindings": {
"kafka": {
"topic": "test_1",
"bindingVersion": "0.4.0"
}
}
},
"test_2:Publisher": {
"address": "test_2:Publisher",
"servers": [
{
"$ref": "#/servers/development"
}
],
"messages": {
"Message": {
"$ref": "#/components/messages/test_2:Publisher:Message"
}
},
"bindings": {
"kafka": {
"topic": "test_2",
"bindingVersion": "0.4.0"
}
}
}
},
"operations": {
"test_1:TestTopicSubscribe": {
"action": "receive",
"messages": [
{
"$ref": "#/channels/test_1:TestTopic/messages/SubscribeMessage"
}
],
"channel": {
"$ref": "#/channels/test_1:TestTopic"
}
},
"test_2:Publisher": {
"action": "send",
"messages": [
{
"$ref": "#/channels/test_2:Publisher/messages/Message"
}
],
"channel": {
"$ref": "#/channels/test_2:Publisher"
}
}
},
"components": {
"messages": {
"test_1:TestTopic:Message": {
"title": "test_1:TestTopic:Message",
"correlationId": {
"location": "$message.header#/correlation_id"
},
"payload": {
"$ref": "#/components/schemas/TestTopic:Message:Payload"
}
},
"test_2:Publisher:Message": {
"title": "test_2:Publisher:Message",
"correlationId": {
"location": "$message.header#/correlation_id"
},
"payload": {
"$ref": "#/components/schemas/test_2:Publisher:Message:Payload"
}
}
},
"schemas": {
"TestTopic:Message:Payload": {
"title": "TestTopic:Message:Payload",
"type": "string"
},
"test_2:Publisher:Message:Payload": {
"title": "test_2:Publisher:Message:Payload",
"type": "string"
}
},
"securitySchemes": {}
}
}


def test_base_security_schema():
ssl_context = ssl.create_default_context()
security = BaseSecurity(ssl_context=ssl_context)

broker = KafkaBroker("localhost:9092", security=security)
app = FastStream(broker, asyncapi_version=AsyncAPIVersion.v3_0)

@broker.publisher("test_2")
@broker.subscriber("test_1")
async def test_topic(msg: str) -> str:
pass

schema = get_app_schema(app).to_jsonable()

assert schema == basic_schema


def test_plaintext_security_schema():
ssl_context = ssl.create_default_context()
security = SASLPlaintext(
ssl_context=ssl_context,
username="admin",
password="password", # pragma: allowlist secret
)

broker = KafkaBroker("localhost:9092", security=security)
app = FastStream(broker, asyncapi_version=AsyncAPIVersion.v3_0)

@broker.publisher("test_2")
@broker.subscriber("test_1")
async def test_topic(msg: str) -> str:
pass

schema = get_app_schema(app).to_jsonable()

plaintext_security_schema = deepcopy(basic_schema)
plaintext_security_schema["servers"]["development"]["security"] = [
{"user-password": []}
]
plaintext_security_schema["components"]["securitySchemes"] = {
"user-password": {"type": "userPassword"}
}

assert schema == plaintext_security_schema


def test_scram256_security_schema():
ssl_context = ssl.create_default_context()
security = SASLScram256(
ssl_context=ssl_context,
username="admin",
password="password", # pragma: allowlist secret
)

broker = KafkaBroker("localhost:9092", security=security)
app = FastStream(broker, asyncapi_version=AsyncAPIVersion.v3_0)

@broker.publisher("test_2")
@broker.subscriber("test_1")
async def test_topic(msg: str) -> str:
pass

schema = get_app_schema(app).to_jsonable()

sasl256_security_schema = deepcopy(basic_schema)
sasl256_security_schema["servers"]["development"]["security"] = [{"scram256": []}]
sasl256_security_schema["components"]["securitySchemes"] = {
"scram256": {"type": "scramSha256"}
}

assert schema == sasl256_security_schema


def test_scram512_security_schema():
ssl_context = ssl.create_default_context()
security = SASLScram512(
ssl_context=ssl_context,
username="admin",
password="password", # pragma: allowlist secret
)

broker = KafkaBroker("localhost:9092", security=security)
app = FastStream(broker, asyncapi_version=AsyncAPIVersion.v3_0)

@broker.publisher("test_2")
@broker.subscriber("test_1")
async def test_topic(msg: str) -> str:
pass

schema = get_app_schema(app).to_jsonable()

sasl512_security_schema = deepcopy(basic_schema)
sasl512_security_schema["servers"]["development"]["security"] = [{"scram512": []}]
sasl512_security_schema["components"]["securitySchemes"] = {
"scram512": {"type": "scramSha512"}
}

assert schema == sasl512_security_schema

0 comments on commit 04aa26a

Please sign in to comment.