diff --git a/go/vt/sqlparser/comments.go b/go/vt/sqlparser/comments.go index 780f1e67594..6075791baaf 100644 --- a/go/vt/sqlparser/comments.go +++ b/go/vt/sqlparser/comments.go @@ -20,6 +20,7 @@ import ( "fmt" "strconv" "strings" + "time" "unicode" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" @@ -63,7 +64,12 @@ const ( // between zero and MaxPriorityValue. MaxPriorityValue = 100 + // OptimizerHintMaxExecutionTime is the optimizer hint used in MySQL to set the max execution time for a query. + // https://dev.mysql.com/doc/refman/8.0/en/optimizer-hints.html#optimizer-hints-execution-time + OptimizerHintMaxExecutionTime = "MAX_EXECUTION_TIME" + // OptimizerHintSetVar is the optimizer hint used in MySQL to set the value of a specific session variable for a query. + // https://dev.mysql.com/doc/refman/8.0/en/optimizer-hints.html#optimizer-hints-set-var OptimizerHintSetVar = "SET_VAR" ) @@ -318,15 +324,29 @@ func (c *ParsedComments) GetMySQLSetVarValue(key string) string { return "" } +// SetMySQLMaxExecutionTimeValue sets the maximum execution time for a query using a /*+ MAX_EXECUTION_TIME() */ MySQL optimizer hint. +func (c *ParsedComments) SetMySQLMaxExecutionTimeValue(maxExecutionTime time.Duration) (newComments Comments) { + return setMySQLOptimizerHint(c.comments, OptimizerHintMaxExecutionTime, "" /* no key */, maxExecutionTime.Milliseconds()) +} + // SetMySQLSetVarValue updates or sets the value of the given variable as part of a /*+ SET_VAR() */ MySQL optimizer hint. func (c *ParsedComments) SetMySQLSetVarValue(key string, value string) (newComments Comments) { - if c == nil { + return setMySQLOptimizerHint(c.comments, OptimizerHintSetVar, key, value) +} + +// setMySQLOptimizerHint updates or sets the value of a MySQL optimizer hint. +func setMySQLOptimizerHint(comments Comments, hint, key string, value interface{}) (newComments Comments) { + keyAndValue := value + if key != "" { + keyAndValue = fmt.Sprintf("%v=%v", key, value) + } + if len(comments) == 0 { // If we have no parsed comments, then we create a new one with the required optimizer hint and return it. - newComments = append(newComments, fmt.Sprintf("/*+ %v(%v=%v) */", OptimizerHintSetVar, key, value)) + newComments = append(newComments, fmt.Sprintf("/*+ %v(%v) */", hint, keyAndValue)) return } seenFirstOhComment := false - for _, commentStr := range c.comments { + for _, commentStr := range comments { // Skip all the comments that don't start with the query optimizer prefix. // Also, since MySQL only parses the first comment that has the optimizer hint prefix and ignores the following ones, // we skip over all the comments that come after we have seen the first comment with the optimizer hint. @@ -372,10 +392,10 @@ func (c *ParsedComments) SetMySQLSetVarValue(key string, value string) (newComme finalComment += fmt.Sprintf(" %v(%v)", ohName, ohContent) } } - // If we haven't found any SET_VAR optimizer hint with the matching variable, + // If we haven't found any optimizer hint with the matching variable, // then we add a new optimizer hint to introduce this variable. if !keyPresent { - finalComment += fmt.Sprintf(" %v(%v=%v)", OptimizerHintSetVar, key, value) + finalComment += fmt.Sprintf(" %v(%v)", hint, keyAndValue) } finalComment += " */" @@ -384,7 +404,7 @@ func (c *ParsedComments) SetMySQLSetVarValue(key string, value string) (newComme // If we have not seen even a single comment that has the optimizer hint prefix, // then we add a new optimizer hint to introduce this variable. if !seenFirstOhComment { - newComments = append(newComments, fmt.Sprintf("/*+ %v(%v=%v) */", OptimizerHintSetVar, key, value)) + newComments = append(newComments, fmt.Sprintf("/*+ %v(%v) */", hint, keyAndValue)) } return newComments } diff --git a/go/vt/sqlparser/comments_test.go b/go/vt/sqlparser/comments_test.go index 42d02e35652..518b85d914b 100644 --- a/go/vt/sqlparser/comments_test.go +++ b/go/vt/sqlparser/comments_test.go @@ -19,6 +19,7 @@ package sqlparser import ( "fmt" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -600,6 +601,37 @@ func TestGetMySQLSetVarValue(t *testing.T) { } } +func TestSetMySQLMaxExecutionTimeValue(t *testing.T) { + tests := []struct { + name string + comments []string + maxExecTime time.Duration + commentsWanted Comments + }{ + { + name: "No comments", + comments: nil, + maxExecTime: time.Second * 30, + commentsWanted: []string{"/*+ MAX_EXECUTION_TIME(30000) */"}, + }, + { + name: "Add to comments", + comments: []string{"/*+ SET_VAR(sort_buffer_size = 16M) */"}, + maxExecTime: time.Minute, + commentsWanted: []string{"/*+ SET_VAR(sort_buffer_size = 16M) MAX_EXECUTION_TIME(60000) */"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &ParsedComments{ + comments: tt.comments, + } + newComments := c.SetMySQLMaxExecutionTimeValue(tt.maxExecTime) + require.EqualValues(t, tt.commentsWanted, newComments) + }) + } +} + func TestSetMySQLSetVarValue(t *testing.T) { tests := []struct { name string diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index f371d62006c..b7697916db8 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -21,15 +21,14 @@ import ( "errors" "fmt" "io" + "strconv" "strings" "sync" "time" - "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/pools/smartconnpool" - "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/trace" "vitess.io/vitess/go/vt/callerid" @@ -41,6 +40,7 @@ import ( "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/evalengine" "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" + "vitess.io/vitess/go/vt/vttablet/tabletserver/planbuilder" p "vitess.io/vitess/go/vt/vttablet/tabletserver/planbuilder" "vitess.io/vitess/go/vt/vttablet/tabletserver/rules" eschema "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" @@ -103,6 +103,15 @@ func allocStreamResult() *sqltypes.Result { return streamResultPool.Get().(*sqltypes.Result) } +func (qre *QueryExecutor) isSelect() bool { + switch qre.plan.PlanID { + case planbuilder.PlanSelect, planbuilder.PlanSelectImpossible: + return true + default: + return false + } +} + func (qre *QueryExecutor) shouldConsolidate() bool { co := qre.options.GetConsolidator() switch co { @@ -852,14 +861,48 @@ func (qre *QueryExecutor) generateFinalSQL(parsedQuery *sqlparser.ParsedQuery, b return query, query, nil } + var mysqlOptimizerHints string + if qre.isSelect() { + mysqlOptimizerHints = buildMysqlOptimizerHints(qre.tsv) + } + var buf strings.Builder - buf.Grow(len(qre.marginComments.Leading) + len(query) + len(qre.marginComments.Trailing)) - buf.WriteString(qre.marginComments.Leading) - buf.WriteString(query) - buf.WriteString(qre.marginComments.Trailing) + if mysqlOptimizerHints != "" { + fields := strings.SplitN(query, " ", 2) + queryPrefix := fields[0] + " " + queryNonPrefix := " " + fields[1] + + buf.Grow(len(qre.marginComments.Leading) + len(queryPrefix) + len(mysqlOptimizerHints) + len(queryNonPrefix) + len(qre.marginComments.Trailing)) + buf.WriteString(qre.marginComments.Leading) + buf.WriteString(queryPrefix) + buf.WriteString(mysqlOptimizerHints) + buf.WriteString(queryNonPrefix) + buf.WriteString(qre.marginComments.Trailing) + } else { + buf.Grow(len(qre.marginComments.Leading) + len(query) + len(qre.marginComments.Trailing)) + buf.WriteString(qre.marginComments.Leading) + buf.WriteString(query) + buf.WriteString(qre.marginComments.Trailing) + } return buf.String(), query, nil } +func buildMysqlOptimizerHints(tsv *TabletServer) string { + var buf strings.Builder + if tsv.config.Oltp.QueryTimeoutPushdown { + // The MAX_EXECUTION_TIME(N) hint sets a statement execution timeout of N milliseconds. + // https://dev.mysql.com/doc/refman/8.0/en/optimizer-hints.html#optimizer-hints-execution-time + queryTimeoutStr := strconv.FormatInt(tsv.loadQueryTimeout(), 64) + buf.Grow(len(queryTimeoutStr)) + buf.WriteString(queryTimeoutStr) + } + + if len(optimizerHints) == 0 { + return "" + } + return "/*+ " + strings.Join(optimizerHints, " ") + " */" +} + func rewriteOUTParamError(err error) error { sqlErr, ok := err.(*sqlerror.SQLError) if !ok { diff --git a/go/vt/vttablet/tabletserver/query_executor_test.go b/go/vt/vttablet/tabletserver/query_executor_test.go index 0c845d7c2ae..3ffca437b5e 100644 --- a/go/vt/vttablet/tabletserver/query_executor_test.go +++ b/go/vt/vttablet/tabletserver/query_executor_test.go @@ -40,6 +40,7 @@ import ( "vitess.io/vitess/go/vt/callinfo" "vitess.io/vitess/go/vt/callinfo/fakecallinfo" "vitess.io/vitess/go/vt/sidecardb" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/tableacl" "vitess.io/vitess/go/vt/tableacl/simpleacl" "vitess.io/vitess/go/vt/topo/memorytopo" @@ -1837,6 +1838,23 @@ func TestQueryExecSchemaReloadCount(t *testing.T) { } } +func TestGenerateFinalSQL(t *testing.T) { + // generateFinalSQL(parsedQuery *sqlparser.ParsedQuery, bindVars map[string]*querypb.BindVariable) + db := setUpQueryExecutorTest(t) + defer db.Close() + tsv := newTestTabletServer(context.Background(), noFlags, db) + defer tsv.StopService() + + qre := newTestQueryExecutor(context.Background(), tsv, `select * from something`, 0) + query, noComments, err := qre.generateFinalSQL( + &sqlparser.ParsedQuery{Query: `select * from something`}, + map[string]*querypb.BindVariable{}, + ) + assert.Nil(t, err) + assert.Equal(t, `select * from something`, query) + t.Logf("noComments: %s", noComments) +} + type mockTxThrottler struct { throttle bool } diff --git a/go/vt/vttablet/tabletserver/tabletenv/config.go b/go/vt/vttablet/tabletserver/tabletenv/config.go index 7df97400acd..3e29503ac5c 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config.go @@ -216,8 +216,9 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) { fs.BoolVar(¤tConfig.EnableViews, "queryserver-enable-views", false, "Enable views support in vttablet.") fs.BoolVar(¤tConfig.EnablePerWorkloadTableMetrics, "enable-per-workload-table-metrics", defaultConfig.EnablePerWorkloadTableMetrics, "If true, query counts and query error metrics include a label that identifies the workload") - fs.BoolVar(¤tConfig.Unmanaged, "unmanaged", false, "Indicates an unmanaged tablet, i.e. using an external mysql-compatible database") + fs.BoolVar(¤tConfig.Oltp.QueryTimeoutPushdown, "query-timeout-pushdown", false, "Attempt to push-down timing-out of queries to MySQL with a fallback to a MySQL KILL operation.") + fs.DurationVar(¤tConfig.Oltp.QueryTimeoutPushdownWait, "query-timeout-pushdown-wait", time.Second, "Max time to wait for MySQL to kill a query before sending a fallback KILL operation. Requires --query-timeout-pushdown") } var ( @@ -565,10 +566,12 @@ func (cfg *OlapConfig) UnmarshalJSON(data []byte) (err error) { // OltpConfig contains the config for oltp settings. type OltpConfig struct { - QueryTimeout time.Duration `json:"queryTimeoutSeconds,omitempty"` - TxTimeout time.Duration `json:"txTimeoutSeconds,omitempty"` - MaxRows int `json:"maxRows,omitempty"` - WarnRows int `json:"warnRows,omitempty"` + QueryTimeout time.Duration `json:"queryTimeoutSeconds,omitempty"` + TxTimeout time.Duration `json:"txTimeoutSeconds,omitempty"` + QueryTimeoutPushdown bool `json:"queryTimeoutPushdown,omitempty"` + QueryTimeoutPushdownWait time.Duration `json:"queryTimeoutPushdownWait,omitempty"` + MaxRows int `json:"maxRows,omitempty"` + WarnRows int `json:"warnRows,omitempty"` } func (cfg *OltpConfig) MarshalJSON() ([]byte, error) { diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 8a1a45ca4a2..515a89727e9 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -236,6 +236,13 @@ func (tsv *TabletServer) loadQueryTimeout() time.Duration { return time.Duration(tsv.QueryTimeout.Load()) } +func (tsv *TabletServer) loadQueryTimeoutWithPushdownWait() time.Duration { + if tsv.config.Oltp.QueryTimeoutPushdown { + return tsv.loadQueryTimeout() + tsv.config.Oltp.QueryTimeoutPushdownWait + } + return tsv.loadQueryTimeout() +} + // onlineDDLExecutorToggleTableBuffer is called by onlineDDLExecutor as a callback function. onlineDDLExecutor // uses it to start/stop query buffering for a given table. // It is onlineDDLExecutor's responsibility to make sure buffering is stopped after some definite amount of time. @@ -493,7 +500,7 @@ func (tsv *TabletServer) Begin(ctx context.Context, target *querypb.Target, opti func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, savepointQueries []string, reservedID int64, settings []string, options *querypb.ExecuteOptions) (state queryservice.TransactionState, err error) { state.TabletAlias = tsv.alias err = tsv.execRequest( - ctx, tsv.loadQueryTimeout(), + ctx, tsv.loadQueryTimeoutWithPushdownWait(), "Begin", "begin", nil, target, options, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { @@ -795,6 +802,9 @@ func (tsv *TabletServer) Execute(ctx context.Context, target *querypb.Target, sq func (tsv *TabletServer) execute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, transactionID int64, reservedID int64, settings []string, options *querypb.ExecuteOptions) (result *sqltypes.Result, err error) { allowOnShutdown := false timeout := tsv.loadQueryTimeout() + if tsv.config.Oltp.QueryTimeoutPushdown { + return timeout + tsv.config.Oltp.QueryTimeoutPushdownWait + } if transactionID != 0 { allowOnShutdown = true // Execute calls happen for OLTP only, so we can directly fetch the