Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: merge ApplyJoins more aggressively #16956

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions go/vt/vtgate/planbuilder/operators/join_merging.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
// mergeJoinInputs checks whether two operators can be merged into a single one.
// If they can be merged, a new operator with the merged routing is returned
// If they cannot be merged, nil is returned.
func (jm *joinMerger) mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs Operator, joinPredicates []sqlparser.Expr) *Route {
func (jm *joinMerger) mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs Operator) *Route {
lhsRoute, rhsRoute, routingA, routingB, a, b, sameKeyspace := prepareInputRoutes(lhs, rhs)
if lhsRoute == nil {
return nil
Expand All @@ -38,7 +38,7 @@ func (jm *joinMerger) mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs
// If a dual is on the left side, and it is a left join (all right joins are changed to left joins), then we can only merge if the right side is a single sharded routing.
case a == dual:
rhsClone := Clone(rhs).(*Route)
for _, predicate := range joinPredicates {
for _, predicate := range jm.predicates {
if ctx.SemTable.DirectDeps(predicate).IsSolvedBy(TableID(rhsClone)) {
rhsClone.AddPredicate(ctx, predicate)
}
Expand All @@ -54,7 +54,7 @@ func (jm *joinMerger) mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs

// As both are reference route. We need to merge the alternates as well.
case a == anyShard && b == anyShard && sameKeyspace:
newrouting := mergeAnyShardRoutings(ctx, routingA.(*AnyShardRouting), routingB.(*AnyShardRouting), joinPredicates, jm.joinType)
newrouting := mergeAnyShardRoutings(ctx, routingA.(*AnyShardRouting), routingB.(*AnyShardRouting), jm.predicates, jm.joinType)
return jm.merge(ctx, lhsRoute, rhsRoute, newrouting)

// an unsharded/reference route can be merged with anything going to that keyspace
Expand All @@ -75,7 +75,7 @@ func (jm *joinMerger) mergeJoinInputs(ctx *plancontext.PlanningContext, lhs, rhs

// sharded routing is complex, so we handle it in a separate method
case a == sharded && b == sharded:
return tryMergeShardedRouting(ctx, lhsRoute, rhsRoute, jm, joinPredicates)
return tryMergeShardedRouting(ctx, lhsRoute, rhsRoute, jm, jm.predicates)

default:
return nil
Expand Down
13 changes: 12 additions & 1 deletion go/vt/vtgate/planbuilder/operators/query_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"io"
"strconv"

"vitess.io/vitess/go/slice"

"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/engine"
Expand Down Expand Up @@ -104,7 +106,16 @@ func runRewriters(ctx *plancontext.PlanningContext, root Operator) Operator {
return tryPushUpdate(in)
case *RecurseCTE:
return tryMergeRecurse(ctx, in)

case *ApplyJoin:
predicates := slice.Map(in.JoinPredicates.columns, func(j applyJoinColumn) sqlparser.Expr {
return j.Original
})
jm := newJoinMerge(predicates, in.JoinType)
newPlan := jm.mergeJoinInputs(ctx, in.LHS, in.RHS)
if newPlan == nil {
return in, NoRewrite
}
return newPlan, Rewrote("merge routes into single operator")
default:
return in, NoRewrite
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/route_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func requiresSwitchingSides(ctx *plancontext.PlanningContext, op Operator) (requ

func mergeOrJoin(ctx *plancontext.PlanningContext, lhs, rhs Operator, joinPredicates []sqlparser.Expr, joinType sqlparser.JoinType) (Operator, *ApplyResult) {
jm := newJoinMerge(joinPredicates, joinType)
newPlan := jm.mergeJoinInputs(ctx, lhs, rhs, joinPredicates)
newPlan := jm.mergeJoinInputs(ctx, lhs, rhs)
if newPlan != nil {
return newPlan, Rewrote("merge routes into single operator")
}
Expand Down
46 changes: 12 additions & 34 deletions go/vt/vtgate/planbuilder/testdata/cte_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -2329,41 +2329,19 @@
"QueryType": "SELECT",
"Original": "WITH RECURSIVE literal_cte AS (SELECT 1 AS id, 100 AS value, 1 AS manager_id UNION ALL SELECT id + 1, value * 2, id FROM literal_cte WHERE id < 5) SELECT l.id, l.value, l.manager_id, e.name AS employee_name FROM literal_cte l LEFT JOIN user e ON l.id = e.id",
"Instructions": {
"OperatorType": "Join",
"Variant": "LeftJoin",
"JoinColumnIndexes": "L:0,L:1,L:2,R:0",
"JoinVars": {
"l_id": 0
"OperatorType": "Route",
"Variant": "EqualUnique",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"TableName": "dual_`user`",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Reference",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"FieldQuery": "with recursive literal_cte as (select 1 as id, 100 as value, 1 as manager_id from dual where 1 != 1 union all select id + 1, value * 2, id from literal_cte where 1 != 1) select l.id, l.value, l.manager_id from literal_cte as l where 1 != 1",
"Query": "with recursive literal_cte as (select 1 as id, 100 as value, 1 as manager_id from dual union all select id + 1, value * 2, id from literal_cte where id < 5) select l.id, l.value, l.manager_id from literal_cte as l",
"Table": "dual"
},
{
"OperatorType": "Route",
"Variant": "EqualUnique",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select e.`name` as employee_name from `user` as e where 1 != 1",
"Query": "select e.`name` as employee_name from `user` as e where e.id = :l_id",
"Table": "`user`",
"Values": [
":l_id"
],
"Vindex": "user_index"
}
]
"FieldQuery": "with recursive literal_cte as (select 1 as id, 100 as value, 1 as manager_id from dual where 1 != 1 union all select id + 1, value * 2, id from literal_cte where 1 != 1) select l.id, l.value, l.manager_id, e.`name` as employee_name from literal_cte as l left join `user` as e on l.id = e.id where 1 != 1",
"Query": "with recursive literal_cte as (select 1 as id, 100 as value, 1 as manager_id from dual union all select id + 1, value * 2, id from literal_cte where id < 5) select l.id, l.value, l.manager_id, e.`name` as employee_name from literal_cte as l left join `user` as e on l.id = e.id",
"Table": "`user`, dual",
"Values": [
":l_id"
],
"Vindex": "user_index"
},
"TablesUsed": [
"main.dual",
Expand Down
40 changes: 9 additions & 31 deletions go/vt/vtgate/planbuilder/testdata/info_schema57_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -1153,38 +1153,16 @@
"QueryType": "SELECT",
"Original": "SELECT c.column_name FROM information_schema.columns c JOIN ( SELECT table_name FROM information_schema.tables WHERE table_schema != 'information_schema' LIMIT 1 ) AS tables ON tables.table_name = c.table_name",
"Instructions": {
"OperatorType": "Join",
"Variant": "Join",
"JoinColumnIndexes": "R:0",
"JoinVars": {
"tables_table_name": 0
"OperatorType": "Route",
"Variant": "DBA",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"TableName": "information_schema.`tables`_information_schema.`columns`",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "DBA",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"FieldQuery": "select `tables`.table_name from (select table_name from information_schema.`tables` where 1 != 1) as `tables` where 1 != 1",
"Query": "select `tables`.table_name from (select table_name from information_schema.`tables` where table_schema != 'information_schema' limit 1) as `tables`",
"Table": "information_schema.`tables`"
},
{
"OperatorType": "Route",
"Variant": "DBA",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"FieldQuery": "select c.column_name from information_schema.`columns` as c where 1 != 1",
"Query": "select c.column_name from information_schema.`columns` as c where c.table_name = :c_table_name /* VARCHAR */",
"SysTableTableName": "[c_table_name::tables_table_name]",
"Table": "information_schema.`columns`"
}
]
"FieldQuery": "select c.column_name from (select table_name from information_schema.`tables` where 1 != 1) as `tables`, information_schema.`columns` as c where 1 != 1",
"Query": "select c.column_name from (select table_name from information_schema.`tables` where table_schema != 'information_schema' limit 1) as `tables`, information_schema.`columns` as c where `tables`.table_name = c.table_name",
"SysTableTableName": "[c_table_name::tables_table_name]",
"Table": "information_schema.`columns`, information_schema.`tables`"
}
}
}
Expand Down
40 changes: 9 additions & 31 deletions go/vt/vtgate/planbuilder/testdata/info_schema80_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -1275,38 +1275,16 @@
"QueryType": "SELECT",
"Original": "SELECT c.column_name FROM information_schema.columns c JOIN ( SELECT table_name FROM information_schema.tables WHERE table_schema != 'information_schema' LIMIT 1 ) AS tables ON tables.table_name = c.table_name",
"Instructions": {
"OperatorType": "Join",
"Variant": "Join",
"JoinColumnIndexes": "R:0",
"JoinVars": {
"tables_table_name": 0
"OperatorType": "Route",
"Variant": "DBA",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"TableName": "information_schema.`tables`_information_schema.`columns`",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "DBA",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"FieldQuery": "select `tables`.table_name from (select table_name from information_schema.`tables` where 1 != 1) as `tables` where 1 != 1",
"Query": "select `tables`.table_name from (select table_name from information_schema.`tables` where table_schema != 'information_schema' limit 1) as `tables`",
"Table": "information_schema.`tables`"
},
{
"OperatorType": "Route",
"Variant": "DBA",
"Keyspace": {
"Name": "main",
"Sharded": false
},
"FieldQuery": "select c.column_name from information_schema.`columns` as c where 1 != 1",
"Query": "select c.column_name from information_schema.`columns` as c where c.table_name = :c_table_name /* VARCHAR */",
"SysTableTableName": "[c_table_name::tables_table_name]",
"Table": "information_schema.`columns`"
}
]
"FieldQuery": "select c.column_name from (select table_name from information_schema.`tables` where 1 != 1) as `tables`, information_schema.`columns` as c where 1 != 1",
"Query": "select c.column_name from (select table_name from information_schema.`tables` where table_schema != 'information_schema' limit 1) as `tables`, information_schema.`columns` as c where `tables`.table_name = c.table_name",
"SysTableTableName": "[c_table_name::tables_table_name]",
"Table": "information_schema.`columns`, information_schema.`tables`"
}
}
}
Expand Down
52 changes: 15 additions & 37 deletions go/vt/vtgate/planbuilder/testdata/reference_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -168,37 +168,15 @@
"QueryType": "SELECT",
"Original": "SELECT u.id FROM ( SELECT a.id, a.u_id FROM user.ref_with_source AS a WHERE a.id IN (3) ORDER BY a.d_at LIMIT 1) as u LEFT JOIN user.ref_with_source AS u0 ON u.u_id = u0.u_uid ORDER BY u.id",
"Instructions": {
"OperatorType": "Join",
"Variant": "LeftJoin",
"JoinColumnIndexes": "L:0",
"JoinVars": {
"u_u_id": 1
},
"TableName": "ref_with_source_ref_with_source",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Reference",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select u.id, u.u_id from (select a.id, a.u_id from ref_with_source as a where 1 != 1) as u where 1 != 1",
"Query": "select u.id, u.u_id from (select a.id, a.u_id from ref_with_source as a where a.id in (3) order by a.d_at asc limit 1) as u order by u.id asc",
"Table": "ref_with_source"
},
{
"OperatorType": "Route",
"Variant": "Reference",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select 1 from ref_with_source as u0 where 1 != 1",
"Query": "select 1 from ref_with_source as u0 where u0.u_uid = :u_u_id",
"Table": "ref_with_source"
}
]
"OperatorType": "Route",
"Variant": "Reference",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select u.id from (select a.id, a.u_id from ref_with_source as a where 1 != 1) as u left join ref_with_source as u0 on u.u_id = u0.u_uid where 1 != 1",
"Query": "select u.id from (select a.id, a.u_id from ref_with_source as a where a.id in (3) order by a.d_at asc limit 1) as u left join ref_with_source as u0 on u.u_id = u0.u_uid order by u.id asc",
"Table": "ref_with_source"
},
"TablesUsed": [
"user.ref_with_source"
Expand Down Expand Up @@ -700,19 +678,19 @@
"QueryType": "SELECT",
"Original": "select * from user.ref_with_source ref, `user`.`user` u where ref.id = u.ref_id and u.id = 2",
"Instructions": {
"FieldQuery": "select * from ref_with_source as ref, `user` as u where 1 != 1",
"OperatorType": "Route",
"Variant": "EqualUnique",
"Vindex": "user_index",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select * from ref_with_source as ref, `user` as u where 1 != 1",
"Query": "select * from ref_with_source as ref, `user` as u where u.id = 2 and ref.id = u.ref_id",
"Table": "`user`, ref_with_source",
"Values": [
"2"
]
],
"Vindex": "user_index"
},
"TablesUsed": [
"user.ref_with_source",
Expand All @@ -727,19 +705,19 @@
"QueryType": "SELECT",
"Original": "select * from source_of_ref ref, `user`.`user` u where ref.id = u.ref_id and u.id = 2",
"Instructions": {
"FieldQuery": "select * from ref_with_source as ref, `user` as u where 1 != 1",
"OperatorType": "Route",
"Variant": "EqualUnique",
"Vindex": "user_index",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select * from ref_with_source as ref, `user` as u where 1 != 1",
"Query": "select * from ref_with_source as ref, `user` as u where u.id = 2 and ref.id = u.ref_id",
"Table": "`user`, ref_with_source",
"Values": [
"2"
]
],
"Vindex": "user_index"
},
"TablesUsed": [
"user.ref_with_source",
Expand Down
Loading