Skip to content

Commit

Permalink
Add metrics and debugging info around conn pool get/expire
Browse files Browse the repository at this point in the history
Signed-off-by: Brendan Dougherty <[email protected]>
  • Loading branch information
brendar committed Sep 26, 2024
1 parent b21783f commit 8e14efa
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 1 deletion.
58 changes: 57 additions & 1 deletion go/pools/smartconnpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,30 @@ type Metrics struct {
idleClosed atomic.Int64
diffSetting atomic.Int64
resetSetting atomic.Int64
expireFuncTimings *servenv.TimingsWrapper // duration of waitlist.expire calls
expireWaitTimings *servenv.TimingsWrapper // duration of waiting for a connection that results in expiration
expireDeltaTimings *servenv.TimingsWrapper // duration between desired context expiry and actual expiration
}

// RecordExpireFunc forwards start to the expireFuncTimings wrapper under the
// "ExpireFunc" label (the timing that tracks waitlist.expire calls). It is a
// no-op when that wrapper has not been set, since some pools never register
// stats.
func (m *Metrics) RecordExpireFunc(start time.Time) {
	timings := m.expireFuncTimings
	if timings == nil {
		// Stats were never registered for this pool; nothing to record.
		return
	}
	timings.Record("ExpireFunc", start)
}

// RecordExpireWait forwards start to the expireWaitTimings wrapper under the
// "ExpireWait" label (the timing that tracks waits for a connection that end
// in expiration). It is a no-op when that wrapper has not been set, since some
// pools never register stats.
func (m *Metrics) RecordExpireWait(start time.Time) {
	timings := m.expireWaitTimings
	if timings == nil {
		// Stats were never registered for this pool; nothing to record.
		return
	}
	timings.Record("ExpireWait", start)
}

// RecordExpireDelta forwards start to the expireDeltaTimings wrapper under the
// "ExpireDelta" label (the timing that tracks the gap between the desired
// context expiry and the actual expiration). It is a no-op when that wrapper
// has not been set, since some pools never register stats.
func (m *Metrics) RecordExpireDelta(start time.Time) {
	timings := m.expireDeltaTimings
	if timings == nil {
		// Stats were never registered for this pool; nothing to record.
		return
	}
	timings.Record("ExpireDelta", start)
}

func (m *Metrics) MaxLifetimeClosed() int64 {
Expand Down Expand Up @@ -189,13 +213,40 @@ func (pool *ConnPool[C]) runWorker(close <-chan struct{}, interval time.Duration
}()
}

// runExpireWorker is a copy of runWorker that exists solely so the expire
// worker's goroutine gets a unique name (derived from the function that
// started it).
//
// It registers one unit on pool.workers, then starts a goroutine that calls
// worker with each tick time of a ticker firing every interval. The goroutine
// exits when worker returns false or when a receive on close succeeds; on
// exit it marks the pool.workers unit done and stops the ticker.
func (pool *ConnPool[C]) runExpireWorker(close <-chan struct{}, interval time.Duration, worker func(now time.Time) bool) {
	pool.workers.Add(1)

	go func() {
		ticker := time.NewTicker(interval)

		// Deferred calls run in reverse order: workers.Done first, then Stop.
		defer ticker.Stop()
		defer pool.workers.Done()

		for {
			select {
			case <-close:
				return
			case tickTime := <-ticker.C:
				if keepRunning := worker(tickTime); !keepRunning {
					return
				}
			}
		}
	}()
}

func (pool *ConnPool[C]) open() {
pool.close = make(chan struct{})
pool.capacity.Store(pool.config.maxCapacity)

// The expire worker takes care of removing from the waiter list any clients whose
// context has been cancelled.
pool.runWorker(pool.close, 1*time.Second, func(_ time.Time) bool {
pool.runExpireWorker(pool.close, 1*time.Second, func(_ time.Time) bool {
funcStart := time.Now()
defer func() {
pool.Metrics.RecordExpireFunc(funcStart)
}()
pool.wait.expire(false)
return true
})
Expand Down Expand Up @@ -511,6 +562,7 @@ func (pool *ConnPool[C]) get(ctx context.Context) (*Pooled[C], error) {
start := time.Now()
conn, err = pool.wait.waitForConn(ctx, nil)
if err != nil {
pool.Metrics.RecordExpireWait(start)
return nil, ErrTimeout
}
pool.recordWait(start)
Expand Down Expand Up @@ -717,6 +769,10 @@ func (pool *ConnPool[C]) RegisterStats(stats *servenv.Exporter, name string) {

pool.Name = name

pool.Metrics.expireFuncTimings = stats.NewTimings(name+"ExpireFuncCalls", "Duration of calls to waitlist.expire", "type")
pool.Metrics.expireWaitTimings = stats.NewTimings(name+"ExpireWaits", "Duration of waits that end in pool timeout expired", "type")
pool.Metrics.expireDeltaTimings = stats.NewTimings(name+"ExpireWaitsDelta", "Duration between expected context expiry and actual expiry of waits", "type")

stats.NewGaugeFunc(name+"Capacity", "Tablet server conn pool capacity", func() int64 {
return pool.Capacity()
})
Expand Down
14 changes: 14 additions & 0 deletions go/vt/vttablet/tabletserver/connpool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/pools/smartconnpool"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/callerid"
"vitess.io/vitess/go/vt/dbconfigs"
Expand Down Expand Up @@ -79,6 +80,10 @@ func NewPool(env tabletenv.Env, name string, cfg tabletenv.ConnPoolConfig) *Pool
}

cp.getConnTime = env.Exporter().NewTimings(name+"GetConnTime", "Tracks the amount of time it takes to get a connection", "Settings")

stats.NewGaugeFunc(name+"Timeout", "Tablet server conn pool waiter timeout (ms)", func() int64 {
return cp.timeout.Milliseconds()
})
}

cp.ConnPool = smartconnpool.NewPool(&config)
Expand Down Expand Up @@ -140,6 +145,15 @@ func (cp *Pool) Get(ctx context.Context, setting *smartconnpool.Setting) (*Poole
start := time.Now()
conn, err := cp.ConnPool.Get(ctx, setting)
if err != nil {
switch err {
case smartconnpool.ErrTimeout:
if cp.timeout != 0 {
deadline := start.Add(cp.timeout)
if deadline.Before(time.Now()) {
cp.ConnPool.Metrics.RecordExpireDelta(deadline)
}
}
}
return nil, err
}
if cp.getConnTime != nil {
Expand Down

0 comments on commit 8e14efa

Please sign in to comment.