Skip to content

Commit

Permalink
chore(mempool): Wait for buffer instead of returning error (#13298)
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Haudum <[email protected]>
  • Loading branch information
chaudum authored Jun 25, 2024
1 parent 0a7e913 commit b1adfce
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 16 deletions.
8 changes: 8 additions & 0 deletions pkg/util/mempool/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type metrics struct {
availableBuffersPerSlab *prometheus.GaugeVec
errorsCounter *prometheus.CounterVec
accesses *prometheus.CounterVec
waitDuration *prometheus.HistogramVec
}

const (
Expand Down Expand Up @@ -41,5 +42,12 @@ func newMetrics(r prometheus.Registerer, name string) *metrics {
Help: "The total amount of accesses to the pool.",
ConstLabels: prometheus.Labels{"pool": name},
}, []string{"slab", "op"}),
waitDuration: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
Namespace: constants.Loki,
Subsystem: "mempool",
Name: "wait_duration_seconds",
Help: "Time spent waiting for obtaining buffer from slab.",
ConstLabels: prometheus.Labels{"pool": name},
}, []string{"slab"}),
}
}
13 changes: 5 additions & 8 deletions pkg/util/mempool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"sync"
"time"
"unsafe"

"github.com/dustin/go-humanize"
Expand Down Expand Up @@ -51,15 +52,11 @@ func (s *slab) get(size int) ([]byte, error) {
s.metrics.accesses.WithLabelValues(s.name, opTypeGet).Inc()
s.once.Do(s.init)

waitStart := time.Now()
// wait for available buffer on channel
var buf []byte
select {
case ptr := <-s.buffer:
buf = unsafe.Slice((*byte)(ptr), s.size)
default:
s.metrics.errorsCounter.WithLabelValues(s.name, reasonSlabExhausted).Inc()
return nil, errSlabExhausted
}
ptr := <-s.buffer
buf := unsafe.Slice((*byte)(ptr), s.size)
s.metrics.waitDuration.WithLabelValues(s.name).Observe(time.Since(waitStart).Seconds())

return buf[:size], nil
}
Expand Down
28 changes: 20 additions & 8 deletions pkg/util/mempool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,26 @@ func TestMemPool(t *testing.T) {
require.Equal(t, 512, cap(res))
})

t.Run("pool returns error when no buffer is available", func(t *testing.T) {
t.Run("pool blocks when no buffer is available", func(t *testing.T) {
pool := New("test", []Bucket{
{Size: 1, Capacity: 64},
}, nil)
buf1, _ := pool.Get(32)
require.Equal(t, 32, len(buf1))

delay := 20 * time.Millisecond
start := time.Now()

go func(p *MemPool) {
time.Sleep(delay)
p.Put(make([]byte, 16))
}(pool)

_, err := pool.Get(16)
require.ErrorContains(t, err, errSlabExhausted.Error())
duration := time.Since(start)

require.NoError(t, err)
require.Greater(t, duration, delay)
})

t.Run("test ring buffer returns same backing array", func(t *testing.T) {
Expand All @@ -80,16 +91,17 @@ func TestMemPool(t *testing.T) {
})

t.Run("concurrent access", func(t *testing.T) {
numWorkers := 256

pool := New("test", []Bucket{
{Size: 32, Capacity: 2 << 10},
{Size: 16, Capacity: 4 << 10},
{Size: 8, Capacity: 8 << 10},
{Size: 4, Capacity: 16 << 10},
{Size: 2, Capacity: 32 << 10},
{Size: numWorkers, Capacity: 2 << 10},
{Size: numWorkers, Capacity: 4 << 10},
{Size: numWorkers, Capacity: 8 << 10},
{Size: numWorkers, Capacity: 16 << 10},
{Size: numWorkers, Capacity: 32 << 10},
}, nil)

var wg sync.WaitGroup
numWorkers := 256
n := 10

for i := 0; i < numWorkers; i++ {
Expand Down

0 comments on commit b1adfce

Please sign in to comment.