diff --git a/docs/docs/SUMMARY.md b/docs/docs/SUMMARY.md index a8a2947235..b4b2e7648d 100644 --- a/docs/docs/SUMMARY.md +++ b/docs/docs/SUMMARY.md @@ -630,6 +630,7 @@ search: - [FakeConsumer](api/faststream/kafka/message/FakeConsumer.md) - [KafkaAckableMessage](api/faststream/kafka/message/KafkaAckableMessage.md) - [KafkaMessage](api/faststream/kafka/message/KafkaMessage.md) + - [KafkaRawMessage](api/faststream/kafka/message/KafkaRawMessage.md) - opentelemetry - [KafkaTelemetryMiddleware](api/faststream/kafka/opentelemetry/KafkaTelemetryMiddleware.md) - middleware @@ -676,6 +677,7 @@ search: - subscriber - asyncapi - [AsyncAPIBatchSubscriber](api/faststream/kafka/subscriber/asyncapi/AsyncAPIBatchSubscriber.md) + - [AsyncAPIConcurrentBetweenPartitionsSubscriber](api/faststream/kafka/subscriber/asyncapi/AsyncAPIConcurrentBetweenPartitionsSubscriber.md) - [AsyncAPIConcurrentDefaultSubscriber](api/faststream/kafka/subscriber/asyncapi/AsyncAPIConcurrentDefaultSubscriber.md) - [AsyncAPIDefaultSubscriber](api/faststream/kafka/subscriber/asyncapi/AsyncAPIDefaultSubscriber.md) - [AsyncAPISubscriber](api/faststream/kafka/subscriber/asyncapi/AsyncAPISubscriber.md) @@ -683,6 +685,7 @@ search: - [create_subscriber](api/faststream/kafka/subscriber/factory/create_subscriber.md) - usecase - [BatchSubscriber](api/faststream/kafka/subscriber/usecase/BatchSubscriber.md) + - [ConcurrentBetweenPartitionsSubscriber](api/faststream/kafka/subscriber/usecase/ConcurrentBetweenPartitionsSubscriber.md) - [ConcurrentDefaultSubscriber](api/faststream/kafka/subscriber/usecase/ConcurrentDefaultSubscriber.md) - [DefaultSubscriber](api/faststream/kafka/subscriber/usecase/DefaultSubscriber.md) - [LogicSubscriber](api/faststream/kafka/subscriber/usecase/LogicSubscriber.md) diff --git a/docs/docs/en/api/faststream/kafka/message/KafkaRawMessage.md b/docs/docs/en/api/faststream/kafka/message/KafkaRawMessage.md new file mode 100644 index 0000000000..0b64947444 --- /dev/null +++ 
b/docs/docs/en/api/faststream/kafka/message/KafkaRawMessage.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.kafka.message.KafkaRawMessage diff --git a/docs/docs/en/api/faststream/kafka/subscriber/asyncapi/AsyncAPIConcurrentBetweenPartitionsSubscriber.md b/docs/docs/en/api/faststream/kafka/subscriber/asyncapi/AsyncAPIConcurrentBetweenPartitionsSubscriber.md new file mode 100644 index 0000000000..b64722f460 --- /dev/null +++ b/docs/docs/en/api/faststream/kafka/subscriber/asyncapi/AsyncAPIConcurrentBetweenPartitionsSubscriber.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.kafka.subscriber.asyncapi.AsyncAPIConcurrentBetweenPartitionsSubscriber diff --git a/docs/docs/en/api/faststream/kafka/subscriber/usecase/ConcurrentBetweenPartitionsSubscriber.md b/docs/docs/en/api/faststream/kafka/subscriber/usecase/ConcurrentBetweenPartitionsSubscriber.md new file mode 100644 index 0000000000..bfde5b4cc5 --- /dev/null +++ b/docs/docs/en/api/faststream/kafka/subscriber/usecase/ConcurrentBetweenPartitionsSubscriber.md @@ -0,0 +1,11 @@ +--- +# 0.5 - API +# 2 - Release +# 3 - Contributing +# 5 - Template Page +# 10 - Default +search: + boost: 0.5 +--- + +::: faststream.kafka.subscriber.usecase.ConcurrentBetweenPartitionsSubscriber diff --git a/docs/docs/en/kafka/Subscriber/index.md b/docs/docs/en/kafka/Subscriber/index.md index 831acc814b..8e0f4c511a 100644 --- a/docs/docs/en/kafka/Subscriber/index.md +++ b/docs/docs/en/kafka/Subscriber/index.md @@ -72,3 +72,10 @@ async def base_handler( ): ... ``` + + +## Concurrent processing + +There are two possible modes of concurrent message processing: +- With `auto_commit=True` and `max_workers` > 1, a handler processes all messages concurrently in an at-most-once semantic. 
+- With `auto_commit=False` and `max_workers` > 1, processing is concurrent between topic partitions and sequential within a partition to ensure reliable at-least-once processing. Maximum concurrency is achieved when total number of workers across all application instances running workers in the same consumer group is equal to the number of partitions in the topic. Increasing worker count beyond that will result in idle workers as not more than one consumer from a consumer group can be consuming from the same partition. diff --git a/faststream/broker/subscriber/usecase.py b/faststream/broker/subscriber/usecase.py index 9641a99513..a82ccee15e 100644 --- a/faststream/broker/subscriber/usecase.py +++ b/faststream/broker/subscriber/usecase.py @@ -87,6 +87,7 @@ class SubscriberUsecase( extra_watcher_options: "AnyDict" extra_context: "AnyDict" graceful_timeout: Optional[float] + logger: Optional["LoggerProto"] _broker_dependencies: Iterable["Depends"] _call_options: Optional["_CallOptions"] @@ -162,6 +163,7 @@ def setup( # type: ignore[override] self._producer = producer self.graceful_timeout = graceful_timeout self.extra_context = extra_context + self.logger = logger self.watcher = get_watcher_context(logger, self._no_ack, self._retry) @@ -470,3 +472,18 @@ def get_payloads(self) -> List[Tuple["AnyDict", str]]: ) return payloads + + def _log( + self, + message: str, + log_level: int, + extra: Optional["AnyDict"] = None, + exc_info: Optional[Exception] = None, + ) -> None: + if self.logger is not None: + self.logger.log( + log_level, + message, + extra=extra, + exc_info=exc_info, + ) diff --git a/faststream/kafka/broker/registrator.py b/faststream/kafka/broker/registrator.py index 45ef5b5a64..4a223db437 100644 --- a/faststream/kafka/broker/registrator.py +++ b/faststream/kafka/broker/registrator.py @@ -2,6 +2,7 @@ TYPE_CHECKING, Any, Callable, + Collection, Dict, Iterable, Literal, @@ -41,6 +42,7 @@ ) from faststream.kafka.subscriber.asyncapi import ( 
AsyncAPIBatchSubscriber, + AsyncAPIConcurrentBetweenPartitionsSubscriber, AsyncAPIConcurrentDefaultSubscriber, AsyncAPIDefaultSubscriber, ) @@ -62,6 +64,7 @@ class KafkaRegistrator( "AsyncAPIBatchSubscriber", "AsyncAPIDefaultSubscriber", "AsyncAPIConcurrentDefaultSubscriber", + "AsyncAPIConcurrentBetweenPartitionsSubscriber", ], ] _publishers: Dict[ @@ -382,7 +385,7 @@ def subscriber( ), ] = None, partitions: Annotated[ - Iterable["TopicPartition"], + Collection["TopicPartition"], Doc( """ An explicit partitions list to assign. @@ -763,7 +766,7 @@ def subscriber( ), ] = None, partitions: Annotated[ - Iterable["TopicPartition"], + Collection["TopicPartition"], Doc( """ An explicit partitions list to assign. @@ -1144,7 +1147,7 @@ def subscriber( ), ] = None, partitions: Annotated[ - Iterable["TopicPartition"], + Collection["TopicPartition"], Doc( """ An explicit partitions list to assign. @@ -1528,7 +1531,7 @@ def subscriber( ), ] = None, partitions: Annotated[ - Iterable["TopicPartition"], + Collection["TopicPartition"], Doc( """ An explicit partitions list to assign. @@ -1555,7 +1558,14 @@ def subscriber( ] = (), max_workers: Annotated[ int, - Doc("Number of workers to process messages concurrently."), + Doc( + "Maximum number of messages being processed concurrently. With " + "`auto_commit=False` processing is concurrent between partitions and " + "sequential within a partition. With `auto_commit=False` maximum " + "concurrency is achieved when total number of workers across all " + "application instances running workers in the same consumer group " + "is equal to the number of partitions in the topic." 
+ ), ] = 1, filter: Annotated[ "Filter[KafkaMessage]", @@ -1602,6 +1612,7 @@ def subscriber( "AsyncAPIDefaultSubscriber", "AsyncAPIBatchSubscriber", "AsyncAPIConcurrentDefaultSubscriber", + "AsyncAPIConcurrentBetweenPartitionsSubscriber", ]: subscriber = super().subscriber( create_subscriber( @@ -1660,13 +1671,26 @@ def subscriber( else: if max_workers > 1: - return cast("AsyncAPIConcurrentDefaultSubscriber", subscriber).add_call( - filter_=filter, - parser_=parser or self._parser, - decoder_=decoder or self._decoder, - dependencies_=dependencies, - middlewares_=middlewares, - ) + if not auto_commit: + return cast( + "AsyncAPIConcurrentBetweenPartitionsSubscriber", subscriber + ).add_call( + filter_=filter, + parser_=parser or self._parser, + decoder_=decoder or self._decoder, + dependencies_=dependencies, + middlewares_=middlewares, + ) + else: + return cast( + "AsyncAPIConcurrentDefaultSubscriber", subscriber + ).add_call( + filter_=filter, + parser_=parser or self._parser, + decoder_=decoder or self._decoder, + dependencies_=dependencies, + middlewares_=middlewares, + ) else: return cast("AsyncAPIDefaultSubscriber", subscriber).add_call( filter_=filter, diff --git a/faststream/kafka/fastapi/fastapi.py b/faststream/kafka/fastapi/fastapi.py index 0ce8e3ac3a..7ea264cbfe 100644 --- a/faststream/kafka/fastapi/fastapi.py +++ b/faststream/kafka/fastapi/fastapi.py @@ -60,6 +60,7 @@ ) from faststream.kafka.subscriber.asyncapi import ( AsyncAPIBatchSubscriber, + AsyncAPIConcurrentBetweenPartitionsSubscriber, AsyncAPIConcurrentDefaultSubscriber, AsyncAPIDefaultSubscriber, ) @@ -2621,12 +2622,20 @@ def subscriber( ] = False, max_workers: Annotated[ int, - Doc("Number of workers to process messages concurrently."), + Doc( + "Maximum number of messages being processed concurrently. With " + "`auto_commit=False` processing is concurrent between partitions and " + "sequential within a partition. 
With `auto_commit=False` maximum " + "concurrency is achieved when total number of workers across all " + "application instances running workers in the same consumer group " + "is equal to the number of partitions in the topic." + ), ] = 1, ) -> Union[ "AsyncAPIBatchSubscriber", "AsyncAPIDefaultSubscriber", "AsyncAPIConcurrentDefaultSubscriber", + "AsyncAPIConcurrentBetweenPartitionsSubscriber", ]: subscriber = super().subscriber( *topics, @@ -2683,7 +2692,12 @@ def subscriber( return cast("AsyncAPIBatchSubscriber", subscriber) else: if max_workers > 1: - return cast("AsyncAPIConcurrentDefaultSubscriber", subscriber) + if not auto_commit: + return cast( + "AsyncAPIConcurrentBetweenPartitionsSubscriber", subscriber + ) + else: + return cast("AsyncAPIConcurrentDefaultSubscriber", subscriber) else: return cast("AsyncAPIDefaultSubscriber", subscriber) diff --git a/faststream/kafka/message.py b/faststream/kafka/message.py index bde7669787..d5ee8c1ca7 100644 --- a/faststream/kafka/message.py +++ b/faststream/kafka/message.py @@ -1,12 +1,11 @@ -from typing import TYPE_CHECKING, Any, Protocol, Tuple, Union +from dataclasses import dataclass +from typing import Any, Protocol, Tuple, Union +from aiokafka import AIOKafkaConsumer, ConsumerRecord from aiokafka import TopicPartition as AIOKafkaTopicPartition from faststream.broker.message import StreamMessage -if TYPE_CHECKING: - from aiokafka import ConsumerRecord - class ConsumerProtocol(Protocol): """A protocol for Kafka consumers.""" @@ -38,6 +37,11 @@ def seek( FAKE_CONSUMER = FakeConsumer() +@dataclass +class KafkaRawMessage(ConsumerRecord): # type: ignore[misc] + consumer: AIOKafkaConsumer + + class KafkaMessage( StreamMessage[ Union[ diff --git a/faststream/kafka/parser.py b/faststream/kafka/parser.py index bea29db170..3745246ddb 100644 --- a/faststream/kafka/parser.py +++ b/faststream/kafka/parser.py @@ -1,7 +1,7 @@ -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type, cast +from typing import 
TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type, Union, cast from faststream.broker.message import decode_message, gen_cor_id -from faststream.kafka.message import FAKE_CONSUMER, KafkaMessage +from faststream.kafka.message import FAKE_CONSUMER, KafkaMessage, KafkaRawMessage from faststream.utils.context.repository import context if TYPE_CHECKING: @@ -27,7 +27,7 @@ def __init__( async def parse_message( self, - message: "ConsumerRecord", + message: Union["ConsumerRecord", "KafkaRawMessage"], ) -> "StreamMessage[ConsumerRecord]": """Parses a Kafka message.""" headers = {i: j.decode() for i, j in message.headers} @@ -42,7 +42,9 @@ async def parse_message( correlation_id=headers.get("correlation_id", gen_cor_id()), raw_message=message, path=self.get_path(message.topic), - consumer=getattr(handler, "consumer", None) or FAKE_CONSUMER, + consumer=getattr(message, "consumer", None) + or getattr(handler, "consumer", None) + or FAKE_CONSUMER, ) async def decode_message( diff --git a/faststream/kafka/subscriber/asyncapi.py b/faststream/kafka/subscriber/asyncapi.py index 1c3ad53ce7..c3444ec91c 100644 --- a/faststream/kafka/subscriber/asyncapi.py +++ b/faststream/kafka/subscriber/asyncapi.py @@ -17,6 +17,7 @@ from faststream.broker.types import MsgType from faststream.kafka.subscriber.usecase import ( BatchSubscriber, + ConcurrentBetweenPartitionsSubscriber, ConcurrentDefaultSubscriber, DefaultSubscriber, LogicSubscriber, @@ -80,3 +81,10 @@ class AsyncAPIConcurrentDefaultSubscriber( ConcurrentDefaultSubscriber, ): pass + + +class AsyncAPIConcurrentBetweenPartitionsSubscriber( + ConcurrentBetweenPartitionsSubscriber, + AsyncAPISubscriber["ConsumerRecord"], +): + pass diff --git a/faststream/kafka/subscriber/factory.py b/faststream/kafka/subscriber/factory.py index cdc2b35a7d..403016f34a 100644 --- a/faststream/kafka/subscriber/factory.py +++ b/faststream/kafka/subscriber/factory.py @@ -1,5 +1,6 @@ from typing import ( TYPE_CHECKING, + Collection, Iterable, Literal, 
Optional, @@ -12,6 +13,7 @@ from faststream.exceptions import SetupError from faststream.kafka.subscriber.asyncapi import ( AsyncAPIBatchSubscriber, + AsyncAPIConcurrentBetweenPartitionsSubscriber, AsyncAPIConcurrentDefaultSubscriber, AsyncAPIDefaultSubscriber, ) @@ -36,7 +38,7 @@ def create_subscriber( listener: Optional["ConsumerRebalanceListener"], pattern: Optional[str], connection_args: "AnyDict", - partitions: Iterable["TopicPartition"], + partitions: Collection["TopicPartition"], is_manual: bool, # Subscriber args max_workers: int, @@ -63,7 +65,7 @@ def create_subscriber( listener: Optional["ConsumerRebalanceListener"], pattern: Optional[str], connection_args: "AnyDict", - partitions: Iterable["TopicPartition"], + partitions: Collection["TopicPartition"], is_manual: bool, # Subscriber args max_workers: int, @@ -93,7 +95,7 @@ def create_subscriber( listener: Optional["ConsumerRebalanceListener"], pattern: Optional[str], connection_args: "AnyDict", - partitions: Iterable["TopicPartition"], + partitions: Collection["TopicPartition"], is_manual: bool, # Subscriber args max_workers: int, @@ -125,7 +127,7 @@ def create_subscriber( listener: Optional["ConsumerRebalanceListener"], pattern: Optional[str], connection_args: "AnyDict", - partitions: Iterable["TopicPartition"], + partitions: Collection["TopicPartition"], is_manual: bool, # Subscriber args max_workers: int, @@ -144,12 +146,24 @@ def create_subscriber( "AsyncAPIDefaultSubscriber", "AsyncAPIBatchSubscriber", "AsyncAPIConcurrentDefaultSubscriber", + "AsyncAPIConcurrentBetweenPartitionsSubscriber", ]: if is_manual and not group_id: raise SetupError("You must use `group_id` with manual commit mode.") if is_manual and max_workers > 1: - raise SetupError("Max workers not work with manual commit mode.") + if len(topics) > 1: + raise SetupError( + "You must use a single topic with concurrent manual commit mode." 
+ ) + if pattern is not None: + raise SetupError( + "You can not use a pattern with concurrent manual commit mode." + ) + if partitions: + raise SetupError( + "Manual partition assignment is not supported with concurrent manual commit mode." + ) if not topics and not partitions and not pattern: raise SetupError( @@ -185,24 +199,44 @@ def create_subscriber( else: if max_workers > 1: - return AsyncAPIConcurrentDefaultSubscriber( - *topics, - max_workers=max_workers, - group_id=group_id, - listener=listener, - pattern=pattern, - connection_args=connection_args, - partitions=partitions, - is_manual=is_manual, - no_ack=no_ack, - no_reply=no_reply, - retry=retry, - broker_dependencies=broker_dependencies, - broker_middlewares=broker_middlewares, - title_=title_, - description_=description_, - include_in_schema=include_in_schema, - ) + if is_manual: + return AsyncAPIConcurrentBetweenPartitionsSubscriber( + *topics, + max_workers=max_workers, + group_id=group_id, + listener=listener, + pattern=pattern, + connection_args=connection_args, + partitions=partitions, + is_manual=is_manual, + no_ack=no_ack, + no_reply=no_reply, + retry=retry, + broker_dependencies=broker_dependencies, + broker_middlewares=broker_middlewares, + title_=title_, + description_=description_, + include_in_schema=include_in_schema, + ) + else: + return AsyncAPIConcurrentDefaultSubscriber( + *topics, + max_workers=max_workers, + group_id=group_id, + listener=listener, + pattern=pattern, + connection_args=connection_args, + partitions=partitions, + is_manual=is_manual, + no_ack=no_ack, + no_reply=no_reply, + retry=retry, + broker_dependencies=broker_dependencies, + broker_middlewares=broker_middlewares, + title_=title_, + description_=description_, + include_in_schema=include_in_schema, + ) else: return AsyncAPIDefaultSubscriber( *topics, diff --git a/faststream/kafka/subscriber/usecase.py b/faststream/kafka/subscriber/usecase.py index 9b682f21ae..0a2788db19 100644 --- 
a/faststream/kafka/subscriber/usecase.py +++ b/faststream/kafka/subscriber/usecase.py @@ -1,3 +1,4 @@ +import logging from abc import ABC, abstractmethod from itertools import chain from typing import ( @@ -10,6 +11,7 @@ Optional, Sequence, Tuple, + cast, ) import anyio @@ -27,7 +29,7 @@ MsgType, ) from faststream.broker.utils import process_msg -from faststream.kafka.message import KafkaAckableMessage, KafkaMessage +from faststream.kafka.message import KafkaAckableMessage, KafkaMessage, KafkaRawMessage from faststream.kafka.parser import AioKafkaBatchParser, AioKafkaParser from faststream.utils.path import compile_path @@ -95,8 +97,8 @@ def __init__( self.group_id = group_id self._pattern = pattern - self.__listener = listener - self.__connection_args = connection_args + self._listener = listener + self._connection_args = connection_args # Setup it later self.client_id = "" @@ -147,14 +149,14 @@ async def start(self) -> None: self.consumer = consumer = self.builder( group_id=self.group_id, client_id=self.client_id, - **self.__connection_args, + **self._connection_args, ) if self.topics or self._pattern: consumer.subscribe( topics=self.topics, pattern=self._pattern, - listener=self.__listener, + listener=self._listener, ) elif self.partitions: @@ -164,7 +166,7 @@ async def start(self) -> None: await super().start() if self.calls: - self.add_task(self._consume()) + self.add_task(self._run_consume_loop(self.consumer)) async def close(self) -> None: await super().close() @@ -217,16 +219,16 @@ def _make_response_publisher( ) @abstractmethod - async def get_msg(self) -> MsgType: + async def get_msg(self, consumer: "AIOKafkaConsumer") -> MsgType: raise NotImplementedError() - async def _consume(self) -> None: - assert self.consumer, "You should start subscriber at first." # nosec B101 + async def _run_consume_loop(self, consumer: "AIOKafkaConsumer") -> None: + assert consumer, "You should start subscriber at first." 
# nosec B101 connected = True while self.running: try: - msg = await self.get_msg() + msg = await self.get_msg(consumer) # pragma: no cover except KafkaError: # noqa: PERF203 @@ -352,9 +354,9 @@ def __init__( include_in_schema=include_in_schema, ) - async def get_msg(self) -> "ConsumerRecord": - assert self.consumer, "You should setup subscriber at first." # nosec B101 - return await self.consumer.getone() + async def get_msg(self, consumer: "AIOKafkaConsumer") -> "ConsumerRecord": + assert consumer, "You should setup subscriber at first." # nosec B101 + return await consumer.getone() def get_log_context( self, @@ -438,10 +440,12 @@ def __init__( include_in_schema=include_in_schema, ) - async def get_msg(self) -> Tuple["ConsumerRecord", ...]: - assert self.consumer, "You should setup subscriber at first." # nosec B101 + async def get_msg( + self, consumer: "AIOKafkaConsumer" + ) -> Tuple["ConsumerRecord", ...]: + assert consumer, "You should setup subscriber at first." # nosec B101 - messages = await self.consumer.getmany( + messages = await consumer.getmany( timeout_ms=self.batch_timeout_ms, max_records=self.max_records, ) @@ -518,3 +522,104 @@ async def start(self) -> None: async def consume_one(self, msg: "ConsumerRecord") -> None: await self._put_msg(msg) + + +class ConcurrentBetweenPartitionsSubscriber(DefaultSubscriber): + consumer_subgroup: Iterable["AIOKafkaConsumer"] + + def __init__( + self, + *topics: str, + # Kafka information + group_id: Optional[str], + listener: Optional["ConsumerRebalanceListener"], + pattern: Optional[str], + connection_args: "AnyDict", + partitions: Iterable["TopicPartition"], + is_manual: bool, + # Subscriber args + max_workers: int, + no_ack: bool, + no_reply: bool, + retry: bool, + broker_dependencies: Iterable["Depends"], + broker_middlewares: Sequence["BrokerMiddleware[ConsumerRecord]"], + # AsyncAPI args + title_: Optional[str], + description_: Optional[str], + include_in_schema: bool, + ) -> None: + super().__init__( + 
*topics, + group_id=group_id, + listener=listener, + pattern=pattern, + connection_args=connection_args, + partitions=partitions, + is_manual=is_manual, + # Propagated args + no_ack=no_ack, + no_reply=no_reply, + retry=retry, + broker_middlewares=broker_middlewares, + broker_dependencies=broker_dependencies, + # AsyncAPI args + title_=title_, + description_=description_, + include_in_schema=include_in_schema, + ) + self.max_workers = max_workers + + async def start(self) -> None: + """Start the consumer subgroup.""" + assert self.builder, "You should setup subscriber at first." # nosec B101 + + self.consumer_subgroup = [ + self.builder( + group_id=self.group_id, + client_id=self.client_id, + **self._connection_args, + ) + for _ in range(self.max_workers) + ] + + [ + consumer.subscribe( + topics=self.topics, + listener=self._listener, + ) + for consumer in self.consumer_subgroup + ] + + async with anyio.create_task_group() as tg: + for consumer in self.consumer_subgroup: + tg.start_soon(consumer.start) + + for consumer in self.consumer_subgroup: + self._log( + f"Consumer {consumer._coordinator.member_id} assigned to partitions: " + f"{consumer.assignment()}", + logging.INFO, + ) + + self.running = True + + if self.calls: + for consumer in self.consumer_subgroup: + self.add_task(self._run_consume_loop(consumer)) + + async def close(self) -> None: + if self.consumer_subgroup: + async with anyio.create_task_group() as tg: + for consumer in self.consumer_subgroup: + tg.start_soon(consumer.stop) + + self.consumer_subgroup = [] + + await super().close() + + async def get_msg(self, consumer: "AIOKafkaConsumer") -> "KafkaRawMessage": + assert consumer, "You should setup subscriber at first." 
# nosec B101 + message = await consumer.getone() + message.consumer = consumer + return cast(KafkaRawMessage, message) diff --git a/tests/brokers/base/testclient.py b/tests/brokers/base/testclient.py index 4cc45da756..c6b1d89814 100644 --- a/tests/brokers/base/testclient.py +++ b/tests/brokers/base/testclient.py @@ -158,8 +158,6 @@ async def test_broker_with_real_patches_publishers_and_subscribers( async def m(msg): await publisher.publish(f"response: {msg}") - await test_broker.start() - async with self.test_class(test_broker, with_real=True) as br: await br.publish("hello", queue) diff --git a/tests/brokers/kafka/test_consume.py b/tests/brokers/kafka/test_consume.py index ed7fc52f7c..01c9ac1516 100644 --- a/tests/brokers/kafka/test_consume.py +++ b/tests/brokers/kafka/test_consume.py @@ -3,6 +3,7 @@ import pytest from aiokafka import AIOKafkaConsumer +from aiokafka.admin import AIOKafkaAdminClient, NewTopic from faststream.exceptions import AckMessage from faststream.kafka import KafkaBroker, TopicPartition @@ -352,6 +353,110 @@ async def handler(msg): assert event2.is_set() assert mock.call_count == 2, mock.call_count + @pytest.mark.asyncio + @pytest.mark.slow + async def test_concurrent_consume_between_partitions( + self, + queue: str, + ): + inputs = set() + + admin_client = AIOKafkaAdminClient() + try: + await admin_client.start() + await admin_client.create_topics([NewTopic(queue, 2, 1)]) + finally: + await admin_client.close() + + consume_broker = self.get_broker() + + @consume_broker.subscriber( + queue, + max_workers=3, + auto_commit=False, + group_id="service_1", + ) + async def handler(msg: str): + nonlocal inputs + inputs.add(msg) + await asyncio.sleep(2) + + async with self.patch_broker(consume_broker) as broker: + await broker.start() + + await asyncio.wait( + ( + asyncio.create_task(broker.publish("hello1", queue, partition=0)), + asyncio.create_task(broker.publish("hello3", queue, partition=0)), + asyncio.create_task(broker.publish("hello2", queue, 
partition=1)), + asyncio.create_task(broker.publish("hello4", queue, partition=1)), + asyncio.create_task(broker.publish("hello5", queue, partition=0)), + asyncio.create_task(asyncio.sleep(1)), + ), + timeout=1, + ) + + assert inputs == {"hello1", "hello2"} + await asyncio.sleep(2) + assert inputs == {"hello1", "hello2", "hello3", "hello4"} + await asyncio.sleep(2) + assert inputs == {"hello1", "hello2", "hello3", "hello4", "hello5"} + + await broker.close() + + @pytest.mark.asyncio + @pytest.mark.slow + @pytest.mark.parametrize("with_explicit_commit", [True, False]) + async def test_concurrent_consume_between_partitions_commit( + self, + queue: str, + with_explicit_commit: bool, + ): + admin_client = AIOKafkaAdminClient() + try: + await admin_client.start() + await admin_client.create_topics([NewTopic(queue, 2, 1)]) + finally: + await admin_client.close() + + consume_broker = self.get_broker(apply_types=True) + + @consume_broker.subscriber( + queue, + max_workers=3, + auto_commit=False, + group_id="service_1", + ) + async def handler(msg: KafkaMessage): + await asyncio.sleep(1) + if with_explicit_commit: + await msg.ack() + + async with self.patch_broker(consume_broker) as broker: + await broker.start() + + with patch.object( + AIOKafkaConsumer, "commit", spy_decorator(AIOKafkaConsumer.commit) + ) as mock: + await asyncio.wait( + ( + asyncio.create_task( + broker.publish("hello1", queue, partition=0) + ), + asyncio.create_task( + broker.publish("hello3", queue, partition=0) + ), + asyncio.create_task( + broker.publish("hello2", queue, partition=1) + ), + asyncio.create_task(asyncio.sleep(1.5)), + ), + timeout=10, + ) + assert mock.mock.call_count == 2 + + await broker.close() + @pytest.mark.asyncio async def test_consume_without_value( self, diff --git a/tests/brokers/kafka/test_misconfigure.py b/tests/brokers/kafka/test_misconfigure.py index 771c45426f..9cacf89908 100644 --- a/tests/brokers/kafka/test_misconfigure.py +++ b/tests/brokers/kafka/test_misconfigure.py 
@@ -1,11 +1,30 @@ import pytest +from aiokafka import TopicPartition from faststream.exceptions import SetupError from faststream.kafka import KafkaBroker -def test_max_workers_with_manual(queue: str) -> None: +def test_max_workers_with_manual_commit_with_multiple_queues() -> None: broker = KafkaBroker() with pytest.raises(SetupError): - broker.subscriber(queue, max_workers=3, auto_commit=False) + broker.subscriber(["queue1", "queue2"], max_workers=3, auto_commit=False) + + +def test_max_workers_with_manual_commit_with_pattern() -> None: + broker = KafkaBroker() + + with pytest.raises(SetupError): + broker.subscriber(pattern="pattern", max_workers=3, auto_commit=False) + + +def test_max_workers_with_manual_commit_partitions() -> None: + broker = KafkaBroker() + + with pytest.raises(SetupError): + broker.subscriber( + partitions=[TopicPartition(topic="topic", partition=1)], + max_workers=3, + auto_commit=False, + )