From 04aa26aabbd126cd9a915e4cd571b7cef7185f0c Mon Sep 17 00:00:00 2001 From: Vladimir Kibisov Date: Sat, 3 Aug 2024 21:23:16 +0300 Subject: [PATCH] Confluent AsyncAPI security tests --- .../confluent/v3_0_0/test_security.py | 229 ++++++++++++++++++ 1 file changed, 229 insertions(+) create mode 100644 tests/asyncapi/confluent/v3_0_0/test_security.py diff --git a/tests/asyncapi/confluent/v3_0_0/test_security.py b/tests/asyncapi/confluent/v3_0_0/test_security.py new file mode 100644 index 0000000000..ae06f90f3d --- /dev/null +++ b/tests/asyncapi/confluent/v3_0_0/test_security.py @@ -0,0 +1,229 @@ +import ssl +from copy import deepcopy + +from faststream.app import FastStream +from faststream.asyncapi.generate import get_app_schema +from faststream.asyncapi.version import AsyncAPIVersion +from faststream.confluent import KafkaBroker +from faststream.security import ( + BaseSecurity, + SASLPlaintext, + SASLScram256, + SASLScram512, +) + +basic_schema = { + "info": { + "title": "FastStream", + "version": "0.1.0", + "description": "" + }, + "asyncapi": "3.0.0", + "defaultContentType": "application/json", + "servers": { + "development": { + "host": "", + "pathname": "9092", + "protocol": "kafka-secure", + "protocolVersion": "auto", + "security": [] + } + }, + "channels": { + "test_1:TestTopic": { + "address": "test_1:TestTopic", + "servers": [ + { + "$ref": "#/servers/development" + } + ], + "messages": { + "SubscribeMessage": { + "$ref": "#/components/messages/test_1:TestTopic:SubscribeMessage" + } + }, + "bindings": { + "kafka": { + "topic": "test_1", + "bindingVersion": "0.4.0" + } + } + }, + "test_2:Publisher": { + "address": "test_2:Publisher", + "servers": [ + { + "$ref": "#/servers/development" + } + ], + "messages": { + "Message": { + "$ref": "#/components/messages/test_2:Publisher:Message" + } + }, + "bindings": { + "kafka": { + "topic": "test_2", + "bindingVersion": "0.4.0" + } + } + } + }, + "operations": { + "test_1:TestTopicSubscribe": { + "action": "receive", + "messages": [ + { + "$ref": "#/channels/test_1:TestTopic/messages/SubscribeMessage" + } + ], + "channel": { + "$ref": "#/channels/test_1:TestTopic" + } + }, + "test_2:Publisher": { + "action": "send", + "messages": [ + { + "$ref": "#/channels/test_2:Publisher/messages/Message" + } + ], + "channel": { + "$ref": "#/channels/test_2:Publisher" + } + } + }, + "components": { + "messages": { + "test_1:TestTopic:Message": { + "title": "test_1:TestTopic:Message", + "correlationId": { + "location": "$message.header#/correlation_id" + }, + "payload": { + "$ref": "#/components/schemas/TestTopic:Message:Payload" + } + }, + "test_2:Publisher:Message": { + "title": "test_2:Publisher:Message", + "correlationId": { + "location": "$message.header#/correlation_id" + }, + "payload": { + "$ref": "#/components/schemas/test_2:Publisher:Message:Payload" + } + } + }, + "schemas": { + "TestTopic:Message:Payload": { + "title": "TestTopic:Message:Payload", + "type": "string" + }, + "test_2:Publisher:Message:Payload": { + "title": "test_2:Publisher:Message:Payload", + "type": "string" + } + }, + "securitySchemes": {} + } +} + + +def test_base_security_schema(): + ssl_context = ssl.create_default_context() + security = BaseSecurity(ssl_context=ssl_context) + + broker = KafkaBroker("localhost:9092", security=security) + app = FastStream(broker, asyncapi_version=AsyncAPIVersion.v3_0) + + @broker.publisher("test_2") + @broker.subscriber("test_1") + async def test_topic(msg: str) -> str: + pass + + schema = get_app_schema(app).to_jsonable() + + assert schema == basic_schema + + +def test_plaintext_security_schema(): + ssl_context = ssl.create_default_context() + security = SASLPlaintext( + ssl_context=ssl_context, + username="admin", + password="password", # pragma: allowlist secret + ) + + broker = KafkaBroker("localhost:9092", security=security) + app = FastStream(broker, asyncapi_version=AsyncAPIVersion.v3_0) + + @broker.publisher("test_2") + @broker.subscriber("test_1") + async def test_topic(msg: str) -> str: + pass + + schema = get_app_schema(app).to_jsonable() + + plaintext_security_schema = deepcopy(basic_schema) + plaintext_security_schema["servers"]["development"]["security"] = [ + {"user-password": []} + ] + plaintext_security_schema["components"]["securitySchemes"] = { + "user-password": {"type": "userPassword"} + } + + assert schema == plaintext_security_schema + + +def test_scram256_security_schema(): + ssl_context = ssl.create_default_context() + security = SASLScram256( + ssl_context=ssl_context, + username="admin", + password="password", # pragma: allowlist secret + ) + + broker = KafkaBroker("localhost:9092", security=security) + app = FastStream(broker, asyncapi_version=AsyncAPIVersion.v3_0) + + @broker.publisher("test_2") + @broker.subscriber("test_1") + async def test_topic(msg: str) -> str: + pass + + schema = get_app_schema(app).to_jsonable() + + sasl256_security_schema = deepcopy(basic_schema) + sasl256_security_schema["servers"]["development"]["security"] = [{"scram256": []}] + sasl256_security_schema["components"]["securitySchemes"] = { + "scram256": {"type": "scramSha256"} + } + + assert schema == sasl256_security_schema + + +def test_scram512_security_schema(): + ssl_context = ssl.create_default_context() + security = SASLScram512( + ssl_context=ssl_context, + username="admin", + password="password", # pragma: allowlist secret + ) + + broker = KafkaBroker("localhost:9092", security=security) + app = FastStream(broker, asyncapi_version=AsyncAPIVersion.v3_0) + + @broker.publisher("test_2") + @broker.subscriber("test_1") + async def test_topic(msg: str) -> str: + pass + + schema = get_app_schema(app).to_jsonable() + + sasl512_security_schema = deepcopy(basic_schema) + sasl512_security_schema["servers"]["development"]["security"] = [{"scram512": []}] + sasl512_security_schema["components"]["securitySchemes"] = { + "scram512": {"type": "scramSha512"} + } + + assert schema == sasl512_security_schema