From 94259cd079cfae26270736373a6474748ce0c208 Mon Sep 17 00:00:00 2001 From: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com> Date: Mon, 6 May 2024 20:21:46 +0530 Subject: [PATCH] Fix CTE query by fixing bindvar pushing in unions (#15838) Signed-off-by: Manan Gupta --- .../vtgate/queries/derived/cte_test.go | 24 ++++ .../operators/horizon_expanding.go | 3 + go/vt/vtgate/planbuilder/operators/union.go | 17 +++ .../planbuilder/testdata/cte_cases.json | 133 ++++++++++++++++++ 4 files changed, 177 insertions(+) diff --git a/go/test/endtoend/vtgate/queries/derived/cte_test.go b/go/test/endtoend/vtgate/queries/derived/cte_test.go index 61ddf5d6661..54d97261ae6 100644 --- a/go/test/endtoend/vtgate/queries/derived/cte_test.go +++ b/go/test/endtoend/vtgate/queries/derived/cte_test.go @@ -66,3 +66,27 @@ func TestCTEColumns(t *testing.T) { mcmp.AssertMatches(`with t(id) as (SELECT id FROM user) SELECT t.id FROM t ORDER BY t.id DESC`, `[[INT64(5)] [INT64(4)] [INT64(3)] [INT64(2)] [INT64(1)]]`) } + +func TestCTEAggregationsInUnion(t *testing.T) { + utils.SkipIfBinaryIsBelowVersion(t, 20, "vtgate") + mcmp, closer := start(t) + defer closer() + + mcmp.AssertMatches(`WITH toto AS (SELECT COUNT(*) as num + FROM (SELECT user.id + FROM user + WHERE user.name = 'toto' + LIMIT 1000) t LIMIT 1 ), + tata AS (SELECT COUNT(*) as num + FROM (SELECT user.id + FROM user + WHERE user.name = 'tata' + LIMIT 1000) t LIMIT 1), + total AS (SELECT LEAST(1000, SUM(num)) AS num + FROM (SELECT num + FROM toto + UNION ALL SELECT num + FROM tata) t LIMIT 1) +SELECT 'total' AS tab, num +FROM total`, `[[VARCHAR("total") DECIMAL(2)]]`) +} diff --git a/go/vt/vtgate/planbuilder/operators/horizon_expanding.go b/go/vt/vtgate/planbuilder/operators/horizon_expanding.go index 68880bef90b..ae53bde8b0f 100644 --- a/go/vt/vtgate/planbuilder/operators/horizon_expanding.go +++ b/go/vt/vtgate/planbuilder/operators/horizon_expanding.go @@ -235,6 +235,9 @@ outer: func createProjectionForComplexAggregation(a *Aggregator, qp *QueryProjection) Operator { p := newAliasedProjection(a) p.DT = a.DT + // We don't want to keep the derived table in both Aggregator and Projection. + // If we do, then we end up re-writing the same column twice. + a.DT = nil for _, expr := range qp.SelectExprs { ae, err := expr.GetAliasedExpr() if err != nil { diff --git a/go/vt/vtgate/planbuilder/operators/union.go b/go/vt/vtgate/planbuilder/operators/union.go index d46ee609812..fedfc362017 100644 --- a/go/vt/vtgate/planbuilder/operators/union.go +++ b/go/vt/vtgate/planbuilder/operators/union.go @@ -193,11 +193,28 @@ func (u *Union) AddColumn(ctx *plancontext.PlanningContext, reuse bool, gb bool, } return u.addWeightStringToOffset(ctx, argIdx) + case *sqlparser.Literal, *sqlparser.Argument: + return u.addConstantToUnion(ctx, expr) default: panic(vterrors.VT13001(fmt.Sprintf("only weight_string function is expected - got %s", sqlparser.String(expr)))) } } +func (u *Union) addConstantToUnion(ctx *plancontext.PlanningContext, aexpr *sqlparser.AliasedExpr) (outputOffset int) { + for i, src := range u.Sources { + thisOffset := src.AddColumn(ctx, true, false, aexpr) + // all offsets for the newly added ws need to line up + if i == 0 { + outputOffset = thisOffset + } else { + if thisOffset != outputOffset { + panic(vterrors.VT12001("argument offsets did not line up for UNION")) + } + } + } + return +} + func (u *Union) addWeightStringToOffset(ctx *plancontext.PlanningContext, argIdx int) (outputOffset int) { for i, src := range u.Sources { thisOffset := src.AddWSColumn(ctx, argIdx, false) diff --git a/go/vt/vtgate/planbuilder/testdata/cte_cases.json b/go/vt/vtgate/planbuilder/testdata/cte_cases.json index ed2f9221c83..e9abf06098a 100644 --- a/go/vt/vtgate/planbuilder/testdata/cte_cases.json +++ b/go/vt/vtgate/planbuilder/testdata/cte_cases.json @@ -1957,5 +1957,138 @@ "user.user_metadata" ] } + }, + { + "comment": "CTE expression using unions and complex aggregation with literal", + "query": "WITH `open` AS (SELECT COUNT(*) as `num` FROM (SELECT `user`.`id` FROM `user` WHERE `user`.`textcol1` = 'open' AND `user`.`intcol` = 1 LIMIT 1000) `t` LIMIT 1 ), `closed` AS (SELECT COUNT(*) as `num` FROM ( SELECT `user`.`id` FROM `user` WHERE `user`.`textcol1` = 'closed' AND `user`.`intcol` = 1 LIMIT 1000) `t` LIMIT 1 ), `all` AS (SELECT LEAST(1000, SUM(`num`)) AS `num` FROM ( SELECT `num` FROM `open` UNION ALL SELECT `num` FROM `closed` ) `t` LIMIT 1 )SELECT 'all' AS `tab`, `num`FROM `all`", + "plan": { + "QueryType": "SELECT", + "Original": "WITH `open` AS (SELECT COUNT(*) as `num` FROM (SELECT `user`.`id` FROM `user` WHERE `user`.`textcol1` = 'open' AND `user`.`intcol` = 1 LIMIT 1000) `t` LIMIT 1 ), `closed` AS (SELECT COUNT(*) as `num` FROM ( SELECT `user`.`id` FROM `user` WHERE `user`.`textcol1` = 'closed' AND `user`.`intcol` = 1 LIMIT 1000) `t` LIMIT 1 ), `all` AS (SELECT LEAST(1000, SUM(`num`)) AS `num` FROM ( SELECT `num` FROM `open` UNION ALL SELECT `num` FROM `closed` ) `t` LIMIT 1 )SELECT 'all' AS `tab`, `num`FROM `all`", + "Instructions": { + "OperatorType": "Limit", + "Count": "1", + "Inputs": [ + { + "OperatorType": "Projection", + "Expressions": [ + "'all' as tab", + ":0 as num" + ], + "Inputs": [ + { + "OperatorType": "Projection", + "Expressions": [ + "least(1000, sum(num)) as num" + ], + "Inputs": [ + { + "OperatorType": "Aggregate", + "Variant": "Scalar", + "Aggregates": "any_value(0), sum(1) AS sum(num)", + "Inputs": [ + { + "OperatorType": "SimpleProjection", + "Columns": [ + 1, + 0 + ], + "Inputs": [ + { + "OperatorType": "Concatenate", + "Inputs": [ + { + "OperatorType": "Limit", + "Count": "1", + "Inputs": [ + { + "OperatorType": "Aggregate", + "Variant": "Scalar", + "Aggregates": "count_star(0) AS num, any_value(1)", + "Inputs": [ + { + "OperatorType": "SimpleProjection", + "Columns": [ + 1, + 2 + ], + "Inputs": [ + { + "OperatorType": "Limit", + "Count": "1000", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select t.id, 1, 1000 from (select `user`.id from `user` where 1 != 1) as t where 1 != 1", + "Query": "select t.id, 1, 1000 from (select `user`.id from `user` where `user`.textcol1 = 'open' and `user`.intcol = 1) as t limit :__upper_limit", + "Table": "`user`" + } + ] + } + ] + } + ] + } + ] + }, + { + "OperatorType": "Limit", + "Count": "1", + "Inputs": [ + { + "OperatorType": "Aggregate", + "Variant": "Scalar", + "Aggregates": "count_star(0) AS num, any_value(1)", + "Inputs": [ + { + "OperatorType": "SimpleProjection", + "Columns": [ + 1, + 2 + ], + "Inputs": [ + { + "OperatorType": "Limit", + "Count": "1000", + "Inputs": [ + { + "OperatorType": "Route", + "Variant": "Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select t.id, 1, 1000 from (select `user`.id from `user` where 1 != 1) as t where 1 != 1", + "Query": "select t.id, 1, 1000 from (select `user`.id from `user` where `user`.textcol1 = 'closed' and `user`.intcol = 1) as t limit :__upper_limit", + "Table": "`user`" + } + ] + } + ] + } + ] + } + ] + } + ] + } + ] + } + ] + } + ] + } + ] + } + ] + }, + "TablesUsed": [ + "user.user" + ] + } } ]