Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make connection killing resilient to MySQL hangs #14500

Merged
merged 9 commits into from
Mar 4, 2024
13 changes: 8 additions & 5 deletions go/mysql/fakesqldb/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,11 +376,11 @@
}
key := strings.ToLower(query)
db.mu.Lock()
defer db.mu.Unlock()
db.queryCalled[key]++
db.querylog = append(db.querylog, key)
// Check if we should close the connection and provoke errno 2013.
if db.shouldClose.Load() {
defer db.mu.Unlock()

Check warning on line 383 in go/mysql/fakesqldb/server.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/fakesqldb/server.go#L383

Added line #L383 was not covered by tests
c.Close()

// log error
Expand All @@ -394,6 +394,8 @@
// The driver may send this at connection time, and we don't want it to
// interfere.
if key == "set names utf8" || strings.HasPrefix(key, "set collation_connection = ") {
defer db.mu.Unlock()

Check warning on line 397 in go/mysql/fakesqldb/server.go

View check run for this annotation

Codecov / codecov/patch

go/mysql/fakesqldb/server.go#L397

Added line #L397 was not covered by tests

// log error
if err := callback(&sqltypes.Result{}); err != nil {
log.Errorf("callback failed : %v", err)
Expand All @@ -403,12 +405,14 @@

// check if we should reject it.
if err, ok := db.rejectedData[key]; ok {
db.mu.Unlock()
return err
}

// Check explicit queries from AddQuery().
result, ok := db.data[key]
if ok {
db.mu.Unlock()
if f := result.BeforeFunc; f != nil {
f()
}
Expand All @@ -419,12 +423,9 @@
for _, pat := range db.patternData {
if pat.expr.MatchString(query) {
userCallback, ok := db.queryPatternUserCallback[pat.expr]
db.mu.Unlock()
if ok {
// Since the user call back can be indefinitely stuck, we shouldn't hold the lock indefinitely.
// This is only test code, so no actual cause for concern.
db.mu.Unlock()
userCallback(query)
db.mu.Lock()
Comment on lines -423 to -427
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We had to refactor this a bit - there's other places in this code that can end up blocking (e.g. the BeforeFunc call further above) and where we need to release the lock beforehand.

Instead of unlocking and relocking here, we changed the unlock calls to not be defered. We're not happy about that, but it seemed like the simplest way forward.

I think it would be nice to refactor the fake db server to clean some of this up in the future.

}
if pat.err != "" {
return fmt.Errorf(pat.err)
Expand All @@ -433,6 +434,8 @@
}
}

defer db.mu.Unlock()

if db.neverFail.Load() {
return callback(&sqltypes.Result{})
}
Expand Down
182 changes: 98 additions & 84 deletions go/vt/vttablet/tabletserver/connpool/dbconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

const defaultKillTimeout = 5 * time.Second

// Conn is a db connection for tabletserver.
// It performs automatic reconnects as needed.
// Its Execute function has a timeout that can kill
Expand All @@ -52,11 +54,13 @@
env tabletenv.Env
dbaPool *dbconnpool.ConnectionPool
stats *tabletenv.Stats
current atomic.Value
current atomic.Pointer[string]

// err will be set if a query is killed through a Kill.
errmu sync.Mutex
err error

killTimeout time.Duration
}

// NewConnection creates a new DBConn. It triggers a CheckMySQL if creation fails.
Expand All @@ -71,12 +75,12 @@
return nil, err
}
db := &Conn{
conn: c,
env: pool.env,
stats: pool.env.Stats(),
dbaPool: pool.dbaPool,
conn: c,
env: pool.env,
stats: pool.env.Stats(),
dbaPool: pool.dbaPool,
killTimeout: defaultKillTimeout,
}
db.current.Store("")
return db, nil
}

Expand All @@ -87,12 +91,12 @@
return nil, err
}
dbconn := &Conn{
conn: c,
dbaPool: dbaPool,
stats: tabletenv.NewStats(servenv.NewExporter("Temp", "Tablet")),
env: env,
conn: c,
dbaPool: dbaPool,
stats: tabletenv.NewStats(servenv.NewExporter("Temp", "Tablet")),
env: env,
killTimeout: defaultKillTimeout,
}
dbconn.current.Store("")
if setting == nil {
return dbconn, nil
}
Expand Down Expand Up @@ -153,28 +157,42 @@
}

