Skip to content

Commit

Permalink
use evalengine.Type instead
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay committed Oct 18, 2023
1 parent 5ace08d commit 2ba4c6e
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 19 deletions.
17 changes: 11 additions & 6 deletions go/vt/vttablet/tabletmanager/vdiff/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"strings"

"vitess.io/vitess/go/vt/vtgate/evalengine"

"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/log"

Expand All @@ -40,19 +42,22 @@ func newMergeSorter(participants map[string]*shardStreamer, comparePKs []compare
for i, cpk := range comparePKs {
weightStringCol := -1
// if the collation is nil or unknown, use binary collation to compare as bytes
if cpk.collation == collations.Unknown {
ob[i] = engine.OrderByParams{Col: cpk.colIndex, WeightStringCol: weightStringCol, Type: sqltypes.Unknown, CollationID: collations.CollationBinaryID}
} else {
ob[i] = engine.OrderByParams{Col: cpk.colIndex, WeightStringCol: weightStringCol, Type: sqltypes.Unknown, CollationID: cpk.collation}
t := evalengine.Type{
Type: sqltypes.Unknown,
Coll: collations.CollationBinaryID,
}
if cpk.collation != collations.Unknown {
t.Coll = cpk.collation
}
ob[i] = engine.OrderByParams{Col: cpk.colIndex, WeightStringCol: weightStringCol, Type: t}
}
return &engine.MergeSort{
Primitives: prims,
OrderBy: ob,
}
}

//-----------------------------------------------------------------
// -----------------------------------------------------------------
// Utility functions

func encodeString(in string) string {
Expand All @@ -64,7 +69,7 @@ func encodeString(in string) string {
func pkColsToGroupByParams(pkCols []int) []*engine.GroupByParams {
var res []*engine.GroupByParams
for _, col := range pkCols {
res = append(res, &engine.GroupByParams{KeyCol: col, WeightStringCol: -1, Type: sqltypes.Unknown})
res = append(res, &engine.GroupByParams{KeyCol: col, WeightStringCol: -1, Type: evalengine.UnknownType()})
}
return res
}
Expand Down
24 changes: 12 additions & 12 deletions go/vt/wrangler/vdiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,8 +499,8 @@ func findPKs(table *tabletmanagerdatapb.TableDefinition, targetSelect *sqlparser
switch ct := expr.(type) {
case *sqlparser.ColName:
colname = ct.Name.String()
case *sqlparser.FuncExpr: //eg. weight_string()
//no-op
case *sqlparser.FuncExpr: // eg. weight_string()
// no-op
default:
log.Warningf("Not considering column %v for PK, type %v not handled", selExpr, ct)
}
Expand Down Expand Up @@ -769,7 +769,7 @@ func (df *vdiff) buildTablePlan(table *tabletmanagerdatapb.TableDefinition, quer
func pkColsToGroupByParams(pkCols []int) []*engine.GroupByParams {
var res []*engine.GroupByParams
for _, col := range pkCols {
res = append(res, &engine.GroupByParams{KeyCol: col, WeightStringCol: -1, Type: sqltypes.Unknown})
res = append(res, &engine.GroupByParams{KeyCol: col, WeightStringCol: -1, Type: evalengine.UnknownType()})
}
return res
}
Expand All @@ -784,11 +784,11 @@ func newMergeSorter(participants map[string]*shardStreamer, comparePKs []compare
for _, cpk := range comparePKs {
weightStringCol := -1
// if the collation is nil or unknown, use binary collation to compare as bytes
if cpk.collation == collations.Unknown {
ob = append(ob, engine.OrderByParams{Col: cpk.colIndex, WeightStringCol: weightStringCol, Type: sqltypes.Unknown, CollationID: collations.CollationBinaryID})
} else {
ob = append(ob, engine.OrderByParams{Col: cpk.colIndex, WeightStringCol: weightStringCol, Type: sqltypes.Unknown, CollationID: cpk.collation})
t := evalengine.Type{Type: sqltypes.Unknown, Coll: collations.CollationBinaryID}
if cpk.collation != collations.Unknown {
t.Coll = cpk.collation
}
ob = append(ob, engine.OrderByParams{Col: cpk.colIndex, WeightStringCol: weightStringCol, Type: t})
}
return &engine.MergeSort{
Primitives: prims,
Expand Down Expand Up @@ -1058,7 +1058,7 @@ func (df *vdiff) forAll(participants map[string]*shardStreamer, f func(string, *
return allErrors.AggrError(vterrors.Aggregate)
}

//-----------------------------------------------------------------
// -----------------------------------------------------------------
// primitiveExecutor

// primitiveExecutor starts execution on the top level primitive
Expand Down Expand Up @@ -1118,7 +1118,7 @@ func (pe *primitiveExecutor) drain(ctx context.Context) (int, error) {
}
}

//-----------------------------------------------------------------
// -----------------------------------------------------------------
// shardStreamer

func (sm *shardStreamer) StreamExecute(ctx context.Context, vcursor engine.VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
Expand Down Expand Up @@ -1153,7 +1153,7 @@ func humanInt(n int64) string { // nolint
return fmt.Sprintf("%s%s", s, unit)
}

//-----------------------------------------------------------------
// -----------------------------------------------------------------
// tableDiffer

func (td *tableDiffer) diff(ctx context.Context, rowsToCompare *int64, debug, onlyPks bool, maxExtraRowsToCompare int) (*DiffReport, error) {
Expand Down Expand Up @@ -1375,7 +1375,7 @@ func (td *tableDiffer) genDebugQueryDiff(sel *sqlparser.Select, row []sqltypes.V
return buf.String()
}

//-----------------------------------------------------------------
// -----------------------------------------------------------------
// contextVCursor

// contextVCursor satisfies VCursor interface
Expand All @@ -1395,7 +1395,7 @@ func (vc *contextVCursor) StreamExecutePrimitive(ctx context.Context, primitive
return primitive.TryStreamExecute(ctx, vc, bindVars, wantfields, callback)
}

//-----------------------------------------------------------------
// -----------------------------------------------------------------
// Utility functions

func removeKeyrange(where *sqlparser.Where) *sqlparser.Where {
Expand Down
4 changes: 3 additions & 1 deletion go/vt/wrangler/vdiff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"testing"
"time"

"vitess.io/vitess/go/vt/vtgate/evalengine"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand Down Expand Up @@ -404,7 +406,7 @@ func TestVDiffPlanSuccess(t *testing.T) {
engine.NewAggregateParam(opcode.AggregateSum, 2, ""),
engine.NewAggregateParam(opcode.AggregateSum, 3, ""),
},
GroupByKeys: []*engine.GroupByParams{{KeyCol: 0, WeightStringCol: -1, Type: sqltypes.Unknown}},
GroupByKeys: []*engine.GroupByParams{{KeyCol: 0, WeightStringCol: -1, Type: evalengine.UnknownType()}},
Input: newMergeSorter(nil, []compareColInfo{{0, collations.Unknown, true}}),
},
targetPrimitive: newMergeSorter(nil, []compareColInfo{{0, collations.Unknown, true}}),
Expand Down

0 comments on commit 2ba4c6e

Please sign in to comment.