Skip to content

Commit

Permalink
planner feat: push down aggregations through hash joins
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay committed Dec 7, 2023
1 parent d1163ec commit 9d37cc8
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 4 deletions.
48 changes: 48 additions & 0 deletions go/vt/vtgate/planbuilder/operators/aggregation_pushing.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ func tryPushAggregator(ctx *plancontext.PlanningContext, aggregator *Aggregator)
output, applyResult = pushAggregationThroughRoute(ctx, aggregator, src)
case *ApplyJoin:
output, applyResult = pushAggregationThroughApplyJoin(ctx, aggregator, src)
case *HashJoin:
output, applyResult = pushAggregationThroughHashJoin(ctx, aggregator, src)
case *Filter:
output, applyResult = pushAggregationThroughFilter(ctx, aggregator, src)
case *SubQueryContainer:
Expand Down Expand Up @@ -394,6 +396,52 @@ func pushAggregationThroughApplyJoin(ctx *plancontext.PlanningContext, rootAggr
return rootAggr, Rewrote("push Aggregation under join")
}

// pushAggregationThroughHashJoin tries to push an aggregation below a hash
// join by splitting it into one partial Aggregator per join input.
//
// It returns (nil, nil) when nothing is pushed: either rootAggr has grouping
// expressions, or the column split reports errAbortAggrPushing. Any other
// error coming back from the split is raised as a panic.
//
// On success join.LHS and join.RHS are replaced by the new per-side
// aggregators. When rootAggr.Original is false the operator produced by the
// split is returned directly; otherwise rootAggr is kept, re-pointed at that
// operator after aggregateTheAggregates has been called on it.
func pushAggregationThroughHashJoin(ctx *plancontext.PlanningContext, rootAggr *Aggregator, join *HashJoin) (Operator, *ApplyResult) {
	// Aggregations with grouping are not pushed by this function. Bail out
	// before building the pushers and splitting the columns, so an attempt
	// that is going to be abandoned anyway does no work and cannot panic.
	// NOTE(review): this assumes splitAggrColumnsToLeftAndRight does not
	// change rootAggr.Grouping - confirm against its implementation.
	if len(rootAggr.Grouping) > 0 {
		return nil, nil
	}

	lhs := &joinPusher{
		orig: rootAggr,
		pushed: &Aggregator{
			Source: join.LHS,
			QP:     rootAggr.QP,
		},
		columns: initColReUse(len(rootAggr.Columns)),
		tableID: TableID(join.LHS),
	}
	rhs := &joinPusher{
		orig: rootAggr,
		pushed: &Aggregator{
			Source: join.RHS,
			QP:     rootAggr.QP,
		},
		columns: initColReUse(len(rootAggr.Columns)),
		tableID: TableID(join.RHS),
	}

	columns := &hashJoinColumns{}
	output, err := splitAggrColumnsToLeftAndRight(ctx, rootAggr, join, join.LeftJoin, columns, lhs, rhs)
	if err != nil {
		// if we get this error, we just abort the splitting and fall back on simpler ways of solving the same query
		if errors.Is(err, errAbortAggrPushing) {
			return nil, nil
		}
		panic(err)
	}

	// From here on the push is committed: each side of the join now reads
	// from its own partial aggregator.
	join.LHS, join.RHS = lhs.pushed, rhs.pushed

	if !rootAggr.Original {
		// rootAggr is not an original aggregation (per the original comment,
		// it was itself created by splitting one and pushing it under a
		// join), so it can be dropped and the split output used in its place.
		return output, Rewrote("push Aggregation under hash join - keep original")
	}

	// The original aggregator stays on top of the split output.
	rootAggr.aggregateTheAggregates()
	rootAggr.Source = output
	return rootAggr, Rewrote("push Aggregation under hash join")
}

// errAbortAggrPushing is a sentinel error. pushAggregationThroughHashJoin
// matches it with errors.Is and, when it is returned by the column split,
// gives up on pushing (returning nil, nil) instead of panicking.
var errAbortAggrPushing = fmt.Errorf("abort aggregation pushing")

