Skip to content

Commit

Permalink
feature/exception_middleware add exception middleware (#1604)
Browse files Browse the repository at this point in the history
* feature/exception_middleware add exception middleware

* feature/exception_middleware refactored exception miffleware

* feature/exception_middleware improved type hints, added tests

* feature/exception_middleware tets refactored

* feature/exception_middleware fixed add_handler decorator, added new test

* feature/exception_middleware added publish_handlers and tests

* Better search for exception + pass message to error handler

* Fix typehint

* feature: add FastDepends to exception handlers

* docs: generate API References

* docs: correct navigation

* docs: generate API References

* chore: fix CI

* Proofread docs

---------

Co-authored-by: Pastukhov Nikita <[email protected]>
Co-authored-by: sheldy <[email protected]>
Co-authored-by: Nikita Pastukhov <[email protected]>
Co-authored-by: Lancetnik <[email protected]>
Co-authored-by: Kumaran Rajendhiran <[email protected]>
  • Loading branch information
6 people authored Aug 22, 2024
1 parent ce092e6 commit e015bf8
Show file tree
Hide file tree
Showing 17 changed files with 628 additions and 12 deletions.
7 changes: 7 additions & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ search:
- [Context](getting-started/lifespan/context.md)
- [Testing](getting-started/lifespan/test.md)
- [Middlewares](getting-started/middlewares/index.md)
- [Exception Middleware](getting-started/middlewares/exception.md)
- AsyncAPI
- [Schema Export](getting-started/asyncapi/export.md)
- [Schema Hosting](getting-started/asyncapi/hosting.md)
Expand Down Expand Up @@ -117,6 +118,7 @@ search:
- [BaseMiddleware](public_api/faststream/BaseMiddleware.md)
- [Context](public_api/faststream/Context.md)
- [Depends](public_api/faststream/Depends.md)
- [ExceptionMiddleware](public_api/faststream/ExceptionMiddleware.md)
- [FastStream](public_api/faststream/FastStream.md)
- [Header](public_api/faststream/Header.md)
- [Path](public_api/faststream/Path.md)
Expand Down Expand Up @@ -205,6 +207,7 @@ search:
- [BaseMiddleware](api/faststream/BaseMiddleware.md)
- [Context](api/faststream/Context.md)
- [Depends](api/faststream/Depends.md)
- [ExceptionMiddleware](api/faststream/ExceptionMiddleware.md)
- [FastStream](api/faststream/FastStream.md)
- [Header](api/faststream/Header.md)
- [Path](api/faststream/Path.md)
Expand Down Expand Up @@ -372,8 +375,12 @@ search:
- [gen_cor_id](api/faststream/broker/message/gen_cor_id.md)
- middlewares
- [BaseMiddleware](api/faststream/broker/middlewares/BaseMiddleware.md)
- [ExceptionMiddleware](api/faststream/broker/middlewares/ExceptionMiddleware.md)
- base
- [BaseMiddleware](api/faststream/broker/middlewares/base/BaseMiddleware.md)
- exception
- [BaseExceptionMiddleware](api/faststream/broker/middlewares/exception/BaseExceptionMiddleware.md)
- [ExceptionMiddleware](api/faststream/broker/middlewares/exception/ExceptionMiddleware.md)
- logging
- [CriticalLogMiddleware](api/faststream/broker/middlewares/logging/CriticalLogMiddleware.md)
- proto
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/ExceptionMiddleware.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.ExceptionMiddleware
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.broker.middlewares.ExceptionMiddleware
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.broker.middlewares.exception.BaseExceptionMiddleware
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.broker.middlewares.exception.ExceptionMiddleware
106 changes: 106 additions & 0 deletions docs/docs/en/getting-started/middlewares/exception.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 10
---

# Exception Middleware

Sometimes, you need to register exception processors at the top level of your application instead of within each message handler.

For this purpose, **FastStream** provides a special `ExceptionMiddleware`. You just need to create it, register handlers, and add it to the broker, router, or subscribers you want (as a [regular middleware](index.md){.internal-link}).

```python linenums="1"
from faststream. import ExceptionMiddleware

exception_middleware = ExceptionMiddleware()

Broker(middlewares=[exception_middleware])
```

This middleware can be used in two ways, which we will discuss further.

## General Exceptions Processing

The first way is general exception processing. This is the default case, which can be used to log exceptions correctly, perform cleanup, etc. This type of handler processes all sources of errors such as message handlers, parser/decoder, other middlewares, and publishing. However, it **cannot be used to publish** a default value in response to a request.

You can register such handlers in two ways:

1. By using the middleware's `#!python @add_handler(...)` decorator:
```python linenums="1" hl_lines="3"
exc_middleware = ExceptionMiddleware()

@exc_middleware.add_handler(Exception)
def error_handler(exc: Exception) -> None:
print(repr(exc))
```

2. By using the middleware's `handlers` initialization option:
```python linenums="1" hl_lines="5-7"
def error_handler(exc: Exception) -> None:
print(repr(exc))

exc_middleware = ExceptionMiddleware(
handlers={
Exception: error_handler
}
)
```

## Publishing Exceptions Handlers

The second way to process messages is to fallback to a default result that should be published in case of an error. Such handlers can process errors in your message handler (or serialization) function only.

They can be registered in the same two ways as the previous one, but with a slight difference:

1. By using the middleware's `#!python @add_handler(..., publish=True)` decorator:
```python linenums="1" hl_lines="3"
exc_middleware = ExceptionMiddleware()

@exc_middleware.add_handler(Exception, publish=True)
def error_handler(exc: Exception) -> str:
print(repr(exc))
return "error occurred"
```

2. By using the middleware's `publish_handlers` initialization option:
```python linenums="1" hl_lines="6-8"
def error_handler(exc: Exception) -> str:
print(repr(exc))
return "error occurred"

exc_middleware = ExceptionMiddleware(
publish_handlers={
Exception: error_handler
}
)
```

## Handler Requirements

Your registered exception handlers are also wrapped by the **FastDepends** serialization mechanism, so they can be:

* Either sync or async
* Able to access the [Context](../context/index.md){.internal-link} feature

This works in the same way as a regular message handler.

For example, you can access a consumed message in your handler as follows:

```python linenums="1" hl_lines="8"
from faststream import ExceptionMiddleware, Context

exc_middleware = ExceptionMiddleware()

@exc_middleware.add_handler(Exception, publish=True)
def base_exc_handler(
exc: Exception,
message = Context(),
) -> str:
print(exc, msg)
return "default"
```
1 change: 1 addition & 0 deletions docs/docs/navigation_template.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ search:
- [Context](getting-started/lifespan/context.md)
- [Testing](getting-started/lifespan/test.md)
- [Middlewares](getting-started/middlewares/index.md)
- [Exception Middleware](getting-started/middlewares/exception.md)
- AsyncAPI
- [Schema Export](getting-started/asyncapi/export.md)
- [Schema Hosting](getting-started/asyncapi/hosting.md)
Expand Down
3 changes: 2 additions & 1 deletion faststream/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from faststream.annotations import ContextRepo, Logger, NoCast
from faststream.app import FastStream
from faststream.broker.middlewares import BaseMiddleware
from faststream.broker.middlewares import BaseMiddleware, ExceptionMiddleware
from faststream.broker.response import Response
from faststream.testing.app import TestApp
from faststream.utils import Context, Depends, Header, Path, apply_types, context
Expand All @@ -24,6 +24,7 @@
"NoCast",
# middlewares
"BaseMiddleware",
"ExceptionMiddleware",
# basic
"Response",
)
3 changes: 2 additions & 1 deletion faststream/broker/middlewares/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from faststream.broker.middlewares.base import BaseMiddleware
from faststream.broker.middlewares.exception import ExceptionMiddleware

