diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go
index 95e08290d53..d37b7419fd6 100644
--- a/go/vt/discovery/healthcheck.go
+++ b/go/vt/discovery/healthcheck.go
@@ -35,6 +35,7 @@ import (
 	"bytes"
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"hash/crc32"
 	"net/http"
@@ -105,6 +106,9 @@ var (
 	// HealthCheckHealthyTemplate uses healthCheckTemplate with the `HealthCheck Tablet - Healthy Tablets` title to
 	// create the HTML code required to render the list of healthy tablets from the HealthCheck.
 	HealthCheckHealthyTemplate = fmt.Sprintf(healthCheckTemplate, "HealthCheck - Healthy Tablets")
+
+	// errKeyspacesToWatchAndTabletFilters is an error for cases where incompatible filters are defined.
+	errKeyspacesToWatchAndTabletFilters = errors.New("only one of --keyspaces_to_watch and --tablet_filters may be specified at a time")
 )
 
 // See the documentation for NewHealthCheck below for an explanation of these parameters.
@@ -301,6 +305,27 @@ type HealthCheckImpl struct {
 	healthCheckDialSem *semaphore.Weighted
 }
 
+// NewVTGateHealthCheckFilters returns healthcheck filters for vtgate.
+func NewVTGateHealthCheckFilters() (filters TabletFilters, err error) {
+	if len(tabletFilters) > 0 {
+		if len(KeyspacesToWatch) > 0 {
+			return nil, errKeyspacesToWatchAndTabletFilters
+		}
+
+		fbs, err := NewFilterByShard(tabletFilters)
+		if err != nil {
+			return nil, fmt.Errorf("failed to parse tablet_filters value %q: %v", strings.Join(tabletFilters, ","), err)
+		}
+		filters = append(filters, fbs)
+	} else if len(KeyspacesToWatch) > 0 {
+		filters = append(filters, NewFilterByKeyspace(KeyspacesToWatch))
+	}
+	if len(tabletFilterTags) > 0 {
+		filters = append(filters, NewFilterByTabletTags(tabletFilterTags))
+	}
+	return filters, nil
+}
+
 // NewHealthCheck creates a new HealthCheck object.
 // Parameters:
 // retryDelay.
@@ -322,10 +347,14 @@ type HealthCheckImpl struct {
 //
 //	The localCell for this healthcheck
 //
-// callback.
+// cellsToWatch.
 //
-//	A function to call when there is a primary change. Used to notify vtgate's buffer to stop buffering.
-func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string) *HealthCheckImpl {
+//	Is a list of cells to watch for tablets.
+
+// filters.
+
+//	Is one or more filters to apply when determining what tablets we want to stream healthchecks from.
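+//
+// Reviewer note, a hedged usage sketch (not part of this change; it mirrors
+// the createHealthCheck wiring in tabletgateway.go later in this diff, and
+// the variable names are illustrative):
+//
+//	filters, err := NewVTGateHealthCheckFilters()
+//	if err != nil {
+//		log.Exit(err)
+//	}
+//	hc := NewHealthCheck(ctx, retryDelay, timeout, topoServer, "cell1", "cell1,cell2", filters)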
+func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string, filters TabletFilter) *HealthCheckImpl {
 	log.Infof("loading tablets for cells: %v", cellsToWatch)
 
 	hc := &HealthCheckImpl{
@@ -353,22 +382,6 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
 		if c == "" {
 			continue
 		}
-		if len(tabletFilters) > 0 {
-			if len(KeyspacesToWatch) > 0 {
-				log.Exitf("Only one of -keyspaces_to_watch and -tablet_filters may be specified at a time")
-			}
-
-			fbs, err := NewFilterByShard(tabletFilters)
-			if err != nil {
-				log.Exitf("Cannot parse tablet_filters parameter: %v", err)
-			}
-			filters = append(filters, fbs)
-		} else if len(KeyspacesToWatch) > 0 {
-			filters = append(filters, NewFilterByKeyspace(KeyspacesToWatch))
-		}
-		if len(tabletFilterTags) > 0 {
-			filters = append(filters, NewFilterByTabletTags(tabletFilterTags))
-		}
 		topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency))
 	}
 
diff --git a/go/vt/discovery/healthcheck_test.go b/go/vt/discovery/healthcheck_test.go
index 31376bd8c7d..fa7c497c146 100644
--- a/go/vt/discovery/healthcheck_test.go
+++ b/go/vt/discovery/healthcheck_test.go
@@ -63,6 +63,77 @@ func init() {
 	refreshInterval = time.Minute
 }
 
+func TestNewVTGateHealthCheckFilters(t *testing.T) {
+	defer func() {
+		KeyspacesToWatch = nil
+		tabletFilters = nil
+		tabletFilterTags = nil
+	}()
+
+	testCases := []struct {
+		name                string
+		keyspacesToWatch    []string
+		tabletFilters       []string
+		tabletFilterTags    map[string]string
+		expectedError       string
+		expectedFilterTypes []any
+	}{
+		{
+			name: "noFilters",
+		},
+		{
+			name:                "tabletFilters",
+			tabletFilters:       []string{"ks1|-80"},
+			expectedFilterTypes: []any{&FilterByShard{}},
+		},
+		{
+			name:                "keyspacesToWatch",
+			keyspacesToWatch:    []string{"ks1"},
+			expectedFilterTypes: []any{&FilterByKeyspace{}},
+		},
+		{
+			name:                "tabletFiltersAndTags",
+			tabletFilters:       []string{"ks1|-80"},
+			tabletFilterTags:    map[string]string{"test": "true"},
+			expectedFilterTypes: []any{&FilterByShard{}, &FilterByTabletTags{}},
+		},
+		{
+			name:                "keyspacesToWatchAndTags",
+			tabletFilterTags:    map[string]string{"test": "true"},
+			keyspacesToWatch:    []string{"ks1"},
+			expectedFilterTypes: []any{&FilterByKeyspace{}, &FilterByTabletTags{}},
+		},
+		{
+			name:             "failKeyspacesToWatchAndFilters",
+			tabletFilters:    []string{"ks1|-80"},
+			keyspacesToWatch: []string{"ks1"},
+			expectedError:    errKeyspacesToWatchAndTabletFilters.Error(),
+		},
+		{
+			name:          "failInvalidTabletFilters",
+			tabletFilters: []string{"shouldfail|"},
+			expectedError: "failed to parse tablet_filters value \"shouldfail|\": error parsing shard name : Code: INVALID_ARGUMENT\nempty name\n",
+		},
+	}
+
+	for _, testCase := range testCases {
+		t.Run(testCase.name, func(t *testing.T) {
+			KeyspacesToWatch = testCase.keyspacesToWatch
+			tabletFilters = testCase.tabletFilters
+			tabletFilterTags = testCase.tabletFilterTags
+
+			filters, err := NewVTGateHealthCheckFilters()
+			if testCase.expectedError != "" {
+				assert.EqualError(t, err, testCase.expectedError)
+			}
+			assert.Len(t, filters, len(testCase.expectedFilterTypes))
+			for i, filter := range filters {
+				assert.IsType(t, testCase.expectedFilterTypes[i], filter)
+			}
+		})
+	}
+}
+
 func TestHealthCheck(t *testing.T) {
 	ctx := utils.LeakCheckContext(t)
 	// reset error counters
@@ -979,7 +1050,7 @@ func TestPrimaryInOtherCell(t *testing.T) {
 	ts := memorytopo.NewServer(ctx, "cell1", "cell2")
 	defer ts.Close()
-	hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
+	hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil)
 	defer hc.Close()
 
 	// add a tablet as primary in different cell
@@ -1039,7 +1110,7 @@ func TestReplicaInOtherCell(t *testing.T) {
 	ts := memorytopo.NewServer(ctx, "cell1", "cell2")
 	defer ts.Close()
-	hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
+	hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil)
 	defer hc.Close()
 
 	// add a tablet as replica
@@ -1144,7 +1215,7 @@ func TestCellAliases(t *testing.T) {
 	ts := memorytopo.NewServer(ctx, "cell1", "cell2")
 	defer ts.Close()
-	hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2")
+	hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell1", "cell1, cell2", nil)
 	defer hc.Close()
 
 	cellsAlias := &topodatapb.CellsAlias{
@@ -1295,7 +1366,7 @@ func tabletDialer(tablet *topodatapb.Tablet, _ grpcclient.FailFast) (queryservic
 }
 
 func createTestHc(ctx context.Context, ts *topo.Server) *HealthCheckImpl {
-	return NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell", "")
+	return NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, "cell", "", nil)
 }
 
 type fakeConn struct {
diff --git a/go/vt/discovery/keyspace_events_test.go b/go/vt/discovery/keyspace_events_test.go
index af60479a42b..b7124ec3c13 100644
--- a/go/vt/discovery/keyspace_events_test.go
+++ b/go/vt/discovery/keyspace_events_test.go
@@ -41,7 +41,7 @@ func TestSrvKeyspaceWithNilNewKeyspace(t *testing.T) {
 	factory.AddCell(cell)
 	ts := faketopo.NewFakeTopoServer(ctx, factory)
 	ts2 := &fakeTopoServer{}
-	hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, cell, "")
+	hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, cell, "", nil)
 	defer hc.Close()
 	kew := NewKeyspaceEventWatcher(ctx, ts2, hc, cell)
 	kss := &keyspaceState{
@@ -80,7 +80,7 @@ func TestKeyspaceEventTypes(t *testing.T) {
 	factory.AddCell(cell)
 	ts := faketopo.NewFakeTopoServer(ctx, factory)
 	ts2 := &fakeTopoServer{}
-	hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, cell, "")
+	hc := NewHealthCheck(ctx, 1*time.Millisecond, time.Hour, ts, cell, "", nil)
 	defer hc.Close()
 	kew := NewKeyspaceEventWatcher(ctx, ts2, hc, cell)
diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go
index 9de996564a0..775267af65c 100644
--- a/go/vt/discovery/topology_watcher_test.go
+++ b/go/vt/discovery/topology_watcher_test.go
@@ -122,10 +122,11 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
 	defer ts.Close()
 	fhc := NewFakeHealthCheck(nil)
 	defer fhc.Close()
+	filter := NewFilterByKeyspace([]string{"keyspace"})
 	logger := logutil.NewMemoryLogger()
 	topologyWatcherOperations.ZeroAll()
 	counts := topologyWatcherOperations.Counts()
-	tw := NewTopologyWatcher(context.Background(), ts, fhc, nil, "aa", 10*time.Minute, refreshKnownTablets, 5)
+	tw := NewTopologyWatcher(context.Background(), ts, fhc, filter, "aa", 10*time.Minute, refreshKnownTablets, 5)
 
 	counts = checkOpCounts(t, counts, map[string]int64{})
 	checkChecksum(t, tw, 0)
@@ -172,10 +173,31 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
 	require.NoError(t, ts.CreateTablet(context.Background(), tablet2), "CreateTablet failed for %v", tablet2.Alias)
 	tw.loadTablets()
 
+	// Confirm that the second tablet triggers ListTablets and AddTablet calls.
 	counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 1})
 	checkChecksum(t, tw, 2762153755)
 
-	// Check the new tablet is returned by GetAllTablets().
+	// Add a third tablet in a filtered keyspace to the topology.
+	tablet3 := &topodatapb.Tablet{
+		Alias: &topodatapb.TabletAlias{
+			Cell: "aa",
+			Uid:  3,
+		},
+		Hostname: "host3",
+		PortMap: map[string]int32{
+			"vt": 789,
+		},
+		Keyspace: "excluded",
+		Shard:    "shard",
+	}
+	require.NoError(t, ts.CreateTablet(context.Background(), tablet3), "CreateTablet failed for %v", tablet3.Alias)
+	tw.loadTablets()
+
+	// Confirm that the filtered tablet did not trigger an AddTablet call.
+	counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 0})
+	checkChecksum(t, tw, 3177315266)
+
+	// Check that the second tablet is returned by GetAllTablets(). The result should not contain the filtered tablet.
 	allTablets = fhc.GetAllTablets()
 	key = TabletToMapKey(tablet2)
 	assert.Len(t, allTablets, 2)
@@ -207,14 +229,14 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
 		assert.Contains(t, allTablets, key)
 		assert.True(t, proto.Equal(tablet, allTablets[key]))
 		assert.NotContains(t, allTablets, origKey)
-		checkChecksum(t, tw, 2762153755)
+		checkChecksum(t, tw, 3177315266)
 	} else {
 		counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "ReplaceTablet": 0})
 		assert.Len(t, allTablets, 2)
 		assert.Contains(t, allTablets, origKey)
 		assert.True(t, proto.Equal(origTablet, allTablets[origKey]))
 		assert.NotContains(t, allTablets, key)
-		checkChecksum(t, tw, 2762153755)
+		checkChecksum(t, tw, 3177315266)
 	}
 
 	// Both tablets restart on different hosts.
@@ -269,7 +291,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
 	require.Nil(t, err, "FixShardReplication failed")
 	tw.loadTablets()
 	counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "RemoveTablet": 1})
-	checkChecksum(t, tw, 789108290)
+	checkChecksum(t, tw, 852159264)
 
 	allTablets = fhc.GetAllTablets()
 	assert.Len(t, allTablets, 1)
@@ -280,8 +302,10 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
 	assert.Contains(t, allTablets, key)
 	assert.True(t, proto.Equal(tablet2, allTablets[key]))
 
-	// Remove the other and check that it is detected as being gone.
+	// Remove the other tablets and check that they are detected as being gone.
+	// Deleting the filtered tablet should not trigger a RemoveTablet call.
 	require.NoError(t, ts.DeleteTablet(context.Background(), tablet2.Alias))
+	require.NoError(t, ts.DeleteTablet(context.Background(), tablet3.Alias))
 	_, err = topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard")
 	require.Nil(t, err, "FixShardReplication failed")
 	tw.loadTablets()
diff --git a/go/vt/throttler/demo/throttler_demo.go b/go/vt/throttler/demo/throttler_demo.go
index 15228475bfb..4276763961b 100644
--- a/go/vt/throttler/demo/throttler_demo.go
+++ b/go/vt/throttler/demo/throttler_demo.go
@@ -239,7 +239,7 @@ func newClient(ctx context.Context, primary *primary, replica *replica, ts *topo
 		log.Fatal(err)
 	}
 
-	healthCheck := discovery.NewHealthCheck(ctx, 5*time.Second, 1*time.Minute, ts, "cell1", "")
+	healthCheck := discovery.NewHealthCheck(ctx, 5*time.Second, 1*time.Minute, ts, "cell1", "", nil)
 	c := &client{
 		primary:     primary,
 		healthCheck: healthCheck,
diff --git a/go/vt/throttler/replication_lag_cache.go b/go/vt/throttler/replication_lag_cache.go
index c9c2e94f113..d47b82474ac 100644
--- a/go/vt/throttler/replication_lag_cache.go
+++ b/go/vt/throttler/replication_lag_cache.go
@@ -18,6 +18,7 @@ package throttler
 
 import (
 	"sort"
+	"sync"
 	"time"
 
 	"vitess.io/vitess/go/vt/discovery"
@@ -30,6 +31,8 @@ type replicationLagCache struct {
 	// The map key is replicationLagRecord.LegacyTabletStats.Key.
 	entries map[string]*replicationLagHistory
 
+	mu sync.Mutex
+
 	// slowReplicas is a set of slow replicas.
 	// The map key is replicationLagRecord.LegacyTabletStats.Key.
 	// This map will always be recomputed by sortByLag() and must not be modified
@@ -60,6 +63,9 @@ func newReplicationLagCache(historyCapacityPerReplica int) *replicationLagCache
 
 // add inserts or updates "r" in the cache for the replica with the key "r.Key".
 func (c *replicationLagCache) add(r replicationLagRecord) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
 	if !r.Serving {
 		// Tablet is down. Do no longer track it.
 		delete(c.entries, discovery.TabletToMapKey(r.Tablet))
@@ -76,9 +82,35 @@
 	entry.add(r)
 }
 
+// maxLag returns the maximum replication lag for the entries in the cache.
+func (c *replicationLagCache) maxLag() (maxLag uint32) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	for key := range c.entries {
+		if c.isIgnored(key) {
+			continue
+		}
+
+		entry := c.entries[key]
+		if entry == nil {
+			continue
+		}
+
+		latest := entry.latest()
+		if lag := latest.Stats.ReplicationLagSeconds; lag > maxLag {
+			maxLag = lag
+		}
+	}
+
+	return maxLag
+}
+
 // latest returns the current lag record for the given LegacyTabletStats.Key string.
 // A zero record is returned if there is no latest entry.
 func (c *replicationLagCache) latest(key string) replicationLagRecord {
+	c.mu.Lock()
+	defer c.mu.Unlock()
 	entry, ok := c.entries[key]
 	if !ok {
 		return replicationLagRecord{}
 	}
@@ -90,6 +122,8 @@
 // or just after it.
 // If there is no such record, a zero record is returned.
 func (c *replicationLagCache) atOrAfter(key string, at time.Time) replicationLagRecord {
+	c.mu.Lock()
+	defer c.mu.Unlock()
 	entry, ok := c.entries[key]
 	if !ok {
 		return replicationLagRecord{}
 	}
@@ -100,6 +134,9 @@
 // sortByLag sorts all replicas by their latest replication lag value and
 // tablet uid and updates the c.slowReplicas set.
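+// Reviewer note: sortByLag now takes c.mu itself, and sync.Mutex is not
+// reentrant, so callers must not already hold c.mu. A hedged sketch of the
+// intended call pattern (argument values are illustrative):
+//
+//	c.add(record)    // locks and unlocks c.mu internally
+//	c.sortByLag(1 /* ignoreNSlowestReplicas */, 10 /* minimumReplicationLag */)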
 func (c *replicationLagCache) sortByLag(ignoreNSlowestReplicas int, minimumReplicationLag int64) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
 	// Reset the current list of ignored replicas.
 	c.slowReplicas = make(map[string]bool)
 
@@ -142,6 +179,9 @@ func (a byLagAndTabletUID) Less(i, j int) bool {
 // this slow replica.
 // "key" refers to ReplicationLagRecord.LegacyTabletStats.Key.
 func (c *replicationLagCache) ignoreSlowReplica(key string) bool {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
 	if len(c.slowReplicas) == 0 {
 		// No slow replicas at all.
 		return false
diff --git a/go/vt/throttler/replication_lag_cache_test.go b/go/vt/throttler/replication_lag_cache_test.go
index 312f97e1999..9b34210d096 100644
--- a/go/vt/throttler/replication_lag_cache_test.go
+++ b/go/vt/throttler/replication_lag_cache_test.go
@@ -20,6 +20,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/stretchr/testify/require"
+
 	"vitess.io/vitess/go/vt/discovery"
 )
 
@@ -91,3 +93,10 @@ func TestReplicationLagCache_SortByLag(t *testing.T) {
 		t.Fatal("r1 should be tracked as a slow replica")
 	}
 }
+
+func TestReplicationLagCache_MaxLag(t *testing.T) {
+	c := newReplicationLagCache(2)
+	c.add(lagRecord(sinceZero(1*time.Second), r1, 30))
+	c.add(lagRecord(sinceZero(1*time.Second), r2, 1))
+	require.Equal(t, uint32(30), c.maxLag())
+}
diff --git a/go/vt/throttler/throttler.go b/go/vt/throttler/throttler.go
index 909888bd0d4..19b95559fed 100644
--- a/go/vt/throttler/throttler.go
+++ b/go/vt/throttler/throttler.go
@@ -229,22 +229,10 @@ func (t *Throttler) Throttle(threadID int) time.Duration {
 // the provided type, excluding ignored tablets.
 func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 {
 	cache := t.maxReplicationLagModule.lagCacheByType(tabletType)
-
-	var maxLag uint32
-	cacheEntries := cache.entries
-
-	for key := range cacheEntries {
-		if cache.isIgnored(key) {
-			continue
-		}
-
-		lag := cache.latest(key).Stats.ReplicationLagSeconds
-		if lag > maxLag {
-			maxLag = lag
-		}
+	if cache == nil {
+		return 0
 	}
-
-	return maxLag
+	return cache.maxLag()
 }
 
 // ThreadFinished marks threadID as finished and redistributes the thread's
diff --git a/go/vt/throttler/throttler_test.go b/go/vt/throttler/throttler_test.go
index 0bb0ed0387a..199a2f66e21 100644
--- a/go/vt/throttler/throttler_test.go
+++ b/go/vt/throttler/throttler_test.go
@@ -17,12 +17,26 @@ limitations under the License.
 package throttler
 
 import (
+	"context"
 	"runtime"
 	"strings"
+	"sync"
 	"testing"
 	"time"
+
+	"github.com/stretchr/testify/require"
+
+	"vitess.io/vitess/go/vt/discovery"
+	"vitess.io/vitess/go/vt/proto/query"
+	"vitess.io/vitess/go/vt/proto/topodata"
 )
 
+// testTabletTypes is the list of tablet types to test.
+var testTabletTypes = []topodata.TabletType{
+	topodata.TabletType_REPLICA,
+	topodata.TabletType_RDONLY,
+}
+
 // The main purpose of the benchmarks below is to demonstrate the functionality
 // of the throttler in the real-world (using a non-faked time.Now).
 // The benchmark values should be as close as possible to the request interval
@@ -426,3 +440,72 @@ func TestThreadFinished_SecondCallPanics(t *testing.T) {
 	}()
 	throttler.ThreadFinished(0)
 }
+
+func TestThrottlerMaxLag(t *testing.T) {
+	fc := &fakeClock{}
+	throttler, err := newThrottlerWithClock(t.Name(), "queries", 1, 1, 10, fc.now)
+	require.NoError(t, err)
+	defer throttler.Close()
+
+	require.NotNil(t, throttler)
+	require.NotNil(t, throttler.maxReplicationLagModule)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	var wg sync.WaitGroup
+
+	// Run .add() and .MaxLag() concurrently to detect races.
+	for _, tabletType := range testTabletTypes {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				default:
+					throttler.MaxLag(tabletType)
+				}
+			}
+		}()
+
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				default:
+					cache := throttler.maxReplicationLagModule.lagCacheByType(tabletType)
+					require.NotNil(t, cache)
+					cache.add(replicationLagRecord{
+						time: time.Now(),
+						TabletHealth: discovery.TabletHealth{
+							Serving: true,
+							Stats: &query.RealtimeStats{
+								ReplicationLagSeconds: 5,
+							},
+							Tablet: &topodata.Tablet{
+								Hostname: t.Name(),
+								Type:     tabletType,
+								PortMap: map[string]int32{
+									"test": 15999,
+								},
+							},
+						},
+					})
+				}
+			}
+		}()
+	}
+	time.Sleep(time.Second)
+	cancel()
+	wg.Wait()
+
+	// Check that .MaxLag() returns the expected lag.
+	for _, tabletType := range testTabletTypes {
+		require.Equal(t, uint32(5), throttler.MaxLag(tabletType))
+	}
+}
diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go
index 2e224b6379e..4c67f5672e1 100644
--- a/go/vt/vtgate/tabletgateway.go
+++ b/go/vt/vtgate/tabletgateway.go
@@ -101,7 +101,11 @@ type TabletGateway struct {
 }
 
 func createHealthCheck(ctx context.Context, retryDelay, timeout time.Duration, ts *topo.Server, cell, cellsToWatch string) discovery.HealthCheck {
-	return discovery.NewHealthCheck(ctx, retryDelay, timeout, ts, cell, cellsToWatch)
+	filters, err := discovery.NewVTGateHealthCheckFilters()
+	if err != nil {
+		log.Exit(err)
+	}
+	return discovery.NewHealthCheck(ctx, retryDelay, timeout, ts, cell, cellsToWatch, filters)
 }
 
 // NewTabletGateway creates and returns a new TabletGateway
diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
index 4a682ffd298..fc149fc8b87 100644
--- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
+++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
@@ -41,7 +41,7 @@ import (
 // These vars store the functions used to create the topo server, healthcheck,
 // and go/vt/throttler. These are provided here so that they can be overridden
 // in tests to generate mocks.
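+//
+// For example, a test can swap in a mock (hedged sketch mirroring
+// TestEnabledThrottler later in this diff; mockHealthCheck is illustrative):
+//
+//	healthCheckFactory = func(ctx context.Context, topoServer *topo.Server, cell, keyspace, shard string, cellsToWatch []string) (discovery.HealthCheck, error) {
+//		return mockHealthCheck, nil
+//	}
+//	defer resetTxThrottlerFactories()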
-type healthCheckFactoryFunc func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck
+type healthCheckFactoryFunc func(ctx context.Context, topoServer *topo.Server, cell, keyspace, shard string, cellsToWatch []string) (discovery.HealthCheck, error)
 type throttlerFactoryFunc func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error)
 
 var (
@@ -50,8 +50,13 @@
 )
 
 func resetTxThrottlerFactories() {
-	healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck {
-		return discovery.NewHealthCheck(context.Background(), discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ","))
+	healthCheckFactory = func(ctx context.Context, topoServer *topo.Server, cell, keyspace, shard string, cellsToWatch []string) (discovery.HealthCheck, error) {
+		// discovery.NewFilterByShard expects a single-shard filter to be in "keyspace|shard" format.
+		filter, err := discovery.NewFilterByShard([]string{keyspace + "|" + shard})
+		if err != nil {
+			return nil, err
+		}
+		return discovery.NewHealthCheck(ctx, discovery.DefaultHealthCheckRetryDelay, discovery.DefaultHealthCheckTimeout, topoServer, cell, strings.Join(cellsToWatch, ","), filter), nil
 	}
 	throttlerFactory = func(name, unit string, threadCount int, maxRate int64, maxReplicationLagConfig throttler.MaxReplicationLagModuleConfig) (ThrottlerInterface, error) {
 		return throttler.NewThrottlerFromConfig(name, unit, threadCount, maxRate, maxReplicationLagConfig, time.Now)
@@ -158,9 +163,11 @@ type txThrottlerStateImpl struct {
 	// throttleMu serializes calls to throttler.Throttler.Throttle(threadId).
 	// That method is required to be called in serial for each threadId.
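+	//
+	// A hedged sketch of the guarded call (threadID 0 is illustrative):
+	//
+	//	ts.throttleMu.Lock()
+	//	defer ts.throttleMu.Unlock()
+	//	return ts.throttler.Throttle(0)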
-	throttleMu      sync.Mutex
-	throttler       ThrottlerInterface
-	stopHealthCheck context.CancelFunc
+	throttleMu sync.Mutex
+	throttler  ThrottlerInterface
+
+	ctx    context.Context
+	cancel context.CancelFunc
 
 	healthCheck     discovery.HealthCheck
 	healthCheckChan chan *discovery.TabletHealth
@@ -284,7 +291,10 @@ func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfi
 		tabletTypes[tabletType] = true
 	}
 
+	ctx, cancel := context.WithCancel(context.Background())
 	state := &txThrottlerStateImpl{
+		ctx:              ctx,
+		cancel:           cancel,
 		config:           config,
 		healthCheckCells: config.TxThrottlerHealthCheckCells,
 		tabletTypes:      tabletTypes,
@@ -295,38 +305,41 @@ func newTxThrottlerState(txThrottler *txThrottler, config *tabletenv.TabletConfi
 	// get cells from topo if none defined in tabletenv config
 	if len(state.healthCheckCells) == 0 {
-		ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
-		defer cancel()
-		state.healthCheckCells = fetchKnownCells(ctx, txThrottler.topoServer, target)
+		cellsCtx, cellsCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
+		defer cellsCancel()
+		state.healthCheckCells = fetchKnownCells(cellsCtx, txThrottler.topoServer, target)
 		state.cellsFromTopo = true
 	}
 
-	ctx, cancel := context.WithCancel(context.Background())
-	state.stopHealthCheck = cancel
-	state.initHealthCheckStream(txThrottler.topoServer, target)
-	go state.healthChecksProcessor(ctx, txThrottler.topoServer, target)
+	if err := state.initHealthCheckStream(txThrottler.topoServer, target); err != nil {
+		return nil, err
+	}
+	go state.healthChecksProcessor(txThrottler.topoServer, target)
 	state.waitForTermination.Add(1)
 	go state.updateMaxLag()
 
 	return state, nil
 }
 
-func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, target *querypb.Target) {
-	ts.healthCheck = healthCheckFactory(topoServer, target.Cell, ts.healthCheckCells)
+func (ts *txThrottlerStateImpl) initHealthCheckStream(topoServer *topo.Server, target *querypb.Target) (err error) {
+	ts.healthCheck, err = healthCheckFactory(ts.ctx, topoServer, target.Cell, target.Keyspace, target.Shard, ts.healthCheckCells)
+	if err != nil {
+		return err
+	}
 	ts.healthCheckChan = ts.healthCheck.Subscribe()
-
+	return nil
 }
 
 func (ts *txThrottlerStateImpl) closeHealthCheckStream() {
 	if ts.healthCheck == nil {
 		return
 	}
-	ts.stopHealthCheck()
+	ts.cancel()
 	ts.healthCheck.Close()
 }
 
-func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context, topoServer *topo.Server, target *querypb.Target) {
-	fetchCtx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
+func (ts *txThrottlerStateImpl) updateHealthCheckCells(topoServer *topo.Server, target *querypb.Target) error {
+	fetchCtx, cancel := context.WithTimeout(ts.ctx, topo.RemoteOperationTimeout)
 	defer cancel()
 
 	knownCells := fetchKnownCells(fetchCtx, topoServer, target)
@@ -334,11 +347,12 @@ func (ts *txThrottlerStateImpl) updateHealthCheckCells(ctx context.Context, topo
 		log.Info("txThrottler: restarting healthcheck stream due to topology cells update")
 		ts.healthCheckCells = knownCells
 		ts.closeHealthCheckStream()
-		ts.initHealthCheckStream(topoServer, target)
+		return ts.initHealthCheckStream(topoServer, target)
 	}
+	return nil
 }
 
-func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoServer *topo.Server, target *querypb.Target) {
+func (ts *txThrottlerStateImpl) healthChecksProcessor(topoServer *topo.Server, target *querypb.Target) {
 	var cellsUpdateTicks <-chan time.Time
 	if ts.cellsFromTopo {
 		ticker := time.NewTicker(ts.config.TxThrottlerTopoRefreshInterval)
@@ -347,10 +361,12 @@ func (ts *txThrottlerStateImpl) healthChecksProcessor(ctx context.Context, topoS
 	}
 	for {
 		select {
-		case <-ctx.Done():
+		case <-ts.ctx.Done():
 			return
 		case <-cellsUpdateTicks:
-			ts.updateHealthCheckCells(ctx, topoServer, target)
+			if err := ts.updateHealthCheckCells(topoServer, target); err != nil {
+				log.Errorf("txThrottler: failed to update cell list: %+v", err)
+			}
 		case th := <-ts.healthCheckChan:
 			ts.StatsUpdate(th)
 		}
diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
index fe352cf96f4..babe71a6135 100644
--- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
+++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
@@ -71,8 +71,8 @@ func TestEnabledThrottler(t *testing.T) {
 	hcCall1.Do(func() {})
 	hcCall2 := mockHealthCheck.EXPECT().Close()
 	hcCall2.After(hcCall1)
-	healthCheckFactory = func(topoServer *topo.Server, cell string, cellsToWatch []string) discovery.HealthCheck {
-		return mockHealthCheck
+	healthCheckFactory = func(ctx context.Context, topoServer *topo.Server, cell, keyspace, shard string, cellsToWatch []string) (discovery.HealthCheck, error) {
+		return mockHealthCheck, nil
 	}
 	mockThrottler := NewMockThrottlerInterface(mockCtrl)
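
Reviewer note on verification: TestThrottlerMaxLag drives replicationLagCache.add() and Throttler.MaxLag() concurrently, so the c.mu locking added in replication_lag_cache.go is what keeps it green under the race detector. A hedged way to exercise the touched packages locally (standard Go tooling; test names and package paths are taken from this diff):

	go test -race -run 'TestThrottlerMaxLag|TestReplicationLagCache_MaxLag' ./go/vt/throttler
	go test -race -run TestNewVTGateHealthCheckFilters ./go/vt/discovery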