diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 9f2f369ffd9..94035b9ec0c 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -23,6 +23,8 @@ import ( "sync" "time" + "github.com/patrickmn/go-cache" + "google.golang.org/protobuf/encoding/prototext" "google.golang.org/protobuf/proto" @@ -184,6 +186,8 @@ type txThrottlerStateImpl struct { healthCheck discovery.LegacyHealthCheck topologyWatchers []TopologyWatcherInterface + // lagRecordsCache holds the most recently seen lag for each tablet + lagRecordsCache *cache.Cache } // NewTxThrottler tries to construct a txThrottler from the @@ -331,6 +335,8 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar result := &txThrottlerStateImpl{ config: config, throttler: t, + lagRecordsCache: cache.New(time.Duration(2*config.throttlerConfig.TargetReplicationLagSec)*time.Second, + time.Duration(4*config.throttlerConfig.TargetReplicationLagSec)*time.Second), } result.healthCheck = healthCheckFactory() result.healthCheck.SetListener(result, false /* sendDownEvents */) @@ -359,7 +365,25 @@ func (ts *txThrottlerStateImpl) throttle() bool { // Serialize calls to ts.throttle.Throttle() ts.throttleMu.Lock() defer ts.throttleMu.Unlock() - return ts.throttler.Throttle(0 /* threadId */) > 0 + + // Find out the max lag seen recently from the lag cache + var maxLag int64 + + for host, lagInfo := range ts.lagRecordsCache.Items() { + lag, ok := lagInfo.Object.(int64) + if !ok { + log.Warningf("Failed to get lag of tablet %s from cache: %+v", host, lagInfo.Object) + + continue + } + + if lag > maxLag { + maxLag = lag + } + } + + return ts.throttler.Throttle(0 /* threadId */) > 0 && // Throttle if underlying Throttler object says so... + maxLag > ts.config.throttlerConfig.TargetReplicationLagSec // ... but only if there is actual lag } func (ts *txThrottlerStateImpl) deallocateResources() { @@ -391,6 +415,10 @@ func (ts *txThrottlerStateImpl) StatsUpdate(tabletStats *discovery.LegacyTabletS for _, expectedTabletType := range ts.config.tabletTypes { if tabletStats.Target.TabletType == expectedTabletType { ts.throttler.RecordReplicationLag(time.Now(), tabletStats) + if tabletStats.Stats != nil { + ts.lagRecordsCache.Set(tabletStats.Name, int64(tabletStats.Stats.ReplicationLagSeconds), + cache.DefaultExpiration) + } return } } diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 7b9e4bc98bd..aa129bdb09c 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -90,26 +90,53 @@ func TestEnabledThrottler(t *testing.T) { } call0 := mockThrottler.EXPECT().UpdateConfiguration(gomock.Any(), true /* copyZeroValues */) + // Call 1 cannot throttle because the return value of call to underlying Throttle is zero. call1 := mockThrottler.EXPECT().Throttle(0) call1.Return(0 * time.Second) - tabletStats := &discovery.LegacyTabletStats{ + tabletStatsReplicaLagged := &discovery.LegacyTabletStats{ Target: &querypb.Target{ TabletType: topodatapb.TabletType_REPLICA, }, + Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 20}, // Lag high enough for throttling + Name: "replica-name", } - call2 := mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStats) + // call2 Adds a lag record above threshold for replica tablets + call2 := mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStatsReplicaLagged) + + // call3 can throttle because the return value of call to Throttle is > zero - provided other conditions are met call3 := mockThrottler.EXPECT().Throttle(0) call3.Return(1 * time.Second) - + // call4 can throttle because the return value of call to Throttle is > zero - provided other conditions are met call4 := mockThrottler.EXPECT().Throttle(0) call4.Return(1 * time.Second) + + tabletStatsReplicaNotLagged := &discovery.LegacyTabletStats{ + Target: &querypb.Target{ + TabletType: topodatapb.TabletType_REPLICA, + }, + Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 1}, // Lag high enough for throttling + Name: "replica-name", + } + + // call5 Adds a lag record below threshold for the right tablet type + call5 := mockThrottler.EXPECT().RecordReplicationLag(gomock.Any(), tabletStatsReplicaNotLagged) + // call6 can throttle because the return value of call to Throttle is > zero - provided other conditions are met + call6 := mockThrottler.EXPECT().Throttle(0) + call6.Return(1 * time.Second) + // call7 can throttle because the return value of call to Throttle is > zero - provided other conditions are met + call7 := mockThrottler.EXPECT().Throttle(0) + call7.Return(1 * time.Second) + calllast := mockThrottler.EXPECT().Close() call1.After(call0) call2.After(call1) call3.After(call2) call4.After(call3) - calllast.After(call4) + call5.After(call4) + call6.After(call5) + call7.After(call6) + calllast.After(call7) config := tabletenv.NewDefaultConfig() config.EnableTxThrottler = true @@ -126,27 +153,46 @@ func TestEnabledThrottler(t *testing.T) { assert.Nil(t, throttler.Open()) assert.Equal(t, int64(1), throttler.throttlerRunning.Get()) + // call1 can't throttle. assert.False(t, throttler.Throttle(100, "some-workload")) assert.Equal(t, int64(1), throttler.requestsTotal.Counts()["some-workload"]) assert.Zero(t, throttler.requestsThrottled.Counts()["some-workload"]) + // call2 records lag above threshold for REPLICA tablet + throttler.state.StatsUpdate(tabletStatsReplicaLagged) - throttler.state.StatsUpdate(tabletStats) + // This call should not be forwarded to the go/vt/throttler.Throttler object. Ignore RDONLY replicas due to config rdonlyTabletStats := &discovery.LegacyTabletStats{ Target: &querypb.Target{ TabletType: topodatapb.TabletType_RDONLY, }, + Stats: &querypb.RealtimeStats{ReplicationLagSeconds: 20}, // Lag high enough for throttling + Name: "rdonly-name", } - // This call should not be forwarded to the go/vt/throttler.Throttler object. hcListener.StatsUpdate(rdonlyTabletStats) - // The second throttle call should reject. + + // call3 throttles due to priority & enough replication lag besides return value for underlying Throttle call assert.True(t, throttler.Throttle(100, "some-workload")) assert.Equal(t, int64(2), throttler.requestsTotal.Counts()["some-workload"]) assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"]) - // This call should not throttle due to priority. Check that's the case and counters agree. + // call4 does not throttle due to priority, despite enough replication lag besides return value for underlying + // Throttle call assert.False(t, throttler.Throttle(0, "some-workload")) assert.Equal(t, int64(3), throttler.requestsTotal.Counts()["some-workload"]) assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"]) + + // call5 records lag below threshold for REPLICA tablet + throttler.state.StatsUpdate(tabletStatsReplicaNotLagged) + + // call6 does not throttle despite priority, because lag is below threshold + assert.False(t, throttler.Throttle(100, "some-workload")) + assert.Equal(t, int64(4), throttler.requestsTotal.Counts()["some-workload"]) + assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"]) + // call7 does not throttle due to priority and replication lag below threshold + assert.False(t, throttler.Throttle(0, "some-workload")) + assert.Equal(t, int64(5), throttler.requestsTotal.Counts()["some-workload"]) + assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"]) + throttler.Close() assert.Zero(t, throttler.throttlerRunning.Get()) }