diff --git a/go.mod b/go.mod index d489ed3dd6e..856cece7dfe 100644 --- a/go.mod +++ b/go.mod @@ -114,6 +114,7 @@ require ( github.com/nsf/jsondiff v0.0.0-20210926074059-1e845ec5d249 github.com/spyzhov/ajson v0.8.0 golang.org/x/exp v0.0.0-20230131160201-f062dba9d201 + golang.org/x/sync v0.3.0 golang.org/x/tools/cmd/cover v0.1.0-deprecated modernc.org/sqlite v1.20.3 ) @@ -195,7 +196,6 @@ require ( go4.org/intern v0.0.0-20220617035311-6925f38cc365 // indirect go4.org/unsafe/assume-no-moving-gc v0.0.0-20220617031537-928513b29760 // indirect golang.org/x/exp/typeparams v0.0.0-20230131160201-f062dba9d201 // indirect - golang.org/x/sync v0.3.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b // indirect diff --git a/go/sqltypes/parse_rows.go b/go/sqltypes/parse_rows.go new file mode 100644 index 00000000000..2654141ed3b --- /dev/null +++ b/go/sqltypes/parse_rows.go @@ -0,0 +1,174 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sqltypes + +import ( + "fmt" + "io" + "reflect" + "strconv" + "strings" + "text/scanner" + + querypb "vitess.io/vitess/go/vt/proto/query" +) + +// ParseRows parses the output generated by fmt.Sprintf("#v", rows), and reifies the original []sqltypes.Row +// NOTE: This is not meant for production use! +func ParseRows(input string) ([]Row, error) { + type state int + const ( + stInvalid state = iota + stInit + stBeginRow + stInRow + stInValue0 + stInValue1 + stInValue2 + ) + + var ( + scan scanner.Scanner + result []Row + row Row + vtype int32 + st = stInit + ) + + scan.Init(strings.NewReader(input)) + + for tok := scan.Scan(); tok != scanner.EOF; tok = scan.Scan() { + var next state + + switch st { + case stInit: + if tok == '[' { + next = stBeginRow + } + case stBeginRow: + switch tok { + case '[': + next = stInRow + case ']': + return result, nil + } + case stInRow: + switch tok { + case ']': + result = append(result, row) + row = nil + next = stBeginRow + case scanner.Ident: + ident := scan.TokenText() + + if ident == "NULL" { + row = append(row, NULL) + continue + } + + var ok bool + vtype, ok = querypb.Type_value[ident] + if !ok { + return nil, fmt.Errorf("unknown SQL type %q at %s", ident, scan.Position) + } + next = stInValue0 + } + case stInValue0: + if tok == '(' { + next = stInValue1 + } + case stInValue1: + literal := scan.TokenText() + switch tok { + case scanner.String: + var err error + literal, err = strconv.Unquote(literal) + if err != nil { + return nil, fmt.Errorf("failed to parse literal string at %s: %w", scan.Position, err) + } + fallthrough + case scanner.Int, scanner.Float: + row = append(row, MakeTrusted(Type(vtype), []byte(literal))) + next = stInValue2 + } + case stInValue2: + if tok == ')' { + next = stInRow + } + } + if next == stInvalid { + return nil, fmt.Errorf("unexpected token '%s' at %s", scan.TokenText(), scan.Position) + } + st = next + } + return nil, io.ErrUnexpectedEOF +} + +type RowMismatchError struct { + err error + want, got []Row +} + +func (e *RowMismatchError) Error() string { + return fmt.Sprintf("results differ: %v\n\twant: %v\n\tgot: %v", e.err, e.want, e.got) +} + +func RowsEquals(want, got []Row) error { + if len(want) != len(got) { + return &RowMismatchError{ + err: fmt.Errorf("expected %d rows in result, got %d", len(want), len(got)), + want: want, + got: got, + } + } + + var matched = make([]bool, len(want)) + for _, aa := range want { + var ok bool + for i, bb := range got { + if matched[i] { + continue + } + if reflect.DeepEqual(aa, bb) { + matched[i] = true + ok = true + break + } + } + if !ok { + return &RowMismatchError{ + err: fmt.Errorf("row %v is missing from result", aa), + want: want, + got: got, + } + } + } + for _, m := range matched { + if !m { + return fmt.Errorf("not all elements matched") + } + } + return nil +} + +func RowsEqualsStr(wantStr string, got []Row) error { + want, err := ParseRows(wantStr) + if err != nil { + return fmt.Errorf("malformed row assertion: %w", err) + } + return RowsEquals(want, got) +} diff --git a/go/vt/vtgate/engine/distinct.go b/go/vt/vtgate/engine/distinct.go index 5baa7ca9c1f..b00430369e1 100644 --- a/go/vt/vtgate/engine/distinct.go +++ b/go/vt/vtgate/engine/distinct.go @@ -19,6 +19,7 @@ package engine import ( "context" "fmt" + "sync" "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/sqltypes" @@ -197,13 +198,16 @@ func (d *Distinct) TryExecute(ctx context.Context, vcursor VCursor, bindVars map // TryStreamExecute implements the Primitive interface func (d *Distinct) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { - pt := newProbeTable(d.CheckCols) + var mu sync.Mutex + pt := newProbeTable(d.CheckCols) err := vcursor.StreamExecutePrimitive(ctx, d.Source, bindVars, wantfields, func(input *sqltypes.Result) error { result := &sqltypes.Result{ Fields: input.Fields, InsertID: input.InsertID, } + mu.Lock() + defer mu.Unlock() for _, row := range input.Rows { exists, err := pt.exists(row) if err != nil { diff --git a/go/vt/vtgate/engine/distinct_test.go b/go/vt/vtgate/engine/distinct_test.go index 5e39d2c4425..45bef7efe2a 100644 --- a/go/vt/vtgate/engine/distinct_test.go +++ b/go/vt/vtgate/engine/distinct_test.go @@ -129,6 +129,59 @@ func TestDistinct(t *testing.T) { } } +func TestDistinctStreamAsync(t *testing.T) { + distinct := &Distinct{ + Source: &fakePrimitive{ + results: sqltypes.MakeTestStreamingResults(sqltypes.MakeTestFields("myid|id|num|name", "varchar|int64|int64|varchar"), + "a|1|1|a", + "a|1|1|a", + "a|1|1|a", + "a|1|1|a", + "---", + "c|1|1|a", + "a|1|1|a", + "z|1|1|a", + "a|1|1|t", + "a|1|1|a", + "a|1|1|a", + "a|1|1|a", + "---", + "c|1|1|a", + "a|1|1|a", + "---", + "c|1|1|a", + "a|1|1|a", + "a|1|1|a", + "c|1|1|a", + "a|1|1|a", + "a|1|1|a", + "---", + "c|1|1|a", + "a|1|1|a", + ), + async: true, + }, + CheckCols: []CheckCol{ + {Col: 0, Collation: collations.CollationUtf8mb4ID}, + {Col: 1, Collation: collations.CollationBinaryID}, + {Col: 2, Collation: collations.CollationBinaryID}, + {Col: 3, Collation: collations.CollationUtf8mb4ID}, + }, + } + + qr := &sqltypes.Result{} + err := distinct.TryStreamExecute(context.Background(), &noopVCursor{}, nil, true, func(result *sqltypes.Result) error { + qr.Rows = append(qr.Rows, result.Rows...) + return nil + }) + require.NoError(t, err) + require.NoError(t, sqltypes.RowsEqualsStr(` +[[VARCHAR("c") INT64(1) INT64(1) VARCHAR("a")] +[VARCHAR("a") INT64(1) INT64(1) VARCHAR("a")] +[VARCHAR("z") INT64(1) INT64(1) VARCHAR("a")] +[VARCHAR("a") INT64(1) INT64(1) VARCHAR("t")]]`, qr.Rows)) +} + func TestWeightStringFallBack(t *testing.T) { offsetOne := 1 checkCols := []CheckCol{{ diff --git a/go/vt/vtgate/engine/fake_primitive_test.go b/go/vt/vtgate/engine/fake_primitive_test.go index 1a168dc3dc4..4183ac6e3cc 100644 --- a/go/vt/vtgate/engine/fake_primitive_test.go +++ b/go/vt/vtgate/engine/fake_primitive_test.go @@ -23,8 +23,9 @@ import ( "strings" "testing" - "vitess.io/vitess/go/sqltypes" + "golang.org/x/sync/errgroup" + "vitess.io/vitess/go/sqltypes" querypb "vitess.io/vitess/go/vt/proto/query" ) @@ -41,6 +42,8 @@ type fakePrimitive struct { log []string allResultsInOneCall bool + + async bool } func (f *fakePrimitive) Inputs() []Primitive { @@ -86,6 +89,13 @@ func (f *fakePrimitive) TryStreamExecute(ctx context.Context, vcursor VCursor, b return f.sendErr } + if f.async { + return f.asyncCall(callback) + } + return f.syncCall(wantfields, callback) +} + +func (f *fakePrimitive) syncCall(wantfields bool, callback func(*sqltypes.Result) error) error { readMoreResults := true for readMoreResults && f.curResult < len(f.results) { readMoreResults = f.allResultsInOneCall @@ -116,9 +126,46 @@ func (f *fakePrimitive) TryStreamExecute(ctx context.Context, vcursor VCursor, b } } } - return nil } + +func (f *fakePrimitive) asyncCall(callback func(*sqltypes.Result) error) error { + var g errgroup.Group + var fields []*querypb.Field + if len(f.results) > 0 { + fields = f.results[0].Fields + } + for _, res := range f.results { + qr := res + g.Go(func() error { + if qr == nil { + return f.sendErr + } + if err := callback(&sqltypes.Result{Fields: fields}); err != nil { + return err + } + result := &sqltypes.Result{} + for i := 0; i < len(qr.Rows); i++ { + result.Rows = append(result.Rows, qr.Rows[i]) + // Send only two rows at a time. + if i%2 == 1 { + if err := callback(result); err != nil { + return err + } + result = &sqltypes.Result{} + } + } + if len(result.Rows) != 0 { + if err := callback(result); err != nil { + return err + } + } + return nil + }) + } + return g.Wait() +} + func (f *fakePrimitive) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) { f.log = append(f.log, fmt.Sprintf("GetFields %v", printBindVars(bindVars))) return f.TryExecute(ctx, vcursor, bindVars, true /* wantfields */) diff --git a/go/vt/vtgate/engine/filter.go b/go/vt/vtgate/engine/filter.go index fb696a9d679..32291543076 100644 --- a/go/vt/vtgate/engine/filter.go +++ b/go/vt/vtgate/engine/filter.go @@ -18,6 +18,7 @@ package engine import ( "context" + "sync" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vtgate/evalengine" @@ -79,10 +80,14 @@ func (f *Filter) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[s // TryStreamExecute satisfies the Primitive interface. func (f *Filter) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { + var mu sync.Mutex + env := evalengine.EnvWithBindVars(bindVars, vcursor.ConnCollation()) filter := func(results *sqltypes.Result) error { var rows [][]sqltypes.Value - env.Fields = results.Fields + + mu.Lock() + defer mu.Unlock() for _, row := range results.Rows { env.Row = row evalResult, err := env.Evaluate(f.Predicate) diff --git a/go/vt/vtgate/engine/filter_test.go b/go/vt/vtgate/engine/filter_test.go index b6d8730a869..05602ed2f9a 100644 --- a/go/vt/vtgate/engine/filter_test.go +++ b/go/vt/vtgate/engine/filter_test.go @@ -101,3 +101,59 @@ func TestFilterPass(t *testing.T) { }) } } + +func TestFilterStreaming(t *testing.T) { + predicate := &sqlparser.ComparisonExpr{ + Operator: sqlparser.GreaterThanOp, + Left: sqlparser.NewColName("left"), + Right: sqlparser.NewColName("right"), + } + + tcases := []struct { + name string + res []*sqltypes.Result + expRes string + }{{ + name: "int32", + res: sqltypes.MakeTestStreamingResults(sqltypes.MakeTestFields("left|right", "int32|int32"), "0|1", "---", "1|0", "2|3"), + expRes: `[[INT32(1) INT32(0)]]`, + }, { + name: "uint16", + res: sqltypes.MakeTestStreamingResults(sqltypes.MakeTestFields("left|right", "uint16|uint16"), "0|1", "1|0", "---", "2|3"), + expRes: `[[UINT16(1) UINT16(0)]]`, + }, { + name: "uint64_int64", + res: sqltypes.MakeTestStreamingResults(sqltypes.MakeTestFields("left|right", "uint64|int64"), "0|1", "---", "1|0", "2|3"), + expRes: `[[UINT64(1) INT64(0)]]`, + }, { + name: "int32_uint32", + res: sqltypes.MakeTestStreamingResults(sqltypes.MakeTestFields("left|right", "int32|uint32"), "0|1", "---", "1|0", "---", "2|3"), + expRes: `[[INT32(1) UINT32(0)]]`, + }, { + name: "uint16_int8", + res: sqltypes.MakeTestStreamingResults(sqltypes.MakeTestFields("left|right", "uint16|int8"), "0|1", "1|0", "2|3", "---"), + expRes: `[[UINT16(1) INT8(0)]]`, + }, { + name: "uint64_int32", + res: sqltypes.MakeTestStreamingResults(sqltypes.MakeTestFields("left|right", "uint64|int32"), "0|1", "1|0", "2|3", "---", "0|1", "1|3", "5|3"), + expRes: `[[UINT64(1) INT32(0)] [UINT64(5) INT32(3)]]`, + }} + for _, tc := range tcases { + t.Run(tc.name, func(t *testing.T) { + pred, err := evalengine.Translate(predicate, &dummyTranslator{}) + require.NoError(t, err) + + filter := &Filter{ + Predicate: pred, + Input: &fakePrimitive{results: tc.res, async: true}, + } + qr := &sqltypes.Result{} + err = filter.TryStreamExecute(context.Background(), &noopVCursor{}, nil, false, func(result *sqltypes.Result) error { + qr.Rows = append(qr.Rows, result.Rows...) + return nil + }) + require.NoError(t, err) + require.NoError(t, sqltypes.RowsEqualsStr(tc.expRes, qr.Rows)) + }) + } +} diff --git a/go/vt/vtgate/engine/limit.go b/go/vt/vtgate/engine/limit.go index 14fd0d7e15f..566f79fbe7e 100644 --- a/go/vt/vtgate/engine/limit.go +++ b/go/vt/vtgate/engine/limit.go @@ -20,12 +20,11 @@ import ( "context" "fmt" "io" - - "vitess.io/vitess/go/vt/vtgate/evalengine" + "sync" "vitess.io/vitess/go/sqltypes" - querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/vtgate/evalengine" ) var _ Primitive = (*Limit)(nil) @@ -95,8 +94,11 @@ func (l *Limit) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars // the offset in memory from the result of the scatter query with count + offset. bindVars["__upper_limit"] = sqltypes.Int64BindVariable(int64(count + offset)) + var mu sync.Mutex err = vcursor.StreamExecutePrimitive(ctx, l.Input, bindVars, wantfields, func(qr *sqltypes.Result) error { - if len(qr.Fields) != 0 { + mu.Lock() + defer mu.Unlock() + if wantfields && len(qr.Fields) != 0 { if err := callback(&sqltypes.Result{Fields: qr.Fields}); err != nil { return err } diff --git a/go/vt/vtgate/engine/limit_test.go b/go/vt/vtgate/engine/limit_test.go index dcdc43880a0..5f9d4e7da9f 100644 --- a/go/vt/vtgate/engine/limit_test.go +++ b/go/vt/vtgate/engine/limit_test.go @@ -450,6 +450,73 @@ func TestLimitStreamExecute(t *testing.T) { } } +func TestLimitStreamExecuteAsync(t *testing.T) { + bindVars := make(map[string]*querypb.BindVariable) + fields := sqltypes.MakeTestFields( + "col1|col2", + "int64|varchar", + ) + inputResults := sqltypes.MakeTestStreamingResults( + fields, + "a|1", + "b|2", + "d|3", + "e|4", + "a|1", + "b|2", + "d|3", + "e|4", + "---", + "c|7", + "x|8", + "y|9", + "c|7", + "x|8", + "y|9", + "c|7", + "x|8", + "y|9", + "---", + "l|4", + "m|5", + "n|6", + "l|4", + "m|5", + "n|6", + "l|4", + "m|5", + "n|6", + ) + fp := &fakePrimitive{ + results: inputResults, + async: true, + } + + const maxCount = 26 + for i := 0; i <= maxCount*20; i++ { + expRows := i + l := &Limit{ + Count: evalengine.NewLiteralInt(int64(expRows)), + Input: fp, + } + // Test with limit smaller than input. + results := &sqltypes.Result{} + + err := l.TryStreamExecute(context.Background(), &noopVCursor{}, bindVars, true, func(qr *sqltypes.Result) error { + if qr != nil { + results.Rows = append(results.Rows, qr.Rows...) + } + return nil + }) + require.NoError(t, err) + if expRows > maxCount { + expRows = maxCount + } + require.Len(t, results.Rows, expRows) + } + +} + func TestOffsetStreamExecute(t *testing.T) { bindVars := make(map[string]*querypb.BindVariable) fields := sqltypes.MakeTestFields( diff --git a/go/vt/vtgate/engine/memory_sort.go b/go/vt/vtgate/engine/memory_sort.go index 6085f36a7ed..42fbd9ac78f 100644 --- a/go/vt/vtgate/engine/memory_sort.go +++ b/go/vt/vtgate/engine/memory_sort.go @@ -24,6 +24,7 @@ import ( "reflect" "sort" "strings" + "sync" "vitess.io/vitess/go/vt/vtgate/evalengine" @@ -108,7 +109,11 @@ func (ms *MemorySort) TryStreamExecute(ctx context.Context, vcursor VCursor, bin comparers: extractSlices(ms.OrderBy), reverse: true, } + + var mu sync.Mutex err = vcursor.StreamExecutePrimitive(ctx, ms.Input, bindVars, wantfields, func(qr *sqltypes.Result) error { + mu.Lock() + defer mu.Unlock() if len(qr.Fields) != 0 { if err := cb(&sqltypes.Result{Fields: qr.Fields}); err != nil { return err diff --git a/go/vt/vtgate/engine/memory_sort_test.go b/go/vt/vtgate/engine/memory_sort_test.go index 93b76876783..71964de2922 100644 --- a/go/vt/vtgate/engine/memory_sort_test.go +++ b/go/vt/vtgate/engine/memory_sort_test.go @@ -659,3 +659,57 @@ func TestMemorySortExecuteNoVarChar(t *testing.T) { t.Errorf("StreamExecute err: %v, want %v", err, want) } } + +func TestMemorySortStreamAsync(t *testing.T) { + fields := sqltypes.MakeTestFields( + "c1|c2", + "varbinary|decimal", + ) + fp := &fakePrimitive{ + results: sqltypes.MakeTestStreamingResults( + fields, + "a|1", + "g|2", + "a|1", + "---", + "c|3", + "g|2", + "a|1", + "---", + "c|4", + "c|3", + "g|2", + "a|1", + "---", + "c|4", + "c|3", + "g|2", + "a|1", + "---", + "c|4", + "c|3", + ), + async: true, + } + + ms := &MemorySort{ + OrderBy: []OrderByParams{{ + WeightStringCol: -1, + Col: 1, + }}, + Input: fp, + } + + qr := &sqltypes.Result{} + err := ms.TryStreamExecute(context.Background(), &noopVCursor{}, nil, true, func(res *sqltypes.Result) error { + qr.Rows = append(qr.Rows, res.Rows...) + return nil + }) + require.NoError(t, err) + require.NoError(t, sqltypes.RowsEqualsStr( + `[[VARBINARY("a") DECIMAL(1)] [VARBINARY("a") DECIMAL(1)] [VARBINARY("a") DECIMAL(1)] [VARBINARY("a") DECIMAL(1)] [VARBINARY("a") DECIMAL(1)] +[VARBINARY("g") DECIMAL(2)] [VARBINARY("g") DECIMAL(2)] [VARBINARY("g") DECIMAL(2)] [VARBINARY("g") DECIMAL(2)] +[VARBINARY("c") DECIMAL(3)] [VARBINARY("c") DECIMAL(3)] [VARBINARY("c") DECIMAL(3)] [VARBINARY("c") DECIMAL(3)] +[VARBINARY("c") DECIMAL(4)] [VARBINARY("c") DECIMAL(4)] [VARBINARY("c") DECIMAL(4)]]`, + qr.Rows)) +} diff --git a/go/vt/vtgate/engine/projection.go b/go/vt/vtgate/engine/projection.go index 0c0875d19d9..5d759385954 100644 --- a/go/vt/vtgate/engine/projection.go +++ b/go/vt/vtgate/engine/projection.go @@ -87,8 +87,11 @@ func (p *Projection) TryStreamExecute(ctx context.Context, vcursor VCursor, bind env := evalengine.EnvWithBindVars(bindVars, vcursor.ConnCollation()) var once sync.Once var fields []*querypb.Field + var mu sync.Mutex return vcursor.StreamExecutePrimitive(ctx, p.Input, bindVars, wantfields, func(qr *sqltypes.Result) error { var err error + mu.Lock() + defer mu.Unlock() if wantfields { once.Do(func() { env.Fields = qr.Fields diff --git a/go/vt/vtgate/engine/projection_test.go b/go/vt/vtgate/engine/projection_test.go index 701ee8e2aaf..e487b82d349 100644 --- a/go/vt/vtgate/engine/projection_test.go +++ b/go/vt/vtgate/engine/projection_test.go @@ -70,6 +70,49 @@ func TestMultiply(t *testing.T) { assert.Equal(t, "[[UINT64(6)] [UINT64(0)] [UINT64(2)]]", fmt.Sprintf("%v", qr.Rows)) } +func TestProjectionStreaming(t *testing.T) { + expr := &sqlparser.BinaryExpr{ + Operator: sqlparser.MultOp, + Left: &sqlparser.Offset{V: 0}, + Right: &sqlparser.Offset{V: 1}, + } + evalExpr, err := evalengine.Translate(expr, nil) + require.NoError(t, err) + fp := &fakePrimitive{ + results: sqltypes.MakeTestStreamingResults( + sqltypes.MakeTestFields("a|b", "uint64|uint64"), + "3|2", + "1|0", + "6|2", + "---", + "3|2", + "---", + "1|0", + "---", + "1|2", + "4|2", + "---", + "5|5", + "4|10", + ), + async: true, + } + proj := &Projection{ + Cols: []string{"apa"}, + Exprs: []evalengine.Expr{evalExpr}, + Input: fp, + } + + qr := &sqltypes.Result{} + err = proj.TryStreamExecute(context.Background(), &noopVCursor{}, nil, true, func(result *sqltypes.Result) error { + qr.Rows = append(qr.Rows, result.Rows...) + return nil + }) + require.NoError(t, err) + require.NoError(t, sqltypes.RowsEqualsStr(`[[UINT64(25)] [UINT64(40)] [UINT64(6)] [UINT64(2)] [UINT64(8)] [UINT64(0)] [UINT64(6)] [UINT64(0)] [UINT64(12)]]`, + qr.Rows)) +} + func TestEmptyInput(t *testing.T) { expr := &sqlparser.BinaryExpr{ Operator: sqlparser.MultOp, @@ -91,18 +134,18 @@ func TestEmptyInput(t *testing.T) { require.NoError(t, err) assert.Equal(t, "[]", fmt.Sprintf("%v", qr.Rows)) - //fp = &fakePrimitive{ + // fp = &fakePrimitive{ // results: []*sqltypes.Result{sqltypes.MakeTestResult( // sqltypes.MakeTestFields("a|b", "uint64|uint64"), // "3|2", // "1|0", // "1|2", // )}, - //} - //proj.Input = fp - //qr, err = wrapStreamExecute(proj, newNoopVCursor(context.Background()), nil, true) - //require.NoError(t, err) - //assert.Equal(t, "[[UINT64(6)] [UINT64(0)] [UINT64(2)]]", fmt.Sprintf("%v", qr.Rows)) + // } + // proj.Input = fp + // qr, err = wrapStreamExecute(proj, newNoopVCursor(context.Background()), nil, true) + // require.NoError(t, err) + // assert.Equal(t, "[[UINT64(6)] [UINT64(0)] [UINT64(2)]]", fmt.Sprintf("%v", qr.Rows)) } func TestHexAndBinaryArgument(t *testing.T) {