confluent: make lazy logging
Lancetnik committed Sep 20, 2024
1 parent 83d850c commit bfca156
Showing 4 changed files with 43 additions and 18 deletions.
11 changes: 8 additions & 3 deletions faststream/confluent/broker/broker.py
@@ -47,6 +47,7 @@
LoggerProto,
SendableMessage,
)
from faststream._internal.setup.state import BaseState
from faststream._internal.types import (
BrokerMiddleware,
CustomCallable,
@@ -445,7 +446,7 @@ async def _connect( # type: ignore[override]
native_producer = AsyncConfluentProducer(
**kwargs,
client_id=client_id,
logger=self._state.logger_state.logger.logger,
logger=self._state.logger_state,
config=self.config,
)

@@ -458,15 +459,19 @@ async def _connect( # type: ignore[override]
return partial(
AsyncConfluentConsumer,
**filter_by_dict(ConsumerConnectionParams, kwargs),
logger=self._state.logger_state.logger.logger,
logger=self._state.logger_state,
config=self.config,
)

async def start(self) -> None:
self._setup()
await self.connect()
self._setup()
await super().start()

def _setup(self, state: Optional["BaseState"] = None) -> None:
super()._setup(state)
self._producer._setup(self._state.logger_state)

@property
def _subscriber_setup_extra(self) -> "AnyDict":
return {
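
Read together, the broker hunks move `_setup()` after `connect()` and make `_setup()` hand the whole `logger_state` to the producer instead of an already-unwrapped logger. The code below is only an illustration of that ordering with simplified stand-in classes; apart from `_setup`, `connect`, `start`, and `logger_state`, none of these names come from the diff.

# Sketch of the deferred logger wiring; simplified stand-ins, not FastStream's real classes.
import logging
from typing import Optional


class LoggerState:
    """Holds a logger that only becomes available during broker setup."""

    def __init__(self) -> None:
        self.logger: Optional[logging.Logger] = None

    def log(self, message: str, log_level: int = logging.INFO) -> None:
        if self.logger is not None:
            self.logger.log(log_level, message)


class LazyProducer:
    def __init__(self, config: dict) -> None:
        self.config = config   # stored immediately
        self.client = None     # the real confluent client is built later

    def _setup(self, logger_state: LoggerState) -> None:
        # In the commit, this is the point where the concrete
        # confluent_kafka.Producer(config, logger=...) gets constructed.
        self.client = (self.config, logger_state)


class Broker:
    def __init__(self, config: dict) -> None:
        self._logger_state = LoggerState()
        self._producer = LazyProducer(config)

    async def connect(self) -> None:
        """Resolve connection parameters; no logger is needed yet."""

    def _setup(self) -> None:
        self._logger_state.logger = logging.getLogger("broker")
        self._producer._setup(self._logger_state)

    async def start(self) -> None:
        await self.connect()   # connect first ...
        self._setup()          # ... then hand the logger state to the producer
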
42 changes: 27 additions & 15 deletions faststream/confluent/client.py
@@ -30,6 +30,7 @@
from typing_extensions import NotRequired, TypedDict

from faststream._internal.basic_types import AnyDict, LoggerProto
from faststream._internal.setup.logger import LoggerState

class _SendKwargs(TypedDict):
value: Optional[Union[str, bytes]]
@@ -68,8 +69,6 @@ def __init__(
sasl_plain_password: Optional[str] = None,
sasl_plain_username: Optional[str] = None,
) -> None:
self.logger = logger

if isinstance(bootstrap_servers, Iterable) and not isinstance(
bootstrap_servers, str
):
@@ -112,11 +111,17 @@ def __init__(
}
)

self.producer = Producer(final_config, logger=self.logger)
self.config = final_config

self.__running = True
self._poll_task = asyncio.create_task(self._poll_loop())

def _setup(self, logger_state: "LoggerState") -> None:
self.producer = Producer(
self.config,
logger=logger_state.logger.logger,
)

async def _poll_loop(self) -> None:
while self.__running:
with suppress(Exception):
@@ -223,7 +228,7 @@ def __init__(
self,
*topics: str,
partitions: Sequence["TopicPartition"],
logger: Optional["LoggerProto"],
logger: "LoggerState",
config: config_module.ConfluentFastConfig,
bootstrap_servers: Union[str, List[str]] = "localhost",
client_id: Optional[str] = "confluent-kafka-consumer",
@@ -251,7 +256,7 @@ def __init__(
sasl_plain_password: Optional[str] = None,
sasl_plain_username: Optional[str] = None,
) -> None:
self.logger = logger
self.logger_state = logger

if isinstance(bootstrap_servers, Iterable) and not isinstance(
bootstrap_servers, str
@@ -312,23 +317,30 @@ def __init__(
)

self.config = final_config
self.consumer = Consumer(final_config, logger=self.logger)

@property
def topics_to_create(self) -> List[str]:
return list({*self.topics, *(p.topic for p in self.partitions)})

async def start(self) -> None:
"""Starts the Kafka consumer and subscribes to the specified topics."""
self.consumer = Consumer(
self.config,
logger=self.logger_state.logger.logger,
)

if self.allow_auto_create_topics:
await call_or_await(
create_topics, self.topics_to_create, self.config, self.logger
create_topics,
self.topics_to_create,
self.config,
self.logger_state.logger.logger,
)

elif self.logger:
self.logger.log(
logging.WARNING,
"Auto create topics is disabled. Make sure the topics exist.",
else:
self.logger_state.log(
log_level=logging.WARNING,
message="Auto create topics is disabled. Make sure the topics exist.",
)

if self.topics:
@@ -359,10 +371,10 @@ async def stop(self) -> None:
# No offset stored issue is not a problem - https://github.com/confluentinc/confluent-kafka-python/issues/295#issuecomment-355907183
if "No offset stored" in str(e):
pass
elif self.logger:
self.logger.log(
logging.ERROR,
"Consumer closing error occurred.",
else:
self.logger_state.log(
log_level=logging.ERROR,
message="Consumer closing error occurred.",
exc_info=e,
)

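
On the client side, the consumer follows the same idea: the underlying `Consumer` is no longer created in `__init__` but in `start()`, once the logger state exists, and the old `if self.logger:` guards become calls to the state's `log()` helper. A rough sketch of that shape, assuming a `logger_state` object with the `log(log_level=..., message=...)` signature the diff uses:

# Rough sketch only; the real AsyncConfluentConsumer does much more than this.
import logging


class LazyConsumer:
    def __init__(self, config: dict, logger_state) -> None:
        self.config = config
        self.logger_state = logger_state   # kept whole instead of a raw logger
        self.consumer = None               # built lazily in start()

    async def start(self) -> None:
        # The concrete confluent_kafka.Consumer would be constructed here,
        # once logger_state carries a fully configured logger.
        self.consumer = ("consumer", self.config)

        # No `if self.logger:` guard any more; the state decides whether to emit.
        self.logger_state.log(
            log_level=logging.WARNING,
            message="Auto create topics is disabled. Make sure the topics exist.",
        )
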
4 changes: 4 additions & 0 deletions faststream/confluent/publisher/producer.py
@@ -10,6 +10,7 @@

if TYPE_CHECKING:
from faststream._internal.basic_types import SendableMessage
from faststream._internal.setup.logger import LoggerState
from faststream._internal.types import CustomCallable
from faststream.confluent.client import AsyncConfluentProducer

@@ -30,6 +31,9 @@ def __init__(
self._parser = resolve_custom_func(parser, default.parse_message)
self._decoder = resolve_custom_func(decoder, default.decode_message)

def _setup(self, logger_state: "LoggerState") -> None:
self._producer._setup(logger_state)

@override
async def publish( # type: ignore[override]
self,
4 changes: 4 additions & 0 deletions faststream/confluent/testing.py
@@ -18,6 +18,7 @@

if TYPE_CHECKING:
from faststream._internal.basic_types import SendableMessage
from faststream._internal.setup.logger import LoggerState
from faststream.confluent.publisher.publisher import SpecificationPublisher
from faststream.confluent.subscriber.usecase import LogicSubscriber

@@ -88,6 +89,9 @@ def __init__(self, broker: KafkaBroker) -> None:
self._parser = resolve_custom_func(broker._parser, default.parse_message)
self._decoder = resolve_custom_func(broker._decoder, default.decode_message)

def _setup(self, logger_state: "LoggerState") -> None:
pass

@override
async def publish( # type: ignore[override]
self,
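
Finally, because the broker now calls `_setup()` on whatever producer it holds, the in-memory test producer must accept the hook even though it has nothing to wire up; a no-op stub along these lines (a sketch, not the actual FakeProducer) is enough:

class FakeProducer:
    """Test double: accepts the broker's _setup() call and ignores the logger state."""

    def _setup(self, logger_state) -> None:
        pass  # nothing to configure; tests use no real confluent client
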
