Skip to content

Commit

Permalink
Throttler: fix race conditions in Operate() termination and in tests
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach committed Jan 17, 2024
1 parent 44299cf commit 7ff9389
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 87 deletions.
24 changes: 15 additions & 9 deletions go/vt/vttablet/tabletserver/throttle/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,25 +424,28 @@ func (throttler *Throttler) IsRunning() bool {

// Enable activates the throttler probes; when enabled, the throttler responds to check queries based on
// the collected metrics.
func (throttler *Throttler) Enable() bool {
// The function returns a WaitGroup that can be used to wait for the throttler to be fully disabled, ie when
// the Operate() goroutine function terminates and caches are invalidated.
func (throttler *Throttler) Enable() *sync.WaitGroup {
throttler.enableMutex.Lock()
defer throttler.enableMutex.Unlock()

if wasEnabled := throttler.isEnabled.Swap(true); wasEnabled {
log.Infof("Throttler: already enabled")
return false
return nil
}
log.Infof("Throttler: enabling")

wg := &sync.WaitGroup{}
var ctx context.Context
ctx, throttler.cancelEnableContext = context.WithCancel(context.Background())
throttler.check.SelfChecks(ctx)
throttler.Operate(ctx)
throttler.Operate(ctx, wg)

// Make a one-time request for a lease of heartbeats
go throttler.heartbeatWriter.RequestHeartbeats()

return true
return wg
}

// Disable deactivates the probes and associated operations. When disabled, the throttler responds to check
Expand All @@ -457,10 +460,6 @@ func (throttler *Throttler) Disable() bool {
}
log.Infof("Throttler: disabling")
// _ = throttler.updateConfig(ctx, false, throttler.MetricsThreshold.Get()) // TODO(shlomi)
throttler.aggregatedMetrics.Flush()
throttler.recentApps.Flush()
throttler.nonLowPriorityAppRequestsThrottled.Flush()
// we do not flush throttler.throttledApps because this is data submitted by the user; the user expects the data to survive a disable+enable

throttler.cancelEnableContext()
return true
Expand Down Expand Up @@ -641,7 +640,7 @@ func (throttler *Throttler) isDormant() bool {

// Operate is the main entry point for the throttler operation and logic. It will
// run the probes, collect metrics, refresh inventory, etc.
func (throttler *Throttler) Operate(ctx context.Context) {
func (throttler *Throttler) Operate(ctx context.Context, wg *sync.WaitGroup) {
tickers := [](*timer.SuspendableTicker){}
addTicker := func(d time.Duration) *timer.SuspendableTicker {
t := timer.NewSuspendableTicker(d, false)
Expand All @@ -656,7 +655,14 @@ func (throttler *Throttler) Operate(ctx context.Context) {
throttledAppsTicker := addTicker(throttler.throttledAppsSnapshotInterval)
recentCheckTicker := addTicker(time.Second)

wg.Add(1)
go func() {
defer wg.Done()
defer throttler.aggregatedMetrics.Flush()
defer throttler.recentApps.Flush()
defer throttler.nonLowPriorityAppRequestsThrottled.Flush()
// we do not flush throttler.throttledApps because this is data submitted by the user; the user expects the data to survive a disable+enable

defer log.Infof("Throttler: Operate terminated, tickers stopped")
for _, t := range tickers {
defer t.Stop()
Expand Down

This file was deleted.

54 changes: 52 additions & 2 deletions go/vt/vttablet/tabletserver/throttle/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,9 +331,10 @@ func runThrottler(t *testing.T, throttler *Throttler, timeout time.Duration, f f
assert.True(t, throttler.IsOpen())
assert.False(t, throttler.IsEnabled())

ok := throttler.Enable()
wg := throttler.Enable()
defer throttler.Disable()
assert.True(t, ok)
assert.NotNil(t, wg)
defer wg.Wait()
assert.True(t, throttler.IsEnabled())

if f != nil {
Expand Down Expand Up @@ -382,3 +383,52 @@ func TestProbesWhileOperating(t *testing.T) {
})
})
}

// TestProbesPostDisable runs the throttler for some time, and then investigates the internal throttler maps and values.
// While the throttler is disabled, it is technically safe to iterate those structures. However, `go test -race` disagrees,
// which is why this test is in this *exclude_race* file
func TestProbesPostDisable(t *testing.T) {
throttler := newTestThrottler()
runThrottler(t, throttler, 2*time.Second, nil)

time.Sleep(time.Second) // throttler's Operate() quits asynchronously. For sake of `go test -race` we allow a graceful wait.
probes := throttler.mysqlInventory.ClustersProbes
assert.NotEmpty(t, probes)

selfProbes := probes[selfStoreName]
t.Run("self", func(t *testing.T) {
assert.NotEmpty(t, selfProbes)
require.Equal(t, 1, len(selfProbes)) // should always be true once refreshMySQLInventory() runs
probe, ok := selfProbes[""]
assert.True(t, ok)
assert.NotNil(t, probe)

assert.Equal(t, "", probe.Alias)
assert.Nil(t, probe.Tablet)
assert.Equal(t, "select 1", probe.MetricQuery)
assert.Zero(t, atomic.LoadInt64(&probe.QueryInProgress))
})

shardProbes := probes[shardStoreName]
t.Run("shard", func(t *testing.T) {
assert.NotEmpty(t, shardProbes)
assert.Equal(t, 2, len(shardProbes)) // see fake FindAllTabletAliasesInShard above
for _, probe := range shardProbes {
require.NotNil(t, probe)
assert.NotEmpty(t, probe.Alias)
assert.NotNil(t, probe.Tablet)
assert.Equal(t, "select 1", probe.MetricQuery)
assert.Zero(t, atomic.LoadInt64(&probe.QueryInProgress))
}
})

t.Run("metrics", func(t *testing.T) {
assert.Equal(t, 3, len(throttler.mysqlInventory.TabletMetrics)) // 1 self tablet + 2 shard tablets
})

t.Run("aggregated", func(t *testing.T) {
assert.Zero(t, throttler.aggregatedMetrics.ItemCount()) // flushed upon Disable()
aggr := throttler.aggregatedMetricsSnapshot()
assert.Empty(t, aggr)
})
}

0 comments on commit 7ff9389

Please sign in to comment.