From 577d7e2b0f02f10b5fdfbf86792e99d8c2f66a79 Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Tue, 10 Sep 2024 07:21:39 +0200 Subject: [PATCH 1/2] make the results observer easier to extend, and give it a life cycle Signed-off-by: Andres Taylor --- go/vt/vtgate/executor.go | 8 +++++--- go/vt/vtgate/executor_test.go | 26 ++++++++++++------------ go/vt/vtgate/legacy_scatter_conn_test.go | 2 ++ go/vt/vtgate/plan_execute.go | 3 ++- go/vt/vtgate/scatter_conn.go | 12 ++++++++++- go/vt/vtgate/vcursor_impl.go | 11 +++++++--- go/vt/vtgate/vcursor_impl_test.go | 8 ++++---- 7 files changed, 45 insertions(+), 25 deletions(-) diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 08e1bf09ab7..6b1582ffa78 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -672,7 +672,9 @@ func (e *Executor) executeSPInAllSessions(ctx context.Context, safeSession *Safe }) queries = append(queries, &querypb.BoundQuery{Sql: sql}) } - qr, errs = e.ExecuteMultiShard(ctx, nil, rss, queries, safeSession, false /*autocommit*/, ignoreMaxMemoryRows, nullResultsObserver{}) + observer := resultObserverFactory(sql) + qr, errs = e.ExecuteMultiShard(ctx, nil, rss, queries, safeSession, false /*autocommit*/, ignoreMaxMemoryRows, observer) + observer.close() err := vterrors.Aggregate(errs) if err != nil { return nil, err @@ -1405,8 +1407,8 @@ func (e *Executor) prepare(ctx context.Context, safeSession *SafeSession, sql st func (e *Executor) handlePrepare(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, logStats *logstats.LogStats) ([]*querypb.Field, error) { query, comments := sqlparser.SplitMarginComments(sql) - vcursor, _ := newVCursorImpl(safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv) - + vcursor, _ := newVCursorImpl(safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv, sql) + defer vcursor.Close() stmt, reservedVars, err := parseAndValidateQuery(query, e.env.Parser()) if err != nil { return nil, err diff --git a/go/vt/vtgate/executor_test.go b/go/vt/vtgate/executor_test.go index b8e2b996780..c133d689d7e 100644 --- a/go/vt/vtgate/executor_test.go +++ b/go/vt/vtgate/executor_test.go @@ -1547,8 +1547,8 @@ var pv = querypb.ExecuteOptions_Gen4 func TestGetPlanUnnormalized(t *testing.T) { r, _, _, _, ctx := createExecutorEnv(t) - emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) - unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) + emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "") + unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "") query1 := "select * from music_user_map where id = 1" plan1, logStats1 := getPlanCached(t, ctx, r, emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) @@ -1631,7 +1631,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { t.Run("Cache", func(t *testing.T) { r, _, _, _, ctx := createExecutorEnv(t) - emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) + emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "") query1 := "select * from music_user_map where id = 1" _, logStats1 := getPlanCached(t, ctx, r, emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true) @@ -1655,7 +1655,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { // Skip cache using directive r, _, _, _, ctx := createExecutorEnv(t) - unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) + unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "") query1 := "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)" getPlanCached(t, ctx, r, unshardedvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) @@ -1666,12 +1666,12 @@ func TestGetPlanCacheUnnormalized(t *testing.T) { assertCacheSize(t, r.plans, 1) // the target string will be resolved and become part of the plan cache key, which adds a new entry - ksIDVc1, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) + ksIDVc1, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "") getPlanCached(t, ctx, r, ksIDVc1, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) assertCacheSize(t, r.plans, 2) // the target string will be resolved and become part of the plan cache key, as it's an unsharded ks, it will be the same entry as above - ksIDVc2, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) + ksIDVc2, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "") getPlanCached(t, ctx, r, ksIDVc2, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) assertCacheSize(t, r.plans, 2) }) @@ -1681,7 +1681,7 @@ func TestGetPlanCacheNormalized(t *testing.T) { t.Run("Cache", func(t *testing.T) { r, _, _, _, ctx := createExecutorEnv(t) r.normalize = true - emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) + emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "") query1 := "select * from music_user_map where id = 1" _, logStats1 := getPlanCached(t, ctx, r, emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true /* skipQueryPlanCache */) @@ -1698,7 +1698,7 @@ func TestGetPlanCacheNormalized(t *testing.T) { // Skip cache using directive r, _, _, _, ctx := createExecutorEnv(t) r.normalize = true - unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) + unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "") query1 := "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)" getPlanCached(t, ctx, r, unshardedvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) @@ -1709,12 +1709,12 @@ func TestGetPlanCacheNormalized(t *testing.T) { assertCacheSize(t, r.plans, 1) // the target string will be resolved and become part of the plan cache key, which adds a new entry - ksIDVc1, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) + ksIDVc1, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "") getPlanCached(t, ctx, r, ksIDVc1, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) assertCacheSize(t, r.plans, 2) // the target string will be resolved and become part of the plan cache key, as it's an unsharded ks, it will be the same entry as above - ksIDVc2, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) + ksIDVc2, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "") getPlanCached(t, ctx, r, ksIDVc2, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false) assertCacheSize(t, r.plans, 2) }) @@ -1724,8 +1724,8 @@ func TestGetPlanNormalized(t *testing.T) { r, _, _, _, ctx := createExecutorEnv(t) r.normalize = true - emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) - unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) + emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "") + unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "") query1 := "select * from music_user_map where id = 1" query2 := "select * from music_user_map where id = 2" @@ -1782,7 +1782,7 @@ func TestGetPlanPriority(t *testing.T) { r.normalize = true logStats := logstats.NewLogStats(ctx, "Test", "", "", nil) - vCursor, err := newVCursorImpl(session, makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv) + vCursor, err := newVCursorImpl(session, makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "") assert.NoError(t, err) stmt, err := sqlparser.NewTestParser().Parse(testCase.sql) diff --git a/go/vt/vtgate/legacy_scatter_conn_test.go b/go/vt/vtgate/legacy_scatter_conn_test.go index 8fefce1dd66..8155531005e 100644 --- a/go/vt/vtgate/legacy_scatter_conn_test.go +++ b/go/vt/vtgate/legacy_scatter_conn_test.go @@ -380,6 +380,8 @@ func (o *recordingResultsObserver) observe(result *sqltypes.Result) { mu.Unlock() } +func (o *recordingResultsObserver) close() {} + func TestMultiExecs(t *testing.T) { ctx := utils.LeakCheckContext(t) createSandbox("TestMultiExecs") diff --git a/go/vt/vtgate/plan_execute.go b/go/vt/vtgate/plan_execute.go index 199892842ee..8a7b5bea76b 100644 --- a/go/vt/vtgate/plan_execute.go +++ b/go/vt/vtgate/plan_execute.go @@ -111,10 +111,11 @@ func (e *Executor) newExecute( } } - vcursor, err := newVCursorImpl(safeSession, comments, e, logStats, e.vm, vs, e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv) + vcursor, err := newVCursorImpl(safeSession, comments, e, logStats, e.vm, vs, e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv, sql) if err != nil { return err } + defer vcursor.Close() // we know this happens in a loop, but MaxBufferingRetries is small, so it's fine // 3: Create a plan for the query. // If we are retrying, it is likely that the routing rules have changed and hence we need to diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index f7db598127e..5e2e13a7a0e 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -73,13 +73,23 @@ type shardActionFunc func(rs *srvtopo.ResolvedShard, i int) error type shardActionTransactionFunc func(rs *srvtopo.ResolvedShard, i int, shardActionInfo *shardActionInfo) (*shardActionInfo, error) type ( + // resultsObserver will be given the chance to observe the results coming in for a specific query resultsObserver interface { observe(*sqltypes.Result) + close() } - nullResultsObserver struct{} + + nullResultsObserver struct{} + nullResultObserverFactory func(sql string) resultsObserver ) +// resultObserverFactory is a factory function that creates a resultsObserver, associating them with the query +var resultObserverFactory func(sql string) resultsObserver = func(sql string) resultsObserver { + return nullResultsObserver{} +} + func (nullResultsObserver) observe(*sqltypes.Result) {} +func (nullResultsObserver) close() {} // NewScatterConn creates a new ScatterConn. func NewScatterConn(statsName string, txConn *TxConn, gw *TabletGateway) *ScatterConn { diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index a71ad29184a..ee7ac38cc5b 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -147,6 +147,7 @@ func newVCursorImpl( serv srvtopo.Server, warnShardedOnly bool, pv plancontext.PlannerVersion, + sql string, ) (*vcursorImpl, error) { keyspace, tabletType, destination, err := parseDestinationTarget(safeSession.TargetString, vschema) if err != nil { @@ -197,7 +198,7 @@ func newVCursorImpl( pv: pv, warmingReadsPercent: warmingReadsPct, warmingReadsChannel: warmingReadsChan, - resultsObserver: nullResultsObserver{}, + resultsObserver: resultObserverFactory(sql), }, nil } @@ -1401,7 +1402,7 @@ func (vc *vcursorImpl) CloneForReplicaWarming(ctx context.Context) engine.VCurso warnShardedOnly: vc.warnShardedOnly, warnings: vc.warnings, pv: vc.pv, - resultsObserver: nullResultsObserver{}, + resultsObserver: vc.resultsObserver, } v.marginComments.Trailing += "/* warming read */" @@ -1433,7 +1434,7 @@ func (vc *vcursorImpl) CloneForMirroring(ctx context.Context) engine.VCursor { warnShardedOnly: vc.warnShardedOnly, warnings: vc.warnings, pv: vc.pv, - resultsObserver: nullResultsObserver{}, + resultsObserver: vc.resultsObserver, } v.marginComments.Trailing += "/* mirror query */" @@ -1459,3 +1460,7 @@ func (vc *vcursorImpl) UpdateForeignKeyChecksState(fkStateFromQuery *bool) { func (vc *vcursorImpl) GetForeignKeyChecksState() *bool { return vc.fkChecksState } + +func (vc *vcursorImpl) Close() { + vc.resultsObserver.close() +} diff --git a/go/vt/vtgate/vcursor_impl_test.go b/go/vt/vtgate/vcursor_impl_test.go index b8e4a0d3a0a..87a70491381 100644 --- a/go/vt/vtgate/vcursor_impl_test.go +++ b/go/vt/vtgate/vcursor_impl_test.go @@ -187,7 +187,7 @@ func TestDestinationKeyspace(t *testing.T) { r, _, _, _, _ := createExecutorEnv(t) for i, tc := range tests { t.Run(strconv.Itoa(i)+tc.targetString, func(t *testing.T) { - impl, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: tc.targetString}), sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, false, querypb.ExecuteOptions_Gen4) + impl, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: tc.targetString}), sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, false, querypb.ExecuteOptions_Gen4, "") impl.vschema = tc.vschema dest, keyspace, tabletType, err := impl.TargetDestination(tc.qualifier) if tc.expectedError == "" { @@ -246,7 +246,7 @@ func TestSetTarget(t *testing.T) { r, _, _, _, _ := createExecutorEnv(t) for i, tc := range tests { t.Run(fmt.Sprintf("%d#%s", i, tc.targetString), func(t *testing.T) { - vc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{InTransaction: true}), sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, false, querypb.ExecuteOptions_Gen4) + vc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{InTransaction: true}), sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, false, querypb.ExecuteOptions_Gen4, "") vc.vschema = tc.vschema err := vc.SetTarget(tc.targetString) if tc.expectedError == "" { @@ -297,7 +297,7 @@ func TestKeyForPlan(t *testing.T) { t.Run(fmt.Sprintf("%d#%s", i, tc.targetString), func(t *testing.T) { ss := NewSafeSession(&vtgatepb.Session{InTransaction: false}) ss.SetTargetString(tc.targetString) - vc, err := newVCursorImpl(ss, sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, srvtopo.NewResolver(&fakeTopoServer{}, nil, ""), nil, false, querypb.ExecuteOptions_Gen4) + vc, err := newVCursorImpl(ss, sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, srvtopo.NewResolver(&fakeTopoServer{}, nil, ""), nil, false, querypb.ExecuteOptions_Gen4, "") require.NoError(t, err) vc.vschema = tc.vschema @@ -320,7 +320,7 @@ func TestFirstSortedKeyspace(t *testing.T) { }} r, _, _, _, _ := createExecutorEnv(t) - vc, err := newVCursorImpl(NewSafeSession(nil), sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: vschemaWith2KS}, vschemaWith2KS, srvtopo.NewResolver(&fakeTopoServer{}, nil, ""), nil, false, querypb.ExecuteOptions_Gen4) + vc, err := newVCursorImpl(NewSafeSession(nil), sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: vschemaWith2KS}, vschemaWith2KS, srvtopo.NewResolver(&fakeTopoServer{}, nil, ""), nil, false, querypb.ExecuteOptions_Gen4, "") require.NoError(t, err) ks, err := vc.FirstSortedKeyspace() require.NoError(t, err) From be19f481c166f665fd08f62cd7d6bff4e8cb9a6d Mon Sep 17 00:00:00 2001 From: Andres Taylor Date: Tue, 10 Sep 2024 15:53:38 +0200 Subject: [PATCH 2/2] change interface of resultObserver Signed-off-by: Andres Taylor --- go/vt/vtgate/executor.go | 6 ++---- go/vt/vtgate/legacy_scatter_conn_test.go | 4 +++- go/vt/vtgate/plan_execute.go | 1 - go/vt/vtgate/scatter_conn.go | 14 +++++++++----- go/vt/vtgate/vcursor_impl.go | 12 ++++-------- 5 files changed, 18 insertions(+), 19 deletions(-) diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 6b1582ffa78..3a7d81fa4e1 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -672,9 +672,8 @@ func (e *Executor) executeSPInAllSessions(ctx context.Context, safeSession *Safe }) queries = append(queries, &querypb.BoundQuery{Sql: sql}) } - observer := resultObserverFactory(sql) - qr, errs = e.ExecuteMultiShard(ctx, nil, rss, queries, safeSession, false /*autocommit*/, ignoreMaxMemoryRows, observer) - observer.close() + qr, errs = e.ExecuteMultiShard(ctx, nil, rss, queries, safeSession, false /*autocommit*/, ignoreMaxMemoryRows, nullResultsObserver{}) + err := vterrors.Aggregate(errs) if err != nil { return nil, err @@ -1408,7 +1407,6 @@ func (e *Executor) prepare(ctx context.Context, safeSession *SafeSession, sql st func (e *Executor) handlePrepare(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, logStats *logstats.LogStats) ([]*querypb.Field, error) { query, comments := sqlparser.SplitMarginComments(sql) vcursor, _ := newVCursorImpl(safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv, sql) - defer vcursor.Close() stmt, reservedVars, err := parseAndValidateQuery(query, e.env.Parser()) if err != nil { return nil, err diff --git a/go/vt/vtgate/legacy_scatter_conn_test.go b/go/vt/vtgate/legacy_scatter_conn_test.go index 8155531005e..8c76da67b79 100644 --- a/go/vt/vtgate/legacy_scatter_conn_test.go +++ b/go/vt/vtgate/legacy_scatter_conn_test.go @@ -380,7 +380,9 @@ func (o *recordingResultsObserver) observe(result *sqltypes.Result) { mu.Unlock() } -func (o *recordingResultsObserver) close() {} +func (o *recordingResultsObserver) clone() resultsObserver { + return o +} func TestMultiExecs(t *testing.T) { ctx := utils.LeakCheckContext(t) diff --git a/go/vt/vtgate/plan_execute.go b/go/vt/vtgate/plan_execute.go index 8a7b5bea76b..dd487a69df9 100644 --- a/go/vt/vtgate/plan_execute.go +++ b/go/vt/vtgate/plan_execute.go @@ -115,7 +115,6 @@ func (e *Executor) newExecute( if err != nil { return err } - defer vcursor.Close() // we know this happens in a loop, but MaxBufferingRetries is small, so it's fine // 3: Create a plan for the query. // If we are retrying, it is likely that the routing rules have changed and hence we need to diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index 5e2e13a7a0e..3a65bda5979 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -24,6 +24,8 @@ import ( "sync/atomic" "time" + "vitess.io/vitess/go/vt/vtgate/logstats" + "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/vt/sqlparser" @@ -76,20 +78,22 @@ type ( // resultsObserver will be given the chance to observe the results coming in for a specific query resultsObserver interface { observe(*sqltypes.Result) - close() + clone() resultsObserver } - nullResultsObserver struct{} - nullResultObserverFactory func(sql string) resultsObserver + nullResultsObserver struct{} ) // resultObserverFactory is a factory function that creates a resultsObserver, associating them with the query -var resultObserverFactory func(sql string) resultsObserver = func(sql string) resultsObserver { +var resultObserverFactory = func(*logstats.LogStats) resultsObserver { return nullResultsObserver{} } func (nullResultsObserver) observe(*sqltypes.Result) {} -func (nullResultsObserver) close() {} + +func (nullResultsObserver) clone() resultsObserver { + return nullResultsObserver{} +} // NewScatterConn creates a new ScatterConn. func NewScatterConn(statsName string, txConn *TxConn, gw *TabletGateway) *ScatterConn { diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index ee7ac38cc5b..711bbb43cbc 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -198,7 +198,7 @@ func newVCursorImpl( pv: pv, warmingReadsPercent: warmingReadsPct, warmingReadsChannel: warmingReadsChan, - resultsObserver: resultObserverFactory(sql), + resultsObserver: resultObserverFactory(logStats), }, nil } @@ -1329,7 +1329,7 @@ func (vc *vcursorImpl) cloneWithAutocommitSession() *vcursorImpl { topoServer: vc.topoServer, warnShardedOnly: vc.warnShardedOnly, pv: vc.pv, - resultsObserver: vc.resultsObserver, + resultsObserver: vc.resultsObserver.clone(), } } @@ -1402,7 +1402,7 @@ func (vc *vcursorImpl) CloneForReplicaWarming(ctx context.Context) engine.VCurso warnShardedOnly: vc.warnShardedOnly, warnings: vc.warnings, pv: vc.pv, - resultsObserver: vc.resultsObserver, + resultsObserver: vc.resultsObserver.clone(), } v.marginComments.Trailing += "/* warming read */" @@ -1434,7 +1434,7 @@ func (vc *vcursorImpl) CloneForMirroring(ctx context.Context) engine.VCursor { warnShardedOnly: vc.warnShardedOnly, warnings: vc.warnings, pv: vc.pv, - resultsObserver: vc.resultsObserver, + resultsObserver: vc.resultsObserver.clone(), } v.marginComments.Trailing += "/* mirror query */" @@ -1460,7 +1460,3 @@ func (vc *vcursorImpl) UpdateForeignKeyChecksState(fkStateFromQuery *bool) { func (vc *vcursorImpl) GetForeignKeyChecksState() *bool { return vc.fkChecksState } - -func (vc *vcursorImpl) Close() { - vc.resultsObserver.close() -}