Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail on prepare of reserved connection #16316

Merged
merged 1 commit into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,27 +31,34 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)

// TxExecutor is used for executing a transactional request.
// TODO: merge this with tx_engine
type TxExecutor struct {
// TODO(sougou): Parameterize this.
// DTExecutor is used for executing a distributed transactional request.
type DTExecutor struct {
ctx context.Context
logStats *tabletenv.LogStats
te *TxEngine
}

// NewDTExecutor creates a new distributed transaction executor.
func NewDTExecutor(ctx context.Context, te *TxEngine, logStats *tabletenv.LogStats) *DTExecutor {
return &DTExecutor{
ctx: ctx,
te: te,
logStats: logStats,
}
}

// Prepare performs a prepare on a connection including the redo log work.
// If there is any failure, an error is returned. No cleanup is performed.
// A subsequent call to RollbackPrepared, which is required by the 2PC
// protocol, will perform all the cleanup.
func (txe *TxExecutor) Prepare(transactionID int64, dtid string) error {
if !txe.te.twopcEnabled {
func (dte *DTExecutor) Prepare(transactionID int64, dtid string) error {
if !dte.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer txe.te.env.Stats().QueryTimings.Record("PREPARE", time.Now())
txe.logStats.TransactionID = transactionID
defer dte.te.env.Stats().QueryTimings.Record("PREPARE", time.Now())
dte.logStats.TransactionID = transactionID

conn, err := txe.te.txPool.GetAndLock(transactionID, "for prepare")
conn, err := dte.te.txPool.GetAndLock(transactionID, "for prepare")
if err != nil {
return err
}
Expand All @@ -62,27 +69,33 @@ func (txe *TxExecutor) Prepare(transactionID int64, dtid string) error {
return nil
}

err = txe.te.preparedPool.Put(conn, dtid)
// If the connection is tainted, we cannot prepare it. As there could be temporary tables involved.
if conn.IsTainted() {
conn.Release(tx.TxRollback)
return vterrors.VT12001("cannot prepare the transaction on a reserved connection")
}

err = dte.te.preparedPool.Put(conn, dtid)
if err != nil {
txe.te.txPool.RollbackAndRelease(txe.ctx, conn)
dte.te.txPool.RollbackAndRelease(dte.ctx, conn)
return vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "prepare failed for transaction %d: %v", transactionID, err)
}

return txe.inTransaction(func(localConn *StatefulConnection) error {
return txe.te.twoPC.SaveRedo(txe.ctx, localConn, dtid, conn.TxProperties().Queries)
return dte.inTransaction(func(localConn *StatefulConnection) error {
return dte.te.twoPC.SaveRedo(dte.ctx, localConn, dtid, conn.TxProperties().Queries)
})

}

// CommitPrepared commits a prepared transaction. If the operation
// fails, an error counter is incremented and the transaction is
// marked as failed in the redo log.
func (txe *TxExecutor) CommitPrepared(dtid string) error {
if !txe.te.twopcEnabled {
func (dte *DTExecutor) CommitPrepared(dtid string) error {
if !dte.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer txe.te.env.Stats().QueryTimings.Record("COMMIT_PREPARED", time.Now())
conn, err := txe.te.preparedPool.FetchForCommit(dtid)
defer dte.te.env.Stats().QueryTimings.Record("COMMIT_PREPARED", time.Now())
conn, err := dte.te.preparedPool.FetchForCommit(dtid)
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "cannot commit dtid %s, state: %v", dtid, err)
}
Expand All @@ -91,19 +104,19 @@ func (txe *TxExecutor) CommitPrepared(dtid string) error {
}
// We have to use a context that will never give up,
// even if the original context expires.
ctx := trace.CopySpan(context.Background(), txe.ctx)
defer txe.te.txPool.RollbackAndRelease(ctx, conn)
err = txe.te.twoPC.DeleteRedo(ctx, conn, dtid)
ctx := trace.CopySpan(context.Background(), dte.ctx)
defer dte.te.txPool.RollbackAndRelease(ctx, conn)
err = dte.te.twoPC.DeleteRedo(ctx, conn, dtid)
if err != nil {
txe.markFailed(ctx, dtid)
dte.markFailed(ctx, dtid)
return err
}
_, err = txe.te.txPool.Commit(ctx, conn)
_, err = dte.te.txPool.Commit(ctx, conn)
if err != nil {
txe.markFailed(ctx, dtid)
dte.markFailed(ctx, dtid)
return err
}
txe.te.preparedPool.Forget(dtid)
dte.te.preparedPool.Forget(dtid)
return nil
}

Expand All @@ -113,23 +126,23 @@ func (txe *TxExecutor) CommitPrepared(dtid string) error {
// state of the transaction in the redo log as failed. If the
// state change does not succeed, it just logs the event.
// The function uses the passed in context that has no timeout
// instead of TxExecutor's context.
func (txe *TxExecutor) markFailed(ctx context.Context, dtid string) {
txe.te.env.Stats().InternalErrors.Add("TwopcCommit", 1)
txe.te.preparedPool.SetFailed(dtid)
conn, _, _, err := txe.te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
// instead of DTExecutor's context.
func (dte *DTExecutor) markFailed(ctx context.Context, dtid string) {
dte.te.env.Stats().InternalErrors.Add("TwopcCommit", 1)
dte.te.preparedPool.SetFailed(dtid)
conn, _, _, err := dte.te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
if err != nil {
log.Errorf("markFailed: Begin failed for dtid %s: %v", dtid, err)
return
}
defer txe.te.txPool.RollbackAndRelease(ctx, conn)
defer dte.te.txPool.RollbackAndRelease(ctx, conn)

if err = txe.te.twoPC.UpdateRedo(ctx, conn, dtid, RedoStateFailed); err != nil {
if err = dte.te.twoPC.UpdateRedo(ctx, conn, dtid, RedoStateFailed); err != nil {
log.Errorf("markFailed: UpdateRedo failed for dtid %s: %v", dtid, err)
return
}

if _, err = txe.te.txPool.Commit(ctx, conn); err != nil {
if _, err = dte.te.txPool.Commit(ctx, conn); err != nil {
log.Errorf("markFailed: Commit failed for dtid %s: %v", dtid, err)
}
}
Expand All @@ -152,126 +165,126 @@ func (txe *TxExecutor) markFailed(ctx context.Context, dtid string) {
// If so, it must be set to 0, and the function will not attempt that
// step. If the original transaction is still alive, the transaction
// killer will be the one to eventually roll it back.
func (txe *TxExecutor) RollbackPrepared(dtid string, originalID int64) error {
if !txe.te.twopcEnabled {
func (dte *DTExecutor) RollbackPrepared(dtid string, originalID int64) error {
if !dte.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer txe.te.env.Stats().QueryTimings.Record("ROLLBACK_PREPARED", time.Now())
defer dte.te.env.Stats().QueryTimings.Record("ROLLBACK_PREPARED", time.Now())
defer func() {
if preparedConn := txe.te.preparedPool.FetchForRollback(dtid); preparedConn != nil {
txe.te.txPool.RollbackAndRelease(txe.ctx, preparedConn)
if preparedConn := dte.te.preparedPool.FetchForRollback(dtid); preparedConn != nil {
dte.te.txPool.RollbackAndRelease(dte.ctx, preparedConn)
}
if originalID != 0 {
txe.te.Rollback(txe.ctx, originalID)
dte.te.Rollback(dte.ctx, originalID)
}
}()
return txe.inTransaction(func(conn *StatefulConnection) error {
return txe.te.twoPC.DeleteRedo(txe.ctx, conn, dtid)
return dte.inTransaction(func(conn *StatefulConnection) error {
return dte.te.twoPC.DeleteRedo(dte.ctx, conn, dtid)
})
}

// CreateTransaction creates the metadata for a 2PC transaction.
func (txe *TxExecutor) CreateTransaction(dtid string, participants []*querypb.Target) error {
if !txe.te.twopcEnabled {
func (dte *DTExecutor) CreateTransaction(dtid string, participants []*querypb.Target) error {
if !dte.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer txe.te.env.Stats().QueryTimings.Record("CREATE_TRANSACTION", time.Now())
return txe.inTransaction(func(conn *StatefulConnection) error {
return txe.te.twoPC.CreateTransaction(txe.ctx, conn, dtid, participants)
defer dte.te.env.Stats().QueryTimings.Record("CREATE_TRANSACTION", time.Now())
return dte.inTransaction(func(conn *StatefulConnection) error {
return dte.te.twoPC.CreateTransaction(dte.ctx, conn, dtid, participants)
})
}

// StartCommit atomically commits the transaction along with the
// decision to commit the associated 2pc transaction.
func (txe *TxExecutor) StartCommit(transactionID int64, dtid string) error {
if !txe.te.twopcEnabled {
func (dte *DTExecutor) StartCommit(transactionID int64, dtid string) error {
if !dte.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer txe.te.env.Stats().QueryTimings.Record("START_COMMIT", time.Now())
txe.logStats.TransactionID = transactionID
defer dte.te.env.Stats().QueryTimings.Record("START_COMMIT", time.Now())
dte.logStats.TransactionID = transactionID

conn, err := txe.te.txPool.GetAndLock(transactionID, "for 2pc commit")
conn, err := dte.te.txPool.GetAndLock(transactionID, "for 2pc commit")
if err != nil {
return err
}
defer txe.te.txPool.RollbackAndRelease(txe.ctx, conn)
defer dte.te.txPool.RollbackAndRelease(dte.ctx, conn)

err = txe.te.twoPC.Transition(txe.ctx, conn, dtid, querypb.TransactionState_COMMIT)
err = dte.te.twoPC.Transition(dte.ctx, conn, dtid, querypb.TransactionState_COMMIT)
if err != nil {
return err
}
_, err = txe.te.txPool.Commit(txe.ctx, conn)
_, err = dte.te.txPool.Commit(dte.ctx, conn)
return err
}

// SetRollback transitions the 2pc transaction to the Rollback state.
// If a transaction id is provided, that transaction is also rolled back.
func (txe *TxExecutor) SetRollback(dtid string, transactionID int64) error {
if !txe.te.twopcEnabled {
func (dte *DTExecutor) SetRollback(dtid string, transactionID int64) error {
if !dte.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer txe.te.env.Stats().QueryTimings.Record("SET_ROLLBACK", time.Now())
txe.logStats.TransactionID = transactionID
defer dte.te.env.Stats().QueryTimings.Record("SET_ROLLBACK", time.Now())
dte.logStats.TransactionID = transactionID

if transactionID != 0 {
txe.te.Rollback(txe.ctx, transactionID)
dte.te.Rollback(dte.ctx, transactionID)
}

return txe.inTransaction(func(conn *StatefulConnection) error {
return txe.te.twoPC.Transition(txe.ctx, conn, dtid, querypb.TransactionState_ROLLBACK)
return dte.inTransaction(func(conn *StatefulConnection) error {
return dte.te.twoPC.Transition(dte.ctx, conn, dtid, querypb.TransactionState_ROLLBACK)
})
}

// ConcludeTransaction deletes the 2pc transaction metadata
// essentially resolving it.
func (txe *TxExecutor) ConcludeTransaction(dtid string) error {
if !txe.te.twopcEnabled {
func (dte *DTExecutor) ConcludeTransaction(dtid string) error {
if !dte.te.twopcEnabled {
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
defer txe.te.env.Stats().QueryTimings.Record("RESOLVE", time.Now())
defer dte.te.env.Stats().QueryTimings.Record("RESOLVE", time.Now())

return txe.inTransaction(func(conn *StatefulConnection) error {
return txe.te.twoPC.DeleteTransaction(txe.ctx, conn, dtid)
return dte.inTransaction(func(conn *StatefulConnection) error {
return dte.te.twoPC.DeleteTransaction(dte.ctx, conn, dtid)
})
}

// ReadTransaction returns the metadata for the specified dtid.
func (txe *TxExecutor) ReadTransaction(dtid string) (*querypb.TransactionMetadata, error) {
if !txe.te.twopcEnabled {
func (dte *DTExecutor) ReadTransaction(dtid string) (*querypb.TransactionMetadata, error) {
if !dte.te.twopcEnabled {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
return txe.te.twoPC.ReadTransaction(txe.ctx, dtid)
return dte.te.twoPC.ReadTransaction(dte.ctx, dtid)
}

// ReadTwopcInflight returns info about all in-flight 2pc transactions.
func (txe *TxExecutor) ReadTwopcInflight() (distributed []*tx.DistributedTx, prepared, failed []*tx.PreparedTx, err error) {
if !txe.te.twopcEnabled {
func (dte *DTExecutor) ReadTwopcInflight() (distributed []*tx.DistributedTx, prepared, failed []*tx.PreparedTx, err error) {
if !dte.te.twopcEnabled {
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "2pc is not enabled")
}
prepared, failed, err = txe.te.twoPC.ReadAllRedo(txe.ctx)
prepared, failed, err = dte.te.twoPC.ReadAllRedo(dte.ctx)
if err != nil {
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "Could not read redo: %v", err)
}
distributed, err = txe.te.twoPC.ReadAllTransactions(txe.ctx)
distributed, err = dte.te.twoPC.ReadAllTransactions(dte.ctx)
if err != nil {
return nil, nil, nil, vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "Could not read redo: %v", err)
}
return distributed, prepared, failed, nil
}

func (txe *TxExecutor) inTransaction(f func(*StatefulConnection) error) error {
conn, _, _, err := txe.te.txPool.Begin(txe.ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
func (dte *DTExecutor) inTransaction(f func(*StatefulConnection) error) error {
conn, _, _, err := dte.te.txPool.Begin(dte.ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil)
if err != nil {
return err
}
defer txe.te.txPool.RollbackAndRelease(txe.ctx, conn)
defer dte.te.txPool.RollbackAndRelease(dte.ctx, conn)

err = f(conn)
if err != nil {
return err
}

_, err = txe.te.txPool.Commit(txe.ctx, conn)
_, err = dte.te.txPool.Commit(dte.ctx, conn)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,22 @@ func TestTxExecutorPrepare(t *testing.T) {
require.NoError(t, err)
}

// TestTxExecutorPrepareResevedConn tests the case where a reserved connection is used for prepare.
func TestDTExecutorPrepareResevedConn(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
txe, tsv, db := newTestTxExecutor(t, ctx)
defer db.Close()
defer tsv.StopService()
txid := newTxForPrep(ctx, tsv)

// Reserve a connection
txe.te.Reserve(ctx, nil, txid, nil)

err := txe.Prepare(txid, "aa")
require.ErrorContains(t, err, "VT12001: unsupported: cannot prepare the transaction on a reserved connection")
}

func TestTxExecutorPrepareNotInTx(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -533,7 +549,7 @@ func TestNoTwopc(t *testing.T) {
}
}

func newTestTxExecutor(t *testing.T, ctx context.Context) (txe *TxExecutor, tsv *TabletServer, db *fakesqldb.DB) {
func newTestTxExecutor(t *testing.T, ctx context.Context) (txe *DTExecutor, tsv *TabletServer, db *fakesqldb.DB) {
db = setUpQueryExecutorTest(t)
logStats := tabletenv.NewLogStats(ctx, "TestTxExecutor")
tsv = newTestTabletServer(ctx, smallTxPool, db)
Expand All @@ -542,15 +558,15 @@ func newTestTxExecutor(t *testing.T, ctx context.Context) (txe *TxExecutor, tsv
db.AddQuery("delete from _vt.redo_state where dtid = 'aa'", &sqltypes.Result{})
db.AddQuery("delete from _vt.redo_statement where dtid = 'aa'", &sqltypes.Result{})
db.AddQuery("update test_table set `name` = 2 where pk = 1 limit 10001", &sqltypes.Result{})
return &TxExecutor{
return &DTExecutor{
ctx: ctx,
logStats: logStats,
te: tsv.te,
}, tsv, db
}

// newShortAgeExecutor is same as newTestTxExecutor, but shorter transaction abandon age.
func newShortAgeExecutor(t *testing.T, ctx context.Context) (txe *TxExecutor, tsv *TabletServer, db *fakesqldb.DB) {
func newShortAgeExecutor(t *testing.T, ctx context.Context) (txe *DTExecutor, tsv *TabletServer, db *fakesqldb.DB) {
db = setUpQueryExecutorTest(t)
logStats := tabletenv.NewLogStats(ctx, "TestTxExecutor")
tsv = newTestTabletServer(ctx, smallTxPool|shortTwopcAge, db)
Expand All @@ -559,19 +575,19 @@ func newShortAgeExecutor(t *testing.T, ctx context.Context) (txe *TxExecutor, ts
db.AddQuery("delete from _vt.redo_state where dtid = 'aa'", &sqltypes.Result{})
db.AddQuery("delete from _vt.redo_statement where dtid = 'aa'", &sqltypes.Result{})
db.AddQuery("update test_table set `name` = 2 where pk = 1 limit 10001", &sqltypes.Result{})
return &TxExecutor{
return &DTExecutor{
ctx: ctx,
logStats: logStats,
te: tsv.te,
}, tsv, db
}

// newNoTwopcExecutor is same as newTestTxExecutor, but 2pc disabled.
func newNoTwopcExecutor(t *testing.T, ctx context.Context) (txe *TxExecutor, tsv *TabletServer, db *fakesqldb.DB) {
func newNoTwopcExecutor(t *testing.T, ctx context.Context) (txe *DTExecutor, tsv *TabletServer, db *fakesqldb.DB) {
db = setUpQueryExecutorTest(t)
logStats := tabletenv.NewLogStats(ctx, "TestTxExecutor")
tsv = newTestTabletServer(ctx, noTwopc, db)
return &TxExecutor{
return &DTExecutor{
ctx: ctx,
logStats: logStats,
te: tsv.te,
Expand Down
Loading
Loading