From f0202588bc8a8c14fce894f2b76f1e5fba8f16c5 Mon Sep 17 00:00:00 2001 From: Arthur Schreiber Date: Sat, 18 Nov 2023 12:55:09 +0000 Subject: [PATCH] Fix `buffer_drain_concurrency` not doing anything. Signed-off-by: Arthur Schreiber --- go/vt/vtgate/buffer/shard_buffer.go | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/go/vt/vtgate/buffer/shard_buffer.go b/go/vt/vtgate/buffer/shard_buffer.go index ae33aabb399..4fd1f594378 100644 --- a/go/vt/vtgate/buffer/shard_buffer.go +++ b/go/vt/vtgate/buffer/shard_buffer.go @@ -569,10 +569,32 @@ func (sb *shardBuffer) drain(q []*entry, err error) { sb.timeoutThread.stop() start := sb.timeNow() - // TODO(mberlin): Parallelize the drain by pumping the data through a channel. + + // Parallelize the drain by pumping the data through a channel. + entryChan := make(chan *entry, len(q)) + + parallelism := min(bufferDrainConcurrency, len(q)) + + var wg sync.WaitGroup + wg.Add(parallelism) + + for i := 0; i < parallelism; i++ { + go func() { + for _, e := range q { + sb.unblockAndWait(e, err, true /* releaseSlot */, true /* blockingWait */) + } + + wg.Done() + }() + } + for _, e := range q { - sb.unblockAndWait(e, err, true /* releaseSlot */, true /* blockingWait */) + entryChan <- e } + + close(entryChan) + wg.Wait() + d := sb.timeNow().Sub(start) log.Infof("Draining finished for shard: %s Took: %v for: %d requests.", topoproto.KeyspaceShardString(sb.keyspace, sb.shard), d, len(q)) requestsDrained.Add(sb.statsKey, int64(len(q)))