diff --git a/go/vt/vttablet/endtoend/config_test.go b/go/vt/vttablet/endtoend/config_test.go index 9eef54bd0bb..b1dc7f5dcb9 100644 --- a/go/vt/vttablet/endtoend/config_test.go +++ b/go/vt/vttablet/endtoend/config_test.go @@ -195,7 +195,7 @@ func TestConsolidatorReplicasOnly(t *testing.T) { func TestQueryPlanCache(t *testing.T) { var cachedPlanSize = int((&tabletserver.TabletPlan{}).CachedSize(true)) - //sleep to avoid race between SchemaChanged event clearing out the plans cache which breaks this test + // sleep to avoid race between SchemaChanged event clearing out the plans cache which breaks this test framework.Server.WaitForSchemaReset(2 * time.Second) bindVars := map[string]*querypb.BindVariable{ @@ -276,7 +276,7 @@ func TestQueryTimeout(t *testing.T) { assert.Equal(t, vtrpcpb.Code_ABORTED, vterrors.Code(err)) vend := framework.DebugVars() verifyIntValue(t, vend, "QueryTimeout", int(100*time.Millisecond)) - compareIntDiff(t, vend, "Kills/Queries", vstart, 1) + compareIntDiff(t, vend, "Kills/Connections", vstart, 1) } func changeVar(t *testing.T, name, value string) (revert func()) { diff --git a/go/vt/vttablet/tabletserver/connpool/dbconn.go b/go/vt/vttablet/tabletserver/connpool/dbconn.go index 51ff7a5b593..61aa118d793 100644 --- a/go/vt/vttablet/tabletserver/connpool/dbconn.go +++ b/go/vt/vttablet/tabletserver/connpool/dbconn.go @@ -108,11 +108,13 @@ func NewConn(ctx context.Context, params dbconfigs.Connector, dbaPool *dbconnpoo } // Err returns an error if there was a client initiated error -// like a query kill. +// like a query kill and resets the error message on the connection. func (dbc *Conn) Err() error { dbc.errmu.Lock() defer dbc.errmu.Unlock() - return dbc.err + err := dbc.err + dbc.err = nil + return err } // Exec executes the specified query. If there is a connection error, it will reconnect @@ -383,28 +385,31 @@ func (dbc *Conn) IsClosed() bool { return dbc.conn.IsClosed() } -// Kill wraps KillWithContext using context.Background. +// Kill executes a kill statement to terminate the connection. func (dbc *Conn) Kill(reason string, elapsed time.Duration) error { - killCtx, cancel := context.WithTimeout(context.Background(), dbc.killTimeout) + ctx, cancel := context.WithTimeout(context.Background(), dbc.killTimeout) defer cancel() - return dbc.KillWithContext(killCtx, reason, elapsed) + return dbc.kill(ctx, reason, elapsed) } -// KillWithContext kills the currently executing query both on MySQL side -// and on the connection side. If no query is executing, it's a no-op. -// Kill will also not kill a query more than once. -func (dbc *Conn) KillWithContext(ctx context.Context, reason string, elapsed time.Duration) error { - if cause := context.Cause(ctx); cause != nil { - return cause - } +// KillQuery executes a kill query statement to terminate the running query on the connection. +func (dbc *Conn) KillQuery(reason string, elapsed time.Duration) error { + ctx, cancel := context.WithTimeout(context.Background(), dbc.killTimeout) + defer cancel() - dbc.stats.KillCounters.Add("Queries", 1) - log.Infof("Due to %s, elapsed time: %v, killing query ID %v %s", reason, elapsed, dbc.conn.ID(), dbc.CurrentForLogging()) + return dbc.killQuery(ctx, reason, elapsed) +} + +// kill closes the connection and stops any executing query both on MySQL and +// vttablet. +func (dbc *Conn) kill(ctx context.Context, reason string, elapsed time.Duration) error { + dbc.stats.KillCounters.Add("Connections", 1) + log.Infof("Due to %s, elapsed time: %v, killing connection ID %v %s", reason, elapsed, dbc.conn.ID(), dbc.CurrentForLogging()) // Client side action. Set error and close connection. dbc.errmu.Lock() - dbc.err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "(errno 2013) due to %s, elapsed time: %v, killing query ID %v", reason, elapsed, dbc.conn.ID()) + dbc.err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "(errno 2013) due to %s, elapsed time: %v, killing connection ID %v", reason, elapsed, dbc.conn.ID()) dbc.errmu.Unlock() dbc.conn.Close() @@ -423,6 +428,49 @@ func (dbc *Conn) KillWithContext(ctx context.Context, reason string, elapsed tim ch <- err }() + select { + case <-ctx.Done(): + killConn.Close() + + dbc.stats.InternalErrors.Add("HungConnection", 1) + log.Warningf("Connection may be hung: %s", dbc.CurrentForLogging()) + + return context.Cause(ctx) + case err := <-ch: + if err != nil { + log.Errorf("Could not kill connection ID %v %s: %v", dbc.conn.ID(), dbc.CurrentForLogging(), err) + return err + } + return nil + } +} + +// killQuery kills the currently executing query both on MySQL side +// and on the connection side. +func (dbc *Conn) killQuery(ctx context.Context, reason string, elapsed time.Duration) error { + dbc.stats.KillCounters.Add("Queries", 1) + log.Infof("Due to %s, elapsed time: %v, killing query ID %v %s", reason, elapsed, dbc.conn.ID(), dbc.CurrentForLogging()) + + // Client side action. Set error for killing the query on timeout. + dbc.errmu.Lock() + dbc.err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "(errno 3024) due to %s, elapsed time: %v, killing query ID %v", reason, elapsed, dbc.conn.ID()) + dbc.errmu.Unlock() + + // Server side action. Kill the executing query. + killConn, err := dbc.dbaPool.Get(ctx) + if err != nil { + log.Warningf("Failed to get conn from dba pool: %v", err) + return err + } + defer killConn.Recycle() + + ch := make(chan error) + sql := fmt.Sprintf("kill query %d", dbc.conn.ID()) + go func() { + _, err := killConn.Conn.ExecuteFetch(sql, -1, false) + ch <- err + }() + select { case <-ctx.Done(): killConn.Close() diff --git a/go/vt/vttablet/tabletserver/connpool/dbconn_test.go b/go/vt/vttablet/tabletserver/connpool/dbconn_test.go index 821a38eb168..e5e15cb0903 100644 --- a/go/vt/vttablet/tabletserver/connpool/dbconn_test.go +++ b/go/vt/vttablet/tabletserver/connpool/dbconn_test.go @@ -328,31 +328,6 @@ func TestDBKillWithContext(t *testing.T) { require.ErrorIs(t, err, context.DeadlineExceeded) } -func TestDBKillWithContextDoneContext(t *testing.T) { - db := fakesqldb.New(t) - defer db.Close() - connPool := newPool() - params := dbconfigs.New(db.ConnParams()) - connPool.Open(params, params, params) - defer connPool.Close() - dbConn, err := newPooledConn(context.Background(), connPool, params) - if dbConn != nil { - defer dbConn.Close() - } - require.NoError(t, err) - - query := fmt.Sprintf("kill %d", dbConn.ID()) - db.AddRejectedQuery(query, errors.New("rejected")) - - contextErr := errors.New("context error") - ctx, cancel := context.WithCancelCause(context.Background()) - cancel(contextErr) // cancel the context immediately - - // KillWithContext should return the cancellation cause - err = dbConn.KillWithContext(ctx, "test kill", 0) - require.ErrorIs(t, err, contextErr) -} - // TestDBConnClose tests that an Exec returns immediately if a connection // is asynchronously killed (and closed) in the middle of an execution. func TestDBConnClose(t *testing.T) {