diff --git a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go
index e5df3051612..f9f203c7cd9 100644
--- a/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go
+++ b/go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go
@@ -177,7 +177,6 @@ func TestMain(m *testing.M) {
 		clusterInstance.VtTabletExtraArgs = []string{
 			"--heartbeat_interval", "250ms",
-			"--heartbeat_on_demand_duration", "5s",
 			"--migration_check_interval", "5s",
 			"--watch_replication_stream",
 		}
diff --git a/go/timer/rate_limiter.go b/go/timer/rate_limiter.go
index d42a4d7e14c..7ff602fb658 100644
--- a/go/timer/rate_limiter.go
+++ b/go/timer/rate_limiter.go
@@ -77,6 +77,14 @@ func (r *RateLimiter) DoEmpty() {
 	_ = r.Do(nil)
 }
 
+// AllowOne allows the next Do() call to run, even if the rate limiter would otherwise skip it.
+func (r *RateLimiter) AllowOne() {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	r.lastDoValue = r.tickerValue.Load() - 1
+}
+
 // Diff returns the logical clock diff between the ticker and the last Do() call.
 func (r *RateLimiter) Diff() int64 {
 	r.mu.Lock()
@@ -87,7 +95,9 @@ func (r *RateLimiter) Diff() int64 {
 
 // Stop terminates rate limiter's operation and will not allow any more Do() executions.
 func (r *RateLimiter) Stop() {
-	r.cancel()
+	if r.cancel != nil {
+		r.cancel()
+	}
 	r.mu.Lock()
 	defer r.mu.Unlock()
 
diff --git a/go/timer/rate_limiter_test.go b/go/timer/rate_limiter_test.go
index 83690b98a22..70a3240d5f5 100644
--- a/go/timer/rate_limiter_test.go
+++ b/go/timer/rate_limiter_test.go
@@ -54,6 +54,23 @@ func TestRateLimiterShort(t *testing.T) {
 	assert.Less(t, val, 10)
 }
 
+func TestRateLimiterAllowOne(t *testing.T) {
+	r := NewRateLimiter(time.Millisecond * 250)
+	require.NotNil(t, r)
+	defer r.Stop()
+	val := 0
+	incr := func() error { val++; return nil }
+	times := 10
+	for range times {
+		time.Sleep(time.Millisecond * 100)
+		r.AllowOne()
+		err := r.Do(incr)
+		assert.NoError(t, err)
+	}
+	// we expect exactly 10 successful entries.
+	assert.Equal(t, times, val)
+}
+
 func TestRateLimiterStop(t *testing.T) {
 	r := NewRateLimiter(time.Millisecond * 10)
 	require.NotNil(t, r)
@@ -91,3 +108,8 @@ func TestRateLimiterDiff(t *testing.T) {
 	r.DoEmpty()
 	assert.LessOrEqual(t, r.Diff(), int64(1))
 }
+
+func TestRateLimiterUninitialized(t *testing.T) {
+	r := &RateLimiter{}
+	r.Stop()
+}
diff --git a/go/vt/vttablet/tabletserver/repltracker/repltracker.go b/go/vt/vttablet/tabletserver/repltracker/repltracker.go
index c98005851d1..59d2db128da 100644
--- a/go/vt/vttablet/tabletserver/repltracker/repltracker.go
+++ b/go/vt/vttablet/tabletserver/repltracker/repltracker.go
@@ -52,8 +52,7 @@ var (
 
 // ReplTracker tracks replication lag.
 type ReplTracker struct {
-	mode           string
-	forceHeartbeat bool
+	mode string
 
 	mu        sync.Mutex
 	isPrimary bool
@@ -66,11 +65,10 @@
 
 // NewReplTracker creates a new ReplTracker.
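+// The tracker defers heartbeat policy to the writer itself: newHeartbeatWriter (writer.go)
+// derives a config type (always, on-demand, or none) from the tablet configuration and
+// decides whether heartbeats actually run.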
 func NewReplTracker(env tabletenv.Env, alias *topodatapb.TabletAlias) *ReplTracker {
 	return &ReplTracker{
-		mode:           env.Config().ReplicationTracker.Mode,
-		forceHeartbeat: env.Config().ReplicationTracker.HeartbeatOnDemand > 0,
-		hw:             newHeartbeatWriter(env, alias),
-		hr:             newHeartbeatReader(env),
-		poller:         &poller{},
+		mode:   env.Config().ReplicationTracker.Mode,
+		hw:     newHeartbeatWriter(env, alias),
+		hr:     newHeartbeatReader(env),
+		poller: &poller{},
 	}
 }
 
@@ -97,9 +95,7 @@ func (rt *ReplTracker) MakePrimary() {
 		rt.hr.Close()
 		rt.hw.Open()
 	}
-	if rt.forceHeartbeat {
-		rt.hw.Open()
-	}
+	rt.hw.Open()
 }
 
 // MakeNonPrimary must be called if the tablet type becomes non-PRIMARY.
@@ -117,9 +113,7 @@ func (rt *ReplTracker) MakeNonPrimary() {
 		// Run the status once to pre-initialize values.
 		rt.poller.Status()
 	}
-	if rt.forceHeartbeat {
-		rt.hw.Close()
-	}
+	rt.hw.Close()
 }
 
 // Close closes ReplTracker.
@@ -147,5 +141,9 @@ func (rt *ReplTracker) Status() (time.Duration, error) {
 // EnableHeartbeat enables or disables writes of heartbeat. This functionality
 // is only used by tests.
 func (rt *ReplTracker) EnableHeartbeat(enable bool) {
-	rt.hw.enableWrites(enable)
+	if enable {
+		rt.hw.enableWrites()
+	} else {
+		rt.hw.disableWrites()
+	}
 }
diff --git a/go/vt/vttablet/tabletserver/repltracker/repltracker_test.go b/go/vt/vttablet/tabletserver/repltracker/repltracker_test.go
index 5e6150ddeb3..40827fdcbda 100644
--- a/go/vt/vttablet/tabletserver/repltracker/repltracker_test.go
+++ b/go/vt/vttablet/tabletserver/repltracker/repltracker_test.go
@@ -22,6 +22,7 @@ import (
 	"time"
 
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 
 	"vitess.io/vitess/go/mysql/fakesqldb"
 	"vitess.io/vitess/go/vt/dbconfigs"
@@ -50,49 +51,89 @@ func TestReplTracker(t *testing.T) {
 	target := &querypb.Target{}
 	mysqld := mysqlctl.NewFakeMysqlDaemon(nil)
 
-	rt := NewReplTracker(env, alias)
-	rt.InitDBConfig(target, mysqld)
-	assert.Equal(t, tabletenv.Heartbeat, rt.mode)
-	assert.True(t, rt.hw.enabled)
-	assert.True(t, rt.hr.enabled)
-
-	rt.MakePrimary()
-	assert.True(t, rt.hw.isOpen)
-	assert.False(t, rt.hr.isOpen)
-	assert.True(t, rt.isPrimary)
-
-	lag, err := rt.Status()
-	assert.NoError(t, err)
-	assert.Equal(t, time.Duration(0), lag)
-
-	rt.MakeNonPrimary()
-	assert.False(t, rt.hw.isOpen)
-	assert.True(t, rt.hr.isOpen)
-	assert.False(t, rt.isPrimary)
-
-	rt.hr.lastKnownLag = 1 * time.Second
-	lag, err = rt.Status()
-	assert.NoError(t, err)
-	assert.Equal(t, 1*time.Second, lag)
-
-	rt.Close()
-	assert.False(t, rt.hw.isOpen)
-	assert.False(t, rt.hr.isOpen)
-
-	cfg.ReplicationTracker.Mode = tabletenv.Polling
-	rt = NewReplTracker(env, alias)
-	rt.InitDBConfig(target, mysqld)
-	assert.Equal(t, tabletenv.Polling, rt.mode)
-	assert.Equal(t, mysqld, rt.poller.mysqld)
-	assert.False(t, rt.hw.enabled)
-	assert.False(t, rt.hr.enabled)
-
-	rt.MakeNonPrimary()
-	assert.False(t, rt.hw.isOpen)
-	assert.False(t, rt.hr.isOpen)
-	assert.False(t, rt.isPrimary)
-
-	mysqld.ReplicationStatusError = errors.New("err")
-	_, err = rt.Status()
-	assert.Equal(t, "err", err.Error())
+	t.Run("always-on heartbeat", func(t *testing.T) {
+		rt := NewReplTracker(env, alias)
+		rt.InitDBConfig(target, mysqld)
+		assert.Equal(t, tabletenv.Heartbeat, rt.mode)
+		assert.Equal(t, HeartbeatConfigTypeAlways, rt.hw.configType)
+		assert.Zero(t, rt.hw.onDemandDuration)
+		assert.True(t, rt.hr.enabled)
+
+		rt.MakePrimary()
+		assert.True(t, rt.hw.isOpen)
+		assert.False(t, rt.hr.isOpen)
+		assert.True(t, rt.isPrimary)
+
+		lag, err := rt.Status()
+		assert.NoError(t, err)
+		assert.Equal(t, time.Duration(0), lag)
+
+		rt.MakeNonPrimary()
+		assert.False(t, rt.hw.isOpen)
+		assert.True(t, rt.hr.isOpen)
+		assert.False(t, rt.isPrimary)
+
+		rt.hr.lastKnownLag = 1 * time.Second
+		lag, err = rt.Status()
+		assert.NoError(t, err)
+		assert.Equal(t, 1*time.Second, lag)
+
+		rt.Close()
+		assert.False(t, rt.hw.isOpen)
+		assert.False(t, rt.hr.isOpen)
+	})
+	t.Run("disabled heartbeat", func(t *testing.T) {
+		cfg.ReplicationTracker.Mode = tabletenv.Polling
+		rt := NewReplTracker(env, alias)
+		rt.InitDBConfig(target, mysqld)
+		assert.Equal(t, tabletenv.Polling, rt.mode)
+		assert.Equal(t, mysqld, rt.poller.mysqld)
+		assert.Equal(t, HeartbeatConfigTypeNone, rt.hw.configType)
+		require.NotZero(t, defaultOnDemandDuration)
+		assert.Equal(t, defaultOnDemandDuration, rt.hw.onDemandDuration)
+		assert.False(t, rt.hr.enabled)
+
+		rt.MakeNonPrimary()
+		assert.False(t, rt.hw.isOpen)
+		assert.False(t, rt.hr.isOpen)
+		assert.False(t, rt.isPrimary)
+
+		mysqld.ReplicationStatusError = errors.New("err")
+		_, err := rt.Status()
+		assert.Equal(t, "err", err.Error())
+	})
+	t.Run("on-demand heartbeat", func(t *testing.T) {
+		cfg.ReplicationTracker.Mode = tabletenv.Heartbeat
+		cfg.ReplicationTracker.HeartbeatOnDemand = time.Second
+
+		rt := NewReplTracker(env, alias)
+		rt.InitDBConfig(target, mysqld)
+		assert.Equal(t, tabletenv.Heartbeat, rt.mode)
+		assert.Equal(t, HeartbeatConfigTypeOnDemand, rt.hw.configType)
+		assert.Equal(t, minimalOnDemandDuration, rt.hw.onDemandDuration)
+		assert.True(t, rt.hr.enabled)
+
+		rt.MakePrimary()
+		assert.True(t, rt.hw.isOpen)
+		assert.False(t, rt.hr.isOpen)
+		assert.True(t, rt.isPrimary)
+
+		lag, err := rt.Status()
+		assert.NoError(t, err)
+		assert.Equal(t, time.Duration(0), lag)
+
+		rt.MakeNonPrimary()
+		assert.False(t, rt.hw.isOpen)
+		assert.True(t, rt.hr.isOpen)
+		assert.False(t, rt.isPrimary)
+
+		rt.hr.lastKnownLag = 1 * time.Second
+		lag, err = rt.Status()
+		assert.NoError(t, err)
+		assert.Equal(t, 1*time.Second, lag)
+
+		rt.Close()
+		assert.False(t, rt.hw.isOpen)
+		assert.False(t, rt.hr.isOpen)
+	})
 }
diff --git a/go/vt/vttablet/tabletserver/repltracker/writer.go b/go/vt/vttablet/tabletserver/repltracker/writer.go
index a72b44d1845..11492fa92e4 100644
--- a/go/vt/vttablet/tabletserver/repltracker/writer.go
+++ b/go/vt/vttablet/tabletserver/repltracker/writer.go
@@ -38,52 +38,87 @@ import (
 	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
 )
 
+// We identify these heartbeat configuration types:
+// - No configuration: the heartbeat writer is generally disabled and does not produce heartbeats (but see below).
+// - On-demand: an on-demand heartbeat interval was specified.
+// - Always: the heartbeat writer is always on and produces heartbeats at a regular interval.
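+//
+// The mapping from configuration to type, per newHeartbeatWriter() below:
+// HeartbeatOnDemand > 0 selects on-demand; otherwise Mode == tabletenv.Heartbeat selects
+// always-on; any other configuration selects no heartbeats.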
+type HeartbeatConfigType int
+
+const (
+	HeartbeatConfigTypeNone HeartbeatConfigType = iota
+	HeartbeatConfigTypeOnDemand
+	HeartbeatConfigTypeAlways
+)
+
 const (
-	sqlUpsertHeartbeat = "INSERT INTO %s.heartbeat (ts, tabletUid, keyspaceShard) VALUES (%a, %a, %a) ON DUPLICATE KEY UPDATE ts=VALUES(ts), tabletUid=VALUES(tabletUid)"
+	sqlUpsertHeartbeat      = "INSERT INTO %s.heartbeat (ts, tabletUid, keyspaceShard) VALUES (%a, %a, %a) ON DUPLICATE KEY UPDATE ts=VALUES(ts), tabletUid=VALUES(tabletUid)"
+	minimalOnDemandDuration = 4 * time.Second
+)
+
+var (
+	// Can be overridden by unit tests
+	defaultOnDemandDuration  = 10 * time.Second
+	defaultHeartbeatInterval = 1 * time.Second
 )
 
 // heartbeatWriter runs on primary tablets and writes heartbeats to the heartbeat
-// table at a regular interval, defined by heartbeat_interval.
+// table, depending on the configuration:
+//   - HeartbeatConfigTypeAlways: while open, the writer produces heartbeats at a regular interval.
+//     RequestHeartbeats() is meaningless in this mode.
+//   - HeartbeatConfigTypeOnDemand: when opened, the writer produces heartbeats for the configured lease.
+//     The heartbeats then expire. The lease can be renewed (after it expires) or extended (while running) via
+//     RequestHeartbeats().
+//   - HeartbeatConfigTypeNone: the writer does not initiate any heartbeats. However, RequestHeartbeats()
+//     can be called to request a heartbeat lease. The lease duration is predetermined as `defaultOnDemandDuration`.
 type heartbeatWriter struct {
 	env tabletenv.Env
 
-	enabled  bool
+	configType HeartbeatConfigType
 	interval time.Duration
 
 	tabletAlias   *topodatapb.TabletAlias
 	keyspaceShard string
 	now           func() time.Time
	errorLog      *logutil.ThrottledLogger
 
-	mu           sync.Mutex
-	isOpen       bool
-	appPool      *dbconnpool.ConnectionPool
-	allPrivsPool *dbconnpool.ConnectionPool
-	ticks        *timer.Timer
-	writeConnID  atomic.Int64
-
-	onDemandDuration            time.Duration
-	onDemandMu                  sync.Mutex
-	concurrentHeartbeatRequests int64
-	onDemandRequestTicks        int64
-	onDemandLastRequestTick     int64
+	mu                          sync.Mutex
+	isOpen                      bool
+	appPool                     *dbconnpool.ConnectionPool
+	allPrivsPool                *dbconnpool.ConnectionPool
+	ticks                       *timer.Timer
+	onDemandRequestsRateLimiter atomic.Pointer[timer.RateLimiter]
+	writeConnID                 atomic.Int64
+
+	onDemandDuration time.Duration
 }
 
 // newHeartbeatWriter creates a new heartbeatWriter.
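+// A configured on-demand duration below minimalOnDemandDuration is raised to that minimum,
+// and a zero heartbeat interval falls back to defaultHeartbeatInterval.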
 func newHeartbeatWriter(env tabletenv.Env, alias *topodatapb.TabletAlias) *heartbeatWriter {
 	config := env.Config()
 
-	// config.EnableLagThrottler is a feature flag for the throttler; if throttler runs, then heartbeat must also run
-	if config.ReplicationTracker.Mode != tabletenv.Heartbeat && config.ReplicationTracker.HeartbeatOnDemand == 0 {
-		return &heartbeatWriter{}
+	configType := HeartbeatConfigTypeNone
+	onDemandDuration := defaultOnDemandDuration
+	switch {
+	case config.ReplicationTracker.HeartbeatOnDemand > 0:
+		configType = HeartbeatConfigTypeOnDemand
+		onDemandDuration = config.ReplicationTracker.HeartbeatOnDemand
+		if onDemandDuration < minimalOnDemandDuration {
+			onDemandDuration = minimalOnDemandDuration
+		}
+	case config.ReplicationTracker.Mode == tabletenv.Heartbeat:
+		configType = HeartbeatConfigTypeAlways
+		onDemandDuration = 0
 	}
 	heartbeatInterval := config.ReplicationTracker.HeartbeatInterval
+	if heartbeatInterval == 0 {
+		heartbeatInterval = defaultHeartbeatInterval
+	}
 	w := &heartbeatWriter{
 		env:              env,
-		enabled:          true,
+		configType:       configType,
 		tabletAlias:      alias.CloneVT(),
 		now:              time.Now,
 		interval:         heartbeatInterval,
-		onDemandDuration: config.ReplicationTracker.HeartbeatOnDemand,
+		onDemandDuration: onDemandDuration,
 		ticks:            timer.NewTimer(heartbeatInterval),
 		errorLog:         logutil.NewThrottledLogger("HeartbeatWriter", 60*time.Second),
 		// We make this pool size 2; to prevent pool exhausted
@@ -91,21 +126,8 @@ func newHeartbeatWriter(env tabletenv.Env, alias *topodatapb.TabletAlias) *heart
 		appPool:      dbconnpool.NewConnectionPool("HeartbeatWriteAppPool", env.Exporter(), 2, mysqlctl.DbaIdleTimeout, 0, mysqlctl.PoolDynamicHostnameResolution),
 		allPrivsPool: dbconnpool.NewConnectionPool("HeartbeatWriteAllPrivsPool", env.Exporter(), 2, mysqlctl.DbaIdleTimeout, 0, mysqlctl.PoolDynamicHostnameResolution),
 	}
+	w.writeConnID.Store(-1)
 
-	if w.onDemandDuration > 0 {
-		// see RequestHeartbeats() for use of onDemandRequestTicks
-		// it's basically a mechanism to rate limit operation RequestHeartbeats().
-		// and selectively drop excessive requests.
-		w.allowNextHeartbeatRequest()
-		go func() {
-			// this will allow up to 1 request per (w.onDemandDuration / 4) to pass through
-			tick := time.NewTicker(w.onDemandDuration / 4)
-			defer tick.Stop()
-			for range tick.C {
-				w.allowNextHeartbeatRequest()
-			}
-		}()
-	}
 	return w
 }
 
@@ -117,14 +139,14 @@ func (w *heartbeatWriter) InitDBConfig(target *querypb.Target) {
 // Open sets up the heartbeatWriter's db connection and launches the ticker
 // responsible for periodically writing to the heartbeat table.
 func (w *heartbeatWriter) Open() {
-	if !w.enabled {
-		return
-	}
 	w.mu.Lock()
 	defer w.mu.Unlock()
 	if w.isOpen {
 		return
 	}
+	defer func() {
+		w.isOpen = true
+	}()
 	log.Info("Heartbeat Writer: opening")
 
 	// We cannot create the database and tables in this Open function
@@ -133,34 +155,56 @@ func (w *heartbeatWriter) Open() {
 	// block this thread, and we could end up in a deadlock.
 	// Instead, we try creating the database and table in each tick which runs in a go routine
 	// keeping us safe from hanging the main thread.
-	w.appPool.Open(w.env.Config().DB.AppWithDB())
-	w.allPrivsPool.Open(w.env.Config().DB.AllPrivsWithDB())
-	if w.onDemandDuration == 0 {
-		w.enableWrites(true)
-		// when onDemandDuration > 0 we only enable writes per request
-	} else {
+
+	// This function could be running from within a unit test scope, in which case we use
+	// mock pools that are already open. This is why we test for the pool being open.
+	if !w.appPool.IsOpen() {
+		w.appPool.Open(w.env.Config().DB.AppWithDB())
+	}
+	if !w.allPrivsPool.IsOpen() {
+		w.allPrivsPool.Open(w.env.Config().DB.AllPrivsWithDB())
+	}
+
+	if w.onDemandDuration > 0 {
+		// Clients are given access to RequestHeartbeats(). But such clients can also abuse
+		// the system by initiating heartbeats too frequently. We rate-limit these requests.
+		rateLimiter := timer.NewRateLimiter(time.Second)
+		w.onDemandRequestsRateLimiter.Store(rateLimiter)
+	}
+
+	switch w.configType {
+	case HeartbeatConfigTypeNone:
+		// Do not kick off any heartbeats
+		return
+	case HeartbeatConfigTypeAlways:
+		// Heartbeats are always on
+		w.enableWrites()
+	case HeartbeatConfigTypeOnDemand:
 		// A one-time kick off of heartbeats upon Open()
 		go w.RequestHeartbeats()
 	}
-
-	w.isOpen = true
 }
 
 // Close closes the heartbeatWriter's db connection and stops the periodic ticker.
 func (w *heartbeatWriter) Close() {
-	if !w.enabled {
-		return
-	}
 	w.mu.Lock()
 	defer w.mu.Unlock()
 	if !w.isOpen {
 		return
 	}
+	defer func() {
+		w.isOpen = false
+	}()
 
-	w.enableWrites(false)
+	w.disableWrites()
 	w.appPool.Close()
 	w.allPrivsPool.Close()
-	w.isOpen = false
+
+	if rateLimiter := w.onDemandRequestsRateLimiter.Load(); rateLimiter != nil {
+		rateLimiter.Stop()
+	}
+	w.onDemandRequestsRateLimiter.Store(nil)
+
 	log.Info("Heartbeat Writer: closed")
 }
 
@@ -185,11 +229,23 @@ func (w *heartbeatWriter) bindHeartbeatVars(query string) (string, error) {
 func (w *heartbeatWriter) writeHeartbeat() {
 	if err := w.write(); err != nil {
 		w.recordError(err)
-		return
+	} else {
+		writes.Add(1)
+	}
+
+	if w.onDemandDuration > 0 {
+		// See if we need to expire the heartbeats
+		go func() {
+			if rateLimiter := w.onDemandRequestsRateLimiter.Load(); rateLimiter != nil {
+				if rateLimiter.Diff() > int64(w.onDemandDuration.Seconds()) {
+					w.disableWrites()
+				}
+			}
+		}()
 	}
-	writes.Add(1)
 }
 
+// write writes a single heartbeat update.
 func (w *heartbeatWriter) write() error {
 	defer w.env.LogError()
 	ctx, cancel := context.WithDeadline(context.Background(), w.now().Add(w.interval))
@@ -221,41 +277,36 @@ func (w *heartbeatWriter) recordError(err error) {
 	writeErrors.Add(1)
 }
 
-// enableWrites activates or deactivates heartbeat writes
-func (w *heartbeatWriter) enableWrites(enable bool) {
-	if w.ticks == nil {
-		return
-	}
-	switch enable {
-	case true:
-		// We must combat a potential race condition: the writer is Open, and a request comes
-		// to enableWrites(true), but simultaneously the writes gets Close()d.
-		// We must not send any more ticks while the writer is closed.
-		go func() {
-			w.mu.Lock()
-			defer w.mu.Unlock()
-			if !w.isOpen {
-				return
-			}
-			w.ticks.Start(w.writeHeartbeat)
-		}()
-	case false:
-		// We stop the ticks in a separate go routine because it can block if the write is stuck on semi-sync ACKs.
-		// At the same time we try and kill the write that is in progress. We use the context and its cancellation
-		// for coordination between the two go-routines. In the end we will have guaranteed that the ticks have stopped
-		// and no write is in progress.
-		ctx, cancel := context.WithCancel(context.Background())
-		go func() {
-			w.ticks.Stop()
-			cancel()
-		}()
-		w.killWritesUntilStopped(ctx)
-
-		if w.onDemandDuration > 0 {
-			// Let the next RequestHeartbeats() go through
-			w.allowNextHeartbeatRequest()
+// enableWrites activates heartbeat writes
+func (w *heartbeatWriter) enableWrites() {
+	// We must combat a potential race condition: the writer is Open, and a request comes
+	// to enableWrites(), but simultaneously the writer gets Close()d.
+	// We must not send any more ticks while the writer is closed.
+	go func() {
+		w.mu.Lock()
+		defer w.mu.Unlock()
+		if !w.isOpen {
+			return
 		}
-	}
+		w.ticks.Start(w.writeHeartbeat)
+	}()
+}
+
+// disableWrites deactivates heartbeat writes
+func (w *heartbeatWriter) disableWrites() {
+	// We stop the ticks in a separate go routine because it can block if the write is stuck on semi-sync ACKs.
+	// At the same time we try and kill the write that is in progress. We use the context and its cancellation
+	// for coordination between the two go-routines. In the end we will have guaranteed that the ticks have stopped
+	// and no write is in progress.
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		w.ticks.Stop()
+		cancel()
+	}()
+	w.killWritesUntilStopped(ctx)
+
+	// Let the next RequestHeartbeats() go through
+	w.allowNextHeartbeatRequest()
 }
 
 // killWritesUntilStopped tries to kill the write in progress until the ticks have stopped.
@@ -298,9 +349,15 @@ func (w *heartbeatWriter) killWrite() error {
 }
 
 // allowNextHeartbeatRequest ensures that the next call to RequestHeartbeats() passes through and
-// is not dropped.
+// is not rate-limited.
+// Use case: just as the on-demand lease expires, a new heartbeat request comes in, landing in the same
+// rate-limiter timeslot as the request that preceded the expiration. We want to allow the new request
+// to re-lease on-demand heartbeats.
 func (w *heartbeatWriter) allowNextHeartbeatRequest() {
-	atomic.AddInt64(&w.onDemandRequestTicks, 1)
+	// The writer could be Close()d while this function is running, and thus the rate limiter pointer could be nil.
+	// We therefore use an atomic load here rather than locking a mutex.
+	if rateLimiter := w.onDemandRequestsRateLimiter.Load(); rateLimiter != nil {
+		rateLimiter.AllowOne()
+	}
 }
 
 // RequestHeartbeats implements heartbeat.HeartbeatWriter.RequestHeartbeats()
@@ -309,41 +366,12 @@ func (w *heartbeatWriter) allowNextHeartbeatRequest() {
 // This function will selectively and silently drop some such requests, depending on arrival rate.
 // This function is safe to call concurrently from goroutines
 func (w *heartbeatWriter) RequestHeartbeats() {
-	if w.onDemandDuration == 0 {
-		// heartbeats are not by demand. Therefore they are just coming in on their own (if enabled)
-		return
+	// The writer could be Close()d while this function is running, and thus the rate limiter pointer could be nil.
+	// We therefore use an atomic load here rather than locking a mutex.
+	if rateLimiter := w.onDemandRequestsRateLimiter.Load(); rateLimiter != nil {
+		rateLimiter.Do(func() error {
+			w.enableWrites()
+			return nil
+		})
 	}
-	// In this function we're going to create a timer to activate heartbeats by-demand. Creating a timer has a cost.
-	// Now, this function can be spammed by clients (the lag throttler). We therefore only allow this function to
-	// actually operate once per X seconds (1/4 of onDemandDuration as a reasonable oversampling value):
-	if atomic.LoadInt64(&w.onDemandLastRequestTick) >= atomic.LoadInt64(&w.onDemandRequestTicks) {
-		// Too many requests. We're dropping this one.
-		return
-	}
-	atomic.StoreInt64(&w.onDemandLastRequestTick, atomic.LoadInt64(&w.onDemandRequestTicks))
-
-	// OK, the request passed through.
-
-	w.onDemandMu.Lock()
-	defer w.onDemandMu.Unlock()
-
-	// Now for the actual logic. A client requests heartbeats. If it were only this client, we would
-	// activate heartbeats for the duration of onDemandDuration, and then turn heartbeats off.
-	// However, there may be multiple clients interested in heartbeats, or maybe the same single client
-	// requesting heartbeats again and again. So we keep track of how many _concurrent_ requests we have.
-	// We enable heartbeats as soon as we have a request; we turn heartbeats off once
-	// we have zero concurrent requests
-	w.enableWrites(true)
-	w.concurrentHeartbeatRequests++
-
-	time.AfterFunc(w.onDemandDuration, func() {
-		w.onDemandMu.Lock()
-		defer w.onDemandMu.Unlock()
-		w.concurrentHeartbeatRequests--
-		if w.concurrentHeartbeatRequests == 0 {
-			// means there are currently no more clients interested in heartbeats
-			w.enableWrites(false)
-		}
-		w.allowNextHeartbeatRequest()
-	})
 }
diff --git a/go/vt/vttablet/tabletserver/repltracker/writer_test.go b/go/vt/vttablet/tabletserver/repltracker/writer_test.go
index 0add32a1de0..e9aead570a8 100644
--- a/go/vt/vttablet/tabletserver/repltracker/writer_test.go
+++ b/go/vt/vttablet/tabletserver/repltracker/writer_test.go
@@ -38,7 +38,8 @@ func TestWriteHeartbeat(t *testing.T) {
 	defer db.Close()
 
 	now := time.Now()
-	tw := newTestWriter(db, &now)
+	tw := newSimpleTestWriter(db, &now)
+
 	upsert := fmt.Sprintf("INSERT INTO %s.heartbeat (ts, tabletUid, keyspaceShard) VALUES (%d, %d, '%s') ON DUPLICATE KEY UPDATE ts=VALUES(ts), tabletUid=VALUES(tabletUid)", "_vt", now.UnixNano(), tw.tabletAlias.Uid, tw.keyspaceShard)
 	db.AddQuery(upsert, &sqltypes.Result{})
 
@@ -47,8 +48,214 @@ func TestWriteHeartbeat(t *testing.T) {
 	writeErrors.Reset()
 
 	tw.writeHeartbeat()
-	assert.Equal(t, int64(1), writes.Get())
-	assert.Equal(t, int64(0), writeErrors.Get())
+	assert.EqualValues(t, 1, writes.Get())
+	assert.EqualValues(t, 0, writeErrors.Get())
+}
+
+// TestWriteHeartbeatOpen tests that the heartbeat writer writes heartbeats when the writer is open.
+func TestWriteHeartbeatOpen(t *testing.T) {
+	defaultOnDemandDuration = 3 * time.Second
+
+	db := fakesqldb.New(t)
+	defer db.Close()
+
+	tw := newSimpleTestWriter(db, nil)
+
+	assert.Zero(t, tw.onDemandDuration)
+
+	db.AddQueryPattern("^INSERT INTO.*", &sqltypes.Result{})
+
+	writes.Reset()
+	writeErrors.Reset()
+
+	tw.writeHeartbeat()
+	lastWrites := writes.Get()
+	assert.EqualValues(t, 1, lastWrites)
+	assert.EqualValues(t, 0, writeErrors.Get())
+
+	t.Run("closed, no heartbeats", func(t *testing.T) {
+		<-time.After(3 * time.Second)
+		assert.EqualValues(t, 1, writes.Get())
+	})
+
+	{
+		rateLimiter := tw.onDemandRequestsRateLimiter.Load()
+		assert.Nil(t, rateLimiter)
+	}
+	tw.Open()
+	defer tw.Close()
+	{
+		rateLimiter := tw.onDemandRequestsRateLimiter.Load()
+		assert.Nil(t, rateLimiter)
+	}
+	t.Run("open, heartbeats", func(t *testing.T) {
+		ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+		defer cancel()
+		ticker := time.NewTicker(1 * time.Second)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				assert.EqualValues(t, 0, writeErrors.Get())
+				currentWrites := writes.Get()
+				assert.Greater(t, currentWrites, lastWrites)
+				lastWrites = currentWrites
+			}
+		}
+	})
+}
+
+// TestWriteHeartbeatDisabled tests that the heartbeat writer doesn't write heartbeats when the writer is disabled,
+// but that it does respond to RequestHeartbeats(), and generates heartbeats on a lease.
+func TestWriteHeartbeatDisabled(t *testing.T) {
+	defaultOnDemandDuration = 3 * time.Second
+
+	db := fakesqldb.New(t)
+	defer db.Close()
+
+	tw := newTestWriter(db, nil, tabletenv.Disable, 0)
+
+	// Even though disabled, the writer will have an on-demand duration set.
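+	// HeartbeatConfigTypeNone still honors RequestHeartbeats(), leasing heartbeats for defaultOnDemandDuration.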
+	assert.Equal(t, defaultOnDemandDuration, tw.onDemandDuration)
+
+	db.AddQueryPattern("^INSERT INTO.*", &sqltypes.Result{})
+
+	writes.Reset()
+	writeErrors.Reset()
+
+	tw.writeHeartbeat()
+	lastWrites := writes.Get()
+	assert.EqualValues(t, 1, lastWrites)
+	assert.EqualValues(t, 0, writeErrors.Get())
+
+	t.Run("closed, no heartbeats", func(t *testing.T) {
+		<-time.After(3 * time.Second)
+		assert.EqualValues(t, 1, writes.Get())
+	})
+	{
+		rateLimiter := tw.onDemandRequestsRateLimiter.Load()
+		assert.Nil(t, rateLimiter)
+	}
+	tw.Open()
+	defer tw.Close()
+	{
+		rateLimiter := tw.onDemandRequestsRateLimiter.Load()
+		assert.NotNil(t, rateLimiter)
+	}
+	t.Run("open, no heartbeats", func(t *testing.T) {
+		<-time.After(3 * time.Second)
+		assert.EqualValues(t, 1, writes.Get())
+	})
+	t.Run("request heartbeats, heartbeats", func(t *testing.T) {
+		tw.RequestHeartbeats()
+		ctx, cancel := context.WithTimeout(context.Background(), tw.onDemandDuration-time.Second)
+		defer cancel()
+		ticker := time.NewTicker(1 * time.Second)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				assert.EqualValues(t, 0, writeErrors.Get())
+				currentWrites := writes.Get()
+				assert.Greater(t, currentWrites, lastWrites)
+				lastWrites = currentWrites
+			}
+		}
+	})
+	t.Run("exhaust lease, no heartbeats", func(t *testing.T) {
+		<-time.After(tw.onDemandDuration)
+		currentWrites := writes.Get()
+		<-time.After(3 * time.Second)
+		assert.EqualValues(t, currentWrites, writes.Get())
+	})
+	tw.Close()
+	{
+		rateLimiter := tw.onDemandRequestsRateLimiter.Load()
+		assert.Nil(t, rateLimiter)
+	}
+}
+
+// TestWriteHeartbeatOnDemand tests that the heartbeat writer initiates leased heartbeats once opened,
+// and then upon RequestHeartbeats().
+func TestWriteHeartbeatOnDemand(t *testing.T) {
+	defaultOnDemandDuration = 7 * time.Second
+	onDemandDuration := minimalOnDemandDuration
+
+	db := fakesqldb.New(t)
+	defer db.Close()
+
+	tw := newTestWriter(db, nil, tabletenv.Heartbeat, onDemandDuration)
+
+	assert.Equal(t, onDemandDuration, tw.onDemandDuration)
+
+	db.AddQueryPattern("^INSERT INTO.*", &sqltypes.Result{})
+
+	writes.Reset()
+	writeErrors.Reset()
+
+	tw.writeHeartbeat()
+	lastWrites := writes.Get()
+	assert.EqualValues(t, 1, lastWrites)
+	assert.EqualValues(t, 0, writeErrors.Get())
+
+	t.Run("closed, no heartbeats", func(t *testing.T) {
+		<-time.After(3 * time.Second)
+		assert.EqualValues(t, 1, writes.Get())
+	})
+	{
+		rateLimiter := tw.onDemandRequestsRateLimiter.Load()
+		assert.Nil(t, rateLimiter)
+	}
+	tw.Open()
+	defer tw.Close()
+	{
+		rateLimiter := tw.onDemandRequestsRateLimiter.Load()
+		assert.NotNil(t, rateLimiter)
+	}
+	t.Run("open, initial heartbeats", func(t *testing.T) {
+		ctx, cancel := context.WithTimeout(context.Background(), tw.onDemandDuration-time.Second)
+		defer cancel()
+		ticker := time.NewTicker(1 * time.Second)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				assert.EqualValues(t, 0, writeErrors.Get())
+				currentWrites := writes.Get()
+				assert.Greater(t, currentWrites, lastWrites)
+				lastWrites = currentWrites
+			}
+		}
+	})
+	t.Run("exhaust lease, no heartbeats", func(t *testing.T) {
+		<-time.After(tw.onDemandDuration)
+		currentWrites := writes.Get()
+		<-time.After(3 * time.Second)
+		assert.EqualValues(t, currentWrites, writes.Get())
+	})
+	t.Run("request heartbeats, heartbeats", func(t *testing.T) {
+		tw.RequestHeartbeats()
+		lastWrites := writes.Get()
+		<-time.After(tw.onDemandDuration)
+		assert.Greater(t, writes.Get(), lastWrites)
+	})
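+	// The renewed lease must itself expire: verify once more that writes stop after it lapses.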
+ t.Run("exhaust lease, no heartbeats", func(t *testing.T) { + <-time.After(tw.onDemandDuration) + currentWrites := writes.Get() + <-time.After(3 * time.Second) + assert.EqualValues(t, currentWrites, writes.Get()) + }) + tw.Close() + { + rateLimiter := tw.onDemandRequestsRateLimiter.Load() + assert.Nil(t, rateLimiter) + } } func TestWriteHeartbeatError(t *testing.T) { @@ -56,7 +263,7 @@ func TestWriteHeartbeatError(t *testing.T) { defer db.Close() now := time.Now() - tw := newTestWriter(db, &now) + tw := newSimpleTestWriter(db, &now) writes.Reset() writeErrors.Reset() @@ -69,7 +276,8 @@ func TestWriteHeartbeatError(t *testing.T) { // TestCloseWhileStuckWriting tests that Close shouldn't get stuck even if the heartbeat writer is stuck waiting for a semi-sync ACK. func TestCloseWhileStuckWriting(t *testing.T) { db := fakesqldb.New(t) - tw := newTestWriter(db, nil) + tw := newSimpleTestWriter(db, nil) + tw.isOpen = true killWg := sync.WaitGroup{} @@ -90,14 +298,14 @@ func TestCloseWhileStuckWriting(t *testing.T) { }) // Now we enable writes, but the first write will get blocked. - tw.enableWrites(true) + tw.enableWrites() // We wait until the write has blocked to ensure we only call Close after we are stuck writing. startedWaitWg.Wait() // Even if the write is blocked, we should be able to disable writes without waiting indefinitely. // This is what we call, when we try to Close the heartbeat writer. ctx, cancel := context.WithCancel(context.Background()) go func() { - tw.enableWrites(false) + tw.disableWrites() cancel() }() select { @@ -108,17 +316,22 @@ func TestCloseWhileStuckWriting(t *testing.T) { } } -func newTestWriter(db *fakesqldb.DB, frozenTime *time.Time) *heartbeatWriter { +func newSimpleTestWriter(db *fakesqldb.DB, frozenTime *time.Time) *heartbeatWriter { + return newTestWriter(db, frozenTime, tabletenv.Heartbeat, 0) +} + +func newTestWriter(db *fakesqldb.DB, frozenTime *time.Time, replTrackerMode string, onDemandInterval time.Duration) *heartbeatWriter { cfg := tabletenv.NewDefaultConfig() - cfg.ReplicationTracker.Mode = tabletenv.Heartbeat - cfg.ReplicationTracker.HeartbeatInterval = time.Second + cfg.ReplicationTracker.Mode = replTrackerMode + cfg.ReplicationTracker.HeartbeatOnDemand = onDemandInterval + cfg.ReplicationTracker.HeartbeatInterval = 250 * time.Millisecond // oversampling our 1*time.Second unit test interval in various functions params := db.ConnParams() cp := *params dbc := dbconfigs.NewTestDBConfigs(cp, cp, "") tw := newHeartbeatWriter(tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "WriterTest"), &topodatapb.TabletAlias{Cell: "test", Uid: 1111}) - tw.keyspaceShard = "test:0" + tw.keyspaceShard = "test/-" if frozenTime != nil { tw.now = func() time.Time {