Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/exception_middleware add exception middleware #1604

Merged
merged 19 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ search:
- [Context](getting-started/lifespan/context.md)
- [Testing](getting-started/lifespan/test.md)
- [Middlewares](getting-started/middlewares/index.md)
- [Exception Middleware](getting-started/middlewares/exception.md)
- AsyncAPI
- [Schema Export](getting-started/asyncapi/export.md)
- [Schema Hosting](getting-started/asyncapi/hosting.md)
Expand Down Expand Up @@ -117,6 +118,7 @@ search:
- [BaseMiddleware](public_api/faststream/BaseMiddleware.md)
- [Context](public_api/faststream/Context.md)
- [Depends](public_api/faststream/Depends.md)
- [ExceptionMiddleware](public_api/faststream/ExceptionMiddleware.md)
- [FastStream](public_api/faststream/FastStream.md)
- [Header](public_api/faststream/Header.md)
- [Path](public_api/faststream/Path.md)
Expand Down Expand Up @@ -205,6 +207,7 @@ search:
- [BaseMiddleware](api/faststream/BaseMiddleware.md)
- [Context](api/faststream/Context.md)
- [Depends](api/faststream/Depends.md)
- [ExceptionMiddleware](api/faststream/ExceptionMiddleware.md)
- [FastStream](api/faststream/FastStream.md)
- [Header](api/faststream/Header.md)
- [Path](api/faststream/Path.md)
Expand Down Expand Up @@ -372,8 +375,12 @@ search:
- [gen_cor_id](api/faststream/broker/message/gen_cor_id.md)
- middlewares
- [BaseMiddleware](api/faststream/broker/middlewares/BaseMiddleware.md)
- [ExceptionMiddleware](api/faststream/broker/middlewares/ExceptionMiddleware.md)
- base
- [BaseMiddleware](api/faststream/broker/middlewares/base/BaseMiddleware.md)
- exception
- [BaseExceptionMiddleware](api/faststream/broker/middlewares/exception/BaseExceptionMiddleware.md)
- [ExceptionMiddleware](api/faststream/broker/middlewares/exception/ExceptionMiddleware.md)
- logging
- [CriticalLogMiddleware](api/faststream/broker/middlewares/logging/CriticalLogMiddleware.md)
- proto
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/ExceptionMiddleware.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.ExceptionMiddleware
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.broker.middlewares.ExceptionMiddleware
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.broker.middlewares.exception.BaseExceptionMiddleware
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.broker.middlewares.exception.ExceptionMiddleware
106 changes: 106 additions & 0 deletions docs/docs/en/getting-started/middlewares/exception.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 10
---

# Exception Middleware

Sometimes you need to register exception processors at top level of your application instead of each message handler.

For this case, **FastStream** has a special `ExceptionMiddleware`. You just need to create it, register handlers and add to broker / router / subscribers you want (as a [regular middleware](index.md){.internal-link}).

```python linenums="1"
from faststream. import ExceptionMiddleware

exception_middleware = ExceptionMiddleware()

Broker(middlewares=[exception_middleware])
```

This middleware can be used in a two ways we discuss further.

## General Exceptions processing

The first way - general exceptions processing. It is a default case, that can be used to log exceptions correctly / clear / etc. This type of handlers processes all sources of errors: message handler, parser/decoder, another middlewares, publishing. But, it **can't be used to publish** something as a default value to request.

You can register such handlers in a two ways:

1. By middleware `#!python @add_handler(...)` decorator:
```python linenums="1" hl_lines="3"
exc_middleware = ExceptionMiddleware()

@exc_middleware.add_handler(Exception)
def error_handler(exc: Exception) -> None:
print(repr(exc))
```

2. By middleware `handlers` initial option
```python linenums="1" hl_lines="5-7"
def error_handler(exc: Exception) -> None:
print(repr(exc))

exc_middleware = ExceptionMiddleware(
handlers={
Exception: error_handler
}
)
```

## Publishing Exceptions Handlers

The first way to process messages - fallback to default result, that should be published at error. Such handlers are able to process errors in your message handler (or serialization) function only.

They can be registered the same two ways as a previous one, but with a little difference:

1. By middleware `#!python @add_handler(..., publish=True)` decorator:
```python linenums="1" hl_lines="3"
exc_middleware = ExceptionMiddleware()

@exc_middleware.add_handler(Exception, publish=True)
def error_handler(exc: Exception) -> str:
print(repr(exc))
return "error occurred"
```

2. By middleware `publish_handlers` initial option
```python linenums="1" hl_lines="6-8"
def error_handler(exc: Exception) -> str:
print(repr(exc))
return "error occurred"

exc_middleware = ExceptionMiddleware(
publish_handlers={
Exception: error_handler
}
)
```

## Handlers requirements

Your registered exception handlers are also wrapped by **FastDepends** serialization mechanism, so they can be

* sync/async both
* ask for [Context](../context/index.md){.internal-link} feature

As a regular message handler does.

As an example - you can get a consumed message in your handler in a regular way:

