From 3877a94912187def670e60b52231102012ae5d89 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Wed, 20 Sep 2023 21:41:28 +0530 Subject: [PATCH] Add session flag for stream execute grpc api (#14046) Signed-off-by: Harshit Gangal --- changelog/17.0/17.0.3/summary.md | 19 ++++++ go/flags/endtoend/vtgate.txt | 1 + go/test/endtoend/vtgate/grpc_api/main_test.go | 1 + go/vt/vtgate/grpcvtgateservice/server.go | 22 ++++--- .../tabletserver/exclude_race_test.go | 62 +++++++++++++++++++ .../tabletserver/tabletserver_test.go | 43 ------------- 6 files changed, 97 insertions(+), 51 deletions(-) create mode 100644 changelog/17.0/17.0.3/summary.md create mode 100644 go/vt/vttablet/tabletserver/exclude_race_test.go diff --git a/changelog/17.0/17.0.3/summary.md b/changelog/17.0/17.0.3/summary.md new file mode 100644 index 00000000000..9f1c9cdc120 --- /dev/null +++ b/changelog/17.0/17.0.3/summary.md @@ -0,0 +1,19 @@ +## Summary + +### Table of Contents + +- **[Major Changes](#major-changes)** + - **[New command line flags and behavior](#new-flag)** + - [VTGate flag `--grpc-send-session-in-streaming`](#new-vtgate-streaming-sesion) + +## Major Changes + +### New command line flags and behavior + +#### VTGate GRPC stream execute session flag `--grpc-send-session-in-streaming` + +This flag enables transaction support on `StreamExecute` api. +One enabled, VTGate `StreamExecute` grpc api will send session as the last packet in the response. +The client should enable it only when they have made the required changes to expect such a packet. + +It is disabled by default. \ No newline at end of file diff --git a/go/flags/endtoend/vtgate.txt b/go/flags/endtoend/vtgate.txt index cc1aabc339e..af0fe2c3d54 100644 --- a/go/flags/endtoend/vtgate.txt +++ b/go/flags/endtoend/vtgate.txt @@ -39,6 +39,7 @@ Usage of vtgate: --gate_query_cache_memory int gate server query cache size in bytes, maximum amount of memory to be cached. vtgate analyzes every incoming query and generate a query plan, these plans are being cached in a lru cache. This config controls the capacity of the lru cache. (default 33554432) --gate_query_cache_size int gate server query cache size, maximum number of queries to be cached. vtgate analyzes every incoming query and generate a query plan, these plans are being cached in a cache. This config controls the expected amount of unique entries in the cache. (default 5000) --gateway_initial_tablet_timeout duration At startup, the tabletGateway will wait up to this duration to get at least one tablet per keyspace/shard/tablet type (default 30s) + --grpc-send-session-in-streaming If set, will send the session as last packet in streaming api to support transactions in streaming --grpc-use-effective-groups If set, and SSL is not used, will set the immediate caller's security groups from the effective caller id's groups. --grpc-use-static-authentication-callerid If set, will set the immediate caller id to the username authenticated by the static auth plugin. --grpc_auth_mode string Which auth plugin implementation to use (eg: static) diff --git a/go/test/endtoend/vtgate/grpc_api/main_test.go b/go/test/endtoend/vtgate/grpc_api/main_test.go index a51c6d9e6f2..3c8605f79a0 100644 --- a/go/test/endtoend/vtgate/grpc_api/main_test.go +++ b/go/test/endtoend/vtgate/grpc_api/main_test.go @@ -111,6 +111,7 @@ func TestMain(m *testing.M) { "--grpc_auth_static_password_file", grpcServerAuthStaticPath, "--grpc_use_effective_callerid", "--grpc-use-static-authentication-callerid", + "--grpc-send-session-in-streaming", } // Configure vttablet to use table ACL diff --git a/go/vt/vtgate/grpcvtgateservice/server.go b/go/vt/vtgate/grpcvtgateservice/server.go index 7b87b6ed708..00d24281949 100644 --- a/go/vt/vtgate/grpcvtgateservice/server.go +++ b/go/vt/vtgate/grpcvtgateservice/server.go @@ -48,12 +48,15 @@ var ( useEffective bool useEffectiveGroups bool useStaticAuthenticationIdentity bool + + sendSessionInStreaming bool ) func registerFlags(fs *pflag.FlagSet) { fs.BoolVar(&useEffective, "grpc_use_effective_callerid", false, "If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal.") fs.BoolVar(&useEffectiveGroups, "grpc-use-effective-groups", false, "If set, and SSL is not used, will set the immediate caller's security groups from the effective caller id's groups.") fs.BoolVar(&useStaticAuthenticationIdentity, "grpc-use-static-authentication-callerid", false, "If set, will set the immediate caller id to the username authenticated by the static auth plugin.") + fs.BoolVar(&sendSessionInStreaming, "grpc-send-session-in-streaming", false, "If set, will send the session as last packet in streaming api to support transactions in streaming") } func init() { @@ -192,19 +195,22 @@ func (vtg *VTGate) StreamExecute(request *vtgatepb.StreamExecuteRequest, stream }) }) - // even if there is an error, session could have been modified. - // So, this needs to be sent back to the client. Session is sent in the last stream response. - lastErr := stream.Send(&vtgatepb.StreamExecuteResponse{ - Session: session, - }) - var errs []error if vtgErr != nil { errs = append(errs, vtgErr) } - if lastErr != nil { - errs = append(errs, lastErr) + + if sendSessionInStreaming { + // even if there is an error, session could have been modified. + // So, this needs to be sent back to the client. Session is sent in the last stream response. + lastErr := stream.Send(&vtgatepb.StreamExecuteResponse{ + Session: session, + }) + if lastErr != nil { + errs = append(errs, lastErr) + } } + return vterrors.ToGRPC(vterrors.Aggregate(errs)) } diff --git a/go/vt/vttablet/tabletserver/exclude_race_test.go b/go/vt/vttablet/tabletserver/exclude_race_test.go new file mode 100644 index 00000000000..2610a98da26 --- /dev/null +++ b/go/vt/vttablet/tabletserver/exclude_race_test.go @@ -0,0 +1,62 @@ +//go:build !race + +package tabletserver + +import ( + "context" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/sqltypes" + querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" +) + +// TestHandlePanicAndSendLogStatsMessageTruncation tests that when an error truncation +// length is set and a panic occurs, the code in handlePanicAndSendLogStats will +// truncate the error text in logs, but will not truncate the error text in the +// error value. +func TestHandlePanicAndSendLogStatsMessageTruncation(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + tl := newTestLogger() + defer tl.Close() + logStats := tabletenv.NewLogStats(ctx, "TestHandlePanicAndSendLogStatsMessageTruncation") + db, tsv := setupTabletServerTest(t, "") + defer tsv.StopService() + defer db.Close() + + longSql := "select * from test_table_loooooooooooooooooooooooooooooooooooong" + longBv := map[string]*querypb.BindVariable{ + "bv1": sqltypes.Int64BindVariable(1111111111), + "bv2": sqltypes.Int64BindVariable(2222222222), + "bv3": sqltypes.Int64BindVariable(3333333333), + "bv4": sqltypes.Int64BindVariable(4444444444), + } + origTruncateErrLen := sqlparser.GetTruncateErrLen() + sqlparser.SetTruncateErrLen(32) + defer sqlparser.SetTruncateErrLen(origTruncateErrLen) + + defer func() { + err := logStats.Error + want := "Uncaught panic for Sql: \"select * from test_table_loooooooooooooooooooooooooooooooooooong\", BindVars: {bv1: \"type:INT64 value:\\\"1111111111\\\"\"bv2: \"type:INT64 value:\\\"2222222222\\\"\"bv3: \"type:INT64 value:\\\"3333333333\\\"\"bv4: \"type:INT64 value:\\\"4444444444\\\"\"}" + require.Error(t, err) + assert.Contains(t, err.Error(), want) + want = "Uncaught panic for Sql: \"select * from test_t [TRUNCATED]\", BindVars: {bv1: \"typ [TRUNCATED]" + gotWhatWeWant := false + for _, log := range tl.getLogs() { + if strings.HasPrefix(log, want) { + gotWhatWeWant = true + break + } + } + assert.True(t, gotWhatWeWant) + }() + + defer tsv.handlePanicAndSendLogStats(longSql, longBv, logStats) + panic("panic from TestHandlePanicAndSendLogStatsMessageTruncation") +} diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index b26baa35cac..51504ffb8ed 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -1421,49 +1421,6 @@ func TestHandleExecUnknownError(t *testing.T) { panic("unknown exec error") } -// TestHandlePanicAndSendLogStatsMessageTruncation tests that when an error truncation -// length is set and a panic occurs, the code in handlePanicAndSendLogStats will -// truncate the error text in logs, but will not truncate the error text in the -// error value. -func TestHandlePanicAndSendLogStatsMessageTruncation(t *testing.T) { - tl := newTestLogger() - defer tl.Close() - logStats := tabletenv.NewLogStats(ctx, "TestHandlePanicAndSendLogStatsMessageTruncation") - db, tsv := setupTabletServerTest(t, "") - defer tsv.StopService() - defer db.Close() - - longSql := "select * from test_table_loooooooooooooooooooooooooooooooooooong" - longBv := map[string]*querypb.BindVariable{ - "bv1": sqltypes.Int64BindVariable(1111111111), - "bv2": sqltypes.Int64BindVariable(2222222222), - "bv3": sqltypes.Int64BindVariable(3333333333), - "bv4": sqltypes.Int64BindVariable(4444444444), - } - origTruncateErrLen := sqlparser.GetTruncateErrLen() - sqlparser.SetTruncateErrLen(32) - defer sqlparser.SetTruncateErrLen(origTruncateErrLen) - - defer func() { - err := logStats.Error - want := "Uncaught panic for Sql: \"select * from test_table_loooooooooooooooooooooooooooooooooooong\", BindVars: {bv1: \"type:INT64 value:\\\"1111111111\\\"\"bv2: \"type:INT64 value:\\\"2222222222\\\"\"bv3: \"type:INT64 value:\\\"3333333333\\\"\"bv4: \"type:INT64 value:\\\"4444444444\\\"\"}" - require.Error(t, err) - assert.Contains(t, err.Error(), want) - want = "Uncaught panic for Sql: \"select * from test_t [TRUNCATED]\", BindVars: {bv1: \"typ [TRUNCATED]" - gotWhatWeWant := false - for _, log := range tl.getLogs() { - if strings.HasPrefix(log, want) { - gotWhatWeWant = true - break - } - } - assert.True(t, gotWhatWeWant) - }() - - defer tsv.handlePanicAndSendLogStats(longSql, longBv, logStats) - panic("panic from TestHandlePanicAndSendLogStatsMessageTruncation") -} - func TestQueryAsString(t *testing.T) { longSql := "select * from test_table_loooooooooooooooooooooooooooooooooooong" longBv := map[string]*querypb.BindVariable{