Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txthrottler: add metrics for topoWatcher and healthCheckStreamer #13153

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 58 additions & 40 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ import (
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/throttler"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"

querypb "vitess.io/vitess/go/vt/proto/query"
throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)

// These vars store the functions used to create the topo server, healthcheck,
Expand Down Expand Up @@ -63,6 +63,10 @@ func resetTxThrottlerFactories() {
}
}

// init calls resetTxThrottlerFactories once when the package is loaded, so
// the package-level factory variables are set before any throttler is built.
// NOTE(review): presumably this installs the production (non-mock) factories;
// confirm against resetTxThrottlerFactories.
func init() {
resetTxThrottlerFactories()
}

// TxThrottler defines the interface for the transaction throttler.
type TxThrottler interface {
InitDBConfig(target *querypb.Target)
Expand All @@ -71,10 +75,6 @@ type TxThrottler interface {
Throttle(priority int) (result bool)
}

// init calls resetTxThrottlerFactories at package load time.
// NOTE(review): presumably this puts the factory variables into their default
// state before first use; confirm against resetTxThrottlerFactories.
func init() {
resetTxThrottlerFactories()
}

// ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler
// It is only used here to allow mocking out a throttler object.
type ThrottlerInterface interface {
Expand Down Expand Up @@ -142,9 +142,12 @@ type txThrottler struct {
topoServer *topo.Server

// stats
throttlerRunning *stats.Gauge
requestsTotal *stats.Counter
requestsThrottled *stats.Counter
throttlerRunning *stats.Gauge
topoWatchers *stats.GaugesWithSingleLabel
healthChecksReadTotal *stats.CountersWithMultiLabels
healthChecksRecordedTotal *stats.CountersWithMultiLabels
requestsTotal *stats.Counter
requestsThrottled *stats.Counter
}

// txThrottlerConfig holds the parameters that need to be
Expand All @@ -161,12 +164,13 @@ type txThrottlerConfig struct {
healthCheckCells []string

// tabletTypes stores the tablet types for throttling
tabletTypes *topoproto.TabletTypeListFlag
tabletTypes map[topodatapb.TabletType]bool
}

// txThrottlerState holds the state of an open TxThrottler object.
type txThrottlerState struct {
config *txThrottlerConfig
config *txThrottlerConfig
txThrottler *txThrottler

// throttleMu serializes calls to throttler.Throttler.Throttle(threadId).
// That method is required to be called in serial for each threadId.
Expand All @@ -175,7 +179,7 @@ type txThrottlerState struct {
stopHealthCheck context.CancelFunc

healthCheck discovery.HealthCheck
topologyWatchers []TopologyWatcherInterface
topologyWatchers map[string]TopologyWatcherInterface
}

// NewTxThrottler tries to construct a txThrottler from the
Expand All @@ -191,9 +195,14 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler {
// is immutable.
healthCheckCells := env.Config().TxThrottlerHealthCheckCells

tabletTypes := make(map[topodatapb.TabletType]bool, len(*env.Config().TxThrottlerTabletTypes))
for _, tabletType := range *env.Config().TxThrottlerTabletTypes {
tabletTypes[tabletType] = true
}

throttlerConfig = &txThrottlerConfig{
enabled: true,
tabletTypes: env.Config().TxThrottlerTabletTypes,
tabletTypes: tabletTypes,
throttlerConfig: env.Config().TxThrottlerConfig.Get(),
healthCheckCells: healthCheckCells,
}
Expand All @@ -202,9 +211,14 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler {
}

return &txThrottler{
config: throttlerConfig,
topoServer: topoServer,
throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"),
config: throttlerConfig,
topoServer: topoServer,
throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"),
topoWatchers: env.Exporter().NewGaugesWithSingleLabel("TransactionThrottlerTopoWatchers", "transaction throttler topology watchers", "cell"),
healthChecksReadTotal: env.Exporter().NewCountersWithMultiLabels("TransactionThrottlerHealthchecksRead", "transaction throttler healthchecks read",
[]string{"cell", "DbType"}),
healthChecksRecordedTotal: env.Exporter().NewCountersWithMultiLabels("TransactionThrottlerHealthchecksRecorded", "transaction throttler healthchecks recorded",
[]string{"cell", "DbType"}),
requestsTotal: env.Exporter().NewCounter("TransactionThrottlerRequests", "transaction throttler requests"),
requestsThrottled: env.Exporter().NewCounter("TransactionThrottlerThrottled", "transaction throttler requests throttled"),
}
Expand All @@ -225,7 +239,7 @@ func (t *txThrottler) Open() (err error) {
}
log.Info("txThrottler: opening")
t.throttlerRunning.Set(1)
t.state, err = newTxThrottlerState(t.topoServer, t.config, t.target)
t.state, err = newTxThrottlerState(t, t.config, t.target)
return err
}

Expand Down Expand Up @@ -269,7 +283,7 @@ func (t *txThrottler) Throttle(priority int) (result bool) {
return result
}

func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, target *querypb.Target) (*txThrottlerState, error) {
func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, target *querypb.Target) (*txThrottlerState, error) {
maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}

t, err := throttlerFactory(
Expand All @@ -286,27 +300,28 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar
t.Close()
return nil, err
}
result := &txThrottlerState{
config: config,
throttler: t,
state := &txThrottlerState{
config: config,
throttler: t,
txThrottler: txThrottler,
}
createTxThrottlerHealthCheck(topoServer, config, result, target.Cell)
createTxThrottlerHealthCheck(txThrottler.topoServer, config, state, target.Cell)

result.topologyWatchers = make(
[]TopologyWatcherInterface, 0, len(config.healthCheckCells))
state.topologyWatchers = make(
map[string]TopologyWatcherInterface, len(config.healthCheckCells))
for _, cell := range config.healthCheckCells {
result.topologyWatchers = append(
result.topologyWatchers,
topologyWatcherFactory(
topoServer,
result.healthCheck,
cell,
target.Keyspace,
target.Shard,
discovery.DefaultTopologyWatcherRefreshInterval,
discovery.DefaultTopoReadConcurrency))
state.topologyWatchers[cell] = topologyWatcherFactory(
txThrottler.topoServer,
state.healthCheck,
cell,
target.Keyspace,
target.Shard,
discovery.DefaultTopologyWatcherRefreshInterval,
discovery.DefaultTopoReadConcurrency,
)
txThrottler.topoWatchers.Add(cell, 1)
}
return result, nil
return state, nil
}

func createTxThrottlerHealthCheck(topoServer *topo.Server, config *txThrottlerConfig, result *txThrottlerState, cell string) {
Expand Down Expand Up @@ -341,8 +356,9 @@ func (ts *txThrottlerState) deallocateResources() {
// We don't really need to nil out the fields here
// as deallocateResources is not expected to be called
// more than once, but it doesn't hurt to do so.
for _, watcher := range ts.topologyWatchers {
for cell, watcher := range ts.topologyWatchers {
watcher.Stop()
ts.txThrottler.topoWatchers.Reset(cell)
}
ts.topologyWatchers = nil

Expand All @@ -361,12 +377,14 @@ func (ts *txThrottlerState) StatsUpdate(tabletStats *discovery.TabletHealth) {
return
}

tabletType := tabletStats.Target.TabletType
metricLabels := []string{tabletStats.Target.Cell, tabletType.String()}
ts.txThrottler.healthChecksReadTotal.Add(metricLabels, 1)

// Monitor tablets for replication lag if they have a tablet
// type specified by the --tx_throttler_tablet_types flag.
for _, expectedTabletType := range *ts.config.tabletTypes {
if tabletStats.Target.TabletType == expectedTabletType {
ts.throttler.RecordReplicationLag(time.Now(), tabletStats)
return
}
if ts.config.tabletTypes[tabletType] {
ts.throttler.RecordReplicationLag(time.Now(), tabletStats)
ts.txThrottler.healthChecksRecordedTotal.Add(metricLabels, 1)
}
}
11 changes: 11 additions & 0 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func TestEnabledThrottler(t *testing.T) {
call1.Return(0 * time.Second)
tabletStats := &discovery.TabletHealth{
Target: &querypb.Target{
Cell: "cell1",
TabletType: topodatapb.TabletType_REPLICA,
},
}
Expand Down Expand Up @@ -119,24 +120,33 @@ func TestEnabledThrottler(t *testing.T) {
throttlerImpl, _ := throttler.(*txThrottler)
assert.NotNil(t, throttlerImpl)
throttler.InitDBConfig(&querypb.Target{
Cell: "cell1",
Keyspace: "keyspace",
Shard: "shard",
})

assert.Nil(t, throttlerImpl.Open())
assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get())
assert.Equal(t, map[string]int64{"cell1": 1, "cell2": 1}, throttlerImpl.topoWatchers.Counts())

assert.False(t, throttlerImpl.Throttle(100))
assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Get())
assert.Zero(t, throttlerImpl.requestsThrottled.Get())

throttlerImpl.state.StatsUpdate(tabletStats) // This calls replication lag thing
assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksReadTotal.Counts())
assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksRecordedTotal.Counts())
rdonlyTabletStats := &discovery.TabletHealth{
Target: &querypb.Target{
Cell: "cell2",
TabletType: topodatapb.TabletType_RDONLY,
},
}
// This call should not be forwarded to the go/vt/throttler.Throttler object.
throttlerImpl.state.StatsUpdate(rdonlyTabletStats)
assert.Equal(t, map[string]int64{"cell1.REPLICA": 1, "cell2.RDONLY": 1}, throttlerImpl.healthChecksReadTotal.Counts())
assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksRecordedTotal.Counts())

// The second throttle call should reject.
assert.True(t, throttlerImpl.Throttle(100))
assert.Equal(t, int64(2), throttlerImpl.requestsTotal.Get())
Expand All @@ -148,6 +158,7 @@ func TestEnabledThrottler(t *testing.T) {
assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Get())
throttlerImpl.Close()
assert.Zero(t, throttlerImpl.throttlerRunning.Get())
assert.Equal(t, map[string]int64{"cell1": 0, "cell2": 0}, throttlerImpl.topoWatchers.Counts())
}

func TestNewTxThrottler(t *testing.T) {
Expand Down