Skip to content

Commit

Permalink
Tablet throttler: starvation fix and consolidation of logic. (#15398)
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Mar 20, 2024
1 parent b164a6e commit 265e0b9
Show file tree
Hide file tree
Showing 8 changed files with 364 additions and 77 deletions.
2 changes: 1 addition & 1 deletion go/stats/counter_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var (
countersMu sync.RWMutex
)

// GetOrNewCounter returns a Counter with given name; the functiona either creates the counter
// GetOrNewCounter returns a Counter with given name; the function either creates the counter
// if it does not exist, or returns a pre-existing one. The function is thread safe.
func GetOrNewCounter(name string, help string) *Counter {
// first, attempt read lock only
Expand Down
41 changes: 38 additions & 3 deletions go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ var (
throttledAppsAPIPath = "throttler/throttled-apps"
checkAPIPath = "throttler/check"
checkSelfAPIPath = "throttler/check-self"
statusAPIPath = "throttler/status"
getResponseBody = func(resp *http.Response) string {
body, _ := io.ReadAll(resp.Body)
return string(body)
Expand Down Expand Up @@ -180,6 +181,16 @@ func throttleCheckSelf(tablet *cluster.Vttablet) (*http.Response, error) {
return httpClient.Get(fmt.Sprintf("http://localhost:%d/%s?app=%s", tablet.HTTPPort, checkSelfAPIPath, testAppName))
}

func throttleStatus(t *testing.T, tablet *cluster.Vttablet) string {
resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, statusAPIPath))
require.NoError(t, err)
defer resp.Body.Close()

b, err := io.ReadAll(resp.Body)
require.NoError(t, err)
return string(b)
}

func warmUpHeartbeat(t *testing.T) (respStatus int) {
// because we run with -heartbeat_on_demand_duration=5s, the heartbeat is "cold" right now.
// Let's warm it up.
Expand Down Expand Up @@ -314,17 +325,32 @@ func TestInitialThrottler(t *testing.T) {
})
t.Run("validating OK response from throttler with low threshold, heartbeats running", func(t *testing.T) {
time.Sleep(1 * time.Second)
cluster.ValidateReplicationIsHealthy(t, replicaTablet)
resp, err := throttleCheck(primaryTablet, false)
require.NoError(t, err)
defer resp.Body.Close()
assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp))
if !assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) {
rs, err := replicaTablet.VttabletProcess.QueryTablet("show replica status", keyspaceName, false)
assert.NoError(t, err)
t.Logf("Seconds_Behind_Source: %s", rs.Named().Row()["Seconds_Behind_Source"].ToString())
t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet))
t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet))
}
})

