Skip to content

Commit

Permalink
change interface of resultObserver
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay committed Sep 10, 2024
1 parent 577d7e2 commit be19f48
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 19 deletions.
6 changes: 2 additions & 4 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,9 +672,8 @@ func (e *Executor) executeSPInAllSessions(ctx context.Context, safeSession *Safe
})
queries = append(queries, &querypb.BoundQuery{Sql: sql})
}
observer := resultObserverFactory(sql)
qr, errs = e.ExecuteMultiShard(ctx, nil, rss, queries, safeSession, false /*autocommit*/, ignoreMaxMemoryRows, observer)
observer.close()
qr, errs = e.ExecuteMultiShard(ctx, nil, rss, queries, safeSession, false /*autocommit*/, ignoreMaxMemoryRows, nullResultsObserver{})

err := vterrors.Aggregate(errs)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1408,7 +1407,6 @@ func (e *Executor) prepare(ctx context.Context, safeSession *SafeSession, sql st
func (e *Executor) handlePrepare(ctx context.Context, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable, logStats *logstats.LogStats) ([]*querypb.Field, error) {
query, comments := sqlparser.SplitMarginComments(sql)
vcursor, _ := newVCursorImpl(safeSession, comments, e, logStats, e.vm, e.VSchema(), e.resolver.resolver, e.serv, e.warnShardedOnly, e.pv, sql)
defer vcursor.Close()
stmt, reservedVars, err := parseAndValidateQuery(query, e.env.Parser())
if err != nil {
return nil, err
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vtgate/legacy_scatter_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,9 @@ func (o *recordingResultsObserver) observe(result *sqltypes.Result) {
mu.Unlock()
}

func (o *recordingResultsObserver) close() {}
func (o *recordingResultsObserver) clone() resultsObserver {
return o
}

func TestMultiExecs(t *testing.T) {
ctx := utils.LeakCheckContext(t)
Expand Down
1 change: 0 additions & 1 deletion go/vt/vtgate/plan_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ func (e *Executor) newExecute(
if err != nil {
return err
}
defer vcursor.Close() // we know this happens in a loop, but MaxBufferingRetries is small, so it's fine

// 3: Create a plan for the query.
// If we are retrying, it is likely that the routing rules have changed and hence we need to
Expand Down
14 changes: 9 additions & 5 deletions go/vt/vtgate/scatter_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sync/atomic"
"time"

"vitess.io/vitess/go/vt/vtgate/logstats"

"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/sqlparser"

Expand Down Expand Up @@ -76,20 +78,22 @@ type (
// resultsObserver will be given the chance to observe the results coming in for a specific query
resultsObserver interface {
observe(*sqltypes.Result)
close()
clone() resultsObserver
}

nullResultsObserver struct{}
nullResultObserverFactory func(sql string) resultsObserver
nullResultsObserver struct{}
)

// resultObserverFactory is a factory function that creates a resultsObserver, associating them with the query
var resultObserverFactory func(sql string) resultsObserver = func(sql string) resultsObserver {
var resultObserverFactory = func(*logstats.LogStats) resultsObserver {
return nullResultsObserver{}
}

func (nullResultsObserver) observe(*sqltypes.Result) {}
func (nullResultsObserver) close() {}

func (nullResultsObserver) clone() resultsObserver {
return nullResultsObserver{}
}

// NewScatterConn creates a new ScatterConn.
func NewScatterConn(statsName string, txConn *TxConn, gw *TabletGateway) *ScatterConn {
Expand Down
12 changes: 4 additions & 8 deletions go/vt/vtgate/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func newVCursorImpl(
pv: pv,
warmingReadsPercent: warmingReadsPct,
warmingReadsChannel: warmingReadsChan,
resultsObserver: resultObserverFactory(sql),
resultsObserver: resultObserverFactory(logStats),
}, nil
}

Expand Down Expand Up @@ -1329,7 +1329,7 @@ func (vc *vcursorImpl) cloneWithAutocommitSession() *vcursorImpl {
topoServer: vc.topoServer,
warnShardedOnly: vc.warnShardedOnly,
pv: vc.pv,
resultsObserver: vc.resultsObserver,
resultsObserver: vc.resultsObserver.clone(),
}
}

Expand Down Expand Up @@ -1402,7 +1402,7 @@ func (vc *vcursorImpl) CloneForReplicaWarming(ctx context.Context) engine.VCurso
warnShardedOnly: vc.warnShardedOnly,
warnings: vc.warnings,
pv: vc.pv,
resultsObserver: vc.resultsObserver,
resultsObserver: vc.resultsObserver.clone(),
}

v.marginComments.Trailing += "/* warming read */"
Expand Down Expand Up @@ -1434,7 +1434,7 @@ func (vc *vcursorImpl) CloneForMirroring(ctx context.Context) engine.VCursor {
warnShardedOnly: vc.warnShardedOnly,
warnings: vc.warnings,
pv: vc.pv,
resultsObserver: vc.resultsObserver,
resultsObserver: vc.resultsObserver.clone(),
}

v.marginComments.Trailing += "/* mirror query */"
Expand All @@ -1460,7 +1460,3 @@ func (vc *vcursorImpl) UpdateForeignKeyChecksState(fkStateFromQuery *bool) {
func (vc *vcursorImpl) GetForeignKeyChecksState() *bool {
return vc.fkChecksState
}

func (vc *vcursorImpl) Close() {
vc.resultsObserver.close()
}

0 comments on commit be19f48

Please sign in to comment.