Skip to content

Commit

Permalink
make the results observer easier to extend, and give it a life cycle
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay committed Sep 10, 2024
1 parent a513601 commit 577d7e2
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 25 deletions.
8 changes: 5 additions & 3 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,9 @@ func (e *Executor) executeSPInAllSessions(ctx context.Context, safeSession *Safe
})
queries = append(queries, &querypb.BoundQuery{Sql: sql})
}
qr, errs = e.ExecuteMultiShard(ctx, nil, rss, queries, safeSession, false /*autocommit*/, ignoreMaxMemoryRows, nullResultsObserver{})
observer := resultObserverFactory(sql)
qr, errs = e.ExecuteMultiShard(ctx, nil, rss, queries, safeSession, false /*autocommit*/, ignoreMaxMemoryRows, observer)
observer.close()
err := vterrors.Aggregate(errs)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1405,8 +1407,8 @@ func (e *Executor) prepare(ctx context.Context, safeSession *SafeSession, sql st

func (e *Executor) handlePrepare(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, logStats *logstats.LogStats) ([]*querypb.Field, error) {
query, comments := sqlparser.SplitMarginComments(sql)
vcursor, _ := newVCursorImpl(safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv)

vcursor, _ := newVCursorImpl(safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv, sql)
defer vcursor.Close()
stmt, reservedVars, err := parseAndValidateQuery(query, e.env.Parser())
if err != nil {
return nil, err
Expand Down
26 changes: 13 additions & 13 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1547,8 +1547,8 @@ var pv = querypb.ExecuteOptions_Gen4
func TestGetPlanUnnormalized(t *testing.T) {
r, _, _, _, ctx := createExecutorEnv(t)

emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "")
unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "")

query1 := "select * from music_user_map where id = 1"
plan1, logStats1 := getPlanCached(t, ctx, r, emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false)
Expand Down Expand Up @@ -1631,7 +1631,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) {
t.Run("Cache", func(t *testing.T) {
r, _, _, _, ctx := createExecutorEnv(t)

emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "")
query1 := "select * from music_user_map where id = 1"

_, logStats1 := getPlanCached(t, ctx, r, emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true)
Expand All @@ -1655,7 +1655,7 @@ func TestGetPlanCacheUnnormalized(t *testing.T) {
// Skip cache using directive
r, _, _, _, ctx := createExecutorEnv(t)

unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "")

query1 := "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)"
getPlanCached(t, ctx, r, unshardedvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false)
Expand All @@ -1666,12 +1666,12 @@ func TestGetPlanCacheUnnormalized(t *testing.T) {
assertCacheSize(t, r.plans, 1)

// the target string will be resolved and become part of the plan cache key, which adds a new entry
ksIDVc1, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
ksIDVc1, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "")
getPlanCached(t, ctx, r, ksIDVc1, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false)
assertCacheSize(t, r.plans, 2)

// the target string will be resolved and become part of the plan cache key, as it's an unsharded ks, it will be the same entry as above
ksIDVc2, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
ksIDVc2, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "")
getPlanCached(t, ctx, r, ksIDVc2, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false)
assertCacheSize(t, r.plans, 2)
})
Expand All @@ -1681,7 +1681,7 @@ func TestGetPlanCacheNormalized(t *testing.T) {
t.Run("Cache", func(t *testing.T) {
r, _, _, _, ctx := createExecutorEnv(t)
r.normalize = true
emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "")

query1 := "select * from music_user_map where id = 1"
_, logStats1 := getPlanCached(t, ctx, r, emptyvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, true /* skipQueryPlanCache */)
Expand All @@ -1698,7 +1698,7 @@ func TestGetPlanCacheNormalized(t *testing.T) {
// Skip cache using directive
r, _, _, _, ctx := createExecutorEnv(t)
r.normalize = true
unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "")

query1 := "insert /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ into user(id) values (1), (2)"
getPlanCached(t, ctx, r, unshardedvc, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false)
Expand All @@ -1709,12 +1709,12 @@ func TestGetPlanCacheNormalized(t *testing.T) {
assertCacheSize(t, r.plans, 1)

// the target string will be resolved and become part of the plan cache key, which adds a new entry
ksIDVc1, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
ksIDVc1, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[deadbeef]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "")
getPlanCached(t, ctx, r, ksIDVc1, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false)
assertCacheSize(t, r.plans, 2)

// the target string will be resolved and become part of the plan cache key, as it's an unsharded ks, it will be the same entry as above
ksIDVc2, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
ksIDVc2, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "[beefdead]"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "")
getPlanCached(t, ctx, r, ksIDVc2, query1, makeComments(" /* comment */"), map[string]*querypb.BindVariable{}, false)
assertCacheSize(t, r.plans, 2)
})
Expand All @@ -1724,8 +1724,8 @@ func TestGetPlanNormalized(t *testing.T) {
r, _, _, _, ctx := createExecutorEnv(t)

r.normalize = true
emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
emptyvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "")
unshardedvc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: KsTestUnsharded + "@unknown"}), makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "")

