Skip to content

Commit

Permalink
Add logging to SubscriberUsecase
Browse files Browse the repository at this point in the history
  • Loading branch information
Arseniy-Popov committed Jan 3, 2025
1 parent 8ea3227 commit 5b0f54d
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 5 deletions.
17 changes: 17 additions & 0 deletions faststream/broker/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class SubscriberUsecase(
extra_watcher_options: "AnyDict"
extra_context: "AnyDict"
graceful_timeout: Optional[float]
logger: Optional["LoggerProto"]

_broker_dependencies: Iterable["Depends"]
_call_options: Optional["_CallOptions"]
Expand Down Expand Up @@ -162,6 +163,7 @@ def setup( # type: ignore[override]
self._producer = producer
self.graceful_timeout = graceful_timeout
self.extra_context = extra_context
self.logger = logger

self.watcher = get_watcher_context(logger, self._no_ack, self._retry)

Expand Down Expand Up @@ -470,3 +472,18 @@ def get_payloads(self) -> List[Tuple["AnyDict", str]]:
)

return payloads

def _log(
self,
message: str,
log_level: int,
extra: Optional["AnyDict"] = None,
exc_info: Optional[Exception] = None,
) -> None:
if self.logger is not None:
self.logger.log(
log_level,
message,
extra=extra,
exc_info=exc_info,
)
10 changes: 5 additions & 5 deletions faststream/kafka/subscriber/usecase.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from abc import ABC, abstractmethod
from itertools import chain
from typing import (
Expand Down Expand Up @@ -30,7 +31,6 @@
from faststream.broker.utils import process_msg
from faststream.kafka.message import KafkaAckableMessage, KafkaMessage, KafkaRawMessage
from faststream.kafka.parser import AioKafkaBatchParser, AioKafkaParser
from faststream.log import logger
from faststream.utils.path import compile_path

if TYPE_CHECKING:
Expand Down Expand Up @@ -596,10 +596,10 @@ async def start(self) -> None:
tg.start_soon(consumer.start)

for consumer in self.consumer_subgroup:
logger.info(
"Consumer %s assigned to partitions: %s",
consumer._coordinator.member_id,
consumer.assignment(),
self._log(
f"Consumer {consumer._coordinator.member_id} assigned to partitions: "
f"{consumer.assignment()}",
logging.INFO,
)

self.running = True
Expand Down

0 comments on commit 5b0f54d

Please sign in to comment.