From 977b9a301a4ce6f597f090b877cb4f22b9308164 Mon Sep 17 00:00:00 2001 From: Rafer Hazen Date: Thu, 29 Aug 2024 10:38:44 -0600 Subject: [PATCH] Add resultsObserver to ScatterConn (#16638) Signed-off-by: Rafer Hazen Signed-off-by: Andres Taylor Co-authored-by: Andres Taylor --- go/vt/vtgate/executor.go | 10 +-- go/vt/vtgate/legacy_scatter_conn_test.go | 54 +++++++++--- go/vt/vtgate/scatter_conn.go | 33 +++++-- go/vt/vtgate/scatter_conn_test.go | 4 +- go/vt/vtgate/tx_conn_test.go | 104 +++++++++++------------ go/vt/vtgate/vcursor_impl.go | 16 ++-- 6 files changed, 137 insertions(+), 84 deletions(-) diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 093246c71c5..08e1bf09ab7 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -672,7 +672,7 @@ func (e *Executor) executeSPInAllSessions(ctx context.Context, safeSession *Safe }) queries = append(queries, &querypb.BoundQuery{Sql: sql}) } - qr, errs = e.ExecuteMultiShard(ctx, nil, rss, queries, safeSession, false /*autocommit*/, ignoreMaxMemoryRows) + qr, errs = e.ExecuteMultiShard(ctx, nil, rss, queries, safeSession, false /*autocommit*/, ignoreMaxMemoryRows, nullResultsObserver{}) err := vterrors.Aggregate(errs) if err != nil { return nil, err @@ -1454,13 +1454,13 @@ func parseAndValidateQuery(query string, parser *sqlparser.Parser) (sqlparser.St } // ExecuteMultiShard implements the IExecutor interface -func (e *Executor) ExecuteMultiShard(ctx context.Context, primitive engine.Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *SafeSession, autocommit bool, ignoreMaxMemoryRows bool) (qr *sqltypes.Result, errs []error) { - return e.scatterConn.ExecuteMultiShard(ctx, primitive, rss, queries, session, autocommit, ignoreMaxMemoryRows) +func (e *Executor) ExecuteMultiShard(ctx context.Context, primitive engine.Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *SafeSession, autocommit bool, ignoreMaxMemoryRows bool, resultsObserver resultsObserver) (qr *sqltypes.Result, errs []error) { + return e.scatterConn.ExecuteMultiShard(ctx, primitive, rss, queries, session, autocommit, ignoreMaxMemoryRows, resultsObserver) } // StreamExecuteMulti implements the IExecutor interface -func (e *Executor) StreamExecuteMulti(ctx context.Context, primitive engine.Primitive, query string, rss []*srvtopo.ResolvedShard, vars []map[string]*querypb.BindVariable, session *SafeSession, autocommit bool, callback func(reply *sqltypes.Result) error) []error { - return e.scatterConn.StreamExecuteMulti(ctx, primitive, query, rss, vars, session, autocommit, callback) +func (e *Executor) StreamExecuteMulti(ctx context.Context, primitive engine.Primitive, query string, rss []*srvtopo.ResolvedShard, vars []map[string]*querypb.BindVariable, session *SafeSession, autocommit bool, callback func(reply *sqltypes.Result) error, resultsObserver resultsObserver) []error { + return e.scatterConn.StreamExecuteMulti(ctx, primitive, query, rss, vars, session, autocommit, callback, resultsObserver) } // ExecuteLock implements the IExecutor interface diff --git a/go/vt/vtgate/legacy_scatter_conn_test.go b/go/vt/vtgate/legacy_scatter_conn_test.go index 7ada1c3ac31..8fefce1dd66 100644 --- a/go/vt/vtgate/legacy_scatter_conn_test.go +++ b/go/vt/vtgate/legacy_scatter_conn_test.go @@ -99,7 +99,7 @@ func TestLegacyExecuteFailOnAutocommit(t *testing.T) { }, Autocommit: false, } - _, errs := sc.ExecuteMultiShard(ctx, nil, rss, queries, NewSafeSession(session), true /*autocommit*/, false) + _, errs := sc.ExecuteMultiShard(ctx, nil, rss, queries, NewSafeSession(session), true /*autocommit*/, false, nullResultsObserver{}) err := vterrors.Aggregate(errs) require.Error(t, err) require.Contains(t, err.Error(), "in autocommit mode, transactionID should be zero but was: 123") @@ -123,7 +123,7 @@ func TestScatterConnExecuteMulti(t *testing.T) { } } - qr, errs := sc.ExecuteMultiShard(ctx, nil, rss, queries, NewSafeSession(nil), false /*autocommit*/, false) + qr, errs := sc.ExecuteMultiShard(ctx, nil, rss, queries, NewSafeSession(nil), false /*autocommit*/, false, nullResultsObserver{}) return qr, vterrors.Aggregate(errs) }) } @@ -143,7 +143,7 @@ func TestScatterConnStreamExecuteMulti(t *testing.T) { defer mu.Unlock() qr.AppendResult(r) return nil - }) + }, nullResultsObserver{}) return qr, vterrors.Aggregate(errors) }) } @@ -310,7 +310,7 @@ func TestMaxMemoryRows(t *testing.T) { sbc0.SetResults([]*sqltypes.Result{tworows, tworows}) sbc1.SetResults([]*sqltypes.Result{tworows, tworows}) - _, errs := sc.ExecuteMultiShard(ctx, nil, rss, queries, session, false, test.ignoreMaxMemoryRows) + _, errs := sc.ExecuteMultiShard(ctx, nil, rss, queries, session, false, test.ignoreMaxMemoryRows, nullResultsObserver{}) if test.ignoreMaxMemoryRows { require.NoError(t, err) } else { @@ -342,7 +342,7 @@ func TestLegaceHealthCheckFailsOnReservedConnections(t *testing.T) { }) } - _, errs := sc.ExecuteMultiShard(ctx, nil, rss, queries, session, false, false) + _, errs := sc.ExecuteMultiShard(ctx, nil, rss, queries, session, false, false, nullResultsObserver{}) require.Error(t, vterrors.Aggregate(errs)) } @@ -365,10 +365,21 @@ func executeOnShardsReturnsErr(t *testing.T, ctx context.Context, res *srvtopo.R }) } - _, errs := sc.ExecuteMultiShard(ctx, nil, rss, queries, session, false, false) + _, errs := sc.ExecuteMultiShard(ctx, nil, rss, queries, session, false, false, nullResultsObserver{}) return vterrors.Aggregate(errs) } +type recordingResultsObserver struct { + mu sync.Mutex + recorded []*sqltypes.Result +} + +func (o *recordingResultsObserver) observe(result *sqltypes.Result) { + mu.Lock() + o.recorded = append(o.recorded, result) + mu.Unlock() +} + func TestMultiExecs(t *testing.T) { ctx := utils.LeakCheckContext(t) createSandbox("TestMultiExecs") @@ -409,9 +420,17 @@ func TestMultiExecs(t *testing.T) { }, }, } + results := []*sqltypes.Result{ + {Info: "r0"}, + {Info: "r1"}, + } + sbc0.SetResults(results[0:1]) + sbc1.SetResults(results[1:2]) + + observer := recordingResultsObserver{} session := NewSafeSession(&vtgatepb.Session{}) - _, err := sc.ExecuteMultiShard(ctx, nil, rss, queries, session, false, false) + _, err := sc.ExecuteMultiShard(ctx, nil, rss, queries, session, false, false, &observer) require.NoError(t, vterrors.Aggregate(err)) if len(sbc0.Queries) == 0 || len(sbc1.Queries) == 0 { t.Fatalf("didn't get expected query") @@ -428,8 +447,12 @@ func TestMultiExecs(t *testing.T) { if !reflect.DeepEqual(sbc1.Queries[0].BindVariables, wantVars1) { t.Errorf("got %+v, want %+v", sbc0.Queries[0].BindVariables, wantVars1) } + assert.ElementsMatch(t, results, observer.recorded) + sbc0.Queries = nil sbc1.Queries = nil + sbc0.SetResults(results[0:1]) + sbc1.SetResults(results[1:2]) rss = []*srvtopo.ResolvedShard{ { @@ -455,15 +478,18 @@ func TestMultiExecs(t *testing.T) { "bv1": sqltypes.Int64BindVariable(1), }, } + + observer = recordingResultsObserver{} _ = sc.StreamExecuteMulti(ctx, nil, "query", rss, bvs, session, false /* autocommit */, func(*sqltypes.Result) error { return nil - }) + }, &observer) if !reflect.DeepEqual(sbc0.Queries[0].BindVariables, wantVars0) { t.Errorf("got %+v, want %+v", sbc0.Queries[0].BindVariables, wantVars0) } if !reflect.DeepEqual(sbc1.Queries[0].BindVariables, wantVars1) { t.Errorf("got %+v, want %+v", sbc0.Queries[0].BindVariables, wantVars1) } + assert.ElementsMatch(t, results, observer.recorded) } func TestScatterConnSingleDB(t *testing.T) { @@ -487,27 +513,27 @@ func TestScatterConnSingleDB(t *testing.T) { // TransactionMode_SINGLE in session session := NewSafeSession(&vtgatepb.Session{InTransaction: true, TransactionMode: vtgatepb.TransactionMode_SINGLE}) queries := []*querypb.BoundQuery{{Sql: "query1"}} - _, errors := sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) + _, errors := sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) require.Empty(t, errors) - _, errors = sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false) + _, errors = sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false, nullResultsObserver{}) require.Error(t, errors[0]) assert.Contains(t, errors[0].Error(), want) // TransactionMode_SINGLE in txconn sc.txConn.mode = vtgatepb.TransactionMode_SINGLE session = NewSafeSession(&vtgatepb.Session{InTransaction: true}) - _, errors = sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) + _, errors = sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) require.Empty(t, errors) - _, errors = sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false) + _, errors = sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false, nullResultsObserver{}) require.Error(t, errors[0]) assert.Contains(t, errors[0].Error(), want) // TransactionMode_MULTI in txconn. Should not fail. sc.txConn.mode = vtgatepb.TransactionMode_MULTI session = NewSafeSession(&vtgatepb.Session{InTransaction: true}) - _, errors = sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) + _, errors = sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) require.Empty(t, errors) - _, errors = sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false) + _, errors = sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false, nullResultsObserver{}) require.Empty(t, errors) } diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index 8b571f7b67d..f7db598127e 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -72,6 +72,15 @@ type shardActionFunc func(rs *srvtopo.ResolvedShard, i int) error // the results and errors for the caller. type shardActionTransactionFunc func(rs *srvtopo.ResolvedShard, i int, shardActionInfo *shardActionInfo) (*shardActionInfo, error) +type ( + resultsObserver interface { + observe(*sqltypes.Result) + } + nullResultsObserver struct{} +) + +func (nullResultsObserver) observe(*sqltypes.Result) {} + // NewScatterConn creates a new ScatterConn. func NewScatterConn(statsName string, txConn *TxConn, gw *TabletGateway) *ScatterConn { // this only works with TabletGateway @@ -146,6 +155,7 @@ func (stc *ScatterConn) ExecuteMultiShard( session *SafeSession, autocommit bool, ignoreMaxMemoryRows bool, + resultsObserver resultsObserver, ) (qr *sqltypes.Result, errs []error) { if len(rss) != len(queries) { @@ -260,6 +270,10 @@ func (stc *ScatterConn) ExecuteMultiShard( mu.Lock() defer mu.Unlock() + if innerqr != nil { + resultsObserver.observe(innerqr) + } + // Don't append more rows if row count is exceeded. if ignoreMaxMemoryRows || len(qr.Rows) <= maxMemoryRows { qr.AppendResult(innerqr) @@ -354,11 +368,18 @@ func (stc *ScatterConn) StreamExecuteMulti( session *SafeSession, autocommit bool, callback func(reply *sqltypes.Result) error, + resultsObserver resultsObserver, ) []error { if session.InLockSession() && session.TriggerLockHeartBeat() { go stc.runLockQuery(ctx, session) } + observedCallback := func(reply *sqltypes.Result) error { + if reply != nil { + resultsObserver.observe(reply) + } + return callback(reply) + } allErrors := stc.multiGoTransaction( ctx, "StreamExecute", @@ -407,20 +428,20 @@ func (stc *ScatterConn) StreamExecuteMulti( switch info.actionNeeded { case nothing: - err = qs.StreamExecute(ctx, rs.Target, query, bindVars[i], transactionID, reservedID, opts, callback) + err = qs.StreamExecute(ctx, rs.Target, query, bindVars[i], transactionID, reservedID, opts, observedCallback) if err != nil { retryRequest(func() { // we seem to have lost our connection. it was a reserved connection, let's try to recreate it info.actionNeeded = reserve var state queryservice.ReservedState - state, err = qs.ReserveStreamExecute(ctx, rs.Target, session.SetPreQueries(), query, bindVars[i], 0 /*transactionId*/, opts, callback) + state, err = qs.ReserveStreamExecute(ctx, rs.Target, session.SetPreQueries(), query, bindVars[i], 0 /*transactionId*/, opts, observedCallback) reservedID = state.ReservedID alias = state.TabletAlias }) } case begin: var state queryservice.TransactionState - state, err = qs.BeginStreamExecute(ctx, rs.Target, session.SavePoints(), query, bindVars[i], reservedID, opts, callback) + state, err = qs.BeginStreamExecute(ctx, rs.Target, session.SavePoints(), query, bindVars[i], reservedID, opts, observedCallback) transactionID = state.TransactionID alias = state.TabletAlias if err != nil { @@ -428,7 +449,7 @@ func (stc *ScatterConn) StreamExecuteMulti( // we seem to have lost our connection. it was a reserved connection, let's try to recreate it info.actionNeeded = reserveBegin var state queryservice.ReservedTransactionState - state, err = qs.ReserveBeginStreamExecute(ctx, rs.Target, session.SetPreQueries(), session.SavePoints(), query, bindVars[i], opts, callback) + state, err = qs.ReserveBeginStreamExecute(ctx, rs.Target, session.SetPreQueries(), session.SavePoints(), query, bindVars[i], opts, observedCallback) transactionID = state.TransactionID reservedID = state.ReservedID alias = state.TabletAlias @@ -436,12 +457,12 @@ func (stc *ScatterConn) StreamExecuteMulti( } case reserve: var state queryservice.ReservedState - state, err = qs.ReserveStreamExecute(ctx, rs.Target, session.SetPreQueries(), query, bindVars[i], transactionID, opts, callback) + state, err = qs.ReserveStreamExecute(ctx, rs.Target, session.SetPreQueries(), query, bindVars[i], transactionID, opts, observedCallback) reservedID = state.ReservedID alias = state.TabletAlias case reserveBegin: var state queryservice.ReservedTransactionState - state, err = qs.ReserveBeginStreamExecute(ctx, rs.Target, session.SetPreQueries(), session.SavePoints(), query, bindVars[i], opts, callback) + state, err = qs.ReserveBeginStreamExecute(ctx, rs.Target, session.SetPreQueries(), session.SavePoints(), query, bindVars[i], opts, observedCallback) transactionID = state.TransactionID reservedID = state.ReservedID alias = state.TabletAlias diff --git a/go/vt/vtgate/scatter_conn_test.go b/go/vt/vtgate/scatter_conn_test.go index 0e863805d9c..c5d4f350433 100644 --- a/go/vt/vtgate/scatter_conn_test.go +++ b/go/vt/vtgate/scatter_conn_test.go @@ -100,7 +100,7 @@ func TestExecuteFailOnAutocommit(t *testing.T) { }, Autocommit: false, } - _, errs := sc.ExecuteMultiShard(ctx, nil, rss, queries, NewSafeSession(session), true /*autocommit*/, false) + _, errs := sc.ExecuteMultiShard(ctx, nil, rss, queries, NewSafeSession(session), true /*autocommit*/, false, nullResultsObserver{}) err := vterrors.Aggregate(errs) require.Error(t, err) require.Contains(t, err.Error(), "in autocommit mode, transactionID should be zero but was: 123") @@ -183,7 +183,7 @@ func TestExecutePanic(t *testing.T) { require.Contains(t, logMessage, "(*ScatterConn).multiGoTransaction") }() - _, _ = sc.ExecuteMultiShard(ctx, nil, rss, queries, NewSafeSession(session), true /*autocommit*/, false) + _, _ = sc.ExecuteMultiShard(ctx, nil, rss, queries, NewSafeSession(session), true /*autocommit*/, false, nullResultsObserver{}) } diff --git a/go/vt/vtgate/tx_conn_test.go b/go/vt/vtgate/tx_conn_test.go index ed977b75051..dd1415bce87 100644 --- a/go/vt/vtgate/tx_conn_test.go +++ b/go/vt/vtgate/tx_conn_test.go @@ -56,7 +56,7 @@ func TestTxConnBegin(t *testing.T) { require.NoError(t, err) wantSession := vtgatepb.Session{InTransaction: true} utils.MustMatch(t, &wantSession, session, "Session") - _, errors := sc.ExecuteMultiShard(ctx, nil, rss0, queries, safeSession, false, false) + _, errors := sc.ExecuteMultiShard(ctx, nil, rss0, queries, safeSession, false, false, nullResultsObserver{}) require.Empty(t, errors) // Begin again should cause a commit and a new begin. @@ -76,7 +76,7 @@ func TestTxConnCommitFailure(t *testing.T) { // Sequence the executes to ensure commit order session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.ExecuteMultiShard(ctx, nil, rssm[0], queries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rssm[0], queries, session, false, false, nullResultsObserver{}) wantSession := vtgatepb.Session{ InTransaction: true, ShardSessions: []*vtgatepb.Session_ShardSession{{ @@ -91,7 +91,7 @@ func TestTxConnCommitFailure(t *testing.T) { } utils.MustMatch(t, &wantSession, session.Session, "Session") - sc.ExecuteMultiShard(ctx, nil, rssm[1], queries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rssm[1], queries, session, false, false, nullResultsObserver{}) wantSession = vtgatepb.Session{ InTransaction: true, ShardSessions: []*vtgatepb.Session_ShardSession{{ @@ -114,7 +114,7 @@ func TestTxConnCommitFailure(t *testing.T) { } utils.MustMatch(t, &wantSession, session.Session, "Session") - sc.ExecuteMultiShard(ctx, nil, rssa, threeQueries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rssa, threeQueries, session, false, false, nullResultsObserver{}) wantSession = vtgatepb.Session{ InTransaction: true, ShardSessions: []*vtgatepb.Session_ShardSession{{ @@ -183,7 +183,7 @@ func TestTxConnCommitFailureAfterNonAtomicCommitMaxShards(t *testing.T) { } for i := 0; i < 18; i++ { - sc.ExecuteMultiShard(ctx, nil, rssm[i], queries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rssm[i], queries, session, false, false, nullResultsObserver{}) wantSession.ShardSessions = append(wantSession.ShardSessions, &vtgatepb.Session_ShardSession{ Target: &querypb.Target{ Keyspace: "TestTxConn", @@ -237,7 +237,7 @@ func TestTxConnCommitFailureBeforeNonAtomicCommitMaxShards(t *testing.T) { } for i := 0; i < 17; i++ { - sc.ExecuteMultiShard(ctx, nil, rssm[i], queries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rssm[i], queries, session, false, false, nullResultsObserver{}) wantSession.ShardSessions = append(wantSession.ShardSessions, &vtgatepb.Session_ShardSession{ Target: &querypb.Target{ Keyspace: "TestTxConn", @@ -283,7 +283,7 @@ func TestTxConnCommitSuccess(t *testing.T) { // Sequence the executes to ensure commit order session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) wantSession := vtgatepb.Session{ InTransaction: true, ShardSessions: []*vtgatepb.Session_ShardSession{{ @@ -297,7 +297,7 @@ func TestTxConnCommitSuccess(t *testing.T) { }}, } utils.MustMatch(t, &wantSession, session.Session, "Session") - sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false, nullResultsObserver{}) wantSession = vtgatepb.Session{ InTransaction: true, ShardSessions: []*vtgatepb.Session_ShardSession{{ @@ -336,7 +336,7 @@ func TestTxConnReservedCommitSuccess(t *testing.T) { // Sequence the executes to ensure commit order session := NewSafeSession(&vtgatepb.Session{InTransaction: true, InReservedConn: true}) - sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) wantSession := vtgatepb.Session{ InTransaction: true, InReservedConn: true, @@ -352,7 +352,7 @@ func TestTxConnReservedCommitSuccess(t *testing.T) { }}, } utils.MustMatch(t, &wantSession, session.Session, "Session") - sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false, nullResultsObserver{}) wantSession = vtgatepb.Session{ InTransaction: true, InReservedConn: true, @@ -423,9 +423,9 @@ func TestTxConnReservedOn2ShardTxOn1ShardAndCommit(t *testing.T) { session := NewSafeSession(&vtgatepb.Session{InReservedConn: true}) // this will create reserved connections against all tablets - _, errs := sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false) + _, errs := sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false, nullResultsObserver{}) require.Empty(t, errs) - _, errs = sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) + _, errs = sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) require.Empty(t, errs) wantSession := vtgatepb.Session{ @@ -453,7 +453,7 @@ func TestTxConnReservedOn2ShardTxOn1ShardAndCommit(t *testing.T) { session.Session.InTransaction = true // start a transaction against rss0 - sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) wantSession = vtgatepb.Session{ InTransaction: true, InReservedConn: true, @@ -518,9 +518,9 @@ func TestTxConnReservedOn2ShardTxOn1ShardAndRollback(t *testing.T) { session := NewSafeSession(&vtgatepb.Session{InReservedConn: true}) // this will create reserved connections against all tablets - _, errs := sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false) + _, errs := sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false, nullResultsObserver{}) require.Empty(t, errs) - _, errs = sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) + _, errs = sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) require.Empty(t, errs) wantSession := vtgatepb.Session{ @@ -548,7 +548,7 @@ func TestTxConnReservedOn2ShardTxOn1ShardAndRollback(t *testing.T) { session.Session.InTransaction = true // start a transaction against rss0 - sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) wantSession = vtgatepb.Session{ InTransaction: true, InReservedConn: true, @@ -612,13 +612,13 @@ func TestTxConnCommitOrderFailure1(t *testing.T) { // Sequence the executes to ensure commit order session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) session.SetCommitOrder(vtgatepb.CommitOrder_PRE) - sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) session.SetCommitOrder(vtgatepb.CommitOrder_POST) - sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false, nullResultsObserver{}) sbc0.MustFailCodes[vtrpcpb.Code_INVALID_ARGUMENT] = 1 err := sc.txConn.Commit(ctx, session) @@ -647,13 +647,13 @@ func TestTxConnCommitOrderFailure2(t *testing.T) { // Sequence the executes to ensure commit order session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.ExecuteMultiShard(context.Background(), nil, rss1, queries, session, false, false) + sc.ExecuteMultiShard(context.Background(), nil, rss1, queries, session, false, false, nullResultsObserver{}) session.SetCommitOrder(vtgatepb.CommitOrder_PRE) - sc.ExecuteMultiShard(context.Background(), nil, rss0, queries, session, false, false) + sc.ExecuteMultiShard(context.Background(), nil, rss0, queries, session, false, false, nullResultsObserver{}) session.SetCommitOrder(vtgatepb.CommitOrder_POST) - sc.ExecuteMultiShard(context.Background(), nil, rss1, queries, session, false, false) + sc.ExecuteMultiShard(context.Background(), nil, rss1, queries, session, false, false, nullResultsObserver{}) sbc1.MustFailCodes[vtrpcpb.Code_INVALID_ARGUMENT] = 1 err := sc.txConn.Commit(ctx, session) @@ -681,13 +681,13 @@ func TestTxConnCommitOrderFailure3(t *testing.T) { // Sequence the executes to ensure commit order session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) session.SetCommitOrder(vtgatepb.CommitOrder_PRE) - sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) session.SetCommitOrder(vtgatepb.CommitOrder_POST) - sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false, nullResultsObserver{}) sbc1.MustFailCodes[vtrpcpb.Code_INVALID_ARGUMENT] = 1 require.NoError(t, @@ -723,7 +723,7 @@ func TestTxConnCommitOrderSuccess(t *testing.T) { // Sequence the executes to ensure commit order session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) wantSession := vtgatepb.Session{ InTransaction: true, ShardSessions: []*vtgatepb.Session_ShardSession{{ @@ -739,7 +739,7 @@ func TestTxConnCommitOrderSuccess(t *testing.T) { utils.MustMatch(t, &wantSession, session.Session, "Session") session.SetCommitOrder(vtgatepb.CommitOrder_PRE) - sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) wantSession = vtgatepb.Session{ InTransaction: true, PreSessions: []*vtgatepb.Session_ShardSession{{ @@ -764,7 +764,7 @@ func TestTxConnCommitOrderSuccess(t *testing.T) { utils.MustMatch(t, &wantSession, session.Session, "Session") session.SetCommitOrder(vtgatepb.CommitOrder_POST) - sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false, nullResultsObserver{}) wantSession = vtgatepb.Session{ InTransaction: true, PreSessions: []*vtgatepb.Session_ShardSession{{ @@ -798,7 +798,7 @@ func TestTxConnCommitOrderSuccess(t *testing.T) { utils.MustMatch(t, &wantSession, session.Session, "Session") // Ensure nothing changes if we reuse a transaction. - sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false, nullResultsObserver{}) utils.MustMatch(t, &wantSession, session.Session, "Session") require.NoError(t, @@ -821,7 +821,7 @@ func TestTxConnReservedCommitOrderSuccess(t *testing.T) { // Sequence the executes to ensure commit order session := NewSafeSession(&vtgatepb.Session{InTransaction: true, InReservedConn: true}) - sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) wantSession := vtgatepb.Session{ InTransaction: true, InReservedConn: true, @@ -839,7 +839,7 @@ func TestTxConnReservedCommitOrderSuccess(t *testing.T) { utils.MustMatch(t, &wantSession, session.Session, "Session") session.SetCommitOrder(vtgatepb.CommitOrder_PRE) - sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) wantSession = vtgatepb.Session{ InTransaction: true, InReservedConn: true, @@ -867,7 +867,7 @@ func TestTxConnReservedCommitOrderSuccess(t *testing.T) { utils.MustMatch(t, &wantSession, session.Session, "Session") session.SetCommitOrder(vtgatepb.CommitOrder_POST) - sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false, nullResultsObserver{}) wantSession = vtgatepb.Session{ InTransaction: true, InReservedConn: true, @@ -905,7 +905,7 @@ func TestTxConnReservedCommitOrderSuccess(t *testing.T) { utils.MustMatch(t, &wantSession, session.Session, "Session") // Ensure nothing changes if we reuse a transaction. - sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false, nullResultsObserver{}) utils.MustMatch(t, &wantSession, session.Session, "Session") require.NoError(t, @@ -958,8 +958,8 @@ func TestTxConnCommit2PC(t *testing.T) { sc, sbc0, sbc1, rss0, _, rss01 := newTestTxConnEnv(t, ctx, "TestTxConnCommit2PC") session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) - sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) + sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false, nullResultsObserver{}) session.TransactionMode = vtgatepb.TransactionMode_TWOPC require.NoError(t, sc.txConn.Commit(ctx, session)) @@ -975,7 +975,7 @@ func TestTxConnCommit2PCOneParticipant(t *testing.T) { sc, sbc0, _, rss0, _, _ := newTestTxConnEnv(t, ctx, "TestTxConnCommit2PCOneParticipant") session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) session.TransactionMode = vtgatepb.TransactionMode_TWOPC require.NoError(t, sc.txConn.Commit(ctx, session)) @@ -988,8 +988,8 @@ func TestTxConnCommit2PCCreateTransactionFail(t *testing.T) { sc, sbc0, sbc1, rss0, rss1, _ := newTestTxConnEnv(t, ctx, "TestTxConnCommit2PCCreateTransactionFail") session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) - sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) + sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false, nullResultsObserver{}) sbc0.MustFailCreateTransaction = 1 session.TransactionMode = vtgatepb.TransactionMode_TWOPC @@ -1012,8 +1012,8 @@ func TestTxConnCommit2PCPrepareFail(t *testing.T) { sc, sbc0, sbc1, rss0, _, rss01 := newTestTxConnEnv(t, ctx, "TestTxConnCommit2PCPrepareFail") session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) - sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) + sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false, nullResultsObserver{}) sbc1.MustFailPrepare = 1 session.TransactionMode = vtgatepb.TransactionMode_TWOPC @@ -1040,8 +1040,8 @@ func TestTxConnCommit2PCStartCommitFail(t *testing.T) { sc, sbc0, sbc1, rss0, _, rss01 := newTestTxConnEnv(t, ctx, "TestTxConnCommit2PCStartCommitFail") session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) - sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) + sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false, nullResultsObserver{}) sbc0.MustFailStartCommit = 1 session.TransactionMode = vtgatepb.TransactionMode_TWOPC @@ -1062,8 +1062,8 @@ func TestTxConnCommit2PCCommitPreparedFail(t *testing.T) { sc, sbc0, sbc1, rss0, _, rss01 := newTestTxConnEnv(t, ctx, "TestTxConnCommit2PCCommitPreparedFail") session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) - sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) + sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false, nullResultsObserver{}) sbc1.MustFailCommitPrepared = 1 session.TransactionMode = vtgatepb.TransactionMode_TWOPC @@ -1084,8 +1084,8 @@ func TestTxConnCommit2PCConcludeTransactionFail(t *testing.T) { sc, sbc0, sbc1, rss0, _, rss01 := newTestTxConnEnv(t, ctx, "TestTxConnCommit2PCConcludeTransactionFail") session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) - sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) + sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false, nullResultsObserver{}) sbc0.MustFailConcludeTransaction = 1 session.TransactionMode = vtgatepb.TransactionMode_TWOPC @@ -1104,8 +1104,8 @@ func TestTxConnRollback(t *testing.T) { sc, sbc0, sbc1, rss0, _, rss01 := newTestTxConnEnv(t, ctx, "TxConnRollback") session := NewSafeSession(&vtgatepb.Session{InTransaction: true}) - sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) - sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) + sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false, nullResultsObserver{}) require.NoError(t, sc.txConn.Rollback(ctx, session)) wantSession := vtgatepb.Session{} @@ -1120,8 +1120,8 @@ func TestTxConnReservedRollback(t *testing.T) { sc, sbc0, sbc1, rss0, _, rss01 := newTestTxConnEnv(t, ctx, "TxConnReservedRollback") session := NewSafeSession(&vtgatepb.Session{InTransaction: true, InReservedConn: true}) - sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) - sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) + sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false, nullResultsObserver{}) require.NoError(t, sc.txConn.Rollback(ctx, session)) wantSession := vtgatepb.Session{ @@ -1157,8 +1157,8 @@ func TestTxConnReservedRollbackFailure(t *testing.T) { sc, sbc0, sbc1, rss0, rss1, rss01 := newTestTxConnEnv(t, ctx, "TxConnReservedRollback") session := NewSafeSession(&vtgatepb.Session{InTransaction: true, InReservedConn: true}) - sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false) - sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false) + sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}) + sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false, nullResultsObserver{}) sbc1.MustFailCodes[vtrpcpb.Code_INVALID_ARGUMENT] = 1 assert.Error(t, diff --git a/go/vt/vtgate/vcursor_impl.go b/go/vt/vtgate/vcursor_impl.go index d7a426e41d4..a71ad29184a 100644 --- a/go/vt/vtgate/vcursor_impl.go +++ b/go/vt/vtgate/vcursor_impl.go @@ -68,8 +68,8 @@ var ( // vcursor_impl needs these facilities to be able to be able to execute queries for vindexes type iExecute interface { Execute(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, method string, session *SafeSession, s string, vars map[string]*querypb.BindVariable) (*sqltypes.Result, error) - ExecuteMultiShard(ctx context.Context, primitive engine.Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *SafeSession, autocommit bool, ignoreMaxMemoryRows bool) (qr *sqltypes.Result, errs []error) - StreamExecuteMulti(ctx context.Context, primitive engine.Primitive, query string, rss []*srvtopo.ResolvedShard, vars []map[string]*querypb.BindVariable, session *SafeSession, autocommit bool, callback func(reply *sqltypes.Result) error) []error + ExecuteMultiShard(ctx context.Context, primitive engine.Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *SafeSession, autocommit bool, ignoreMaxMemoryRows bool, resultsObserver resultsObserver) (qr *sqltypes.Result, errs []error) + StreamExecuteMulti(ctx context.Context, primitive engine.Primitive, query string, rss []*srvtopo.ResolvedShard, vars []map[string]*querypb.BindVariable, session *SafeSession, autocommit bool, callback func(reply *sqltypes.Result) error, observer resultsObserver) []error ExecuteLock(ctx context.Context, rs *srvtopo.ResolvedShard, query *querypb.BoundQuery, session *SafeSession, lockFuncType sqlparser.LockingFuncType) (*sqltypes.Result, error) Commit(ctx context.Context, safeSession *SafeSession) error ExecuteMessageStream(ctx context.Context, rss []*srvtopo.ResolvedShard, name string, callback func(*sqltypes.Result) error) error @@ -128,6 +128,8 @@ type vcursorImpl struct { warmingReadsPercent int warmingReadsChannel chan bool + + resultsObserver resultsObserver } // newVcursorImpl creates a vcursorImpl. Before creating this object, you have to separate out any marginComments that came with @@ -195,6 +197,7 @@ func newVCursorImpl( pv: pv, warmingReadsPercent: warmingReadsPct, warmingReadsChannel: warmingReadsChan, + resultsObserver: nullResultsObserver{}, }, nil } @@ -602,7 +605,7 @@ func (vc *vcursorImpl) ExecuteMultiShard(ctx context.Context, primitive engine.P return nil, []error{err} } - qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, commentedShardQueries(queries, vc.marginComments), vc.safeSession, canAutocommit, vc.ignoreMaxMemoryRows) + qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, commentedShardQueries(queries, vc.marginComments), vc.safeSession, canAutocommit, vc.ignoreMaxMemoryRows, vc.resultsObserver) vc.setRollbackOnPartialExecIfRequired(len(errs) != len(rss), rollbackOnError) return qr, errs @@ -617,7 +620,7 @@ func (vc *vcursorImpl) StreamExecuteMulti(ctx context.Context, primitive engine. return []error{err} } - errs := vc.executor.StreamExecuteMulti(ctx, primitive, vc.marginComments.Leading+query+vc.marginComments.Trailing, rss, bindVars, vc.safeSession, autocommit, callback) + errs := vc.executor.StreamExecuteMulti(ctx, primitive, vc.marginComments.Leading+query+vc.marginComments.Trailing, rss, bindVars, vc.safeSession, autocommit, callback, vc.resultsObserver) vc.setRollbackOnPartialExecIfRequired(len(errs) != len(rss), rollbackOnError) return errs @@ -640,7 +643,7 @@ func (vc *vcursorImpl) ExecuteStandalone(ctx context.Context, primitive engine.P } // The autocommit flag is always set to false because we currently don't // execute DMLs through ExecuteStandalone. - qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, bqs, NewAutocommitSession(vc.safeSession.Session), false /* autocommit */, vc.ignoreMaxMemoryRows) + qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, bqs, NewAutocommitSession(vc.safeSession.Session), false /* autocommit */, vc.ignoreMaxMemoryRows, vc.resultsObserver) return qr, vterrors.Aggregate(errs) } @@ -1325,6 +1328,7 @@ func (vc *vcursorImpl) cloneWithAutocommitSession() *vcursorImpl { topoServer: vc.topoServer, warnShardedOnly: vc.warnShardedOnly, pv: vc.pv, + resultsObserver: vc.resultsObserver, } } @@ -1397,6 +1401,7 @@ func (vc *vcursorImpl) CloneForReplicaWarming(ctx context.Context) engine.VCurso warnShardedOnly: vc.warnShardedOnly, warnings: vc.warnings, pv: vc.pv, + resultsObserver: nullResultsObserver{}, } v.marginComments.Trailing += "/* warming read */" @@ -1428,6 +1433,7 @@ func (vc *vcursorImpl) CloneForMirroring(ctx context.Context) engine.VCursor { warnShardedOnly: vc.warnShardedOnly, warnings: vc.warnings, pv: vc.pv, + resultsObserver: nullResultsObserver{}, } v.marginComments.Trailing += "/* mirror query */"