Skip to content

Commit

Permalink
added kill query method and started using kill command with timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Apr 15, 2024
1 parent 84154a8 commit d91bb0e
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 42 deletions.
4 changes: 2 additions & 2 deletions go/vt/vttablet/endtoend/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func TestConsolidatorReplicasOnly(t *testing.T) {
func TestQueryPlanCache(t *testing.T) {
var cachedPlanSize = int((&tabletserver.TabletPlan{}).CachedSize(true))

//sleep to avoid race between SchemaChanged event clearing out the plans cache which breaks this test
// sleep to avoid race between SchemaChanged event clearing out the plans cache which breaks this test
framework.Server.WaitForSchemaReset(2 * time.Second)

bindVars := map[string]*querypb.BindVariable{
Expand Down Expand Up @@ -276,7 +276,7 @@ func TestQueryTimeout(t *testing.T) {
assert.Equal(t, vtrpcpb.Code_ABORTED, vterrors.Code(err))
vend := framework.DebugVars()
verifyIntValue(t, vend, "QueryTimeout", int(100*time.Millisecond))
compareIntDiff(t, vend, "Kills/Queries", vstart, 1)
compareIntDiff(t, vend, "Kills/Connections", vstart, 1)
}

func changeVar(t *testing.T, name, value string) (revert func()) {
Expand Down
78 changes: 63 additions & 15 deletions go/vt/vttablet/tabletserver/connpool/dbconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,13 @@ func NewConn(ctx context.Context, params dbconfigs.Connector, dbaPool *dbconnpoo
}

// Err returns an error if there was a client initiated error
// like a query kill.
// like a query kill and resets the error message on the connection.
func (dbc *Conn) Err() error {
dbc.errmu.Lock()
defer dbc.errmu.Unlock()
return dbc.err
err := dbc.err
dbc.err = nil
return err
}

// Exec executes the specified query. If there is a connection error, it will reconnect
Expand Down Expand Up @@ -383,28 +385,31 @@ func (dbc *Conn) IsClosed() bool {
return dbc.conn.IsClosed()
}

// Kill wraps KillWithContext using context.Background.
// Kill executes a kill statement to terminate the connection.
func (dbc *Conn) Kill(reason string, elapsed time.Duration) error {
killCtx, cancel := context.WithTimeout(context.Background(), dbc.killTimeout)
ctx, cancel := context.WithTimeout(context.Background(), dbc.killTimeout)
defer cancel()

return dbc.KillWithContext(killCtx, reason, elapsed)
return dbc.kill(ctx, reason, elapsed)
}

// KillWithContext kills the currently executing query both on MySQL side
// and on the connection side. If no query is executing, it's a no-op.
// Kill will also not kill a query more than once.
func (dbc *Conn) KillWithContext(ctx context.Context, reason string, elapsed time.Duration) error {
if cause := context.Cause(ctx); cause != nil {
return cause
}
// KillQuery executes a kill query statement to terminate the running query on the connection.
func (dbc *Conn) KillQuery(reason string, elapsed time.Duration) error {
ctx, cancel := context.WithTimeout(context.Background(), dbc.killTimeout)
defer cancel()

dbc.stats.KillCounters.Add("Queries", 1)
log.Infof("Due to %s, elapsed time: %v, killing query ID %v %s", reason, elapsed, dbc.conn.ID(), dbc.CurrentForLogging())
return dbc.killQuery(ctx, reason, elapsed)
}

// kill closes the connection and stops any executing query both on MySQL and
// vttablet.
func (dbc *Conn) kill(ctx context.Context, reason string, elapsed time.Duration) error {
dbc.stats.KillCounters.Add("Connections", 1)
log.Infof("Due to %s, elapsed time: %v, killing connection ID %v %s", reason, elapsed, dbc.conn.ID(), dbc.CurrentForLogging())

// Client side action. Set error and close connection.
dbc.errmu.Lock()
dbc.err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "(errno 2013) due to %s, elapsed time: %v, killing query ID %v", reason, elapsed, dbc.conn.ID())
dbc.err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "(errno 2013) due to %s, elapsed time: %v, killing connection ID %v", reason, elapsed, dbc.conn.ID())
dbc.errmu.Unlock()
dbc.conn.Close()

Expand All @@ -423,6 +428,49 @@ func (dbc *Conn) KillWithContext(ctx context.Context, reason string, elapsed tim
ch <- err
}()

select {
case <-ctx.Done():
killConn.Close()

dbc.stats.InternalErrors.Add("HungConnection", 1)
log.Warningf("Connection may be hung: %s", dbc.CurrentForLogging())

return context.Cause(ctx)
case err := <-ch:
if err != nil {
log.Errorf("Could not kill connection ID %v %s: %v", dbc.conn.ID(), dbc.CurrentForLogging(), err)
return err
}
return nil
}
}

// killQuery kills the currently executing query both on MySQL side
// and on the connection side.
func (dbc *Conn) killQuery(ctx context.Context, reason string, elapsed time.Duration) error {
dbc.stats.KillCounters.Add("Queries", 1)
log.Infof("Due to %s, elapsed time: %v, killing query ID %v %s", reason, elapsed, dbc.conn.ID(), dbc.CurrentForLogging())

// Client side action. Set error for killing the query on timeout.
dbc.errmu.Lock()
dbc.err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "(errno 3024) due to %s, elapsed time: %v, killing query ID %v", reason, elapsed, dbc.conn.ID())
dbc.errmu.Unlock()

// Server side action. Kill the executing query.
killConn, err := dbc.dbaPool.Get(ctx)
if err != nil {
log.Warningf("Failed to get conn from dba pool: %v", err)
return err
}
defer killConn.Recycle()

ch := make(chan error)
sql := fmt.Sprintf("kill query %d", dbc.conn.ID())
go func() {
_, err := killConn.Conn.ExecuteFetch(sql, -1, false)
ch <- err
}()

select {
case <-ctx.Done():
killConn.Close()
Expand Down
25 changes: 0 additions & 25 deletions go/vt/vttablet/tabletserver/connpool/dbconn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,31 +328,6 @@ func TestDBKillWithContext(t *testing.T) {
require.ErrorIs(t, err, context.DeadlineExceeded)
}

func TestDBKillWithContextDoneContext(t *testing.T) {
db := fakesqldb.New(t)
defer db.Close()
connPool := newPool()
params := dbconfigs.New(db.ConnParams())
connPool.Open(params, params, params)
defer connPool.Close()
dbConn, err := newPooledConn(context.Background(), connPool, params)
if dbConn != nil {
defer dbConn.Close()
}
require.NoError(t, err)

query := fmt.Sprintf("kill %d", dbConn.ID())
db.AddRejectedQuery(query, errors.New("rejected"))

contextErr := errors.New("context error")
ctx, cancel := context.WithCancelCause(context.Background())
cancel(contextErr) // cancel the context immediately

// KillWithContext should return the cancellation cause
err = dbConn.KillWithContext(ctx, "test kill", 0)
require.ErrorIs(t, err, contextErr)
}

// TestDBConnClose tests that an Exec returns immediately if a connection
// is asynchronously killed (and closed) in the middle of an execution.
func TestDBConnClose(t *testing.T) {
Expand Down

0 comments on commit d91bb0e

Please sign in to comment.