From ff92fb76ef95e3323e0aeb6ae78c6ea8e542d18e Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Fri, 22 Dec 2023 12:54:41 +0530 Subject: [PATCH 1/5] allow aggregation function with multiple agruments to be pushed down Signed-off-by: Harshit Gangal --- .../queries/aggregation/aggregation_test.go | 2 + .../operators/aggregation_pushing.go | 2 +- .../planbuilder/operators/aggregator.go | 14 ++-- .../planbuilder/testdata/aggr_cases.json | 77 +++++++++++++++++-- .../testdata/unsupported_cases.json | 11 +-- 5 files changed, 85 insertions(+), 21 deletions(-) diff --git a/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go b/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go index 8e234063e10..bda729e21ea 100644 --- a/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go +++ b/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go @@ -574,6 +574,8 @@ func TestGroupConcatAggregation(t *testing.T) { compareRow(t, mQr, vtQr, nil, []int{0}) mQr, vtQr = mcmp.ExecNoCompare(`SELECT group_concat(value), t1.name FROM t1, t2 group by t1.name`) compareRow(t, mQr, vtQr, []int{1}, []int{0}) + mQr, vtQr = mcmp.ExecNoCompare(`SELECT group_concat(name, value) FROM t1`) + compareRow(t, mQr, vtQr, nil, []int{0}) } func compareRow(t *testing.T, mRes *sqltypes.Result, vtRes *sqltypes.Result, grpCols []int, fCols []int) { diff --git a/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go b/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go index 567936b8a84..77fe5148e9e 100644 --- a/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go +++ b/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go @@ -204,7 +204,7 @@ func checkIfWeCanPush(ctx *plancontext.PlanningContext, aggregator *Aggregator) continue } - innerExpr := aggr.Func.GetArg() + innerExpr := aggr.getPushColumn() if !exprHasUniqueVindex(ctx, innerExpr) { canPush = false } diff --git a/go/vt/vtgate/planbuilder/operators/aggregator.go b/go/vt/vtgate/planbuilder/operators/aggregator.go index 02e19d57654..3e577cfbb0b 100644 --- a/go/vt/vtgate/planbuilder/operators/aggregator.go +++ b/go/vt/vtgate/planbuilder/operators/aggregator.go @@ -255,7 +255,6 @@ func (a *Aggregator) planOffsets(ctx *plancontext.PlanningContext) Operator { if a.offsetPlanned { return nil } - a.checkForInvalidAggregations() defer func() { a.offsetPlanned = true }() @@ -281,7 +280,8 @@ func (a *Aggregator) planOffsets(ctx *plancontext.PlanningContext) Operator { if !aggr.NeedsWeightString(ctx) { continue } - offset := a.internalAddColumn(ctx, aeWrap(weightStringFor(aggr.Func.GetArg())), true) + arg := aggr.getPushColumn() + offset := a.internalAddColumn(ctx, aeWrap(weightStringFor(arg)), true) a.Aggregations[idx].WSOffset = offset } return nil @@ -295,10 +295,13 @@ func (aggr Aggr) getPushColumn() sqlparser.Expr { return sqlparser.NewIntLiteral("1") case opcode.AggregateGroupConcat: if len(aggr.Func.GetArgs()) > 1 { - panic("more than 1 column") + panic(vterrors.VT12001("group_concat with more than 1 column")) } - fallthrough + return aggr.Func.GetArg() default: + if len(aggr.Func.GetArgs()) > 1 { + panic(vterrors.VT03001(sqlparser.String(aggr.Func))) + } return aggr.Func.GetArg() } } @@ -380,7 +383,8 @@ func (a *Aggregator) pushRemainingGroupingColumnsAndWeightStrings(ctx *planconte continue } - offset := a.internalAddColumn(ctx, aeWrap(weightStringFor(aggr.Func.GetArg())), false) + arg := aggr.getPushColumn() + offset := a.internalAddColumn(ctx, aeWrap(weightStringFor(arg)), false) a.Aggregations[idx].WSOffset = offset } } diff --git a/go/vt/vtgate/planbuilder/testdata/aggr_cases.json b/go/vt/vtgate/planbuilder/testdata/aggr_cases.json index 0e068283d5c..3b64d223f5e 100644 --- a/go/vt/vtgate/planbuilder/testdata/aggr_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/aggr_cases.json @@ -6564,7 +6564,7 @@ } }, { - "comment": "select max((select min(col) from user where id = 1))", + "comment": "sharded subquery inside aggregation function on a dual table", "query": "select max((select min(col) from user where id = 1))", "plan": { "QueryType": "SELECT", @@ -6591,7 +6591,7 @@ } }, { - "comment": "select max((select min(col) from unsharded)) from user where id = 1", + "comment": "unsharded subquery inside aggregation function on a sharded table", "query": "select max((select min(col) from unsharded)) from user where id = 1", "plan": { "QueryType": "SELECT", @@ -6648,7 +6648,7 @@ } }, { - "comment": "select max((select min(col) from user where id = 1)) from user where id = 2", + "comment": "sharded subquery inside aggregation function on a sharded table on different vindex value", "query": "select max((select min(col) from user where id = 1)) from user where id = 2", "plan": { "QueryType": "SELECT", @@ -6708,7 +6708,7 @@ } }, { - "comment": "select max((select group_concat(col1, col2) from user where id = 1))", + "comment": "sharded subquery inside group_concat multi-column aggregation function on a dual table", "query": "select max((select group_concat(col1, col2) from user where id = 1))", "plan": { "QueryType": "SELECT", @@ -6735,7 +6735,7 @@ } }, { - "comment": "select max((select group_concat(col1, col2) from user where id = 1)) from user where id = 1", + "comment": "sharded subquery inside group_concat multi-column aggregation function on a sharded table on same vindex value", "query": "select max((select group_concat(col1, col2) from user where id = 1)) from user where id = 1", "plan": { "QueryType": "SELECT", @@ -6761,7 +6761,7 @@ } }, { - "comment": "select max((select group_concat(col1, col2) from user where id = 1)) from user", + "comment": "sharded subquery inside group_concat multi-column aggregation function on a sharded table", "query": "select max((select group_concat(col1, col2) from user where id = 1)) from user", "plan": { "QueryType": "SELECT", @@ -6817,7 +6817,7 @@ } }, { - "comment": "select max((select group_concat(col1, col2) from user where id = 1)) from user", + "comment": "sharded correlated subquery inside aggregation function on a sharded table on same vindex", "query": "select max((select max(col2) from user u1 where u1.id = u2.id)) from user u2", "plan": { "QueryType": "SELECT", @@ -6845,5 +6845,68 @@ "user.user" ] } + }, + { + "comment": "count aggregation function having multiple column", + "query": "select count(distinct user_id, name) from user", + "plan": "VT03001: aggregate functions take a single argument 'count(distinct user_id, `name`)'" + }, + { + "comment": "Multi-value aggregates pushed as function without splitting", + "query": "select count(a,b) from user", + "plan": { + "QueryType": "SELECT", + "Original": "select count(a,b) from user", + "Instructions": { + "OperatorType": "Aggregate", + "Variant": "Scalar", + "Aggregates": "sum_count(0) AS count(a, b)", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select count(a, b) from `user` where 1 != 1", + "Query": "select count(a, b) from `user`", + "Table": "`user`" + } + ] + }, + "TablesUsed": [ + "user.user" + ] + } + }, + { + "comment": "group_concat with multi column - pushed without splitting", + "query": "select group_concat(col1, col2) from user", + "plan": { + "QueryType": "SELECT", + "Original": "select group_concat(col1, col2) from user", + "Instructions": { + "OperatorType": "Aggregate", + "Variant": "Scalar", + "Aggregates": "group_concat(0) AS group_concat(col1, col2)", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select group_concat(col1, col2) from `user` where 1 != 1", + "Query": "select group_concat(col1, col2) from `user`", + "Table": "`user`" + } + ] + }, + "TablesUsed": [ + "user.user" + ] + } } ] diff --git a/go/vt/vtgate/planbuilder/testdata/unsupported_cases.json b/go/vt/vtgate/planbuilder/testdata/unsupported_cases.json index e93523710dd..3aff03b2645 100644 --- a/go/vt/vtgate/planbuilder/testdata/unsupported_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/unsupported_cases.json @@ -14,11 +14,6 @@ "query": "select * from user natural right join user_extra", "plan": "VT12001: unsupported: natural right join" }, - { - "comment": "Multi-value aggregates not supported", - "query": "select count(a,b) from user", - "plan": "VT03001: aggregate functions take a single argument 'count(a, b)'" - }, { "comment": "subqueries not supported in group by", "query": "select id from user group by id, (select id from user_extra)", @@ -415,8 +410,8 @@ "plan": "VT12001: unsupported: DELETE on reference table with join" }, { - "comment": "select max((select group_concat(col1, col2) from user where id = 1)) from user", - "query": "select group_concat(col1, col2) from user", - "plan": "VT03001: aggregate functions take a single argument 'group_concat(col1, col2)'" + "comment": "group_concat unsupported when needs full evaluation at vtgate with more than 1 column", + "query": "select group_concat(user.col1, music.col2) x from user join music on user.col = music.col order by x", + "plan": "VT12001: unsupported: group_concat with more than 1 column" } ] From 0abbf03cb3292958e015804eefa6b55c082d4c42 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Fri, 22 Dec 2023 16:15:21 +0530 Subject: [PATCH 2/5] allow distinct aggregation function to allow multiple column when can push down Signed-off-by: Harshit Gangal --- .../queries/aggregation/aggregation_test.go | 2 +- .../operators/aggregation_pushing.go | 54 +++++++++++++------ .../planbuilder/testdata/aggr_cases.json | 39 ++++++++++---- .../testdata/unsupported_cases.json | 10 ++++ 4 files changed, 79 insertions(+), 26 deletions(-) diff --git a/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go b/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go index bda729e21ea..413460809f4 100644 --- a/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go +++ b/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go @@ -628,7 +628,7 @@ func TestDistinctAggregation(t *testing.T) { }, { query: `SELECT a.value, SUM(DISTINCT b.t1_id), min(DISTINCT a.t1_id) FROM t1 a, t1 b group by a.value`, }, { - query: `SELECT distinct count(*) from t1, (select distinct count(*) from t1) as t2`, + query: `SELECT count(distinct name, shardkey) from t1`, }} for _, tc := range tcases { diff --git a/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go b/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go index 77fe5148e9e..11ebebc75a1 100644 --- a/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go +++ b/go/vt/vtgate/planbuilder/operators/aggregation_pushing.go @@ -27,6 +27,13 @@ import ( "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" ) +func errDistinctAggrWithMultiExpr(f sqlparser.AggrFunc) { + if f == nil { + panic(vterrors.VT12001("distinct aggregation function with multiple expressions")) + } + panic(vterrors.VT12001(fmt.Sprintf("distinct aggregation function with multiple expressions '%s'", sqlparser.String(f)))) +} + func tryPushAggregator(ctx *plancontext.PlanningContext, aggregator *Aggregator) (output Operator, applyResult *ApplyResult) { if aggregator.Pushed { return aggregator, NoRewrite @@ -162,7 +169,7 @@ func pushAggregationThroughRoute( // pushAggregations splits aggregations between the original aggregator and the one we are pushing down func pushAggregations(ctx *plancontext.PlanningContext, aggregator *Aggregator, aggrBelowRoute *Aggregator) { - canPushDistinctAggr, distinctExpr := checkIfWeCanPush(ctx, aggregator) + canPushDistinctAggr, distinctExprs := checkIfWeCanPush(ctx, aggregator) distinctAggrGroupByAdded := false @@ -173,16 +180,20 @@ func pushAggregations(ctx *plancontext.PlanningContext, aggregator *Aggregator, continue } + if len(distinctExprs) != 1 { + errDistinctAggrWithMultiExpr(aggr.Func) + } + // We handle a distinct aggregation by turning it into a group by and // doing the aggregating on the vtgate level instead - aeDistinctExpr := aeWrap(distinctExpr) + aeDistinctExpr := aeWrap(distinctExprs[0]) aggrBelowRoute.Columns[aggr.ColOffset] = aeDistinctExpr // We handle a distinct aggregation by turning it into a group by and // doing the aggregating on the vtgate level instead // Adding to group by can be done only once even though there are multiple distinct aggregation with same expression. if !distinctAggrGroupByAdded { - groupBy := NewGroupBy(distinctExpr, distinctExpr) + groupBy := NewGroupBy(distinctExprs[0], distinctExprs[0]) groupBy.ColOffset = aggr.ColOffset aggrBelowRoute.Grouping = append(aggrBelowRoute.Grouping, groupBy) distinctAggrGroupByAdded = true @@ -190,13 +201,13 @@ func pushAggregations(ctx *plancontext.PlanningContext, aggregator *Aggregator, } if !canPushDistinctAggr { - aggregator.DistinctExpr = distinctExpr + aggregator.DistinctExpr = distinctExprs[0] } } -func checkIfWeCanPush(ctx *plancontext.PlanningContext, aggregator *Aggregator) (bool, sqlparser.Expr) { +func checkIfWeCanPush(ctx *plancontext.PlanningContext, aggregator *Aggregator) (bool, sqlparser.Exprs) { canPush := true - var distinctExpr sqlparser.Expr + var distinctExprs sqlparser.Exprs var differentExpr *sqlparser.AliasedExpr for _, aggr := range aggregator.Aggregations { @@ -204,15 +215,25 @@ func checkIfWeCanPush(ctx *plancontext.PlanningContext, aggregator *Aggregator) continue } - innerExpr := aggr.getPushColumn() - if !exprHasUniqueVindex(ctx, innerExpr) { + args := aggr.Func.GetArgs() + hasUniqVindex := false + for _, arg := range args { + if exprHasUniqueVindex(ctx, arg) { + hasUniqVindex = true + break + } + } + if !hasUniqVindex { canPush = false } - if distinctExpr == nil { - distinctExpr = innerExpr + if len(distinctExprs) == 0 { + distinctExprs = args } - if !ctx.SemTable.EqualsExpr(distinctExpr, innerExpr) { - differentExpr = aggr.Original + for idx, expr := range distinctExprs { + if !ctx.SemTable.EqualsExpr(expr, args[idx]) { + differentExpr = aggr.Original + break + } } } @@ -220,7 +241,7 @@ func checkIfWeCanPush(ctx *plancontext.PlanningContext, aggregator *Aggregator) panic(vterrors.VT12001(fmt.Sprintf("only one DISTINCT aggregation is allowed in a SELECT: %s", sqlparser.String(differentExpr)))) } - return canPush, distinctExpr + return canPush, distinctExprs } func pushAggregationThroughFilter( @@ -530,12 +551,15 @@ func splitAggrColumnsToLeftAndRight( outerJoin: leftJoin, } - canPushDistinctAggr, distinctExpr := checkIfWeCanPush(ctx, aggregator) + canPushDistinctAggr, distinctExprs := checkIfWeCanPush(ctx, aggregator) // Distinct aggregation cannot be pushed down in the join. // We keep node of the distinct aggregation expression to be used later for ordering. if !canPushDistinctAggr { - aggregator.DistinctExpr = distinctExpr + if len(distinctExprs) != 1 { + errDistinctAggrWithMultiExpr(nil) + } + aggregator.DistinctExpr = distinctExprs[0] return nil, errAbortAggrPushing } diff --git a/go/vt/vtgate/planbuilder/testdata/aggr_cases.json b/go/vt/vtgate/planbuilder/testdata/aggr_cases.json index 3b64d223f5e..a3a31d6ad73 100644 --- a/go/vt/vtgate/planbuilder/testdata/aggr_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/aggr_cases.json @@ -2843,11 +2843,6 @@ ] } }, - { - "comment": "select count(distinct user_id, name) from user", - "query": "select count(distinct user_id, name) from user", - "plan": "VT03001: aggregate functions take a single argument 'count(distinct user_id, `name`)'" - }, { "comment": "select sum(col) from (select user.col as col, 32 from user join user_extra) t", "query": "select sum(col) from (select user.col as col, 32 from user join user_extra) t", @@ -6846,11 +6841,6 @@ ] } }, - { - "comment": "count aggregation function having multiple column", - "query": "select count(distinct user_id, name) from user", - "plan": "VT03001: aggregate functions take a single argument 'count(distinct user_id, `name`)'" - }, { "comment": "Multi-value aggregates pushed as function without splitting", "query": "select count(a,b) from user", @@ -6908,5 +6898,34 @@ "user.user" ] } + }, + { + "comment": "", + "query": "select count(distinct name, id) from user", + "plan": { + "QueryType": "SELECT", + "Original": "select count(distinct name, id) from user", + "Instructions": { + "OperatorType": "Aggregate", + "Variant": "Scalar", + "Aggregates": "sum_count_distinct(0) AS count(distinct `name`, id)", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select count(distinct `name`, id) from `user` where 1 != 1", + "Query": "select count(distinct `name`, id) from `user`", + "Table": "`user`" + } + ] + }, + "TablesUsed": [ + "user.user" + ] + } } ] diff --git a/go/vt/vtgate/planbuilder/testdata/unsupported_cases.json b/go/vt/vtgate/planbuilder/testdata/unsupported_cases.json index 3aff03b2645..3765ae9b278 100644 --- a/go/vt/vtgate/planbuilder/testdata/unsupported_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/unsupported_cases.json @@ -413,5 +413,15 @@ "comment": "group_concat unsupported when needs full evaluation at vtgate with more than 1 column", "query": "select group_concat(user.col1, music.col2) x from user join music on user.col = music.col order by x", "plan": "VT12001: unsupported: group_concat with more than 1 column" + }, + { + "comment": "count aggregation function having multiple column", + "query": "select count(distinct user_id, name) from user", + "plan": "VT12001: unsupported: distinct aggregation function with multiple expressions 'count(distinct user_id, `name`)'" + }, + { + "comment": "count and sum distinct on different columns", + "query": "SELECT COUNT(DISTINCT col), SUM(DISTINCT id) FROM user", + "plan": "VT12001: unsupported: only one DISTINCT aggregation is allowed in a SELECT: sum(distinct id)" } ] From 577ce7716eb15c3534bcff24bd9cdbbfa640ad54 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Fri, 22 Dec 2023 16:30:32 +0530 Subject: [PATCH 3/5] skip test for lower vtgate version Signed-off-by: Harshit Gangal --- .../vtgate/queries/aggregation/aggregation_test.go | 12 +++++++++++- .../vtgate/queries/subquery/subquery_test.go | 1 - 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go b/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go index 413460809f4..310abedd09a 100644 --- a/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go +++ b/go/test/endtoend/vtgate/queries/aggregation/aggregation_test.go @@ -574,6 +574,10 @@ func TestGroupConcatAggregation(t *testing.T) { compareRow(t, mQr, vtQr, nil, []int{0}) mQr, vtQr = mcmp.ExecNoCompare(`SELECT group_concat(value), t1.name FROM t1, t2 group by t1.name`) compareRow(t, mQr, vtQr, []int{1}, []int{0}) + if versionMet := utils.BinaryIsAtLeastAtVersion(19, "vtgate"); !versionMet { + // skipping + return + } mQr, vtQr = mcmp.ExecNoCompare(`SELECT group_concat(name, value) FROM t1`) compareRow(t, mQr, vtQr, nil, []int{0}) } @@ -615,6 +619,7 @@ func TestDistinctAggregation(t *testing.T) { tcases := []struct { query string expectedErr string + minVersion int }{{ query: `SELECT COUNT(DISTINCT value), SUM(DISTINCT shardkey) FROM t1`, expectedErr: "VT12001: unsupported: only one DISTINCT aggregation is allowed in a SELECT: sum(distinct shardkey) (errno 1235) (sqlstate 42000)", @@ -628,10 +633,15 @@ func TestDistinctAggregation(t *testing.T) { }, { query: `SELECT a.value, SUM(DISTINCT b.t1_id), min(DISTINCT a.t1_id) FROM t1 a, t1 b group by a.value`, }, { - query: `SELECT count(distinct name, shardkey) from t1`, + minVersion: 19, + query: `SELECT count(distinct name, shardkey) from t1`, }} for _, tc := range tcases { + if versionMet := utils.BinaryIsAtLeastAtVersion(tc.minVersion, "vtgate"); !versionMet { + // skipping + continue + } mcmp.Run(tc.query, func(mcmp *utils.MySQLCompare) { _, err := mcmp.ExecAllowError(tc.query) if tc.expectedErr == "" { diff --git a/go/test/endtoend/vtgate/queries/subquery/subquery_test.go b/go/test/endtoend/vtgate/queries/subquery/subquery_test.go index e849f926d73..f9884980e27 100644 --- a/go/test/endtoend/vtgate/queries/subquery/subquery_test.go +++ b/go/test/endtoend/vtgate/queries/subquery/subquery_test.go @@ -165,7 +165,6 @@ func TestSubqueryInReference(t *testing.T) { // TestSubqueryInAggregation validates that subquery work inside aggregation functions. func TestSubqueryInAggregation(t *testing.T) { - utils.SkipIfBinaryIsBelowVersion(t, 19, "vtgate") mcmp, closer := start(t) defer closer() From 66966589571760caca5da823f67592727f985960 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Fri, 22 Dec 2023 17:49:42 +0530 Subject: [PATCH 4/5] skip TestSubqueryInAggregation for v18 vtgate version Signed-off-by: Harshit Gangal --- go/test/endtoend/vtgate/queries/subquery/subquery_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/test/endtoend/vtgate/queries/subquery/subquery_test.go b/go/test/endtoend/vtgate/queries/subquery/subquery_test.go index f9884980e27..e849f926d73 100644 --- a/go/test/endtoend/vtgate/queries/subquery/subquery_test.go +++ b/go/test/endtoend/vtgate/queries/subquery/subquery_test.go @@ -165,6 +165,7 @@ func TestSubqueryInReference(t *testing.T) { // TestSubqueryInAggregation validates that subquery work inside aggregation functions. func TestSubqueryInAggregation(t *testing.T) { + utils.SkipIfBinaryIsBelowVersion(t, 19, "vtgate") mcmp, closer := start(t) defer closer() From e60f2f7023ee8a591695c29907deb47b2e18ea75 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Fri, 22 Dec 2023 17:55:13 +0530 Subject: [PATCH 5/5] refactor: aggregation creation Signed-off-by: Harshit Gangal --- .../operators/horizon_expanding.go | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/go/vt/vtgate/planbuilder/operators/horizon_expanding.go b/go/vt/vtgate/planbuilder/operators/horizon_expanding.go index 86b9ab3ceb6..300e4ef36b9 100644 --- a/go/vt/vtgate/planbuilder/operators/horizon_expanding.go +++ b/go/vt/vtgate/planbuilder/operators/horizon_expanding.go @@ -116,7 +116,7 @@ func expandSelectHorizon(ctx *plancontext.PlanningContext, horizon *Horizon, sel return op, Rewrote(fmt.Sprintf("expand SELECT horizon into (%s)", strings.Join(extracted, ", "))) } -func createProjectionFromSelect(ctx *plancontext.PlanningContext, horizon *Horizon) (out Operator) { +func createProjectionFromSelect(ctx *plancontext.PlanningContext, horizon *Horizon) Operator { qp := horizon.getQP(ctx) var dt *DerivedTable @@ -131,15 +131,15 @@ func createProjectionFromSelect(ctx *plancontext.PlanningContext, horizon *Horiz if !qp.NeedsAggregation() { projX := createProjectionWithoutAggr(ctx, qp, horizon.src()) projX.DT = dt - out = projX - - return out + return projX } - aggregations, complexAggr := qp.AggregationExpressions(ctx, true) + return createProjectionWithAggr(ctx, qp, dt, horizon.src()) +} - src := horizon.src() - a := &Aggregator{ +func createProjectionWithAggr(ctx *plancontext.PlanningContext, qp *QueryProjection, dt *DerivedTable, src Operator) Operator { + aggregations, complexAggr := qp.AggregationExpressions(ctx, true) + aggrOp := &Aggregator{ Source: src, Original: true, QP: qp, @@ -148,6 +148,7 @@ func createProjectionFromSelect(ctx *plancontext.PlanningContext, horizon *Horiz DT: dt, } + // Go through all aggregations and check for any subquery. sqc := &SubQueryBuilder{} outerID := TableID(src) for idx, aggr := range aggregations { @@ -157,12 +158,13 @@ func createProjectionFromSelect(ctx *plancontext.PlanningContext, horizon *Horiz aggregations[idx].SubQueryExpression = subqs } } - a.Source = sqc.getRootOperator(src, nil) + aggrOp.Source = sqc.getRootOperator(src, nil) + // create the projection columns from aggregator. if complexAggr { - return createProjectionForComplexAggregation(a, qp) + return createProjectionForComplexAggregation(aggrOp, qp) } - return createProjectionForSimpleAggregation(ctx, a, qp) + return createProjectionForSimpleAggregation(ctx, aggrOp, qp) } func createProjectionForSimpleAggregation(ctx *plancontext.PlanningContext, a *Aggregator, qp *QueryProjection) Operator {