```python linenums="1" hl_lines="8"
from faststream import ExceptionMiddleware, Context

exc_middleware = ExceptionMiddleware()

@exc_middleware.add_handler(Exception, publish=True)
def base_exc_handler(
exc: Exception,
message = Context(),
) -> str:
print(exc, msg)
return "default"
```
1 change: 1 addition & 0 deletions docs/docs/navigation_template.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ search:
- [Context](getting-started/lifespan/context.md)
- [Testing](getting-started/lifespan/test.md)
- [Middlewares](getting-started/middlewares/index.md)
- [Exception Middleware](getting-started/middlewares/exception.md)
- AsyncAPI
- [Schema Export](getting-started/asyncapi/export.md)
- [Schema Hosting](getting-started/asyncapi/hosting.md)
Expand Down
3 changes: 2 additions & 1 deletion faststream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from faststream.annotations import ContextRepo, Logger, NoCast
from faststream.app import FastStream
from faststream.broker.middlewares import BaseMiddleware
from faststream.broker.middlewares import BaseMiddleware, ExceptionMiddleware
from faststream.broker.response import Response
from faststream.testing.app import TestApp
from faststream.utils import Context, Depends, Header, Path, apply_types, context
Expand All @@ -24,6 +24,7 @@
"NoCast",
# middlewares
"BaseMiddleware",
"ExceptionMiddleware",
# basic
"Response",
)
3 changes: 2 additions & 1 deletion faststream/broker/middlewares/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from faststream.broker.middlewares.base import BaseMiddleware
from faststream.broker.middlewares.exception import ExceptionMiddleware

__all__ = ("BaseMiddleware",)
__all__ = ("BaseMiddleware", "ExceptionMiddleware")
166 changes: 166 additions & 0 deletions faststream/broker/middlewares/exception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
ContextManager,
Dict,
Optional,
Type,
Union,
overload,
)

from typing_extensions import Literal, TypeAlias

from faststream.broker.middlewares.base import BaseMiddleware
from faststream.utils import apply_types, context
from faststream.utils.functions import sync_fake_context, to_async

if TYPE_CHECKING:
from types import TracebackType

from faststream.broker.message import StreamMessage
from faststream.types import AsyncFuncAny, SendableMessage


GeneralExceptionHandler: TypeAlias = Callable[..., None]
PublishingExceptionHandler: TypeAlias = Callable[..., "SendableMessage"]

CastedGeneralExceptionHandler: TypeAlias = Callable[..., Awaitable[None]]
CastedPublishingExceptionHandler: TypeAlias = Callable[
..., Awaitable["SendableMessage"]
]
CastedHandlers: TypeAlias = Dict[Type[Exception], CastedGeneralExceptionHandler]
CastedPublishingHandlers: TypeAlias = Dict[
Type[Exception], CastedPublishingExceptionHandler
]


class BaseExceptionMiddleware(BaseMiddleware):
def __init__(
self,
handlers: CastedHandlers,
publish_handlers: CastedPublishingHandlers,
msg: Optional[Any] = None,
) -> None:
super().__init__(msg)
self._handlers = handlers
self._publish_handlers = publish_handlers

async def consume_scope(
self,
call_next: "AsyncFuncAny",
msg: "StreamMessage[Any]",
) -> Any:
try:
return await call_next(await self.on_consume(msg))

except Exception as exc:
exc_type = type(exc)

for handler_type, handler in self._publish_handlers.items():
if issubclass(exc_type, handler_type):
return await handler(exc)

raise exc

async def after_processed(
self,
exc_type: Optional[Type[BaseException]] = None,
exc_val: Optional[BaseException] = None,
exc_tb: Optional["TracebackType"] = None,
) -> Optional[bool]:
if exc_type:
for handler_type, handler in self._handlers.items():
if issubclass(exc_type, handler_type):
# TODO: remove it after context will be moved to middleware
# In case parser/decoder error occurred
scope: ContextManager[Any]
if not context.get_local("message"):
scope = context.scope("message", self.msg)
else:
scope = sync_fake_context()

with scope:
await handler(exc_val)

return True

return False

return None


class ExceptionMiddleware:
__slots__ = ("_handlers", "_publish_handlers")

_handlers: CastedHandlers
_publish_handlers: CastedPublishingHandlers

def __init__(
self,
handlers: Optional[Dict[Type[Exception], GeneralExceptionHandler]] = None,
publish_handlers: Optional[
Dict[Type[Exception], PublishingExceptionHandler]
] = None,
) -> None:
self._handlers = {
exc_type: apply_types(to_async(handler))
for exc_type, handler in (handlers or {}).items()
}

self._publish_handlers = {
exc_type: apply_types(to_async(handler))
for exc_type, handler in (publish_handlers or {}).items()
}

@overload
def add_handler(
self,
exc: Type[Exception],
publish: Literal[False] = False,
) -> Callable[[GeneralExceptionHandler], GeneralExceptionHandler]: ...

@overload
def add_handler(
self,
exc: Type[Exception],
publish: Literal[True],
) -> Callable[[PublishingExceptionHandler], PublishingExceptionHandler]: ...

def add_handler(
self,
exc: Type[Exception],
publish: bool = False,
) -> Union[
Callable[[GeneralExceptionHandler], GeneralExceptionHandler],
Callable[[PublishingExceptionHandler], PublishingExceptionHandler],
]:
if publish:

def pub_wrapper(
func: PublishingExceptionHandler,
) -> PublishingExceptionHandler:
self._publish_handlers[exc] = apply_types(to_async(func))
return func

return pub_wrapper

else:

def default_wrapper(
func: GeneralExceptionHandler,
) -> GeneralExceptionHandler:
self._handlers[exc] = apply_types(to_async(func))
return func

return default_wrapper

def __call__(self, msg: Optional[Any]) -> BaseExceptionMiddleware:
"""Real middleware runtime constructor."""
return BaseExceptionMiddleware(
handlers=self._handlers,
publish_handlers=self._publish_handlers,
msg=msg,
)
Loading
Loading