[server] ReadWriteLock on LeaderFollowerState #1251
Conversation
Force-pushed from 5a05bdb to 808d633.
Thanks for fixing this big issue. It will make the SIT thread more stable. The change looks good and clean to me. Please address the other comments before checking in.
…ock()`, which can be error-prone. 💹
Force-pushed from 808d633 to a4b050c.
Thanks for preparing this! Left a few comments, generally this looks good!
```java
LeaderFollowerState() {
  this.state = LeaderFollowerStateType.STANDBY;
  this.rwLock = new ReentrantReadWriteLock();
```
Maybe here we should use the Venice flavor of ReentrantReadWriteLock, VeniceReentrantReadWriteLock, for debugging purposes.
Besides, this lock is non-fair by default, so it does not give preference to writers. For our use case, we may want to prefer writers a bit, since in a heavy-ingestion case we do not want to block a state transition for a long time.
Omg, that class is not used anywhere else in the codebase... I moved the VeniceReentrantReadWriteLock from venice-controller to venice-client-common and used it in this file.
Regarding the default fairness (fair vs. non-fair), what's your preference?
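For reference on the fairness question, java.util.concurrent.locks.ReentrantReadWriteLock exposes both modes through its constructor. A minimal sketch (not part of this PR) of the two options:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class LockFairnessSketch {
  public static void main(String[] args) {
    // Default: non-fair. Higher throughput, but a waiting writer can be delayed
    // if new readers keep acquiring the read lock.
    ReentrantReadWriteLock nonFair = new ReentrantReadWriteLock();

    // Fair: threads acquire the lock roughly in arrival order, so a queued
    // writer (e.g. a state transition) is not starved by a stream of readers.
    ReentrantReadWriteLock fair = new ReentrantReadWriteLock(true);

    System.out.println(nonFair.isFair()); // false
    System.out.println(fair.isFair());    // true
  }
}
```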
```java
 * acquire the write lock to modify the leader-follower state, since it would need to wait for this to finish.
 */
final ReadWriteLock leaderFollowerStateLock = partitionConsumptionState.getLeaderFollowerStateLock();
try (AutoCloseableLock ignore = AutoCloseableSingleLock.of(leaderFollowerStateLock.readLock())) {
```
Here you acquire the lock for the entire loop, but inside produceToStoreBufferServiceOrKafka the lock is acquired for each message that is processed. Maybe there is a reason for this?
Because the condition of shouldProcessMessage() needs to hold for the entire duration for which the message is processed by the consumer.
In the batch version (produceToStoreBufferServiceOrKafkaInBatch()), the condition is checked to determine the batches of selected messages, and then processing begins with ingestionBatchProcessor.process(). Thus, it is easiest and safest to protect the entire section with the read lock. Perhaps this can be improved in the future if needed?
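As a rough illustration of this reasoning, here is a simplified sketch (not the actual Venice code; the class and method names are stand-ins) of why the whole check-and-process loop sits under one read lock while a state transition takes the write lock:

```java
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Simplified stand-in: the consumer checks the condition and processes each
// message under one read lock, so the leader-follower state cannot change
// between the check and the processing of a message.
class LeaderFollowerLockingSketch {
  private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
  private String state = "STANDBY"; // placeholder for the real enum state

  void processBatch(List<String> messages) {
    rwLock.readLock().lock();
    try {
      for (String message : messages) {
        if (shouldProcess(message)) { // condition depends on the current state
          process(message);           // state is guaranteed not to change here
        }
      }
    } finally {
      rwLock.readLock().unlock();
    }
  }

  void transitionTo(String newState) {
    rwLock.writeLock().lock(); // blocks until in-flight processing releases the read lock
    try {
      state = newState;
    } finally {
      rwLock.writeLock().unlock();
    }
  }

  private boolean shouldProcess(String message) {
    return !state.isEmpty() && !message.isEmpty(); // trivial placeholder check
  }

  private void process(String message) {
    // no-op in this sketch
  }
}
```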
```java
 * Wait for the main test thread to go past shouldProcessRecord() and reach waitUntilValueSchemaAvailable()
 * inside waitReadyToProcessRecord(), then free the main test thread by making the value schema available
 */
Utils.sleep(1000L);
```
The time you set here is not quite deterministic in the test; I'm not sure if there is a way to make this wait deterministic.
Yeah, I would've liked to make the wait way shorter than 1 second, but I can't think of a way to make it deterministic either.
Maybe set a flag inside SIT to indicate that waitUntilValueSchemaAvailable has been reached, and then wait until that flag is turned on?
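A sketch of that suggestion, assuming the test can hook in right before waitUntilValueSchemaAvailable(); the class and method names below are illustrative, not the actual SIT API:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Illustrative only: replaces the fixed Utils.sleep(1000L) with an explicit
// signal, so the test proceeds exactly when the ingestion thread has reached
// the wait-for-schema point.
class SchemaWaitSignal {
  private final CountDownLatch reachedSchemaWait = new CountDownLatch(1);

  // Invoked (e.g. via a test hook or spy) right before waitUntilValueSchemaAvailable().
  void markSchemaWaitReached() {
    reachedSchemaWait.countDown();
  }

  // Invoked by the test in place of the fixed sleep; the timeout is only an upper bound.
  boolean awaitSchemaWaitReached() throws InterruptedException {
    return reachedSchemaWait.await(10, TimeUnit.SECONDS);
  }
}
```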
…roller` into `internal/venice-client-common` and used it in `PartitionConsumptionState`. 🍿
The code would work from a correctness POV, but I am not convinced that we need this lock in this code path.
Please let me know the motivation for this change.
Closing and going with the increased-timeout approach in #1213.
Problem

The problem is that the `StoreIngestionTask` does not always wait for all inflight messages to be processed before transitioning the `PartitionConsumptionState`. As part of the unsubscribing process, waitAfterUnsubscribe() waits up to 10 seconds for the consumer's next call to poll(). Each poll() fetches a batch of messages from Kafka for processing and indicates that the previous set of inflight messages (fetched under the old partition state) has finished processing. This is when a state transition can safely proceed.

If a state transition happens while those inflight messages are still being processed, it can cause incompatibilities, throw an exception, crash the `SIT`, halt ingestion progress, and result in no `LEADER` until the host is restarted. Here are two examples of such issues, but there could be additional unknown issues:

Leader -> Follower Transition
- The state is changed to `FOLLOWER`
- `SIT` fails a sanity check upon encountering an UPDATE message (intended only for leaders) while in the `FOLLOWER` state (link)

Follower -> Leader Transition
- The state is changed to `LEADER`
- `SIT` fails a sanity check upon encountering a local VT message before producing back to local VT while in the `LEADER` state (link)

Alternatively, if the timeout is reached but the unsubscribe was not induced by a state transition, I cannot think of any issues, but there is always the possibility of the unknown. Thus, this is primarily a problem during state transitions.
Solution

To solve this problem, we introduce a `ReadWriteLock` to the `LeaderFollowerState`. This is the perfect use case for such a mechanism because the state is read extremely often but updated very infrequently.

Additionally, the consumer must hold the read lock for the entire duration of processing a message in order to protect against another thread modifying the state.
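For illustration, the write side might look like the sketch below. It mirrors the AutoCloseableLock / AutoCloseableSingleLock idiom from the read-side snippet quoted in the review above, but the method names used here (updateLeaderFollowerState, setLeaderFollowerState) are stand-ins rather than the exact PR code:

```java
// Hypothetical write-side usage mirroring the quoted read-side snippet; the
// helper names below are illustrative, not the actual Venice methods.
void updateLeaderFollowerState(
    PartitionConsumptionState partitionConsumptionState,
    LeaderFollowerStateType newState) {
  final ReadWriteLock lock = partitionConsumptionState.getLeaderFollowerStateLock();
  // The write lock is granted only after every consumer thread holding the
  // read lock has finished processing its in-flight messages.
  try (AutoCloseableLock ignore = AutoCloseableSingleLock.of(lock.writeLock())) {
    partitionConsumptionState.setLeaderFollowerState(newState);
  }
}
```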
Changes
- Added a `ReadWriteLock` to the `LeaderFollowerState` in `PartitionConsumptionState`, which guards the usage of the state
- The read lock is held in `produceToStoreBufferServiceOrKafka()` while messages are processed
- Added `testShouldProcessRecord()`, which simulates the following scenario:

Correctness
- The consumer holds the read lock for the entire time a message is processed (in `produceToStoreBufferServiceOrKafka()`) in order for the condition of `shouldProcessMessage()` to hold.
- Any thread that wants to modify the `LeaderFollowerState` as part of a state transition would need to wait for this consumer thread to finish processing the message and release the read lock.
- The same reasoning applies to the batch version, `produceToStoreBufferServiceOrKafkaInBatch()`.

Performance Impact
- The main concern is `produceToStoreBufferServiceOrKafka()` and its batch version, because that is the only location where the lock is held for a long period of time. If a writer is waiting on the write lock, it must wait for all readers to release their locks. All future readers will also need to wait until the writer is done because the writer has priority, so they are also bottlenecked by `produceToStoreBufferServiceOrKafka()`.
- Since the state is just an `enum` value, the writer's critical section should be instantaneous once it manages to acquire the lock, and any slowdown while acquiring the write lock should be minimal.
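The blocking behavior described above can be seen with a tiny self-contained demo (plain java.util.concurrent, not Venice code); a fair lock is used here so the waiting writer is not starved by later readers:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Demo: a writer (e.g. a state transition) blocks on the write lock until the
// reader (e.g. message processing) releases the read lock.
public class WriterWaitsForReaderDemo {
  public static void main(String[] args) throws InterruptedException {
    ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true); // fair mode

    rwLock.readLock().lock(); // main thread simulates in-flight message processing
    Thread writer = new Thread(() -> {
      long start = System.nanoTime();
      rwLock.writeLock().lock(); // blocks while the read lock is held
      try {
        System.out.printf("writer got the lock after ~%d ms%n", (System.nanoTime() - start) / 1_000_000);
      } finally {
        rwLock.writeLock().unlock();
      }
    });
    writer.start();

    Thread.sleep(500);          // keep "processing" for half a second
    rwLock.readLock().unlock(); // releasing the read lock unblocks the writer
    writer.join();
  }
}
```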
How was this PR tested?
CI
Does this PR introduce any user-facing changes?