Skip to content

Commit

Permalink
Add leak checking unit test
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed May 26, 2024
1 parent eeba51b commit 3839e90
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 4 deletions.
3 changes: 3 additions & 0 deletions go/test/utils/noleak.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ func ensureNoGoroutines() error {
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/logutil.(*ThrottledLogger).log.func1"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vttablet/tabletserver/throttle.initThrottleTicker.func1.1"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vttablet/tabletserver/throttle.NewBackgroundClient.initThrottleTicker.func1.1"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/pools/smartconnpool.(*ConnPool[...]).runWorker.func1"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/stats.(*Rates).track"),
goleak.IgnoreTopFunction("vitess.io/vitess/go/timer.(*Timer).run"),
goleak.IgnoreTopFunction("testing.tRunner.func1"),
}

Expand Down
17 changes: 13 additions & 4 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -840,16 +840,22 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m
}

// stallHandler is used to monitor for vplayer stalls and trigger an error
// when detected. This is used today to detect when a vplayer is not able
// to commit/complete a replicated user transaction within a configured
// period of time.
// when detected. This is used today to detect when a vplayer is not able to
// commit/complete a replicated user transaction within a configured period of
// time. It offers a lock-free implementation that is idempotent; you can call
// startTimer()/stopTimer() as many times as you like and in any order -- it
// will ensure that there's only ever 0 or 1 timers/goroutines running at a
// time. When it is already running, calls to startTimer() will only reset
// the timer's deadline.
type stallHandler struct {
timer atomic.Pointer[time.Timer]
deadline time.Duration
fire chan error
stop chan struct{}
}

// newStallHandler initializes a stall handler. You should call stopTimer()
// in a defer from the same function where you initalize a new stallHandler.
func newStallHandler(dl time.Duration, ch chan error) *stallHandler {
return &stallHandler{
deadline: dl,
Expand All @@ -858,11 +864,13 @@ func newStallHandler(dl time.Duration, ch chan error) *stallHandler {
}
}

// startTimer starts the timer if it's not already running and it otherwise
// resets the timer's deadline when it it is already running.
func (sh *stallHandler) startTimer() error {
if sh == nil || sh.deadline == 0 { // Stall handling is disabled
return nil
}
// If the timer has not been initializeded yet, then do so.
// If the timer has not been initialized yet, then do so.
if swapped := sh.timer.CompareAndSwap(nil, time.NewTimer(sh.deadline)); !swapped {
// Otherwise, reset the timer's deadline.
if sh.timer.Load().Reset(sh.deadline) {
Expand All @@ -887,6 +895,7 @@ func (sh *stallHandler) startTimer() error {
return nil
}

// stopTimer stops the timer if it's currently running.
func (sh *stallHandler) stopTimer() error {
if sh == nil || sh.deadline == 0 || sh.timer.Load() == nil { // Stall handling is currently disabled
return nil
Expand Down
134 changes: 134 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
Copyright 2024 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vreplication

import (
"context"
"math/rand"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/test/utils"
)

func TestStallHandler(t *testing.T) {
lctx := utils.LeakCheckContext(t)
tme := time.Duration(90 * time.Second)
ctx, cancel := context.WithTimeout(lctx, tme)
defer cancel()

ovpd := vplayerProgressDeadline
vplayerProgressDeadline = tme / 10
defer func() {
vplayerProgressDeadline = ovpd
}()
concurrency := 10000
dl := time.Duration(10 * time.Second)

tests := []struct {
name string
f func()
}{
{
name: "Random concurrency",
f: func() {
ch := make(chan error)
sh := newStallHandler(dl, ch)
defer sh.stopTimer() // This should always be called in a defer from where it's created
wg := sync.WaitGroup{}
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
time.Sleep(time.Duration(rand.Intn(1000)) * time.Microsecond)
var err error
action := rand.Intn(2)
if action == 0 {
err = sh.startTimer()
} else {
err = sh.stopTimer()
}
require.NoError(t, err)
select {
case e := <-ch:
require.FailNow(t, "unexpected error", "error: %v", e)
case <-ctx.Done():
default:
}
}()
}
wg.Wait()
},
},
{
name: "All stops",
f: func() {
ch := make(chan error)
sh := newStallHandler(dl, ch)
defer sh.stopTimer() // This should always be called in a defer from where it's created
wg := sync.WaitGroup{}
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
err := sh.stopTimer()
require.NoError(t, err)
select {
case e := <-ch:
require.FailNow(t, "unexpected error", "error: %v", e)
case <-ctx.Done():
default:
}
}()
}
wg.Wait()
},
},
{
name: "All starts",
f: func() {
ch := make(chan error)
sh := newStallHandler(dl, ch)
defer sh.stopTimer() // This should always be called in a defer from where it's created
wg := sync.WaitGroup{}
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
err := sh.startTimer()
require.NoError(t, err)
select {
case e := <-ch:
require.FailNow(t, "unexpected error", "error: %v", e)
case <-ctx.Done():
default:
}
}()
}
wg.Wait()
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tt.f()
})
}
}

0 comments on commit 3839e90

Please sign in to comment.