t.Run("validating OK response from throttler with low threshold, heartbeats running still", func(t *testing.T) {
time.Sleep(1 * time.Second)
cluster.ValidateReplicationIsHealthy(t, replicaTablet)
resp, err := throttleCheck(primaryTablet, false)
require.NoError(t, err)
defer resp.Body.Close()
assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp))
if !assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) {
rs, err := replicaTablet.VttabletProcess.QueryTablet("show replica status", keyspaceName, false)
assert.NoError(t, err)
t.Logf("Seconds_Behind_Source: %s", rs.Named().Row()["Seconds_Behind_Source"].ToString())
t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet))
t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet))
}
})
t.Run("validating pushback response from throttler on low threshold once heartbeats go stale", func(t *testing.T) {
time.Sleep(2 * onDemandHeartbeatDuration) // just... really wait long enough, make sure on-demand stops
Expand Down Expand Up @@ -375,7 +401,13 @@ func TestLag(t *testing.T) {
})
t.Run("accumulating lag, expecting throttler push back", func(t *testing.T) {
time.Sleep(2 * throttler.DefaultThreshold)
})
t.Run("requesting heartbeats while replication stopped", func(t *testing.T) {
// By now on-demand heartbeats have stopped.
_ = warmUpHeartbeat(t)
})

t.Run("expecting throttler push back", func(t *testing.T) {
resp, err := throttleCheck(primaryTablet, false)
require.NoError(t, err)
defer resp.Body.Close()
Expand All @@ -386,7 +418,10 @@ func TestLag(t *testing.T) {
require.NoError(t, err)
defer resp.Body.Close()
// self (on primary) is unaffected by replication lag
assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp))
if !assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) {
t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet))
t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet))
}
})
t.Run("replica self-check should show error", func(t *testing.T) {
resp, err := throttleCheckSelf(replicaTablet)
Expand Down
24 changes: 19 additions & 5 deletions go/timer/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
// For example, we can create a RateLimiter of 1second. Then, we can ask it, over time, to run many
// tasks. It will only ever run a single task in any 1 second time frame. The rest are ignored.
type RateLimiter struct {
tickerValue int64
tickerValue atomic.Int64
lastDoValue int64

mu sync.Mutex
Expand All @@ -37,7 +37,8 @@ type RateLimiter struct {

// NewRateLimiter creates a new limiter with given duration. It is immediately ready to run tasks.
func NewRateLimiter(d time.Duration) *RateLimiter {
r := &RateLimiter{tickerValue: 1}
r := &RateLimiter{}
r.lastDoValue = math.MinInt32 // Far enough to make a difference, but not too far to overflow.
ctx, cancel := context.WithCancel(context.Background())
r.cancel = cancel
go func() {
Expand All @@ -48,7 +49,7 @@ func NewRateLimiter(d time.Duration) *RateLimiter {
case <-ctx.Done():
return
case <-ticker.C:
atomic.StoreInt64(&r.tickerValue, r.tickerValue+1)
r.tickerValue.Add(1)
}
}
}()
Expand All @@ -61,16 +62,29 @@ func (r *RateLimiter) Do(f func() error) (err error) {
r.mu.Lock()
defer r.mu.Unlock()

if r.lastDoValue >= atomic.LoadInt64(&r.tickerValue) {
if r.lastDoValue >= r.tickerValue.Load() {
return nil // rate limited. Skipped.
}
if f != nil {
err = f()
}
r.lastDoValue = atomic.LoadInt64(&r.tickerValue)
r.lastDoValue = r.tickerValue.Load()
return err
}

// DoEmpty is a convenience method to invoke Do() with no function.
func (r *RateLimiter) DoEmpty() {
_ = r.Do(nil)
}

// Diff returns the logical clock diff between the ticker and the last Do() call.
func (r *RateLimiter) Diff() int64 {
r.mu.Lock()
defer r.mu.Unlock()

return r.tickerValue.Load() - r.lastDoValue
}

// Stop terminates rate limiter's operation and will not allow any more Do() executions.
func (r *RateLimiter) Stop() {
r.cancel()
Expand Down
16 changes: 16 additions & 0 deletions go/timer/rate_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package timer

import (
"math"
"testing"
"time"

Expand Down Expand Up @@ -75,3 +76,18 @@ func TestRateLimiterStop(t *testing.T) {
}
assert.Equal(t, valSnapshot, val)
}

func TestRateLimiterDiff(t *testing.T) {
d := 2 * time.Second
r := NewRateLimiter(d)
require.NotNil(t, r)
defer r.Stop()

// This assumes the last couple lines of code run faster than 2 seconds, which should be the case.
// But if you see flakiness due to slow runners, we can revisit the logic.
assert.Greater(t, r.Diff(), int64(math.MaxInt32))
time.Sleep(d + time.Second)
assert.Greater(t, r.Diff(), int64(math.MaxInt32))
r.DoEmpty()
assert.LessOrEqual(t, r.Diff(), int64(1))
}
27 changes: 13 additions & 14 deletions go/vt/vttablet/tabletserver/throttle/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,20 +148,18 @@ func (check *ThrottlerCheck) Check(ctx context.Context, appName string, storeTyp
}

checkResult = check.checkAppMetricResult(ctx, appName, storeType, storeName, metricResultFunc, flags)
check.throttler.lastCheckTimeNano.Store(time.Now().UnixNano())

go func(statusCode int) {
stats.GetOrNewCounter("ThrottlerCheckAnyTotal", "total number of checks").Add(1)
stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheckAny%s%sTotal", textutil.SingleWordCamel(storeType), textutil.SingleWordCamel(storeName)), "").Add(1)

if statusCode != http.StatusOK {
stats.GetOrNewCounter("ThrottlerCheckAnyError", "total number of failed checks").Add(1)
stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheckAny%s%sError", textutil.SingleWordCamel(storeType), textutil.SingleWordCamel(storeName)), "").Add(1)
}

check.throttler.markRecentApp(appName, remoteAddr)
}(checkResult.StatusCode)

check.throttler.markRecentApp(appName, remoteAddr)
if !throttlerapp.VitessName.Equals(appName) {
go func(statusCode int) {
stats.GetOrNewCounter("ThrottlerCheckAnyTotal", "total number of checks").Add(1)
stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheckAny%s%sTotal", textutil.SingleWordCamel(storeType), textutil.SingleWordCamel(storeName)), "").Add(1)

if statusCode != http.StatusOK {
stats.GetOrNewCounter("ThrottlerCheckAnyError", "total number of failed checks").Add(1)
stats.GetOrNewCounter(fmt.Sprintf("ThrottlerCheckAny%s%sError", textutil.SingleWordCamel(storeType), textutil.SingleWordCamel(storeName)), "").Add(1)
}
}(checkResult.StatusCode)
}
return checkResult
}

Expand Down Expand Up @@ -227,6 +225,7 @@ func (check *ThrottlerCheck) SelfChecks(ctx context.Context) {
for metricName, metricResult := range check.AggregatedMetrics(ctx) {
metricName := metricName
metricResult := metricResult

go check.localCheck(ctx, metricName)
go check.reportAggregated(metricName, metricResult)
}
Expand Down
Loading

0 comments on commit 265e0b9

Please sign in to comment.