Fix heartbeatWriter Close being stuck if waiting for a semi-sync ACK #14823

Merged
merged 3 commits on Dec 28, 2023
4 changes: 4 additions & 0 deletions go/mysql/fakesqldb/server.go
@@ -419,7 +419,11 @@ func (db *DB) HandleQuery(c *mysql.Conn, query string, callback func(*sqltypes.R
 		if pat.expr.MatchString(query) {
 			userCallback, ok := db.queryPatternUserCallback[pat.expr]
 			if ok {
+				// Since the user callback can be indefinitely stuck, we shouldn't hold the lock indefinitely.
+				// This is only test code, so no actual cause for concern.
+				db.mu.Unlock()
 				userCallback(query)
+				db.mu.Lock()
 			}
 			if pat.err != "" {
 				return fmt.Errorf(pat.err)
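The change above releases `db.mu` around the user callback so that a deliberately blocking callback (as used by the new test below) cannot wedge every other query on the fake server's lock. Here is a minimal standalone sketch of this unlock-around-callback pattern; the names (`handler`, `callbacks`) are hypothetical stand-ins for `DB` and `queryPatternUserCallback`, not part of the PR:

```go
package main

import (
	"fmt"
	"sync"
)

// handler is a stand-in for fakesqldb.DB: a mutex-guarded registry of
// user-supplied callbacks that may block for arbitrarily long.
type handler struct {
	mu        sync.Mutex
	callbacks map[string]func(string)
}

func (h *handler) handle(query string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if cb, ok := h.callbacks[query]; ok {
		// Drop the lock while the callback runs; otherwise a callback that
		// blocks (e.g. on a WaitGroup) would hold the lock and stall every
		// other handle() call, including the "kill" query the test relies on.
		h.mu.Unlock()
		cb(query)
		h.mu.Lock()
	}
}

func main() {
	h := &handler{callbacks: map[string]func(string){
		"select 1": func(q string) { fmt.Println("callback for:", q) },
	}}
	h.handle("select 1")
}
```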
63 changes: 57 additions & 6 deletions go/vt/vttablet/tabletserver/repltracker/writer.go
@@ -59,6 +59,7 @@ type heartbeatWriter struct {
 	appPool      *dbconnpool.ConnectionPool
 	allPrivsPool *dbconnpool.ConnectionPool
 	ticks        *timer.Timer
+	writeConnID  atomic.Int64

 	onDemandDuration time.Duration
 	onDemandMu       sync.Mutex
@@ -90,6 +91,7 @@ func newHeartbeatWriter(env tabletenv.Env, alias *topodatapb.TabletAlias) *heart
 		appPool:      dbconnpool.NewConnectionPool("HeartbeatWriteAppPool", env.Exporter(), 2, mysqlctl.DbaIdleTimeout, 0, mysqlctl.PoolDynamicHostnameResolution),
 		allPrivsPool: dbconnpool.NewConnectionPool("HeartbeatWriteAllPrivsPool", env.Exporter(), 2, mysqlctl.DbaIdleTimeout, 0, mysqlctl.PoolDynamicHostnameResolution),
 	}
+	w.writeConnID.Store(-1)
 	if w.onDemandDuration > 0 {
 		// see RequestHeartbeats() for use of onDemandRequestTicks
 		// it's basically a mechanism to rate limit operation RequestHeartbeats().
@@ -192,11 +194,6 @@ func (w *heartbeatWriter) write() error {
 	defer w.env.LogError()
 	ctx, cancel := context.WithDeadline(context.Background(), w.now().Add(w.interval))
 	defer cancel()
-	allPrivsConn, err := w.allPrivsPool.Get(ctx)
-	if err != nil {
-		return err
-	}
-	defer allPrivsConn.Recycle()

 	upsert, err := w.bindHeartbeatVars(sqlUpsertHeartbeat)
 	if err != nil {
@@ -207,6 +204,8 @@
 		return err
 	}
 	defer appConn.Recycle()
+	w.writeConnID.Store(appConn.Conn.ID())
+	defer w.writeConnID.Store(-1)
 	_, err = appConn.Conn.ExecuteFetch(upsert, 1, false)
 	if err != nil {
 		return err
@@ -215,6 +214,9 @@
 }

 func (w *heartbeatWriter) recordError(err error) {
+	if err == nil {
+		return
+	}
 	w.errorLog.Errorf("%v", err)
 	writeErrors.Add(1)
 }
@@ -238,14 +240,63 @@ func (w *heartbeatWriter) enableWrites(enable bool) {
 			w.ticks.Start(w.writeHeartbeat)
 		}()
 	case false:
-		w.ticks.Stop()
+		// We stop the ticks in a separate goroutine because it can block if the write is stuck on semi-sync ACKs.
+		// At the same time we try to kill the write that is in progress. We use the context and its cancellation
+		// for coordination between the two goroutines. In the end we will have guaranteed that the ticks have stopped
+		// and no write is in progress.
+		ctx, cancel := context.WithCancel(context.Background())
+		go func() {
+			w.ticks.Stop()
+			cancel()
+		}()
+		w.killWritesUntilStopped(ctx)

 		if w.onDemandDuration > 0 {
 			// Let the next RequestHeartbeats() go through
 			w.allowNextHeartbeatRequest()
 		}
 	}
 }

+// killWritesUntilStopped tries to kill the write in progress until the ticks have stopped.
+func (w *heartbeatWriter) killWritesUntilStopped(ctx context.Context) {
+	ticker := time.NewTicker(100 * time.Millisecond)
+	defer ticker.Stop()
+	for {
+		// Actually try to kill the query.
+		err := w.killWrite()
+		w.recordError(err)
+		select {
+		case <-ctx.Done():
+			// If the context has been cancelled, then we know that the ticks have stopped.
+			// This guarantees that there are no writes in progress, so there is nothing to kill.
+			return
+		case <-ticker.C:
+		}
+	}
+}

+// killWrite kills the write in progress (if any).
+func (w *heartbeatWriter) killWrite() error {
+	defer w.env.LogError()
+	writeId := w.writeConnID.Load()
+	if writeId == -1 {
+		return nil
+	}
+
+	ctx, cancel := context.WithDeadline(context.Background(), w.now().Add(w.interval))
+	defer cancel()
+	killConn, err := w.allPrivsPool.Get(ctx)
+	if err != nil {
+		log.Errorf("Kill conn didn't get connection :(")
+		return err
+	}
+	defer killConn.Recycle()
+
+	_, err = killConn.Conn.ExecuteFetch(fmt.Sprintf("kill %d", writeId), 1, false)
+	return err
+}

 // allowNextHeartbeatRequest ensures that the next call to RequestHeartbeats() passes through and
 // is not dropped.
 func (w *heartbeatWriter) allowNextHeartbeatRequest() {
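The heart of the fix is the coordination in `enableWrites(false)`: stopping the ticks can block behind a write that is waiting on a semi-sync ACK, so the stop runs in a goroutine that cancels a context when it returns, while the caller repeatedly kills the in-flight write until that cancellation arrives. Below is a runnable sketch of that pattern in isolation; the names and the 350ms stand-in for a stuck write are illustrative, not taken from the PR:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	// Stand-in for w.ticks.Stop(): it returns only once the in-flight
	// write has finished, which here takes a few kill attempts.
	stopTicks := func() { time.Sleep(350 * time.Millisecond) }

	go func() {
		stopTicks() // may block while a write waits on a semi-sync ACK
		cancel()    // once this fires, no write can still be in progress
	}()

	// Stand-in for killWritesUntilStopped: keep killing until stopped.
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		fmt.Println("kill in-flight write (if any)")
		select {
		case <-ctx.Done():
			fmt.Println("ticks stopped and no write in progress; returning")
			return
		case <-ticker.C:
		}
	}
}
```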
44 changes: 44 additions & 0 deletions go/vt/vttablet/tabletserver/repltracker/writer_test.go
@@ -17,7 +17,9 @@ limitations under the License.
 package repltracker

 import (
+	"context"
 	"fmt"
+	"sync"
 	"testing"
 	"time"

@@ -65,6 +67,48 @@ func TestWriteHeartbeatError(t *testing.T) {
 	assert.Equal(t, int64(1), writeErrors.Get())
 }

+// TestCloseWhileStuckWriting tests that Close shouldn't get stuck even if the heartbeat writer is stuck waiting for a semi-sync ACK.
+func TestCloseWhileStuckWriting(t *testing.T) {
+	db := fakesqldb.New(t)
+	tw := newTestWriter(db, nil)
+	tw.isOpen = true
+
+	killWg := sync.WaitGroup{}
+	killWg.Add(1)
+	startedWaitWg := sync.WaitGroup{}
+	startedWaitWg.Add(1)
+
+	// Insert a query pattern that causes the upsert to block indefinitely until it has been killed.
+	// This simulates a stuck primary write due to a semi-sync ACK requirement.
+	db.AddQueryPatternWithCallback(`INSERT INTO .*heartbeat \(ts, tabletUid, keyspaceShard\).*`, &sqltypes.Result{}, func(s string) {
+		startedWaitWg.Done()
+		killWg.Wait()
+	})
+
+	// When we receive a kill query, we want to finish running the wait group to unblock the upsert query.
+	db.AddQueryPatternWithCallback("kill.*", &sqltypes.Result{}, func(s string) {
+		killWg.Done()
+	})
+
+	// Now we enable writes, but the first write will get blocked.
+	tw.enableWrites(true)
+	// We wait until the write has blocked to ensure we only call Close after we are stuck writing.
+	startedWaitWg.Wait()
+	// Even if the write is blocked, we should be able to disable writes without waiting indefinitely.
+	// This is what we call when we try to Close the heartbeat writer.
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		tw.enableWrites(false)
+		cancel()
+	}()
+	select {
+	case <-ctx.Done():
+		db.Close()
+	case <-time.After(1000 * time.Second):
+		t.Fatalf("Timed out waiting for heartbeat writer to close")
+	}
+}

 func newTestWriter(db *fakesqldb.DB, frozenTime *time.Time) *heartbeatWriter {
 	config := tabletenv.NewDefaultConfig()
 	config.ReplicationTracker.Mode = tabletenv.Heartbeat
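The test synchronizes on two WaitGroups: one proves the write has reached its blocking point before Close is attempted, the other releases it only once the kill arrives, making the ordering deterministic. Distilled to a standalone sketch (all names here are hypothetical, not from the PR):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var started, release, done sync.WaitGroup
	started.Add(1)
	release.Add(1)
	done.Add(1)

	// Stand-in for the heartbeat upsert that blocks on a semi-sync ACK.
	go func() {
		defer done.Done()
		started.Done() // signal: the "write" is now at its blocking point
		release.Wait() // block until the "kill" arrives
		fmt.Println("write unblocked by kill")
	}()

	started.Wait() // only act once the write is provably stuck
	release.Done() // stand-in for the kill query unblocking the write
	done.Wait()
}
```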