Skip to content

Commit

Permalink
feat: initial ASGI support
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed Aug 3, 2024
1 parent a36d282 commit c5c8ec5
Show file tree
Hide file tree
Showing 6 changed files with 224 additions and 0 deletions.
7 changes: 7 additions & 0 deletions faststream/asgi/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from faststream.asgi.app import AsgiFastStream
from faststream.asgi.handlers import make_ping_asgi

__all__ = (
"AsgiFastStream",
"make_ping_asgi",
)
133 changes: 133 additions & 0 deletions faststream/asgi/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import traceback
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, Any, Optional, Sequence, Tuple, Union

from faststream.app import FastStream
from faststream.asgi.response import AsgiResponse
from faststream.asgi.websocket import WebSocketClose
from faststream.log.logging import logger

if TYPE_CHECKING:
from faststream.asgi.types import ASGIApp, Receive, Scope, Send
from faststream.asyncapi.schema import (
Contact,
ContactDict,
ExternalDocs,
ExternalDocsDict,
License,
LicenseDict,
Tag,
TagDict,
)
from faststream.broker.core.usecase import BrokerUsecase
from faststream.types import (
AnyDict,
AnyHttpUrl,
Lifespan,
LoggerProto,
)


class AsgiFastStream(FastStream):
def __init__(
self,
broker: Optional["BrokerUsecase[Any, Any]"] = None,
/,
asgi_routes: Sequence[Tuple[str, "ASGIApp"]] = (),
logger: Optional["LoggerProto"] = logger,
lifespan: Optional["Lifespan"] = None,
# AsyncAPI args,
title: str = "FastStream",
version: str = "0.1.0",
description: str = "",
terms_of_service: Optional["AnyHttpUrl"] = None,
license: Optional[Union["License", "LicenseDict", "AnyDict"]] = None,
contact: Optional[Union["Contact", "ContactDict", "AnyDict"]] = None,
tags: Optional[Sequence[Union["Tag", "TagDict", "AnyDict"]]] = None,
external_docs: Optional[
Union["ExternalDocs", "ExternalDocsDict", "AnyDict"]
] = None,
identifier: Optional[str] = None,
) -> None:
self.routes = list(asgi_routes)

super().__init__(
broker=broker,
logger=logger,
lifespan=lifespan,
title=title,
version=version,
description=description,
terms_of_service=terms_of_service,
license=license,
contact=contact,
tags=tags,
external_docs=external_docs,
identifier=identifier,
)

def mount(self, path: str, route: "ASGIApp") -> None:
self.routes.append(route)

async def __call__(self, scope: "Scope", receive: "Receive", send: "Send") -> None:
if scope["type"] == "lifespan":
await self.lifespan(scope, receive, send)
return

if scope["type"] == "http" and scope["method"] == "GET":
for path, app in self.routes:
if scope["path"] == path:
await app(scope, receive, send)
return

await self.not_found(scope, receive, send)
return

@asynccontextmanager
async def started_lifespan_context(self) -> None:
async with self.lifespan_context():
await self._startup()
try:
yield
finally:
await self._shutdown()

async def lifespan(self, scope: "Scope", receive: "Receive", send: "Send") -> None:
"""Handle ASGI lifespan messages to start and shutdown the app."""
started = False
await receive() # handle `lifespan.startup` event

try:
async with self.started_lifespan_context():
await send({"type": "lifespan.startup.complete"})
started = True
await receive() # handle `lifespan.shutdown` event

except BaseException:
exc_text = traceback.format_exc()
if started:
await send({"type": "lifespan.shutdown.failed", "message": exc_text})
else:
await send({"type": "lifespan.startup.failed", "message": exc_text})
raise

else:
await send({"type": "lifespan.shutdown.complete"})

async def not_found(self, scope: "Scope", receive: "Receive", send: "Send") -> None:
not_found_msg = "FastStream doesn't support regular HTTP protocol."

if scope["type"] == "websocket":
websocket_close = WebSocketClose(
code=1000,
reason=not_found_msg,
)
await websocket_close(scope, receive, send)
return

response = AsgiResponse(
body=not_found_msg.encode(),
status_code=404,
)

await response(scope, receive, send)
26 changes: 26 additions & 0 deletions faststream/asgi/handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from typing import TYPE_CHECKING, Any, Optional

from faststream.asgi.response import AsgiResponse

if TYPE_CHECKING:
from faststream.asgi.types import ASGIApp, Receive, Scope, Send
from faststream.broker.core.usecase import BrokerUsecase


def make_ping_asgi(
broker: "BrokerUsecase[Any, Any]",
/,
timeout: Optional[float] = None,
) -> "ASGIApp":
async def ping(
scope: "Scope",
receive: "Receive",
send: "Send",
) -> None:
if await broker.ping(timeout):
response = AsgiResponse(b"", status_code=204)
else:
response = AsgiResponse(b"", status_code=500)
await response(scope, receive, send)

return ping
32 changes: 32 additions & 0 deletions faststream/asgi/response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from typing import TYPE_CHECKING, Dict, Optional

if TYPE_CHECKING:
from faststream.asgi.types import Receive, Scope, Send


class AsgiResponse:
def __init__(
self,
body: bytes,
status_code: int,
raw_headers: Optional[Dict[bytes, bytes]] = None,
) -> None:
self.status_code = status_code
self.body = body
self.raw_headers = raw_headers or {}

async def __call__(self, scope: "Scope", receive: "Receive", send: "Send") -> None:
prefix = "websocket." if (scope["type"] == "websocket") else ""
await send(
{
"type": f"{prefix}http.response.start",
"status": self.status_code,
"headers": list(self.raw_headers.items()),
}
)
await send(
{
"type": f"{prefix}http.response.body",
"body": self.body,
}
)
7 changes: 7 additions & 0 deletions faststream/asgi/types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from typing import Any, Awaitable, Callable, MutableMapping

Scope = MutableMapping[str, Any]
Message = MutableMapping[str, Any]
Receive = Callable[[], Awaitable[Message]]
Send = Callable[[Message], Awaitable[None]]
ASGIApp = Callable[[Scope, Receive, Send], Awaitable[None]]
19 changes: 19 additions & 0 deletions faststream/asgi/websocket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from typing import TYPE_CHECKING, Optional

if TYPE_CHECKING:
from faststream.asgi.types import Receive, Scope, Send


class WebSocketClose:
def __init__(
self,
code: int,
reason: Optional[str],
) -> None:
self.code = code
self.reason = reason or ""

async def __call__(self, scope: "Scope", receive: "Receive", send: "Send") -> None:
await send(
{"type": "websocket.close", "code": self.code, "reason": self.reason}
)

0 comments on commit c5c8ec5

Please sign in to comment.