__all__ = ("BaseMiddleware",)
__all__ = ("BaseMiddleware", "ExceptionMiddleware")
166 changes: 166 additions & 0 deletions faststream/broker/middlewares/exception.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
ContextManager,
Dict,
Optional,
Type,
Union,
overload,
)

from typing_extensions import Literal, TypeAlias

from faststream.broker.middlewares.base import BaseMiddleware
from faststream.utils import apply_types, context
from faststream.utils.functions import sync_fake_context, to_async

if TYPE_CHECKING:
from types import TracebackType

from faststream.broker.message import StreamMessage
from faststream.types import AsyncFuncAny, SendableMessage


GeneralExceptionHandler: TypeAlias = Callable[..., None]
PublishingExceptionHandler: TypeAlias = Callable[..., "SendableMessage"]

CastedGeneralExceptionHandler: TypeAlias = Callable[..., Awaitable[None]]
CastedPublishingExceptionHandler: TypeAlias = Callable[
..., Awaitable["SendableMessage"]
]
CastedHandlers: TypeAlias = Dict[Type[Exception], CastedGeneralExceptionHandler]
CastedPublishingHandlers: TypeAlias = Dict[
Type[Exception], CastedPublishingExceptionHandler
]


class BaseExceptionMiddleware(BaseMiddleware):
def __init__(
self,
handlers: CastedHandlers,
publish_handlers: CastedPublishingHandlers,
msg: Optional[Any] = None,
) -> None:
super().__init__(msg)
self._handlers = handlers
self._publish_handlers = publish_handlers

