Skip to content

Commit

Permalink
TxThrottler only throttles if current lag is above threshold.
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo J. Ortega U <[email protected]>
  • Loading branch information
ejortegau committed Dec 6, 2023
1 parent f1517e5 commit 8509de6
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 9 deletions.
30 changes: 29 additions & 1 deletion go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"sync"
"time"

"github.com/patrickmn/go-cache"

"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"

Expand Down Expand Up @@ -184,6 +186,8 @@ type txThrottlerStateImpl struct {

healthCheck discovery.LegacyHealthCheck
topologyWatchers []TopologyWatcherInterface
// lagRecordsCache holds the most recently seen lag for each tablet
lagRecordsCache *cache.Cache
}

// NewTxThrottler tries to construct a txThrottler from the
Expand Down Expand Up @@ -331,6 +335,8 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar
result := &txThrottlerStateImpl{
config: config,
throttler: t,
lagRecordsCache: cache.New(time.Duration(2*config.throttlerConfig.TargetReplicationLagSec)*time.Second,
time.Duration(4*config.throttlerConfig.TargetReplicationLagSec)*time.Second),
}
result.healthCheck = healthCheckFactory()
result.healthCheck.SetListener(result, false /* sendDownEvents */)
Expand Down Expand Up @@ -359,7 +365,25 @@ func (ts *txThrottlerStateImpl) throttle() bool {
// Serialize calls to ts.throttle.Throttle()
ts.throttleMu.Lock()
defer ts.throttleMu.Unlock()
return ts.throttler.Throttle(0 /* threadId */) > 0

// Find out the max lag seen recently from the lag cache
var maxLag int64

for host, lagInfo := range ts.lagRecordsCache.Items() {
lag, ok := lagInfo.Object.(int64)
if !ok {
log.Warningf("Failed to get lag of tablet %s from cache: %+v", host, lagInfo.Object)

continue
}

if lag > maxLag {
maxLag = lag
}
}

return ts.throttler.Throttle(0 /* threadId */) > 0 && // Throttle if underlying Throttler object says so...
maxLag > ts.config.throttlerConfig.TargetReplicationLagSec // ... but only if there is actual lag
}

func (ts *txThrottlerStateImpl) deallocateResources() {
Expand Down Expand Up @@ -391,6 +415,10 @@ func (ts *txThrottlerStateImpl) StatsUpdate(tabletStats *discovery.LegacyTabletS
for _, expectedTabletType := range ts.config.tabletTypes {
if tabletStats.Target.TabletType == expectedTabletType {
ts.throttler.RecordReplicationLag(time.Now(), tabletStats)
if tabletStats.Stats != nil {
ts.lagRecordsCache.Set(tabletStats.Name, int64(tabletStats.Stats.ReplicationLagSeconds),
cache.DefaultExpiration)
}
return
}
}
Expand Down
62 changes: 54 additions & 8 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,26 +90,53 @@ func TestEnabledThrottler(t *testing.T) {
}

call0 := mockThrottler.EXPECT().UpdateConfiguration(gomock.Any(), true /* copyZeroValues */)
// Call 1 cannot throttle because the return value of call to underlying Throttle is zero.
call1 := mockThrottler.EXPECT().Throttle(0)
call1.Return(0 * time.Second)
tabletStats := &discovery.LegacyTabletStats{
tabletStatsReplicaLagged := &discovery.LegacyTabletStats{
Target: &querypb.Target{
TabletType: topodatapb.TabletType_REPLICA,
},
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 20}, // Lag high enough for throttling
Name: "replica-name",
}
call2 := mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats)
// call2 Adds a lag record above threshold for replica tablets
call2 := mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStatsReplicaLagged)

// call3 can throttle because the return value of call to Throttle is > zero - provided other conditions are met
call3 := mockThrottler.EXPECT().Throttle(0)
call3.Return(1 * time.Second)

// call4 can throttle because the return value of call to Throttle is > zero - provided other conditions are met
call4 := mockThrottler.EXPECT().Throttle(0)
call4.Return(1 * time.Second)

tabletStatsReplicaNotLagged := &discovery.LegacyTabletStats{
Target: &querypb.Target{
TabletType: topodatapb.TabletType_REPLICA,
},
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1}, // Lag high enough for throttling
Name: "replica-name",
}

// call5 Adds a lag record below threshold for the right tablet type
call5 := mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStatsReplicaNotLagged)
// call6 can throttle because the return value of call to Throttle is > zero - provided other conditions are met
call6 := mockThrottler.EXPECT().Throttle(0)
call6.Return(1 * time.Second)
// call7 can throttle because the return value of call to Throttle is > zero - provided other conditions are met
call7 := mockThrottler.EXPECT().Throttle(0)
call7.Return(1 * time.Second)

calllast := mockThrottler.EXPECT().Close()

call1.After(call0)
call2.After(call1)
call3.After(call2)
call4.After(call3)
calllast.After(call4)
call5.After(call4)
call6.After(call5)
call7.After(call6)
calllast.After(call7)

config := tabletenv.NewDefaultConfig()
config.EnableTxThrottler = true
Expand All @@ -126,27 +153,46 @@ func TestEnabledThrottler(t *testing.T) {
assert.Nil(t, throttler.Open())
assert.Equal(t, int64(1), throttler.throttlerRunning.Get())

// call1 can't throttle.
assert.False(t, throttler.Throttle(100, "some-workload"))
assert.Equal(t, int64(1), throttler.requestsTotal.Counts()["some-workload"])
assert.Zero(t, throttler.requestsThrottled.Counts()["some-workload"])
// call2 records lag above threshold for REPLICA tablet
throttler.state.StatsUpdate(tabletStatsReplicaLagged)

throttler.state.StatsUpdate(tabletStats)
// This call should not be forwarded to the go/vt/throttler.Throttler object. Ignore RDONLY replicas due to config
rdonlyTabletStats := &discovery.LegacyTabletStats{
Target: &querypb.Target{
TabletType: topodatapb.TabletType_RDONLY,
},
Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 20}, // Lag high enough for throttling
Name: "rdonly-name",
}
// This call should not be forwarded to the go/vt/throttler.Throttler object.
hcListener.StatsUpdate(rdonlyTabletStats)
// The second throttle call should reject.

// call3 throttles due to priority & enough replication lag besides return value for underlying Throttle call
assert.True(t, throttler.Throttle(100, "some-workload"))
assert.Equal(t, int64(2), throttler.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"])

// This call should not throttle due to priority. Check that's the case and counters agree.
// call4 does not throttle due to priority, despite enough replication lag besides return value for underlying
// Throttle call
assert.False(t, throttler.Throttle(0, "some-workload"))
assert.Equal(t, int64(3), throttler.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"])

// call5 records lag below threshold for REPLICA tablet
throttler.state.StatsUpdate(tabletStatsReplicaNotLagged)

// call6 does not throttle despite priority, because lag is below threshold
assert.False(t, throttler.Throttle(100, "some-workload"))
assert.Equal(t, int64(4), throttler.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"])
// call7 does not throttle due to priority and replication lag below threshold
assert.False(t, throttler.Throttle(0, "some-workload"))
assert.Equal(t, int64(5), throttler.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"])

throttler.Close()
assert.Zero(t, throttler.throttlerRunning.Get())
}
Expand Down

0 comments on commit 8509de6

Please sign in to comment.