Skip to content

Commit

Permalink
Fix: Offset planning in hash joins (#16540)
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <[email protected]>
  • Loading branch information
GuptaManan100 authored Aug 7, 2024
1 parent 27f2d73 commit 0b7c0e4
Show file tree
Hide file tree
Showing 9 changed files with 222 additions and 66 deletions.
32 changes: 31 additions & 1 deletion go/test/endtoend/vtgate/vitess_tester/join/join.test
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ CREATE TABLE `t3`
CHARSET utf8mb4,
COLLATE utf8mb4_unicode_ci;

CREATE TABLE `t4`
(
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`col` int unsigned NOT NULL,
PRIMARY KEY (`id`)
) ENGINE InnoDB,
CHARSET utf8mb4,
COLLATE utf8mb4_unicode_ci;

insert into t1 (id, name)
values (1, 'A'),
(2, 'B'),
Expand All @@ -43,7 +52,28 @@ values (1, 'A'),
(4, 'B'),
(5, 'B');

insert into t4 (id, col)
values (1, 1),
(2, 2),
(3, 3);

-- wait_authoritative t1
-- wait_authoritative t2
-- wait_authoritative t3
select 42 from t1 join t2 on t1.id = t2.t1_id join t3 on t1.id = t3.id where t1.name or t2.id or t3.name;
select 42
from t1
join t2 on t1.id = t2.t1_id
join t3 on t1.id = t3.id
where t1.name
or t2.id
or t3.name;

# Complex query that requires hash join underneath a memory sort and ordered aggregate
select 1
from t1
join t2 on t1.id = t2.t1_id
join t4 on t4.col = t2.id
left join (select t4.col, count(*) as count from t4 group by t4.col) t3 on t3.col = t2.id
where t1.id IN (1, 2)
group by t2.id, t4.col;

8 changes: 8 additions & 0 deletions go/test/endtoend/vtgate/vitess_tester/join/vschema.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@
"name": "hash"
}
]
},
"t4": {
"column_vindexes": [
{
"column": "id",
"name": "hash"
}
]
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/apply_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func (aj *ApplyJoin) AddWSColumn(ctx *plancontext.PlanningContext, offset int, u
func (aj *ApplyJoin) planOffsets(ctx *plancontext.PlanningContext) Operator {
if len(aj.Columns) > 0 {
// we've already done offset planning
return aj
return nil
}
for _, col := range aj.JoinColumns.columns {
// Read the type description for applyJoinColumn to understand the following code
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/operators/dml_with_input.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (d *DMLWithInput) planOffsets(ctx *plancontext.PlanningContext) Operator {
}
}
d.BvList = bvList
return d
return nil
}

var _ Operator = (*DMLWithInput)(nil)
23 changes: 1 addition & 22 deletions go/vt/vtgate/planbuilder/operators/hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,20 +326,9 @@ func (hj *HashJoin) addColumn(ctx *plancontext.PlanningContext, in sqlparser.Exp
inOffset = op.AddColumn(ctx, false, false, aeWrap(expr))
}

// we turn the
// we have to turn the incoming offset to an outgoing offset of the columns this operator is exposing
internalOffset := offsetter(inOffset)

// ok, we have an offset from the input operator. Let's check if we already have it
// in our list of incoming columns

for idx, offset := range hj.ColumnOffsets {
if internalOffset == offset {
return idx
}
}

hj.ColumnOffsets = append(hj.ColumnOffsets, internalOffset)

return len(hj.ColumnOffsets) - 1
}

Expand Down Expand Up @@ -434,17 +423,7 @@ func (hj *HashJoin) addSingleSidedColumn(

// we have to turn the incoming offset to an outgoing offset of the columns this operator is exposing
internalOffset := offsetter(inOffset)

// ok, we have an offset from the input operator. Let's check if we already have it
// in our list of incoming columns
for idx, offset := range hj.ColumnOffsets {
if internalOffset == offset {
return idx
}
}

hj.ColumnOffsets = append(hj.ColumnOffsets, internalOffset)

return len(hj.ColumnOffsets) - 1
}

Expand Down
9 changes: 7 additions & 2 deletions go/vt/vtgate/planbuilder/operators/offset_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ func planOffsets(ctx *plancontext.PlanningContext, root Operator) Operator {
panic(vterrors.VT13001(fmt.Sprintf("should not see %T here", in)))
case offsettable:
newOp := op.planOffsets(ctx)

if newOp == nil {
newOp = op
}
Expand All @@ -47,7 +46,13 @@ func planOffsets(ctx *plancontext.PlanningContext, root Operator) Operator {
fmt.Println("Planned offsets for:")
fmt.Println(ToTree(newOp))
}
return newOp, nil

if newOp == op {
return newOp, nil
} else {
// We got a new operator from plan offsets. We should return that something has changed.
return newOp, Rewrote("planning offsets introduced a new operator")
}
}
return in, NoRewrite
}
Expand Down
93 changes: 54 additions & 39 deletions go/vt/vtgate/planbuilder/testdata/aggr_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -6663,55 +6663,70 @@
"OrderBy": "(4|6) ASC, (5|7) ASC",
"Inputs": [
{
"OperatorType": "Join",
"Variant": "HashLeftJoin",
"Collation": "binary",
"ComparisonType": "INT16",
"JoinColumnIndexes": "-1,1,-2,2,-3,3",
"Predicate": "`user`.col = ue.col",
"TableName": "`user`_user_extra",
"OperatorType": "Projection",
"Expressions": [
"count(*) as count(*)",
"count(*) as count(*)",
"`user`.col as col",
"ue.col as col",
"`user`.foo as foo",
"ue.bar as bar",
"weight_string(`user`.foo) as weight_string(`user`.foo)",
"weight_string(ue.bar) as weight_string(ue.bar)"
],
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select count(*), `user`.col, `user`.foo from `user` where 1 != 1 group by `user`.col, `user`.foo",
"Query": "select count(*), `user`.col, `user`.foo from `user` group by `user`.col, `user`.foo",
"Table": "`user`"
},
{
"OperatorType": "Aggregate",
"Variant": "Ordered",
"Aggregates": "count_star(0)",
"GroupBy": "1, (2|3)",
"ResultColumns": 3,
"OperatorType": "Join",
"Variant": "HashLeftJoin",
"Collation": "binary",
"ComparisonType": "INT16",
"JoinColumnIndexes": "-1,1,-2,2,-3,3,-3,3",
"Predicate": "`user`.col = ue.col",
"TableName": "`user`_user_extra",
"Inputs": [
{
"OperatorType": "SimpleProjection",
"Columns": "2,0,1,3",
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select count(*), `user`.col, `user`.foo from `user` where 1 != 1 group by `user`.col, `user`.foo",
"Query": "select count(*), `user`.col, `user`.foo from `user` group by `user`.col, `user`.foo",
"Table": "`user`"
},
{
"OperatorType": "Aggregate",
"Variant": "Ordered",
"Aggregates": "count_star(0)",
"GroupBy": "1, (2|3)",
"ResultColumns": 3,
"Inputs": [
{
"OperatorType": "Sort",
"Variant": "Memory",
"OrderBy": "0 ASC, (1|3) ASC",
"OperatorType": "SimpleProjection",
"Columns": "2,0,1,3",
"Inputs": [
{
"OperatorType": "Limit",
"Count": "10",
"OperatorType": "Sort",
"Variant": "Memory",
"OrderBy": "0 ASC, (1|3) ASC",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select ue.col, ue.bar, 1, weight_string(ue.bar) from (select col, bar from user_extra where 1 != 1) as ue where 1 != 1",
"Query": "select ue.col, ue.bar, 1, weight_string(ue.bar) from (select col, bar from user_extra) as ue limit 10",
"Table": "user_extra"
"OperatorType": "Limit",
"Count": "10",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select ue.col, ue.bar, 1, weight_string(ue.bar) from (select col, bar from user_extra where 1 != 1) as ue where 1 != 1",
"Query": "select ue.col, ue.bar, 1, weight_string(ue.bar) from (select col, bar from user_extra) as ue limit 10",
"Table": "user_extra"
}
]
}
]
}
Expand Down
113 changes: 113 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/from_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -720,6 +720,119 @@
]
}
},
{
"comment": "Complex query that has hash left join underneath a memory sort and ordered aggregation",
"query": "select 1 from user join user_extra on user.id = user_extra.user_id join music on music.intcol = user_extra.col left join (select user_metadata.col, count(*) as count from user_metadata group by user_metadata.col) um on um.col = user_extra.col where user.id IN (103) group by user_extra.col, music.intcol",
"plan": {
"QueryType": "SELECT",
"Original": "select 1 from user join user_extra on user.id = user_extra.user_id join music on music.intcol = user_extra.col left join (select user_metadata.col, count(*) as count from user_metadata group by user_metadata.col) um on um.col = user_extra.col where user.id IN (103) group by user_extra.col, music.intcol",
"Instructions": {
"OperatorType": "Aggregate",
"Variant": "Ordered",
"Aggregates": "any_value(0) AS 1",
"GroupBy": "1, 4",
"ResultColumns": 1,
"Inputs": [
{
"OperatorType": "Sort",
"Variant": "Memory",
"OrderBy": "1 ASC, 4 ASC",
"Inputs": [
{
"OperatorType": "Join",
"Variant": "HashLeftJoin",
"Collation": "binary",
"ComparisonType": "FLOAT64",
"JoinColumnIndexes": "-1,-2,1,-2,-4,-1",
"Predicate": "user_extra.col = um.col",
"TableName": "music_`user`, user_extra_user_metadata",
"Inputs": [
{
"OperatorType": "Join",
"Variant": "Join",
"JoinColumnIndexes": "L:0,R:0,R:0,L:1",
"JoinVars": {
"music_intcol": 1
},
"TableName": "music_`user`, user_extra",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select 1, music.intcol from music where 1 != 1 group by music.intcol",
"Query": "select 1, music.intcol from music group by music.intcol",
"Table": "music"
},
{
"OperatorType": "Route",
"Variant": "EqualUnique",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select user_extra.col, user_extra.col from `user`, user_extra where 1 != 1 group by user_extra.col",
"Query": "select user_extra.col, user_extra.col from `user`, user_extra where `user`.id in (103) and user_extra.col = :music_intcol /* INT16 */ and `user`.id = user_extra.user_id group by user_extra.col",
"Table": "`user`, user_extra",
"Values": [
"103"
],
"Vindex": "user_index"
}
]
},
{
"OperatorType": "Aggregate",
"Variant": "Ordered",
"GroupBy": "(0|1)",
"ResultColumns": 1,
"Inputs": [
{
"OperatorType": "SimpleProjection",
"Columns": "0,2",
"Inputs": [
{
"OperatorType": "Aggregate",
"Variant": "Ordered",
"Aggregates": "sum_count_star(1) AS count",
"GroupBy": "(0|2)",
"ResultColumns": 3,
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select user_metadata.col, count(*) as `count`, weight_string(user_metadata.col) from user_metadata where 1 != 1 group by user_metadata.col, weight_string(user_metadata.col)",
"OrderBy": "(0|2) ASC",
"Query": "select user_metadata.col, count(*) as `count`, weight_string(user_metadata.col) from user_metadata group by user_metadata.col, weight_string(user_metadata.col) order by user_metadata.col asc",
"Table": "user_metadata"
}
]
}
]
}
]
}
]
}
]
}
]
},
"TablesUsed": [
"user.music",
"user.user",
"user.user_extra",
"user.user_metadata"
]
}
},
{
"comment": "Straight-join (ignores the straight_join hint)",
"query": "select m1.col from unsharded as m1 straight_join unsharded as m2",
Expand Down
6 changes: 6 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/vschemas/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,12 @@
"column": "id",
"name": "music_user_map"
}
],
"columns": [
{
"name": "intcol",
"type": "INT16"
}
]
},
"authoritative": {
Expand Down

0 comments on commit 0b7c0e4

Please sign in to comment.