async def consume_scope(
self,
call_next: "AsyncFuncAny",
msg: "StreamMessage[Any]",
) -> Any:
try:
return await call_next(await self.on_consume(msg))

except Exception as exc:
exc_type = type(exc)

for handler_type, handler in self._publish_handlers.items():
if issubclass(exc_type, handler_type):
return await handler(exc)

raise exc

async def after_processed(
self,
exc_type: Optional[Type[BaseException]] = None,
exc_val: Optional[BaseException] = None,
exc_tb: Optional["TracebackType"] = None,
) -> Optional[bool]:
if exc_type:
for handler_type, handler in self._handlers.items():
if issubclass(exc_type, handler_type):
# TODO: remove it after context will be moved to middleware
# In case parser/decoder error occurred
scope: ContextManager[Any]
if not context.get_local("message"):
scope = context.scope("message", self.msg)
else:
scope = sync_fake_context()

with scope:
await handler(exc_val)

return True

return False

return None


class ExceptionMiddleware:
__slots__ = ("_handlers", "_publish_handlers")

_handlers: CastedHandlers
_publish_handlers: CastedPublishingHandlers

def __init__(
self,
handlers: Optional[Dict[Type[Exception], GeneralExceptionHandler]] = None,
publish_handlers: Optional[
Dict[Type[Exception], PublishingExceptionHandler]
] = None,
) -> None:
self._handlers = {
exc_type: apply_types(to_async(handler))
for exc_type, handler in (handlers or {}).items()
}

self._publish_handlers = {
exc_type: apply_types(to_async(handler))
for exc_type, handler in (publish_handlers or {}).items()
}

@overload
def add_handler(
self,
exc: Type[Exception],
publish: Literal[False] = False,
) -> Callable[[GeneralExceptionHandler], GeneralExceptionHandler]: ...

@overload
def add_handler(
self,
exc: Type[Exception],
publish: Literal[True],
) -> Callable[[PublishingExceptionHandler], PublishingExceptionHandler]: ...

def add_handler(
self,
exc: Type[Exception],
publish: bool = False,
) -> Union[
Callable[[GeneralExceptionHandler], GeneralExceptionHandler],
Callable[[PublishingExceptionHandler], PublishingExceptionHandler],
]:
if publish:

def pub_wrapper(
func: PublishingExceptionHandler,
) -> PublishingExceptionHandler:
self._publish_handlers[exc] = apply_types(to_async(func))
return func

return pub_wrapper

else:

def default_wrapper(
func: GeneralExceptionHandler,
) -> GeneralExceptionHandler:
self._handlers[exc] = apply_types(to_async(func))
return func

return default_wrapper

def __call__(self, msg: Optional[Any]) -> BaseExceptionMiddleware:
"""Real middleware runtime constructor."""
return BaseExceptionMiddleware(
handlers=self._handlers,
publish_handlers=self._publish_handlers,
msg=msg,
)
Loading

0 comments on commit e015bf8

Please sign in to comment.