diff --git a/go/test/utils/noleak.go b/go/test/utils/noleak.go index 31d454ec789..a0d1809087a 100644 --- a/go/test/utils/noleak.go +++ b/go/test/utils/noleak.go @@ -81,6 +81,9 @@ func ensureNoGoroutines() error { goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/logutil.(*ThrottledLogger).log.func1"), goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vttablet/tabletserver/throttle.initThrottleTicker.func1.1"), goleak.IgnoreTopFunction("vitess.io/vitess/go/vt/vttablet/tabletserver/throttle.NewBackgroundClient.initThrottleTicker.func1.1"), + goleak.IgnoreTopFunction("vitess.io/vitess/go/pools/smartconnpool.(*ConnPool[...]).runWorker.func1"), + goleak.IgnoreTopFunction("vitess.io/vitess/go/stats.(*Rates).track"), + goleak.IgnoreTopFunction("vitess.io/vitess/go/timer.(*Timer).run"), goleak.IgnoreTopFunction("testing.tRunner.func1"), } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 23c6d98316a..48fe441318d 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -840,9 +840,13 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m } // stallHandler is used to monitor for vplayer stalls and trigger an error -// when detected. This is used today to detect when a vplayer is not able -// to commit/complete a replicated user transaction within a configured -// period of time. +// when detected. This is used today to detect when a vplayer is not able to +// commit/complete a replicated user transaction within a configured period of +// time. It offers a lock-free implementation that is idempotent; you can call +// startTimer()/stopTimer() as many times as you like and in any order -- it +// will ensure that there's only ever 0 or 1 timers/goroutines running at a +// time. When it is already running, calls to startTimer() will only reset +// the timer's deadline. type stallHandler struct { timer atomic.Pointer[time.Timer] deadline time.Duration @@ -850,6 +854,8 @@ type stallHandler struct { stop chan struct{} } +// newStallHandler initializes a stall handler. You should call stopTimer() +// in a defer from the same function where you initalize a new stallHandler. func newStallHandler(dl time.Duration, ch chan error) *stallHandler { return &stallHandler{ deadline: dl, @@ -858,11 +864,13 @@ func newStallHandler(dl time.Duration, ch chan error) *stallHandler { } } +// startTimer starts the timer if it's not already running and it otherwise +// resets the timer's deadline when it it is already running. func (sh *stallHandler) startTimer() error { if sh == nil || sh.deadline == 0 { // Stall handling is disabled return nil } - // If the timer has not been initializeded yet, then do so. + // If the timer has not been initialized yet, then do so. if swapped := sh.timer.CompareAndSwap(nil, time.NewTimer(sh.deadline)); !swapped { // Otherwise, reset the timer's deadline. if sh.timer.Load().Reset(sh.deadline) { @@ -887,6 +895,7 @@ func (sh *stallHandler) startTimer() error { return nil } +// stopTimer stops the timer if it's currently running. func (sh *stallHandler) stopTimer() error { if sh == nil || sh.deadline == 0 || sh.timer.Load() == nil { // Stall handling is currently disabled return nil diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go new file mode 100644 index 00000000000..f8a05e9e217 --- /dev/null +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_test.go @@ -0,0 +1,134 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vreplication + +import ( + "context" + "math/rand" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/test/utils" +) + +func TestStallHandler(t *testing.T) { + lctx := utils.LeakCheckContext(t) + tme := time.Duration(90 * time.Second) + ctx, cancel := context.WithTimeout(lctx, tme) + defer cancel() + + ovpd := vplayerProgressDeadline + vplayerProgressDeadline = tme / 10 + defer func() { + vplayerProgressDeadline = ovpd + }() + concurrency := 10000 + dl := time.Duration(10 * time.Second) + + tests := []struct { + name string + f func() + }{ + { + name: "Random concurrency", + f: func() { + ch := make(chan error) + sh := newStallHandler(dl, ch) + defer sh.stopTimer() // This should always be called in a defer from where it's created + wg := sync.WaitGroup{} + for i := 0; i < concurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + time.Sleep(time.Duration(rand.Intn(1000)) * time.Microsecond) + var err error + action := rand.Intn(2) + if action == 0 { + err = sh.startTimer() + } else { + err = sh.stopTimer() + } + require.NoError(t, err) + select { + case e := <-ch: + require.FailNow(t, "unexpected error", "error: %v", e) + case <-ctx.Done(): + default: + } + }() + } + wg.Wait() + }, + }, + { + name: "All stops", + f: func() { + ch := make(chan error) + sh := newStallHandler(dl, ch) + defer sh.stopTimer() // This should always be called in a defer from where it's created + wg := sync.WaitGroup{} + for i := 0; i < concurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + err := sh.stopTimer() + require.NoError(t, err) + select { + case e := <-ch: + require.FailNow(t, "unexpected error", "error: %v", e) + case <-ctx.Done(): + default: + } + }() + } + wg.Wait() + }, + }, + { + name: "All starts", + f: func() { + ch := make(chan error) + sh := newStallHandler(dl, ch) + defer sh.stopTimer() // This should always be called in a defer from where it's created + wg := sync.WaitGroup{} + for i := 0; i < concurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + err := sh.startTimer() + require.NoError(t, err) + select { + case e := <-ch: + require.FailNow(t, "unexpected error", "error: %v", e) + case <-ctx.Done(): + default: + } + }() + } + wg.Wait() + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.f() + }) + } +}