func addColumnsFromLHSInJoinPredicates(ctx *plancontext.PlanningContext, rootAggr *Aggregator, join *ApplyJoin, lhs *joinPusher) {
Expand Down
11 changes: 7 additions & 4 deletions go/vt/vtgate/planbuilder/operators/query_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func tryPushProjection(
if !p.canPush(ctx) {
return p, NoRewrite
}
return pushProjectionThroughHashJoin(p, src)
return pushProjectionThroughHashJoin(ctx, p, src)
case *Vindex:
if !p.canPush(ctx) {
return p, NoRewrite
Expand All @@ -216,10 +216,13 @@ func tryPushProjection(
}
}

// pushProjectionThroughHashJoin merges the projection p into the hash join hj
// by registering the projection's columns on hj.columns.
//
// NOTE(review): this hunk renders removed and added diff lines together,
// without +/- markers. There are two signatures and two `cols :=` lines; the
// variant taking ctx appears to be the current one, since the call site in
// tryPushProjection passes ctx.
//
// In the ctx variant, p is returned with NoRewrite as soon as a column's
// isSameInAndOut(ctx) is false. If every column passes, hj is returned with
// a Rewrote result.
func pushProjectionThroughHashJoin(p *Projection, hj *HashJoin) (Operator, *ApplyResult) {
cols := p.Columns.GetColumns()
func pushProjectionThroughHashJoin(ctx *plancontext.PlanningContext, p *Projection, hj *HashJoin) (Operator, *ApplyResult) {
// Unchecked type assertion: panics if p.Columns is not AliasedProjections.
// NOTE(review): presumably the p.canPush(ctx) check at the call site rules that out - confirm.
cols := p.Columns.(AliasedProjections)
for _, col := range cols {
hj.columns.add(col.Expr)
// NOTE(review): columns added in earlier iterations stay on hj.columns when
// this returns NoRewrite part-way through the loop - confirm that is harmless.
if !col.isSameInAndOut(ctx) {
return p, NoRewrite
}
hj.columns.add(col.ColExpr)
}
return hj, Rewrote("merged projection into hash join")
}
Expand Down
82 changes: 82 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/aggr_cases.json
Original file line number Diff line number Diff line change
Expand Up @@ -6442,5 +6442,87 @@
"user.user"
]
}
},
{
"comment": "count(*) push down through left hash join",
"query": "select count(*) from user left join (select col from user_extra limit 10) ue on user.col = ue.col",
"plan": {
"QueryType": "SELECT",
"Original": "select count(*) from user left join (select col from user_extra limit 10) ue on user.col = ue.col",
"Instructions": {
"OperatorType": "Aggregate",
"Variant": "Scalar",
"Aggregates": "sum_count_star(0) AS count(*)",
"Inputs": [
{
"OperatorType": "Projection",
"Expressions": [
"count(*) * coalesce(count(*), 1) as count(*)"
],
"Inputs": [
{
"OperatorType": "Join",
"Variant": "HashLeftJoin",
"Collation": "binary",
"ComparisonType": "INT16",
"JoinColumnIndexes": "-1,1",
"Predicate": "`user`.col = ue.col",
"TableName": "`user`_user_extra",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select count(*), `user`.col from `user` where 1 != 1",
"Query": "select count(*), `user`.col from `user`",
"Table": "`user`"
},
{
"OperatorType": "Aggregate",
"Variant": "Scalar",
"Aggregates": "count_star(0), any_value(1)",
"Inputs": [
{
"OperatorType": "SimpleProjection",
"Columns": [
1,
0
],
"Inputs": [
{
"OperatorType": "Limit",
"Count": "10",
"Inputs": [
{
"OperatorType": "Route",
"Variant": "Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select col, 1 from (select col from user_extra where 1 != 1) as ue where 1 != 1",
"Query": "select col, 1 from (select col from user_extra) as ue limit :__upper_limit",
"Table": "user_extra"
}
]
}
]
}
]
}
]
}
]
}
]
},
"TablesUsed": [
"user.user",
"user.user_extra"
]
}
}
]

0 comments on commit 9d37cc8

Please sign in to comment.