Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-20.0] Fix race in replicationLagModule of go/vt/throttle (#16078) #16900

Merged
merged 2 commits into from
Oct 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions go/vt/throttler/replication_lag_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package throttler

import (
"sort"
"sync"
"time"

"vitess.io/vitess/go/vt/discovery"
Expand All @@ -30,6 +31,8 @@ type replicationLagCache struct {
// The map key is replicationLagRecord.LegacyTabletStats.Key.
entries map[string]*replicationLagHistory

mu sync.Mutex

// slowReplicas is a set of slow replicas.
// The map key is replicationLagRecord.LegacyTabletStats.Key.
// This map will always be recomputed by sortByLag() and must not be modified
Expand Down Expand Up @@ -60,6 +63,9 @@ func newReplicationLagCache(historyCapacityPerReplica int) *replicationLagCache

// add inserts or updates "r" in the cache for the replica with the key "r.Key".
func (c *replicationLagCache) add(r replicationLagRecord) {
c.mu.Lock()
defer c.mu.Unlock()

if !r.Serving {
// Tablet is down. Do no longer track it.
delete(c.entries, discovery.TabletToMapKey(r.Tablet))
Expand All @@ -76,9 +82,35 @@ func (c *replicationLagCache) add(r replicationLagRecord) {
entry.add(r)
}

// maxLag returns the maximum replication lag for the entries in cache.
func (c *replicationLagCache) maxLag() (maxLag uint32) {
c.mu.Lock()
defer c.mu.Unlock()

for key := range c.entries {
if c.isIgnored(key) {
continue
}

entry := c.entries[key]
if entry == nil {
continue
}

latest := entry.latest()
if lag := latest.Stats.ReplicationLagSeconds; lag > maxLag {
maxLag = lag
}
}

return maxLag
}

// latest returns the current lag record for the given LegacyTabletStats.Key string.
// A zero record is returned if there is no latest entry.
func (c *replicationLagCache) latest(key string) replicationLagRecord {
c.mu.Lock()
defer c.mu.Unlock()
entry, ok := c.entries[key]
if !ok {
return replicationLagRecord{}
Expand All @@ -90,6 +122,8 @@ func (c *replicationLagCache) latest(key string) replicationLagRecord {
// or just after it.
// If there is no such record, a zero record is returned.
func (c *replicationLagCache) atOrAfter(key string, at time.Time) replicationLagRecord {
c.mu.Lock()
defer c.mu.Unlock()
entry, ok := c.entries[key]
if !ok {
return replicationLagRecord{}
Expand All @@ -100,6 +134,9 @@ func (c *replicationLagCache) atOrAfter(key string, at time.Time) replicationLag
// sortByLag sorts all replicas by their latest replication lag value and
// tablet uid and updates the c.slowReplicas set.
func (c *replicationLagCache) sortByLag(ignoreNSlowestReplicas int, minimumReplicationLag int64) {
c.mu.Lock()
defer c.mu.Unlock()

// Reset the current list of ignored replicas.
c.slowReplicas = make(map[string]bool)

Expand Down Expand Up @@ -142,6 +179,9 @@ func (a byLagAndTabletUID) Less(i, j int) bool {
// this slow replica.
// "key" refers to ReplicationLagRecord.LegacyTabletStats.Key.
func (c *replicationLagCache) ignoreSlowReplica(key string) bool {
c.mu.Lock()
defer c.mu.Unlock()

if len(c.slowReplicas) == 0 {
// No slow replicas at all.
return false
Expand Down
7 changes: 7 additions & 0 deletions go/vt/throttler/replication_lag_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,10 @@ func TestReplicationLagCache_SortByLag(t *testing.T) {

require.True(t, c.slowReplicas[r1Key], "r1 should be tracked as a slow replica")
}

func TestReplicationLagCache_MaxLag(t *testing.T) {
c := newReplicationLagCache(2)
c.add(lagRecord(sinceZero(1*time.Second), r1, 30))
c.add(lagRecord(sinceZero(1*time.Second), r2, 1))
require.Equal(t, uint32(30), c.maxLag())
}
18 changes: 3 additions & 15 deletions go/vt/throttler/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,22 +229,10 @@ func (t *Throttler) Throttle(threadID int) time.Duration {
// the provided type, excluding ignored tablets.
func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 {
cache := t.maxReplicationLagModule.lagCacheByType(tabletType)

var maxLag uint32
cacheEntries := cache.entries

for key := range cacheEntries {
if cache.isIgnored(key) {
continue
}

lag := cache.latest(key).Stats.ReplicationLagSeconds
if lag > maxLag {
maxLag = lag
}
if cache == nil {
return 0
}

return maxLag
return cache.maxLag()
}

// ThreadFinished marks threadID as finished and redistributes the thread's
Expand Down
81 changes: 81 additions & 0 deletions go/vt/throttler/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,25 @@ limitations under the License.
package throttler

import (
"context"
"runtime"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
)

// testTabletTypes is the list of tablet types to test.
var testTabletTypes = []topodata.TabletType{
topodata.TabletType_REPLICA,
topodata.TabletType_RDONLY,
}

// The main purpose of the benchmarks below is to demonstrate the functionality
// of the throttler in the real-world (using a non-faked time.Now).
// The benchmark values should be as close as possible to the request interval
Expand Down Expand Up @@ -398,3 +410,72 @@ func TestThreadFinished_SecondCallPanics(t *testing.T) {
}()
throttler.ThreadFinished(0)
}

func TestThrottlerMaxLag(t *testing.T) {
fc := &fakeClock{}
throttler, err := newThrottlerWithClock(t.Name(), "queries", 1, 1, 10, fc.now)
require.NoError(t, err)
defer throttler.Close()

require.NotNil(t, throttler)
require.NotNil(t, throttler.maxReplicationLagModule)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var wg sync.WaitGroup

// run .add() and .MaxLag() concurrently to detect races
for _, tabletType := range testTabletTypes {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
throttler.MaxLag(tabletType)
}
}
}()

wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
cache := throttler.maxReplicationLagModule.lagCacheByType(tabletType)
require.NotNil(t, cache)
cache.add(replicationLagRecord{
time: time.Now(),
TabletHealth: discovery.TabletHealth{
Serving: true,
Stats: &query.RealtimeStats{
ReplicationLagSeconds: 5,
},
Tablet: &topodata.Tablet{
Hostname: t.Name(),
Type: tabletType,
PortMap: map[string]int32{
"test": 15999,
},
},
},
})
}
}
}()
}
time.Sleep(time.Second)
cancel()
wg.Wait()

// check .MaxLag()
for _, tabletType := range testTabletTypes {
require.Equal(t, uint32(5), throttler.MaxLag(tabletType))
}
}
Loading