Skip to content

Commit

Permalink
feat: set query timeout on query execution at vtgate
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal committed Sep 9, 2024
1 parent 0ef24ab commit bb03bc9
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 4 deletions.
16 changes: 13 additions & 3 deletions go/vt/vtgate/plan_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,18 @@ import (
"context"
"fmt"
"strings"
"sync"
"time"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/engine"
"vitess.io/vitess/go/vt/vtgate/logstats"
"vitess.io/vitess/go/vt/vtgate/vtgateservice"

querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

type planExec func(ctx context.Context, plan *engine.Plan, vc *vcursorImpl, bindVars map[string]*querypb.BindVariable, startTime time.Time) error
Expand Down Expand Up @@ -92,6 +92,7 @@ func (e *Executor) newExecute(
plan *engine.Plan
)

var timeoutOnce sync.Once
for try := 0; try < MaxBufferingRetries; try++ {
if try > 0 && !vs.GetCreated().After(lastVSchemaCreated) { // We need to wait for a vschema update
// Without a wait we fail non-deterministically since the previous vschema will not have
Expand Down Expand Up @@ -140,6 +141,15 @@ func (e *Executor) newExecute(
safeSession.RecordWarning(warning)
}

// set the overall query timeout if it is not already set
if vcursor.queryTimeout > 0 {
timeoutOnce.Do(func() {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, vcursor.queryTimeout)
defer cancel()
})
}

result, err = e.handleTransactions(ctx, mysqlCtx, safeSession, plan, logStats, vcursor, stmt)
if err != nil {
return err
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ type vcursorImpl struct {
vm VSchemaOperator
semTable *semantics.SemTable
warnShardedOnly bool // when using sharded only features, a warning will be warnings field
queryTimeout time.Duration

warnings []*querypb.QueryWarning // any warnings that are accumulated during the planning phase are stored here
pv plancontext.PlannerVersion
Expand Down Expand Up @@ -934,7 +935,7 @@ func (vc *vcursorImpl) SetExecQueryTimeout(timeout *int) {
var execTimeout *int
if timeout != nil {
execTimeout = timeout
} else if sessionTimeout := int(vc.safeSession.GetQueryTimeout()); sessionTimeout > 0 {
} else if sessionTimeout := vc.GetQueryTimeout(0); sessionTimeout > 0 {
execTimeout = &sessionTimeout
}

Expand All @@ -947,6 +948,7 @@ func (vc *vcursorImpl) SetExecQueryTimeout(timeout *int) {
return
}

vc.queryTimeout = time.Duration(*execTimeout) * time.Millisecond
// Set the authoritative timeout using the determined execTimeout
vc.safeSession.GetOrCreateOptions().Timeout = &querypb.ExecuteOptions_AuthoritativeTimeout{
AuthoritativeTimeout: int64(*execTimeout),
Expand Down

0 comments on commit bb03bc9

Please sign in to comment.