Skip to content

Commit

Permalink
FIX group by+聚合函数报错问题 (vitessio#101)
Browse files Browse the repository at this point in the history
1.修复聚合函数+groupby执行报错的问题
在ctx中保存原始select SQL的AST,方便后续TruncateColumnCount生成
2.优化Limit operator生成逻辑
当Limit之下为Aggregator,且分片Plan中已生成LimitPlan可下推
  • Loading branch information
wangwei1207 committed Oct 26, 2023
1 parent df6a9f8 commit 5b074c2
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 6 deletions.
27 changes: 27 additions & 0 deletions go/test/endtoend/vtgate/split_table/table_issue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package split_table

import (
"testing"
)

func TestIssue(t *testing.T) {
mcmp, closer := start(t)
defer closer()
mcmp.Exec("use user")

mcmp.Exec("insert into t_user(id,col,f_key,f_tinyint,f_int,name) values (1, 'a', 'aaa', 1, false, 'li'),(2, 'b', 'bbb', 2, false, 'zh'),(3, 'c', 'ccc', 3, false, 'kk')")
mcmp.Exec("insert into t_user(id,col,f_key,f_tinyint,f_int,name) values (4, 'ab', 'aaa', 10, true, 'li'),(5, 'bx', 'bbb', 4, false, 'zhsd'),(6, 'cdx', 'ccc', 34, true, 'kk')")
mcmp.Exec("insert into t_user(id,col,f_key,f_tinyint,f_int,name) values (7, 'ac', 'aaa', 100, false, 'li'),(8, 'bx', 'bbb', 25, true, 'zhff'),(9, 'cd', 'ccc', 33, false, 'kggk')")
mcmp.Exec("insert into t_user(id,col,f_key,f_tinyint,f_int,name) values (11, 'ad', 'aaa', 2, true, 'li'),(12, 'ba', 'bbb', 26, false, 'zdfh'),(13, 'cc', 'ccc', 13, true, 'kzzk')")
mcmp.Exec("insert into t_user(id,col,f_key,f_tinyint,f_int,name) values (14, 'ae', 'aaa', 3, false, 'a'),(15, 'bd', 'bbb', 27, true, 'zhdf'),(16, 'fdfc', 'ccc', 64, true, 'xxxxx')")
mcmp.Exec("insert into t_user(id,col,f_key,f_tinyint,f_int,name) values (17, 'a', 'aaa', 1, false, 'li'),(18, 'b', 'bbb', 2, false, 'zh'),(19, 'c', 'ccc', 3, false, 'kk')")
mcmp.Exec("insert into t_user(id,col,f_key,f_tinyint,f_int,name) values (20, 'ab', 'aaa', 10, true, 'li'),(21, 'bx', 'bbb', 4, false, 'zhsd'),(22, 'cdx', 'ccc', 34, true, 'kk')")
mcmp.Exec("insert into t_user(id,col,f_key,f_tinyint,f_int,name) values (23, 'ac', 'aaa', 100, false, 'li'),(24, 'bx', 'bbb', 25, true, 'zhff'),(25, 'cd', 'ccc', 33, false, 'kggk')")
mcmp.Exec("insert into t_user(id,col,f_key,f_tinyint,f_int,name) values (26, 'ad', 'aaa', 2, true, 'li'),(27, 'ba', 'bbb', 26, false, 'zdfh'),(28, 'cc', 'ccc', 13, true, 'kzzk')")

// table_issue.json
mcmp.ExecWithColumnCompare("select max(f_int),min(`name`) from t_user group by id,f_tinyint order by f_tinyint desc limit 20")
mcmp.ExecWithColumnCompare("select max(f_int),min(`name`) from t_user group by id,f_tinyint order by f_tinyint asc")
mcmp.ExecWithColumnCompare("select max(f_int),min(`name`) from t_user group by col order by f_tinyint asc")
mcmp.ExecWithColumnCompare("select max(f_int),min(`name`) from t_user group by col,id order by f_tinyint asc")
}
5 changes: 5 additions & 0 deletions go/vt/vtgate/engine/table_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,11 @@ func (tableRoute *TableRoute) TryStreamExecute(ctx context.Context, vcursor VCur
panic("implement me")
}

// SetTruncateColumnCount sets the truncate column count.
func (tableRoute *TableRoute) SetTruncateColumnCount(count int) {
tableRoute.TruncateColumnCount = count
}

func (tableRoute *TableRoute) description() PrimitiveDescription {
other := map[string]any{
"Query": sqlparser.String(tableRoute.Query),
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/planbuilder/gen4_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ func newBuildSelectPlan(
plan = pushCommentDirectivesOnPlan(plan, selStmt)

// todo: build split table plan
ctx.OriginSelStmt = selStmt
plan, _, _, err = buildTablePlan(ctx, plan, operators.TableNamesUsed(op))
if err != nil {
return nil, nil, nil, err
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vtgate/planbuilder/operators/table_horizon_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ func tryPushingDownLimitForSplitTable(ctx *plancontext.PlanningContext, in *Limi
case *Projection:
return rewrite.Swap(in, src, "push limit under projection")
case *Aggregator:
if isCrossShard(ctx.GetRoute()) {
return rewrite.Swap(in, src, "limit pushed into aggregator")
}
return in, rewrite.SameTree, nil
default:
return setUpperLimitForSplitTable(in)
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtgate/planbuilder/plancontext/planning_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type PlanningContext struct {
DMLEngine engine.DML

KsPrimitive engine.Primitive

OriginSelStmt sqlparser.SelectStatement
}

func NewPlanningContext(reservedVars *sqlparser.ReservedVars, semTable *semantics.SemTable, vschema VSchema, version querypb.ExecuteOptions_PlannerVersion) *PlanningContext {
Expand Down
1 change: 1 addition & 0 deletions go/vt/vtgate/planbuilder/split_table_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func TestSplitTablePlan(t *testing.T) {
testTableFile(t, "table_postprocess_cases.json", output, vschema, false)
testTableFile(t, "table_select_case.json", output, vschema, false)
testTableFile(t, "table_memory_sort_cases.json", output, vschema, false)
testTableFile(t, "table_issue.json", output, vschema, false)
}

func TestSplitTableOne(t *testing.T) {
Expand Down
5 changes: 0 additions & 5 deletions go/vt/vtgate/planbuilder/table_operator_transformers.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,6 @@ func transformTableRoutePlan(ctx *plancontext.PlanningContext, op *operators.Tab
})
}

if ksERoute.TruncateColumnCount > 0 && op.ResultColumns > 0 {
return nil, vterrors.VT13001("split table add columns in selectExprs, need to recount TruncateColumnCount")
}
eroute.TruncateColumnCount = ksERoute.TruncateColumnCount + op.ResultColumns

return &tableRoute{
Select: sel,
eroute: eroute,
Expand Down
30 changes: 30 additions & 0 deletions go/vt/vtgate/planbuilder/table_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ func buildTablePlan(ctx *plancontext.PlanningContext, ksPlan logicalPlan, tableN
if err != nil {
return nil, nil, nil, err
}
ksAndTablePlan, err = truncateColumns(ctx, ksAndTablePlan)
if err != nil {
return nil, nil, nil, err
}

return ksAndTablePlan, semTable, nil, nil
}
Expand Down Expand Up @@ -100,3 +104,29 @@ func findTableSchema(ctx *plancontext.PlanningContext, tableNames []string) (fou
}
return found
}

func truncateColumns(ctx *plancontext.PlanningContext, plan logicalPlan) (logicalPlan, error) {
if ctx.OriginSelStmt == nil {
return plan, nil
}
sel := sqlparser.GetFirstSelect(ctx.OriginSelStmt)
if len(plan.OutputColumns()) == len(sel.SelectExprs) {
return plan, nil
}
switch p := plan.(type) {
case *tableRoute:
p.eroute.SetTruncateColumnCount(len(sel.SelectExprs))
case *orderedAggregate:
p.truncateColumnCount = len(sel.SelectExprs)
case *memorySort:
p.eMemorySort.SetTruncateColumnCount(len(sel.SelectExprs))
case *limit:
for _, p := range plan.Inputs() {
_, err := truncateColumns(ctx, p)
if err != nil {
return nil, err
}
}
}
return plan, nil
}
2 changes: 1 addition & 1 deletion go/vt/vtgate/planbuilder/table_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,5 @@ func (t *tableRoute) ContainsTables() semantics.TableSet {
}

func (t *tableRoute) OutputColumns() []sqlparser.SelectExpr {
panic("implement me")
return sqlparser.GetFirstSelect(t.Select).SelectExprs
}
135 changes: 135 additions & 0 deletions go/vt/vtgate/planbuilder/testdata/table_issue.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
[
{
"comment": "aggregate function and group by with limit",
"query": "select max(f_int),min(name) from t_user group by id,f_tinyint order by f_tinyint desc limit 20",
"plan": {
"QueryType": "SELECT",
"Original": "select max(f_int),min(name) from t_user group by id,f_tinyint order by f_tinyint desc limit 20",
"Instructions": {
"OperatorType": "Limit",
"Count": "INT64(20)",
"Inputs": [
{
"OperatorType": "Aggregate",
"Variant": "Ordered",
"Aggregates": "max(0) AS max(f_int), min(1) AS min(`name`), random(3) AS weight_string(f_tinyint)",
"GroupBy": "(2|3), (4|5)",
"ResultColumns": 2,
"Inputs": [
{
"OperatorType": "TableRoute",
"Variant": "Scatter-Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select max(f_int), min(`name`), f_tinyint, weight_string(f_tinyint), id, weight_string(id) from t_user_0 where 1 != 1 group by id, f_tinyint, weight_string(id)",
"OrderBy": "(2|3) DESC, (4|5) ASC",
"Query": "select max(f_int), min(`name`), f_tinyint, weight_string(f_tinyint), id, weight_string(id) from t_user group by id, f_tinyint, weight_string(id) order by f_tinyint desc, id asc limit :__upper_limit",
"Table": "t_user"
}
]
}
]
},
"TablesUsed": [
"user.t_user"
]
}
},
{
"comment": "aggregate function and group by",
"query": "select max(f_int),min(name) from t_user group by id,f_tinyint order by f_tinyint asc",
"plan": {
"QueryType": "SELECT",
"Original": "select max(f_int),min(name) from t_user group by id,f_tinyint order by f_tinyint asc",
"Instructions": {
"OperatorType": "Aggregate",
"Variant": "Ordered",
"Aggregates": "max(0) AS max(f_int), min(1) AS min(`name`), random(3) AS weight_string(f_tinyint)",
"GroupBy": "(2|3), (4|5)",
"ResultColumns": 2,
"Inputs": [
{
"OperatorType": "TableRoute",
"Variant": "Scatter-Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select max(f_int), min(`name`), f_tinyint, weight_string(f_tinyint), id, weight_string(id) from t_user_0 where 1 != 1 group by id, f_tinyint, weight_string(id)",
"OrderBy": "(2|3) ASC, (4|5) ASC",
"Query": "select max(f_int), min(`name`), f_tinyint, weight_string(f_tinyint), id, weight_string(id) from t_user group by id, f_tinyint, weight_string(id) order by f_tinyint asc, id asc",
"Table": "t_user"
}
]
},
"TablesUsed": [
"user.t_user"
]
}
},
{
"comment": "aggregate function and group by tableVindex",
"query": "select max(f_int),min(name) from t_user group by col order by f_tinyint asc",
"plan": {
"QueryType": "SELECT",
"Original": "select max(f_int),min(name) from t_user group by col order by f_tinyint asc",
"Instructions": {
"OperatorType": "Sort",
"Variant": "Memory",
"OrderBy": "(2|3) ASC",
"ResultColumns": 2,
"Inputs": [
{
"OperatorType": "Aggregate",
"Variant": "Ordered",
"Aggregates": "max(0) AS max(f_int), min(1) AS min(`name`), random(2) AS f_tinyint, random(3)",
"GroupBy": "4",
"Inputs": [
{
"OperatorType": "TableRoute",
"Variant": "Scatter-Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select max(f_int), min(`name`), f_tinyint, weight_string(f_tinyint), col from t_user_0 where 1 != 1 group by col",
"OrderBy": "4 ASC",
"Query": "select max(f_int), min(`name`), f_tinyint, weight_string(f_tinyint), col from t_user group by col order by col asc",
"Table": "t_user"
}
]
}
]
},
"TablesUsed": [
"user.t_user"
]
}
},
{
"comment": "aggregate function and group by tableVindex & Vindex",
"query": "select max(f_int),min(name) from t_user group by col,id order by f_tinyint asc",
"plan": {
"QueryType": "SELECT",
"Original": "select max(f_int),min(name) from t_user group by col,id order by f_tinyint asc",
"Instructions": {
"OperatorType": "TableRoute",
"Variant": "Scatter-Scatter",
"Keyspace": {
"Name": "user",
"Sharded": true
},
"FieldQuery": "select max(f_int), min(`name`), f_tinyint, weight_string(f_tinyint) from t_user_0 where 1 != 1 group by col, id",
"OrderBy": "(2|3) ASC",
"Query": "select max(f_int), min(`name`), f_tinyint, weight_string(f_tinyint) from t_user group by col, id order by f_tinyint asc",
"ResultColumns": 2,
"Table": "t_user"
},
"TablesUsed": [
"user.t_user"
]
}
}
]

0 comments on commit 5b074c2

Please sign in to comment.