From e3fd9b1f38e966d6c17f00bef70dd8e700b9ab5c Mon Sep 17 00:00:00 2001 From: "vitess-bot[bot]" <108069721+vitess-bot[bot]@users.noreply.github.com> Date: Wed, 17 Apr 2024 19:38:08 +0530 Subject: [PATCH 1/3] Fix panic in aggregation (#15728) Signed-off-by: Manan Gupta --- go/test/endtoend/utils/utils.go | 8 ++--- .../endtoend/vtgate/queries/tpch/tpch_test.go | 30 +++++++++++++++++++ go/vt/vtgate/engine/scalar_aggregation.go | 4 +-- 3 files changed, 36 insertions(+), 6 deletions(-) diff --git a/go/test/endtoend/utils/utils.go b/go/test/endtoend/utils/utils.go index df015d03d7c..ae80336d4a1 100644 --- a/go/test/endtoend/utils/utils.go +++ b/go/test/endtoend/utils/utils.go @@ -229,12 +229,12 @@ func AssertMatchesWithTimeout(t *testing.T, conn *mysql.Conn, query, expected st } // WaitForAuthoritative waits for a table to become authoritative -func WaitForAuthoritative(t *testing.T, ks, tbl string, readVSchema func() (*interface{}, error)) error { +func WaitForAuthoritative(t TestingT, ks, tbl string, readVSchema func() (*interface{}, error)) error { timeout := time.After(60 * time.Second) for { select { case <-timeout: - return fmt.Errorf("schema tracking didn't mark table t2 as authoritative until timeout") + return fmt.Errorf("schema tracking didn't mark table %v.%v as authoritative until timeout", ks, tbl) default: res, err := readVSchema() require.NoError(t, err, res) @@ -305,7 +305,7 @@ func WaitForTableDeletions(t *testing.T, vtgateProcess cluster.VtgateProcess, ks } // WaitForColumn waits for a table's column to be present -func WaitForColumn(t testing.TB, vtgateProcess cluster.VtgateProcess, ks, tbl, col string) error { +func WaitForColumn(t TestingT, vtgateProcess cluster.VtgateProcess, ks, tbl, col string) error { timeout := time.After(60 * time.Second) for { select { @@ -340,7 +340,7 @@ func WaitForColumn(t testing.TB, vtgateProcess cluster.VtgateProcess, ks, tbl, c if !isMap { break } - if colName, exists := colDef["name"]; exists && colName == col { + if colName, exists := colDef["name"]; exists && strings.EqualFold(colName.(string), col) { return nil } } diff --git a/go/test/endtoend/vtgate/queries/tpch/tpch_test.go b/go/test/endtoend/vtgate/queries/tpch/tpch_test.go index b1dd4ef1e98..513aea94a86 100644 --- a/go/test/endtoend/vtgate/queries/tpch/tpch_test.go +++ b/go/test/endtoend/vtgate/queries/tpch/tpch_test.go @@ -131,6 +131,36 @@ order by l_returnflag, l_linestatus;`, }, + { + name: "Q11", + query: `select + ps_partkey, + sum(ps_supplycost * ps_availqty) as value +from + partsupp, + supplier, + nation +where + ps_suppkey = s_suppkey + and s_nationkey = n_nationkey + and n_name = 'MOZAMBIQUE' +group by + ps_partkey having + sum(ps_supplycost * ps_availqty) > ( + select + sum(ps_supplycost * ps_availqty) * 0.0001000000 + from + partsupp, + supplier, + nation + where + ps_suppkey = s_suppkey + and s_nationkey = n_nationkey + and n_name = 'MOZAMBIQUE' + ) +order by + value desc;`, + }, } for _, testcase := range testcases { diff --git a/go/vt/vtgate/engine/scalar_aggregation.go b/go/vt/vtgate/engine/scalar_aggregation.go index 85e90420ff9..929536b9cdf 100644 --- a/go/vt/vtgate/engine/scalar_aggregation.go +++ b/go/vt/vtgate/engine/scalar_aggregation.go @@ -80,7 +80,7 @@ func (sa *ScalarAggregate) NeedsTransaction() bool { // TryExecute implements the Primitive interface func (sa *ScalarAggregate) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { - result, err := vcursor.ExecutePrimitive(ctx, sa.Input, bindVars, wantfields) + result, err := vcursor.ExecutePrimitive(ctx, sa.Input, bindVars, true) if err != nil { return nil, err } @@ -114,7 +114,7 @@ func (sa *ScalarAggregate) TryStreamExecute(ctx context.Context, vcursor VCursor var fields []*querypb.Field fieldsSent := !wantfields - err := vcursor.StreamExecutePrimitive(ctx, sa.Input, bindVars, wantfields, func(result *sqltypes.Result) error { + err := vcursor.StreamExecutePrimitive(ctx, sa.Input, bindVars, true, func(result *sqltypes.Result) error { // as the underlying primitive call is not sync // and here scalar aggregate is using shared variables we have to sync the callback // for correct aggregation. From 384ec1277c43dbc1f9988e674b847d94cadecc83 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Wed, 17 Apr 2024 20:01:56 +0530 Subject: [PATCH 2/3] test: fix build issues Signed-off-by: Manan Gupta --- go/test/endtoend/utils/utils.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/test/endtoend/utils/utils.go b/go/test/endtoend/utils/utils.go index ae80336d4a1..d5e4d5bd68a 100644 --- a/go/test/endtoend/utils/utils.go +++ b/go/test/endtoend/utils/utils.go @@ -229,7 +229,7 @@ func AssertMatchesWithTimeout(t *testing.T, conn *mysql.Conn, query, expected st } // WaitForAuthoritative waits for a table to become authoritative -func WaitForAuthoritative(t TestingT, ks, tbl string, readVSchema func() (*interface{}, error)) error { +func WaitForAuthoritative(t *testing.T, ks, tbl string, readVSchema func() (*interface{}, error)) error { timeout := time.After(60 * time.Second) for { select { @@ -305,7 +305,7 @@ func WaitForTableDeletions(t *testing.T, vtgateProcess cluster.VtgateProcess, ks } // WaitForColumn waits for a table's column to be present -func WaitForColumn(t TestingT, vtgateProcess cluster.VtgateProcess, ks, tbl, col string) error { +func WaitForColumn(t *testing.T, vtgateProcess cluster.VtgateProcess, ks, tbl, col string) error { timeout := time.After(60 * time.Second) for { select { From 657d1abddf3297f4ead9d89199a13c841bff1eef Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 18 Apr 2024 10:13:52 +0530 Subject: [PATCH 3/3] feat: fix build issue Signed-off-by: Manan Gupta --- go/test/endtoend/utils/utils.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/test/endtoend/utils/utils.go b/go/test/endtoend/utils/utils.go index d5e4d5bd68a..51c04733964 100644 --- a/go/test/endtoend/utils/utils.go +++ b/go/test/endtoend/utils/utils.go @@ -305,7 +305,7 @@ func WaitForTableDeletions(t *testing.T, vtgateProcess cluster.VtgateProcess, ks } // WaitForColumn waits for a table's column to be present -func WaitForColumn(t *testing.T, vtgateProcess cluster.VtgateProcess, ks, tbl, col string) error { +func WaitForColumn(t testing.TB, vtgateProcess cluster.VtgateProcess, ks, tbl, col string) error { timeout := time.After(60 * time.Second) for { select {