query1 := "select * from music_user_map where id = 1"
query2 := "select * from music_user_map where id = 2"
Expand Down Expand Up @@ -1782,7 +1782,7 @@ func TestGetPlanPriority(t *testing.T) {

r.normalize = true
logStats := logstats.NewLogStats(ctx, "Test", "", "", nil)
vCursor, err := newVCursorImpl(session, makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv)
vCursor, err := newVCursorImpl(session, makeComments(""), r, nil, r.vm, r.VSchema(), r.resolver.resolver, nil, false, pv, "")
assert.NoError(t, err)

stmt, err := sqlparser.NewTestParser().Parse(testCase.sql)
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtgate/legacy_scatter_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,8 @@ func (o *recordingResultsObserver) observe(result *sqltypes.Result) {
mu.Unlock()
}

func (o *recordingResultsObserver) close() {}

func TestMultiExecs(t *testing.T) {
ctx := utils.LeakCheckContext(t)
createSandbox("TestMultiExecs")
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vtgate/plan_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,11 @@ func (e *Executor) newExecute(
}
}

vcursor, err := newVCursorImpl(safeSession, comments, e, logStats, e.vm, vs, e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv)
vcursor, err := newVCursorImpl(safeSession, comments, e, logStats, e.vm, vs, e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv, sql)
if err != nil {
return err
}
defer vcursor.Close() // we know this happens in a loop, but MaxBufferingRetries is small, so it's fine

// 3: Create a plan for the query.
// If we are retrying, it is likely that the routing rules have changed and hence we need to
Expand Down
12 changes: 11 additions & 1 deletion go/vt/vtgate/scatter_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,13 +73,23 @@ type shardActionFunc func(rs *srvtopo.ResolvedShard, i int) error
type shardActionTransactionFunc func(rs *srvtopo.ResolvedShard, i int, shardActionInfo *shardActionInfo) (*shardActionInfo, error)

type (
// resultsObserver will be given the chance to observe the results coming in for a specific query
resultsObserver interface {
observe(*sqltypes.Result)
close()
}
nullResultsObserver struct{}

nullResultsObserver struct{}
nullResultObserverFactory func(sql string) resultsObserver
)

// resultObserverFactory is a factory function that creates a resultsObserver, associating them with the query
var resultObserverFactory func(sql string) resultsObserver = func(sql string) resultsObserver {
return nullResultsObserver{}
}

func (nullResultsObserver) observe(*sqltypes.Result) {}
func (nullResultsObserver) close() {}

// NewScatterConn creates a new ScatterConn.
func NewScatterConn(statsName string, txConn *TxConn, gw *TabletGateway) *ScatterConn {
Expand Down
11 changes: 8 additions & 3 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func newVCursorImpl(
serv srvtopo.Server,
warnShardedOnly bool,
pv plancontext.PlannerVersion,
sql string,
) (*vcursorImpl, error) {
keyspace, tabletType, destination, err := parseDestinationTarget(safeSession.TargetString, vschema)
if err != nil {
Expand Down Expand Up @@ -197,7 +198,7 @@ func newVCursorImpl(
pv: pv,
warmingReadsPercent: warmingReadsPct,
warmingReadsChannel: warmingReadsChan,
resultsObserver: nullResultsObserver{},
resultsObserver: resultObserverFactory(sql),
}, nil
}

Expand Down Expand Up @@ -1401,7 +1402,7 @@ func (vc *vcursorImpl) CloneForReplicaWarming(ctx context.Context) engine.VCurso
warnShardedOnly: vc.warnShardedOnly,
warnings: vc.warnings,
pv: vc.pv,
resultsObserver: nullResultsObserver{},
resultsObserver: vc.resultsObserver,
}

v.marginComments.Trailing += "/* warming read */"
Expand Down Expand Up @@ -1433,7 +1434,7 @@ func (vc *vcursorImpl) CloneForMirroring(ctx context.Context) engine.VCursor {
warnShardedOnly: vc.warnShardedOnly,
warnings: vc.warnings,
pv: vc.pv,
resultsObserver: nullResultsObserver{},
resultsObserver: vc.resultsObserver,
}

v.marginComments.Trailing += "/* mirror query */"
Expand All @@ -1459,3 +1460,7 @@ func (vc *vcursorImpl) UpdateForeignKeyChecksState(fkStateFromQuery *bool) {
func (vc *vcursorImpl) GetForeignKeyChecksState() *bool {
return vc.fkChecksState
}

func (vc *vcursorImpl) Close() {
vc.resultsObserver.close()
}
8 changes: 4 additions & 4 deletions go/vt/vtgate/vcursor_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func TestDestinationKeyspace(t *testing.T) {
r, _, _, _, _ := createExecutorEnv(t)
for i, tc := range tests {
t.Run(strconv.Itoa(i)+tc.targetString, func(t *testing.T) {
impl, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: tc.targetString}), sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, false, querypb.ExecuteOptions_Gen4)
impl, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{TargetString: tc.targetString}), sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, false, querypb.ExecuteOptions_Gen4, "")
impl.vschema = tc.vschema
dest, keyspace, tabletType, err := impl.TargetDestination(tc.qualifier)
if tc.expectedError == "" {
Expand Down Expand Up @@ -246,7 +246,7 @@ func TestSetTarget(t *testing.T) {
r, _, _, _, _ := createExecutorEnv(t)
for i, tc := range tests {
t.Run(fmt.Sprintf("%d#%s", i, tc.targetString), func(t *testing.T) {
vc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{InTransaction: true}), sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, false, querypb.ExecuteOptions_Gen4)
vc, _ := newVCursorImpl(NewSafeSession(&vtgatepb.Session{InTransaction: true}), sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, nil, nil, false, querypb.ExecuteOptions_Gen4, "")
vc.vschema = tc.vschema
err := vc.SetTarget(tc.targetString)
if tc.expectedError == "" {
Expand Down Expand Up @@ -297,7 +297,7 @@ func TestKeyForPlan(t *testing.T) {
t.Run(fmt.Sprintf("%d#%s", i, tc.targetString), func(t *testing.T) {
ss := NewSafeSession(&vtgatepb.Session{InTransaction: false})
ss.SetTargetString(tc.targetString)
vc, err := newVCursorImpl(ss, sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, srvtopo.NewResolver(&fakeTopoServer{}, nil, ""), nil, false, querypb.ExecuteOptions_Gen4)
vc, err := newVCursorImpl(ss, sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: tc.vschema}, tc.vschema, srvtopo.NewResolver(&fakeTopoServer{}, nil, ""), nil, false, querypb.ExecuteOptions_Gen4, "")
require.NoError(t, err)
vc.vschema = tc.vschema

Expand All @@ -320,7 +320,7 @@ func TestFirstSortedKeyspace(t *testing.T) {
}}

r, _, _, _, _ := createExecutorEnv(t)
vc, err := newVCursorImpl(NewSafeSession(nil), sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: vschemaWith2KS}, vschemaWith2KS, srvtopo.NewResolver(&fakeTopoServer{}, nil, ""), nil, false, querypb.ExecuteOptions_Gen4)
vc, err := newVCursorImpl(NewSafeSession(nil), sqlparser.MarginComments{}, r, nil, &fakeVSchemaOperator{vschema: vschemaWith2KS}, vschemaWith2KS, srvtopo.NewResolver(&fakeTopoServer{}, nil, ""), nil, false, querypb.ExecuteOptions_Gen4, "")
require.NoError(t, err)
ks, err := vc.FirstSortedKeyspace()
require.NoError(t, err)
Expand Down

0 comments on commit 577d7e2

Please sign in to comment.