From bb03bc9980892051e92214b661554919e79fc416 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 9 Sep 2024 21:57:18 +0530 Subject: [PATCH] feat: set query timeout on query execution at vtgate Signed-off-by: Harshit Gangal --- go/vt/vtgate/plan_execute.go | 16 +++++++++++++--- go/vt/vtgate/vcursor_impl.go | 4 +++- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/go/vt/vtgate/plan_execute.go b/go/vt/vtgate/plan_execute.go index 199892842ee..22e71b3234c 100644 --- a/go/vt/vtgate/plan_execute.go +++ b/go/vt/vtgate/plan_execute.go @@ -20,18 +20,18 @@ import ( "context" "fmt" "strings" + "sync" "time" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/log" + querypb "vitess.io/vitess/go/vt/proto/query" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/engine" "vitess.io/vitess/go/vt/vtgate/logstats" "vitess.io/vitess/go/vt/vtgate/vtgateservice" - - querypb "vitess.io/vitess/go/vt/proto/query" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) type planExec func(ctx context.Context, plan *engine.Plan, vc *vcursorImpl, bindVars map[string]*querypb.BindVariable, startTime time.Time) error @@ -92,6 +92,7 @@ func (e *Executor) newExecute( plan *engine.Plan ) + var timeoutOnce sync.Once for try := 0; try < MaxBufferingRetries; try++ { if try > 0 && !vs.GetCreated().After(lastVSchemaCreated) { // We need to wait for a vschema update // Without a wait we fail non-deterministically since the previous vschema will not have @@ -140,6 +141,15 @@ func (e *Executor) newExecute( safeSession.RecordWarning(warning) } + // set the overall query timeout if it is not already set + if vcursor.queryTimeout > 0 { + timeoutOnce.Do(func() { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, vcursor.queryTimeout) + defer cancel() + }) + } + result, err = e.handleTransactions(ctx, mysqlCtx, safeSession, plan, logStats, vcursor, stmt) if err != nil { return err diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index 6f63e13a6b3..661408a13c8 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -122,6 +122,7 @@ type vcursorImpl struct { vm VSchemaOperator semTable *semantics.SemTable warnShardedOnly bool // when using sharded only features, a warning will be warnings field + queryTimeout time.Duration warnings []*querypb.QueryWarning // any warnings that are accumulated during the planning phase are stored here pv plancontext.PlannerVersion @@ -934,7 +935,7 @@ func (vc *vcursorImpl) SetExecQueryTimeout(timeout *int) { var execTimeout *int if timeout != nil { execTimeout = timeout - } else if sessionTimeout := int(vc.safeSession.GetQueryTimeout()); sessionTimeout > 0 { + } else if sessionTimeout := vc.GetQueryTimeout(0); sessionTimeout > 0 { execTimeout = &sessionTimeout } @@ -947,6 +948,7 @@ func (vc *vcursorImpl) SetExecQueryTimeout(timeout *int) { return } + vc.queryTimeout = time.Duration(*execTimeout) * time.Millisecond // Set the authoritative timeout using the determined execTimeout vc.safeSession.GetOrCreateOptions().Timeout = &querypb.ExecuteOptions_AuthoritativeTimeout{ AuthoritativeTimeout: int64(*execTimeout),