Feature: create KafkaBatch - facade object over BatchBuilder (aiokafka). #1991

Open
spataphore1337 opened this issue Dec 16, 2024 · 0 comments
Labels
AioKafka Issues related to `faststream.kafka` module enhancement New feature or request


@spataphore1337
Contributor

At this point, we have implemented the ability to raise an exception when a Kafka (aiokafka client) batch overflows (pull #1990). However, we could extend this functionality by letting developers handle batch overflow themselves.

aiokafka builds batches with a BatchBuilder object, whose append method adds a message to the batch buffer and returns some metadata. If the buffer is full, append returns None instead. By checking the returned value directly we can raise an exception, and this is essentially what is already implemented. However, we expect that some developers will want to monitor batch overflow themselves, for example because it is part of their application's business logic. To support this, we need a KafkaBatch facade object that implements two methods: extend and append.
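To make the return-value contract concrete, here is a minimal sketch. `ToyBatchBuilder` is a hypothetical stand-in for aiokafka's `BatchBuilder` that only counts payload bytes (the real builder accounts for record encoding, headers and compression), and `append_or_raise` and `BatchOverflow` are illustrative names. The sketch shows the check already introduced in #1990: a `None` return is turned into an exception.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ToyMetadata:
    # Hypothetical metadata: only the record's offset inside the batch.
    offset: int


class ToyBatchBuilder:
    """Hypothetical stand-in for aiokafka's BatchBuilder (payload bytes only)."""

    def __init__(self, batch_size: int) -> None:
        self.batch_size = batch_size
        self._used = 0
        self._count = 0

    def append(self, value: bytes) -> Optional[ToyMetadata]:
        # Same contract as described above: None means "the batch is full".
        if self._used + len(value) > self.batch_size:
            return None
        self._used += len(value)
        self._count += 1
        return ToyMetadata(offset=self._count - 1)


class BatchOverflow(Exception):
    """Hypothetical exception type used only in this sketch."""


def append_or_raise(builder: ToyBatchBuilder, value: bytes) -> ToyMetadata:
    # Inspect the returned value instead of assuming the append succeeded.
    metadata = builder.append(value)
    if metadata is None:
        raise BatchOverflow(f"batch is full, could not add {len(value)} bytes")
    return metadata
```

With `batch_size=10`, two 5-byte appends succeed and a third append of any size raises `BatchOverflow`.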

The append method should behave like aiokafka's BatchBuilder.append, including its return value: if the batch is full, it returns None. The extend method should accept a collection of messages, add them to the current batch, and raise an exception if the batch overflows.
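The append/extend split can be sketched independently of aiokafka. Everything below is hypothetical and illustrative: `ToyBatchBuilder` counts payload bytes instead of encoding records, `BatchBufferOverflow` only mimics the idea of reporting a `message_position`, and `FacadeBatch` is not the real `KafkaBatch`.

```python
from typing import List, Optional


class ToyBatchBuilder:
    """Hypothetical stand-in for aiokafka's BatchBuilder (counts payload bytes only)."""

    def __init__(self, batch_size: int) -> None:
        self.batch_size = batch_size
        self.records: List[bytes] = []
        self._used = 0

    def append(self, value: bytes) -> Optional[int]:
        # Returns the record's relative offset, or None when the batch is full.
        if self._used + len(value) > self.batch_size:
            return None
        self._used += len(value)
        self.records.append(value)
        return len(self.records) - 1


class BatchBufferOverflow(Exception):
    """Hypothetical error carrying the index of the message that did not fit."""

    def __init__(self, message_position: int) -> None:
        super().__init__(f"batch overflowed at message position {message_position}")
        self.message_position = message_position


class FacadeBatch:
    """Sketch of the proposed append/extend split (names are illustrative)."""

    def __init__(self, batch_size: int) -> None:
        self._builder = ToyBatchBuilder(batch_size)

    def append(self, value: bytes) -> Optional[int]:
        # Same contract as the builder: None signals overflow, nothing is raised.
        return self._builder.append(value)

    def extend(self, *values: bytes) -> None:
        # Raise on the first message that does not fit; earlier ones stay in the batch.
        for position, value in enumerate(values):
            if self.append(value) is None:
                raise BatchBufferOverflow(message_position=position)
```

Note that `extend` is not atomic in this sketch: messages before the failing position remain in the batch, which matches the loop in the prototype.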

Here is the working code of KafkaBatch:

from typing import Optional, Literal

from aiokafka.producer.message_accumulator import BatchBuilder
from aiokafka.util import parse_kafka_version

from _internal.basic_types import SendableMessage
from message import encode_message
from faststream.kafka.exceptions import BatchBufferOverflowException


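def make_magic_number(api_version: str) -> int:
    # The record format version ("magic") depends on the Kafka API version.
    # "auto" cannot be resolved without a connected client, so the prototype falls back to 0.9.0.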
    if api_version == "auto":
        _api_version = (0, 9, 0)
    else:
        _api_version = parse_kafka_version(api_version=api_version)
    if _api_version >= (0, 11):
        magic = 2
    elif _api_version >= (0, 10):
        magic = 1
    else:
        magic = 0
    return magic


def headers_to_publish(headers: Optional[dict[str, str]] = None,
                       reply_to: str = "",
                       correlation_id: Optional[str] = None) -> dict[str, str]:
    result_headers = {}

    if correlation_id:
        result_headers["correlation_id"] = correlation_id

    if reply_to:
        result_headers["reply_to"] = reply_to

    # User headers override the generated ones; `headers` may be None.
    return {**result_headers, **(headers or {})}


class KafkaBatch:
    _batch_builder: BatchBuilder

    def __init__(self,
                 *messages: SendableMessage,
                 timestamp_ms: Optional[int] = None,
                 headers: Optional[dict[str, str]] = None,
                 reply_to: str = "",
                 correlation_id: Optional[str] = None,
                 batch_size: int = 16 * 1024,
                 compression_type: Optional[Literal["gzip", "snappy", "lz4", "zstd"]] = None,
                 api_version: str = "auto",
                 is_transactional: bool = False) -> None:
        self._batch_builder = BatchBuilder(magic=make_magic_number(api_version=api_version),
                                           batch_size=batch_size,
                                           compression_type=compression_type,
                                           is_transactional=is_transactional)
        self.extend(*messages,
                    timestamp_ms=timestamp_ms,
                    headers=headers,
                    reply_to=reply_to,
                    correlation_id=correlation_id)

    def extend(self,
               *messages: SendableMessage,
               timestamp_ms: Optional[int] = None,
               headers: Optional[dict[str, str]] = None,
               reply_to: str = "",
               correlation_id: Optional[str] = None,
               ) -> None:
        for message_position, message in enumerate(messages):
            metadata = self.append(message=message,
                                   headers=headers,
                                   reply_to=reply_to,
                                   correlation_id=correlation_id,
                                   timestamp_ms=timestamp_ms)
            # The batch is full: report the index (within `messages`) of the message that did not fit.
            if metadata is None:
                raise BatchBufferOverflowException(message_position=message_position)

    def append(self,
               message: SendableMessage,
               timestamp_ms: Optional[int] = None,
               headers: Optional[dict[str, str]] = None,
               reply_to: str = "",
               correlation_id: Optional[str] = None,
               ):
        # Same contract as BatchBuilder.append: returns metadata, or None if the batch is full.
        headers_to_send = headers_to_publish(headers=headers,
                                             reply_to=reply_to,
                                             correlation_id=correlation_id)
        message, content_type = encode_message(message)
        if content_type:
            final_headers = {
                "content-type": content_type,
                **headers_to_send,
            }
        else:
            final_headers = headers_to_send.copy()
        metadata = self._batch_builder.append(key=None,
                                              value=message,
                                              timestamp=timestamp_ms,
                                              headers=[(i, j.encode()) for i, j in final_headers.items()])
        return metadata

This KafkaBatch prototype has several problems, and it is not yet clear how to solve them. The main one is the set of parameters BatchBuilder requires to be instantiated, and the most awkward of them is magic. In aiokafka, magic is a number that identifies the message format and is derived from the Kafka API version in use. I copied the code that computes this number into the prototype above so that the example can be run immediately. To compute it ourselves, KafkaBatch has to accept api_version as input, which is very inconvenient given the purpose of the task: filling the batch with our messages in a controlled way. For now, the solution is to keep the default value and, when necessary, pass the api_version value in some other way.
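The version-to-magic rule used by `make_magic_number` above can be isolated as a pure function. This sketch replaces aiokafka's `parse_kafka_version` with a hypothetical minimal parser, `parse_version`, so that it runs without aiokafka, and it keeps the prototype's assumption that `"auto"` falls back to 0.9.0.

```python
from typing import Tuple


def parse_version(api_version: str) -> Tuple[int, ...]:
    # Hypothetical minimal parser: "0.10.2" -> (0, 10, 2); no validation beyond int().
    return tuple(int(part) for part in api_version.split("."))


def magic_for(api_version: str = "auto") -> int:
    # Same fallback as the prototype: "auto" cannot be resolved without a broker.
    version = (0, 9, 0) if api_version == "auto" else parse_version(api_version)
    # Tuple comparison: (0, 10, 2) >= (0, 10) is True, (0, 10, 2) >= (0, 11) is False.
    if version >= (0, 11):
        return 2  # record batch v2, which supports headers
    if version >= (0, 10):
        return 1  # message format v1, which adds timestamps
    return 0  # message format v0
```

For example, `magic_for("0.10.2")` gives 1 and `magic_for("2.8.1")` gives 2.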

The other unpleasant part is that two helper functions had to be copied from other parts of the code: one that generates the magic number and one that builds the headers for the batch. This is not a big deal in itself, but I'd like to find a more elegant solution than copying implementations around.
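One possible way to avoid copying the header logic is a single helper that both `publish_batch` and `KafkaBatch.append` could call. The sketch below is hypothetical (`build_record_headers` is not an existing FastStream function). It follows the prototype's precedence (user headers override the generated ones), tolerates `headers=None`, and returns the `(str, bytes)` pairs the prototype passes to `BatchBuilder.append`.

```python
from typing import Dict, List, Optional, Tuple


def build_record_headers(
    headers: Optional[Dict[str, str]] = None,
    reply_to: str = "",
    correlation_id: Optional[str] = None,
    content_type: Optional[str] = None,
) -> List[Tuple[str, bytes]]:
    """Hypothetical shared helper; precedence follows the prototype."""
    merged: Dict[str, str] = {}
    if content_type:
        merged["content-type"] = content_type
    if correlation_id:
        merged["correlation_id"] = correlation_id
    if reply_to:
        merged["reply_to"] = reply_to
    # User-supplied headers win over the generated ones.
    merged.update(headers or {})
    # Header values are sent as bytes; keys stay as str.
    return [(key, value.encode()) for key, value in merged.items()]
```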

We also need to extend the signature of Kafka's publish_batch function:

async def publish_batch(*msgs: SendableMessage, batch: Optional[KafkaBatch]=None, ...): ...

The batch-sending logic would then be extended as follows:

    async def publish_batch(
        self,
        *msgs: "SendableMessage",
        correlation_id: str,
        topic: str,
        batch: Optional[KafkaBatch] = None,
        partition: Optional[int] = None,
        timestamp_ms: Optional[int] = None,
        headers: Optional[Dict[str, str]] = None,
        reply_to: str = "",
        no_confirm: bool = False,
    ) -> None:
        """Publish a batch of messages to a topic."""
        if batch is None:
            batch = KafkaBatch(timestamp_ms=timestamp_ms,
                               headers=headers,
                               reply_to=reply_to,
                               correlation_id=correlation_id)
        batch.extend(*msgs,
                     timestamp_ms=timestamp_ms,
                     headers=headers,
                     reply_to=reply_to,
                     correlation_id=correlation_id)
        # aiokafka's send_batch() expects the underlying BatchBuilder, not the facade.
        send_future = await self._producer.send_batch(batch._batch_builder, topic, partition=partition)
        if not no_confirm:
            await send_future

This behavior supports three use cases:

  1. We fill the batch before calling the method and pass the finished batch to publish_batch without any additional messages. If the batch overflows, we get an error before the method is called.
  2. We pass the messages to send as before, and get an error when the method is called (if the batch overflows).
  3. We pass both a KafkaBatch and a message collection. In this case, if the KafkaBatch has not overflowed yet but overflows when it is extended with the new messages (*msgs in the publish_batch method), an exception is raised that specifies the position of the message in the passed collection at which the overflow occurred.
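The three cases can be exercised with a small synchronous sketch. `ToyKafkaBatch`, `BatchBufferOverflow` and `prepare_batch` are hypothetical: the batch counts payload bytes (default budget of 16 bytes) instead of encoding records, and `prepare_batch` mirrors only the `if batch is None` / `batch.extend(*msgs)` branch of the proposed `publish_batch`, without the actual send.

```python
from typing import List, Optional


class BatchBufferOverflow(Exception):
    """Hypothetical error; message_position indexes the *msgs passed to extend()."""

    def __init__(self, message_position: int) -> None:
        super().__init__(f"overflow at message position {message_position}")
        self.message_position = message_position


class ToyKafkaBatch:
    """Hypothetical facade that counts payload bytes instead of encoding records."""

    def __init__(self, *msgs: bytes, batch_size: int = 16) -> None:
        self.batch_size = batch_size
        self.records: List[bytes] = []
        self.extend(*msgs)

    def append(self, msg: bytes) -> Optional[int]:
        if sum(map(len, self.records)) + len(msg) > self.batch_size:
            return None  # the batch is full
        self.records.append(msg)
        return len(self.records) - 1

    def extend(self, *msgs: bytes) -> None:
        for position, msg in enumerate(msgs):
            if self.append(msg) is None:
                raise BatchBufferOverflow(position)


def prepare_batch(*msgs: bytes, batch: Optional[ToyKafkaBatch] = None) -> ToyKafkaBatch:
    # Mirrors the branch in the proposed publish_batch, minus the network send.
    if batch is None:
        batch = ToyKafkaBatch()
    batch.extend(*msgs)
    return batch
```

Case 1 corresponds to `ToyKafkaBatch(b"a" * 10, b"b" * 10)`, which fails in the constructor before any publish call. Case 2 is `prepare_batch(b"a" * 10, b"b" * 10)`. Case 3 is `prepare_batch(b"c" * 4, b"d" * 4, batch=ToyKafkaBatch(b"a" * 10))`, where the reported position 1 refers to `b"d" * 4` in `msgs`, not to the prefilled record.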

To control batch overflow yourself, create a KafkaBatch and use its append method, which returns None if the batch is full.
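As an example of handling overflow in application code, the sketch below starts a new batch whenever `append` returns `None`. `ToyBatch` is a hypothetical stand-in for KafkaBatch that counts payload bytes only, and `split_into_batches` is an illustrative policy, not part of the proposal.

```python
from typing import Iterable, List, Optional


class ToyBatch:
    """Hypothetical stand-in for KafkaBatch: append() returns None once full."""

    def __init__(self, batch_size: int) -> None:
        self.batch_size = batch_size
        self.records: List[bytes] = []
        self._used = 0

    def append(self, msg: bytes) -> Optional[int]:
        if self._used + len(msg) > self.batch_size:
            return None
        self._used += len(msg)
        self.records.append(msg)
        return len(self.records) - 1


def split_into_batches(msgs: Iterable[bytes], batch_size: int) -> List[ToyBatch]:
    """Start a new batch whenever append() reports overflow."""
    batches = [ToyBatch(batch_size)]
    for msg in msgs:
        if batches[-1].append(msg) is None:
            fresh = ToyBatch(batch_size)
            if fresh.append(msg) is None:
                # A single message larger than batch_size can never fit.
                raise ValueError(f"message of {len(msg)} bytes exceeds batch_size")
            batches.append(fresh)
    return batches
```

With `batch_size=8`, the messages `b"aaaa"`, `b"bbbb"`, `b"cc"`, `b"dddddd"` end up in two batches of two records each.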

To summarize: the only open problem for introducing this feature is the design of KafkaBatch. If you have ideas on how to make it better, more elegant, or simpler, you are welcome!

@spataphore1337 spataphore1337 added the enhancement New feature or request label Dec 16, 2024
@Lancetnik Lancetnik added Confluent Issues related to `faststream.confluent` module AioKafka Issues related to `faststream.kafka` module and removed Confluent Issues related to `faststream.confluent` module labels Dec 16, 2024
@Lancetnik Lancetnik moved this to Backlog in FastStream Jan 8, 2025