Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/exception_middleware add exception middleware #1604

Merged
merged 19 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ search:
- [Context](getting-started/lifespan/context.md)
- [Testing](getting-started/lifespan/test.md)
- [Middlewares](getting-started/middlewares/index.md)
- [Exception Middleware](getting-started/middlewares/exception.md)
- AsyncAPI
- [Schema Export](getting-started/asyncapi/export.md)
- [Schema Hosting](getting-started/asyncapi/hosting.md)
Expand Down Expand Up @@ -117,6 +118,7 @@ search:
- [BaseMiddleware](public_api/faststream/BaseMiddleware.md)
- [Context](public_api/faststream/Context.md)
- [Depends](public_api/faststream/Depends.md)
- [ExceptionMiddleware](public_api/faststream/ExceptionMiddleware.md)
- [FastStream](public_api/faststream/FastStream.md)
- [Header](public_api/faststream/Header.md)
- [Path](public_api/faststream/Path.md)
Expand Down Expand Up @@ -205,6 +207,7 @@ search:
- [BaseMiddleware](api/faststream/BaseMiddleware.md)
- [Context](api/faststream/Context.md)
- [Depends](api/faststream/Depends.md)
- [ExceptionMiddleware](api/faststream/ExceptionMiddleware.md)
- [FastStream](api/faststream/FastStream.md)
- [Header](api/faststream/Header.md)
- [Path](api/faststream/Path.md)
Expand Down Expand Up @@ -372,8 +375,12 @@ search:
- [gen_cor_id](api/faststream/broker/message/gen_cor_id.md)
- middlewares
- [BaseMiddleware](api/faststream/broker/middlewares/BaseMiddleware.md)
- [ExceptionMiddleware](api/faststream/broker/middlewares/ExceptionMiddleware.md)
- base
- [BaseMiddleware](api/faststream/broker/middlewares/base/BaseMiddleware.md)
- exception
- [BaseExceptionMiddleware](api/faststream/broker/middlewares/exception/BaseExceptionMiddleware.md)
- [ExceptionMiddleware](api/faststream/broker/middlewares/exception/ExceptionMiddleware.md)
- logging
- [CriticalLogMiddleware](api/faststream/broker/middlewares/logging/CriticalLogMiddleware.md)
- proto
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/ExceptionMiddleware.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.ExceptionMiddleware
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.broker.middlewares.ExceptionMiddleware
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.broker.middlewares.exception.BaseExceptionMiddleware
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.broker.middlewares.exception.ExceptionMiddleware
106 changes: 106 additions & 0 deletions docs/docs/en/getting-started/middlewares/exception.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 10
---

# Exception Middleware

Sometimes, you need to register exception processors at the top level of your application instead of within each message handler.

For this purpose, **FastStream** provides a special `ExceptionMiddleware`. You just need to create it, register handlers, and add it to the broker, router, or subscribers you want (as a [regular middleware](index.md){.internal-link}).

```python linenums="1"
from faststream. import ExceptionMiddleware

exception_middleware = ExceptionMiddleware()

Broker(middlewares=[exception_middleware])
```

This middleware can be used in two ways, which we will discuss further.

## General Exceptions Processing

The first way is general exception processing. This is the default case, which can be used to log exceptions correctly, perform cleanup, etc. This type of handler processes all sources of errors such as message handlers, parser/decoder, other middlewares, and publishing. However, it **cannot be used to publish** a default value in response to a request.

You can register such handlers in two ways:

1. By using the middleware's `#!python @add_handler(...)` decorator:
```python linenums="1" hl_lines="3"
exc_middleware = ExceptionMiddleware()

@exc_middleware.add_handler(Exception)
def error_handler(exc: Exception) -> None:
print(repr(exc))
```

2. By using the middleware's `handlers` initialization option:
```python linenums="1" hl_lines="5-7"
def error_handler(exc: Exception) -> None:
print(repr(exc))

exc_middleware = ExceptionMiddleware(
handlers={
Exception: error_handler
}
)
```

## Publishing Exceptions Handlers

The second way to process messages is to fallback to a default result that should be published in case of an error. Such handlers can process errors in your message handler (or serialization) function only.

They can be registered in the same two ways as the previous one, but with a slight difference:

1. By using the middleware's `#!python @add_handler(..., publish=True)` decorator:
```python linenums="1" hl_lines="3"
exc_middleware = ExceptionMiddleware()

@exc_middleware.add_handler(Exception, publish=True)
def error_handler(exc: Exception) -> str:
print(repr(exc))
return "error occurred"
```

2. By using the middleware's `publish_handlers` initialization option:
```python linenums="1" hl_lines="6-8"
def error_handler(exc: Exception) -> str:
print(repr(exc))
return "error occurred"

exc_middleware = ExceptionMiddleware(
publish_handlers={
Exception: error_handler
}
)
```

## Handler Requirements

Your registered exception handlers are also wrapped by the **FastDepends** serialization mechanism, so they can be:

* Either sync or async
* Able to access the [Context](../context/index.md){.internal-link} feature

This works in the same way as a regular message handler.

For example, you can access a consumed message in your handler as follows:

