Skip to content

Commit

Permalink
logging fsm
Browse files Browse the repository at this point in the history
* feat: refactor logging with FSM

* refactor: delete subscribers without dict

* refactor: new kafka logging

* refactor: new confluent logging

* docs: generate API References

* tests: fix tests

* tests: fix in-memory mocks

* confluent: fix self.logger usage

* confluent: fix logger usage

* confluent: fix logger usage

* confluent: make laze logging

* confluent: check producer before setup

* confluent: fix producer

* tests: correct setup call order

* fix: remove Confluent producer logger

* fix: remove useless option

* tests: fix confluent

* fix confluent

---------

Co-authored-by: Lancetnik <[email protected]>
  • Loading branch information
Lancetnik and Lancetnik authored Sep 20, 2024
1 parent 0a6b83b commit 865185e
Show file tree
Hide file tree
Showing 59 changed files with 1,116 additions and 892 deletions.
12 changes: 7 additions & 5 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ search:
- broker
- [KafkaBroker](api/faststream/confluent/broker/broker/KafkaBroker.md)
- logging
- [KafkaLoggingBroker](api/faststream/confluent/broker/logging/KafkaLoggingBroker.md)
- [KafkaParamsStorage](api/faststream/confluent/broker/logging/KafkaParamsStorage.md)
- registrator
- [KafkaRegistrator](api/faststream/confluent/broker/registrator/KafkaRegistrator.md)
- client
Expand Down Expand Up @@ -335,6 +335,7 @@ search:
- [FastStreamException](api/faststream/exceptions/FastStreamException.md)
- [HandlerException](api/faststream/exceptions/HandlerException.md)
- [IgnoredException](api/faststream/exceptions/IgnoredException.md)
- [IncorrectState](api/faststream/exceptions/IncorrectState.md)
- [NackMessage](api/faststream/exceptions/NackMessage.md)
- [OperationForbiddenError](api/faststream/exceptions/OperationForbiddenError.md)
- [RejectMessage](api/faststream/exceptions/RejectMessage.md)
Expand All @@ -358,7 +359,7 @@ search:
- broker
- [KafkaBroker](api/faststream/kafka/broker/broker/KafkaBroker.md)
- logging
- [KafkaLoggingBroker](api/faststream/kafka/broker/logging/KafkaLoggingBroker.md)
- [KafkaParamsStorage](api/faststream/kafka/broker/logging/KafkaParamsStorage.md)
- registrator
- [KafkaRegistrator](api/faststream/kafka/broker/registrator/KafkaRegistrator.md)
- fastapi
Expand Down Expand Up @@ -444,6 +445,7 @@ search:
- [ignore_handler](api/faststream/middlewares/exception/ignore_handler.md)
- logging
- [CriticalLogMiddleware](api/faststream/middlewares/logging/CriticalLogMiddleware.md)
- [LoggingMiddleware](api/faststream/middlewares/logging/LoggingMiddleware.md)
- nats
- [AckPolicy](api/faststream/nats/AckPolicy.md)
- [ConsumerConfig](api/faststream/nats/ConsumerConfig.md)
Expand Down Expand Up @@ -473,7 +475,7 @@ search:
- broker
- [NatsBroker](api/faststream/nats/broker/broker/NatsBroker.md)
- logging
- [NatsLoggingBroker](api/faststream/nats/broker/logging/NatsLoggingBroker.md)
- [NatsParamsStorage](api/faststream/nats/broker/logging/NatsParamsStorage.md)
- registrator
- [NatsRegistrator](api/faststream/nats/broker/registrator/NatsRegistrator.md)
- fastapi
Expand Down Expand Up @@ -617,7 +619,7 @@ search:
- broker
- [RabbitBroker](api/faststream/rabbit/broker/broker/RabbitBroker.md)
- logging
- [RabbitLoggingBroker](api/faststream/rabbit/broker/logging/RabbitLoggingBroker.md)
- [RabbitParamsStorage](api/faststream/rabbit/broker/logging/RabbitParamsStorage.md)
- registrator
- [RabbitRegistrator](api/faststream/rabbit/broker/registrator/RabbitRegistrator.md)
- fastapi
Expand Down Expand Up @@ -699,7 +701,7 @@ search:
- broker
- [RedisBroker](api/faststream/redis/broker/broker/RedisBroker.md)
- logging
- [RedisLoggingBroker](api/faststream/redis/broker/logging/RedisLoggingBroker.md)
- [RedisParamsStorage](api/faststream/redis/broker/logging/RedisParamsStorage.md)
- registrator
- [RedisRegistrator](api/faststream/redis/broker/registrator/RedisRegistrator.md)
- fastapi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ search:
boost: 0.5
---

::: faststream.confluent.broker.logging.KafkaLoggingBroker
::: faststream.confluent.broker.logging.KafkaParamsStorage
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ search:
boost: 0.5
---

::: faststream.nats.broker.logging.NatsLoggingBroker
::: faststream.exceptions.IncorrectState
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ search:
boost: 0.5
---

::: faststream.redis.broker.logging.RedisLoggingBroker
::: faststream.kafka.broker.logging.KafkaParamsStorage
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ search:
boost: 0.5
---

::: faststream.kafka.broker.logging.KafkaLoggingBroker
::: faststream.middlewares.logging.LoggingMiddleware
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ search:
boost: 0.5
---

::: faststream.rabbit.broker.logging.RabbitLoggingBroker
::: faststream.nats.broker.logging.NatsParamsStorage
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.rabbit.broker.logging.RabbitParamsStorage
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.redis.broker.logging.RedisParamsStorage
60 changes: 27 additions & 33 deletions faststream/_internal/broker/abc_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,43 +90,37 @@ def include_router(
for h in router._subscribers.values():
h.add_prefix("".join((self.prefix, prefix)))

if (key := hash(h)) not in self._subscribers:
if include_in_schema is None:
h.include_in_schema = self._solve_include_in_schema(
h.include_in_schema
)
else:
h.include_in_schema = include_in_schema

h._broker_middlewares = (
*self._middlewares,
*middlewares,
*h._broker_middlewares,
)
h._broker_dependencies = (
*self._dependencies,
*dependencies,
*h._broker_dependencies,
)
self._subscribers = {**self._subscribers, key: h}
if include_in_schema is None:
h.include_in_schema = self._solve_include_in_schema(h.include_in_schema)
else:
h.include_in_schema = include_in_schema

h._broker_middlewares = (
*self._middlewares,
*middlewares,
*h._broker_middlewares,
)
h._broker_dependencies = (
*self._dependencies,
*dependencies,
*h._broker_dependencies,
)
self._subscribers = {**self._subscribers, hash(h): h}

for p in router._publishers.values():
p.add_prefix(self.prefix)

if (key := hash(p)) not in self._publishers:
if include_in_schema is None:
p.include_in_schema = self._solve_include_in_schema(
p.include_in_schema
)
else:
p.include_in_schema = include_in_schema

p._broker_middlewares = (
*self._middlewares,
*middlewares,
*p._broker_middlewares,
)
self._publishers = {**self._publishers, key: p}
if include_in_schema is None:
p.include_in_schema = self._solve_include_in_schema(p.include_in_schema)
else:
p.include_in_schema = include_in_schema

p._broker_middlewares = (
*self._middlewares,
*middlewares,
*p._broker_middlewares,
)
self._publishers = {**self._publishers, hash(p): p}

def include_routers(
self,
Expand Down
Loading

0 comments on commit 865185e

Please sign in to comment.