Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Kill Query for Non-Transaction Query Execution and Update Query Timeout / Cancelled Error Message #15694

Merged
merged 9 commits into from
Apr 23, 2024
6 changes: 6 additions & 0 deletions changelog/20.0/20.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
- [Delete with Subquery Support](#delete-subquery)
- [Delete with Multi Target Support](#delete-multi-target)
- [User Defined Functions Support](#udf-support)
- **[Query Timeout](#query-timeout)**
- **[Flag changes](#flag-changes)**
- [`pprof-http` default change](#pprof-http-default)
- [New `healthcheck-dial-concurrency` flag](#healthcheck-dial-concurrency-flag)
Expand Down Expand Up @@ -193,6 +194,11 @@ It should be enabled in VTGate with the `--enable-udfs` flag.

More details about how to load UDFs are available in [MySQL Docs](https://dev.mysql.com/doc/extending-mysql/8.0/en/adding-loadable-function.html)

### <a id="query-timeout"/>Query Timeout
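Previously, on a query timeout, Vitess closed the connection using the `kill connection` statement. This led to connection churn,
which is not desirable in some cases. To avoid this, Vitess now uses the `kill query` statement to cancel the query.
This only cancels the query and does not terminate the connection. Queries running inside a transaction are still
terminated by killing the connection, because a query inside a transaction cannot be safely killed on its own.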

### <a id="flag-changes"/>Flag Changes

#### <a id="pprof-http-default"/> `pprof-http` Default Change
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ func testScheduler(t *testing.T) {
onlineddl.CheckCompleteMigration(t, &vtParams, shards, t1uuid, true)
})
t.Run("cut-over fail due to timeout", func(t *testing.T) {
waitForMessage(t, t1uuid, "due to context deadline exceeded")
waitForMessage(t, t1uuid, "(errno 3024) (sqlstate HY000): Query execution was interrupted, maximum statement execution time exceeded")
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusRunning)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning)
Expand All @@ -606,7 +606,7 @@ func testScheduler(t *testing.T) {
})
t.Run("expect transaction failure", func(t *testing.T) {
select {
case commitTransactionChan <- true: //good
case commitTransactionChan <- true: // good
case <-ctx.Done():
assert.Fail(t, ctx.Err().Error())
}
Expand Down Expand Up @@ -1445,7 +1445,7 @@ DROP TABLE IF EXISTS stress_test
}
})

//DROP
// DROP

t.Run("online DROP TABLE", func(t *testing.T) {
uuid := testOnlineDDLStatement(t, createParams(dropStatement, onlineSingletonDDLStrategy, "vtgate", "", "", "", false))
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/endtoend/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func TestConsolidatorReplicasOnly(t *testing.T) {
func TestQueryPlanCache(t *testing.T) {
var cachedPlanSize = int((&tabletserver.TabletPlan{}).CachedSize(true))

//sleep to avoid race between SchemaChanged event clearing out the plans cache which breaks this test
// sleep to avoid race between SchemaChanged event clearing out the plans cache which breaks this test
framework.Server.WaitForSchemaReset(2 * time.Second)

bindVars := map[string]*querypb.BindVariable{
Expand Down Expand Up @@ -276,7 +276,7 @@ func TestQueryTimeout(t *testing.T) {
assert.Equal(t, vtrpcpb.Code_ABORTED, vterrors.Code(err))
vend := framework.DebugVars()
verifyIntValue(t, vend, "QueryTimeout", int(100*time.Millisecond))
compareIntDiff(t, vend, "Kills/Queries", vstart, 1)
compareIntDiff(t, vend, "Kills/Connections", vstart, 1)
}

func changeVar(t *testing.T, name, value string) (revert func()) {
Expand Down
19 changes: 18 additions & 1 deletion go/vt/vttablet/endtoend/transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func TestShutdownGracePeriodWithReserveExecute(t *testing.T) {
client.Rollback()
}

func TestShortTxTimeout(t *testing.T) {
func TestShortTxTimeoutOltp(t *testing.T) {
client := framework.NewClient()
defer framework.Server.Config().SetTxTimeoutForWorkload(
framework.Server.Config().TxTimeoutForWorkload(querypb.ExecuteOptions_OLTP),
Expand All @@ -488,6 +488,23 @@ func TestShortTxTimeout(t *testing.T) {
client.Rollback()
}

func TestShortTxTimeoutOlap(t *testing.T) {
client := framework.NewClient()
defer framework.Server.Config().SetTxTimeoutForWorkload(
framework.Server.Config().TxTimeoutForWorkload(querypb.ExecuteOptions_OLAP),
querypb.ExecuteOptions_OLAP,
)
framework.Server.Config().SetTxTimeoutForWorkload(10*time.Millisecond, querypb.ExecuteOptions_OLAP)

err := client.Begin(false)
require.NoError(t, err)
start := time.Now()
_, err = client.StreamExecute("select sleep(10) from dual", nil)
assert.Error(t, err)
assert.True(t, time.Since(start) < 5*time.Second, time.Since(start))
client.Rollback()
}

func TestMMCommitFlow(t *testing.T) {
client := framework.NewClient()
defer client.Execute("delete from vitess_test where intval=4", nil)
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vttablet/onlineddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,10 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh
defer renameConn.Recycle()
defer func() {
if !renameWasSuccessful {
renameConn.Conn.Kill("premature exit while renaming tables", 0)
err := renameConn.Conn.Kill("premature exit while renaming tables", 0)
if err != nil {
log.Warningf("failed to kill rename connection: %v", err)
harshit-gangal marked this conversation as resolved.
Show resolved Hide resolved
}
}
}()
// See if backend MySQL server supports 'rename_table_preserve_foreign_key' variable
Expand Down
155 changes: 123 additions & 32 deletions go/vt/vttablet/tabletserver/connpool/dbconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package connpool

import (
"context"
"errors"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -108,11 +109,13 @@ func NewConn(ctx context.Context, params dbconfigs.Connector, dbaPool *dbconnpoo
}

// Err returns an error if there was a client initiated error
// like a query kill.
// like a query kill and resets the error message on the connection.
func (dbc *Conn) Err() error {
dbc.errmu.Lock()
defer dbc.errmu.Unlock()
return dbc.err
err := dbc.err
dbc.err = nil
return err
}

// Exec executes the specified query. If there is a connection error, it will reconnect
Expand All @@ -122,7 +125,7 @@ func (dbc *Conn) Exec(ctx context.Context, query string, maxrows int, wantfields
defer span.Finish()

for attempt := 1; attempt <= 2; attempt++ {
r, err := dbc.execOnce(ctx, query, maxrows, wantfields)
r, err := dbc.execOnce(ctx, query, maxrows, wantfields, false)
switch {
case err == nil:
// Success.
Expand Down Expand Up @@ -156,7 +159,7 @@ func (dbc *Conn) Exec(ctx context.Context, query string, maxrows int, wantfields
panic("unreachable")
}

func (dbc *Conn) execOnce(ctx context.Context, query string, maxrows int, wantfields bool) (*sqltypes.Result, error) {
func (dbc *Conn) execOnce(ctx context.Context, query string, maxrows int, wantfields bool, insideTxn bool) (*sqltypes.Result, error) {
dbc.current.Store(&query)
defer dbc.current.Store(nil)

Expand All @@ -178,14 +181,16 @@ func (dbc *Conn) execOnce(ctx context.Context, query string, maxrows int, wantfi
go func() {
result, err := dbc.conn.ExecuteFetch(query, maxrows, wantfields)
ch <- execResult{result, err}
close(ch)
}()

select {
case <-ctx.Done():
killCtx, cancel := context.WithTimeout(context.Background(), dbc.killTimeout)
defer cancel()

_ = dbc.KillWithContext(killCtx, ctx.Err().Error(), time.Since(now))
dbc.terminate(ctx, insideTxn, now)
if !insideTxn {
// wait for the execute method to finish to make connection reusable.
<-ch
}
return nil, dbc.Err()
case r := <-ch:
if dbcErr := dbc.Err(); dbcErr != nil {
Expand All @@ -195,9 +200,28 @@ func (dbc *Conn) execOnce(ctx context.Context, query string, maxrows int, wantfi
}
}

// terminate kills the query or connection based on the transaction status
func (dbc *Conn) terminate(ctx context.Context, insideTxn bool, now time.Time) {
var errMsg string
switch {
case errors.Is(ctx.Err(), context.DeadlineExceeded):
errMsg = "(errno 3024) (sqlstate HY000): Query execution was interrupted, maximum statement execution time exceeded"
case errors.Is(ctx.Err(), context.Canceled):
errMsg = "(errno 1317) (sqlstate 70100): Query execution was interrupted"
default:
errMsg = ctx.Err().Error()
}
if insideTxn {
// we can't safely kill a query in a transaction, we need to kill the connection
_ = dbc.Kill(errMsg, time.Since(now))
} else {
_ = dbc.KillQuery(errMsg, time.Since(now))
Comment on lines +215 to +218
Copy link
Contributor

@mattlord mattlord Apr 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This means that we attempt to kill the connection/query. The comment says that we do, but w/o checking the error we don't know if it was successful do we? IMO we should return the error here and check it at the call sites:

                return dbc.Kill(errMsg, time.Since(now))
	} else {
		return dbc.KillQuery(errMsg, time.Since(now))

It looks like dbc.KillWithContext() can fail for a number of reasons and we shouldn't assume the kill succeeded, should we?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If for any reason kill fails, we log the error message. Kill itself is not directly in the query execution path. But, it kills any other running query, which will receive the error message if the kill succeeds.

}
}

// ExecOnce executes the specified query, but does not retry on connection errors.
func (dbc *Conn) ExecOnce(ctx context.Context, query string, maxrows int, wantfields bool) (*sqltypes.Result, error) {
return dbc.execOnce(ctx, query, maxrows, wantfields)
return dbc.execOnce(ctx, query, maxrows, wantfields, true /* Once means we are in a txn*/)
}

// FetchNext returns the next result set.
Expand Down Expand Up @@ -235,6 +259,7 @@ func (dbc *Conn) Stream(ctx context.Context, query string, callback func(*sqltyp
},
alloc,
streamBufferSize,
false,
)
switch {
case err == nil:
Expand Down Expand Up @@ -267,7 +292,14 @@ func (dbc *Conn) Stream(ctx context.Context, query string, callback func(*sqltyp
panic("unreachable")
}

func (dbc *Conn) streamOnce(ctx context.Context, query string, callback func(*sqltypes.Result) error, alloc func() *sqltypes.Result, streamBufferSize int) error {
func (dbc *Conn) streamOnce(
ctx context.Context,
query string,
callback func(*sqltypes.Result) error,
alloc func() *sqltypes.Result,
streamBufferSize int,
insideTxn bool,
) error {
dbc.current.Store(&query)
defer dbc.current.Store(nil)

Expand All @@ -277,14 +309,16 @@ func (dbc *Conn) streamOnce(ctx context.Context, query string, callback func(*sq
ch := make(chan error)
go func() {
ch <- dbc.conn.ExecuteStreamFetch(query, callback, alloc, streamBufferSize)
close(ch)
}()

select {
case <-ctx.Done():
killCtx, cancel := context.WithTimeout(context.Background(), dbc.killTimeout)
defer cancel()

_ = dbc.KillWithContext(killCtx, ctx.Err().Error(), time.Since(now))
dbc.terminate(ctx, insideTxn, now)
if !insideTxn {
// wait for the execute method to finish to make connection reusable.
<-ch
}
return dbc.Err()
case err := <-ch:
if dbcErr := dbc.Err(); dbcErr != nil {
Expand All @@ -295,7 +329,14 @@ func (dbc *Conn) streamOnce(ctx context.Context, query string, callback func(*sq
}

// StreamOnce executes the query and streams the results. But, does not retry on connection errors.
func (dbc *Conn) StreamOnce(ctx context.Context, query string, callback func(*sqltypes.Result) error, alloc func() *sqltypes.Result, streamBufferSize int, includedFields querypb.ExecuteOptions_IncludedFields) error {
func (dbc *Conn) StreamOnce(
ctx context.Context,
query string,
callback func(*sqltypes.Result) error,
alloc func() *sqltypes.Result,
streamBufferSize int,
includedFields querypb.ExecuteOptions_IncludedFields,
) error {
resultSent := false
return dbc.streamOnce(
ctx,
Expand All @@ -309,6 +350,7 @@ func (dbc *Conn) StreamOnce(ctx context.Context, query string, callback func(*sq
},
alloc,
streamBufferSize,
true, // Once means we are in a txn
)
}

Expand Down Expand Up @@ -364,7 +406,7 @@ func (dbc *Conn) Close() {

// ApplySetting implements the pools.Resource interface.
func (dbc *Conn) ApplySetting(ctx context.Context, setting *smartconnpool.Setting) error {
if _, err := dbc.execOnce(ctx, setting.ApplyQuery(), 1, false); err != nil {
if _, err := dbc.execOnce(ctx, setting.ApplyQuery(), 1, false, false); err != nil {
return err
}
dbc.setting = setting
Expand All @@ -373,7 +415,7 @@ func (dbc *Conn) ApplySetting(ctx context.Context, setting *smartconnpool.Settin

// ResetSetting implements the pools.Resource interface.
func (dbc *Conn) ResetSetting(ctx context.Context) error {
if _, err := dbc.execOnce(ctx, dbc.setting.ResetQuery(), 1, false); err != nil {
if _, err := dbc.execOnce(ctx, dbc.setting.ResetQuery(), 1, false, false); err != nil {
return err
}
dbc.setting = nil
Expand All @@ -389,25 +431,31 @@ func (dbc *Conn) IsClosed() bool {
return dbc.conn.IsClosed()
}

// Kill wraps KillWithContext using context.Background.
// Kill executes a kill statement to terminate the connection.
func (dbc *Conn) Kill(reason string, elapsed time.Duration) error {
return dbc.KillWithContext(context.Background(), reason, elapsed)
ctx, cancel := context.WithTimeout(context.Background(), dbc.killTimeout)
defer cancel()

return dbc.kill(ctx, reason, elapsed)
}

// KillWithContext kills the currently executing query both on MySQL side
// and on the connection side. If no query is executing, it's a no-op.
// Kill will also not kill a query more than once.
func (dbc *Conn) KillWithContext(ctx context.Context, reason string, elapsed time.Duration) error {
if cause := context.Cause(ctx); cause != nil {
return cause
}
// KillQuery executes a kill query statement to terminate the running query on the connection.
func (dbc *Conn) KillQuery(reason string, elapsed time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), dbc.killTimeout)
defer cancel()

dbc.stats.KillCounters.Add("Queries", 1)
log.Infof("Due to %s, elapsed time: %v, killing query ID %v %s", reason, elapsed, dbc.conn.ID(), dbc.CurrentForLogging())
return dbc.killQuery(ctx, reason, elapsed)
}

// kill closes the connection and stops any executing query both on MySQL and
// vttablet.
func (dbc *Conn) kill(ctx context.Context, reason string, elapsed time.Duration) error {
dbc.stats.KillCounters.Add("Connections", 1)
log.Infof("Due to %s, elapsed time: %v, killing connection ID %v %s", reason, elapsed, dbc.conn.ID(), dbc.CurrentForLogging())

// Client side action. Set error and close connection.
dbc.errmu.Lock()
dbc.err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "(errno 2013) due to %s, elapsed time: %v, killing query ID %v", reason, elapsed, dbc.conn.ID())
dbc.err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%s, elapsed time: %v, killing connection ID %v", reason, elapsed, dbc.conn.ID())
dbc.errmu.Unlock()
dbc.conn.Close()

Expand All @@ -426,6 +474,49 @@ func (dbc *Conn) KillWithContext(ctx context.Context, reason string, elapsed tim
ch <- err
}()

select {
case <-ctx.Done():
killConn.Close()

dbc.stats.InternalErrors.Add("HungConnection", 1)
log.Warningf("Connection may be hung: %s", dbc.CurrentForLogging())
harshit-gangal marked this conversation as resolved.
Show resolved Hide resolved

return context.Cause(ctx)
case err := <-ch:
if err != nil {
log.Errorf("Could not kill connection ID %v %s: %v", dbc.conn.ID(), dbc.CurrentForLogging(), err)
return err
}
return nil
}
}

// killQuery kills the currently executing query both on MySQL side
// and on the connection side.
func (dbc *Conn) killQuery(ctx context.Context, reason string, elapsed time.Duration) error {
dbc.stats.KillCounters.Add("Queries", 1)
log.Infof("Due to %s, elapsed time: %v, killing query ID %v %s", reason, elapsed, dbc.conn.ID(), dbc.CurrentForLogging())

// Client side action. Set error for killing the query on timeout.
dbc.errmu.Lock()
dbc.err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%s, elapsed time: %v, killing query ID %v", reason, elapsed, dbc.conn.ID())
dbc.errmu.Unlock()

// Server side action. Kill the executing query.
killConn, err := dbc.dbaPool.Get(ctx)
if err != nil {
log.Warningf("Failed to get conn from dba pool: %v", err)
return err
}
defer killConn.Recycle()

ch := make(chan error)
sql := fmt.Sprintf("kill query %d", dbc.conn.ID())
go func() {
_, err := killConn.Conn.ExecuteFetch(sql, -1, false)
ch <- err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not close the channel here as we did elsewhere?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added it in exec and streamexec, there is no reason we cannot add it here.
Added now.

}()

select {
case <-ctx.Done():
killConn.Close()
Expand Down Expand Up @@ -503,7 +594,7 @@ func (dbc *Conn) CurrentForLogging() string {
return dbc.env.Environment().Parser().TruncateForLog(queryToLog)
}

func (dbc *Conn) applySameSetting(ctx context.Context) (err error) {
_, err = dbc.execOnce(ctx, dbc.setting.ApplyQuery(), 1, false)
return
func (dbc *Conn) applySameSetting(ctx context.Context) error {
_, err := dbc.execOnce(ctx, dbc.setting.ApplyQuery(), 1, false, false)
return err
}
Loading
Loading