Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Support for filter using IN operator #17296

Merged
merged 3 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/vt/vtgate/evalengine/eval_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (er EvalResult) String() string {

// TupleValues allows for retrieval of the value we expose for public consumption
func (er EvalResult) TupleValues() []sqltypes.Value {
// TODO: Make this collation-aware
switch v := er.v.(type) {
case *evalTuple:
result := make([]sqltypes.Value, 0, len(v.t))
Expand Down
60 changes: 60 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ const (
NotEqual
// IsNotNull is used to filter a column if it is NULL
IsNotNull
// In is used to filter a comparable column if equals any of the values from a specific tuple
In
)

// Filter contains opcodes for filtering.
Expand All @@ -97,6 +99,9 @@ type Filter struct {
ColNum int
Value sqltypes.Value

// Values will be used to store tuple/list values.
Values []sqltypes.Value

// Parameters for VindexMatch.
// Vindex, VindexColumns and KeyRange, if set, will be used
// to filter the row.
Expand Down Expand Up @@ -166,6 +171,8 @@ func getOpcode(comparison *sqlparser.ComparisonExpr) (Opcode, error) {
opcode = GreaterThanEqual
case sqlparser.NotEqualOp:
opcode = NotEqual
case sqlparser.InOp:
opcode = In
default:
return -1, fmt.Errorf("comparison operator %s not supported", comparison.Operator.ToString())
}
Expand Down Expand Up @@ -238,6 +245,24 @@ func (plan *Plan) filter(values, result []sqltypes.Value, charsets []collations.
if values[filter.ColNum].IsNull() {
return false, nil
}
case In:
if filter.Values == nil {
return false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected empty filter values when performing IN operator")
}
found := false
for _, filterValue := range filter.Values {
match, err := compare(Equal, values[filter.ColNum], filterValue, plan.env.CollationEnv(), charsets[filter.ColNum])
if err != nil {
return false, err
}
if match {
found = true
break
}
}
if !found {
return false, nil
}
default:
match, err := compare(filter.Opcode, values[filter.ColNum], filter.Value, plan.env.CollationEnv(), charsets[filter.ColNum])
if err != nil {
Expand Down Expand Up @@ -514,6 +539,27 @@ func (plan *Plan) getColumnFuncExpr(columnName string) *sqlparser.FuncExpr {
return nil
}

func (plan *Plan) appendTupleFilter(values sqlparser.ValTuple, opcode Opcode, colnum int) error {
pv, err := evalengine.Translate(values, &evalengine.Config{
Collation: plan.env.CollationEnv().DefaultConnectionCharset(),
Environment: plan.env,
})
if err != nil {
return err
}
env := evalengine.EmptyExpressionEnv(plan.env)
resolved, err := env.Evaluate(pv)
if err != nil {
return err
}
plan.Filters = append(plan.Filters, Filter{
Opcode: opcode,
ColNum: colnum,
Values: resolved.TupleValues(),
})
return nil
}

func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) error {
if where == nil {
return nil
Expand All @@ -537,6 +583,20 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
if err != nil {
return err
}
// The Right Expr is typically expected to be a Literal value,
// except for the IN operator, where a Tuple value is expected.
// Handle the IN operator case first.
if opcode == In {
values, ok := expr.Right.(sqlparser.ValTuple)
if !ok {
return fmt.Errorf("unexpected: %v", sqlparser.String(expr))
}
err := plan.appendTupleFilter(values, opcode, colnum)
if err != nil {
return err
}
continue
}
val, ok := expr.Right.(*sqlparser.Literal)
if !ok {
return fmt.Errorf("unexpected: %v", sqlparser.String(expr))
Expand Down
9 changes: 8 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,9 +710,15 @@ func TestPlanBuilderFilterComparison(t *testing.T) {
outFilters: []Filter{{Opcode: LessThan, ColNum: 0, Value: sqltypes.NewInt64(2)},
{Opcode: LessThanEqual, ColNum: 1, Value: sqltypes.NewVarChar("xyz")},
},
}, {
name: "in-operator",
inFilter: "select * from t1 where id in (1, 2)",
outFilters: []Filter{
{Opcode: In, ColNum: 0, Values: []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}},
},
}, {
name: "vindex-and-operators",
inFilter: "select * from t1 where in_keyrange(id, 'hash', '-80') and id = 2 and val <> 'xyz'",
inFilter: "select * from t1 where in_keyrange(id, 'hash', '-80') and id = 2 and val <> 'xyz' and id in (100, 30)",
outFilters: []Filter{
{
Opcode: VindexMatch,
Expand All @@ -727,6 +733,7 @@ func TestPlanBuilderFilterComparison(t *testing.T) {
},
{Opcode: Equal, ColNum: 0, Value: sqltypes.NewInt64(2)},
{Opcode: NotEqual, ColNum: 1, Value: sqltypes.NewVarChar("xyz")},
{Opcode: In, ColNum: 0, Values: []sqltypes.Value{sqltypes.NewInt64(100), sqltypes.NewInt64(30)}},
},
}}

Expand Down
36 changes: 32 additions & 4 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1966,7 +1966,7 @@ func TestFilteredMultipleWhere(t *testing.T) {
filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select id1, val from t1 where in_keyrange('-80') and id2 = 200 and id3 = 1000 and val = 'newton'",
Filter: "select id1, val from t1 where in_keyrange('-80') and id2 = 200 and id3 = 1000 and val = 'newton' and id1 in (1, 2, 129)",
}},
},
customFieldEvents: true,
Expand All @@ -1988,9 +1988,7 @@ func TestFilteredMultipleWhere(t *testing.T) {
{spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{after: []string{"2", "newton"}}}}},
}},
{"insert into t1 values (3, 100, 2000, 'kepler')", noEvents},
{"insert into t1 values (128, 200, 1000, 'newton')", []TestRowEvent{
{spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{after: []string{"128", "newton"}}}}},
}},
{"insert into t1 values (128, 200, 1000, 'newton')", noEvents},
{"insert into t1 values (5, 200, 2000, 'kepler')", noEvents},
{"insert into t1 values (129, 200, 1000, 'kepler')", noEvents},
{"commit", nil},
Expand Down Expand Up @@ -2080,3 +2078,33 @@ func TestGeneratedInvisiblePrimaryKey(t *testing.T) {
}}
ts.Run()
}

func TestFilteredInOperator(t *testing.T) {
ts := &TestSpec{
t: t,
ddls: []string{
"create table t1(id1 int, id2 int, val varbinary(128), primary key(id1))",
},
options: &TestSpecOptions{
filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select id1, val from t1 where val in ('eee', 'bbb', 'ddd') and id1 in (4, 5)",
}},
},
},
}
defer ts.Close()
ts.Init()
ts.fieldEvents["t1"].cols[1].skip = true
ts.tests = [][]*TestQuery{{
{"begin", nil},
{"insert into t1 values (1, 100, 'aaa')", noEvents},
{"insert into t1 values (2, 200, 'bbb')", noEvents},
{"insert into t1 values (3, 100, 'ccc')", noEvents},
{"insert into t1 values (4, 200, 'ddd')", nil},
{"insert into t1 values (5, 200, 'eee')", nil},
{"commit", nil},
}}
ts.Run()
}
Loading