From e408eddcbbb236981bc67cfdb7af3e6ac5f9a212 Mon Sep 17 00:00:00 2001 From: Noble Mittal Date: Thu, 28 Nov 2024 21:39:21 +0530 Subject: [PATCH 1/3] vreplication: Support for filter using IN operator Signed-off-by: Noble Mittal --- go/vt/vtgate/evalengine/eval_result.go | 1 + .../tabletserver/vstreamer/planbuilder.go | 53 +++++++++++++++++++ .../vstreamer/planbuilder_test.go | 9 +++- 3 files changed, 62 insertions(+), 1 deletion(-) diff --git a/go/vt/vtgate/evalengine/eval_result.go b/go/vt/vtgate/evalengine/eval_result.go index d9916af03be..5c1973d8eb1 100644 --- a/go/vt/vtgate/evalengine/eval_result.go +++ b/go/vt/vtgate/evalengine/eval_result.go @@ -62,6 +62,7 @@ func (er EvalResult) String() string { // TupleValues allows for retrieval of the value we expose for public consumption func (er EvalResult) TupleValues() []sqltypes.Value { + // TODO: Make this collation-aware switch v := er.v.(type) { case *evalTuple: result := make([]sqltypes.Value, 0, len(v.t)) diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go index 9bbc98ca2bd..96babb35666 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go @@ -89,6 +89,8 @@ const ( NotEqual // IsNotNull is used to filter a column if it is NULL IsNotNull + // In is used to filter a comparable column if equals any of the values from a specific tuple + In ) // Filter contains opcodes for filtering. @@ -97,6 +99,9 @@ type Filter struct { ColNum int Value sqltypes.Value + // Values will be used to store tuple/list values. + Values []sqltypes.Value + // Parameters for VindexMatch. // Vindex, VindexColumns and KeyRange, if set, will be used // to filter the row. @@ -166,6 +171,8 @@ func getOpcode(comparison *sqlparser.ComparisonExpr) (Opcode, error) { opcode = GreaterThanEqual case sqlparser.NotEqualOp: opcode = NotEqual + case sqlparser.InOp: + opcode = In default: return -1, fmt.Errorf("comparison operator %s not supported", comparison.Operator.ToString()) } @@ -238,6 +245,24 @@ func (plan *Plan) filter(values, result []sqltypes.Value, charsets []collations. if values[filter.ColNum].IsNull() { return false, nil } + case In: + if filter.Values == nil { + return false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected empty filter values when performing IN operator") + } + found := false + for _, filterValue := range filter.Values { + match, err := compare(Equal, values[filter.ColNum], filterValue, plan.env.CollationEnv(), charsets[filter.ColNum]) + if err != nil { + return false, err + } + if match { + found = true + break + } + } + if !found { + return false, nil + } default: match, err := compare(filter.Opcode, values[filter.ColNum], filter.Value, plan.env.CollationEnv(), charsets[filter.ColNum]) if err != nil { @@ -514,6 +539,27 @@ func (plan *Plan) getColumnFuncExpr(columnName string) *sqlparser.FuncExpr { return nil } +func (plan *Plan) appendTupleFilter(values sqlparser.ValTuple, opcode Opcode, colnum int) error { + pv, err := evalengine.Translate(values, &evalengine.Config{ + Collation: plan.env.CollationEnv().DefaultConnectionCharset(), + Environment: plan.env, + }) + if err != nil { + return err + } + env := evalengine.EmptyExpressionEnv(plan.env) + resolved, err := env.Evaluate(pv) + if err != nil { + return err + } + plan.Filters = append(plan.Filters, Filter{ + Opcode: opcode, + ColNum: colnum, + Values: resolved.TupleValues(), + }) + return nil +} + func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) error { if where == nil { return nil @@ -537,6 +583,13 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er if err != nil { return err } + if opcode == In { + values, ok := expr.Right.(sqlparser.ValTuple) + if !ok { + return fmt.Errorf("unexpected: %v", sqlparser.String(expr)) + } + return plan.appendTupleFilter(values, opcode, colnum) + } val, ok := expr.Right.(*sqlparser.Literal) if !ok { return fmt.Errorf("unexpected: %v", sqlparser.String(expr)) diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go index ba345b2a00b..aba74368802 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder_test.go @@ -710,9 +710,15 @@ func TestPlanBuilderFilterComparison(t *testing.T) { outFilters: []Filter{{Opcode: LessThan, ColNum: 0, Value: sqltypes.NewInt64(2)}, {Opcode: LessThanEqual, ColNum: 1, Value: sqltypes.NewVarChar("xyz")}, }, + }, { + name: "in-operator", + inFilter: "select * from t1 where id in (1, 2)", + outFilters: []Filter{ + {Opcode: In, ColNum: 0, Values: []sqltypes.Value{sqltypes.NewInt64(1), sqltypes.NewInt64(2)}}, + }, }, { name: "vindex-and-operators", - inFilter: "select * from t1 where in_keyrange(id, 'hash', '-80') and id = 2 and val <> 'xyz'", + inFilter: "select * from t1 where in_keyrange(id, 'hash', '-80') and id = 2 and val <> 'xyz' and id in (100, 30)", outFilters: []Filter{ { Opcode: VindexMatch, @@ -727,6 +733,7 @@ func TestPlanBuilderFilterComparison(t *testing.T) { }, {Opcode: Equal, ColNum: 0, Value: sqltypes.NewInt64(2)}, {Opcode: NotEqual, ColNum: 1, Value: sqltypes.NewVarChar("xyz")}, + {Opcode: In, ColNum: 0, Values: []sqltypes.Value{sqltypes.NewInt64(100), sqltypes.NewInt64(30)}}, }, }} From b30a8ba54aa4c3aadcf792865fe5358586d2060d Mon Sep 17 00:00:00 2001 From: Noble Mittal Date: Fri, 29 Nov 2024 16:29:35 +0530 Subject: [PATCH 2/3] test: Add more tests in vstreamer_test.go Signed-off-by: Noble Mittal --- .../tabletserver/vstreamer/vstreamer_test.go | 36 ++++++++++++++++--- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index 846d62202e7..36ca93e4994 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -1966,7 +1966,7 @@ func TestFilteredMultipleWhere(t *testing.T) { filter: &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ Match: "t1", - Filter: "select id1, val from t1 where in_keyrange('-80') and id2 = 200 and id3 = 1000 and val = 'newton'", + Filter: "select id1, val from t1 where in_keyrange('-80') and id2 = 200 and id3 = 1000 and val = 'newton' and id1 in (1, 2, 129)", }}, }, customFieldEvents: true, @@ -1988,9 +1988,7 @@ func TestFilteredMultipleWhere(t *testing.T) { {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{after: []string{"2", "newton"}}}}}, }}, {"insert into t1 values (3, 100, 2000, 'kepler')", noEvents}, - {"insert into t1 values (128, 200, 1000, 'newton')", []TestRowEvent{ - {spec: &TestRowEventSpec{table: "t1", changes: []TestRowChange{{after: []string{"128", "newton"}}}}}, - }}, + {"insert into t1 values (128, 200, 1000, 'newton')", noEvents}, {"insert into t1 values (5, 200, 2000, 'kepler')", noEvents}, {"insert into t1 values (129, 200, 1000, 'kepler')", noEvents}, {"commit", nil}, @@ -2080,3 +2078,33 @@ func TestGeneratedInvisiblePrimaryKey(t *testing.T) { }} ts.Run() } + +func TestFilteredInOperator(t *testing.T) { + ts := &TestSpec{ + t: t, + ddls: []string{ + "create table t1(id1 int, id2 int, val varbinary(128), primary key(id1))", + }, + options: &TestSpecOptions{ + filter: &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select id1, val from t1 where val in ('eee', 'bbb', 'ddd')", + }}, + }, + }, + } + defer ts.Close() + ts.Init() + ts.fieldEvents["t1"].cols[1].skip = true + ts.tests = [][]*TestQuery{{ + {"begin", nil}, + {"insert into t1 values (1, 100, 'aaa')", noEvents}, + {"insert into t1 values (2, 200, 'bbb')", nil}, + {"insert into t1 values (3, 100, 'ccc')", noEvents}, + {"insert into t1 values (4, 200, 'ddd')", nil}, + {"insert into t1 values (5, 200, 'eee')", nil}, + {"commit", nil}, + }} + ts.Run() +} From 3af22a6483e55c14797760e86a5e73d7e1c19a8a Mon Sep 17 00:00:00 2001 From: Noble Mittal Date: Sat, 30 Nov 2024 02:32:53 +0530 Subject: [PATCH 3/3] fix: Allow addition of other filters after IN operator Signed-off-by: Noble Mittal --- go/vt/vttablet/tabletserver/vstreamer/planbuilder.go | 9 ++++++++- go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go | 4 ++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go index 96babb35666..e5115afe6d3 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go @@ -583,12 +583,19 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er if err != nil { return err } + // The Right Expr is typically expected to be a Literal value, + // except for the IN operator, where a Tuple value is expected. + // Handle the IN operator case first. if opcode == In { values, ok := expr.Right.(sqlparser.ValTuple) if !ok { return fmt.Errorf("unexpected: %v", sqlparser.String(expr)) } - return plan.appendTupleFilter(values, opcode, colnum) + err := plan.appendTupleFilter(values, opcode, colnum) + if err != nil { + return err + } + continue } val, ok := expr.Right.(*sqlparser.Literal) if !ok { diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index 36ca93e4994..5282b5f372d 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -2089,7 +2089,7 @@ func TestFilteredInOperator(t *testing.T) { filter: &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ Match: "t1", - Filter: "select id1, val from t1 where val in ('eee', 'bbb', 'ddd')", + Filter: "select id1, val from t1 where val in ('eee', 'bbb', 'ddd') and id1 in (4, 5)", }}, }, }, @@ -2100,7 +2100,7 @@ func TestFilteredInOperator(t *testing.T) { ts.tests = [][]*TestQuery{{ {"begin", nil}, {"insert into t1 values (1, 100, 'aaa')", noEvents}, - {"insert into t1 values (2, 200, 'bbb')", nil}, + {"insert into t1 values (2, 200, 'bbb')", noEvents}, {"insert into t1 values (3, 100, 'ccc')", noEvents}, {"insert into t1 values (4, 200, 'ddd')", nil}, {"insert into t1 values (5, 200, 'eee')", nil},