diff --git a/go/stats/counter_map.go b/go/stats/counter_map.go index 5ee7d19181e..a9af4495c60 100644 --- a/go/stats/counter_map.go +++ b/go/stats/counter_map.go @@ -25,7 +25,7 @@ var ( countersMu sync.RWMutex ) -// GetOrNewCounter returns a Counter with given name; the functiona either creates the counter +// GetOrNewCounter returns a Counter with given name; the function either creates the counter // if it does not exist, or returns a pre-existing one. The function is thread safe. func GetOrNewCounter(name string, help string) *Counter { // first, attempt read lock only diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index 9824f28ae2b..df63d5a84a1 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -89,6 +89,7 @@ var ( throttledAppsAPIPath = "throttler/throttled-apps" checkAPIPath = "throttler/check" checkSelfAPIPath = "throttler/check-self" + statusAPIPath = "throttler/status" getResponseBody = func(resp *http.Response) string { body, _ := io.ReadAll(resp.Body) return string(body) @@ -180,6 +181,16 @@ func throttleCheckSelf(tablet *cluster.Vttablet) (*http.Response, error) { return httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=%s", tablet.HTTPPort, checkSelfAPIPath, testAppName)) } +func throttleStatus(t *testing.T, tablet *cluster.Vttablet) string { + resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, statusAPIPath)) + require.NoError(t, err) + defer resp.Body.Close() + + b, err := io.ReadAll(resp.Body) + require.NoError(t, err) + return string(b) +} + func warmUpHeartbeat(t *testing.T) (respStatus int) { // because we run with -heartbeat_on_demand_duration=5s, the heartbeat is "cold" right now. // Let's warm it up. @@ -314,17 +325,32 @@ func TestInitialThrottler(t *testing.T) { }) t.Run("validating OK response from throttler with low threshold, heartbeats running", func(t *testing.T) { time.Sleep(1 * time.Second) + cluster.ValidateReplicationIsHealthy(t, replicaTablet) resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) + if !assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) { + rs, err := replicaTablet.VttabletProcess.QueryTablet("show replica status", keyspaceName, false) + assert.NoError(t, err) + t.Logf("Seconds_Behind_Source: %s", rs.Named().Row()["Seconds_Behind_Source"].ToString()) + t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet)) + t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet)) + } }) + t.Run("validating OK response from throttler with low threshold, heartbeats running still", func(t *testing.T) { time.Sleep(1 * time.Second) + cluster.ValidateReplicationIsHealthy(t, replicaTablet) resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() - assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) + if !assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) { + rs, err := replicaTablet.VttabletProcess.QueryTablet("show replica status", keyspaceName, false) + assert.NoError(t, err) + t.Logf("Seconds_Behind_Source: %s", rs.Named().Row()["Seconds_Behind_Source"].ToString()) + t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet)) + t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet)) + } }) t.Run("validating pushback response from throttler on low threshold once heartbeats go stale", func(t *testing.T) { time.Sleep(2 * onDemandHeartbeatDuration) // just... really wait long enough, make sure on-demand stops @@ -375,7 +401,13 @@ func TestLag(t *testing.T) { }) t.Run("accumulating lag, expecting throttler push back", func(t *testing.T) { time.Sleep(2 * throttler.DefaultThreshold) + }) + t.Run("requesting heartbeats while replication stopped", func(t *testing.T) { + // By now on-demand heartbeats have stopped. + _ = warmUpHeartbeat(t) + }) + t.Run("expecting throttler push back", func(t *testing.T) { resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) defer resp.Body.Close() @@ -386,7 +418,10 @@ func TestLag(t *testing.T) { require.NoError(t, err) defer resp.Body.Close() // self (on primary) is unaffected by replication lag - assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) + if !assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) { + t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet)) + t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet)) + } }) t.Run("replica self-check should show error", func(t *testing.T) { resp, err := throttleCheckSelf(replicaTablet) diff --git a/go/timer/rate_limiter.go b/go/timer/rate_limiter.go index 25bc2b32f61..d42a4d7e14c 100644 --- a/go/timer/rate_limiter.go +++ b/go/timer/rate_limiter.go @@ -28,7 +28,7 @@ import ( // For example, we can create a RateLimiter of 1second. Then, we can ask it, over time, to run many // tasks. It will only ever run a single task in any 1 second time frame. The rest are ignored. type RateLimiter struct { - tickerValue int64 + tickerValue atomic.Int64 lastDoValue int64 mu sync.Mutex @@ -37,7 +37,8 @@ type RateLimiter struct { // NewRateLimiter creates a new limiter with given duration. It is immediately ready to run tasks. func NewRateLimiter(d time.Duration) *RateLimiter { - r := &RateLimiter{tickerValue: 1} + r := &RateLimiter{} + r.lastDoValue = math.MinInt32 // Far enough to make a difference, but not too far to overflow. ctx, cancel := context.WithCancel(context.Background()) r.cancel = cancel go func() { @@ -48,7 +49,7 @@ func NewRateLimiter(d time.Duration) *RateLimiter { case <-ctx.Done(): return case <-ticker.C: - atomic.StoreInt64(&r.tickerValue, r.tickerValue+1) + r.tickerValue.Add(1) } } }() @@ -61,16 +62,29 @@ func (r *RateLimiter) Do(f func() error) (err error) { r.mu.Lock() defer r.mu.Unlock() - if r.lastDoValue >= atomic.LoadInt64(&r.tickerValue) { + if r.lastDoValue >= r.tickerValue.Load() { return nil // rate limited. Skipped. } if f != nil { err = f() } - r.lastDoValue = atomic.LoadInt64(&r.tickerValue) + r.lastDoValue = r.tickerValue.Load() return err } +// DoEmpty is a convenience method to invoke Do() with no function. +func (r *RateLimiter) DoEmpty() { + _ = r.Do(nil) +} + +// Diff returns the logical clock diff between the ticker and the last Do() call. +func (r *RateLimiter) Diff() int64 { + r.mu.Lock() + defer r.mu.Unlock() + + return r.tickerValue.Load() - r.lastDoValue +} + // Stop terminates rate limiter's operation and will not allow any more Do() executions. func (r *RateLimiter) Stop() { r.cancel() diff --git a/go/timer/rate_limiter_test.go b/go/timer/rate_limiter_test.go index 84122233996..83690b98a22 100644 --- a/go/timer/rate_limiter_test.go +++ b/go/timer/rate_limiter_test.go @@ -17,6 +17,7 @@ limitations under the License. package timer import ( + "math" "testing" "time" @@ -75,3 +76,18 @@ func TestRateLimiterStop(t *testing.T) { } assert.Equal(t, valSnapshot, val) } + +func TestRateLimiterDiff(t *testing.T) { + d := 2 * time.Second + r := NewRateLimiter(d) + require.NotNil(t, r) + defer r.Stop() + + // This assumes the last couple lines of code run faster than 2 seconds, which should be the case. + // But if you see flakiness due to slow runners, we can revisit the logic. + assert.Greater(t, r.Diff(), int64(math.MaxInt32)) + time.Sleep(d + time.Second) + assert.Greater(t, r.Diff(), int64(math.MaxInt32)) + r.DoEmpty() + assert.LessOrEqual(t, r.Diff(), int64(1)) +} diff --git a/go/vt/vttablet/tabletserver/throttle/check.go b/go/vt/vttablet/tabletserver/throttle/check.go index 9dfbade8af6..85952a496d1 100644 --- a/go/vt/vttablet/tabletserver/throttle/check.go +++ b/go/vt/vttablet/tabletserver/throttle/check.go @@ -148,20 +148,18 @@ func (check *ThrottlerCheck) Check(ctx context.Context, appName string, storeTyp } checkResult = check.checkAppMetricResult(ctx, appName, storeType, storeName, metricResultFunc, flags) - check.throttler.lastCheckTimeNano.Store(time.Now().UnixNano()) - - go func(statusCode int) { - stats.GetOrNewCounter("ThrottlerCheckAnyTotal", "total number of checks").Add(1) - stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheckAny%s%sTotal", textutil.SingleWordCamel(storeType), textutil.SingleWordCamel(storeName)), "").Add(1) - - if statusCode != http.StatusOK { - stats.GetOrNewCounter("ThrottlerCheckAnyError", "total number of failed checks").Add(1) - stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheckAny%s%sError", textutil.SingleWordCamel(storeType), textutil.SingleWordCamel(storeName)), "").Add(1) - } - - check.throttler.markRecentApp(appName, remoteAddr) - }(checkResult.StatusCode) - + check.throttler.markRecentApp(appName, remoteAddr) + if !throttlerapp.VitessName.Equals(appName) { + go func(statusCode int) { + stats.GetOrNewCounter("ThrottlerCheckAnyTotal", "total number of checks").Add(1) + stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheckAny%s%sTotal", textutil.SingleWordCamel(storeType), textutil.SingleWordCamel(storeName)), "").Add(1) + + if statusCode != http.StatusOK { + stats.GetOrNewCounter("ThrottlerCheckAnyError", "total number of failed checks").Add(1) + stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheckAny%s%sError", textutil.SingleWordCamel(storeType), textutil.SingleWordCamel(storeName)), "").Add(1) + } + }(checkResult.StatusCode) + } return checkResult } @@ -227,6 +225,7 @@ func (check *ThrottlerCheck) SelfChecks(ctx context.Context) { for metricName, metricResult := range check.AggregatedMetrics(ctx) { metricName := metricName metricResult := metricResult + go check.localCheck(ctx, metricName) go check.reportAggregated(metricName, metricResult) } diff --git a/go/vt/vttablet/tabletserver/throttle/throttler.go b/go/vt/vttablet/tabletserver/throttle/throttler.go index e388df724ed..3a35818cb61 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler.go @@ -82,19 +82,20 @@ import ( ) const ( - leaderCheckInterval = 5 * time.Second - mysqlCollectInterval = 250 * time.Millisecond - mysqlDormantCollectInterval = 5 * time.Second - mysqlRefreshInterval = 10 * time.Second - mysqlAggregateInterval = 125 * time.Millisecond - throttledAppsSnapshotInterval = 5 * time.Second + leaderCheckInterval = 5 * time.Second + mysqlCollectInterval = 250 * time.Millisecond // PRIMARY polls replicas + mysqlDormantCollectInterval = 5 * time.Second // PRIMARY polls replicas when dormant (no recent checks) + mysqlRefreshInterval = 10 * time.Second // Refreshing tablet inventory + mysqlAggregateInterval = 125 * time.Millisecond + throttledAppsSnapshotInterval = 5 * time.Second + recentCheckRateLimiterInterval = 1 * time.Second // Ticker assisting in determining when the throttler was last checked aggregatedMetricsExpiration = 5 * time.Second recentAppsExpiration = time.Hour * 24 nonDeprioritizedAppMapExpiration = time.Second - dormantPeriod = time.Minute + dormantPeriod = time.Minute // How long since last check to be considered dormant DefaultAppThrottleDuration = time.Hour DefaultThrottleRatio = 1.0 @@ -159,6 +160,7 @@ type Throttler struct { mysqlRefreshInterval time.Duration mysqlAggregateInterval time.Duration throttledAppsSnapshotInterval time.Duration + dormantPeriod time.Duration configSettings *config.ConfigurationSettings env tabletenv.Env @@ -169,11 +171,8 @@ type Throttler struct { heartbeatWriter heartbeat.HeartbeatWriter overrideTmClient tmclient.TabletManagerClient - // recentCheckTickerValue is an ever increasing number, incrementing once per second. - recentCheckTickerValue atomic.Int64 - // recentCheckValue is set to match or exceed recentCheckTickerValue whenever a "check" was made (other than by the throttler itself). - // when recentCheckValue < recentCheckTickerValue that means there hasn't been a recent check. - recentCheckValue atomic.Int64 + recentCheckRateLimiter *timer.RateLimiter + recentCheckDormantDiff int64 throttleTabletTypesMap map[topodatapb.TabletType]bool @@ -194,8 +193,6 @@ type Throttler struct { recentApps *cache.Cache metricsHealth *cache.Cache - lastCheckTimeNano atomic.Int64 - initMutex sync.Mutex enableMutex sync.Mutex cancelOpenContext context.CancelFunc @@ -263,6 +260,8 @@ func NewThrottler(env tabletenv.Env, srvTopoServer srvtopo.Server, ts *topo.Serv throttler.mysqlRefreshInterval = mysqlRefreshInterval throttler.mysqlAggregateInterval = mysqlAggregateInterval throttler.throttledAppsSnapshotInterval = throttledAppsSnapshotInterval + throttler.dormantPeriod = dormantPeriod + throttler.recentCheckDormantDiff = int64(throttler.dormantPeriod / recentCheckRateLimiterInterval) throttler.StoreMetricsThreshold(defaultThrottleLagThreshold.Seconds()) //default throttler.readSelfThrottleMetric = func(ctx context.Context, p *mysql.Probe) *mysql.MySQLThrottleMetric { @@ -574,10 +573,42 @@ func (throttler *Throttler) Close() { // requestHeartbeats sends a heartbeat lease request to the heartbeat writer. // This action is recorded in stats. func (throttler *Throttler) requestHeartbeats() { + if !throttler.isLeader.Load() { + return + } go throttler.heartbeatWriter.RequestHeartbeats() go stats.GetOrNewCounter("ThrottlerHeartbeatRequests", "heartbeat requests").Add(1) } +// stimulatePrimaryThrottler sends a check request to the primary tablet in the shard, to stimulate +// it to request for heartbeats. +func (throttler *Throttler) stimulatePrimaryThrottler(ctx context.Context, tmClient tmclient.TabletManagerClient) error { + // Some reasonable timeout, to ensure we release connections even if they're hanging (otherwise grpc-go keeps polling those connections forever) + ctx, cancel := context.WithTimeout(ctx, throttler.dormantPeriod) + defer cancel() + + tabletAliases, err := throttler.ts.FindAllTabletAliasesInShard(ctx, throttler.keyspace, throttler.shard) + if err != nil { + return err + } + for _, tabletAlias := range tabletAliases { + tablet, err := throttler.ts.GetTablet(ctx, tabletAlias) + if err != nil { + return err + } + if tablet.Type != topodatapb.TabletType_PRIMARY { + continue + } + req := &tabletmanagerdatapb.CheckThrottlerRequest{AppName: throttlerapp.ThrottlerStimulatorName.String()} + _, err = tmClient.CheckThrottler(ctx, tablet.Tablet, req) + if err != nil { + log.Errorf("stimulatePrimaryThrottler: %+v", err) + } + return err + } + return nil +} + func (throttler *Throttler) generateSelfMySQLThrottleMetricFunc(ctx context.Context, probe *mysql.Probe) func() *mysql.MySQLThrottleMetric { f := func() *mysql.MySQLThrottleMetric { return throttler.readSelfThrottleMetric(ctx, probe) @@ -642,10 +673,11 @@ func (throttler *Throttler) ThrottledApps() (result []base.AppThrottle) { return result } -// isDormant returns true when the last check was more than dormantPeriod ago +// isDormant returns true when the last check was more than dormantPeriod ago. +// Instead of measuring actual time, we use the fact recentCheckRateLimiter ticks every second, and take +// a logical diff, counting the number of ticks since the last check. This is a good enough approximation. func (throttler *Throttler) isDormant() bool { - lastCheckTime := time.Unix(0, throttler.lastCheckTimeNano.Load()) - return time.Since(lastCheckTime) > dormantPeriod + return throttler.recentCheckRateLimiter.Diff() > throttler.recentCheckDormantDiff } // Operate is the main entry point for the throttler operation and logic. It will @@ -663,11 +695,14 @@ func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) { mysqlRefreshTicker := addTicker(throttler.mysqlRefreshInterval) mysqlAggregateTicker := addTicker(throttler.mysqlAggregateInterval) throttledAppsTicker := addTicker(throttler.throttledAppsSnapshotInterval) - recentCheckTicker := addTicker(time.Second) + primaryStimulatorRateLimiter := timer.NewRateLimiter(throttler.dormantPeriod) + throttler.recentCheckRateLimiter = timer.NewRateLimiter(recentCheckRateLimiterInterval) wg.Add(1) go func() { defer func() { + throttler.recentCheckRateLimiter.Stop() + primaryStimulatorRateLimiter.Stop() throttler.aggregatedMetrics.Flush() throttler.recentApps.Flush() throttler.nonLowPriorityAppRequestsThrottled.Flush() @@ -724,15 +759,42 @@ func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) { case <-mysqlCollectTicker.C: if throttler.IsOpen() { // frequent + // Always collect self metrics: + throttler.collectMySQLMetrics(ctx, tmClient, func(clusterName string) bool { + return clusterName == selfStoreName + }) if !throttler.isDormant() { - throttler.collectMySQLMetrics(ctx, tmClient) + throttler.collectMySQLMetrics(ctx, tmClient, func(clusterName string) bool { + return clusterName != selfStoreName + }) } + // + if throttler.recentCheckRateLimiter.Diff() <= 1 { // recently checked + if !throttler.isLeader.Load() { + // This is a replica, and has just recently been checked. + // We want to proactively "stimulate" the primary throttler to renew the heartbeat lease. + // The intent is to "wake up" an on-demand heartbeat lease. We don't need to poke the + // primary for every single time this replica was checked, so we rate limit. The idea is that + // once heartbeats update, more checks will be successful, this replica will be "recently checked" + // more than not, and the primary throttler will pick that up, extending the on-demand lease + // even further. + // Another outcome is that the primary will go out of "dormant" mode, and start collecting + // replica metrics more frequently. + primaryStimulatorRateLimiter.Do( + func() error { + return throttler.stimulatePrimaryThrottler(ctx, tmClient) + }) + } + } + } case <-mysqlDormantCollectTicker.C: if throttler.IsOpen() { // infrequent if throttler.isDormant() { - throttler.collectMySQLMetrics(ctx, tmClient) + throttler.collectMySQLMetrics(ctx, tmClient, func(clusterName string) bool { + return clusterName != selfStoreName + }) } } case metric := <-throttler.mysqlThrottleMetricChan: @@ -756,9 +818,6 @@ func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) { } case throttlerConfig := <-throttler.throttlerConfigChan: throttler.applyThrottlerConfig(ctx, throttlerConfig) - case <-recentCheckTicker.C: - // Increment recentCheckTickerValue by one. - throttler.recentCheckTickerValue.Add(1) } } }() @@ -799,9 +858,12 @@ func (throttler *Throttler) generateTabletProbeFunction(ctx context.Context, clu } } -func (throttler *Throttler) collectMySQLMetrics(ctx context.Context, tmClient tmclient.TabletManagerClient) error { +func (throttler *Throttler) collectMySQLMetrics(ctx context.Context, tmClient tmclient.TabletManagerClient, includeCluster func(clusterName string) bool) error { // synchronously, get lists of probes for clusterName, probes := range throttler.mysqlInventory.ClustersProbes { + if !includeCluster(clusterName) { + continue + } clusterName := clusterName // probes is known not to change. It can be *replaced*, but not changed. // so it's safe to iterate it @@ -1174,20 +1236,25 @@ func (throttler *Throttler) checkStore(ctx context.Context, appName string, stor // continuous and do not generate a substantial load. return okMetricCheckResult } - if !flags.SkipRequestHeartbeats && !throttlerapp.VitessName.Equals(appName) { + + checkResult = throttler.check.Check(ctx, appName, "mysql", storeName, remoteAddr, flags) + + shouldRequestHeartbeats := !flags.SkipRequestHeartbeats + if throttlerapp.VitessName.Equals(appName) { + // Override: "vitess" app never requests heartbeats. + shouldRequestHeartbeats = false + } + if throttlerapp.ThrottlerStimulatorName.Equals(appName) { + // Ovreride: "throttler-stimulator" app always requests heartbeats. + shouldRequestHeartbeats = true + } + + if shouldRequestHeartbeats { throttler.requestHeartbeats() + throttler.recentCheckRateLimiter.DoEmpty() // This check was made by someone other than the throttler itself, i.e. this came from online-ddl or vreplication or other. // We mark the fact that someone just made a check. If this is a REPLICA or RDONLY tables, this will be reported back // to the PRIMARY so that it knows it must renew the heartbeat lease. - throttler.recentCheckValue.Store(1 + throttler.recentCheckTickerValue.Load()) - } - checkResult = throttler.check.Check(ctx, appName, "mysql", storeName, remoteAddr, flags) - - if throttler.recentCheckValue.Load() >= throttler.recentCheckTickerValue.Load() { - // This indicates someone, who is not "vitess" ie not internal to the throttling logic, did a _recent_ `check`. - // This could be online-ddl, or vreplication or whoever else. - // If this tablet is a REPLICA or RDONLY, we want to advertise to the PRIMARY that someone did a recent check, - // so that the PRIMARY knows it must renew the heartbeat lease. checkResult.RecentlyChecked = true go stats.GetOrNewCounter("ThrottlerRecentlyChecked", "recently checked").Add(1) } diff --git a/go/vt/vttablet/tabletserver/throttle/throttler_test.go b/go/vt/vttablet/tabletserver/throttle/throttler_test.go index 25de8ca96f5..98f94439a3d 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttler_test.go +++ b/go/vt/vttablet/tabletserver/throttle/throttler_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net/http" + "sync" "sync/atomic" "testing" "time" @@ -34,6 +35,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/config" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/mysql" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" "vitess.io/vitess/go/vt/vttablet/tmclient" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" @@ -46,6 +48,9 @@ const ( type fakeTMClient struct { tmclient.TabletManagerClient + appNames []string + + mu sync.Mutex } func (c *fakeTMClient) Close() { @@ -56,15 +61,28 @@ func (c *fakeTMClient) CheckThrottler(ctx context.Context, tablet *topodatapb.Ta StatusCode: http.StatusOK, Value: 0, Threshold: 1, - RecentlyChecked: true, + RecentlyChecked: false, } + c.mu.Lock() + defer c.mu.Unlock() + c.appNames = append(c.appNames, request.AppName) return resp, nil } +func (c *fakeTMClient) AppNames() []string { + c.mu.Lock() + defer c.mu.Unlock() + return c.appNames +} + type FakeTopoServer struct { } func (ts *FakeTopoServer) GetTablet(ctx context.Context, alias *topodatapb.TabletAlias) (*topo.TabletInfo, error) { + tabletType := topodatapb.TabletType_PRIMARY + if alias.Uid != 100 { + tabletType = topodatapb.TabletType_REPLICA + } tablet := &topo.TabletInfo{ Tablet: &topodatapb.Tablet{ Alias: alias, @@ -72,7 +90,7 @@ func (ts *FakeTopoServer) GetTablet(ctx context.Context, alias *topodatapb.Table MysqlHostname: "127.0.0.1", MysqlPort: 3306, PortMap: map[string]int32{"vt": 5000}, - Type: topodatapb.TabletType_REPLICA, + Type: tabletType, }, } return tablet, nil @@ -80,8 +98,9 @@ func (ts *FakeTopoServer) GetTablet(ctx context.Context, alias *topodatapb.Table func (ts *FakeTopoServer) FindAllTabletAliasesInShard(ctx context.Context, keyspace, shard string) ([]*topodatapb.TabletAlias, error) { aliases := []*topodatapb.TabletAlias{ - {Cell: "zone1", Uid: 100}, - {Cell: "zone2", Uid: 101}, + {Cell: "fakezone1", Uid: 100}, + {Cell: "fakezone2", Uid: 101}, + {Cell: "fakezone3", Uid: 103}, } return aliases, nil } @@ -92,9 +111,15 @@ func (ts *FakeTopoServer) GetSrvKeyspace(ctx context.Context, cell, keyspace str } type FakeHeartbeatWriter struct { + requests atomic.Int64 } -func (w FakeHeartbeatWriter) RequestHeartbeats() { +func (w *FakeHeartbeatWriter) RequestHeartbeats() { + w.requests.Add(1) +} + +func (w *FakeHeartbeatWriter) Requests() int64 { + return w.requests.Load() } func newTestThrottler() *Throttler { @@ -113,7 +138,7 @@ func newTestThrottler() *Throttler { throttler := &Throttler{ mysqlClusterProbesChan: make(chan *mysql.ClusterProbes), mysqlClusterThresholds: cache.New(cache.NoExpiration, 0), - heartbeatWriter: FakeHeartbeatWriter{}, + heartbeatWriter: &FakeHeartbeatWriter{}, ts: &FakeTopoServer{}, mysqlInventory: mysql.NewInventory(), pool: connpool.NewPool(env, "ThrottlerPool", tabletenv.ConnPoolConfig{}), @@ -137,13 +162,15 @@ func newTestThrottler() *Throttler { throttler.initThrottleTabletTypes() throttler.check = NewThrottlerCheck(throttler) - // High contention & racy itnervals: + // High contention & racy intervals: throttler.leaderCheckInterval = 10 * time.Millisecond throttler.mysqlCollectInterval = 10 * time.Millisecond throttler.mysqlDormantCollectInterval = 10 * time.Millisecond throttler.mysqlRefreshInterval = 10 * time.Millisecond throttler.mysqlAggregateInterval = 10 * time.Millisecond throttler.throttledAppsSnapshotInterval = 10 * time.Millisecond + throttler.dormantPeriod = 5 * time.Second + throttler.recentCheckDormantDiff = int64(throttler.dormantPeriod / recentCheckRateLimiterInterval) throttler.readSelfThrottleMetric = func(ctx context.Context, p *mysql.Probe) *mysql.MySQLThrottleMetric { return &mysql.MySQLThrottleMetric{ @@ -160,7 +187,7 @@ func newTestThrottler() *Throttler { func TestIsAppThrottled(t *testing.T) { throttler := Throttler{ throttledApps: cache.New(cache.NoExpiration, 0), - heartbeatWriter: FakeHeartbeatWriter{}, + heartbeatWriter: &FakeHeartbeatWriter{}, } assert.False(t, throttler.IsAppThrottled("app1")) assert.False(t, throttler.IsAppThrottled("app2")) @@ -190,7 +217,7 @@ func TestIsAppExempted(t *testing.T) { throttler := Throttler{ throttledApps: cache.New(cache.NoExpiration, 0), - heartbeatWriter: FakeHeartbeatWriter{}, + heartbeatWriter: &FakeHeartbeatWriter{}, } assert.False(t, throttler.IsAppExempted("app1")) assert.False(t, throttler.IsAppExempted("app2")) @@ -315,10 +342,10 @@ func TestRefreshMySQLInventory(t *testing.T) { }) } -// runThrottler opens and enables the throttler, therby making it run the Operate() function, for a given amount of time. +// runThrottler opens and enables the throttler, thereby making it run the Operate() function, for a given amount of time. // Optionally, running a given function halfway while the throttler is still open and running. -func runThrottler(t *testing.T, throttler *Throttler, timeout time.Duration, f func(*testing.T)) { - ctx, cancel := context.WithTimeout(context.Background(), timeout) +func runThrottler(t *testing.T, ctx context.Context, throttler *Throttler, timeout time.Duration, f func(*testing.T, context.Context)) { + ctx, cancel := context.WithTimeout(ctx, timeout) defer cancel() assert.False(t, throttler.IsOpen()) @@ -339,9 +366,17 @@ func runThrottler(t *testing.T, throttler *Throttler, timeout time.Duration, f f wg2 := throttler.Enable() assert.Nil(t, wg2) + sleepTime := 3 * time.Second + if timeout/2 < sleepTime { + sleepTime = timeout / 2 + } if f != nil { - time.Sleep(timeout / 2) - f(t) + select { + case <-ctx.Done(): + return + case <-time.After(sleepTime): + f(t, ctx) + } } <-ctx.Done() @@ -355,17 +390,23 @@ func runThrottler(t *testing.T, throttler *Throttler, timeout time.Duration, f f // This is relevant to `go test -race` func TestRace(t *testing.T) { throttler := newTestThrottler() - runThrottler(t, throttler, 5*time.Second, nil) + runThrottler(t, context.Background(), throttler, 5*time.Second, nil) } -// TestProbes enables a throttler for a few seocnds, and afterwards expects to find probes and metrics. +// TestProbes enables a throttler for a few seconds, and afterwards expects to find probes and metrics. func TestProbesWhileOperating(t *testing.T) { throttler := newTestThrottler() + tmClient, ok := throttler.overrideTmClient.(*fakeTMClient) + require.True(t, ok) + assert.Empty(t, tmClient.AppNames()) + t.Run("aggregated", func(t *testing.T) { assert.Equal(t, 0, throttler.aggregatedMetrics.ItemCount()) }) - runThrottler(t, throttler, 5*time.Second, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + runThrottler(t, ctx, throttler, time.Minute, func(t *testing.T, ctx context.Context) { t.Run("aggregated", func(t *testing.T) { assert.Equal(t, 2, throttler.aggregatedMetrics.ItemCount()) // flushed upon Disable() aggr := throttler.aggregatedMetricsSnapshot() @@ -382,6 +423,21 @@ func TestProbesWhileOperating(t *testing.T) { assert.Failf(t, "unknown clusterName", "%v", clusterName) } } + assert.NotEmpty(t, tmClient.AppNames()) + // The throttler here emulates a PRIMARY tablet, and therefore should probe the replicas using + // the "vitess" app name. + uniqueNames := map[string]int{} + for _, appName := range tmClient.AppNames() { + uniqueNames[appName]++ + } + // PRIMARY throttler probes replicas with empty app name, which is then + // interpreted as "vitess" name. + _, ok := uniqueNames[""] + assert.Truef(t, ok, "%+v", uniqueNames) + // And that's the only app we expect to see. + assert.Equalf(t, 1, len(uniqueNames), "%+v", uniqueNames) + + cancel() // end test early }) }) } @@ -389,7 +445,7 @@ func TestProbesWhileOperating(t *testing.T) { // TestProbesPostDisable runs the throttler for some time, and then investigates the internal throttler maps and values. func TestProbesPostDisable(t *testing.T) { throttler := newTestThrottler() - runThrottler(t, throttler, 2*time.Second, nil) + runThrottler(t, context.Background(), throttler, 2*time.Second, nil) probes := throttler.mysqlInventory.ClustersProbes assert.NotEmpty(t, probes) @@ -431,3 +487,102 @@ func TestProbesPostDisable(t *testing.T) { assert.Empty(t, aggr) }) } + +func TestDormant(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + throttler := newTestThrottler() + + heartbeatWriter, ok := throttler.heartbeatWriter.(*FakeHeartbeatWriter) + assert.True(t, ok) + assert.Zero(t, heartbeatWriter.Requests()) // once upon Enable() + + runThrottler(t, ctx, throttler, time.Minute, func(t *testing.T, ctx context.Context) { + assert.True(t, throttler.isDormant()) + assert.EqualValues(t, 1, heartbeatWriter.Requests()) // once upon Enable() + flags := &CheckFlags{} + throttler.CheckByType(ctx, throttlerapp.VitessName.String(), "", flags, ThrottleCheckSelf) + go func() { + select { + case <-ctx.Done(): + require.FailNow(t, "context expired before testing completed") + case <-time.After(time.Second): + assert.True(t, throttler.isDormant()) + assert.EqualValues(t, 1, heartbeatWriter.Requests()) // "vitess" name does not cause heartbeat requests + } + throttler.CheckByType(ctx, throttlerapp.ThrottlerStimulatorName.String(), "", flags, ThrottleCheckSelf) + select { + case <-ctx.Done(): + require.FailNow(t, "context expired before testing completed") + case <-time.After(time.Second): + assert.False(t, throttler.isDormant()) + assert.Greater(t, heartbeatWriter.Requests(), int64(1)) + } + throttler.CheckByType(ctx, throttlerapp.OnlineDDLName.String(), "", flags, ThrottleCheckSelf) + select { + case <-ctx.Done(): + require.FailNow(t, "context expired before testing completed") + case <-time.After(time.Second): + assert.False(t, throttler.isDormant()) + assert.Greater(t, heartbeatWriter.Requests(), int64(2)) + } + + // Dormant period + select { + case <-ctx.Done(): + require.FailNow(t, "context expired before testing completed") + case <-time.After(throttler.dormantPeriod): + assert.True(t, throttler.isDormant()) + } + cancel() // end test early + }() + }) +} + +func TestReplica(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + throttler := newTestThrottler() + throttler.dormantPeriod = time.Minute + throttler.tabletTypeFunc = func() topodatapb.TabletType { return topodatapb.TabletType_REPLICA } + + tmClient, ok := throttler.overrideTmClient.(*fakeTMClient) + require.True(t, ok) + assert.Empty(t, tmClient.AppNames()) + + runThrottler(t, ctx, throttler, time.Minute, func(t *testing.T, ctx context.Context) { + assert.Empty(t, tmClient.AppNames()) + flags := &CheckFlags{} + throttler.CheckByType(ctx, throttlerapp.VitessName.String(), "", flags, ThrottleCheckSelf) + go func() { + select { + case <-ctx.Done(): + require.FailNow(t, "context expired before testing completed") + case <-time.After(time.Second): + assert.Empty(t, tmClient.AppNames()) + } + throttler.CheckByType(ctx, throttlerapp.OnlineDDLName.String(), "", flags, ThrottleCheckSelf) + select { + case <-ctx.Done(): + require.FailNow(t, "context expired before testing completed") + case <-time.After(time.Second): + appNames := tmClient.AppNames() + assert.NotEmpty(t, appNames) + assert.Containsf(t, appNames, throttlerapp.ThrottlerStimulatorName.String(), "%+v", appNames) + assert.Equalf(t, 1, len(appNames), "%+v", appNames) + } + throttler.CheckByType(ctx, throttlerapp.OnlineDDLName.String(), "", flags, ThrottleCheckSelf) + select { + case <-ctx.Done(): + require.FailNow(t, "context expired before testing completed") + case <-time.After(time.Second): + // Due to stimulation rate limiting, we shouldn't see a 2nd CheckThrottler request. + appNames := tmClient.AppNames() + assert.Equalf(t, 1, len(appNames), "%+v", appNames) + } + cancel() // end test early + }() + }) +} diff --git a/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go b/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go index 4f1f5857837..7594df6c1b2 100644 --- a/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go +++ b/go/vt/vttablet/tabletserver/throttle/throttlerapp/app.go @@ -42,8 +42,9 @@ func (n Name) Concatenate(other Name) Name { const ( // DefaultName is the app name used by vitess when app doesn't indicate its name - DefaultName Name = "default" - VitessName Name = "vitess" + DefaultName Name = "default" + VitessName Name = "vitess" + ThrottlerStimulatorName Name = "throttler-stimulator" TableGCName Name = "tablegc" OnlineDDLName Name = "online-ddl"