From 5b0f54d6d32a0e3b5b299d7d0580d27cea677e08 Mon Sep 17 00:00:00 2001 From: Arseniy Popov Date: Fri, 3 Jan 2025 12:35:51 +0300 Subject: [PATCH] Add logging to SubscriberUsecase --- faststream/broker/subscriber/usecase.py | 17 +++++++++++++++++ faststream/kafka/subscriber/usecase.py | 10 +++++----- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/faststream/broker/subscriber/usecase.py b/faststream/broker/subscriber/usecase.py index 9641a99513..a82ccee15e 100644 --- a/faststream/broker/subscriber/usecase.py +++ b/faststream/broker/subscriber/usecase.py @@ -87,6 +87,7 @@ class SubscriberUsecase( extra_watcher_options: "AnyDict" extra_context: "AnyDict" graceful_timeout: Optional[float] + logger: Optional["LoggerProto"] _broker_dependencies: Iterable["Depends"] _call_options: Optional["_CallOptions"] @@ -162,6 +163,7 @@ def setup( # type: ignore[override] self._producer = producer self.graceful_timeout = graceful_timeout self.extra_context = extra_context + self.logger = logger self.watcher = get_watcher_context(logger, self._no_ack, self._retry) @@ -470,3 +472,18 @@ def get_payloads(self) -> List[Tuple["AnyDict", str]]: ) return payloads + + def _log( + self, + message: str, + log_level: int, + extra: Optional["AnyDict"] = None, + exc_info: Optional[Exception] = None, + ) -> None: + if self.logger is not None: + self.logger.log( + log_level, + message, + extra=extra, + exc_info=exc_info, + ) diff --git a/faststream/kafka/subscriber/usecase.py b/faststream/kafka/subscriber/usecase.py index 376455815d..2e95d5babc 100644 --- a/faststream/kafka/subscriber/usecase.py +++ b/faststream/kafka/subscriber/usecase.py @@ -1,3 +1,4 @@ +import logging from abc import ABC, abstractmethod from itertools import chain from typing import ( @@ -30,7 +31,6 @@ from faststream.broker.utils import process_msg from faststream.kafka.message import KafkaAckableMessage, KafkaMessage, KafkaRawMessage from faststream.kafka.parser import AioKafkaBatchParser, AioKafkaParser -from faststream.log import logger from faststream.utils.path import compile_path if TYPE_CHECKING: @@ -596,10 +596,10 @@ async def start(self) -> None: tg.start_soon(consumer.start) for consumer in self.consumer_subgroup: - logger.info( - "Consumer %s assigned to partitions: %s", - consumer._coordinator.member_id, - consumer.assignment(), + self._log( + f"Consumer {consumer._coordinator.member_id} assigned to partitions: " + f"{consumer.assignment()}", + logging.INFO, ) self.running = True