Skip to content

Commit

Permalink
Add concurrent between partitions kafka subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
Arseniy-Popov committed Jan 3, 2025
1 parent 2ba3eed commit e7b31dd
Show file tree
Hide file tree
Showing 16 changed files with 438 additions and 65 deletions.
3 changes: 3 additions & 0 deletions docs/docs/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,7 @@ search:
- [FakeConsumer](api/faststream/kafka/message/FakeConsumer.md)
- [KafkaAckableMessage](api/faststream/kafka/message/KafkaAckableMessage.md)
- [KafkaMessage](api/faststream/kafka/message/KafkaMessage.md)
- [KafkaRawMessage](api/faststream/kafka/message/KafkaRawMessage.md)
- opentelemetry
- [KafkaTelemetryMiddleware](api/faststream/kafka/opentelemetry/KafkaTelemetryMiddleware.md)
- middleware
Expand Down Expand Up @@ -676,13 +677,15 @@ search:
- subscriber
- asyncapi
- [AsyncAPIBatchSubscriber](api/faststream/kafka/subscriber/asyncapi/AsyncAPIBatchSubscriber.md)
- [AsyncAPIConcurrentBetweenPartitionsSubscriber](api/faststream/kafka/subscriber/asyncapi/AsyncAPIConcurrentBetweenPartitionsSubscriber.md)
- [AsyncAPIConcurrentDefaultSubscriber](api/faststream/kafka/subscriber/asyncapi/AsyncAPIConcurrentDefaultSubscriber.md)
- [AsyncAPIDefaultSubscriber](api/faststream/kafka/subscriber/asyncapi/AsyncAPIDefaultSubscriber.md)
- [AsyncAPISubscriber](api/faststream/kafka/subscriber/asyncapi/AsyncAPISubscriber.md)
- factory
- [create_subscriber](api/faststream/kafka/subscriber/factory/create_subscriber.md)
- usecase
- [BatchSubscriber](api/faststream/kafka/subscriber/usecase/BatchSubscriber.md)
- [ConcurrentBetweenPartitionsSubscriber](api/faststream/kafka/subscriber/usecase/ConcurrentBetweenPartitionsSubscriber.md)
- [ConcurrentDefaultSubscriber](api/faststream/kafka/subscriber/usecase/ConcurrentDefaultSubscriber.md)
- [DefaultSubscriber](api/faststream/kafka/subscriber/usecase/DefaultSubscriber.md)
- [LogicSubscriber](api/faststream/kafka/subscriber/usecase/LogicSubscriber.md)
Expand Down
11 changes: 11 additions & 0 deletions docs/docs/en/api/faststream/kafka/message/KafkaRawMessage.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.kafka.message.KafkaRawMessage
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.kafka.subscriber.asyncapi.AsyncAPIConcurrentBetweenPartitionsSubscriber
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
# 0.5 - API
# 2 - Release
# 3 - Contributing
# 5 - Template Page
# 10 - Default
search:
boost: 0.5
---

