Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make connection killing resilient to MySQL hangs #14500

Merged
merged 9 commits into from
Mar 4, 2024
160 changes: 84 additions & 76 deletions go/vt/vttablet/tabletserver/connpool/dbconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
env tabletenv.Env
dbaPool *dbconnpool.ConnectionPool
stats *tabletenv.Stats
current atomic.Value
current atomic.Pointer[string]

// err will be set if a query is killed through a Kill.
errmu sync.Mutex
Expand All @@ -76,7 +76,6 @@
stats: pool.env.Stats(),
dbaPool: pool.dbaPool,
}
db.current.Store("")
return db, nil
}

Expand All @@ -91,7 +90,6 @@
dbaPool: dbaPool,
stats: tabletenv.NewStats(servenv.NewExporter("Temp", "Tablet")),
}
dbconn.current.Store("")
if setting == nil {
return dbconn, nil
}
Expand Down Expand Up @@ -152,28 +150,42 @@
}

func (dbc *Conn) execOnce(ctx context.Context, query string, maxrows int, wantfields bool) (*sqltypes.Result, error) {
dbc.current.Store(query)
defer dbc.current.Store("")
dbc.current.Store(&query)
defer dbc.current.Store(nil)

// Check if the context is already past its deadline before
// trying to execute the query.
if err := ctx.Err(); err != nil {
return nil, fmt.Errorf("%v before execution started", err)
}

defer dbc.stats.MySQLTimings.Record("Exec", time.Now())
now := time.Now()
defer dbc.stats.MySQLTimings.Record("Exec", now)

done, wg := dbc.setDeadline(ctx)
qr, err := dbc.conn.ExecuteFetch(query, maxrows, wantfields)

if done != nil {
close(done)
wg.Wait()
type execResult struct {
result *sqltypes.Result
err error
}
if dbcerr := dbc.Err(); dbcerr != nil {
return nil, dbcerr

ch := make(chan execResult)
go func() {
result, err := dbc.conn.ExecuteFetch(query, maxrows, wantfields)
ch <- execResult{result, err}
}()

select {
case <-ctx.Done():
killCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

_ = dbc.KillWithContext(killCtx, ctx.Err().Error(), time.Since(now))
return nil, dbc.Err()
case r := <-ch:
if dbcErr := dbc.Err(); dbcErr != nil {
return nil, dbcErr
}
return r.result, r.err
}
return qr, err
}

// ExecOnce executes the specified query, but does not retry on connection errors.
Expand Down Expand Up @@ -249,22 +261,30 @@
}

func (dbc *Conn) streamOnce(ctx context.Context, query string, callback func(*sqltypes.Result) error, alloc func() *sqltypes.Result, streamBufferSize int) error {
defer dbc.stats.MySQLTimings.Record("ExecStream", time.Now())
dbc.current.Store(&query)
defer dbc.current.Store(nil)

dbc.current.Store(query)
defer dbc.current.Store("")
now := time.Now()
defer dbc.stats.MySQLTimings.Record("ExecStream", now)

done, wg := dbc.setDeadline(ctx)
err := dbc.conn.ExecuteStreamFetch(query, callback, alloc, streamBufferSize)
ch := make(chan error)
go func() {
ch <- dbc.conn.ExecuteStreamFetch(query, callback, alloc, streamBufferSize)
}()

if done != nil {
close(done)
wg.Wait()
}
if dbcerr := dbc.Err(); dbcerr != nil {
return dbcerr
select {
case <-ctx.Done():
killCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

Check warning on line 279 in go/vt/vttablet/tabletserver/connpool/dbconn.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletserver/connpool/dbconn.go#L277-L279

Added lines #L277 - L279 were not covered by tests
_ = dbc.KillWithContext(killCtx, ctx.Err().Error(), time.Since(now))
return dbc.Err()
case err := <-ch:

Check warning on line 282 in go/vt/vttablet/tabletserver/connpool/dbconn.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletserver/connpool/dbconn.go#L281-L282

Added lines #L281 - L282 were not covered by tests
if dbcErr := dbc.Err(); dbcErr != nil {
return dbcErr
}
return err
}
return err
}

// StreamOnce executes the query and streams the results. But, does not retry on connection errors.
Expand Down Expand Up @@ -362,10 +382,19 @@
return dbc.conn.IsClosed()
}

// Kill kills the currently executing query both on MySQL side
// Kill wraps KillWithContext using context.Background, so the kill attempt
// itself carries no deadline and cannot be canceled by the caller.
// reason and elapsed are passed through to KillWithContext unchanged.
func (dbc *Conn) Kill(reason string, elapsed time.Duration) error {
return dbc.KillWithContext(context.Background(), reason, elapsed)
}

// KillWithContext kills the currently executing query both on the MySQL side
// and on the connection side. If no query is executing, it's a no-op.
// It will also not kill a query more than once.
func (dbc *Conn) Kill(reason string, elapsed time.Duration) error {
func (dbc *Conn) KillWithContext(ctx context.Context, reason string, elapsed time.Duration) error {
if cause := context.Cause(ctx); cause != nil {
return cause
}

Check warning on line 396 in go/vt/vttablet/tabletserver/connpool/dbconn.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletserver/connpool/dbconn.go#L396

Added line #L396 was not covered by tests

dbc.stats.KillCounters.Add("Queries", 1)
log.Infof("Due to %s, elapsed time: %v, killing query ID %v %s", reason, elapsed, dbc.conn.ID(), dbc.CurrentForLogging())

Expand All @@ -376,25 +405,43 @@
dbc.conn.Close()

// Server side action. Kill the session.
killConn, err := dbc.dbaPool.Get(context.TODO())
killConn, err := dbc.dbaPool.Get(ctx)
if err != nil {
log.Warningf("Failed to get conn from dba pool: %v", err)
return err
}
defer killConn.Recycle()

ch := make(chan error)
sql := fmt.Sprintf("kill %d", dbc.conn.ID())
_, err = killConn.Conn.ExecuteFetch(sql, 10000, false)
if err != nil {
log.Errorf("Could not kill query ID %v %s: %v", dbc.conn.ID(),
dbc.CurrentForLogging(), err)
return err
go func() {
_, err := killConn.Conn.ExecuteFetch(sql, -1, false)
ch <- err
}()

select {
case <-ctx.Done():
killConn.Close()

Check warning on line 425 in go/vt/vttablet/tabletserver/connpool/dbconn.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletserver/connpool/dbconn.go#L424-L425

Added lines #L424 - L425 were not covered by tests
dbc.stats.InternalErrors.Add("HungQuery", 1)
log.Warningf("Query may be hung: %s", dbc.CurrentForLogging())

Check warning on line 428 in go/vt/vttablet/tabletserver/connpool/dbconn.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletserver/connpool/dbconn.go#L427-L428

Added lines #L427 - L428 were not covered by tests
return context.Cause(ctx)
case err := <-ch:

Check warning on line 430 in go/vt/vttablet/tabletserver/connpool/dbconn.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletserver/connpool/dbconn.go#L430

Added line #L430 was not covered by tests
if err != nil {
log.Errorf("Could not kill query ID %v %s: %v", dbc.conn.ID(), dbc.CurrentForLogging(), err)
return err
}
return nil
}
return nil
}

// Current returns the currently executing query.
func (dbc *Conn) Current() string {
return dbc.current.Load().(string)
if q := dbc.current.Load(); q != nil {
return *q
}
return ""
}

// ID returns the connection id.
Expand Down Expand Up @@ -436,45 +483,6 @@
return nil
}

// setDeadline starts a goroutine that will kill the currently executing query
// if the deadline is exceeded. It returns a channel and a waitgroup. After the
// query is done executing, the caller is required to close the done channel
// and wait for the waitgroup to make sure that the necessary cleanup is done.
//
// If ctx.Done() is nil (the context can never be canceled), no goroutine is
// started and both return values are nil, so callers must check the channel
// for nil before closing it.
func (dbc *Conn) setDeadline(ctx context.Context) (chan bool, *sync.WaitGroup) {
// Nothing to watch for: skip the goroutine entirely.
if ctx.Done() == nil {
return nil, nil
}
done := make(chan bool)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
startTime := time.Now()
// Phase 1: wait for either the context to end (kill the query) or the
// caller to signal that the query finished on its own.
select {
case <-ctx.Done():
// The error returned by Kill is not inspected here.
dbc.Kill(ctx.Err().Error(), time.Since(startTime))
case <-done:
return
}
// Measured after Kill returns, so it includes the time Kill itself took.
elapsed := time.Since(startTime)

// Give 2x the elapsed time and some buffer as grace period
// for the query to get killed.
tmr2 := time.NewTimer(2*elapsed + 5*time.Second)
defer tmr2.Stop()
// Phase 2: the kill has been issued. Either the caller closes done within
// the grace period, or the query is counted and logged as possibly hung.
select {
case <-tmr2.C:
dbc.stats.InternalErrors.Add("HungQuery", 1)
log.Warningf("Query may be hung: %s", dbc.CurrentForLogging())
case <-done:
return
}
// Grace period expired: block until the caller finally closes done, then
// log that the query flagged as hung has returned.
<-done
log.Warningf("Hung query returned")
}()
return done, &wg
}

// CurrentForLogging applies transformations to the query making it suitable to log.
// It applies sanitization rules based on tablet settings and limits the max length of
// queries.
Expand Down
Loading