Skip to content

Commit

Permalink
fix: remove Confluent producer logger
Browse files Browse the repository at this point in the history
  • Loading branch information
Lancetnik committed Sep 20, 2024
1 parent b27fed3 commit ffa7388
Show file tree
Hide file tree
Showing 7 changed files with 6 additions and 23 deletions.
9 changes: 1 addition & 8 deletions faststream/confluent/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
LoggerProto,
SendableMessage,
)
from faststream._internal.setup.state import BaseState
from faststream._internal.types import (
BrokerMiddleware,
CustomCallable,
Expand All @@ -60,14 +59,14 @@


class KafkaBroker(
KafkaRegistrator,
BrokerUsecase[
Union[
confluent_kafka.Message,
Tuple[confluent_kafka.Message, ...],
],
Callable[..., AsyncConfluentConsumer],
],
KafkaRegistrator,
):
url: List[str]
_producer: Optional[AsyncConfluentFastProducer]
Expand Down Expand Up @@ -446,7 +445,6 @@ async def _connect( # type: ignore[override]
native_producer = AsyncConfluentProducer(
**kwargs,
client_id=client_id,
logger=self._state.logger_state,
config=self.config,
)

Expand All @@ -468,11 +466,6 @@ async def start(self) -> None:
self._setup()
await super().start()

def _setup(self, state: Optional["BaseState"] = None) -> None:
super()._setup(state)
if self._producer:
self._producer._setup(self._state.logger_state)

@property
def _subscriber_setup_extra(self) -> "AnyDict":
return {
Expand Down
8 changes: 1 addition & 7 deletions faststream/confluent/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,8 @@ def __init__(
}
)

self.config = final_config
self.__running = False
self.producer = Producer(final_config)

def _setup(self, logger_state: "LoggerState") -> None:
self.producer = Producer(
self.config,
logger=logger_state.logger.logger,
)
self.__running = True
self._poll_task = asyncio.create_task(self._poll_loop())

Expand Down
4 changes: 0 additions & 4 deletions faststream/confluent/publisher/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

if TYPE_CHECKING:
from faststream._internal.basic_types import SendableMessage
from faststream._internal.setup.logger import LoggerState
from faststream._internal.types import CustomCallable
from faststream.confluent.client import AsyncConfluentProducer

Expand All @@ -31,9 +30,6 @@ def __init__(
self._parser = resolve_custom_func(parser, default.parse_message)
self._decoder = resolve_custom_func(decoder, default.decode_message)

def _setup(self, logger_state: "LoggerState") -> None:
self._producer._setup(logger_state)

@override
async def publish( # type: ignore[override]
self,
Expand Down
2 changes: 1 addition & 1 deletion faststream/kafka/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,11 +231,11 @@ class KafkaInitKwargs(TypedDict, total=False):


class KafkaBroker(
KafkaRegistrator,
BrokerUsecase[
Union[aiokafka.ConsumerRecord, Tuple[aiokafka.ConsumerRecord, ...]],
Callable[..., aiokafka.AIOKafkaConsumer],
],
KafkaRegistrator,
):
url: List[str]
_producer: Optional["AioKafkaFastProducer"]
Expand Down
2 changes: 1 addition & 1 deletion faststream/nats/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ class NatsInitKwargs(TypedDict, total=False):


class NatsBroker(
BrokerUsecase[Msg, Client],
NatsRegistrator,
BrokerUsecase[Msg, Client],
):
"""A class to represent a NATS broker."""

Expand Down
2 changes: 1 addition & 1 deletion faststream/rabbit/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@


class RabbitBroker(
BrokerUsecase[IncomingMessage, RobustConnection],
RabbitRegistrator,
BrokerUsecase[IncomingMessage, RobustConnection],
):
"""A class to represent a RabbitMQ broker."""

Expand Down
2 changes: 1 addition & 1 deletion faststream/redis/broker/broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ class RedisInitKwargs(TypedDict, total=False):


class RedisBroker(
BrokerUsecase[UnifyRedisDict, "Redis[bytes]"],
RedisRegistrator,
BrokerUsecase[UnifyRedisDict, "Redis[bytes]"],
):
"""Redis broker."""

Expand Down

0 comments on commit ffa7388

Please sign in to comment.