Fix buffer_drain_concurrency not doing anything. #14545

Closed
wants to merge 1 commit into from
26 changes: 24 additions & 2 deletions go/vt/vtgate/buffer/shard_buffer.go
@@ -569,10 +569,32 @@ func (sb *shardBuffer) drain(q []*entry, err error) {
sb.timeoutThread.stop()

start := sb.timeNow()
// TODO(mberlin): Parallelize the drain by pumping the data through a channel.

// Parallelize the drain by pumping the data through a channel.
entryChan := make(chan *entry, len(q))
Contributor Author

Do I need to make the channel buffered? 🤔 I think it should be fine to not have it buffered, but I'm not 100% sure.

Collaborator

I think this should not be buffered, and definitely not with a capacity of len(q), because that allocates a lot of underlying memory!


parallelism := min(bufferDrainConcurrency, len(q))

var wg sync.WaitGroup
wg.Add(parallelism)

for i := 0; i < parallelism; i++ {
go func() {
for _, e := range q {
sb.unblockAndWait(e, err, true /* releaseSlot */, true /* blockingWait */)
}

wg.Done()
Collaborator

defer wg.Done() at the start of the goroutine to prevent a deadlock on panic.
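
A minimal sketch of the `defer wg.Done()` suggestion, for illustration only. The helper `runWorkers` and its callback are hypothetical and not part of the Vitess code. Note that an unrecovered panic in a goroutine terminates the whole process, so the hang on `wg.Wait()` only arises when the panic is recovered or the goroutine leaves early by some other path; the sketch assumes a recover is in place.

```go
package main

import (
	"fmt"
	"sync"
)

// runWorkers starts n goroutines that each call work(i). A panic inside work
// is recovered so the program keeps running. Because wg.Done() is deferred,
// it still fires on that path and wg.Wait() returns.
// It reports how many workers panicked.
func runWorkers(n int, work func(i int)) int {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		panicked int
	)
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func(i int) {
			// Deferred first, so it runs last, after the recover below.
			defer wg.Done()
			defer func() {
				if r := recover(); r != nil {
					mu.Lock()
					panicked++
					mu.Unlock()
				}
			}()
			work(i)
		}(i)
	}
	wg.Wait()
	return panicked
}

func main() {
	n := runWorkers(4, func(i int) {
		if i == 2 {
			panic("hypothetical failure in worker 2")
		}
	})
	fmt.Println("workers that panicked:", n)
}
```

If `wg.Done()` were instead the last plain statement of the goroutine body, the panicking worker would skip it and `wg.Wait()` would block forever.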

}()
}

for _, e := range q {
sb.unblockAndWait(e, err, true /* releaseSlot */, true /* blockingWait */)
entryChan <- e
Collaborator

To elaborate: entryChan doesn't need to be buffered because the sending goroutine (i.e. this one) doesn't have anything else to do besides sending the entries through the channel. If it's not blocked on the channel send, it'll block on wg.Wait() a couple lines below. So buffering is not an optimization here.
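
To make the point above concrete, here is a rough sketch of the fan-out with an unbuffered channel. It is not the PR's code: `drainParallel` and the `process` callback (standing in for `sb.unblockAndWait`) are hypothetical names, and the entries are plain ints to keep the example self-contained. The workers receive from the channel, so each entry is handled exactly once.

```go
package main

import (
	"fmt"
	"sync"
)

// drainParallel hands every item in q to one of up to `concurrency` workers
// over an unbuffered channel and returns once all items have been processed.
func drainParallel(q []int, concurrency int, process func(int)) {
	if len(q) == 0 {
		return
	}
	parallelism := concurrency
	if len(q) < parallelism {
		parallelism = len(q)
	}
	if parallelism < 1 {
		parallelism = 1 // at least one receiver, or the sends below would block forever
	}

	entryChan := make(chan int) // unbuffered: a send completes when a worker receives

	var wg sync.WaitGroup
	wg.Add(parallelism)
	for i := 0; i < parallelism; i++ {
		go func() {
			defer wg.Done()
			// Receive from the channel (not from q) so work is shared, not repeated.
			for e := range entryChan {
				process(e)
			}
		}()
	}

	// The sender has nothing else to do, so blocking on each send costs nothing.
	for _, e := range q {
		entryChan <- e
	}
	close(entryChan) // lets the workers' range loops finish
	wg.Wait()
}

func main() {
	q := []int{0, 1, 2, 3, 4}
	seen := make([]int, len(q))
	// Each entry indexes its own slot, so workers never write to the same element.
	drainParallel(q, 3, func(e int) { seen[e]++ })
	fmt.Println(seen)
}
```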

}

close(entryChan)
wg.Wait()
Comment on lines +574 to +596
Contributor Author

Would it make sense to only pump the entries through the channel if bufferDrainConcurrency is set to a value higher than 1? Or is the overhead negligible?

Collaborator

I don't think the overhead is negligible; this is an important optimization. We need to keep the legacy code path for when bufferDrainConcurrency <= 1.
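
One possible shape for that split, as a sketch under assumptions rather than the actual change: `drainEntries` and `process` are hypothetical names, and strings stand in for `*entry`. With a concurrency of 1 or less, the original sequential loop runs and no channel or goroutine is created.

```go
package main

import (
	"fmt"
	"sync"
)

// drainEntries keeps the legacy sequential loop when concurrency <= 1 and
// only fans out over a channel when more than one worker is requested.
func drainEntries(q []string, concurrency int, process func(string)) {
	if concurrency <= 1 || len(q) <= 1 {
		// Legacy path: same order and same goroutine as before.
		for _, e := range q {
			process(e)
		}
		return
	}

	parallelism := concurrency
	if len(q) < parallelism {
		parallelism = len(q)
	}

	entryChan := make(chan string)
	var wg sync.WaitGroup
	wg.Add(parallelism)
	for i := 0; i < parallelism; i++ {
		go func() {
			defer wg.Done()
			for e := range entryChan {
				process(e)
			}
		}()
	}
	for _, e := range q {
		entryChan <- e
	}
	close(entryChan)
	wg.Wait()
}

func main() {
	q := []string{"a", "b", "c"}

	// Sequential path: entries are processed in queue order.
	var order []string
	drainEntries(q, 1, func(e string) { order = append(order, e) })
	fmt.Println(order)

	// Parallel path: the callback runs on several goroutines, so guard shared state.
	var mu sync.Mutex
	count := 0
	drainEntries(q, 2, func(e string) {
		mu.Lock()
		count++
		mu.Unlock()
	})
	fmt.Println(count)
}
```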


d := sb.timeNow().Sub(start)
log.Infof("Draining finished for shard: %s Took: %v for: %d requests.", topoproto.KeyspaceShardString(sb.keyspace, sb.shard), d, len(q))
requestsDrained.Add(sb.statsKey, int64(len(q)))