Skip to content

Commit

Permalink
Subquery inside aggregration function (#14844)
Browse files Browse the repository at this point in the history
Signed-off-by: Harshit Gangal <[email protected]>
  • Loading branch information
harshit-gangal authored Dec 21, 2023
1 parent 155acbd commit 6841090
Show file tree
Hide file tree
Showing 8 changed files with 349 additions and 17 deletions.
17 changes: 17 additions & 0 deletions go/test/endtoend/vtgate/queries/subquery/subquery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,3 +162,20 @@ func TestSubqueryInReference(t *testing.T) {
mcmp.AssertMatches(`select (select id1 from t1 where id2 = 30)`, `[[INT64(3)]]`)
mcmp.AssertMatches(`select (select id1 from t1 where id2 = 9)`, `[[NULL]]`)
}

// TestSubqueryInAggregation validates that subquery work inside aggregation functions.
func TestSubqueryInAggregation(t *testing.T) {
utils.SkipIfBinaryIsBelowVersion(t, 19, "vtgate")
mcmp, closer := start(t)
defer closer()

mcmp.Exec("insert into t1(id1, id2) values(0,0),(1,1)")
mcmp.Exec("insert into t2(id3, id4) values(1,2),(5,7)")
mcmp.Exec(`SELECT max((select min(id2) from t1)) FROM t2`)
mcmp.Exec(`SELECT max((select group_concat(id1, id2) from t1 where id1 = 1)) FROM t1 where id1 = 1`)
mcmp.Exec(`SELECT max((select min(id2) from t1 where id2 = 1)) FROM dual`)
mcmp.Exec(`SELECT max((select min(id2) from t1)) FROM t2 where id4 = 7`)

// This fails as the planner adds `weight_string` method which make the query fail on MySQL.
// mcmp.Exec(`SELECT max((select min(id2) from t1 where t1.id1 = t.id1)) FROM t1 t`)
}
4 changes: 4 additions & 0 deletions go/vt/vtgate/planbuilder/operators/aggregation_pushing.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func pushAggregationThroughSubquery(

src.Outer = pushedAggr

for _, aggregation := range pushedAggr.Aggregations {
aggregation.Original.Expr = rewriteColNameToArgument(ctx, aggregation.Original.Expr, aggregation.SubQueryExpression, src.Inner...)
}

if !rootAggr.Original {
return src, Rewrote("push Aggregation under subquery - keep original")
}
Expand Down
18 changes: 18 additions & 0 deletions go/vt/vtgate/planbuilder/operators/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ func (a *Aggregator) planOffsets(ctx *plancontext.PlanningContext) Operator {
if a.offsetPlanned {
return nil
}
a.checkForInvalidAggregations()
defer func() {
a.offsetPlanned = true
}()
Expand Down Expand Up @@ -413,4 +414,21 @@ func (a *Aggregator) introducesTableID() semantics.TableSet {
return a.DT.introducesTableID()
}

func (a *Aggregator) checkForInvalidAggregations() {
for _, aggr := range a.Aggregations {
_ = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) {
aggrFunc, isAggregate := node.(sqlparser.AggrFunc)
if !isAggregate {
return true, nil
}
args := aggrFunc.GetArgs()
if args != nil && len(args) != 1 {
panic(vterrors.VT03001(sqlparser.String(node)))
}
return true, nil

}, aggr.Original.Expr)
}
}

var _ Operator = (*Aggregator)(nil)
14 changes: 13 additions & 1 deletion go/vt/vtgate/planbuilder/operators/horizon_expanding.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,27 @@ func createProjectionFromSelect(ctx *plancontext.PlanningContext, horizon *Horiz

aggregations, complexAggr := qp.AggregationExpressions(ctx, true)

src := horizon.src()
a := &Aggregator{
Source: horizon.src(),
Source: src,
Original: true,
QP: qp,
Grouping: qp.GetGrouping(),
Aggregations: aggregations,
DT: dt,
}

sqc := &SubQueryBuilder{}
outerID := TableID(src)
for idx, aggr := range aggregations {
expr := aggr.Original.Expr
newExpr, subqs := sqc.pullOutValueSubqueries(ctx, expr, outerID, false)
if newExpr != nil {
aggregations[idx].SubQueryExpression = subqs
}
}
a.Source = sqc.getRootOperator(src, nil)

if complexAggr {
return createProjectionForComplexAggregation(a, qp)
}
Expand Down
18 changes: 2 additions & 16 deletions go/vt/vtgate/planbuilder/operators/queryprojection.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ type (
// the offsets point to columns on the same aggregator
ColOffset int
WSOffset int

SubQueryExpression []*SubQuery
}

AggrRewriter struct {
Expand Down Expand Up @@ -252,7 +254,6 @@ func (qp *QueryProjection) addSelectExpressions(sel *sqlparser.Select) {
for _, selExp := range sel.SelectExprs {
switch selExp := selExp.(type) {
case *sqlparser.AliasedExpr:
checkForInvalidAggregations(selExp)
col := SelectExpr{
Col: selExp,
}
Expand Down Expand Up @@ -403,21 +404,6 @@ func (qp *QueryProjection) GetGrouping() []GroupBy {
return slices.Clone(qp.groupByExprs)
}

func checkForInvalidAggregations(exp *sqlparser.AliasedExpr) {
_ = sqlparser.Walk(func(node sqlparser.SQLNode) (kontinue bool, err error) {
aggrFunc, isAggregate := node.(sqlparser.AggrFunc)
if !isAggregate {
return true, nil
}
args := aggrFunc.GetArgs()
if args != nil && len(args) != 1 {
panic(vterrors.VT03001(sqlparser.String(node)))
}
return true, nil

}, exp.Expr)
}

func (qp *QueryProjection) isExprInGroupByExprs(ctx *plancontext.PlanningContext, expr sqlparser.Expr) bool {
for _, groupByExpr := range qp.groupByExprs {
if ctx.SemTable.EqualsExprWithDeps(groupByExpr.SimplifiedExpr, expr) {
Expand Down
7 changes: 7 additions & 0 deletions go/vt/vtgate/planbuilder/operators/subquery_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,13 @@ func settleSubqueries(ctx *plancontext.PlanningContext, op Operator) Operator {
for _, setExpr := range op.Assignments {
mergeSubqueryExpr(ctx, setExpr.Expr)
}
case *Aggregator:
for _, aggr := range op.Aggregations {
newExpr, rewritten := rewriteMergedSubqueryExpr(ctx, aggr.SubQueryExpression, aggr.Original.Expr)
if rewritten {
aggr.Original.Expr = newExpr
}
}
}
return op, NoRewrite
}
Expand Down
Loading

0 comments on commit 6841090

Please sign in to comment.