::: faststream.kafka.subscriber.usecase.ConcurrentBetweenPartitionsSubscriber
7 changes: 7 additions & 0 deletions docs/docs/en/kafka/Subscriber/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,10 @@ async def base_handler(
):
...
```


## Concurrent processing

There are two possible modes of concurrent message processing:
- With `auto_commit=True` and `max_workers` > 1, a handler processes all messages concurrently with at-most-once semantics.
- With `auto_commit=False` and `max_workers` > 1, processing is concurrent between topic partitions and sequential within a partition to ensure reliable at-least-once processing. Maximum concurrency is achieved when the total number of workers across all application instances in the same consumer group equals the number of partitions in the topic. Increasing the worker count beyond that will result in idle workers, as no more than one consumer from a consumer group can consume from the same partition.
17 changes: 17 additions & 0 deletions faststream/broker/subscriber/usecase.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ class SubscriberUsecase(
extra_watcher_options: "AnyDict"
extra_context: "AnyDict"
graceful_timeout: Optional[float]
logger: Optional["LoggerProto"]

_broker_dependencies: Iterable["Depends"]
_call_options: Optional["_CallOptions"]
Expand Down Expand Up @@ -162,6 +163,7 @@ def setup( # type: ignore[override]
self._producer = producer
self.graceful_timeout = graceful_timeout
self.extra_context = extra_context
self.logger = logger

self.watcher = get_watcher_context(logger, self._no_ack, self._retry)

Expand Down Expand Up @@ -470,3 +472,18 @@ def get_payloads(self) -> List[Tuple["AnyDict", str]]:
)

return payloads

def _log(
    self,
    message: str,
    log_level: int,
    extra: Optional["AnyDict"] = None,
    exc_info: Optional[Exception] = None,
) -> None:
    """Forward *message* to the subscriber's logger, if one is configured.

    Does nothing when ``self.logger`` is ``None`` (no logger was provided
    at ``setup`` time).

    Args:
        message: Text to log.
        log_level: Numeric ``logging`` level to emit at.
        extra: Optional mapping merged into the log record.
        exc_info: Optional exception whose traceback should be attached.
    """
    logger = self.logger
    if logger is None:
        return
    logger.log(log_level, message, extra=extra, exc_info=exc_info)
48 changes: 36 additions & 12 deletions faststream/kafka/broker/registrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
TYPE_CHECKING,
Any,
Callable,
Collection,
Dict,
Iterable,
Literal,
Expand Down Expand Up @@ -41,6 +42,7 @@
)
from faststream.kafka.subscriber.asyncapi import (
AsyncAPIBatchSubscriber,
AsyncAPIConcurrentBetweenPartitionsSubscriber,
AsyncAPIConcurrentDefaultSubscriber,
AsyncAPIDefaultSubscriber,
)
Expand All @@ -62,6 +64,7 @@ class KafkaRegistrator(
"AsyncAPIBatchSubscriber",
"AsyncAPIDefaultSubscriber",
"AsyncAPIConcurrentDefaultSubscriber",
"AsyncAPIConcurrentBetweenPartitionsSubscriber",
],
]
_publishers: Dict[
Expand Down Expand Up @@ -382,7 +385,7 @@ def subscriber(
),
] = None,
partitions: Annotated[
Iterable["TopicPartition"],
Collection["TopicPartition"],
Doc(
"""
An explicit partitions list to assign.
Expand Down Expand Up @@ -763,7 +766,7 @@ def subscriber(
),
] = None,
partitions: Annotated[
Iterable["TopicPartition"],
Collection["TopicPartition"],
Doc(
"""
An explicit partitions list to assign.
Expand Down Expand Up @@ -1144,7 +1147,7 @@ def subscriber(
),
] = None,
partitions: Annotated[
Iterable["TopicPartition"],
Collection["TopicPartition"],
Doc(
"""
An explicit partitions list to assign.
Expand Down Expand Up @@ -1528,7 +1531,7 @@ def subscriber(
),
] = None,
partitions: Annotated[
Iterable["TopicPartition"],
Collection["TopicPartition"],
Doc(
"""
An explicit partitions list to assign.
Expand All @@ -1555,7 +1558,14 @@ def subscriber(
] = (),
max_workers: Annotated[
int,
Doc("Number of workers to process messages concurrently."),
Doc(
"Maximum number of messages being processed concurrently. With "
"`auto_commit=False` processing is concurrent between partitions and "
"sequential within a partition. With `auto_commit=False` maximum "
"concurrency is achieved when total number of workers across all "
"application instances running workers in the same consumer group "
"is equal to the number of partitions in the topic."
),
] = 1,
filter: Annotated[
"Filter[KafkaMessage]",
Expand Down Expand Up @@ -1602,6 +1612,7 @@ def subscriber(
"AsyncAPIDefaultSubscriber",
"AsyncAPIBatchSubscriber",
"AsyncAPIConcurrentDefaultSubscriber",
"AsyncAPIConcurrentBetweenPartitionsSubscriber",
]:
subscriber = super().subscriber(
create_subscriber(
Expand Down Expand Up @@ -1660,13 +1671,26 @@ def subscriber(

else:
if max_workers > 1:
return cast("AsyncAPIConcurrentDefaultSubscriber", subscriber).add_call(
filter_=filter,
parser_=parser or self._parser,
decoder_=decoder or self._decoder,
dependencies_=dependencies,
middlewares_=middlewares,
)
if not auto_commit:
return cast(
"AsyncAPIConcurrentBetweenPartitionsSubscriber", subscriber
).add_call(
filter_=filter,
parser_=parser or self._parser,
decoder_=decoder or self._decoder,
dependencies_=dependencies,
middlewares_=middlewares,
)
else:
return cast(
"AsyncAPIConcurrentDefaultSubscriber", subscriber
).add_call(
filter_=filter,
parser_=parser or self._parser,
decoder_=decoder or self._decoder,
dependencies_=dependencies,
middlewares_=middlewares,
)
else:
return cast("AsyncAPIDefaultSubscriber", subscriber).add_call(
filter_=filter,
Expand Down
18 changes: 16 additions & 2 deletions faststream/kafka/fastapi/fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
)
from faststream.kafka.subscriber.asyncapi import (
AsyncAPIBatchSubscriber,
AsyncAPIConcurrentBetweenPartitionsSubscriber,
AsyncAPIConcurrentDefaultSubscriber,
AsyncAPIDefaultSubscriber,
)
Expand Down Expand Up @@ -2621,12 +2622,20 @@ def subscriber(
] = False,
max_workers: Annotated[
int,
Doc("Number of workers to process messages concurrently."),
Doc(
"Maximum number of messages being processed concurrently. With "
"`auto_commit=False` processing is concurrent between partitions and "
"sequential within a partition. With `auto_commit=False` maximum "
"concurrency is achieved when total number of workers across all "
"application instances running workers in the same consumer group "
"is equal to the number of partitions in the topic."
),
] = 1,
) -> Union[
"AsyncAPIBatchSubscriber",
"AsyncAPIDefaultSubscriber",
"AsyncAPIConcurrentDefaultSubscriber",
"AsyncAPIConcurrentBetweenPartitionsSubscriber",
]:
subscriber = super().subscriber(
*topics,
Expand Down Expand Up @@ -2683,7 +2692,12 @@ def subscriber(
return cast("AsyncAPIBatchSubscriber", subscriber)
else:
if max_workers > 1:
return cast("AsyncAPIConcurrentDefaultSubscriber", subscriber)
if not auto_commit:
return cast(
"AsyncAPIConcurrentBetweenPartitionsSubscriber", subscriber
)
else:
return cast("AsyncAPIConcurrentDefaultSubscriber", subscriber)
else:
return cast("AsyncAPIDefaultSubscriber", subscriber)

Expand Down
12 changes: 8 additions & 4 deletions faststream/kafka/message.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
from typing import TYPE_CHECKING, Any, Protocol, Tuple, Union
from dataclasses import dataclass
from typing import Any, Protocol, Tuple, Union

from aiokafka import AIOKafkaConsumer, ConsumerRecord
from aiokafka import TopicPartition as AIOKafkaTopicPartition

from faststream.broker.message import StreamMessage

if TYPE_CHECKING:
from aiokafka import ConsumerRecord


class ConsumerProtocol(Protocol):
"""A protocol for Kafka consumers."""
Expand Down Expand Up @@ -38,6 +37,11 @@ def seek(
FAKE_CONSUMER = FakeConsumer()


@dataclass
class KafkaRawMessage(ConsumerRecord):  # type: ignore[misc]
    """A ``ConsumerRecord`` that also carries the consumer that received it.

    The extra ``consumer`` attribute lets downstream code (e.g. the parser,
    via ``getattr(message, "consumer", None)``) recover the owning
    ``AIOKafkaConsumer`` directly from the raw message instead of relying
    on handler context.
    """

    # The AIOKafkaConsumer instance that polled this record.
    consumer: AIOKafkaConsumer


class KafkaMessage(
StreamMessage[
Union[
Expand Down
10 changes: 6 additions & 4 deletions faststream/kafka/parser.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type, cast
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type, Union, cast

from faststream.broker.message import decode_message, gen_cor_id
from faststream.kafka.message import FAKE_CONSUMER, KafkaMessage
from faststream.kafka.message import FAKE_CONSUMER, KafkaMessage, KafkaRawMessage
from faststream.utils.context.repository import context

if TYPE_CHECKING:
Expand All @@ -27,7 +27,7 @@ def __init__(

async def parse_message(
self,
message: "ConsumerRecord",
message: Union["ConsumerRecord", "KafkaRawMessage"],
) -> "StreamMessage[ConsumerRecord]":
"""Parses a Kafka message."""
headers = {i: j.decode() for i, j in message.headers}
Expand All @@ -42,7 +42,9 @@ async def parse_message(
correlation_id=headers.get("correlation_id", gen_cor_id()),
raw_message=message,
path=self.get_path(message.topic),
consumer=getattr(handler, "consumer", None) or FAKE_CONSUMER,
consumer=getattr(message, "consumer", None)
or getattr(handler, "consumer", None)
or FAKE_CONSUMER,
)

async def decode_message(
Expand Down
8 changes: 8 additions & 0 deletions faststream/kafka/subscriber/asyncapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from faststream.broker.types import MsgType
from faststream.kafka.subscriber.usecase import (
BatchSubscriber,
ConcurrentBetweenPartitionsSubscriber,
ConcurrentDefaultSubscriber,
DefaultSubscriber,
LogicSubscriber,
Expand Down Expand Up @@ -80,3 +81,10 @@ class AsyncAPIConcurrentDefaultSubscriber(
ConcurrentDefaultSubscriber,
):
pass


class AsyncAPIConcurrentBetweenPartitionsSubscriber(
    ConcurrentBetweenPartitionsSubscriber,
    AsyncAPISubscriber["ConsumerRecord"],
):
    """AsyncAPI-documented variant of ``ConcurrentBetweenPartitionsSubscriber``.

    Pure mixin composition: runtime behavior comes entirely from
    ``ConcurrentBetweenPartitionsSubscriber``; ``AsyncAPISubscriber`` adds
    schema generation. No behavior of its own.
    """

    pass
Loading

0 comments on commit e7b31dd

Please sign in to comment.