From f3c34f5072523e2e19286b9f14629ab1592d3bf5 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Thu, 9 May 2024 15:43:38 +0530 Subject: [PATCH] allow query timeout hints on shard targeting (#15898) Signed-off-by: Harshit Gangal --- .../vtgate/queries/timeout/timeout_test.go | 30 +++++++ go/vt/vtgate/engine/cached_size.go | 2 +- go/vt/vtgate/engine/send.go | 6 +- go/vt/vtgate/planbuilder/bypass.go | 10 ++- .../testdata/bypass_shard_cases.json | 79 ++++++++++++++++++- 5 files changed, 123 insertions(+), 4 deletions(-) diff --git a/go/test/endtoend/vtgate/queries/timeout/timeout_test.go b/go/test/endtoend/vtgate/queries/timeout/timeout_test.go index bab9699d027..a8202cd5593 100644 --- a/go/test/endtoend/vtgate/queries/timeout/timeout_test.go +++ b/go/test/endtoend/vtgate/queries/timeout/timeout_test.go @@ -97,3 +97,33 @@ func TestQueryTimeoutWithTables(t *testing.T) { assert.Contains(t, err.Error(), "context deadline exceeded") assert.Contains(t, err.Error(), "(errno 1317) (sqlstate 70100)") } + +// TestQueryTimeoutWithShardTargeting tests the query timeout with shard targeting. +func TestQueryTimeoutWithShardTargeting(t *testing.T) { + utils.SkipIfBinaryIsBelowVersion(t, 20, "vtgate") + + mcmp, closer := start(t) + defer closer() + + // shard targeting to -80 shard. + utils.Exec(t, mcmp.VtConn, "use `ks_misc/-80`") + + // insert some data + utils.Exec(t, mcmp.VtConn, "insert into t1(id1, id2) values (1,2),(3,4),(4,5),(5,6)") + + // insert + _, err := utils.ExecAllowError(t, mcmp.VtConn, "insert /*vt+ QUERY_TIMEOUT_MS=1 */ into t1(id1, id2) values (6,sleep(5))") + assert.ErrorContains(t, err, "context deadline exceeded (errno 1317) (sqlstate 70100)") + + // update + _, err = utils.ExecAllowError(t, mcmp.VtConn, "update /*vt+ QUERY_TIMEOUT_MS=1 */ t1 set id2 = sleep(5)") + assert.ErrorContains(t, err, "context deadline exceeded (errno 1317) (sqlstate 70100)") + + // delete + _, err = utils.ExecAllowError(t, mcmp.VtConn, "delete /*vt+ QUERY_TIMEOUT_MS=1 */ from t1 where id2 = sleep(5)") + assert.ErrorContains(t, err, "context deadline exceeded (errno 1317) (sqlstate 70100)") + + // select + _, err = utils.ExecAllowError(t, mcmp.VtConn, "select /*vt+ QUERY_TIMEOUT_MS=1 */ 1 from t1 where id2 = 5 and sleep(100)") + assert.ErrorContains(t, err, "context deadline exceeded (errno 1317) (sqlstate 70100)") +} diff --git a/go/vt/vtgate/engine/cached_size.go b/go/vt/vtgate/engine/cached_size.go index 18e22c00378..c1b72382461 100644 --- a/go/vt/vtgate/engine/cached_size.go +++ b/go/vt/vtgate/engine/cached_size.go @@ -1091,7 +1091,7 @@ func (cached *Send) CachedSize(alloc bool) int64 { } size := int64(0) if alloc { - size += int64(48) + size += int64(64) } // field Keyspace *vitess.io/vitess/go/vt/vtgate/vindexes.Keyspace size += cached.Keyspace.CachedSize(true) diff --git a/go/vt/vtgate/engine/send.go b/go/vt/vtgate/engine/send.go index 53f12cd3549..31c9e9e0eb0 100644 --- a/go/vt/vtgate/engine/send.go +++ b/go/vt/vtgate/engine/send.go @@ -57,6 +57,9 @@ type Send struct { MultishardAutocommit bool ReservedConnectionNeeded bool + + // QueryTimeout contains the optional timeout (in milliseconds) to apply to this query + QueryTimeout int } // ShardName as key for setting shard name in bind variables map @@ -88,7 +91,7 @@ func (s *Send) GetTableName() string { // TryExecute implements Primitive interface func (s *Send) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { - ctx, cancelFunc := addQueryTimeout(ctx, vcursor, 0) + ctx, cancelFunc := addQueryTimeout(ctx, vcursor, s.QueryTimeout) defer cancelFunc() rss, err := s.checkAndReturnShards(ctx, vcursor) @@ -192,6 +195,7 @@ func (s *Send) description() PrimitiveDescription { "ShardNameNeeded": s.ShardNameNeeded, "MultishardAutocommit": s.MultishardAutocommit, "ReservedConnectionNeeded": s.ReservedConnectionNeeded, + "QueryTimeout": s.QueryTimeout, } return PrimitiveDescription{ OperatorType: "Send", diff --git a/go/vt/vtgate/planbuilder/bypass.go b/go/vt/vtgate/planbuilder/bypass.go index 52286816a11..62cae9655b1 100644 --- a/go/vt/vtgate/planbuilder/bypass.go +++ b/go/vt/vtgate/planbuilder/bypass.go @@ -49,13 +49,21 @@ func buildPlanForBypass(stmt sqlparser.Statement, _ *sqlparser.ReservedVars, vsc } } + hints := &queryHints{} + if comments, ok := stmt.(sqlparser.Commented); ok { + if qh := getHints(comments.GetParsedComments()); qh != nil { + hints = qh + } + } + send := &engine.Send{ Keyspace: keyspace, TargetDestination: vschema.Destination(), Query: sqlparser.String(stmt), IsDML: sqlparser.IsDMLStatement(stmt), SingleShardOnly: false, - MultishardAutocommit: sqlparser.MultiShardAutocommitDirective(stmt), + MultishardAutocommit: hints.multiShardAutocommit, + QueryTimeout: hints.queryTimeout, } return newPlanResult(send), nil } diff --git a/go/vt/vtgate/planbuilder/testdata/bypass_shard_cases.json b/go/vt/vtgate/planbuilder/testdata/bypass_shard_cases.json index 6f2be325b6b..02a00444724 100644 --- a/go/vt/vtgate/planbuilder/testdata/bypass_shard_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/bypass_shard_cases.json @@ -123,6 +123,7 @@ } }, { + "comment": "load data from s3 'x.txt' into table x", "query": "load data from s3 'x.txt' into table x", "plan": { "QueryType": "OTHER", @@ -141,6 +142,7 @@ } }, { + "comment": "load data from s3 'x.txt'", "query": "load data from s3 'x.txt'", "plan": { "QueryType": "OTHER", @@ -174,5 +176,80 @@ "Query": "create /* test */ table t1(id bigint, primary key(id)) /* comments */" } } + }, + { + "comment": "select bypass with query timeout hint", + "query": "select /*vt+ QUERY_TIMEOUT_MS=100 */ count(*), col from user", + "plan": { + "QueryType": "SELECT", + "Original": "select /*vt+ QUERY_TIMEOUT_MS=100 */ count(*), col from user", + "Instructions": { + "OperatorType": "Send", + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "TargetDestination": "Shard(-80)", + "Query": "select /*vt+ QUERY_TIMEOUT_MS=100 */ count(*), col from `user`", + "QueryTimeout": 100 + } + } + }, + { + "comment": "update bypass with query timeout hint", + "query": "update /*vt+ QUERY_TIMEOUT_MS=100 */ user set val = 1 where id = 1", + "plan": { + "QueryType": "UPDATE", + "Original": "update /*vt+ QUERY_TIMEOUT_MS=100 */ user set val = 1 where id = 1", + "Instructions": { + "OperatorType": "Send", + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "TargetDestination": "Shard(-80)", + "IsDML": true, + "Query": "update /*vt+ QUERY_TIMEOUT_MS=100 */ `user` set val = 1 where id = 1", + "QueryTimeout": 100 + } + } + }, + { + "comment": "delete bypass with query timeout hint", + "query": "DELETE /*vt+ QUERY_TIMEOUT_MS=100 */ FROM USER WHERE ID = 42", + "plan": { + "QueryType": "DELETE", + "Original": "DELETE /*vt+ QUERY_TIMEOUT_MS=100 */ FROM USER WHERE ID = 42", + "Instructions": { + "OperatorType": "Send", + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "TargetDestination": "Shard(-80)", + "IsDML": true, + "Query": "delete /*vt+ QUERY_TIMEOUT_MS=100 */ from `USER` where ID = 42", + "QueryTimeout": 100 + } + } + }, + { + "comment": "insert bypass with query timeout hint", + "query": "INSERT /*vt+ QUERY_TIMEOUT_MS=100 */ INTO USER (ID, NAME) VALUES (42, 'ms X')", + "plan": { + "QueryType": "INSERT", + "Original": "INSERT /*vt+ QUERY_TIMEOUT_MS=100 */ INTO USER (ID, NAME) VALUES (42, 'ms X')", + "Instructions": { + "OperatorType": "Send", + "Keyspace": { + "Name": "main", + "Sharded": false + }, + "TargetDestination": "Shard(-80)", + "IsDML": true, + "Query": "insert /*vt+ QUERY_TIMEOUT_MS=100 */ into `USER`(ID, `NAME`) values (42, 'ms X')", + "QueryTimeout": 100 + } + } } -] \ No newline at end of file +]