Skip to content

Commit

Permalink
Fix heartbeatWriter Close being stuck if waiting for a semi-sync ACK (#…
Browse files Browse the repository at this point in the history
…14823)

Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Dec 28, 2023
1 parent ddcd4cd commit 47de203
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 6 deletions.
4 changes: 4 additions & 0 deletions go/mysql/fakesqldb/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,11 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R
if pat.expr.MatchString(query) {
userCallback, ok := db.queryPatternUserCallback[pat.expr]
if ok {
// Since the user call back can be indefinitely stuck, we shouldn't hold the lock indefinitely.
// This is only test code, so no actual cause for concern.
db.mu.Unlock()
userCallback(query)
db.mu.Lock()
}
if pat.err != "" {
return fmt.Errorf(pat.err)
Expand Down
63 changes: 57 additions & 6 deletions go/vt/vttablet/tabletserver/repltracker/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type heartbeatWriter struct {
appPool *dbconnpool.ConnectionPool
allPrivsPool *dbconnpool.ConnectionPool
ticks *timer.Timer
writeConnID atomic.Int64

onDemandDuration time.Duration
onDemandMu sync.Mutex
Expand Down Expand Up @@ -90,6 +91,7 @@ func newHeartbeatWriter(env tabletenv.Env, alias *topodatapb.TabletAlias) *heart
appPool: dbconnpool.NewConnectionPool("HeartbeatWriteAppPool", env.Exporter(), 2, mysqlctl.DbaIdleTimeout, 0, mysqlctl.PoolDynamicHostnameResolution),
allPrivsPool: dbconnpool.NewConnectionPool("HeartbeatWriteAllPrivsPool", env.Exporter(), 2, mysqlctl.DbaIdleTimeout, 0, mysqlctl.PoolDynamicHostnameResolution),
}
w.writeConnID.Store(-1)
if w.onDemandDuration > 0 {
// see RequestHeartbeats() for use of onDemandRequestTicks
// it's basically a mechanism to rate limit operation RequestHeartbeats().
Expand Down Expand Up @@ -192,11 +194,6 @@ func (w *heartbeatWriter) write() error {
defer w.env.LogError()
ctx, cancel := context.WithDeadline(context.Background(), w.now().Add(w.interval))
defer cancel()
allPrivsConn, err := w.allPrivsPool.Get(ctx)
if err != nil {
return err
}
defer allPrivsConn.Recycle()

upsert, err := w.bindHeartbeatVars(sqlUpsertHeartbeat)
if err != nil {
Expand All @@ -207,6 +204,8 @@ func (w *heartbeatWriter) write() error {
return err
}
defer appConn.Recycle()
w.writeConnID.Store(appConn.Conn.ID())
defer w.writeConnID.Store(-1)
_, err = appConn.Conn.ExecuteFetch(upsert, 1, false)
if err != nil {
return err
Expand All @@ -215,6 +214,9 @@ func (w *heartbeatWriter) write() error {
}

// recordError logs the given error through the writer's rate-limited error
// logger and increments the writeErrors counter. A nil error is a no-op, so
// callers can pass the result of an operation unconditionally.
func (w *heartbeatWriter) recordError(err error) {
	if err != nil {
		w.errorLog.Errorf("%v", err)
		writeErrors.Add(1)
	}
}
Expand All @@ -238,14 +240,63 @@ func (w *heartbeatWriter) enableWrites(enable bool) {
w.ticks.Start(w.writeHeartbeat)
}()
case false:
w.ticks.Stop()
// We stop the ticks in a separate go routine because it can block if the write is stuck on semi-sync ACKs.
// At the same time we try and kill the write that is in progress. We use the context and its cancellation
// for coordination between the two go-routines. In the end we will have guaranteed that the ticks have stopped
// and no write is in progress.
ctx, cancel := context.WithCancel(context.Background())
go func() {
w.ticks.Stop()
cancel()
}()
w.killWritesUntilStopped(ctx)

if w.onDemandDuration > 0 {
// Let the next RequestHeartbeats() go through
w.allowNextHeartbeatRequest()
}
}
}

// killWritesUntilStopped repeatedly tries to kill any in-progress heartbeat
// write until ctx is cancelled. Cancellation of ctx signals that the ticks
// have stopped, which guarantees no further writes can be in flight.
func (w *heartbeatWriter) killWritesUntilStopped(ctx context.Context) {
	retry := time.NewTicker(100 * time.Millisecond)
	defer retry.Stop()
	for {
		// Attempt a kill right away; any error is recorded, not fatal.
		w.recordError(w.killWrite())
		select {
		case <-ctx.Done():
			// The ticks have stopped, so no write is in progress anymore
			// and there is nothing left to kill.
			return
		case <-retry.C:
			// Try killing again on the next tick.
		}
	}
}

// killWrite kills the write in progress (if any) by issuing a KILL for the
// write's connection ID over a separate allPrivs connection. It returns nil
// when no write is currently in flight.
func (w *heartbeatWriter) killWrite() error {
	defer w.env.LogError()
	writeID := w.writeConnID.Load()
	// -1 means no heartbeat write is in progress, so there is nothing to kill.
	if writeID == -1 {
		return nil
	}

	// Bound the kill attempt by the heartbeat interval so a stuck pool Get
	// cannot block us indefinitely.
	ctx, cancel := context.WithDeadline(context.Background(), w.now().Add(w.interval))
	defer cancel()
	killConn, err := w.allPrivsPool.Get(ctx)
	if err != nil {
		log.Errorf("Failed to get a connection to kill the heartbeat write with connection id %d: %v", writeID, err)
		return err
	}
	defer killConn.Recycle()

	_, err = killConn.Conn.ExecuteFetch(fmt.Sprintf("kill %d", writeID), 1, false)
	return err
}

// allowNextHeartbeatRequest ensures that the next call to RequestHeartbeats() passes through and
// is not dropped.
func (w *heartbeatWriter) allowNextHeartbeatRequest() {
Expand Down
44 changes: 44 additions & 0 deletions go/vt/vttablet/tabletserver/repltracker/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ limitations under the License.
package repltracker

import (
"context"
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -65,6 +67,48 @@ func TestWriteHeartbeatError(t *testing.T) {
assert.Equal(t, int64(1), writeErrors.Get())
}

// TestCloseWhileStuckWriting tests that Close shouldn't get stuck even if the heartbeat writer is stuck waiting for a semi-sync ACK.
func TestCloseWhileStuckWriting(t *testing.T) {
	db := fakesqldb.New(t)
	// Close the fake DB on every exit path, including a test failure below.
	defer db.Close()
	tw := newTestWriter(db, nil)
	tw.isOpen = true

	killWg := sync.WaitGroup{}
	killWg.Add(1)
	startedWaitWg := sync.WaitGroup{}
	startedWaitWg.Add(1)

	// Insert a query pattern that causes the upsert to block indefinitely until it has been killed.
	// This simulates a stuck primary write due to a semi-sync ACK requirement.
	db.AddQueryPatternWithCallback(`INSERT INTO .*heartbeat \(ts, tabletUid, keyspaceShard\).*`, &sqltypes.Result{}, func(s string) {
		startedWaitWg.Done()
		killWg.Wait()
	})

	// When we receive a kill query, we want to finish running the wait group to unblock the upsert query.
	db.AddQueryPatternWithCallback("kill.*", &sqltypes.Result{}, func(s string) {
		killWg.Done()
	})

	// Now we enable writes, but the first write will get blocked.
	tw.enableWrites(true)
	// We wait until the write has blocked to ensure we only call Close after we are stuck writing.
	startedWaitWg.Wait()
	// Even if the write is blocked, we should be able to disable writes without waiting indefinitely.
	// This is what happens when we try to Close the heartbeat writer.
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		tw.enableWrites(false)
		cancel()
	}()
	select {
	case <-ctx.Done():
	case <-time.After(1000 * time.Second):
		t.Fatal("Timed out waiting for heartbeat writer to close")
	}
}

func newTestWriter(db *fakesqldb.DB, frozenTime *time.Time) *heartbeatWriter {
config := tabletenv.NewDefaultConfig()
config.ReplicationTracker.Mode = tabletenv.Heartbeat
Expand Down

0 comments on commit 47de203

Please sign in to comment.