diff --git a/faststream/confluent/broker/broker.py b/faststream/confluent/broker/broker.py index dc567928b8..570f2d6b16 100644 --- a/faststream/confluent/broker/broker.py +++ b/faststream/confluent/broker/broker.py @@ -47,6 +47,7 @@ LoggerProto, SendableMessage, ) + from faststream._internal.setup.state import BaseState from faststream._internal.types import ( BrokerMiddleware, CustomCallable, @@ -445,7 +446,7 @@ async def _connect( # type: ignore[override] native_producer = AsyncConfluentProducer( **kwargs, client_id=client_id, - logger=self._state.logger_state.logger.logger, + logger=self._state.logger_state, config=self.config, ) @@ -458,15 +459,19 @@ async def _connect( # type: ignore[override] return partial( AsyncConfluentConsumer, **filter_by_dict(ConsumerConnectionParams, kwargs), - logger=self._state.logger_state.logger.logger, + logger=self._state.logger_state, config=self.config, ) async def start(self) -> None: - self._setup() await self.connect() + self._setup() await super().start() + def _setup(self, state: Optional["BaseState"] = None) -> None: + super()._setup(state) + self._producer._setup(self._state.logger_state) + @property def _subscriber_setup_extra(self) -> "AnyDict": return { diff --git a/faststream/confluent/client.py b/faststream/confluent/client.py index 2c62873ef3..87d193069e 100644 --- a/faststream/confluent/client.py +++ b/faststream/confluent/client.py @@ -30,6 +30,7 @@ from typing_extensions import NotRequired, TypedDict from faststream._internal.basic_types import AnyDict, LoggerProto + from faststream._internal.setup.logger import LoggerState class _SendKwargs(TypedDict): value: Optional[Union[str, bytes]] @@ -68,8 +69,6 @@ def __init__( sasl_plain_password: Optional[str] = None, sasl_plain_username: Optional[str] = None, ) -> None: - self.logger = logger - if isinstance(bootstrap_servers, Iterable) and not isinstance( bootstrap_servers, str ): @@ -112,11 +111,17 @@ def __init__( } ) - self.producer = Producer(final_config, 
logger=self.logger) + self.config = final_config self.__running = True self._poll_task = asyncio.create_task(self._poll_loop()) + def _setup(self, logger_state: "LoggerState") -> None: + self.producer = Producer( + self.config, + logger=logger_state.logger.logger, + ) + async def _poll_loop(self) -> None: while self.__running: with suppress(Exception): @@ -223,7 +228,7 @@ def __init__( self, *topics: str, partitions: Sequence["TopicPartition"], - logger: Optional["LoggerProto"], + logger: "LoggerState", config: config_module.ConfluentFastConfig, bootstrap_servers: Union[str, List[str]] = "localhost", client_id: Optional[str] = "confluent-kafka-consumer", @@ -251,7 +256,7 @@ def __init__( sasl_plain_password: Optional[str] = None, sasl_plain_username: Optional[str] = None, ) -> None: - self.logger = logger + self.logger_state = logger if isinstance(bootstrap_servers, Iterable) and not isinstance( bootstrap_servers, str @@ -312,7 +317,6 @@ def __init__( ) self.config = final_config - self.consumer = Consumer(final_config, logger=self.logger) @property def topics_to_create(self) -> List[str]: @@ -320,15 +324,23 @@ def topics_to_create(self) -> List[str]: async def start(self) -> None: """Starts the Kafka consumer and subscribes to the specified topics.""" + self.consumer = Consumer( + self.config, + logger=self.logger_state.logger.logger, + ) + if self.allow_auto_create_topics: await call_or_await( - create_topics, self.topics_to_create, self.config, self.logger + create_topics, + self.topics_to_create, + self.config, + self.logger_state.logger.logger, ) - elif self.logger: - self.logger.log( - logging.WARNING, - "Auto create topics is disabled. Make sure the topics exist.", + else: + self.logger_state.log( + log_level=logging.WARNING, + message="Auto create topics is disabled. 
Make sure the topics exist.", ) if self.topics: @@ -359,10 +371,10 @@ async def stop(self) -> None: # No offset stored issue is not a problem - https://github.com/confluentinc/confluent-kafka-python/issues/295#issuecomment-355907183 if "No offset stored" in str(e): pass - elif self.logger: - self.logger.log( - logging.ERROR, - "Consumer closing error occurred.", + else: + self.logger_state.log( + log_level=logging.ERROR, + message="Consumer closing error occurred.", exc_info=e, ) diff --git a/faststream/confluent/publisher/producer.py b/faststream/confluent/publisher/producer.py index bb162d720f..a917722f3c 100644 --- a/faststream/confluent/publisher/producer.py +++ b/faststream/confluent/publisher/producer.py @@ -10,6 +10,7 @@ if TYPE_CHECKING: from faststream._internal.basic_types import SendableMessage + from faststream._internal.setup.logger import LoggerState from faststream._internal.types import CustomCallable from faststream.confluent.client import AsyncConfluentProducer @@ -30,6 +31,9 @@ def __init__( self._parser = resolve_custom_func(parser, default.parse_message) self._decoder = resolve_custom_func(decoder, default.decode_message) + def _setup(self, logger_state: "LoggerState") -> None: + self._producer._setup(logger_state) + @override async def publish( # type: ignore[override] self, diff --git a/faststream/confluent/testing.py b/faststream/confluent/testing.py index 13be8c316c..58704c3d6e 100644 --- a/faststream/confluent/testing.py +++ b/faststream/confluent/testing.py @@ -18,6 +18,7 @@ if TYPE_CHECKING: from faststream._internal.basic_types import SendableMessage + from faststream._internal.setup.logger import LoggerState from faststream.confluent.publisher.publisher import SpecificationPublisher from faststream.confluent.subscriber.usecase import LogicSubscriber @@ -88,6 +89,9 @@ def __init__(self, broker: KafkaBroker) -> None: self._parser = resolve_custom_func(broker._parser, default.parse_message) self._decoder = 
resolve_custom_func(broker._decoder, default.decode_message) + def _setup(self, logger_state: "LoggerState") -> None: + pass + @override async def publish( # type: ignore[override] self,