diff --git a/changelog/20.0/20.0.0/summary.md b/changelog/20.0/20.0.0/summary.md index 64588cd8be4..e8d47e0d0b1 100644 --- a/changelog/20.0/20.0.0/summary.md +++ b/changelog/20.0/20.0.0/summary.md @@ -22,6 +22,7 @@ - [Delete with Subquery Support](#delete-subquery) - [Delete with Multi Target Support](#delete-multi-target) - [User Defined Functions Support](#udf-support) + - **[Query Timeout](#query-timeout)** - **[Flag changes](#flag-changes)** - [`pprof-http` default change](#pprof-http-default) - [New `healthcheck-dial-concurrency` flag](#healthcheck-dial-concurrency-flag) @@ -193,6 +194,11 @@ It should be enabled in VTGate with the `--enable-udfs` flag. More details about how to load UDFs is available in [MySQL Docs](https://dev.mysql.com/doc/extending-mysql/8.0/en/adding-loadable-function.html) +### Query Timeout +On a query timeout, Vitess closed the connection using the `kill connection` statement. This leads to connection churn +which is not desirable in some cases. To avoid this, Vitess now uses the `kill query` statement to cancel the query. +This will only cancel the query and does not terminate the connection. + ### Flag Changes #### `pprof-http` Default Change diff --git a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go index d5106db6bf6..cc927b5d310 100644 --- a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go +++ b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go @@ -583,7 +583,7 @@ func testScheduler(t *testing.T) { onlineddl.CheckCompleteMigration(t, &vtParams, shards, t1uuid, true) }) t.Run("cut-over fail due to timeout", func(t *testing.T) { - waitForMessage(t, t1uuid, "due to context deadline exceeded") + waitForMessage(t, t1uuid, "(errno 3024) (sqlstate HY000): Query execution was interrupted, maximum statement execution time exceeded") status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, t1uuid, normalWaitTime, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed, schema.OnlineDDLStatusRunning) fmt.Printf("# Migration status (for debug purposes): <%s>\n", status) onlineddl.CheckMigrationStatus(t, &vtParams, shards, t1uuid, schema.OnlineDDLStatusRunning) @@ -606,7 +606,7 @@ func testScheduler(t *testing.T) { }) t.Run("expect transaction failure", func(t *testing.T) { select { - case commitTransactionChan <- true: //good + case commitTransactionChan <- true: // good case <-ctx.Done(): assert.Fail(t, ctx.Err().Error()) } @@ -1445,7 +1445,7 @@ DROP TABLE IF EXISTS stress_test } }) - //DROP + // DROP t.Run("online DROP TABLE", func(t *testing.T) { uuid := testOnlineDDLStatement(t, createParams(dropStatement, onlineSingletonDDLStrategy, "vtgate", "", "", "", false)) diff --git a/go/vt/vttablet/endtoend/config_test.go b/go/vt/vttablet/endtoend/config_test.go index 9eef54bd0bb..b1dc7f5dcb9 100644 --- a/go/vt/vttablet/endtoend/config_test.go +++ b/go/vt/vttablet/endtoend/config_test.go @@ -195,7 +195,7 @@ func TestConsolidatorReplicasOnly(t *testing.T) { func TestQueryPlanCache(t *testing.T) { var cachedPlanSize = int((&tabletserver.TabletPlan{}).CachedSize(true)) - //sleep to avoid race between SchemaChanged event clearing out the plans cache which breaks this test + // sleep to avoid race between SchemaChanged event clearing out the plans cache which breaks this test framework.Server.WaitForSchemaReset(2 * time.Second) bindVars := map[string]*querypb.BindVariable{ @@ -276,7 +276,7 @@ func TestQueryTimeout(t *testing.T) { assert.Equal(t, vtrpcpb.Code_ABORTED, vterrors.Code(err)) vend := framework.DebugVars() verifyIntValue(t, vend, "QueryTimeout", int(100*time.Millisecond)) - compareIntDiff(t, vend, "Kills/Queries", vstart, 1) + compareIntDiff(t, vend, "Kills/Connections", vstart, 1) } func changeVar(t *testing.T, name, value string) (revert func()) { diff --git a/go/vt/vttablet/endtoend/transaction_test.go b/go/vt/vttablet/endtoend/transaction_test.go index 8f6546df5f1..b15e73585ba 100644 --- a/go/vt/vttablet/endtoend/transaction_test.go +++ b/go/vt/vttablet/endtoend/transaction_test.go @@ -471,7 +471,7 @@ func TestShutdownGracePeriodWithReserveExecute(t *testing.T) { client.Rollback() } -func TestShortTxTimeout(t *testing.T) { +func TestShortTxTimeoutOltp(t *testing.T) { client := framework.NewClient() defer framework.Server.Config().SetTxTimeoutForWorkload( framework.Server.Config().TxTimeoutForWorkload(querypb.ExecuteOptions_OLTP), @@ -488,6 +488,23 @@ func TestShortTxTimeout(t *testing.T) { client.Rollback() } +func TestShortTxTimeoutOlap(t *testing.T) { + client := framework.NewClient() + defer framework.Server.Config().SetTxTimeoutForWorkload( + framework.Server.Config().TxTimeoutForWorkload(querypb.ExecuteOptions_OLAP), + querypb.ExecuteOptions_OLAP, + ) + framework.Server.Config().SetTxTimeoutForWorkload(10*time.Millisecond, querypb.ExecuteOptions_OLAP) + + err := client.Begin(false) + require.NoError(t, err) + start := time.Now() + _, err = client.StreamExecute("select sleep(10) from dual", nil) + assert.Error(t, err) + assert.True(t, time.Since(start) < 5*time.Second, time.Since(start)) + client.Rollback() +} + func TestMMCommitFlow(t *testing.T) { client := framework.NewClient() defer client.Execute("delete from vitess_test where intval=4", nil) diff --git a/go/vt/vttablet/onlineddl/executor.go b/go/vt/vttablet/onlineddl/executor.go index ae32c032ec6..acbfbd8fd69 100644 --- a/go/vt/vttablet/onlineddl/executor.go +++ b/go/vt/vttablet/onlineddl/executor.go @@ -974,7 +974,10 @@ func (e *Executor) cutOverVReplMigration(ctx context.Context, s *VReplStream, sh defer renameConn.Recycle() defer func() { if !renameWasSuccessful { - renameConn.Conn.Kill("premature exit while renaming tables", 0) + err := renameConn.Conn.Kill("premature exit while renaming tables", 0) + if err != nil { + log.Warningf("Failed to kill connection being used to rename tables in OnlineDDL migration %s: %v", onlineDDL.UUID, err) + } } }() // See if backend MySQL server supports 'rename_table_preserve_foreign_key' variable diff --git a/go/vt/vttablet/tabletserver/connpool/dbconn.go b/go/vt/vttablet/tabletserver/connpool/dbconn.go index a41fcc7b7ec..af8c5fbc78e 100644 --- a/go/vt/vttablet/tabletserver/connpool/dbconn.go +++ b/go/vt/vttablet/tabletserver/connpool/dbconn.go @@ -18,6 +18,7 @@ package connpool import ( "context" + "errors" "fmt" "strings" "sync" @@ -108,11 +109,13 @@ func NewConn(ctx context.Context, params dbconfigs.Connector, dbaPool *dbconnpoo } // Err returns an error if there was a client initiated error -// like a query kill. +// like a query kill and resets the error message on the connection. func (dbc *Conn) Err() error { dbc.errmu.Lock() defer dbc.errmu.Unlock() - return dbc.err + err := dbc.err + dbc.err = nil + return err } // Exec executes the specified query. If there is a connection error, it will reconnect @@ -122,7 +125,7 @@ func (dbc *Conn) Exec(ctx context.Context, query string, maxrows int, wantfields defer span.Finish() for attempt := 1; attempt <= 2; attempt++ { - r, err := dbc.execOnce(ctx, query, maxrows, wantfields) + r, err := dbc.execOnce(ctx, query, maxrows, wantfields, false) switch { case err == nil: // Success. @@ -156,7 +159,7 @@ func (dbc *Conn) Exec(ctx context.Context, query string, maxrows int, wantfields panic("unreachable") } -func (dbc *Conn) execOnce(ctx context.Context, query string, maxrows int, wantfields bool) (*sqltypes.Result, error) { +func (dbc *Conn) execOnce(ctx context.Context, query string, maxrows int, wantfields bool, insideTxn bool) (*sqltypes.Result, error) { dbc.current.Store(&query) defer dbc.current.Store(nil) @@ -178,14 +181,16 @@ func (dbc *Conn) execOnce(ctx context.Context, query string, maxrows int, wantfi go func() { result, err := dbc.conn.ExecuteFetch(query, maxrows, wantfields) ch <- execResult{result, err} + close(ch) }() select { case <-ctx.Done(): - killCtx, cancel := context.WithTimeout(context.Background(), dbc.killTimeout) - defer cancel() - - _ = dbc.KillWithContext(killCtx, ctx.Err().Error(), time.Since(now)) + dbc.terminate(ctx, insideTxn, now) + if !insideTxn { + // wait for the execute method to finish to make connection reusable. + <-ch + } return nil, dbc.Err() case r := <-ch: if dbcErr := dbc.Err(); dbcErr != nil { @@ -195,9 +200,28 @@ func (dbc *Conn) execOnce(ctx context.Context, query string, maxrows int, wantfi } } +// terminate kills the query or connection based on the transaction status +func (dbc *Conn) terminate(ctx context.Context, insideTxn bool, now time.Time) { + var errMsg string + switch { + case errors.Is(ctx.Err(), context.DeadlineExceeded): + errMsg = "(errno 3024) (sqlstate HY000): Query execution was interrupted, maximum statement execution time exceeded" + case errors.Is(ctx.Err(), context.Canceled): + errMsg = "(errno 1317) (sqlstate 70100): Query execution was interrupted" + default: + errMsg = ctx.Err().Error() + } + if insideTxn { + // we can't safely kill a query in a transaction, we need to kill the connection + _ = dbc.Kill(errMsg, time.Since(now)) + } else { + _ = dbc.KillQuery(errMsg, time.Since(now)) + } +} + // ExecOnce executes the specified query, but does not retry on connection errors. func (dbc *Conn) ExecOnce(ctx context.Context, query string, maxrows int, wantfields bool) (*sqltypes.Result, error) { - return dbc.execOnce(ctx, query, maxrows, wantfields) + return dbc.execOnce(ctx, query, maxrows, wantfields, true /* Once means we are in a txn*/) } // FetchNext returns the next result set. @@ -235,6 +259,7 @@ func (dbc *Conn) Stream(ctx context.Context, query string, callback func(*sqltyp }, alloc, streamBufferSize, + false, ) switch { case err == nil: @@ -267,7 +292,14 @@ func (dbc *Conn) Stream(ctx context.Context, query string, callback func(*sqltyp panic("unreachable") } -func (dbc *Conn) streamOnce(ctx context.Context, query string, callback func(*sqltypes.Result) error, alloc func() *sqltypes.Result, streamBufferSize int) error { +func (dbc *Conn) streamOnce( + ctx context.Context, + query string, + callback func(*sqltypes.Result) error, + alloc func() *sqltypes.Result, + streamBufferSize int, + insideTxn bool, +) error { dbc.current.Store(&query) defer dbc.current.Store(nil) @@ -277,14 +309,16 @@ func (dbc *Conn) streamOnce(ctx context.Context, query string, callback func(*sq ch := make(chan error) go func() { ch <- dbc.conn.ExecuteStreamFetch(query, callback, alloc, streamBufferSize) + close(ch) }() select { case <-ctx.Done(): - killCtx, cancel := context.WithTimeout(context.Background(), dbc.killTimeout) - defer cancel() - - _ = dbc.KillWithContext(killCtx, ctx.Err().Error(), time.Since(now)) + dbc.terminate(ctx, insideTxn, now) + if !insideTxn { + // wait for the execute method to finish to make connection reusable. + <-ch + } return dbc.Err() case err := <-ch: if dbcErr := dbc.Err(); dbcErr != nil { @@ -295,7 +329,14 @@ func (dbc *Conn) streamOnce(ctx context.Context, query string, callback func(*sq } // StreamOnce executes the query and streams the results. But, does not retry on connection errors. -func (dbc *Conn) StreamOnce(ctx context.Context, query string, callback func(*sqltypes.Result) error, alloc func() *sqltypes.Result, streamBufferSize int, includedFields querypb.ExecuteOptions_IncludedFields) error { +func (dbc *Conn) StreamOnce( + ctx context.Context, + query string, + callback func(*sqltypes.Result) error, + alloc func() *sqltypes.Result, + streamBufferSize int, + includedFields querypb.ExecuteOptions_IncludedFields, +) error { resultSent := false return dbc.streamOnce( ctx, @@ -309,6 +350,7 @@ func (dbc *Conn) StreamOnce(ctx context.Context, query string, callback func(*sq }, alloc, streamBufferSize, + true, // Once means we are in a txn ) } @@ -364,7 +406,7 @@ func (dbc *Conn) Close() { // ApplySetting implements the pools.Resource interface. func (dbc *Conn) ApplySetting(ctx context.Context, setting *smartconnpool.Setting) error { - if _, err := dbc.execOnce(ctx, setting.ApplyQuery(), 1, false); err != nil { + if _, err := dbc.execOnce(ctx, setting.ApplyQuery(), 1, false, false); err != nil { return err } dbc.setting = setting @@ -373,7 +415,7 @@ func (dbc *Conn) ApplySetting(ctx context.Context, setting *smartconnpool.Settin // ResetSetting implements the pools.Resource interface. func (dbc *Conn) ResetSetting(ctx context.Context) error { - if _, err := dbc.execOnce(ctx, dbc.setting.ResetQuery(), 1, false); err != nil { + if _, err := dbc.execOnce(ctx, dbc.setting.ResetQuery(), 1, false, false); err != nil { return err } dbc.setting = nil @@ -389,25 +431,31 @@ func (dbc *Conn) IsClosed() bool { return dbc.conn.IsClosed() } -// Kill wraps KillWithContext using context.Background. +// Kill executes a kill statement to terminate the connection. func (dbc *Conn) Kill(reason string, elapsed time.Duration) error { - return dbc.KillWithContext(context.Background(), reason, elapsed) + ctx, cancel := context.WithTimeout(context.Background(), dbc.killTimeout) + defer cancel() + + return dbc.kill(ctx, reason, elapsed) } -// KillWithContext kills the currently executing query both on MySQL side -// and on the connection side. If no query is executing, it's a no-op. -// Kill will also not kill a query more than once. -func (dbc *Conn) KillWithContext(ctx context.Context, reason string, elapsed time.Duration) error { - if cause := context.Cause(ctx); cause != nil { - return cause - } +// KillQuery executes a kill query statement to terminate the running query on the connection. +func (dbc *Conn) KillQuery(reason string, elapsed time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), dbc.killTimeout) + defer cancel() - dbc.stats.KillCounters.Add("Queries", 1) - log.Infof("Due to %s, elapsed time: %v, killing query ID %v %s", reason, elapsed, dbc.conn.ID(), dbc.CurrentForLogging()) + return dbc.killQuery(ctx, reason, elapsed) +} + +// kill closes the connection and stops any executing query both on MySQL and +// vttablet. +func (dbc *Conn) kill(ctx context.Context, reason string, elapsed time.Duration) error { + dbc.stats.KillCounters.Add("Connections", 1) + log.Infof("Due to %s, elapsed time: %v, killing connection ID %v %s", reason, elapsed, dbc.conn.ID(), dbc.CurrentForLogging()) // Client side action. Set error and close connection. dbc.errmu.Lock() - dbc.err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "(errno 2013) due to %s, elapsed time: %v, killing query ID %v", reason, elapsed, dbc.conn.ID()) + dbc.err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%s, elapsed time: %v, killing connection ID %v", reason, elapsed, dbc.conn.ID()) dbc.errmu.Unlock() dbc.conn.Close() @@ -424,15 +472,58 @@ func (dbc *Conn) KillWithContext(ctx context.Context, reason string, elapsed tim go func() { _, err := killConn.Conn.ExecuteFetch(sql, -1, false) ch <- err + close(ch) }() select { case <-ctx.Done(): killConn.Close() - dbc.stats.InternalErrors.Add("HungQuery", 1) - log.Warningf("Query may be hung: %s", dbc.CurrentForLogging()) + dbc.stats.InternalErrors.Add("HungConnection", 1) + log.Warningf("Failed to kill MySQL connection ID %d which was executing the following query, it may be hung: %s", dbc.conn.ID(), dbc.CurrentForLogging()) + return context.Cause(ctx) + case err := <-ch: + if err != nil { + log.Errorf("Could not kill connection ID %v %s: %v", dbc.conn.ID(), dbc.CurrentForLogging(), err) + return err + } + return nil + } +} +// killQuery kills the currently executing query both on MySQL side +// and on the connection side. +func (dbc *Conn) killQuery(ctx context.Context, reason string, elapsed time.Duration) error { + dbc.stats.KillCounters.Add("Queries", 1) + log.Infof("Due to %s, elapsed time: %v, killing query ID %v %s", reason, elapsed, dbc.conn.ID(), dbc.CurrentForLogging()) + + // Client side action. Set error for killing the query on timeout. + dbc.errmu.Lock() + dbc.err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%s, elapsed time: %v, killing query ID %v", reason, elapsed, dbc.conn.ID()) + dbc.errmu.Unlock() + + // Server side action. Kill the executing query. + killConn, err := dbc.dbaPool.Get(ctx) + if err != nil { + log.Warningf("Failed to get conn from dba pool: %v", err) + return err + } + defer killConn.Recycle() + + ch := make(chan error) + sql := fmt.Sprintf("kill query %d", dbc.conn.ID()) + go func() { + _, err := killConn.Conn.ExecuteFetch(sql, -1, false) + ch <- err + close(ch) + }() + + select { + case <-ctx.Done(): + killConn.Close() + + dbc.stats.InternalErrors.Add("HungQuery", 1) + log.Warningf("Failed to kill MySQL query ID %d which was executing the following query, it may be hung: %s", dbc.conn.ID(), dbc.CurrentForLogging()) return context.Cause(ctx) case err := <-ch: if err != nil { @@ -503,7 +594,7 @@ func (dbc *Conn) CurrentForLogging() string { return dbc.env.Environment().Parser().TruncateForLog(queryToLog) } -func (dbc *Conn) applySameSetting(ctx context.Context) (err error) { - _, err = dbc.execOnce(ctx, dbc.setting.ApplyQuery(), 1, false) - return +func (dbc *Conn) applySameSetting(ctx context.Context) error { + _, err := dbc.execOnce(ctx, dbc.setting.ApplyQuery(), 1, false, false) + return err } diff --git a/go/vt/vttablet/tabletserver/connpool/dbconn_test.go b/go/vt/vttablet/tabletserver/connpool/dbconn_test.go index 09a85a3e11a..6f3c77de528 100644 --- a/go/vt/vttablet/tabletserver/connpool/dbconn_test.go +++ b/go/vt/vttablet/tabletserver/connpool/dbconn_test.go @@ -320,66 +320,153 @@ func TestDBKillWithContext(t *testing.T) { time.Sleep(200 * time.Millisecond) }) - ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) - defer cancel() + // set a lower timeout value + dbConn.killTimeout = 100 * time.Millisecond - // KillWithContext should return context.DeadlineExceeded - err = dbConn.KillWithContext(ctx, "test kill", 0) + // Kill should return context.DeadlineExceeded + err = dbConn.Kill("test kill", 0) require.ErrorIs(t, err, context.DeadlineExceeded) } -func TestDBKillWithContextDoneContext(t *testing.T) { - db := fakesqldb.New(t) - defer db.Close() - connPool := newPool() - params := dbconfigs.New(db.ConnParams()) - connPool.Open(params, params, params) - defer connPool.Close() - dbConn, err := newPooledConn(context.Background(), connPool, params) - if dbConn != nil { - defer dbConn.Close() +// TestDBConnCtxError tests that an Exec returns with appropriate error code. +// Also, verifies that does it wait for the query to finish before returning. +func TestDBConnCtxError(t *testing.T) { + exec := func(ctx context.Context, query string, dbconn *Conn) error { + _, err := dbconn.Exec(ctx, query, 1, false) + return err } - require.NoError(t, err) - query := fmt.Sprintf("kill %d", dbConn.ID()) - db.AddRejectedQuery(query, errors.New("rejected")) + execOnce := func(ctx context.Context, query string, dbconn *Conn) error { + _, err := dbconn.ExecOnce(ctx, query, 1, false) + return err + } + + t.Run("context cancel - non-tx exec", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(10 * time.Millisecond) + cancel() + }() + testContextError(t, ctx, exec, + "(errno 1317) (sqlstate 70100): Query execution was interrupted", + 150*time.Millisecond) + }) - contextErr := errors.New("context error") - ctx, cancel := context.WithCancelCause(context.Background()) - cancel(contextErr) // cancel the context immediately + t.Run("context deadline - non-tx exec", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + defer cancel() + testContextError(t, ctx, exec, + "(errno 3024) (sqlstate HY000): Query execution was interrupted, maximum statement execution time exceeded", + 150*time.Millisecond) + }) - // KillWithContext should return the cancellation cause - err = dbConn.KillWithContext(ctx, "test kill", 0) - require.ErrorIs(t, err, contextErr) + t.Run("context cancel - tx exec", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(10 * time.Millisecond) + cancel() + }() + testContextError(t, ctx, execOnce, + "(errno 1317) (sqlstate 70100): Query execution was interrupted", + 50*time.Millisecond) + }) + + t.Run("context deadline - tx exec", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + defer cancel() + testContextError(t, ctx, execOnce, + "(errno 3024) (sqlstate HY000): Query execution was interrupted, maximum statement execution time exceeded", + 50*time.Millisecond) + }) +} + +var alloc = func() *sqltypes.Result { + return &sqltypes.Result{} +} + +// TestDBConnStreamCtxError tests that an StreamExec returns with appropriate error code. +// Also, verifies that does it wait for the query to finish before returning. +func TestDBConnStreamCtxError(t *testing.T) { + exec := func(ctx context.Context, query string, dbconn *Conn) error { + return dbconn.Stream(ctx, query, func(result *sqltypes.Result) error { + return nil + }, alloc, 1, querypb.ExecuteOptions_ALL) + } + + execOnce := func(ctx context.Context, query string, dbconn *Conn) error { + return dbconn.StreamOnce(ctx, query, func(result *sqltypes.Result) error { + return nil + }, alloc, 1, querypb.ExecuteOptions_ALL) + } + + t.Run("context cancel - non-tx exec", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(10 * time.Millisecond) + cancel() + }() + testContextError(t, ctx, exec, + "(errno 1317) (sqlstate 70100): Query execution was interrupted", + 150*time.Millisecond) + }) + + t.Run("context deadline - non-tx exec", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + defer cancel() + testContextError(t, ctx, exec, + "(errno 3024) (sqlstate HY000): Query execution was interrupted, maximum statement execution time exceeded", + 150*time.Millisecond) + }) + + t.Run("context cancel - tx exec", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(10 * time.Millisecond) + cancel() + }() + testContextError(t, ctx, execOnce, + "(errno 1317) (sqlstate 70100): Query execution was interrupted", + 50*time.Millisecond) + }) + + t.Run("context deadline - tx exec", func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + defer cancel() + testContextError(t, ctx, execOnce, + "(errno 3024) (sqlstate HY000): Query execution was interrupted, maximum statement execution time exceeded", + 50*time.Millisecond) + }) } -// TestDBConnClose tests that an Exec returns immediately if a connection -// is asynchronously killed (and closed) in the middle of an execution. -func TestDBConnClose(t *testing.T) { +func testContextError(t *testing.T, + ctx context.Context, + exec func(context.Context, string, *Conn) error, + expErrMsg string, + expDuration time.Duration) { db := fakesqldb.New(t) defer db.Close() connPool := newPool() params := dbconfigs.New(db.ConnParams()) connPool.Open(params, params, params) defer connPool.Close() - dbConn, err := newPooledConn(context.Background(), connPool, params) - require.NoError(t, err) - defer dbConn.Close() query := "sleep" db.AddQuery(query, &sqltypes.Result{}) db.SetBeforeFunc(query, func() { time.Sleep(100 * time.Millisecond) }) + db.AddQueryPattern(`kill query \d+`, &sqltypes.Result{}) + db.AddQueryPattern(`kill \d+`, &sqltypes.Result{}) + + dbConn, err := newPooledConn(context.Background(), connPool, params) + require.NoError(t, err) + defer dbConn.Close() start := time.Now() - go func() { - time.Sleep(10 * time.Millisecond) - dbConn.Kill("test kill", 0) - }() - _, err = dbConn.Exec(context.Background(), query, 1, false) - assert.Contains(t, err.Error(), "(errno 2013) due to") - assert.True(t, time.Since(start) < 100*time.Millisecond, "%v %v", time.Since(start), 100*time.Millisecond) + err = exec(ctx, query, dbConn) + end := time.Now() + assert.ErrorContains(t, err, expErrMsg) + assert.WithinDuration(t, end, start, expDuration) } func TestDBNoPoolConnKill(t *testing.T) { @@ -463,9 +550,7 @@ func TestDBConnStream(t *testing.T) { result.Rows = append(result.Rows, r.Rows...) } return nil - }, func() *sqltypes.Result { - return &sqltypes.Result{} - }, + }, alloc, 10, querypb.ExecuteOptions_ALL) if err != nil { t.Fatalf("should not get an error, err: %v", err) @@ -490,7 +575,25 @@ func TestDBConnStream(t *testing.T) { } } -func TestDBConnStreamKill(t *testing.T) { +// TestDBConnKillCall tests that direct Kill method calls work as expected. +func TestDBConnKillCall(t *testing.T) { + t.Run("stream exec", func(t *testing.T) { + testKill(t, func(ctx context.Context, query string, dbconn *Conn) error { + return dbconn.Stream(context.Background(), query, + func(r *sqltypes.Result) error { return nil }, + alloc, 10, querypb.ExecuteOptions_ALL) + }) + }) + + t.Run("exec", func(t *testing.T) { + testKill(t, func(ctx context.Context, query string, dbconn *Conn) error { + _, err := dbconn.Exec(context.Background(), query, 1, false) + return err + }) + }) +} + +func testKill(t *testing.T, exec func(context.Context, string, *Conn) error) { db := fakesqldb.New(t) defer db.Close() sql := "select * from test_table limit 1000" @@ -500,6 +603,13 @@ func TestDBConnStreamKill(t *testing.T) { }, } db.AddQuery(sql, expectedResult) + db.SetBeforeFunc(sql, func() { + time.Sleep(100 * time.Millisecond) + }) + + db.AddQueryPattern(`kill query \d+`, &sqltypes.Result{}) + db.AddQueryPattern(`kill \d+`, &sqltypes.Result{}) + connPool := newPool() params := dbconfigs.New(db.ConnParams()) connPool.Open(params, params, params) @@ -510,20 +620,11 @@ func TestDBConnStreamKill(t *testing.T) { go func() { time.Sleep(10 * time.Millisecond) - dbConn.Kill("test kill", 0) + dbConn.Kill("kill connection called", 0) }() - err = dbConn.Stream(context.Background(), sql, - func(r *sqltypes.Result) error { - time.Sleep(100 * time.Millisecond) - return nil - }, - func() *sqltypes.Result { - return &sqltypes.Result{} - }, - 10, querypb.ExecuteOptions_ALL) - - assert.Contains(t, err.Error(), "(errno 2013) due to") + err = exec(context.Background(), sql, dbConn) + assert.ErrorContains(t, err, "kill connection called") } func TestDBConnReconnect(t *testing.T) { @@ -589,6 +690,23 @@ func TestDBConnReApplySetting(t *testing.T) { } func TestDBExecOnceKillTimeout(t *testing.T) { + executeWithTimeout(t, `kill \d+`, 150*time.Millisecond, func(ctx context.Context, dbConn *Conn) (*sqltypes.Result, error) { + return dbConn.ExecOnce(ctx, "select 1", 1, false) + }) +} + +func TestDBExecKillTimeout(t *testing.T) { + executeWithTimeout(t, `kill query \d+`, 1000*time.Millisecond, func(ctx context.Context, dbConn *Conn) (*sqltypes.Result, error) { + return dbConn.Exec(ctx, "select 1", 1, false) + }) +} + +func executeWithTimeout( + t *testing.T, + expectedKillQuery string, + responseTime time.Duration, + execute func(context.Context, *Conn) (*sqltypes.Result, error), +) { db := fakesqldb.New(t) defer db.Close() connPool := newPool() @@ -615,7 +733,8 @@ func TestDBExecOnceKillTimeout(t *testing.T) { // It should also run into a timeout. var timestampKill atomic.Int64 dbConn.killTimeout = 100 * time.Millisecond - db.AddQueryPatternWithCallback(`kill \d+`, &sqltypes.Result{}, func(string) { + + db.AddQueryPatternWithCallback(expectedKillQuery, &sqltypes.Result{}, func(string) { timestampKill.Store(time.Now().UnixMicro()) // should take longer than the configured kill timeout above. time.Sleep(200 * time.Millisecond) @@ -624,7 +743,7 @@ func TestDBExecOnceKillTimeout(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() - result, err := dbConn.ExecOnce(ctx, "select 1", 1, false) + result, err := execute(ctx, dbConn) timeDone := time.Now() require.Error(t, err) @@ -632,6 +751,10 @@ func TestDBExecOnceKillTimeout(t *testing.T) { require.Nil(t, result) timeQuery := time.UnixMicro(timestampQuery.Load()) timeKill := time.UnixMicro(timestampKill.Load()) + // In this unit test, the execution of `select 1` is blocked for 1000ms. + // The kill query gets executed after 100ms but waits for the query to return which will happen after 1000ms due to the test framework. + // In real scenario mysql will kill the query immediately and return the error. + // Here, kill call happens after 100ms but took 1000ms to complete. require.WithinDuration(t, timeQuery, timeKill, 150*time.Millisecond) - require.WithinDuration(t, timeKill, timeDone, 150*time.Millisecond) + require.WithinDuration(t, timeKill, timeDone, responseTime) } diff --git a/go/vt/vttablet/tabletserver/query_list.go b/go/vt/vttablet/tabletserver/query_list.go index a21acd6f92a..60fac1ea3af 100644 --- a/go/vt/vttablet/tabletserver/query_list.go +++ b/go/vt/vttablet/tabletserver/query_list.go @@ -26,6 +26,7 @@ import ( "vitess.io/vitess/go/streamlog" "vitess.io/vitess/go/vt/callinfo" + "vitess.io/vitess/go/vt/log" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" @@ -139,7 +140,10 @@ func (ql *QueryList) Terminate(connID int64) bool { return false } for _, qd := range qds { - _ = qd.conn.Kill("QueryList.Terminate()", time.Since(qd.start)) + err := qd.conn.Kill("QueryList.Terminate()", time.Since(qd.start)) + if err != nil { + log.Warningf("Error terminating query on connection id: %d, error: %v", qd.conn.ID(), err) + } } return true } @@ -150,7 +154,10 @@ func (ql *QueryList) TerminateAll() { defer ql.mu.Unlock() for _, qds := range ql.queryDetails { for _, qd := range qds { - _ = qd.conn.Kill("QueryList.TerminateAll()", time.Since(qd.start)) + err := qd.conn.Kill("QueryList.TerminateAll()", time.Since(qd.start)) + if err != nil { + log.Warningf("Error terminating query on connection id: %d, error: %v", qd.conn.ID(), err) + } } } }