Skip to content

Commit

Permalink
pass tmClient and tabletInfo to self metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach committed Oct 7, 2024
1 parent adfcf6d commit d8b6ec2
Show file tree
Hide file tree
Showing 5 changed files with 308 additions and 33 deletions.
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletserver/throttle/base/metric_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import (
"github.com/patrickmn/go-cache"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/vttablet/tmclient"
)

// MetricsQueryType indicates the type of metrics query on MySQL backend. See following.
Expand Down Expand Up @@ -142,13 +143,13 @@ func (metric *ThrottleMetric) WithError(err error) *ThrottleMetric {

// ReadThrottleMetrics returns a metric for the given probe. Either by explicit query
// or via SHOW REPLICA STATUS
func ReadThrottleMetrics(ctx context.Context, probe *Probe, metricsFunc func(context.Context) ThrottleMetrics) ThrottleMetrics {
func ReadThrottleMetrics(ctx context.Context, probe *Probe, tmClient tmclient.TabletManagerClient, metricsFunc func(context.Context, tmclient.TabletManagerClient) ThrottleMetrics) ThrottleMetrics {
if metrics := getCachedThrottleMetrics(probe); metrics != nil {
return metrics
}

started := time.Now()
throttleMetrics := metricsFunc(ctx)
throttleMetrics := metricsFunc(ctx, tmClient)

go func(metrics ThrottleMetrics, started time.Time) {
stats.GetOrNewGauge("ThrottlerProbesLatency", "probes latency").Set(time.Since(started).Nanoseconds())
Expand Down
156 changes: 156 additions & 0 deletions go/vt/vttablet/tabletserver/throttle/base/self_metric_mysqld.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package base

import (
"context"
"errors"
"fmt"
"sync/atomic"
"time"

"vitess.io/vitess/go/timer"

tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
)

var (
// mysqlHostMetricsRpcTimeout bounds each MysqlHostMetrics RPC issued by readMysqlHostMetrics.
mysqlHostMetricsRpcTimeout = 5 * time.Second
// mysqlHostMetricsRateLimit is the interval handed to the rate limiter that gates those RPCs.
mysqlHostMetricsRateLimit = 10 * time.Second
// mysqlHostMetricsRateLimiter holds the currently published rate limiter, or nil when none exists
// (before first use, and again once the context it was created with is cancelled).
mysqlHostMetricsRateLimiter atomic.Pointer[timer.RateLimiter]
// lastMySQLHostMetricsResponse caches the most recent successful MysqlHostMetrics response.
// It stays nil until the first RPC succeeds.
lastMySQLHostMetricsResponse atomic.Pointer[tabletmanagerdatapb.MysqlHostMetricsResponse]
)

// getMysqlMetricsRateLimiter returns the shared MySQL host metrics rate limiter, creating and
// publishing a new one (ticking at rateLimit) when none is currently published.
//
// A newly created rate limiter stays published until ctx is cancelled, at which point it is stopped
// and removed so that a later call, with a fresh context, creates a new one.
//
// This function is expected to be called sequentially. It nonetheless guarantees that a created
// rate limiter is always cleaned up, and that cleanup never removes a rate limiter other than the
// one it was set up for.
func getMysqlMetricsRateLimiter(ctx context.Context, rateLimit time.Duration) *timer.RateLimiter {
	if rateLimiter := mysqlHostMetricsRateLimiter.Load(); rateLimiter != nil {
		return rateLimiter
	}
	rateLimiter := timer.NewRateLimiter(rateLimit)
	// Publish before starting the cleanup goroutine. If ctx is already cancelled, the cleanup may
	// run immediately; publishing afterwards would leave a stopped rate limiter stored forever.
	mysqlHostMetricsRateLimiter.Store(rateLimiter)
	go func() {
		<-ctx.Done()
		rateLimiter.Stop()
		// Only clear the slot if it still holds this rate limiter, so that a newer rate limiter
		// is never wiped out by a stale cleanup.
		mysqlHostMetricsRateLimiter.CompareAndSwap(rateLimiter, nil)
	}()
	return rateLimiter
}

// readMysqlHostMetrics asks the tablet manager for the MySQL host metrics and caches the response
// in lastMySQLHostMetricsResponse. The request goes through the shared rate limiter: it is very
// wasteful to issue this RPC for every single throttler read (the throttler can issue several reads
// per second), while values such as disk usage or load average on the MySQL host do not change
// that quickly.
//
// Only a successful response replaces the cached one; on RPC error the previous cached response is
// left untouched and the error is returned.
// It returns an error when params carries no tablet manager client or no tablet info.
func readMysqlHostMetrics(ctx context.Context, params *SelfMetricReadParams) error {
	switch {
	case params.TmClient == nil:
		return fmt.Errorf("tmClient is nil")
	case params.TabletInfo == nil:
		return fmt.Errorf("tabletInfo is nil")
	}

	// fetch performs a single, time-bounded RPC and caches its response.
	fetch := func() error {
		rpcCtx, cancel := context.WithTimeout(ctx, mysqlHostMetricsRpcTimeout)
		defer cancel()

		response, err := params.TmClient.MysqlHostMetrics(rpcCtx, params.TabletInfo.Tablet, &tabletmanagerdatapb.MysqlHostMetricsRequest{})
		if err != nil {
			return err
		}
		lastMySQLHostMetricsResponse.Store(response)
		return nil
	}
	return getMysqlMetricsRateLimiter(ctx, mysqlHostMetricsRateLimit).Do(fetch)
}

// getMysqlHostMetric returns the self-scoped metric named mysqlHostMetricName, taken from the last
// cached MySQL host metrics response. It first calls readMysqlHostMetrics, which may refresh that
// cache through the tablet manager (which in turn reads from the mysql daemon).
//
// The returned metric carries an error when:
//   - readMysqlHostMetrics fails,
//   - no response has been cached yet (ErrNoResultYet),
//   - the cached response has no entry for the requested name, or no host metrics at all (ErrNoSuchMetric),
//   - the entry itself reports an error; in that case the entry's value is still set on the metric.
func getMysqlHostMetric(ctx context.Context, params *SelfMetricReadParams, mysqlHostMetricName string) *ThrottleMetric {
	metric := &ThrottleMetric{
		Scope: SelfScope,
	}
	if err := readMysqlHostMetrics(ctx, params); err != nil {
		return metric.WithError(err)
	}
	resp := lastMySQLHostMetricsResponse.Load()
	if resp == nil {
		return metric.WithError(ErrNoResultYet)
	}
	// Go through the generated getters: they tolerate a nil HostMetrics message, whereas direct
	// field access would panic on a response that carries no host metrics. Indexing the resulting
	// nil map yields a nil entry, which is reported as ErrNoSuchMetric below.
	mysqlMetric := resp.GetHostMetrics().GetMetrics()[mysqlHostMetricName]
	if mysqlMetric == nil {
		return metric.WithError(ErrNoSuchMetric)
	}
	metric.Value = mysqlMetric.Value
	if mysqlMetric.Error != nil {
		metric.Err = errors.New(mysqlMetric.Error.Message)
	}
	return metric
}

// Register the MySQL host self metrics at package initialization. Assigning the results to blank
// SelfMetric variables also has the compiler check that each type implements SelfMetric.
var (
	_ SelfMetric = registerSelfMetric(&MysqldLoadAvgSelfMetric{})
	_ SelfMetric = registerSelfMetric(&MysqldDatadirUsedRatioSelfMetric{})
)

// MysqldLoadAvgSelfMetric stands for the load average per cpu, on the MySQL host.
type MysqldLoadAvgSelfMetric struct{}

// Name returns the metric name under which this self metric is known.
func (*MysqldLoadAvgSelfMetric) Name() MetricName {
	return MysqldLoadAvgMetricName
}

// DefaultScope returns the scope this metric applies to by default: the tablet itself.
func (*MysqldLoadAvgSelfMetric) DefaultScope() Scope {
	return SelfScope
}

// DefaultThreshold returns the default threshold for this metric.
func (*MysqldLoadAvgSelfMetric) DefaultThreshold() float64 {
	return 1.0
}

// RequiresConn reports that reading this metric does not need a connection to be supplied.
func (*MysqldLoadAvgSelfMetric) RequiresConn() bool {
	return false
}

// Read returns the "loadavg" entry of the MySQL host metrics.
func (*MysqldLoadAvgSelfMetric) Read(ctx context.Context, params *SelfMetricReadParams) *ThrottleMetric {
	return getMysqlHostMetric(ctx, params, "loadavg")
}

// MysqldDatadirUsedRatioSelfMetric stands for the disk space usage of the mount where MySQL's datadir is located.
// Range: 0.0 (empty) - 1.0 (full)
type MysqldDatadirUsedRatioSelfMetric struct{}

// Name returns the metric name under which this self metric is known.
func (*MysqldDatadirUsedRatioSelfMetric) Name() MetricName {
	return MysqldDatadirUsedRatioMetricName
}

// DefaultScope returns the scope this metric applies to by default: the tablet itself.
func (*MysqldDatadirUsedRatioSelfMetric) DefaultScope() Scope {
	return SelfScope
}

// DefaultThreshold returns the default threshold for this metric.
func (*MysqldDatadirUsedRatioSelfMetric) DefaultThreshold() float64 {
	return 0.98
}

// RequiresConn reports that reading this metric does not need a connection to be supplied.
func (*MysqldDatadirUsedRatioSelfMetric) RequiresConn() bool {
	return false
}

// Read returns the "datadir-used-ratio" entry of the MySQL host metrics.
func (*MysqldDatadirUsedRatioSelfMetric) Read(ctx context.Context, params *SelfMetricReadParams) *ThrottleMetric {
	return getMysqlHostMetric(ctx, params, "datadir-used-ratio")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package base

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestGetMysqlMetricsRateLimiter(t *testing.T) {
rateLimit := 10 * time.Millisecond
for range 3 {
t.Run("iteration", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
{
rateLimiter := mysqlHostMetricsRateLimiter.Load()
assert.Nil(t, rateLimiter)
}
rateLimiter := getMysqlMetricsRateLimiter(ctx, rateLimit)
assert.NotNil(t, rateLimiter)
for range 5 {
r := getMysqlMetricsRateLimiter(ctx, rateLimit)
// Returning the same rate limiter
assert.Equal(t, rateLimiter, r)
}
val := 0
incr := func() error {
val++
return nil
}
for range 10 {
rateLimiter.Do(incr)
time.Sleep(2 * rateLimit)
}
assert.EqualValues(t, 10, val)
cancel()
// There can be a race condition where the rate limiter still emits one final tick after the context is cancelled.
// So we wait enough time to ensure that tick is "wasted".
time.Sleep(2 * rateLimit)
for range 10 {
rateLimiter.Do(incr)
time.Sleep(time.Millisecond)
}
assert.EqualValues(t, 10, val)
{
rateLimiter := mysqlHostMetricsRateLimiter.Load()
assert.Nil(t, rateLimiter)
}
})
}
}
37 changes: 24 additions & 13 deletions go/vt/vttablet/tabletserver/throttle/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ const (
DefaultThrottleRatio = 1.0

defaultReplicationLagQuery = "select unix_timestamp(now(6))-max(ts/1000000000) as replication_lag from %s.heartbeat"
threadsRunningQuery = "show global status like 'threads_running'"

inventoryPrefix = "inventory/"
throttlerConfigPrefix = "config/"
Expand Down Expand Up @@ -137,6 +136,7 @@ type Throttler struct {
keyspace string
shard string
tabletAlias *topodatapb.TabletAlias
tabletInfo atomic.Pointer[topo.TabletInfo]

check *ThrottlerCheck
isEnabled atomic.Bool
Expand Down Expand Up @@ -190,7 +190,7 @@ type Throttler struct {
cancelEnableContext context.CancelFunc
throttledAppsMutex sync.Mutex

readSelfThrottleMetrics func(context.Context) base.ThrottleMetrics // overwritten by unit test
readSelfThrottleMetrics func(context.Context, tmclient.TabletManagerClient) base.ThrottleMetrics // overwritten by unit test
}

// ThrottlerStatus published some status values from the throttler
Expand Down Expand Up @@ -262,8 +262,8 @@ func NewThrottler(env tabletenv.Env, srvTopoServer srvtopo.Server, ts *topo.Serv
}

throttler.StoreMetricsThreshold(base.RegisteredSelfMetrics[base.LagMetricName].DefaultThreshold())
throttler.readSelfThrottleMetrics = func(ctx context.Context) base.ThrottleMetrics {
return throttler.readSelfThrottleMetricsInternal(ctx)
throttler.readSelfThrottleMetrics = func(ctx context.Context, tmClient tmclient.TabletManagerClient) base.ThrottleMetrics {
return throttler.readSelfThrottleMetricsInternal(ctx, tmClient)
}
return throttler
}
Expand Down Expand Up @@ -338,6 +338,15 @@ func (throttler *Throttler) initConfig() {

// readThrottlerConfig proactively reads the throttler's config from SrvKeyspace in local topo
func (throttler *Throttler) readThrottlerConfig(ctx context.Context) (*topodatapb.ThrottlerConfig, error) {
// since we're reading from topo, let's seize this opportunity to read tablet info as well
if throttler.tabletInfo.Load() == nil {
if ti, err := throttler.ts.GetTablet(ctx, throttler.tabletAlias); err == nil {
throttler.tabletInfo.Store(ti)
} else {
log.Errorf("Throttler: error reading tablet info: %v", err)
}
}

srvks, err := throttler.ts.GetSrvKeyspace(ctx, throttler.tabletAlias.Cell, throttler.keyspace)
if err != nil {
return nil, err
Expand Down Expand Up @@ -804,7 +813,7 @@ func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) {
if throttler.IsOpen() {
// frequent
// Always collect self metrics:
throttler.collectSelfMetrics(ctx)
throttler.collectSelfMetrics(ctx, tmClient)
if !throttler.isDormant() {
throttler.collectShardMetrics(ctx, tmClient)
}
Expand Down Expand Up @@ -869,7 +878,7 @@ func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) {
}()
}

func (throttler *Throttler) generateTabletProbeFunction(scope base.Scope, tmClient tmclient.TabletManagerClient, probe *base.Probe) (probeFunc func(context.Context) base.ThrottleMetrics) {
func (throttler *Throttler) generateTabletProbeFunction(scope base.Scope, probe *base.Probe) (probeFunc func(context.Context, tmclient.TabletManagerClient) base.ThrottleMetrics) {
metricsWithError := func(err error) base.ThrottleMetrics {
metrics := base.ThrottleMetrics{}
for _, metricName := range base.KnownMetricNames {
Expand All @@ -882,7 +891,7 @@ func (throttler *Throttler) generateTabletProbeFunction(scope base.Scope, tmClie
}
return metrics
}
return func(ctx context.Context) base.ThrottleMetrics {
return func(ctx context.Context, tmClient tmclient.TabletManagerClient) base.ThrottleMetrics {
// Some reasonable timeout, to ensure we release connections even if they're hanging (otherwise grpc-go keeps polling those connections forever)
ctx, cancel := context.WithTimeout(ctx, 4*activeCollectInterval)
defer cancel()
Expand Down Expand Up @@ -940,7 +949,7 @@ func (throttler *Throttler) generateTabletProbeFunction(scope base.Scope, tmClie

// readSelfThrottleMetricsInternal reads all registered self metrics on this tablet (or backend MySQL server).
// This is the actual place where metrics are read, to be later aggregated and/or propagated to other tablets.
func (throttler *Throttler) readSelfThrottleMetricsInternal(ctx context.Context) base.ThrottleMetrics {
func (throttler *Throttler) readSelfThrottleMetricsInternal(ctx context.Context, tmClient tmclient.TabletManagerClient) base.ThrottleMetrics {
result := make(base.ThrottleMetrics, len(base.RegisteredSelfMetrics))
writeMetric := func(metric *base.ThrottleMetric) {
select {
Expand All @@ -951,7 +960,9 @@ func (throttler *Throttler) readSelfThrottleMetricsInternal(ctx context.Context)
}
readMetric := func(selfMetric base.SelfMetric) *base.ThrottleMetric {
params := &base.SelfMetricReadParams{
Throttler: throttler,
Throttler: throttler,
TmClient: tmClient,
TabletInfo: throttler.tabletInfo.Load(),
}
if selfMetric.RequiresConn() {
conn, err := throttler.pool.Get(ctx, nil)
Expand All @@ -978,7 +989,7 @@ func (throttler *Throttler) readSelfThrottleMetricsInternal(ctx context.Context)
return result
}

func (throttler *Throttler) collectSelfMetrics(ctx context.Context) {
func (throttler *Throttler) collectSelfMetrics(ctx context.Context, tmClient tmclient.TabletManagerClient) {
probe := throttler.inventory.ClustersProbes[throttler.tabletAliasString()]
if probe == nil {
// probe not created yet
Expand All @@ -993,7 +1004,7 @@ func (throttler *Throttler) collectSelfMetrics(ctx context.Context) {
defer atomic.StoreInt64(&probe.QueryInProgress, 0)

// Throttler is probing its own tablet's metrics:
_ = base.ReadThrottleMetrics(ctx, probe, throttler.readSelfThrottleMetrics)
_ = base.ReadThrottleMetrics(ctx, probe, tmClient, throttler.readSelfThrottleMetrics)
}()
}

Expand All @@ -1014,9 +1025,9 @@ func (throttler *Throttler) collectShardMetrics(ctx context.Context, tmClient tm
defer atomic.StoreInt64(&probe.QueryInProgress, 0)

// Throttler probing other tablets:
throttleMetricFunc := throttler.generateTabletProbeFunction(base.ShardScope, tmClient, probe)
throttleMetricFunc := throttler.generateTabletProbeFunction(base.ShardScope, probe)

throttleMetrics := base.ReadThrottleMetrics(ctx, probe, throttleMetricFunc)
throttleMetrics := base.ReadThrottleMetrics(ctx, probe, tmClient, throttleMetricFunc)
for _, metric := range throttleMetrics {
select {
case <-ctx.Done():
Expand Down
Loading

0 comments on commit d8b6ec2

Please sign in to comment.