func (dbc *Conn) execOnce(ctx context.Context, query string, maxrows int, wantfields bool) (*sqltypes.Result, error) {
dbc.current.Store(query)
defer dbc.current.Store("")
dbc.current.Store(&query)
defer dbc.current.Store(nil)

// Check if the context is already past its deadline before
// trying to execute the query.
if err := ctx.Err(); err != nil {
return nil, fmt.Errorf("%v before execution started", err)
}

defer dbc.stats.MySQLTimings.Record("Exec", time.Now())

done, wg := dbc.setDeadline(ctx)
qr, err := dbc.conn.ExecuteFetch(query, maxrows, wantfields)
now := time.Now()
defer dbc.stats.MySQLTimings.Record("Exec", now)

if done != nil {
close(done)
wg.Wait()
type execResult struct {
result *sqltypes.Result
err error
}
if dbcerr := dbc.Err(); dbcerr != nil {
return nil, dbcerr

ch := make(chan execResult)
go func() {
result, err := dbc.conn.ExecuteFetch(query, maxrows, wantfields)
ch <- execResult{result, err}
}()

select {
case <-ctx.Done():
killCtx, cancel := context.WithTimeout(context.Background(), dbc.killTimeout)
defer cancel()

_ = dbc.KillWithContext(killCtx, ctx.Err().Error(), time.Since(now))
return nil, dbc.Err()
case r := <-ch:
if dbcErr := dbc.Err(); dbcErr != nil {
return nil, dbcErr
}
return r.result, r.err
}
return qr, err
}

// ExecOnce executes the specified query, but does not retry on connection errors.
Expand Down Expand Up @@ -250,22 +268,30 @@
}

func (dbc *Conn) streamOnce(ctx context.Context, query string, callback func(*sqltypes.Result) error, alloc func() *sqltypes.Result, streamBufferSize int) error {
defer dbc.stats.MySQLTimings.Record("ExecStream", time.Now())
dbc.current.Store(&query)
defer dbc.current.Store(nil)

dbc.current.Store(query)
defer dbc.current.Store("")
now := time.Now()
defer dbc.stats.MySQLTimings.Record("ExecStream", now)

done, wg := dbc.setDeadline(ctx)
err := dbc.conn.ExecuteStreamFetch(query, callback, alloc, streamBufferSize)
ch := make(chan error)
go func() {
ch <- dbc.conn.ExecuteStreamFetch(query, callback, alloc, streamBufferSize)
}()

if done != nil {
close(done)
wg.Wait()
}
if dbcerr := dbc.Err(); dbcerr != nil {
return dbcerr
select {
case <-ctx.Done():
killCtx, cancel := context.WithTimeout(context.Background(), dbc.killTimeout)
defer cancel()

Check warning on line 285 in go/vt/vttablet/tabletserver/connpool/dbconn.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletserver/connpool/dbconn.go#L283-L285

Added lines #L283 - L285 were not covered by tests

_ = dbc.KillWithContext(killCtx, ctx.Err().Error(), time.Since(now))
return dbc.Err()

Check warning on line 288 in go/vt/vttablet/tabletserver/connpool/dbconn.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletserver/connpool/dbconn.go#L287-L288

Added lines #L287 - L288 were not covered by tests
case err := <-ch:
if dbcErr := dbc.Err(); dbcErr != nil {
return dbcErr
}
return err
}
return err
}

// StreamOnce executes the query and streams the results. But, does not retry on connection errors.
Expand Down Expand Up @@ -363,10 +389,19 @@
return dbc.conn.IsClosed()
}

// Kill kills the currently executing query both on MySQL side
// Kill wraps KillWithContext using context.Background.
func (dbc *Conn) Kill(reason string, elapsed time.Duration) error {
return dbc.KillWithContext(context.Background(), reason, elapsed)
}

// KillWithContext kills the currently executing query both on MySQL side
// and on the connection side. If no query is executing, it's a no-op.
// Kill will also not kill a query more than once.
func (dbc *Conn) Kill(reason string, elapsed time.Duration) error {
func (dbc *Conn) KillWithContext(ctx context.Context, reason string, elapsed time.Duration) error {
if cause := context.Cause(ctx); cause != nil {
return cause
}

dbc.stats.KillCounters.Add("Queries", 1)
log.Infof("Due to %s, elapsed time: %v, killing query ID %v %s", reason, elapsed, dbc.conn.ID(), dbc.CurrentForLogging())

Expand All @@ -377,25 +412,43 @@
dbc.conn.Close()

// Server side action. Kill the session.
killConn, err := dbc.dbaPool.Get(context.TODO())
killConn, err := dbc.dbaPool.Get(ctx)
if err != nil {
log.Warningf("Failed to get conn from dba pool: %v", err)
return err
}
defer killConn.Recycle()

ch := make(chan error)
sql := fmt.Sprintf("kill %d", dbc.conn.ID())
_, err = killConn.Conn.ExecuteFetch(sql, 10000, false)
if err != nil {
log.Errorf("Could not kill query ID %v %s: %v", dbc.conn.ID(),
dbc.CurrentForLogging(), err)
return err
go func() {
_, err := killConn.Conn.ExecuteFetch(sql, -1, false)
ch <- err
}()

select {
case <-ctx.Done():
killConn.Close()

dbc.stats.InternalErrors.Add("HungQuery", 1)
log.Warningf("Query may be hung: %s", dbc.CurrentForLogging())

return context.Cause(ctx)
case err := <-ch:
if err != nil {
log.Errorf("Could not kill query ID %v %s: %v", dbc.conn.ID(), dbc.CurrentForLogging(), err)
return err
}
return nil
}
return nil
}

// Current returns the currently executing query.
func (dbc *Conn) Current() string {
return dbc.current.Load().(string)
if q := dbc.current.Load(); q != nil {
return *q
}
return ""
}

// ID returns the connection id.
Expand Down Expand Up @@ -437,45 +490,6 @@
return nil
}

// setDeadline starts a goroutine that will kill the currently executing query
// if the deadline is exceeded. It returns a channel and a waitgroup. After the
// query is done executing, the caller is required to close the done channel
// and wait for the waitgroup to make sure that the necessary cleanup is done.
func (dbc *Conn) setDeadline(ctx context.Context) (chan bool, *sync.WaitGroup) {
if ctx.Done() == nil {
return nil, nil
}
done := make(chan bool)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
startTime := time.Now()
select {
case <-ctx.Done():
dbc.Kill(ctx.Err().Error(), time.Since(startTime))
case <-done:
return
}
elapsed := time.Since(startTime)

// Give 2x the elapsed time and some buffer as grace period
// for the query to get killed.
tmr2 := time.NewTimer(2*elapsed + 5*time.Second)
defer tmr2.Stop()
select {
case <-tmr2.C:
dbc.stats.InternalErrors.Add("HungQuery", 1)
log.Warningf("Query may be hung: %s", dbc.CurrentForLogging())
case <-done:
return
}
<-done
log.Warningf("Hung query returned")
}()
return done, &wg
}

// CurrentForLogging applies transformations to the query making it suitable to log.
// It applies sanitization rules based on tablet settings and limits the max length of
// queries.
Expand Down
Loading
Loading