From 5b074c24ff3e1957c909d92490dafe4e33c3cbe4 Mon Sep 17 00:00:00 2001 From: wangwei1207 Date: Thu, 26 Oct 2023 16:38:46 +0800 Subject: [PATCH] =?UTF-8?q?FIX=20group=20by+=E8=81=9A=E5=90=88=E5=87=BD?= =?UTF-8?q?=E6=95=B0=E6=8A=A5=E9=94=99=E9=97=AE=E9=A2=98=20(#101)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1.修复聚合函数+groupby执行报错的问题 在ctx中保存原始select SQL的AST,方便后续TruncateColumnCount生成 2.优化Limit operator生成逻辑 当Limit之下为Aggregator,且分片Plan中已生成LimitPlan可下推 --- .../vtgate/split_table/table_issue_test.go | 27 ++++ go/vt/vtgate/engine/table_route.go | 5 + go/vt/vtgate/planbuilder/gen4_planner.go | 1 + .../operators/table_horizon_planning.go | 3 + .../plancontext/planning_context.go | 2 + .../planbuilder/split_table_plan_test.go | 1 + .../table_operator_transformers.go | 5 - go/vt/vtgate/planbuilder/table_planner.go | 30 ++++ go/vt/vtgate/planbuilder/table_route.go | 2 +- .../planbuilder/testdata/table_issue.json | 135 ++++++++++++++++++ 10 files changed, 205 insertions(+), 6 deletions(-) create mode 100644 go/test/endtoend/vtgate/split_table/table_issue_test.go create mode 100644 go/vt/vtgate/planbuilder/testdata/table_issue.json diff --git a/go/test/endtoend/vtgate/split_table/table_issue_test.go b/go/test/endtoend/vtgate/split_table/table_issue_test.go new file mode 100644 index 00000000000..4c6482430e1 --- /dev/null +++ b/go/test/endtoend/vtgate/split_table/table_issue_test.go @@ -0,0 +1,27 @@ +package split_table + +import ( + "testing" +) + +func TestIssue(t *testing.T) { + mcmp, closer := start(t) + defer closer() + mcmp.Exec("use user") + + mcmp.Exec("insert into t_user(id,col,f_key,f_tinyint,f_int,name) values (1, 'a', 'aaa', 1, false, 'li'),(2, 'b', 'bbb', 2, false, 'zh'),(3, 'c', 'ccc', 3, false, 'kk')") + mcmp.Exec("insert into t_user(id,col,f_key,f_tinyint,f_int,name) values (4, 'ab', 'aaa', 10, true, 'li'),(5, 'bx', 'bbb', 4, false, 'zhsd'),(6, 'cdx', 'ccc', 34, true, 'kk')") + mcmp.Exec("insert into t_user(id,col,f_key,f_tinyint,f_int,name) values (7, 'ac', 'aaa', 100, false, 'li'),(8, 'bx', 'bbb', 25, true, 'zhff'),(9, 'cd', 'ccc', 33, false, 'kggk')") + mcmp.Exec("insert into t_user(id,col,f_key,f_tinyint,f_int,name) values (11, 'ad', 'aaa', 2, true, 'li'),(12, 'ba', 'bbb', 26, false, 'zdfh'),(13, 'cc', 'ccc', 13, true, 'kzzk')") + mcmp.Exec("insert into t_user(id,col,f_key,f_tinyint,f_int,name) values (14, 'ae', 'aaa', 3, false, 'a'),(15, 'bd', 'bbb', 27, true, 'zhdf'),(16, 'fdfc', 'ccc', 64, true, 'xxxxx')") + mcmp.Exec("insert into t_user(id,col,f_key,f_tinyint,f_int,name) values (17, 'a', 'aaa', 1, false, 'li'),(18, 'b', 'bbb', 2, false, 'zh'),(19, 'c', 'ccc', 3, false, 'kk')") + mcmp.Exec("insert into t_user(id,col,f_key,f_tinyint,f_int,name) values (20, 'ab', 'aaa', 10, true, 'li'),(21, 'bx', 'bbb', 4, false, 'zhsd'),(22, 'cdx', 'ccc', 34, true, 'kk')") + mcmp.Exec("insert into t_user(id,col,f_key,f_tinyint,f_int,name) values (23, 'ac', 'aaa', 100, false, 'li'),(24, 'bx', 'bbb', 25, true, 'zhff'),(25, 'cd', 'ccc', 33, false, 'kggk')") + mcmp.Exec("insert into t_user(id,col,f_key,f_tinyint,f_int,name) values (26, 'ad', 'aaa', 2, true, 'li'),(27, 'ba', 'bbb', 26, false, 'zdfh'),(28, 'cc', 'ccc', 13, true, 'kzzk')") + + // table_issue.json + mcmp.ExecWithColumnCompare("select max(f_int),min(`name`) from t_user group by id,f_tinyint order by f_tinyint desc limit 20") + mcmp.ExecWithColumnCompare("select max(f_int),min(`name`) from t_user group by id,f_tinyint order by f_tinyint asc") + mcmp.ExecWithColumnCompare("select max(f_int),min(`name`) from t_user group by col order by f_tinyint asc") + mcmp.ExecWithColumnCompare("select max(f_int),min(`name`) from t_user group by col,id order by f_tinyint asc") +} diff --git a/go/vt/vtgate/engine/table_route.go b/go/vt/vtgate/engine/table_route.go index 932e23333c4..195862ac523 100644 --- a/go/vt/vtgate/engine/table_route.go +++ b/go/vt/vtgate/engine/table_route.go @@ -264,6 +264,11 @@ func (tableRoute *TableRoute) TryStreamExecute(ctx context.Context, vcursor VCur panic("implement me") } +// SetTruncateColumnCount sets the truncate column count. +func (tableRoute *TableRoute) SetTruncateColumnCount(count int) { + tableRoute.TruncateColumnCount = count +} + func (tableRoute *TableRoute) description() PrimitiveDescription { other := map[string]any{ "Query": sqlparser.String(tableRoute.Query), diff --git a/go/vt/vtgate/planbuilder/gen4_planner.go b/go/vt/vtgate/planbuilder/gen4_planner.go index ae12a2c4435..a6ee8ab2983 100644 --- a/go/vt/vtgate/planbuilder/gen4_planner.go +++ b/go/vt/vtgate/planbuilder/gen4_planner.go @@ -233,6 +233,7 @@ func newBuildSelectPlan( plan = pushCommentDirectivesOnPlan(plan, selStmt) // todo: build split table plan + ctx.OriginSelStmt = selStmt plan, _, _, err = buildTablePlan(ctx, plan, operators.TableNamesUsed(op)) if err != nil { return nil, nil, nil, err diff --git a/go/vt/vtgate/planbuilder/operators/table_horizon_planning.go b/go/vt/vtgate/planbuilder/operators/table_horizon_planning.go index 06f7de941a0..d9c3c58cbce 100644 --- a/go/vt/vtgate/planbuilder/operators/table_horizon_planning.go +++ b/go/vt/vtgate/planbuilder/operators/table_horizon_planning.go @@ -139,6 +139,9 @@ func tryPushingDownLimitForSplitTable(ctx *plancontext.PlanningContext, in *Limi case *Projection: return rewrite.Swap(in, src, "push limit under projection") case *Aggregator: + if isCrossShard(ctx.GetRoute()) { + return rewrite.Swap(in, src, "limit pushed into aggregator") + } return in, rewrite.SameTree, nil default: return setUpperLimitForSplitTable(in) diff --git a/go/vt/vtgate/planbuilder/plancontext/planning_context.go b/go/vt/vtgate/planbuilder/plancontext/planning_context.go index 01ae1b75b13..43d7f789f6b 100644 --- a/go/vt/vtgate/planbuilder/plancontext/planning_context.go +++ b/go/vt/vtgate/planbuilder/plancontext/planning_context.go @@ -48,6 +48,8 @@ type PlanningContext struct { DMLEngine engine.DML KsPrimitive engine.Primitive + + OriginSelStmt sqlparser.SelectStatement } func NewPlanningContext(reservedVars *sqlparser.ReservedVars, semTable *semantics.SemTable, vschema VSchema, version querypb.ExecuteOptions_PlannerVersion) *PlanningContext { diff --git a/go/vt/vtgate/planbuilder/split_table_plan_test.go b/go/vt/vtgate/planbuilder/split_table_plan_test.go index 4dcbb057e57..2c324f75b65 100644 --- a/go/vt/vtgate/planbuilder/split_table_plan_test.go +++ b/go/vt/vtgate/planbuilder/split_table_plan_test.go @@ -29,6 +29,7 @@ func TestSplitTablePlan(t *testing.T) { testTableFile(t, "table_postprocess_cases.json", output, vschema, false) testTableFile(t, "table_select_case.json", output, vschema, false) testTableFile(t, "table_memory_sort_cases.json", output, vschema, false) + testTableFile(t, "table_issue.json", output, vschema, false) } func TestSplitTableOne(t *testing.T) { diff --git a/go/vt/vtgate/planbuilder/table_operator_transformers.go b/go/vt/vtgate/planbuilder/table_operator_transformers.go index f711e45dd01..016ff917e52 100644 --- a/go/vt/vtgate/planbuilder/table_operator_transformers.go +++ b/go/vt/vtgate/planbuilder/table_operator_transformers.go @@ -137,11 +137,6 @@ func transformTableRoutePlan(ctx *plancontext.PlanningContext, op *operators.Tab }) } - if ksERoute.TruncateColumnCount > 0 && op.ResultColumns > 0 { - return nil, vterrors.VT13001("split table add columns in selectExprs, need to recount TruncateColumnCount") - } - eroute.TruncateColumnCount = ksERoute.TruncateColumnCount + op.ResultColumns - return &tableRoute{ Select: sel, eroute: eroute, diff --git a/go/vt/vtgate/planbuilder/table_planner.go b/go/vt/vtgate/planbuilder/table_planner.go index 0d47ad24bd1..69edf0df50b 100644 --- a/go/vt/vtgate/planbuilder/table_planner.go +++ b/go/vt/vtgate/planbuilder/table_planner.go @@ -61,6 +61,10 @@ func buildTablePlan(ctx *plancontext.PlanningContext, ksPlan logicalPlan, tableN if err != nil { return nil, nil, nil, err } + ksAndTablePlan, err = truncateColumns(ctx, ksAndTablePlan) + if err != nil { + return nil, nil, nil, err + } return ksAndTablePlan, semTable, nil, nil } @@ -100,3 +104,29 @@ func findTableSchema(ctx *plancontext.PlanningContext, tableNames []string) (fou } return found } + +func truncateColumns(ctx *plancontext.PlanningContext, plan logicalPlan) (logicalPlan, error) { + if ctx.OriginSelStmt == nil { + return plan, nil + } + sel := sqlparser.GetFirstSelect(ctx.OriginSelStmt) + if len(plan.OutputColumns()) == len(sel.SelectExprs) { + return plan, nil + } + switch p := plan.(type) { + case *tableRoute: + p.eroute.SetTruncateColumnCount(len(sel.SelectExprs)) + case *orderedAggregate: + p.truncateColumnCount = len(sel.SelectExprs) + case *memorySort: + p.eMemorySort.SetTruncateColumnCount(len(sel.SelectExprs)) + case *limit: + for _, p := range plan.Inputs() { + _, err := truncateColumns(ctx, p) + if err != nil { + return nil, err + } + } + } + return plan, nil +} diff --git a/go/vt/vtgate/planbuilder/table_route.go b/go/vt/vtgate/planbuilder/table_route.go index c5d23d65489..bf150ebba7f 100644 --- a/go/vt/vtgate/planbuilder/table_route.go +++ b/go/vt/vtgate/planbuilder/table_route.go @@ -55,5 +55,5 @@ func (t *tableRoute) ContainsTables() semantics.TableSet { } func (t *tableRoute) OutputColumns() []sqlparser.SelectExpr { - panic("implement me") + return sqlparser.GetFirstSelect(t.Select).SelectExprs } diff --git a/go/vt/vtgate/planbuilder/testdata/table_issue.json b/go/vt/vtgate/planbuilder/testdata/table_issue.json new file mode 100644 index 00000000000..d3d5a9e60aa --- /dev/null +++ b/go/vt/vtgate/planbuilder/testdata/table_issue.json @@ -0,0 +1,135 @@ +[ + { + "comment": "aggregate function and group by with limit", + "query": "select max(f_int),min(name) from t_user group by id,f_tinyint order by f_tinyint desc limit 20", + "plan": { + "QueryType": "SELECT", + "Original": "select max(f_int),min(name) from t_user group by id,f_tinyint order by f_tinyint desc limit 20", + "Instructions": { + "OperatorType": "Limit", + "Count": "INT64(20)", + "Inputs": [ + { + "OperatorType": "Aggregate", + "Variant": "Ordered", + "Aggregates": "max(0) AS max(f_int), min(1) AS min(`name`), random(3) AS weight_string(f_tinyint)", + "GroupBy": "(2|3), (4|5)", + "ResultColumns": 2, + "Inputs": [ + { + "OperatorType": "TableRoute", + "Variant": "Scatter-Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select max(f_int), min(`name`), f_tinyint, weight_string(f_tinyint), id, weight_string(id) from t_user_0 where 1 != 1 group by id, f_tinyint, weight_string(id)", + "OrderBy": "(2|3) DESC, (4|5) ASC", + "Query": "select max(f_int), min(`name`), f_tinyint, weight_string(f_tinyint), id, weight_string(id) from t_user group by id, f_tinyint, weight_string(id) order by f_tinyint desc, id asc limit :__upper_limit", + "Table": "t_user" + } + ] + } + ] + }, + "TablesUsed": [ + "user.t_user" + ] + } + }, + { + "comment": "aggregate function and group by", + "query": "select max(f_int),min(name) from t_user group by id,f_tinyint order by f_tinyint asc", + "plan": { + "QueryType": "SELECT", + "Original": "select max(f_int),min(name) from t_user group by id,f_tinyint order by f_tinyint asc", + "Instructions": { + "OperatorType": "Aggregate", + "Variant": "Ordered", + "Aggregates": "max(0) AS max(f_int), min(1) AS min(`name`), random(3) AS weight_string(f_tinyint)", + "GroupBy": "(2|3), (4|5)", + "ResultColumns": 2, + "Inputs": [ + { + "OperatorType": "TableRoute", + "Variant": "Scatter-Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select max(f_int), min(`name`), f_tinyint, weight_string(f_tinyint), id, weight_string(id) from t_user_0 where 1 != 1 group by id, f_tinyint, weight_string(id)", + "OrderBy": "(2|3) ASC, (4|5) ASC", + "Query": "select max(f_int), min(`name`), f_tinyint, weight_string(f_tinyint), id, weight_string(id) from t_user group by id, f_tinyint, weight_string(id) order by f_tinyint asc, id asc", + "Table": "t_user" + } + ] + }, + "TablesUsed": [ + "user.t_user" + ] + } + }, + { + "comment": "aggregate function and group by tableVindex", + "query": "select max(f_int),min(name) from t_user group by col order by f_tinyint asc", + "plan": { + "QueryType": "SELECT", + "Original": "select max(f_int),min(name) from t_user group by col order by f_tinyint asc", + "Instructions": { + "OperatorType": "Sort", + "Variant": "Memory", + "OrderBy": "(2|3) ASC", + "ResultColumns": 2, + "Inputs": [ + { + "OperatorType": "Aggregate", + "Variant": "Ordered", + "Aggregates": "max(0) AS max(f_int), min(1) AS min(`name`), random(2) AS f_tinyint, random(3)", + "GroupBy": "4", + "Inputs": [ + { + "OperatorType": "TableRoute", + "Variant": "Scatter-Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select max(f_int), min(`name`), f_tinyint, weight_string(f_tinyint), col from t_user_0 where 1 != 1 group by col", + "OrderBy": "4 ASC", + "Query": "select max(f_int), min(`name`), f_tinyint, weight_string(f_tinyint), col from t_user group by col order by col asc", + "Table": "t_user" + } + ] + } + ] + }, + "TablesUsed": [ + "user.t_user" + ] + } + }, + { + "comment": "aggregate function and group by tableVindex & Vindex", + "query": "select max(f_int),min(name) from t_user group by col,id order by f_tinyint asc", + "plan": { + "QueryType": "SELECT", + "Original": "select max(f_int),min(name) from t_user group by col,id order by f_tinyint asc", + "Instructions": { + "OperatorType": "TableRoute", + "Variant": "Scatter-Scatter", + "Keyspace": { + "Name": "user", + "Sharded": true + }, + "FieldQuery": "select max(f_int), min(`name`), f_tinyint, weight_string(f_tinyint) from t_user_0 where 1 != 1 group by col, id", + "OrderBy": "(2|3) ASC", + "Query": "select max(f_int), min(`name`), f_tinyint, weight_string(f_tinyint) from t_user group by col, id order by f_tinyint asc", + "ResultColumns": 2, + "Table": "t_user" + }, + "TablesUsed": [ + "user.t_user" + ] + } + } +]