Skip to content

Commit

Permalink
feat: pass down FetchLastInsertID bool
Browse files Browse the repository at this point in the history
Signed-off-by: Andres Taylor <[email protected]>
  • Loading branch information
systay committed Dec 17, 2024
1 parent 0db81eb commit bc26f2f
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 101 deletions.
1 change: 1 addition & 0 deletions go/mysql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,7 @@ func (c *Conn) ReadQueryResult(maxrows int, wantfields bool) (*sqltypes.Result,
return &sqltypes.Result{
RowsAffected: packetOk.affectedRows,
InsertID: packetOk.lastInsertID,
InsertIDChanged: packetOk.lastInsertID > 0,
SessionStateChanges: packetOk.sessionStateData,
StatusFlags: packetOk.statusFlags,
Info: packetOk.info,
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ func (e *Executor) executeSPInAllSessions(ctx context.Context, safeSession *econ
})
queries = append(queries, &querypb.BoundQuery{Sql: sql})
}
qr, errs = e.ExecuteMultiShard(ctx, nil, rss, queries, safeSession, false /*autocommit*/, ignoreMaxMemoryRows, nullResultsObserver{})
qr, errs = e.ExecuteMultiShard(ctx, nil, rss, queries, safeSession, false /*autocommit*/, ignoreMaxMemoryRows, nullResultsObserver{}, false)
err := vterrors.Aggregate(errs)
if err != nil {
return nil, err
Expand Down Expand Up @@ -1486,8 +1486,8 @@ func parseAndValidateQuery(query string, parser *sqlparser.Parser) (sqlparser.St
}

// ExecuteMultiShard implements the IExecutor interface
func (e *Executor) ExecuteMultiShard(ctx context.Context, primitive engine.Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *econtext.SafeSession, autocommit bool, ignoreMaxMemoryRows bool, resultsObserver econtext.ResultsObserver) (qr *sqltypes.Result, errs []error) {
return e.scatterConn.ExecuteMultiShard(ctx, primitive, rss, queries, session, autocommit, ignoreMaxMemoryRows, resultsObserver)
func (e *Executor) ExecuteMultiShard(ctx context.Context, primitive engine.Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *econtext.SafeSession, autocommit bool, ignoreMaxMemoryRows bool, resultsObserver econtext.ResultsObserver, fetchLastInsertID bool) (qr *sqltypes.Result, errs []error) {
return e.scatterConn.ExecuteMultiShard(ctx, primitive, rss, queries, session, autocommit, ignoreMaxMemoryRows, resultsObserver, fetchLastInsertID)
}

// StreamExecuteMulti implements the IExecutor interface
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/executorcontext/vcursor_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ type (
// vcursor_impl needs these facilities to be able to be able to execute queries for vindexes
iExecute interface {
Execute(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, method string, session *SafeSession, s string, vars map[string]*querypb.BindVariable) (*sqltypes.Result, error)
ExecuteMultiShard(ctx context.Context, primitive engine.Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *SafeSession, autocommit bool, ignoreMaxMemoryRows bool, resultsObserver ResultsObserver) (qr *sqltypes.Result, errs []error)
ExecuteMultiShard(ctx context.Context, primitive engine.Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *SafeSession, autocommit bool, ignoreMaxMemoryRows bool, resultsObserver ResultsObserver, fetchLastInsertID bool) (qr *sqltypes.Result, errs []error)
StreamExecuteMulti(ctx context.Context, primitive engine.Primitive, query string, rss []*srvtopo.ResolvedShard, vars []map[string]*querypb.BindVariable, session *SafeSession, autocommit bool, callback func(reply *sqltypes.Result) error, observer ResultsObserver) []error
ExecuteLock(ctx context.Context, rs *srvtopo.ResolvedShard, query *querypb.BoundQuery, session *SafeSession, lockFuncType sqlparser.LockingFuncType) (*sqltypes.Result, error)
Commit(ctx context.Context, safeSession *SafeSession) error
Expand Down Expand Up @@ -761,7 +761,7 @@ func (vc *VCursorImpl) ExecuteMultiShard(ctx context.Context, primitive engine.P
return nil, []error{err}
}

qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, commentedShardQueries(queries, vc.marginComments), vc.SafeSession, canAutocommit, vc.ignoreMaxMemoryRows, vc.observer)
qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, commentedShardQueries(queries, vc.marginComments), vc.SafeSession, canAutocommit, vc.ignoreMaxMemoryRows, vc.observer, fetchLastInsertID)
vc.setRollbackOnPartialExecIfRequired(len(errs) != len(rss), rollbackOnError)
vc.logShardsQueried(primitive, len(rss))
return qr, errs
Expand Down Expand Up @@ -801,7 +801,7 @@ func (vc *VCursorImpl) ExecuteStandalone(ctx context.Context, primitive engine.P
}
// The autocommit flag is always set to false because we currently don't
// execute DMLs through ExecuteStandalone.
qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, bqs, NewAutocommitSession(vc.SafeSession.Session), false /* autocommit */, vc.ignoreMaxMemoryRows, vc.observer)
qr, errs := vc.executor.ExecuteMultiShard(ctx, primitive, rss, bqs, NewAutocommitSession(vc.SafeSession.Session), false /* autocommit */, vc.ignoreMaxMemoryRows, vc.observer, false /* fetchLastInsertID */)
vc.logShardsQueried(primitive, len(rss))
return qr, vterrors.Aggregate(errs)
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/executorcontext/vcursor_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func (f fakeExecutor) Execute(ctx context.Context, mysqlCtx vtgateservice.MySQLC
panic("implement me")
}

func (f fakeExecutor) ExecuteMultiShard(ctx context.Context, primitive engine.Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *SafeSession, autocommit bool, ignoreMaxMemoryRows bool, resultsObserver ResultsObserver) (qr *sqltypes.Result, errs []error) {
func (f fakeExecutor) ExecuteMultiShard(ctx context.Context, primitive engine.Primitive, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, session *SafeSession, autocommit bool, ignoreMaxMemoryRows bool, resultsObserver ResultsObserver, fetchLastInsertID bool) (qr *sqltypes.Result, errs []error) {
// TODO implement me
panic("implement me")
}
Expand Down
24 changes: 12 additions & 12 deletions go/vt/vtgate/legacy_scatter_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestLegacyExecuteFailOnAutocommit(t *testing.T) {
},
Autocommit: false,
}
_, errs := sc.ExecuteMultiShard(ctx, nil, rss, queries, econtext.NewSafeSession(session), true /*autocommit*/, false, nullResultsObserver{})
_, errs := sc.ExecuteMultiShard(ctx, nil, rss, queries, econtext.NewSafeSession(session), true /*autocommit*/, false, nullResultsObserver{}, false)
err := vterrors.Aggregate(errs)
require.Error(t, err)
require.Contains(t, err.Error(), "in autocommit mode, transactionID should be zero but was: 123")
Expand All @@ -125,7 +125,7 @@ func TestScatterConnExecuteMulti(t *testing.T) {
}
}

