-
Notifications
You must be signed in to change notification settings - Fork 86
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[server] Increased Wait After Unsubscribe During State Transitions #1213
base: main
Are you sure you want to change the base?
Conversation
a94fcc7
to
aa05e74
Compare
.../da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaConsumerService.java
Outdated
Show resolved
Hide resolved
clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/KafkaConsumerServiceStats.java
Outdated
Show resolved
Hide resolved
...s/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/SharedKafkaConsumer.java
Outdated
Show resolved
Hide resolved
clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/KafkaConsumerServiceStats.java
Outdated
Show resolved
Hide resolved
a7acec9
to
03ba493
Compare
Closing to consider approach in #1251 instead. |
…ugh `VeniceServerConfig` and directly to the `SharedKafkaConsumer` from `SharedConsumerAssignmentStrategy`. 📸 Added metric `wait_after_unsubscribe_latency` to see how long the wait after unsubscribe is. ⏳
…rite_buffer_latency` metric should be sufficient. 📒
…cs data from `consumer_records_producing_to_write_buffer_latency`. 📬
4db714e
to
b63e0e8
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From my understanding, we don't need to change the constructor of KafkaConsumerService
or SharedKafkaConsumer
, but just add one param: waitingTimeMS
to unsubscribe
function call, and when the SIT
calls unsubscribe
, it knows when to use a longer timeout and when to use a shorter timeout.
By default, it should use a short timeout, and only for leader
<-> follower
transition, SIT
needs to pass a long timeout to unsubscribe
call.
Let me know if I have any misunderstanding.
…from StoreIngestionTask to SharedKafkaConsumer. 🫣
…ndled by mocks. 🐬🐬
Summary
Problem
The problem is that
StoreIngestionTask
does not always wait for all inflight messages to be processed before transitioning the leader-follower state in thePartitionConsumptionState
.waitAfterUnsubscribe()
waits up to 10 seconds for the consumers nextpoll()
(which would indicate that the inflight messages from the lastpoll()
were processed). This can lead to state mismatches such as from the leader-follower transition and follower-leader transition. The10s
timeout has been hit 150K times in the past month.Mitigation
Several possible solutions were discussed but they all could be complicated. As an immediate action, we can increase the timeout value so that the consumer will more frequently safely unsubscribe instead of timing out. According to the values of the metric
consumer_records_producing_to_write_buffer_latency
, increasing the timeout to1800s
should cover almost 100% of all cases.Changes
server.wait.after.unsubscribe.timeout.ms
to turn the timeout wait inwaitAfterUnsubscribe()
into a configurable setting, and also increased the timeout:10s
to1800s
/30m
based on theconsumer_records_producing_to_write_buffer_latency
metric.KafkaConsumerService#unsubscribeAll()
is called, the timeout will remain10s
in order to not block shutdown. If the server config is lower than10s
, then that value will be used instead.How was this PR tested?
GHCI
Does this PR introduce any user-facing changes?