From cd559e1f179938cb27671a2ceedf08c4a68ccbc7 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Thu, 17 Oct 2024 11:04:41 +0530 Subject: [PATCH] fix: route engine to handle column truncation for execute after lookup (#16981) Signed-off-by: Harshit Gangal --- go/vt/vtgate/engine/route.go | 35 ++---- go/vt/vtgate/engine/vindex_lookup_test.go | 135 ++++++++++++++++++++++ 2 files changed, 147 insertions(+), 23 deletions(-) create mode 100644 go/vt/vtgate/engine/vindex_lookup_test.go diff --git a/go/vt/vtgate/engine/route.go b/go/vt/vtgate/engine/route.go index 1a0c17d6be5..7185f8db36c 100644 --- a/go/vt/vtgate/engine/route.go +++ b/go/vt/vtgate/engine/route.go @@ -137,11 +137,12 @@ func (route *Route) SetTruncateColumnCount(count int) { func (route *Route) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) { ctx, cancelFunc := addQueryTimeout(ctx, vcursor, route.QueryTimeout) defer cancelFunc() - qr, err := route.executeInternal(ctx, vcursor, bindVars, wantfields) + rss, bvs, err := route.findRoute(ctx, vcursor, bindVars) if err != nil { return nil, err } - return qr.Truncate(route.TruncateColumnCount), nil + + return route.executeShards(ctx, vcursor, bindVars, wantfields, rss, bvs) } // addQueryTimeout adds a query timeout to the context it receives and returns the modified context along with the cancel function. @@ -159,20 +160,6 @@ const ( IgnoreReserveTxn cxtKey = iota ) -func (route *Route) executeInternal( - ctx context.Context, - vcursor VCursor, - bindVars map[string]*querypb.BindVariable, - wantfields bool, -) (*sqltypes.Result, error) { - rss, bvs, err := route.findRoute(ctx, vcursor, bindVars) - if err != nil { - return nil, err - } - - return route.executeShards(ctx, vcursor, bindVars, wantfields, rss, bvs) -} - func (route *Route) executeShards( ctx context.Context, vcursor VCursor, @@ -228,11 +215,15 @@ func (route *Route) executeShards( } } - if len(route.OrderBy) == 0 { - return result, nil + if len(route.OrderBy) != 0 { + var err error + result, err = route.sort(result) + if err != nil { + return nil, err + } } - return route.sort(result) + return result.Truncate(route.TruncateColumnCount), nil } func filterOutNilErrors(errs []error) []error { @@ -389,10 +380,8 @@ func (route *Route) sort(in *sqltypes.Result) (*sqltypes.Result, error) { // the contents of any row. out := in.ShallowCopy() - if err := route.OrderBy.SortResult(out); err != nil { - return nil, err - } - return out.Truncate(route.TruncateColumnCount), nil + err := route.OrderBy.SortResult(out) + return out, err } func (route *Route) description() PrimitiveDescription { diff --git a/go/vt/vtgate/engine/vindex_lookup_test.go b/go/vt/vtgate/engine/vindex_lookup_test.go new file mode 100644 index 00000000000..d734bf12080 --- /dev/null +++ b/go/vt/vtgate/engine/vindex_lookup_test.go @@ -0,0 +1,135 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package engine + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/sqltypes" + querypb "vitess.io/vitess/go/vt/proto/query" + "vitess.io/vitess/go/vt/vtgate/evalengine" + "vitess.io/vitess/go/vt/vtgate/vindexes" +) + +var ( + vindex, _ = vindexes.CreateVindex("lookup_unique", "", map[string]string{ + "table": "lkp", + "from": "from", + "to": "toc", + "write_only": "true", + }) + ks = &vindexes.Keyspace{Name: "ks", Sharded: true} +) + +func TestVindexLookup(t *testing.T) { + planableVindex, ok := vindex.(vindexes.LookupPlanable) + require.True(t, ok, "not a lookup vindex") + _, args := planableVindex.Query() + + fp := &fakePrimitive{ + results: []*sqltypes.Result{ + sqltypes.MakeTestResult( + sqltypes.MakeTestFields("id|keyspace_id", "int64|varbinary"), + "1|\x10"), + }, + } + route := NewRoute(ByDestination, ks, "dummy_select", "dummy_select_field") + vdxLookup := &VindexLookup{ + Opcode: EqualUnique, + Keyspace: ks, + Vindex: planableVindex, + Arguments: args, + Values: []evalengine.Expr{evalengine.NewLiteralInt(1)}, + Lookup: fp, + SendTo: route, + } + + vc := &loggingVCursor{results: []*sqltypes.Result{defaultSelectResult}} + + result, err := vdxLookup.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) + require.NoError(t, err) + fp.ExpectLog(t, []string{`Execute from: type:TUPLE values:{type:INT64 value:"1"} false`}) + vc.ExpectLog(t, []string{ + `ResolveDestinations ks [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(10)`, + `ExecuteMultiShard ks.-20: dummy_select {} false false`, + }) + expectResult(t, result, defaultSelectResult) + + fp.rewind() + vc.Rewind() + result, err = wrapStreamExecute(vdxLookup, vc, map[string]*querypb.BindVariable{}, false) + require.NoError(t, err) + vc.ExpectLog(t, []string{ + `ResolveDestinations ks [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(10)`, + `StreamExecuteMulti dummy_select ks.-20: {} `, + }) + expectResult(t, result, defaultSelectResult) +} + +func TestVindexLookupTruncate(t *testing.T) { + planableVindex, ok := vindex.(vindexes.LookupPlanable) + require.True(t, ok, "not a lookup vindex") + _, args := planableVindex.Query() + + fp := &fakePrimitive{ + results: []*sqltypes.Result{ + sqltypes.MakeTestResult( + sqltypes.MakeTestFields("id|keyspace_id", "int64|varbinary"), + "1|\x10"), + }, + } + route := NewRoute(ByDestination, ks, "dummy_select", "dummy_select_field") + route.TruncateColumnCount = 1 + vdxLookup := &VindexLookup{ + Opcode: EqualUnique, + Keyspace: ks, + Vindex: planableVindex, + Arguments: args, + Values: []evalengine.Expr{evalengine.NewLiteralInt(1)}, + Lookup: fp, + SendTo: route, + } + + vc := &loggingVCursor{results: []*sqltypes.Result{ + sqltypes.MakeTestResult(sqltypes.MakeTestFields("name|morecol", "varchar|int64"), + "foo|1", "bar|2", "baz|3"), + }} + + wantRes := sqltypes.MakeTestResult(sqltypes.MakeTestFields("name", "varchar"), + "foo", "bar", "baz") + result, err := vdxLookup.TryExecute(context.Background(), vc, map[string]*querypb.BindVariable{}, false) + require.NoError(t, err) + fp.ExpectLog(t, []string{`Execute from: type:TUPLE values:{type:INT64 value:"1"} false`}) + vc.ExpectLog(t, []string{ + `ResolveDestinations ks [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(10)`, + `ExecuteMultiShard ks.-20: dummy_select {} false false`, + }) + expectResult(t, result, wantRes) + + fp.rewind() + vc.Rewind() + result, err = wrapStreamExecute(vdxLookup, vc, map[string]*querypb.BindVariable{}, false) + require.NoError(t, err) + vc.ExpectLog(t, []string{ + `ResolveDestinations ks [type:INT64 value:"1"] Destinations:DestinationKeyspaceID(10)`, + `StreamExecuteMulti dummy_select ks.-20: {} `, + }) + expectResult(t, result, wantRes) +}