diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 08e1bf09ab7..3a7d81fa4e1 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -673,6 +673,7 @@ func (e *Executor) executeSPInAllSessions(ctx context.Context, safeSession *Safe queries = append(queries, &querypb.BoundQuery{Sql: sql}) } qr, errs = e.ExecuteMultiShard(ctx, nil, rss, queries, safeSession, false /*autocommit*/, ignoreMaxMemoryRows, nullResultsObserver{}) + err := vterrors.Aggregate(errs) if err != nil { return nil, err @@ -1405,8 +1406,7 @@ func (e *Executor) prepare(ctx context.Context, safeSession *SafeSession, sql st func (e *Executor) handlePrepare(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, logStats *logstats.LogStats) ([]*querypb.Field, error) { query, comments := sqlparser.SplitMarginComments(sql) - vcursor, _ := newVCursorImpl(safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv) - + vcursor, _ := newVCursorImpl(safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv, sql) stmt, reservedVars, err := parseAndValidateQuery(query, e.env.Parser()) if err != nil { return nil, err diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index b8e2b996780..c133d689d7e 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -1547,8 +1547,8 @@ var pv = querypb.ExecuteOptions_Gen4 func TestGetPlanUnnormalized(t *testing.T) { r, _, _, _, ctx := createExecutorEnv(t) - emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) - unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) + emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "") + unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "") query1 := "select * from music_user_map where id = 1" plan1, logStats1 := getPlanCached(t, ctx, r, emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) @@ -1631,7 +1631,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { t.Run("Cache", func(t *testing.T) { r, _, _, _, ctx := createExecutorEnv(t) - emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) + emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "") query1 := "select * from music_user_map where id = 1" _, logStats1 := getPlanCached(t, ctx, r, emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true) @@ -1655,7 +1655,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { // Skip cache using directive r, _, _, _, ctx := createExecutorEnv(t) - unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) + unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "") query1 := "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)" getPlanCached(t, ctx, r, unshardedvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) @@ -1666,12 +1666,12 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { assertCacheSize(t, r.plans, 1) // the target string will be resolved and become part of the plan cache key, which adds a new entry - ksIDVc1, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) + ksIDVc1, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "") getPlanCached(t, ctx, r, ksIDVc1, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) assertCacheSize(t, r.plans, 2) // the target string will be resolved and become part of the plan cache key, as it's an unsharded ks, it will be the same entry as above - ksIDVc2, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) + ksIDVc2, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "") getPlanCached(t, ctx, r, ksIDVc2, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) assertCacheSize(t, r.plans, 2) }) @@ -1681,7 +1681,7 @@ func TestGetPlanCacheNormalized(t *testing.T) { t.Run("Cache", func(t *testing.T) { r, _, _, _, ctx := createExecutorEnv(t) r.normalize = true - emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) + emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "") query1 := "select * from music_user_map where id = 1" _, logStats1 := getPlanCached(t, ctx, r, emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true /* skipQueryPlanCache */) @@ -1698,7 +1698,7 @@ func TestGetPlanCacheNormalized(t *testing.T) { // Skip cache using directive r, _, _, _, ctx := createExecutorEnv(t) r.normalize = true - unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) + unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "") query1 := "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)" getPlanCached(t, ctx, r, unshardedvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) @@ -1709,12 +1709,12 @@ func TestGetPlanCacheNormalized(t *testing.T) { assertCacheSize(t, r.plans, 1) // the target string will be resolved and become part of the plan cache key, which adds a new entry - ksIDVc1, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) + ksIDVc1, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "") getPlanCached(t, ctx, r, ksIDVc1, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) assertCacheSize(t, r.plans, 2) // the target string will be resolved and become part of the plan cache key, as it's an unsharded ks, it will be the same entry as above - ksIDVc2, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) + ksIDVc2, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "") getPlanCached(t, ctx, r, ksIDVc2, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) assertCacheSize(t, r.plans, 2) }) @@ -1724,8 +1724,8 @@ func TestGetPlanNormalized(t *testing.T) { r, _, _, _, ctx := createExecutorEnv(t) r.normalize = true - emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) - unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) + emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "") + unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "") query1 := "select * from music_user_map where id = 1" query2 := "select * from music_user_map where id = 2" @@ -1782,7 +1782,7 @@ func TestGetPlanPriority(t *testing.T) { r.normalize = true logStats := logstats.NewLogStats(ctx, "Test", "", "", nil) - vCursor, err := newVCursorImpl(session, makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) + vCursor, err := newVCursorImpl(session, makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "") assert.NoError(t, err) stmt, err := sqlparser.NewTestParser().Parse(testCase.sql) diff --git a/go/vt/vtgate/legacy_scatter_conn_test.go b/go/vt/vtgate/legacy_scatter_conn_test.go index 8fefce1dd66..8c76da67b79 100644 --- a/go/vt/vtgate/legacy_scatter_conn_test.go +++ b/go/vt/vtgate/legacy_scatter_conn_test.go @@ -380,6 +380,10 @@ func (o *recordingResultsObserver) observe(result *sqltypes.Result) { mu.Unlock() } +func (o *recordingResultsObserver) clone() resultsObserver { + return o +} + func TestMultiExecs(t *testing.T) { ctx := utils.LeakCheckContext(t) createSandbox("TestMultiExecs") diff --git a/go/vt/vtgate/plan_execute.go b/go/vt/vtgate/plan_execute.go index 199892842ee..dd487a69df9 100644 --- a/go/vt/vtgate/plan_execute.go +++ b/go/vt/vtgate/plan_execute.go @@ -111,7 +111,7 @@ func (e *Executor) newExecute( } } - vcursor, err := newVCursorImpl(safeSession, comments, e, logStats, e.vm, vs, e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv) + vcursor, err := newVCursorImpl(safeSession, comments, e, logStats, e.vm, vs, e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv, sql) if err != nil { return err } diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index f7db598127e..3a65bda5979 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -24,6 +24,8 @@ import ( "sync/atomic" "time" + "vitess.io/vitess/go/vt/vtgate/logstats" + "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/vt/sqlparser" @@ -73,14 +75,26 @@ type shardActionFunc func(rs *srvtopo.ResolvedShard, i int) error type shardActionTransactionFunc func(rs *srvtopo.ResolvedShard, i int, shardActionInfo *shardActionInfo) (*shardActionInfo, error) type ( + // resultsObserver will be given the chance to observe the results coming in for a specific query resultsObserver interface { observe(*sqltypes.Result) + clone() resultsObserver } + nullResultsObserver struct{} ) +// resultObserverFactory is a factory function that creates a resultsObserver, associating them with the query +var resultObserverFactory = func(*logstats.LogStats) resultsObserver { + return nullResultsObserver{} +} + func (nullResultsObserver) observe(*sqltypes.Result) {} +func (nullResultsObserver) clone() resultsObserver { + return nullResultsObserver{} +} + // NewScatterConn creates a new ScatterConn. func NewScatterConn(statsName string, txConn *TxConn, gw *TabletGateway) *ScatterConn { // this only works with TabletGateway diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index a71ad29184a..711bbb43cbc 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -147,6 +147,7 @@ func newVCursorImpl( serv srvtopo.Server, warnShardedOnly bool, pv plancontext.PlannerVersion, + sql string, ) (*vcursorImpl, error) { keyspace, tabletType, destination, err := parseDestinationTarget(safeSession.TargetString, vschema) if err != nil { @@ -197,7 +198,7 @@ func newVCursorImpl( pv: pv, warmingReadsPercent: warmingReadsPct, warmingReadsChannel: warmingReadsChan, - resultsObserver: nullResultsObserver{}, + resultsObserver: resultObserverFactory(logStats), }, nil } @@ -1328,7 +1329,7 @@ func (vc *vcursorImpl) cloneWithAutocommitSession() *vcursorImpl { topoServer: vc.topoServer, warnShardedOnly: vc.warnShardedOnly, pv: vc.pv, - resultsObserver: vc.resultsObserver, + resultsObserver: vc.resultsObserver.clone(), } } @@ -1401,7 +1402,7 @@ func (vc *vcursorImpl) CloneForReplicaWarming(ctx context.Context) engine.VCurso warnShardedOnly: vc.warnShardedOnly, warnings: vc.warnings, pv: vc.pv, - resultsObserver: nullResultsObserver{}, + resultsObserver: vc.resultsObserver.clone(), } v.marginComments.Trailing += "/* warming read */" @@ -1433,7 +1434,7 @@ func (vc *vcursorImpl) CloneForMirroring(ctx context.Context) engine.VCursor { warnShardedOnly: vc.warnShardedOnly, warnings: vc.warnings, pv: vc.pv, - resultsObserver: nullResultsObserver{}, + resultsObserver: vc.resultsObserver.clone(), } v.marginComments.Trailing += "/* mirror query */" diff --git a/go/vt/vtgate/vcursor_impl_test.go b/go/vt/vtgate/vcursor_impl_test.go index b8e4a0d3a0a..87a70491381 100644 --- a/go/vt/vtgate/vcursor_impl_test.go +++ b/go/vt/vtgate/vcursor_impl_test.go @@ -187,7 +187,7 @@ func TestDestinationKeyspace(t *testing.T) { r, _, _, _, _ := createExecutorEnv(t) for i, tc := range tests { t.Run(strconv.Itoa(i)+tc.targetString, func(t *testing.T) { - impl, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: tc.targetString}), sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, false, querypb.ExecuteOptions_Gen4) + impl, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: tc.targetString}), sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, false, querypb.ExecuteOptions_Gen4, "") impl.vschema = tc.vschema dest, keyspace, tabletType, err := impl.TargetDestination(tc.qualifier) if tc.expectedError == "" { @@ -246,7 +246,7 @@ func TestSetTarget(t *testing.T) { r, _, _, _, _ := createExecutorEnv(t) for i, tc := range tests { t.Run(fmt.Sprintf("%d#%s", i, tc.targetString), func(t *testing.T) { - vc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{InTransaction: true}), sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, false, querypb.ExecuteOptions_Gen4) + vc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{InTransaction: true}), sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, false, querypb.ExecuteOptions_Gen4, "") vc.vschema = tc.vschema err := vc.SetTarget(tc.targetString) if tc.expectedError == "" { @@ -297,7 +297,7 @@ func TestKeyForPlan(t *testing.T) { t.Run(fmt.Sprintf("%d#%s", i, tc.targetString), func(t *testing.T) { ss := NewSafeSession(&vtgatepb.Session{InTransaction: false}) ss.SetTargetString(tc.targetString) - vc, err := newVCursorImpl(ss, sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, srvtopo.NewResolver(&fakeTopoServer{}, nil, ""), nil, false, querypb.ExecuteOptions_Gen4) + vc, err := newVCursorImpl(ss, sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, srvtopo.NewResolver(&fakeTopoServer{}, nil, ""), nil, false, querypb.ExecuteOptions_Gen4, "") require.NoError(t, err) vc.vschema = tc.vschema @@ -320,7 +320,7 @@ func TestFirstSortedKeyspace(t *testing.T) { }} r, _, _, _, _ := createExecutorEnv(t) - vc, err := newVCursorImpl(NewSafeSession(nil), sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: vschemaWith2KS}, vschemaWith2KS, srvtopo.NewResolver(&fakeTopoServer{}, nil, ""), nil, false, querypb.ExecuteOptions_Gen4) + vc, err := newVCursorImpl(NewSafeSession(nil), sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: vschemaWith2KS}, vschemaWith2KS, srvtopo.NewResolver(&fakeTopoServer{}, nil, ""), nil, false, querypb.ExecuteOptions_Gen4, "") require.NoError(t, err) ks, err := vc.FirstSortedKeyspace() require.NoError(t, err)