diff --git a/go/vt/vttablet/tabletmanager/vdiff/utils.go b/go/vt/vttablet/tabletmanager/vdiff/utils.go index 12ea1e8a68c..d756e6f6984 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/utils.go +++ b/go/vt/vttablet/tabletmanager/vdiff/utils.go @@ -21,6 +21,8 @@ import ( "fmt" "strings" + "vitess.io/vitess/go/vt/vtgate/evalengine" + "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/log" @@ -40,11 +42,14 @@ func newMergeSorter(participants map[string]*shardStreamer, comparePKs []compare for i, cpk := range comparePKs { weightStringCol := -1 // if the collation is nil or unknown, use binary collation to compare as bytes - if cpk.collation == collations.Unknown { - ob[i] = engine.OrderByParams{Col: cpk.colIndex, WeightStringCol: weightStringCol, Type: sqltypes.Unknown, CollationID: collations.CollationBinaryID} - } else { - ob[i] = engine.OrderByParams{Col: cpk.colIndex, WeightStringCol: weightStringCol, Type: sqltypes.Unknown, CollationID: cpk.collation} + t := evalengine.Type{ + Type: sqltypes.Unknown, + Coll: collations.CollationBinaryID, + } + if cpk.collation != collations.Unknown { + t.Coll = cpk.collation } + ob[i] = engine.OrderByParams{Col: cpk.colIndex, WeightStringCol: weightStringCol, Type: t} } return &engine.MergeSort{ Primitives: prims, @@ -52,7 +57,7 @@ func newMergeSorter(participants map[string]*shardStreamer, comparePKs []compare } } -//----------------------------------------------------------------- +// ----------------------------------------------------------------- // Utility functions func encodeString(in string) string { @@ -64,7 +69,7 @@ func encodeString(in string) string { func pkColsToGroupByParams(pkCols []int) []*engine.GroupByParams { var res []*engine.GroupByParams for _, col := range pkCols { - res = append(res, &engine.GroupByParams{KeyCol: col, WeightStringCol: -1, Type: sqltypes.Unknown}) + res = append(res, &engine.GroupByParams{KeyCol: col, WeightStringCol: -1, Type: evalengine.UnknownType()}) } return res } diff --git a/go/vt/wrangler/vdiff.go b/go/vt/wrangler/vdiff.go index 85c82bb3574..3311d376431 100644 --- a/go/vt/wrangler/vdiff.go +++ b/go/vt/wrangler/vdiff.go @@ -499,8 +499,8 @@ func findPKs(table *tabletmanagerdatapb.TableDefinition, targetSelect *sqlparser switch ct := expr.(type) { case *sqlparser.ColName: colname = ct.Name.String() - case *sqlparser.FuncExpr: //eg. weight_string() - //no-op + case *sqlparser.FuncExpr: // eg. weight_string() + // no-op default: log.Warningf("Not considering column %v for PK, type %v not handled", selExpr, ct) } @@ -769,7 +769,7 @@ func (df *vdiff) buildTablePlan(table *tabletmanagerdatapb.TableDefinition, quer func pkColsToGroupByParams(pkCols []int) []*engine.GroupByParams { var res []*engine.GroupByParams for _, col := range pkCols { - res = append(res, &engine.GroupByParams{KeyCol: col, WeightStringCol: -1, Type: sqltypes.Unknown}) + res = append(res, &engine.GroupByParams{KeyCol: col, WeightStringCol: -1, Type: evalengine.UnknownType()}) } return res } @@ -784,11 +784,11 @@ func newMergeSorter(participants map[string]*shardStreamer, comparePKs []compare for _, cpk := range comparePKs { weightStringCol := -1 // if the collation is nil or unknown, use binary collation to compare as bytes - if cpk.collation == collations.Unknown { - ob = append(ob, engine.OrderByParams{Col: cpk.colIndex, WeightStringCol: weightStringCol, Type: sqltypes.Unknown, CollationID: collations.CollationBinaryID}) - } else { - ob = append(ob, engine.OrderByParams{Col: cpk.colIndex, WeightStringCol: weightStringCol, Type: sqltypes.Unknown, CollationID: cpk.collation}) + t := evalengine.Type{Type: sqltypes.Unknown, Coll: collations.CollationBinaryID} + if cpk.collation != collations.Unknown { + t.Coll = cpk.collation } + ob = append(ob, engine.OrderByParams{Col: cpk.colIndex, WeightStringCol: weightStringCol, Type: t}) } return &engine.MergeSort{ Primitives: prims, @@ -1058,7 +1058,7 @@ func (df *vdiff) forAll(participants map[string]*shardStreamer, f func(string, * return allErrors.AggrError(vterrors.Aggregate) } -//----------------------------------------------------------------- +// ----------------------------------------------------------------- // primitiveExecutor // primitiveExecutor starts execution on the top level primitive @@ -1118,7 +1118,7 @@ func (pe *primitiveExecutor) drain(ctx context.Context) (int, error) { } } -//----------------------------------------------------------------- +// ----------------------------------------------------------------- // shardStreamer func (sm *shardStreamer) StreamExecute(ctx context.Context, vcursor engine.VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error { @@ -1153,7 +1153,7 @@ func humanInt(n int64) string { // nolint return fmt.Sprintf("%s%s", s, unit) } -//----------------------------------------------------------------- +// ----------------------------------------------------------------- // tableDiffer func (td *tableDiffer) diff(ctx context.Context, rowsToCompare *int64, debug, onlyPks bool, maxExtraRowsToCompare int) (*DiffReport, error) { @@ -1375,7 +1375,7 @@ func (td *tableDiffer) genDebugQueryDiff(sel *sqlparser.Select, row []sqltypes.V return buf.String() } -//----------------------------------------------------------------- +// ----------------------------------------------------------------- // contextVCursor // contextVCursor satisfies VCursor interface @@ -1395,7 +1395,7 @@ func (vc *contextVCursor) StreamExecutePrimitive(ctx context.Context, primitive return primitive.TryStreamExecute(ctx, vc, bindVars, wantfields, callback) } -//----------------------------------------------------------------- +// ----------------------------------------------------------------- // Utility functions func removeKeyrange(where *sqlparser.Where) *sqlparser.Where { diff --git a/go/vt/wrangler/vdiff_test.go b/go/vt/wrangler/vdiff_test.go index ac57c9bcf68..28422b6cd4d 100644 --- a/go/vt/wrangler/vdiff_test.go +++ b/go/vt/wrangler/vdiff_test.go @@ -23,6 +23,8 @@ import ( "testing" "time" + "vitess.io/vitess/go/vt/vtgate/evalengine" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -404,7 +406,7 @@ func TestVDiffPlanSuccess(t *testing.T) { engine.NewAggregateParam(opcode.AggregateSum, 2, ""), engine.NewAggregateParam(opcode.AggregateSum, 3, ""), }, - GroupByKeys: []*engine.GroupByParams{{KeyCol: 0, WeightStringCol: -1, Type: sqltypes.Unknown}}, + GroupByKeys: []*engine.GroupByParams{{KeyCol: 0, WeightStringCol: -1, Type: evalengine.UnknownType()}}, Input: newMergeSorter(nil, []compareColInfo{{0, collations.Unknown, true}}), }, targetPrimitive: newMergeSorter(nil, []compareColInfo{{0, collations.Unknown, true}}),