Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Slack vitess r14.0.5 dsdefense throttle only if lag #170

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"sync"
"time"

"github.com/patrickmn/go-cache"

"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -184,6 +186,8 @@ type txThrottlerStateImpl struct {

healthCheck discovery.LegacyHealthCheck
topologyWatchers []TopologyWatcherInterface
// lagRecordsCache holds the most recently seen lag for each tablet
lagRecordsCache *cache.Cache
}

// NewTxThrottler tries to construct a txThrottler from the
Expand Down Expand Up @@ -331,6 +335,8 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar
result := &txThrottlerStateImpl{
config: config,
throttler: t,
lagRecordsCache: cache.New(time.Duration(2*config.throttlerConfig.TargetReplicationLagSec)*time.Second,
time.Duration(4*config.throttlerConfig.TargetReplicationLagSec)*time.Second),
}
result.healthCheck = healthCheckFactory()
result.healthCheck.SetListener(result, false /* sendDownEvents */)
Expand Down Expand Up @@ -359,7 +365,25 @@ func (ts *txThrottlerStateImpl) throttle() bool {
// Serialize calls to ts.throttler.Throttle()
ts.throttleMu.Lock()
defer ts.throttleMu.Unlock()
return ts.throttler.Throttle(0 /* threadId */) > 0

// Find out the max lag seen recently from the lag cache
var maxLag int64

for host, lagInfo := range ts.lagRecordsCache.Items() {
lag, ok := lagInfo.Object.(int64)
if !ok {
log.Warningf("Failed to get lag of tablet %s from cache: %+v", host, lagInfo.Object)

continue
}

if lag > maxLag {
maxLag = lag
}
}

return ts.throttler.Throttle(0 /* threadId */) > 0 && // Throttle if underlying Throttler object says so...
maxLag > ts.config.throttlerConfig.TargetReplicationLagSec // ... but only if there is actual lag
}

func (ts *txThrottlerStateImpl) deallocateResources() {
Expand Down Expand Up @@ -391,6 +415,10 @@ func (ts *txThrottlerStateImpl) StatsUpdate(tabletStats *discovery.LegacyTabletS
for _, expectedTabletType := range ts.config.tabletTypes {
if tabletStats.Target.TabletType == expectedTabletType {
ts.throttler.RecordReplicationLag(time.Now(), tabletStats)
if tabletStats.Stats != nil {
ts.lagRecordsCache.Set(tabletStats.Name, int64(tabletStats.Stats.ReplicationLagSeconds),
cache.DefaultExpiration)
}
return
}
}
Expand Down
62 changes: 54 additions & 8 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,26 +90,53 @@ func TestEnabledThrottler(t *testing.T) {
}

call0 := mockThrottler.EXPECT().UpdateConfiguration(gomock.Any(), true /* copyZeroValues */)
// call1 cannot throttle because the underlying Throttle call returns zero.
call1 := mockThrottler.EXPECT().Throttle(0)
call1.Return(0 * time.Second)
tabletStats := &discovery.LegacyTabletStats{
tabletStatsReplicaLagged := &discovery.LegacyTabletStats{
Target: &querypb.Target{
TabletType: topodatapb.TabletType_REPLICA,
},
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 20}, // Lag high enough for throttling
Name: "replica-name",
}
call2 := mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats)
// call2 adds a lag record above the threshold for a REPLICA tablet
call2 := mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStatsReplicaLagged)

// call3 can throttle because the return value of call to Throttle is > zero - provided other conditions are met
call3 := mockThrottler.EXPECT().Throttle(0)
call3.Return(1 * time.Second)

// call4 can throttle because the return value of call to Throttle is > zero - provided other conditions are met
call4 := mockThrottler.EXPECT().Throttle(0)
call4.Return(1 * time.Second)

tabletStatsReplicaNotLagged := &discovery.LegacyTabletStats{
Target: &querypb.Target{
TabletType: topodatapb.TabletType_REPLICA,
},
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1}, // Lag too low for throttling
Name: "replica-name",
}

// call5 adds a lag record below the threshold for the right tablet type
call5 := mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStatsReplicaNotLagged)
// call6 can throttle because the return value of call to Throttle is > zero - provided other conditions are met
call6 := mockThrottler.EXPECT().Throttle(0)
call6.Return(1 * time.Second)
// call7 can throttle because the return value of call to Throttle is > zero - provided other conditions are met
call7 := mockThrottler.EXPECT().Throttle(0)
call7.Return(1 * time.Second)

calllast := mockThrottler.EXPECT().Close()

call1.After(call0)
call2.After(call1)
call3.After(call2)
call4.After(call3)
calllast.After(call4)
call5.After(call4)
call6.After(call5)
call7.After(call6)
calllast.After(call7)

config := tabletenv.NewDefaultConfig()
config.EnableTxThrottler = true
Expand All @@ -126,27 +153,46 @@ func TestEnabledThrottler(t *testing.T) {
assert.Nil(t, throttler.Open())
assert.Equal(t, int64(1), throttler.throttlerRunning.Get())

// call1 can't throttle.
assert.False(t, throttler.Throttle(100, "some-workload"))
assert.Equal(t, int64(1), throttler.requestsTotal.Counts()["some-workload"])
assert.Zero(t, throttler.requestsThrottled.Counts()["some-workload"])
// call2 records lag above threshold for REPLICA tablet
throttler.state.StatsUpdate(tabletStatsReplicaLagged)

throttler.state.StatsUpdate(tabletStats)
// This call should not be forwarded to the go/vt/throttler.Throttler object. Ignore RDONLY replicas due to config
rdonlyTabletStats := &discovery.LegacyTabletStats{
Target: &querypb.Target{
TabletType: topodatapb.TabletType_RDONLY,
},
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 20}, // Lag high enough for throttling
Name: "rdonly-name",
}
// This call should not be forwarded to the go/vt/throttler.Throttler object.
hcListener.StatsUpdate(rdonlyTabletStats)
// The second throttle call should reject.

// call3 throttles: priority allows it, replication lag is above the threshold, and the underlying Throttle call returns > zero
assert.True(t, throttler.Throttle(100, "some-workload"))
assert.Equal(t, int64(2), throttler.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"])

// This call should not throttle due to priority. Check that's the case and counters agree.
// call4 does not throttle due to priority, even though replication lag is above the threshold and the underlying
// Throttle call returns > zero
assert.False(t, throttler.Throttle(0, "some-workload"))
assert.Equal(t, int64(3), throttler.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"])

// call5 records lag below threshold for REPLICA tablet
throttler.state.StatsUpdate(tabletStatsReplicaNotLagged)

// call6 does not throttle despite priority, because lag is below threshold
assert.False(t, throttler.Throttle(100, "some-workload"))
assert.Equal(t, int64(4), throttler.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"])
// call7 does not throttle due to priority and replication lag below threshold
assert.False(t, throttler.Throttle(0, "some-workload"))
assert.Equal(t, int64(5), throttler.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"])

throttler.Close()
assert.Zero(t, throttler.throttlerRunning.Get())
}
Expand Down
Loading