```python linenums="1" hl_lines="8"
from faststream import ExceptionMiddleware, Context

exc_middleware = ExceptionMiddleware()

@exc_middleware.add_handler(Exception, publish=True)
def base_exc_handler(
exc: Exception,
message = Context(),
) -> str:
print(exc, msg)
return "default"
```
1 change: 1 addition & 0 deletions docs/docs/navigation_template.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ search:
- [Context](getting-started/lifespan/context.md)
- [Testing](getting-started/lifespan/test.md)
- [Middlewares](getting-started/middlewares/index.md)
- [Exception Middleware](getting-started/middlewares/exception.md)
- AsyncAPI
- [Schema Export](getting-started/asyncapi/export.md)
- [Schema Hosting](getting-started/asyncapi/hosting.md)
Expand Down
3 changes: 2 additions & 1 deletion faststream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from faststream.annotations import ContextRepo, Logger, NoCast
from faststream.app import FastStream
from faststream.broker.middlewares import BaseMiddleware
from faststream.broker.middlewares import BaseMiddleware, ExceptionMiddleware
from faststream.broker.response import Response
from faststream.testing.app import TestApp
from faststream.utils import Context, Depends, Header, Path, apply_types, context
Expand All @@ -24,6 +24,7 @@
"NoCast",
# middlewares
"BaseMiddleware",
"ExceptionMiddleware",
# basic
"Response",
)
3 changes: 2 additions & 1 deletion faststream/broker/middlewares/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from faststream.broker.middlewares.base import BaseMiddleware
from faststream.broker.middlewares.exception import ExceptionMiddleware

__all__ = ("BaseMiddleware",)
__all__ = ("BaseMiddleware", "ExceptionMiddleware")
166 changes: 166 additions & 0 deletions faststream/broker/middlewares/exception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
ContextManager,
Dict,
Optional,
Type,
Union,
overload,
)

from typing_extensions import Literal, TypeAlias

from faststream.broker.middlewares.base import BaseMiddleware
from faststream.utils import apply_types, context
from faststream.utils.functions import sync_fake_context, to_async

if TYPE_CHECKING:
from types import TracebackType

from faststream.broker.message import StreamMessage
from faststream.types import AsyncFuncAny, SendableMessage


GeneralExceptionHandler: TypeAlias = Callable[..., None]
PublishingExceptionHandler: TypeAlias = Callable[..., "SendableMessage"]

CastedGeneralExceptionHandler: TypeAlias = Callable[..., Awaitable[None]]
CastedPublishingExceptionHandler: TypeAlias = Callable[
..., Awaitable["SendableMessage"]
]
CastedHandlers: TypeAlias = Dict[Type[Exception], CastedGeneralExceptionHandler]
CastedPublishingHandlers: TypeAlias = Dict[
Type[Exception], CastedPublishingExceptionHandler
]


class BaseExceptionMiddleware(BaseMiddleware):
def __init__(
self,
handlers: CastedHandlers,
publish_handlers: CastedPublishingHandlers,
msg: Optional[Any] = None,
) -> None:
super().__init__(msg)
self._handlers = handlers
self._publish_handlers = publish_handlers

async def consume_scope(
self,
call_next: "AsyncFuncAny",
msg: "StreamMessage[Any]",
) -> Any:
try:
return await call_next(await self.on_consume(msg))

except Exception as exc:
exc_type = type(exc)

for handler_type, handler in self._publish_handlers.items():
if issubclass(exc_type, handler_type):
return await handler(exc)

raise exc

async def after_processed(
self,
exc_type: Optional[Type[BaseException]] = None,
exc_val: Optional[BaseException] = None,
exc_tb: Optional["TracebackType"] = None,
) -> Optional[bool]:
if exc_type:
for handler_type, handler in self._handlers.items():
if issubclass(exc_type, handler_type):
# TODO: remove it after context will be moved to middleware
# In case parser/decoder error occurred
scope: ContextManager[Any]
if not context.get_local("message"):
scope = context.scope("message", self.msg)
else:
scope = sync_fake_context()

with scope:
await handler(exc_val)

return True

return False

return None


class ExceptionMiddleware:
__slots__ = ("_handlers", "_publish_handlers")

_handlers: CastedHandlers
_publish_handlers: CastedPublishingHandlers

def __init__(
self,
handlers: Optional[Dict[Type[Exception], GeneralExceptionHandler]] = None,
publish_handlers: Optional[
Dict[Type[Exception], PublishingExceptionHandler]
] = None,
) -> None:
self._handlers = {
exc_type: apply_types(to_async(handler))
for exc_type, handler in (handlers or {}).items()
}

self._publish_handlers = {
exc_type: apply_types(to_async(handler))
for exc_type, handler in (publish_handlers or {}).items()
}

@overload
def add_handler(
self,
exc: Type[Exception],
publish: Literal[False] = False,
) -> Callable[[GeneralExceptionHandler], GeneralExceptionHandler]: ...

@overload
def add_handler(
self,
exc: Type[Exception],
publish: Literal[True],
) -> Callable[[PublishingExceptionHandler], PublishingExceptionHandler]: ...

def add_handler(
self,
exc: Type[Exception],
publish: bool = False,
) -> Union[
Callable[[GeneralExceptionHandler], GeneralExceptionHandler],
Callable[[PublishingExceptionHandler], PublishingExceptionHandler],
]:
if publish:

def pub_wrapper(
func: PublishingExceptionHandler,
) -> PublishingExceptionHandler:
self._publish_handlers[exc] = apply_types(to_async(func))
return func

return pub_wrapper

else:

def default_wrapper(
func: GeneralExceptionHandler,
) -> GeneralExceptionHandler:
self._handlers[exc] = apply_types(to_async(func))
return func

return default_wrapper

def __call__(self, msg: Optional[Any]) -> BaseExceptionMiddleware:
"""Real middleware runtime constructor."""
return BaseExceptionMiddleware(
handlers=self._handlers,
publish_handlers=self._publish_handlers,
msg=msg,
)
Loading
Loading