It provides an interface for processing message batches received from the topics the consumer has subscribed to. `kafka::MessagesBatchView` is an alias for `std::span<const kafka::Message>`, where `kafka::Message` is a C++ wrapper for `rd_kafka_message_t*`. In the common use case, the library periodically polls message batches and invokes the user-provided callback, passing it the message batch view.
I am trying to support message reconsumption when the callback throws an exception. To implement this, the current implementation catches the exception, closes the consumer, and subscribes to the same topics again, so that reading resumes from each topic partition's last committed offset. But this looks suboptimal: with an EAGER rebalance strategy, for example, closing the consumer first revokes all partitions from every consumer in the group and only then assigns them back.
I have read many issues in this repo and searched the web, but unfortunately could not find how to reread uncommitted messages using a balanced consumer. I saw several possible solutions for rereading the messages, but as I understand it, they won't work while the consumer is subscribed to topics:
The first solution is to fetch the current consumer assignment with `rd_kafka_assignment` and assign it back with `rd_kafka_assign`. But in many issues I read that `rd_kafka_assign` gives the consumer a static assignment and cannot be used outside of the rebalance callback.
The second is to use `rd_kafka_seek` to move the fetch offsets back to the committed values. But again, I read that seek can only be used after a manual partition assignment with `rd_kafka_assign`.
The last solution is to cache the unprocessed message batch and pass it to the user callback again and again until the invocation succeeds. But with this approach the consumer stops its polling loop and may be kicked out of the consumer group after some time (once `max.poll.interval.ms` is exceeded).
So the question is: how can I properly reconsume uncommitted messages using a balanced consumer (`rd_kafka_subscribe`) with minimal overhead?
For context: I am implementing the Kafka driver for the userver framework; the code is available here: https://github.com/userver-framework/userver/tree/develop/kafka