Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-17.0] Add session flag for stream execute grpc api (#14046) #14053

Merged
merged 3 commits into from
Sep 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions changelog/17.0/17.0.3/summary.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
## Summary

### Table of Contents

- **[Major Changes](#major-changes)**
- **[New command line flags and behavior](#new-flag)**
- [VTGate flag `--grpc-send-session-in-streaming`](#new-vtgate-streaming-sesion)

## <a id="major-changes"/>Major Changes

### <a id="new-flag"/>New command line flags and behavior

#### <a id="new-vtgate-streaming-sesion"/>VTGate GRPC stream execute session flag `--grpc-send-session-in-streaming`

This flag enables transaction support on `StreamExecute` api.
One enabled, VTGate `StreamExecute` grpc api will send session as the last packet in the response.
The client should enable it only when they have made the required changes to expect such a packet.

It is disabled by default from v17.0.3.

This was a breaking change when v17.0.0 was released was causing upgrade issue for client
who relied on a certain behaviour of receiving streaming packets on `StreamExecute` call.
2 changes: 2 additions & 0 deletions changelog/17.0/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
## v17.0
* **[17.0.3](17.0.3)**

* **[17.0.2](17.0.2)**
* [Changelog](17.0.2/changelog.md)
* [Release Notes](17.0.2/release_notes.md)
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Usage of vtgate:
--gate_query_cache_memory int gate server query cache size in bytes, maximum amount of memory to be cached. vtgate analyzes every incoming query and generate a query plan, these plans are being cached in a lru cache. This config controls the capacity of the lru cache. (default 33554432)
--gate_query_cache_size int gate server query cache size, maximum number of queries to be cached. vtgate analyzes every incoming query and generate a query plan, these plans are being cached in a cache. This config controls the expected amount of unique entries in the cache. (default 5000)
--gateway_initial_tablet_timeout duration At startup, the tabletGateway will wait up to this duration to get at least one tablet per keyspace/shard/tablet type (default 30s)
--grpc-send-session-in-streaming If set, will send the session as last packet in streaming api to support transactions in streaming
--grpc-use-effective-groups If set, and SSL is not used, will set the immediate caller's security groups from the effective caller id's groups.
--grpc-use-static-authentication-callerid If set, will set the immediate caller id to the username authenticated by the static auth plugin.
--grpc_auth_mode string Which auth plugin implementation to use (eg: static)
Expand Down
1 change: 1 addition & 0 deletions go/test/endtoend/vtgate/grpc_api/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func TestMain(m *testing.M) {
"--grpc_auth_static_password_file", grpcServerAuthStaticPath,
"--grpc_use_effective_callerid",
"--grpc-use-static-authentication-callerid",
"--grpc-send-session-in-streaming",
}

// Configure vttablet to use table ACL
Expand Down
22 changes: 14 additions & 8 deletions go/vt/vtgate/grpcvtgateservice/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,15 @@ var (
useEffective bool
useEffectiveGroups bool
useStaticAuthenticationIdentity bool

sendSessionInStreaming bool
)

func registerFlags(fs *pflag.FlagSet) {
fs.BoolVar(&useEffective, "grpc_use_effective_callerid", false, "If set, and SSL is not used, will set the immediate caller id from the effective caller id's principal.")
fs.BoolVar(&useEffectiveGroups, "grpc-use-effective-groups", false, "If set, and SSL is not used, will set the immediate caller's security groups from the effective caller id's groups.")
fs.BoolVar(&useStaticAuthenticationIdentity, "grpc-use-static-authentication-callerid", false, "If set, will set the immediate caller id to the username authenticated by the static auth plugin.")
fs.BoolVar(&sendSessionInStreaming, "grpc-send-session-in-streaming", false, "If set, will send the session as last packet in streaming api to support transactions in streaming")
}

func init() {
Expand Down Expand Up @@ -192,19 +195,22 @@ func (vtg *VTGate) StreamExecute(request *vtgatepb.StreamExecuteRequest, stream
})
})

// even if there is an error, session could have been modified.
// So, this needs to be sent back to the client. Session is sent in the last stream response.
lastErr := stream.Send(&vtgatepb.StreamExecuteResponse{
Session: session,
})

var errs []error
if vtgErr != nil {
errs = append(errs, vtgErr)
}
if lastErr != nil {
errs = append(errs, lastErr)

if sendSessionInStreaming {
// even if there is an error, session could have been modified.
// So, this needs to be sent back to the client. Session is sent in the last stream response.
lastErr := stream.Send(&vtgatepb.StreamExecuteResponse{
Session: session,
})
if lastErr != nil {
errs = append(errs, lastErr)
}
}

return vterrors.ToGRPC(vterrors.Aggregate(errs))
}

Expand Down
62 changes: 62 additions & 0 deletions go/vt/vttablet/tabletserver/exclude_race_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
//go:build !race

package tabletserver

import (
"context"
"strings"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)

// TestHandlePanicAndSendLogStatsMessageTruncation tests that when an error truncation
// length is set and a panic occurs, the code in handlePanicAndSendLogStats will
// truncate the error text in logs, but will not truncate the error text in the
// error value.
func TestHandlePanicAndSendLogStatsMessageTruncation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tl := newTestLogger()
defer tl.Close()
logStats := tabletenv.NewLogStats(ctx, "TestHandlePanicAndSendLogStatsMessageTruncation")
db, tsv := setupTabletServerTest(t, "")
defer tsv.StopService()
defer db.Close()

longSql := "select * from test_table_loooooooooooooooooooooooooooooooooooong"
longBv := map[string]*querypb.BindVariable{
"bv1": sqltypes.Int64BindVariable(1111111111),
"bv2": sqltypes.Int64BindVariable(2222222222),
"bv3": sqltypes.Int64BindVariable(3333333333),
"bv4": sqltypes.Int64BindVariable(4444444444),
}
origTruncateErrLen := sqlparser.GetTruncateErrLen()
sqlparser.SetTruncateErrLen(32)
defer sqlparser.SetTruncateErrLen(origTruncateErrLen)

defer func() {
err := logStats.Error
want := "Uncaught panic for Sql: \"select * from test_table_loooooooooooooooooooooooooooooooooooong\", BindVars: {bv1: \"type:INT64 value:\\\"1111111111\\\"\"bv2: \"type:INT64 value:\\\"2222222222\\\"\"bv3: \"type:INT64 value:\\\"3333333333\\\"\"bv4: \"type:INT64 value:\\\"4444444444\\\"\"}"
require.Error(t, err)
assert.Contains(t, err.Error(), want)
want = "Uncaught panic for Sql: \"select * from test_t [TRUNCATED]\", BindVars: {bv1: \"typ [TRUNCATED]"
gotWhatWeWant := false
for _, log := range tl.getLogs() {
if strings.HasPrefix(log, want) {
gotWhatWeWant = true
break
}
}
assert.True(t, gotWhatWeWant)
}()

defer tsv.handlePanicAndSendLogStats(longSql, longBv, logStats)
panic("panic from TestHandlePanicAndSendLogStatsMessageTruncation")
}
43 changes: 0 additions & 43 deletions go/vt/vttablet/tabletserver/tabletserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1421,49 +1421,6 @@ func TestHandleExecUnknownError(t *testing.T) {
panic("unknown exec error")
}

// TestHandlePanicAndSendLogStatsMessageTruncation tests that when an error truncation
// length is set and a panic occurs, the code in handlePanicAndSendLogStats will
// truncate the error text in logs, but will not truncate the error text in the
// error value.
func TestHandlePanicAndSendLogStatsMessageTruncation(t *testing.T) {
tl := newTestLogger()
defer tl.Close()
logStats := tabletenv.NewLogStats(ctx, "TestHandlePanicAndSendLogStatsMessageTruncation")
db, tsv := setupTabletServerTest(t, "")
defer tsv.StopService()
defer db.Close()

longSql := "select * from test_table_loooooooooooooooooooooooooooooooooooong"
longBv := map[string]*querypb.BindVariable{
"bv1": sqltypes.Int64BindVariable(1111111111),
"bv2": sqltypes.Int64BindVariable(2222222222),
"bv3": sqltypes.Int64BindVariable(3333333333),
"bv4": sqltypes.Int64BindVariable(4444444444),
}
origTruncateErrLen := sqlparser.GetTruncateErrLen()
sqlparser.SetTruncateErrLen(32)
defer sqlparser.SetTruncateErrLen(origTruncateErrLen)

defer func() {
err := logStats.Error
want := "Uncaught panic for Sql: \"select * from test_table_loooooooooooooooooooooooooooooooooooong\", BindVars: {bv1: \"type:INT64 value:\\\"1111111111\\\"\"bv2: \"type:INT64 value:\\\"2222222222\\\"\"bv3: \"type:INT64 value:\\\"3333333333\\\"\"bv4: \"type:INT64 value:\\\"4444444444\\\"\"}"
require.Error(t, err)
assert.Contains(t, err.Error(), want)
want = "Uncaught panic for Sql: \"select * from test_t [TRUNCATED]\", BindVars: {bv1: \"typ [TRUNCATED]"
gotWhatWeWant := false
for _, log := range tl.getLogs() {
if strings.HasPrefix(log, want) {
gotWhatWeWant = true
break
}
}
assert.True(t, gotWhatWeWant)
}()

defer tsv.handlePanicAndSendLogStats(longSql, longBv, logStats)
panic("panic from TestHandlePanicAndSendLogStatsMessageTruncation")
}

func TestQueryAsString(t *testing.T) {
longSql := "select * from test_table_loooooooooooooooooooooooooooooooooooong"
longBv := map[string]*querypb.BindVariable{
Expand Down