From 392456653407f5d71e52fdb148f57fb0f9be13cf Mon Sep 17 00:00:00 2001 From: Deepthi Sigireddi Date: Wed, 1 Nov 2023 07:00:47 -0700 Subject: [PATCH] tx throttler: remove unused topology watchers (#14412) Signed-off-by: deepthi --- changelog/19.0/19.0.0/summary.md | 1 + go/vt/discovery/replicationlag.go | 30 ++++---- go/vt/discovery/topology_watcher.go | 70 +++++++++---------- .../txthrottler/mock_topology_watcher_test.go | 58 --------------- .../tabletserver/txthrottler/tx_throttler.go | 42 +++-------- .../txthrottler/tx_throttler_test.go | 13 ---- 6 files changed, 60 insertions(+), 154 deletions(-) delete mode 100644 go/vt/vttablet/tabletserver/txthrottler/mock_topology_watcher_test.go diff --git a/changelog/19.0/19.0.0/summary.md b/changelog/19.0/19.0.0/summary.md index 5d413c25cae..d9f655ecbc2 100644 --- a/changelog/19.0/19.0.0/summary.md +++ b/changelog/19.0/19.0.0/summary.md @@ -12,6 +12,7 @@ ### Deprecations and Deletions - The `MYSQL_FLAVOR` environment variable is now removed from all Docker Images. +- VTTablet metrics for TxThrottler's topology watchers have been deprecated. They will be deleted in the next release. ### Docker diff --git a/go/vt/discovery/replicationlag.go b/go/vt/discovery/replicationlag.go index e7afa5ca844..9592440196a 100644 --- a/go/vt/discovery/replicationlag.go +++ b/go/vt/discovery/replicationlag.go @@ -111,13 +111,13 @@ func SetMinNumTablets(numTablets int) { minNumTablets.Set(numTablets) } -// IsReplicationLagHigh verifies that the given LegacytabletHealth refers to a tablet with high +// IsReplicationLagHigh verifies that the given TabletHealth refers to a tablet with high // replication lag, i.e. higher than the configured discovery_low_replication_lag flag. func IsReplicationLagHigh(tabletHealth *TabletHealth) bool { return float64(tabletHealth.Stats.ReplicationLagSeconds) > lowReplicationLag.Get().Seconds() } -// IsReplicationLagVeryHigh verifies that the given LegacytabletHealth refers to a tablet with very high +// IsReplicationLagVeryHigh verifies that the given TabletHealth refers to a tablet with very high // replication lag, i.e. higher than the configured discovery_high_replication_lag_minimum_serving flag. func IsReplicationLagVeryHigh(tabletHealth *TabletHealth) bool { return float64(tabletHealth.Stats.ReplicationLagSeconds) > highReplicationLagMinServing.Get().Seconds() @@ -153,7 +153,7 @@ func FilterStatsByReplicationLag(tabletHealthList []*TabletHealth) []*TabletHeal return filterStatsByLag(tabletHealthList) } res := filterStatsByLagWithLegacyAlgorithm(tabletHealthList) - // run the filter again if exactly one tablet is removed, + // Run the filter again if exactly one tablet is removed, // and we have spare tablets. if len(res) > minNumTablets.Get() && len(res) == len(tabletHealthList)-1 { res = filterStatsByLagWithLegacyAlgorithm(res) @@ -164,12 +164,12 @@ func FilterStatsByReplicationLag(tabletHealthList []*TabletHealth) []*TabletHeal func filterStatsByLag(tabletHealthList []*TabletHealth) []*TabletHealth { list := make([]tabletLagSnapshot, 0, len(tabletHealthList)) - // filter non-serving tablets and those with very high replication lag + // Filter out non-serving tablets and those with very high replication lag. for _, ts := range tabletHealthList { if !ts.Serving || ts.LastError != nil || ts.Stats == nil || IsReplicationLagVeryHigh(ts) { continue } - // Pull the current replication lag for a stable sort later. + // Save the current replication lag for a stable sort later. list = append(list, tabletLagSnapshot{ ts: ts, replag: ts.Stats.ReplicationLagSeconds}) @@ -178,7 +178,7 @@ func filterStatsByLag(tabletHealthList []*TabletHealth) []*TabletHealth { // Sort by replication lag. sort.Sort(tabletLagSnapshotList(list)) - // Pick those with low replication lag, but at least minNumTablets tablets regardless. + // Pick tablets with low replication lag, but at least minNumTablets tablets regardless. res := make([]*TabletHealth, 0, len(list)) for i := 0; i < len(list); i++ { if !IsReplicationLagHigh(list[i].ts) || i < minNumTablets.Get() { @@ -190,7 +190,7 @@ func filterStatsByLag(tabletHealthList []*TabletHealth) []*TabletHealth { func filterStatsByLagWithLegacyAlgorithm(tabletHealthList []*TabletHealth) []*TabletHealth { list := make([]*TabletHealth, 0, len(tabletHealthList)) - // filter non-serving tablets + // Filter out non-serving tablets. for _, ts := range tabletHealthList { if !ts.Serving || ts.LastError != nil || ts.Stats == nil { continue @@ -200,7 +200,7 @@ func filterStatsByLagWithLegacyAlgorithm(tabletHealthList []*TabletHealth) []*Ta if len(list) <= 1 { return list } - // if all have low replication lag (<=30s), return all tablets. + // If all tablets have low replication lag (<=30s), return all of them. allLowLag := true for _, ts := range list { if IsReplicationLagHigh(ts) { @@ -211,12 +211,12 @@ func filterStatsByLagWithLegacyAlgorithm(tabletHealthList []*TabletHealth) []*Ta if allLowLag { return list } - // filter those affecting "mean" lag significantly - // calculate mean for all tablets + // We want to filter out tablets that are affecting "mean" lag significantly. + // We first calculate the mean across all tablets. res := make([]*TabletHealth, 0, len(list)) m, _ := mean(list, -1) for i, ts := range list { - // calculate mean by excluding ith tablet + // Now we calculate the mean by excluding ith tablet mi, _ := mean(list, i) if float64(mi) > float64(m)*0.7 { res = append(res, ts) @@ -225,9 +225,11 @@ func filterStatsByLagWithLegacyAlgorithm(tabletHealthList []*TabletHealth) []*Ta if len(res) >= minNumTablets.Get() { return res } - // return at least minNumTablets tablets to avoid over loading, - // if there is enough tablets with replication lag < highReplicationLagMinServing. - // Pull the current replication lag for a stable sort. + + // We want to return at least minNumTablets tablets to avoid overloading, + // as long as there are enough tablets with replication lag < highReplicationLagMinServing. + + // Save the current replication lag for a stable sort. snapshots := make([]tabletLagSnapshot, 0, len(list)) for _, ts := range list { if !IsReplicationLagVeryHigh(ts) { diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index d1bd2d3acf8..b3298f55700 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -53,15 +53,15 @@ var ( "Operation", topologyWatcherOpListTablets, topologyWatcherOpGetTablet) ) -// tabletInfo is used internally by the TopologyWatcher class +// tabletInfo is used internally by the TopologyWatcher struct. type tabletInfo struct { alias string tablet *topodata.Tablet } -// TopologyWatcher polls tablet from a configurable set of tablets -// periodically. When tablets are added / removed, it calls -// the LegacyTabletRecorder AddTablet / RemoveTablet interface appropriately. +// TopologyWatcher polls the topology periodically for changes to +// the set of tablets. When tablets are added / removed / modified, +// it calls the AddTablet / RemoveTablet interface appropriately. type TopologyWatcher struct { // set at construction time topoServer *topo.Server @@ -79,20 +79,21 @@ type TopologyWatcher struct { // mu protects all variables below mu sync.Mutex - // tablets contains a map of alias -> tabletInfo for all known tablets + // tablets contains a map of alias -> tabletInfo for all known tablets. tablets map[string]*tabletInfo - // topoChecksum stores a crc32 of the tablets map and is exported as a metric + // topoChecksum stores a crc32 of the tablets map and is exported as a metric. topoChecksum uint32 - // lastRefresh records the timestamp of the last topo refresh + // lastRefresh records the timestamp of the last refresh of the topology. lastRefresh time.Time - // firstLoadDone is true when first load of the topology data is done. + // firstLoadDone is true when the initial load of the topology data is complete. firstLoadDone bool - // firstLoadChan is closed when the initial loading of topology data is done. + // firstLoadChan is closed when the initial load of topology data is complete. firstLoadChan chan struct{} } // NewTopologyWatcher returns a TopologyWatcher that monitors all -// the tablets in a cell, and starts refreshing. +// the tablets that it is configured to watch, and reloads them periodically if needed. +// As of now there is only one implementation: watch all tablets in a cell. func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)) *TopologyWatcher { tw := &TopologyWatcher{ topoServer: topoServer, @@ -114,14 +115,14 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthC } // NewCellTabletsWatcher returns a TopologyWatcher that monitors all -// the tablets in a cell, and starts refreshing. +// the tablets in a cell, and reloads them as needed. func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher { return NewTopologyWatcher(ctx, topoServer, hc, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) { return tw.topoServer.GetTabletAliasesByCell(ctx, tw.cell) }) } -// Start starts the topology watcher +// Start starts the topology watcher. func (tw *TopologyWatcher) Start() { tw.wg.Add(1) go func(t *TopologyWatcher) { @@ -140,7 +141,7 @@ func (tw *TopologyWatcher) Start() { }(tw) } -// Stop stops the watcher. It does not clean up the tablets added to LegacyTabletRecorder. +// Stop stops the watcher. It does not clean up the tablets added to HealthCheck. func (tw *TopologyWatcher) Stop() { tw.cancelFunc() // wait for watch goroutine to finish. @@ -151,7 +152,7 @@ func (tw *TopologyWatcher) loadTablets() { var wg sync.WaitGroup newTablets := make(map[string]*tabletInfo) - // first get the list of relevant tabletAliases + // First get the list of relevant tabletAliases. tabletAliases, err := tw.getTablets(tw) topologyWatcherOperations.Add(topologyWatcherOpListTablets, 1) if err != nil { @@ -166,7 +167,7 @@ func (tw *TopologyWatcher) loadTablets() { } // Accumulate a list of all known alias strings to use later - // when sorting + // when sorting. tabletAliasStrs := make([]string, 0, len(tabletAliases)) tw.mu.Lock() @@ -175,7 +176,7 @@ func (tw *TopologyWatcher) loadTablets() { tabletAliasStrs = append(tabletAliasStrs, aliasStr) if !tw.refreshKnownTablets { - // we already have a tabletInfo for this and the flag tells us to not refresh + // We already have a tabletInfo for this and the flag tells us to not refresh. if val, ok := tw.tablets[aliasStr]; ok { newTablets[aliasStr] = val continue @@ -188,7 +189,7 @@ func (tw *TopologyWatcher) loadTablets() { tw.sem <- 1 // Wait for active queue to drain. tablet, err := tw.topoServer.GetTablet(tw.ctx, alias) topologyWatcherOperations.Add(topologyWatcherOpGetTablet, 1) - <-tw.sem // Done; enable next request to run + <-tw.sem // Done; enable next request to run. if err != nil { topologyWatcherErrors.Add(topologyWatcherOpGetTablet, 1) select { @@ -218,7 +219,7 @@ func (tw *TopologyWatcher) loadTablets() { continue } - // trust the alias from topo and add it if it doesn't exist + // Trust the alias from topo and add it if it doesn't exist. if val, ok := tw.tablets[alias]; ok { // check if the host and port have changed. If yes, replace tablet. oldKey := TabletToMapKey(val.tablet) @@ -230,7 +231,7 @@ func (tw *TopologyWatcher) loadTablets() { topologyWatcherOperations.Add(topologyWatcherOpReplaceTablet, 1) } } else { - // This is a new tablet record, let's add it to the healthcheck + // This is a new tablet record, let's add it to the HealthCheck. tw.healthcheck.AddTablet(newVal.tablet) topologyWatcherOperations.Add(topologyWatcherOpAddTablet, 1) } @@ -252,8 +253,8 @@ func (tw *TopologyWatcher) loadTablets() { close(tw.firstLoadChan) } - // iterate through the tablets in a stable order and compute a - // checksum of the tablet map + // Iterate through the tablets in a stable order and compute a + // checksum of the tablet map. sort.Strings(tabletAliasStrs) var buf bytes.Buffer for _, alias := range tabletAliasStrs { @@ -269,7 +270,7 @@ func (tw *TopologyWatcher) loadTablets() { } -// RefreshLag returns the time since the last refresh +// RefreshLag returns the time since the last refresh. func (tw *TopologyWatcher) RefreshLag() time.Duration { tw.mu.Lock() defer tw.mu.Unlock() @@ -277,7 +278,7 @@ func (tw *TopologyWatcher) RefreshLag() time.Duration { return time.Since(tw.lastRefresh) } -// TopoChecksum returns the checksum of the current state of the topo +// TopoChecksum returns the checksum of the current state of the topo. func (tw *TopologyWatcher) TopoChecksum() uint32 { tw.mu.Lock() defer tw.mu.Unlock() @@ -286,7 +287,7 @@ func (tw *TopologyWatcher) TopoChecksum() uint32 { } // TabletFilter is an interface that can be given to a TopologyWatcher -// to be applied as an additional filter on the list of tablets returned by its getTablets function +// to be applied as an additional filter on the list of tablets returned by its getTablets function. type TabletFilter interface { // IsIncluded returns whether tablet is included in this filter IsIncluded(tablet *topodata.Tablet) bool @@ -300,18 +301,18 @@ type FilterByShard struct { } // filterShard describes a filter for a given shard or keyrange inside -// a keyspace +// a keyspace. type filterShard struct { keyspace string shard string keyRange *topodata.KeyRange // only set if shard is also a KeyRange } -// NewFilterByShard creates a new FilterByShard on top of an existing -// LegacyTabletRecorder. Each filter is a keyspace|shard entry, where shard +// NewFilterByShard creates a new FilterByShard for use by a +// TopologyWatcher. Each filter is a keyspace|shard entry, where shard // can either be a shard name, or a keyrange. All tablets that match -// at least one keyspace|shard tuple will be forwarded to the -// underlying LegacyTabletRecorder. +// at least one keyspace|shard tuple will be forwarded by the +// TopologyWatcher to its consumer. func NewFilterByShard(filters []string) (*FilterByShard, error) { m := make(map[string][]*filterShard) for _, filter := range filters { @@ -348,8 +349,7 @@ func NewFilterByShard(filters []string) (*FilterByShard, error) { }, nil } -// IsIncluded returns true iff the tablet's keyspace and shard should be -// forwarded to the underlying LegacyTabletRecorder. +// IsIncluded returns true iff the tablet's keyspace and shard match what we have. func (fbs *FilterByShard) IsIncluded(tablet *topodata.Tablet) bool { canonical, kr, err := topo.ValidateShardName(tablet.Shard) if err != nil { @@ -370,15 +370,14 @@ func (fbs *FilterByShard) IsIncluded(tablet *topodata.Tablet) bool { return false } -// FilterByKeyspace is a filter that filters tablets by -// keyspace +// FilterByKeyspace is a filter that filters tablets by keyspace. type FilterByKeyspace struct { keyspaces map[string]bool } // NewFilterByKeyspace creates a new FilterByKeyspace. // Each filter is a keyspace entry. All tablets that match -// a keyspace will be forwarded to the underlying LegacyTabletRecorder. +// a keyspace will be forwarded to the TopologyWatcher's consumer. func NewFilterByKeyspace(selectedKeyspaces []string) *FilterByKeyspace { m := make(map[string]bool) for _, keyspace := range selectedKeyspaces { @@ -390,8 +389,7 @@ func NewFilterByKeyspace(selectedKeyspaces []string) *FilterByKeyspace { } } -// IsIncluded returns true if the tablet's keyspace should be -// forwarded to the underlying LegacyTabletRecorder. +// IsIncluded returns true if the tablet's keyspace matches what we have. func (fbk *FilterByKeyspace) IsIncluded(tablet *topodata.Tablet) bool { _, exist := fbk.keyspaces[tablet.Keyspace] return exist diff --git a/go/vt/vttablet/tabletserver/txthrottler/mock_topology_watcher_test.go b/go/vt/vttablet/tabletserver/txthrottler/mock_topology_watcher_test.go deleted file mode 100644 index 163c4c44d4d..00000000000 --- a/go/vt/vttablet/tabletserver/txthrottler/mock_topology_watcher_test.go +++ /dev/null @@ -1,58 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler (interfaces: TopologyWatcherInterface) - -// Package txthrottler is a generated GoMock package. -package txthrottler - -import ( - reflect "reflect" - - gomock "go.uber.org/mock/gomock" -) - -// MockTopologyWatcherInterface is a mock of TopologyWatcherInterface interface. -type MockTopologyWatcherInterface struct { - ctrl *gomock.Controller - recorder *MockTopologyWatcherInterfaceMockRecorder -} - -// MockTopologyWatcherInterfaceMockRecorder is the mock recorder for MockTopologyWatcherInterface. -type MockTopologyWatcherInterfaceMockRecorder struct { - mock *MockTopologyWatcherInterface -} - -// NewMockTopologyWatcherInterface creates a new mock instance. -func NewMockTopologyWatcherInterface(ctrl *gomock.Controller) *MockTopologyWatcherInterface { - mock := &MockTopologyWatcherInterface{ctrl: ctrl} - mock.recorder = &MockTopologyWatcherInterfaceMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockTopologyWatcherInterface) EXPECT() *MockTopologyWatcherInterfaceMockRecorder { - return m.recorder -} - -// Start mocks base method. -func (m *MockTopologyWatcherInterface) Start() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Start") -} - -// Start indicates an expected call of Start. -func (mr *MockTopologyWatcherInterfaceMockRecorder) Start() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockTopologyWatcherInterface)(nil).Start)) -} - -// Stop mocks base method. -func (m *MockTopologyWatcherInterface) Stop() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Stop") -} - -// Stop indicates an expected call of Stop. -func (mr *MockTopologyWatcherInterfaceMockRecorder) Stop() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockTopologyWatcherInterface)(nil).Stop)) -} diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 18dede5f30a..f78c65a4587 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -38,25 +38,20 @@ import ( ) // These vars store the functions used to create the topo server, healthcheck, -// topology watchers and go/vt/throttler. These are provided here so that they can be overridden +// and go/vt/throttler. These are provided here so that they can be overridden // in tests to generate mocks. type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck -type topologyWatcherFactoryFunc func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) var ( - healthCheckFactory healthCheckFactoryFunc - topologyWatcherFactory topologyWatcherFactoryFunc - throttlerFactory throttlerFactoryFunc + healthCheckFactory healthCheckFactoryFunc + throttlerFactory throttlerFactoryFunc ) func resetTxThrottlerFactories() { healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck { return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ",")) } - topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface { - return discovery.NewCellTabletsWatcher(context.Background(), topoServer, hc, discovery.NewFilterByKeyspace([]string{keyspace}), cell, refreshInterval, true, topoReadConcurrency) - } throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) { return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now) } @@ -149,7 +144,8 @@ type txThrottler struct { topoServer *topo.Server // stats - throttlerRunning *stats.Gauge + throttlerRunning *stats.Gauge + // TODO(deepthi): deprecated, should be deleted in v20 topoWatchers *stats.GaugesWithSingleLabel healthChecksReadTotal *stats.CountersWithMultiLabels healthChecksRecordedTotal *stats.CountersWithMultiLabels @@ -170,10 +166,9 @@ type txThrottlerStateImpl struct { // throttleMu serializes calls to throttler.Throttler.Throttle(threadId). // That method is required to be called in serial for each threadId. - throttleMu sync.Mutex - throttler ThrottlerInterface - stopHealthCheck context.CancelFunc - topologyWatchers map[string]TopologyWatcherInterface + throttleMu sync.Mutex + throttler ThrottlerInterface + stopHealthCheck context.CancelFunc healthCheck discovery.HealthCheck healthCheckChan chan *discovery.TabletHealth @@ -204,7 +199,7 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler { config: config, topoServer: topoServer, throttlerRunning: env.Exporter().NewGauge(TxThrottlerName+"Running", "transaction throttler running state"), - topoWatchers: env.Exporter().NewGaugesWithSingleLabel(TxThrottlerName+"TopoWatchers", "transaction throttler topology watchers", "cell"), + topoWatchers: env.Exporter().NewGaugesWithSingleLabel(TxThrottlerName+"TopoWatchers", "DEPRECATED: transaction throttler topology watchers", "cell"), healthChecksReadTotal: env.Exporter().NewCountersWithMultiLabels(TxThrottlerName+"HealthchecksRead", "transaction throttler healthchecks read", []string{"cell", "DbType"}), healthChecksRecordedTotal: env.Exporter().NewCountersWithMultiLabels(TxThrottlerName+"HealthchecksRecorded", "transaction throttler healthchecks recorded", @@ -322,31 +317,12 @@ func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, t ts.healthCheck = healthCheckFactory(topoServer, target.Cell, ts.healthCheckCells) ts.healthCheckChan = ts.healthCheck.Subscribe() - ts.topologyWatchers = make( - map[string]TopologyWatcherInterface, len(ts.healthCheckCells)) - for _, cell := range ts.healthCheckCells { - ts.topologyWatchers[cell] = topologyWatcherFactory( - topoServer, - ts.healthCheck, - cell, - target.Keyspace, - target.Shard, - discovery.DefaultTopologyWatcherRefreshInterval, - discovery.DefaultTopoReadConcurrency, - ) - ts.txThrottler.topoWatchers.Add(cell, 1) - } } func (ts *txThrottlerStateImpl) closeHealthCheckStream() { if ts.healthCheck == nil { return } - for cell, watcher := range ts.topologyWatchers { - watcher.Stop() - ts.txThrottler.topoWatchers.Reset(cell) - } - ts.topologyWatchers = nil ts.stopHealthCheck() ts.healthCheck.Close() } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index ea57d37ad8e..268a37437d9 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -19,7 +19,6 @@ package txthrottler // Commands to generate the mocks for this test. //go:generate mockgen -destination mock_healthcheck_test.go -package txthrottler -mock_names "HealthCheck=MockHealthCheck" vitess.io/vitess/go/vt/discovery HealthCheck //go:generate mockgen -destination mock_throttler_test.go -package txthrottler vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler ThrottlerInterface -//go:generate mockgen -destination mock_topology_watcher_test.go -package txthrottler vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler TopologyWatcherInterface import ( "context" @@ -74,16 +73,6 @@ func TestEnabledThrottler(t *testing.T) { return mockHealthCheck } - topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface { - assert.Equal(t, ts, topoServer) - assert.Contains(t, []string{"cell1", "cell2"}, cell) - assert.Equal(t, "keyspace", keyspace) - assert.Equal(t, "shard", shard) - result := NewMockTopologyWatcherInterface(mockCtrl) - result.EXPECT().Stop() - return result - } - mockThrottler := NewMockThrottlerInterface(mockCtrl) throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) { assert.Equal(t, 1, threadCount) @@ -131,7 +120,6 @@ func TestEnabledThrottler(t *testing.T) { throttlerStateImpl := throttlerImpl.state.(*txThrottlerStateImpl) assert.Equal(t, map[topodatapb.TabletType]bool{topodatapb.TabletType_REPLICA: true}, throttlerStateImpl.tabletTypes) assert.Equal(t, int64(1), throttlerImpl.throttlerRunning.Get()) - assert.Equal(t, map[string]int64{"cell1": 1, "cell2": 1}, throttlerImpl.topoWatchers.Counts()) assert.False(t, throttlerImpl.Throttle(100, "some_workload")) assert.Equal(t, int64(1), throttlerImpl.requestsTotal.Counts()["some_workload"]) @@ -162,7 +150,6 @@ func TestEnabledThrottler(t *testing.T) { assert.Equal(t, int64(1), throttlerImpl.requestsThrottled.Counts()["some_workload"]) throttlerImpl.Close() assert.Zero(t, throttlerImpl.throttlerRunning.Get()) - assert.Equal(t, map[string]int64{"cell1": 0, "cell2": 0}, throttlerImpl.topoWatchers.Counts()) } func TestFetchKnownCells(t *testing.T) {