From f04827fd4c387c6f32a40dfa02ba6e2e2531e937 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 25 May 2023 21:58:11 +0200 Subject: [PATCH 1/3] txthrottler: add metrics for topoWatcher and healthCheckStreamer Signed-off-by: Tim Vaillancourt --- .../tabletserver/txthrottler/tx_throttler.go | 69 +++++++++++-------- .../txthrottler/tx_throttler_test.go | 6 ++ 2 files changed, 47 insertions(+), 28 deletions(-) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index bc5235593ac..42a61cc1b64 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -144,9 +144,12 @@ type txThrottler struct { topoServer *topo.Server // stats - throttlerRunning *stats.Gauge - requestsTotal *stats.Counter - requestsThrottled *stats.Counter + throttlerRunning *stats.Gauge + topoWatchers *stats.GaugesWithSingleLabel + healthChecksReadTotal *stats.CountersWithSingleLabel + healthChecksRecordedTotal *stats.CountersWithSingleLabel + requestsTotal *stats.Counter + requestsThrottled *stats.Counter } // txThrottlerConfig holds the parameters that need to be @@ -168,7 +171,8 @@ type txThrottlerConfig struct { // txThrottlerState holds the state of an open TxThrottler object. type txThrottlerState struct { - config *txThrottlerConfig + config *txThrottlerConfig + txThrottler *txThrottler // throttleMu serializes calls to throttler.Throttler.Throttle(threadId). // That method is required to be called in serial for each threadId. @@ -177,7 +181,7 @@ type txThrottlerState struct { stopHealthCheck context.CancelFunc healthCheck discovery.HealthCheck - topologyWatchers []TopologyWatcherInterface + topologyWatchers map[string]TopologyWatcherInterface } // NewTxThrottler tries to construct a txThrottler from the @@ -238,11 +242,14 @@ func newTxThrottler(env tabletenv.Env, topoServer *topo.Server, config *txThrott } } return &txThrottler{ - config: config, - topoServer: topoServer, - throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"), - requestsTotal: env.Exporter().NewCounter("TransactionThrottlerRequests", "transaction throttler requests"), - requestsThrottled: env.Exporter().NewCounter("TransactionThrottlerThrottled", "transaction throttler requests throttled"), + config: config, + topoServer: topoServer, + throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"), + topoWatchers: env.Exporter().NewGaugesWithSingleLabel("TransactionThrottlerTopoWatchers", "transaction throttler topology watchers", "cell"), + healthChecksReadTotal: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerHealthchecksRead", "transaction throttler healthchecks read", "DbType"), + healthChecksRecordedTotal: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerHealthchecksRecorded", "transaction throttler healthchecks recorded", "DbType"), + requestsTotal: env.Exporter().NewCounter("TransactionThrottlerRequests", "transaction throttler requests"), + requestsThrottled: env.Exporter().NewCounter("TransactionThrottlerThrottled", "transaction throttler requests throttled"), }, nil } @@ -256,7 +263,7 @@ func (t *txThrottler) Open() (err error) { } log.Info("txThrottler: opening") t.throttlerRunning.Set(1) - t.state, err = newTxThrottlerState(t.topoServer, t.config, t.target) + t.state, err = newTxThrottlerState(t, t.config, t.target) return err } @@ -300,7 +307,7 @@ func (t *txThrottler) Throttle(priority int) (result bool) { return result } -func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, target *querypb.Target) (*txThrottlerState, error) { +func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, target *querypb.Target) (*txThrottlerState, error) { maxReplicationLagModuleConfig := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig} t, err := throttlerFactory( @@ -318,24 +325,25 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar return nil, err } result := &txThrottlerState{ - config: config, - throttler: t, + config: config, + throttler: t, + txThrottler: txThrottler, } - createTxThrottlerHealthCheck(topoServer, config, result, target.Cell) + createTxThrottlerHealthCheck(txThrottler.topoServer, config, result, target.Cell) result.topologyWatchers = make( - []TopologyWatcherInterface, 0, len(config.healthCheckCells)) + map[string]TopologyWatcherInterface, len(config.healthCheckCells)) for _, cell := range config.healthCheckCells { - result.topologyWatchers = append( - result.topologyWatchers, - topologyWatcherFactory( - topoServer, - result.healthCheck, - cell, - target.Keyspace, - target.Shard, - discovery.DefaultTopologyWatcherRefreshInterval, - discovery.DefaultTopoReadConcurrency)) + result.topologyWatchers[cell] = topologyWatcherFactory( + txThrottler.topoServer, + result.healthCheck, + cell, + target.Keyspace, + target.Shard, + discovery.DefaultTopologyWatcherRefreshInterval, + discovery.DefaultTopoReadConcurrency, + ) + result.txThrottler.topoWatchers.Add(cell, 1) } return result, nil } @@ -372,8 +380,9 @@ func (ts *txThrottlerState) deallocateResources() { // We don't really need to nil out the fields here // as deallocateResources is not expected to be called // more than once, but it doesn't hurt to do so. - for _, watcher := range ts.topologyWatchers { + for cell, watcher := range ts.topologyWatchers { watcher.Stop() + ts.txThrottler.topoWatchers.Reset(cell) } ts.topologyWatchers = nil @@ -392,11 +401,15 @@ func (ts *txThrottlerState) StatsUpdate(tabletStats *discovery.TabletHealth) { return } + tabletType := tabletStats.Target.TabletType + ts.txThrottler.healthChecksReadTotal.Add(tabletType.String(), 1) + // Monitor tablets for replication lag if they have a tablet // type specified by the --tx_throttler_tablet_types flag. for _, expectedTabletType := range *ts.config.tabletTypes { - if tabletStats.Target.TabletType == expectedTabletType { + if tabletType == expectedTabletType { ts.throttler.RecordReplicationLag(time.Now(), tabletStats) + ts.txThrottler.healthChecksRecordedTotal.Add(tabletType.String(), 1) return } } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 97138e3928c..65876442e3e 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -124,12 +124,15 @@ func TestEnabledThrottler(t *testing.T) { }) assert.Nil(t, throttler.Open()) assert.Equal(t, int64(1), throttler.throttlerRunning.Get()) + assert.Equal(t, map[string]int64{"cell1": 1, "cell2": 1}, throttler.topoWatchers.Counts()) assert.False(t, throttler.Throttle(100)) assert.Equal(t, int64(1), throttler.requestsTotal.Get()) assert.Zero(t, throttler.requestsThrottled.Get()) throttler.state.StatsUpdate(tabletStats) // This calls replication lag thing + assert.Equal(t, map[string]int64{"REPLICA": 1}, throttler.healthChecksReadTotal.Counts()) + assert.Equal(t, map[string]int64{"REPLICA": 1}, throttler.healthChecksRecordedTotal.Counts()) rdonlyTabletStats := &discovery.TabletHealth{ Target: &querypb.Target{ TabletType: topodatapb.TabletType_RDONLY, @@ -137,6 +140,8 @@ func TestEnabledThrottler(t *testing.T) { } // This call should not be forwarded to the go/vt/throttler.Throttler object. throttler.state.StatsUpdate(rdonlyTabletStats) + assert.Equal(t, map[string]int64{"REPLICA": 1, "RDONLY": 1}, throttler.healthChecksReadTotal.Counts()) + assert.Equal(t, map[string]int64{"REPLICA": 1}, throttler.healthChecksRecordedTotal.Counts()) // The second throttle call should reject. assert.True(t, throttler.Throttle(100)) assert.Equal(t, int64(2), throttler.requestsTotal.Get()) @@ -148,6 +153,7 @@ func TestEnabledThrottler(t *testing.T) { assert.Equal(t, int64(1), throttler.requestsThrottled.Get()) throttler.Close() assert.Zero(t, throttler.throttlerRunning.Get()) + assert.Equal(t, map[string]int64{"cell1": 0, "cell2": 0}, throttler.topoWatchers.Counts()) } func TestNewTxThrottler(t *testing.T) { From c77efb8ce4e5c12c35839027dcf1114a27ae895b Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Fri, 26 May 2023 12:35:04 +0200 Subject: [PATCH 2/3] Add cell to labels Signed-off-by: Tim Vaillancourt --- .../tabletserver/txthrottler/tx_throttler.go | 67 ++++++++++--------- .../txthrottler/tx_throttler_test.go | 11 +-- 2 files changed, 43 insertions(+), 35 deletions(-) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 42a61cc1b64..e06d065cf0a 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -33,11 +33,11 @@ import ( "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/throttler" "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" querypb "vitess.io/vitess/go/vt/proto/query" throttlerdatapb "vitess.io/vitess/go/vt/proto/throttlerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) // These vars store the functions used to create the topo server, healthcheck, @@ -65,6 +65,10 @@ func resetTxThrottlerFactories() { } } +func init() { + resetTxThrottlerFactories() +} + // TxThrottler defines the interface for the transaction throttler. type TxThrottler interface { InitDBConfig(target *querypb.Target) @@ -73,10 +77,6 @@ type TxThrottler interface { Throttle(priority int) (result bool) } -func init() { - resetTxThrottlerFactories() -} - // ThrottlerInterface defines the public interface that is implemented by go/vt/throttler.Throttler // It is only used here to allow mocking out a throttler object. type ThrottlerInterface interface { @@ -146,8 +146,8 @@ type txThrottler struct { // stats throttlerRunning *stats.Gauge topoWatchers *stats.GaugesWithSingleLabel - healthChecksReadTotal *stats.CountersWithSingleLabel - healthChecksRecordedTotal *stats.CountersWithSingleLabel + healthChecksReadTotal *stats.CountersWithMultiLabels + healthChecksRecordedTotal *stats.CountersWithMultiLabels requestsTotal *stats.Counter requestsThrottled *stats.Counter } @@ -166,7 +166,7 @@ type txThrottlerConfig struct { healthCheckCells []string // tabletTypes stores the tablet types for throttling - tabletTypes *topoproto.TabletTypeListFlag + tabletTypes map[topodatapb.TabletType]bool } // txThrottlerState holds the state of an open TxThrottler object. @@ -222,9 +222,14 @@ func tryCreateTxThrottler(env tabletenv.Env, topoServer *topo.Server) (*txThrott healthCheckCells := make([]string, len(env.Config().TxThrottlerHealthCheckCells)) copy(healthCheckCells, env.Config().TxThrottlerHealthCheckCells) + tabletTypes := make(map[topodatapb.TabletType]bool, len(*env.Config().TxThrottlerTabletTypes)) + for _, tabletType := range *env.Config().TxThrottlerTabletTypes { + tabletTypes[tabletType] = true + } + return newTxThrottler(env, topoServer, &txThrottlerConfig{ enabled: true, - tabletTypes: env.Config().TxThrottlerTabletTypes, + tabletTypes: tabletTypes, throttlerConfig: &throttlerConfig, healthCheckCells: healthCheckCells, }) @@ -242,14 +247,16 @@ func newTxThrottler(env tabletenv.Env, topoServer *topo.Server, config *txThrott } } return &txThrottler{ - config: config, - topoServer: topoServer, - throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"), - topoWatchers: env.Exporter().NewGaugesWithSingleLabel("TransactionThrottlerTopoWatchers", "transaction throttler topology watchers", "cell"), - healthChecksReadTotal: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerHealthchecksRead", "transaction throttler healthchecks read", "DbType"), - healthChecksRecordedTotal: env.Exporter().NewCountersWithSingleLabel("TransactionThrottlerHealthchecksRecorded", "transaction throttler healthchecks recorded", "DbType"), - requestsTotal: env.Exporter().NewCounter("TransactionThrottlerRequests", "transaction throttler requests"), - requestsThrottled: env.Exporter().NewCounter("TransactionThrottlerThrottled", "transaction throttler requests throttled"), + config: config, + topoServer: topoServer, + throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"), + topoWatchers: env.Exporter().NewGaugesWithSingleLabel("TransactionThrottlerTopoWatchers", "transaction throttler topology watchers", "cell"), + healthChecksReadTotal: env.Exporter().NewCountersWithMultiLabels("TransactionThrottlerHealthchecksRead", "transaction throttler healthchecks read", + []string{"cell", "DbType"}), + healthChecksRecordedTotal: env.Exporter().NewCountersWithMultiLabels("TransactionThrottlerHealthchecksRecorded", "transaction throttler healthchecks recorded", + []string{"cell", "DbType"}), + requestsTotal: env.Exporter().NewCounter("TransactionThrottlerRequests", "transaction throttler requests"), + requestsThrottled: env.Exporter().NewCounter("TransactionThrottlerThrottled", "transaction throttler requests throttled"), }, nil } @@ -324,28 +331,28 @@ func newTxThrottlerState(txThrottler *txThrottler, config *txThrottlerConfig, ta t.Close() return nil, err } - result := &txThrottlerState{ + state := &txThrottlerState{ config: config, throttler: t, txThrottler: txThrottler, } - createTxThrottlerHealthCheck(txThrottler.topoServer, config, result, target.Cell) + createTxThrottlerHealthCheck(txThrottler.topoServer, config, state, target.Cell) - result.topologyWatchers = make( + state.topologyWatchers = make( map[string]TopologyWatcherInterface, len(config.healthCheckCells)) for _, cell := range config.healthCheckCells { - result.topologyWatchers[cell] = topologyWatcherFactory( + state.topologyWatchers[cell] = topologyWatcherFactory( txThrottler.topoServer, - result.healthCheck, + state.healthCheck, cell, target.Keyspace, target.Shard, discovery.DefaultTopologyWatcherRefreshInterval, discovery.DefaultTopoReadConcurrency, ) - result.txThrottler.topoWatchers.Add(cell, 1) + txThrottler.topoWatchers.Add(cell, 1) } - return result, nil + return state, nil } func createTxThrottlerHealthCheck(topoServer *topo.Server, config *txThrottlerConfig, result *txThrottlerState, cell string) { @@ -402,15 +409,13 @@ func (ts *txThrottlerState) StatsUpdate(tabletStats *discovery.TabletHealth) { } tabletType := tabletStats.Target.TabletType - ts.txThrottler.healthChecksReadTotal.Add(tabletType.String(), 1) + metricLabels := []string{tabletStats.Target.Cell, tabletType.String()} + ts.txThrottler.healthChecksReadTotal.Add(metricLabels, 1) // Monitor tablets for replication lag if they have a tablet // type specified by the --tx_throttler_tablet_types flag. - for _, expectedTabletType := range *ts.config.tabletTypes { - if tabletType == expectedTabletType { - ts.throttler.RecordReplicationLag(time.Now(), tabletStats) - ts.txThrottler.healthChecksRecordedTotal.Add(tabletType.String(), 1) - return - } + if ts.config.tabletTypes[tabletType] { + ts.throttler.RecordReplicationLag(time.Now(), tabletStats) + ts.txThrottler.healthChecksRecordedTotal.Add(metricLabels, 1) } } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 65876442e3e..a8d3920c0ef 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -93,6 +93,7 @@ func TestEnabledThrottler(t *testing.T) { call1.Return(0 * time.Second) tabletStats := &discovery.TabletHealth{ Target: &querypb.Target{ + Cell: "cell1", TabletType: topodatapb.TabletType_REPLICA, }, } @@ -119,6 +120,7 @@ func TestEnabledThrottler(t *testing.T) { throttler, err := tryCreateTxThrottler(env, ts) assert.Nil(t, err) throttler.InitDBConfig(&querypb.Target{ + Cell: "cell1", Keyspace: "keyspace", Shard: "shard", }) @@ -131,17 +133,18 @@ func TestEnabledThrottler(t *testing.T) { assert.Zero(t, throttler.requestsThrottled.Get()) throttler.state.StatsUpdate(tabletStats) // This calls replication lag thing - assert.Equal(t, map[string]int64{"REPLICA": 1}, throttler.healthChecksReadTotal.Counts()) - assert.Equal(t, map[string]int64{"REPLICA": 1}, throttler.healthChecksRecordedTotal.Counts()) + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttler.healthChecksReadTotal.Counts()) + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttler.healthChecksRecordedTotal.Counts()) rdonlyTabletStats := &discovery.TabletHealth{ Target: &querypb.Target{ + Cell: "cell2", TabletType: topodatapb.TabletType_RDONLY, }, } // This call should not be forwarded to the go/vt/throttler.Throttler object. throttler.state.StatsUpdate(rdonlyTabletStats) - assert.Equal(t, map[string]int64{"REPLICA": 1, "RDONLY": 1}, throttler.healthChecksReadTotal.Counts()) - assert.Equal(t, map[string]int64{"REPLICA": 1}, throttler.healthChecksRecordedTotal.Counts()) + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1, "cell2.RDONLY": 1}, throttler.healthChecksReadTotal.Counts()) + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttler.healthChecksRecordedTotal.Counts()) // The second throttle call should reject. assert.True(t, throttler.Throttle(100)) assert.Equal(t, int64(2), throttler.requestsTotal.Get()) From 823543d07a1285e2c51e0079f482c23bdeb5fe71 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Sun, 18 Jun 2023 08:51:33 +0200 Subject: [PATCH 3/3] Fix merge conflict Signed-off-by: Tim Vaillancourt --- .../tabletserver/txthrottler/tx_throttler.go | 9 +++++++-- .../tabletserver/txthrottler/tx_throttler_test.go | 12 ++++++------ 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index f15383cd771..e25e4c0da89 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -195,9 +195,14 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler { // is immutable. healthCheckCells := env.Config().TxThrottlerHealthCheckCells + tabletTypes := make(map[topodatapb.TabletType]bool, len(*env.Config().TxThrottlerTabletTypes)) + for _, tabletType := range *env.Config().TxThrottlerTabletTypes { + tabletTypes[tabletType] = true + } + throttlerConfig = &txThrottlerConfig{ enabled: true, - tabletTypes: env.Config().TxThrottlerTabletTypes, + tabletTypes: tabletTypes, throttlerConfig: env.Config().TxThrottlerConfig.Get(), healthCheckCells: healthCheckCells, } @@ -206,7 +211,7 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler { } return &txThrottler{ - config: config, + config: throttlerConfig, topoServer: topoServer, throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"), topoWatchers: env.Exporter().NewGaugesWithSingleLabel("TransactionThrottlerTopoWatchers", "transaction throttler topology watchers", "cell"), diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 08f72364296..9c9c725e1fd 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -127,15 +127,15 @@ func TestEnabledThrottler(t *testing.T) { assert.Nil(t, throttlerImpl.Open()) assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get()) - assert.Equal(t, map[string]int64{"cell1": 1, "cell2": 1}, throttler.topoWatchers.Counts()) + assert.Equal(t, map[string]int64{"cell1": 1, "cell2": 1}, throttlerImpl.topoWatchers.Counts()) assert.False(t, throttlerImpl.Throttle(100)) assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Get()) assert.Zero(t, throttlerImpl.requestsThrottled.Get()) throttlerImpl.state.StatsUpdate(tabletStats) // This calls replication lag thing - assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttler.healthChecksReadTotal.Counts()) - assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttler.healthChecksRecordedTotal.Counts()) + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksReadTotal.Counts()) + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksRecordedTotal.Counts()) rdonlyTabletStats := &discovery.TabletHealth{ Target: &querypb.Target{ Cell: "cell2", @@ -144,8 +144,8 @@ func TestEnabledThrottler(t *testing.T) { } // This call should not be forwarded to the go/vt/throttlerImpl.Throttler object. throttlerImpl.state.StatsUpdate(rdonlyTabletStats) - assert.Equal(t, map[string]int64{"cell1.REPLICA": 1, "cell2.RDONLY": 1}, throttler.healthChecksReadTotal.Counts()) - assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttler.healthChecksRecordedTotal.Counts()) + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1, "cell2.RDONLY": 1}, throttlerImpl.healthChecksReadTotal.Counts()) + assert.Equal(t, map[string]int64{"cell1.REPLICA": 1}, throttlerImpl.healthChecksRecordedTotal.Counts()) // The second throttle call should reject. assert.True(t, throttlerImpl.Throttle(100)) @@ -158,7 +158,7 @@ func TestEnabledThrottler(t *testing.T) { assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Get()) throttlerImpl.Close() assert.Zero(t, throttlerImpl.throttlerRunning.Get()) - assert.Equal(t, map[string]int64{"cell1": 0, "cell2": 0}, throttlerImpl.topoWatchers.Counts()) + assert.Equal(t, map[string]int64{"cell1": 0, "cell2": 0}, throttlerImpl.topoWatchers.Counts()) } func TestNewTxThrottler(t *testing.T) {