qr, errs := sc.ExecuteMultiShard(ctx, nil, rss, queries, econtext.NewSafeSession(nil), false /*autocommit*/, false, nullResultsObserver{})
qr, errs := sc.ExecuteMultiShard(ctx, nil, rss, queries, econtext.NewSafeSession(nil), false /*autocommit*/, false, nullResultsObserver{}, false)
return qr, vterrors.Aggregate(errs)
})
}
Expand Down Expand Up @@ -312,7 +312,7 @@ func TestMaxMemoryRows(t *testing.T) {
sbc0.SetResults([]*sqltypes.Result{tworows, tworows})
sbc1.SetResults([]*sqltypes.Result{tworows, tworows})

_, errs := sc.ExecuteMultiShard(ctx, nil, rss, queries, session, false, test.ignoreMaxMemoryRows, nullResultsObserver{})
_, errs := sc.ExecuteMultiShard(ctx, nil, rss, queries, session, false, test.ignoreMaxMemoryRows, nullResultsObserver{}, false)
if test.ignoreMaxMemoryRows {
require.NoError(t, err)
} else {
Expand Down Expand Up @@ -344,7 +344,7 @@ func TestLegaceHealthCheckFailsOnReservedConnections(t *testing.T) {
})
}

_, errs := sc.ExecuteMultiShard(ctx, nil, rss, queries, session, false, false, nullResultsObserver{})
_, errs := sc.ExecuteMultiShard(ctx, nil, rss, queries, session, false, false, nullResultsObserver{}, false)
require.Error(t, vterrors.Aggregate(errs))
}

Expand All @@ -367,7 +367,7 @@ func executeOnShardsReturnsErr(t *testing.T, ctx context.Context, res *srvtopo.R
})
}

