Process messages consumed from different partitions on different thread pool threads #2312

Open
KrzysztofBranicki opened this issue Sep 16, 2024 · 2 comments

Comments

KrzysztofBranicki commented Sep 16, 2024

Hi, I want to implement a wrapper for IConsumer<TKey, TValue> that lets me process messages from different partitions on different thread pool threads. The plan is to create an additional buffer holding the messages retrieved with IConsumer.Consume. This buffer would have a queue for every partition and would be populated by one thread. A separate thread would walk the buffer, partition after partition in round-robin fashion, and start processing messages on thread pool threads. Of course I want to maintain ordering within a partition, so I would not pick up a message from a partition that still has a message whose processing has not finished.

The goal is to process more messages concurrently: our processing logic is implemented with async/await and performs many IO-bound operations, during which the thread is returned to the thread pool while awaiting.
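The buffer described above can be sketched as follows. This is a minimal, language-agnostic simulation in Python rather than C#, and it does not use the Confluent.Kafka API; the class name `PartitionBuffer` and its methods are hypothetical names chosen for illustration. It assumes a single dispatcher thread calls `next_ready` and that workers report back through `complete`.

```python
from collections import OrderedDict, deque


class PartitionBuffer:
    """Per-partition queues with at most one in-flight message per partition.

    Hypothetical helper for illustration; not part of any Kafka client.
    """

    def __init__(self):
        self._queues = OrderedDict()  # partition -> deque of pending messages
        self._in_flight = set()       # partitions with a message being processed

    def add(self, partition, message):
        """Called by the consuming thread for each consumed message."""
        self._queues.setdefault(partition, deque()).append(message)

    def next_ready(self):
        """Return (partition, message) for the next idle partition, or None."""
        for partition in list(self._queues):
            queue = self._queues[partition]
            if partition in self._in_flight or not queue:
                continue
            # Move this partition to the back so the others are served first next time.
            self._queues.move_to_end(partition)
            self._in_flight.add(partition)
            return partition, queue.popleft()
        return None

    def complete(self, partition):
        """Called when processing of the partition's in-flight message finishes."""
        self._in_flight.discard(partition)
```

A partition is skipped while it has a message in flight, which preserves per-partition ordering; real code would also need locking or a single-threaded owner for the buffer.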

Question:
In this context I have a question about the Consume method and the order in which messages from different partitions are returned. Let's assume I'm subscribed to 10 partitions and each partition holds 10 unconsumed messages. Let's also assume Confluent.Kafka.Consumer has already loaded all 100 messages into its internal buffer. Now I call Consume 10 times. What will be the result? Will I get 10 messages from the same partition? Will I get one message from every partition (round robin)? Will I get messages ordered by Timestamp? Or is it more complicated and dependent on other factors?

Contributor

milindl commented Oct 22, 2024

There is no ordering guarantee for the operation you're describing. The 10 messages from the first 10 calls may come from any partition. The only guarantee is that messages from the same partition keep their relative order by Offset: Offset N for a particular topic partition is returned from Consume before Offset N+1. Any number of messages from other topic partitions may appear between these two messages.

However, there's no relation to any other topic partition and they might be interleaved in an arbitrary way.
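The guarantee stated above can be written down as a small check. This is a sketch in Python for illustration only; `preserves_partition_order` is a hypothetical helper, and the records are plain `(topic_partition, offset)` pairs rather than real consume results.

```python
def preserves_partition_order(records):
    """Check that offsets increase within each topic partition.

    records: iterable of (topic_partition, offset) pairs, in the order
    they were returned. Interleaving across partitions is not constrained.
    """
    last_offset = {}
    for topic_partition, offset in records:
        previous = last_offset.get(topic_partition)
        if previous is not None and offset <= previous:
            return False
        last_offset[topic_partition] = offset
    return True
```

Any interleaving across partitions passes the check, as long as each partition's own offsets only move forward.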

Author

KrzysztofBranicki commented Nov 12, 2024

@milindl thanks for the answer. I just noticed that there is an API that essentially lets me achieve what I need, namely a round-robin style of Consume where each call returns a message from a different partition. I'm referring to the Pause and Resume APIs. With them I can, in theory, pause the partition of the message I just consumed right before putting that message into my buffer, and then resume the partition as soon as I finish processing the message on a thread pool thread.

I have three questions about this:

  1. Is the Resume method thread safe?
  2. Are Pause and Resume heavy operations? Do they require interaction with a broker, or do they only update in-memory state that is checked when Consume is called?
  3. How do Pause and Resume interact with partition revocation and reassignment? Does revocation and reassignment "reset" the collection of paused partitions? If I pause a partition before it is revoked, do I still need to resume it in case it gets reassigned to me again?
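The pause-after-consume, resume-after-processing flow described above can be sketched with an in-memory stand-in. This is a Python simulation for illustration, not the Confluent.Kafka API: `FakeConsumer` and `consume_one_per_partition` are hypothetical names, and the sketch does not model thread safety, broker interaction, or rebalances, which are exactly the open questions in the list.

```python
from collections import deque


class FakeConsumer:
    """Hypothetical in-memory stand-in for a consumer with pause/resume."""

    def __init__(self, messages_by_partition):
        self._queues = {p: deque(m) for p, m in messages_by_partition.items()}
        self._paused = set()

    def consume(self):
        """Return (partition, message) from a non-paused partition, or None.

        This fake scans partitions in insertion order; a real client
        makes no such promise about which partition comes next.
        """
        for partition, queue in self._queues.items():
            if queue and partition not in self._paused:
                return partition, queue.popleft()
        return None

    def pause(self, partition):
        self._paused.add(partition)

    def resume(self, partition):
        self._paused.discard(partition)


def consume_one_per_partition(consumer):
    """Consume until every partition that had data is paused."""
    batch = []
    while (record := consumer.consume()) is not None:
        partition, _ = record
        consumer.pause(partition)  # pause right after consuming, before buffering
        batch.append(record)
    return batch
```

Once a worker finishes a message, calling `resume` for that partition makes its next message eligible again, so at most one message per partition is ever handed out at a time.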
