diff --git a/go/vt/vttablet/grpctabletconn/conn.go b/go/vt/vttablet/grpctabletconn/conn.go index f7a0801f1e7..d290862841d 100644 --- a/go/vt/vttablet/grpctabletconn/conn.go +++ b/go/vt/vttablet/grpctabletconn/conn.go @@ -185,10 +185,19 @@ func (conn *gRPCQueryClient) StreamExecute(ctx context.Context, target *querypb. if err != nil { return tabletconn.ErrorFromGRPC(err) } - if fields == nil { - fields = ser.Result.Fields + var result *sqltypes.Result + if options.RawMysqlPackets { + result, err = mysql.ParseResult(ser.Result, true) + if err != nil { + return err + } + } else { + if fields == nil { + fields = ser.Result.Fields + } + result = sqltypes.CustomProto3ToResult(fields, ser.Result) } - if err := callback(sqltypes.CustomProto3ToResult(fields, ser.Result)); err != nil { + if err := callback(result); err != nil { if err == io.EOF { return nil } @@ -560,10 +569,19 @@ func (conn *gRPCQueryClient) BeginStreamExecute(ctx context.Context, target *que return state, nil } - if fields == nil { - fields = ser.Result.Fields + var result *sqltypes.Result + if options.RawMysqlPackets { + result, err = mysql.ParseResult(ser.Result, true) + if err != nil { + return state, err + } + } else { + if fields == nil { + fields = ser.Result.Fields + } + result = sqltypes.CustomProto3ToResult(fields, ser.Result) } - if err := callback(sqltypes.CustomProto3ToResult(fields, ser.Result)); err != nil { + if err := callback(result); err != nil { if err == io.EOF { return state, nil } @@ -963,10 +981,19 @@ func (conn *gRPCQueryClient) ReserveBeginStreamExecute(ctx context.Context, targ return state, nil } - if fields == nil { - fields = ser.Result.Fields + var result *sqltypes.Result + if options.RawMysqlPackets { + result, err = mysql.ParseResult(ser.Result, true) + if err != nil { + return state, err + } + } else { + if fields == nil { + fields = ser.Result.Fields + } + result = sqltypes.CustomProto3ToResult(fields, ser.Result) } - if err := callback(sqltypes.CustomProto3ToResult(fields, ser.Result)); err != nil { + if err := callback(result); err != nil { if err == io.EOF { return state, nil } @@ -1071,10 +1098,19 @@ func (conn *gRPCQueryClient) ReserveStreamExecute(ctx context.Context, target *q return state, nil } - if fields == nil { - fields = ser.Result.Fields + var result *sqltypes.Result + if options.RawMysqlPackets { + result, err = mysql.ParseResult(ser.Result, true) + if err != nil { + return state, err + } + } else { + if fields == nil { + fields = ser.Result.Fields + } + result = sqltypes.CustomProto3ToResult(fields, ser.Result) } - if err := callback(sqltypes.CustomProto3ToResult(fields, ser.Result)); err != nil { + if err := callback(result); err != nil { if err == io.EOF { return state, nil } diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index c8ac28f804b..89f0779a467 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -201,6 +201,8 @@ func (qre *QueryExecutor) Execute() (reply *sqltypes.Result, err error) { } if qr.CachedProto == nil { // If we're not passing on the raw MySQL packets // Do we still need to figure out a way to do this? + // I don't think so, since when using raw packets we enforce this when + // parsing it in ReadQueryResultAsProto. if err := qre.verifyRowCount(int64(len(qr.Rows)), maxrows); err != nil { return nil, err }