_, errs := sc.ExecuteMultiShard(ctx, nil, rss, queries, session, false, false, nullResultsObserver{})
_, errs := sc.ExecuteMultiShard(ctx, nil, rss, queries, session, false, false, nullResultsObserver{}, false)
return vterrors.Aggregate(errs)
}

Expand Down Expand Up @@ -432,7 +432,7 @@ func TestMultiExecs(t *testing.T) {
observer := recordingResultsObserver{}

session := econtext.NewSafeSession(&vtgatepb.Session{})
_, err := sc.ExecuteMultiShard(ctx, nil, rss, queries, session, false, false, &observer)
_, err := sc.ExecuteMultiShard(ctx, nil, rss, queries, session, false, false, &observer, false)
require.NoError(t, vterrors.Aggregate(err))
if len(sbc0.Queries) == 0 || len(sbc1.Queries) == 0 {
t.Fatalf("didn't get expected query")
Expand Down Expand Up @@ -515,27 +515,27 @@ func TestScatterConnSingleDB(t *testing.T) {
// TransactionMode_SINGLE in session
session := econtext.NewSafeSession(&vtgatepb.Session{InTransaction: true, TransactionMode: vtgatepb.TransactionMode_SINGLE})
queries := []*querypb.BoundQuery{{Sql: "query1"}}
_, errors := sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{})
_, errors := sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}, false)
require.Empty(t, errors)
_, errors = sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false, nullResultsObserver{})
_, errors = sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false, nullResultsObserver{}, false)
require.Error(t, errors[0])
assert.Contains(t, errors[0].Error(), want)

// TransactionMode_SINGLE in txconn
sc.txConn.mode = vtgatepb.TransactionMode_SINGLE
session = econtext.NewSafeSession(&vtgatepb.Session{InTransaction: true})
_, errors = sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{})
_, errors = sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}, false)
require.Empty(t, errors)
_, errors = sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false, nullResultsObserver{})
_, errors = sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false, nullResultsObserver{}, false)
require.Error(t, errors[0])
assert.Contains(t, errors[0].Error(), want)

// TransactionMode_MULTI in txconn. Should not fail.
sc.txConn.mode = vtgatepb.TransactionMode_MULTI
session = econtext.NewSafeSession(&vtgatepb.Session{InTransaction: true})
_, errors = sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{})
_, errors = sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}, false)
require.Empty(t, errors)
_, errors = sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false, nullResultsObserver{})
_, errors = sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false, nullResultsObserver{}, false)
require.Empty(t, errors)
}

Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/scatter_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ func (stc *ScatterConn) ExecuteMultiShard(
autocommit bool,
ignoreMaxMemoryRows bool,
resultsObserver econtext.ResultsObserver,
fetchLastInsertID bool,
) (qr *sqltypes.Result, errs []error) {

if len(rss) != len(queries) {
Expand Down Expand Up @@ -187,6 +188,9 @@ func (stc *ScatterConn) ExecuteMultiShard(
opts = session.Session.Options
}

// TODO reset this?
opts.FetchLastInsertId = fetchLastInsertID

if autocommit {
// As this is auto-commit, the transactionID is supposed to be zero.
if transactionID != int64(0) {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtgate/scatter_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func TestExecuteFailOnAutocommit(t *testing.T) {
},
Autocommit: false,
}
_, errs := sc.ExecuteMultiShard(ctx, nil, rss, queries, econtext.NewSafeSession(session), true /*autocommit*/, false, nullResultsObserver{})
_, errs := sc.ExecuteMultiShard(ctx, nil, rss, queries, econtext.NewSafeSession(session), true /*autocommit*/, false, nullResultsObserver{}, false)
err := vterrors.Aggregate(errs)
require.Error(t, err)
require.Contains(t, err.Error(), "in autocommit mode, transactionID should be zero but was: 123")
Expand Down Expand Up @@ -184,7 +184,7 @@ func TestExecutePanic(t *testing.T) {
require.Contains(t, logMessage, "(*ScatterConn).multiGoTransaction")
}()

_, _ = sc.ExecuteMultiShard(ctx, nil, rss, queries, econtext.NewSafeSession(session), true /*autocommit*/, false, nullResultsObserver{})
_, _ = sc.ExecuteMultiShard(ctx, nil, rss, queries, econtext.NewSafeSession(session), true /*autocommit*/, false, nullResultsObserver{}, false)

}

Expand Down
Loading

0 comments on commit bc26f2f

Please sign in to comment.