Skip to content

Commit

Permalink
RabbitMQ AsyncAPI security tests
Browse files Browse the repository at this point in the history
  • Loading branch information
KrySeyt committed Aug 3, 2024
1 parent 6af490a commit 3d6d96b
Showing 1 changed file with 126 additions and 0 deletions.
126 changes: 126 additions & 0 deletions tests/asyncapi/rabbit/v3_0_0/test_security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import ssl

from faststream.app import FastStream
from faststream.asyncapi.generate import get_app_schema
from faststream.asyncapi.version import AsyncAPIVersion
from faststream.rabbit import RabbitBroker
from faststream.security import (
BaseSecurity,
SASLPlaintext,
)


def test_base_security_schema():
ssl_context = ssl.create_default_context()
security = BaseSecurity(ssl_context=ssl_context)

broker = RabbitBroker("amqp://guest:guest@localhost:5672/", security=security)

assert (
broker.url == "amqps://guest:guest@localhost:5672/" # pragma: allowlist secret
) # pragma: allowlist secret
assert broker._connection_kwargs.get("ssl_context") is ssl_context

schema = get_app_schema(FastStream(broker, asyncapi_version=AsyncAPIVersion.v3_0)).to_jsonable()

assert schema == {
"asyncapi": "3.0.0",
"channels": {},
"operations": {},
"components": {"messages": {}, "schemas": {}, "securitySchemes": {}},
"defaultContentType": "application/json",
"info": {"description": "", "title": "FastStream", "version": "0.1.0"},
"servers": {
"development": {
"protocol": "amqps",
"protocolVersion": "0.9.1",
"security": [],
"host": "guest:guest@localhost:5672", # pragma: allowlist secret
"pathname": "/",
}
},
}


def test_plaintext_security_schema():
ssl_context = ssl.create_default_context()

security = SASLPlaintext(
ssl_context=ssl_context,
username="admin",
password="password", # pragma: allowlist secret
)

broker = RabbitBroker("amqp://guest:guest@localhost/", security=security)

assert (
broker.url
== "amqps://admin:password@localhost:5671/" # pragma: allowlist secret
) # pragma: allowlist secret
assert broker._connection_kwargs.get("ssl_context") is ssl_context

schema = get_app_schema(FastStream(broker, asyncapi_version=AsyncAPIVersion.v3_0)).to_jsonable()
assert (
schema
== {
"asyncapi": "3.0.0",
"channels": {},
"operations": {},
"components": {
"messages": {},
"schemas": {},
"securitySchemes": {"user-password": {"type": "userPassword"}},
},
"defaultContentType": "application/json",
"info": {"description": "", "title": "FastStream", "version": "0.1.0"},
"servers": {
"development": {
"protocol": "amqps",
"protocolVersion": "0.9.1",
"security": [{"user-password": []}],
"host": "admin:password@localhost:5671", # pragma: allowlist secret
"pathname": "/",
}
},
}
)


def test_plaintext_security_schema_without_ssl():
security = SASLPlaintext(
username="admin",
password="password", # pragma: allowlist secret
)

broker = RabbitBroker("amqp://guest:guest@localhost:5672/", security=security)

assert (
broker.url
== "amqp://admin:password@localhost:5672/" # pragma: allowlist secret
) # pragma: allowlist secret

schema = get_app_schema(FastStream(broker, asyncapi_version=AsyncAPIVersion.v3_0)).to_jsonable()
assert (
schema
== {
"asyncapi": "3.0.0",
"channels": {},
"operations": {},
"components": {
"messages": {},
"schemas": {},
"securitySchemes": {"user-password": {"type": "userPassword"}},
},
"defaultContentType": "application/json",
"info": {"description": "", "title": "FastStream", "version": "0.1.0"},
"servers": {
"development": {
"protocol": "amqp",
"protocolVersion": "0.9.1",
"security": [{"user-password": []}],
"host": "admin:password@localhost:5672", # pragma: allowlist secret
"pathname": "/", # pragma: allowlist secret
}
},
}
)

0 comments on commit 3d6d96b

Please sign in to comment.