From fc25636cf21c07b0e436117773afea0f190cc2b6 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Wed, 18 Sep 2024 22:25:50 +0530 Subject: [PATCH 01/10] check for retryable error, if not update redo log state to fails and store the error message Signed-off-by: Harshit Gangal --- go/vt/sidecardb/schema/twopc/redo_state.sql | 1 + go/vt/vttablet/tabletserver/dt_executor.go | 29 +---- go/vt/vttablet/tabletserver/twopc.go | 11 +- go/vt/vttablet/tabletserver/tx_engine.go | 125 +++++++++++++++----- 4 files changed, 104 insertions(+), 62 deletions(-) diff --git a/go/vt/sidecardb/schema/twopc/redo_state.sql b/go/vt/sidecardb/schema/twopc/redo_state.sql index 975124e0320..58e250e435e 100644 --- a/go/vt/sidecardb/schema/twopc/redo_state.sql +++ b/go/vt/sidecardb/schema/twopc/redo_state.sql @@ -18,5 +18,6 @@ CREATE TABLE IF NOT EXISTS redo_state( dtid varbinary(512) NOT NULL, state bigint NOT NULL, time_created bigint NOT NULL, + message text, primary key(dtid) ) ENGINE = InnoDB CHARSET = utf8mb4 diff --git a/go/vt/vttablet/tabletserver/dt_executor.go b/go/vt/vttablet/tabletserver/dt_executor.go index 0721f9ab613..9a1c50bcd65 100644 --- a/go/vt/vttablet/tabletserver/dt_executor.go +++ b/go/vt/vttablet/tabletserver/dt_executor.go @@ -157,8 +157,8 @@ func (dte *DTExecutor) CommitPrepared(dtid string) (err error) { ctx := trace.CopySpan(context.Background(), dte.ctx) defer func() { if err != nil { - dte.markFailed(ctx, dtid) log.Warningf("failed to commit the prepared transaction '%s' with error: %v", dtid, err) + dte.te.checkErrorAndMarkFailed(ctx, dtid, err) } dte.te.txPool.RollbackAndRelease(ctx, conn) }() @@ -172,33 +172,6 @@ func (dte *DTExecutor) CommitPrepared(dtid string) (err error) { return nil } -// markFailed does the necessary work to mark a CommitPrepared -// as failed. It marks the dtid as failed in the prepared pool, -// increments the InternalErros counter, and also changes the -// state of the transaction in the redo log as failed. If the -// state change does not succeed, it just logs the event. -// The function uses the passed in context that has no timeout -// instead of DTExecutor's context. -func (dte *DTExecutor) markFailed(ctx context.Context, dtid string) { - dte.te.env.Stats().InternalErrors.Add("TwopcCommit", 1) - dte.te.preparedPool.SetFailed(dtid) - conn, _, _, err := dte.te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) - if err != nil { - log.Errorf("markFailed: Begin failed for dtid %s: %v", dtid, err) - return - } - defer dte.te.txPool.RollbackAndRelease(ctx, conn) - - if err = dte.te.twoPC.UpdateRedo(ctx, conn, dtid, RedoStateFailed); err != nil { - log.Errorf("markFailed: UpdateRedo failed for dtid %s: %v", dtid, err) - return - } - - if _, err = dte.te.txPool.Commit(ctx, conn); err != nil { - log.Errorf("markFailed: Commit failed for dtid %s: %v", dtid, err) - } -} - // RollbackPrepared rolls back a prepared transaction. This function handles // the case of an incomplete prepare. // diff --git a/go/vt/vttablet/tabletserver/twopc.go b/go/vt/vttablet/tabletserver/twopc.go index 868ffff2b3d..195b9bc09df 100644 --- a/go/vt/vttablet/tabletserver/twopc.go +++ b/go/vt/vttablet/tabletserver/twopc.go @@ -109,8 +109,8 @@ func (tpc *TwoPC) initializeQueries() { "insert into %s.redo_statement(dtid, id, statement) values %a", dbname, ":vals") tpc.updateRedoTx = sqlparser.BuildParsedQuery( - "update %s.redo_state set state = %a where dtid = %a", - dbname, ":state", ":dtid") + "update %s.redo_state set state = %a, message = %a where dtid = %a", + dbname, ":state", ":message", ":dtid") tpc.deleteRedoTx = sqlparser.BuildParsedQuery( "delete from %s.redo_state where dtid = %a", dbname, ":dtid") @@ -200,10 +200,11 @@ func (tpc *TwoPC) SaveRedo(ctx context.Context, conn *StatefulConnection, dtid s } // UpdateRedo changes the state of the redo log for the dtid. -func (tpc *TwoPC) UpdateRedo(ctx context.Context, conn *StatefulConnection, dtid string, state int) error { +func (tpc *TwoPC) UpdateRedo(ctx context.Context, conn *StatefulConnection, dtid string, state int, message string) error { bindVars := map[string]*querypb.BindVariable{ - "dtid": sqltypes.StringBindVariable(dtid), - "state": sqltypes.Int64BindVariable(int64(state)), + "dtid": sqltypes.StringBindVariable(dtid), + "state": sqltypes.Int64BindVariable(int64(state)), + "message": sqltypes.StringBindVariable(message), } _, err := tpc.exec(ctx, conn, tpc.updateRedoTx, bindVars) return err diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go index d7f2d55b18a..f228f09eb73 100644 --- a/go/vt/vttablet/tabletserver/tx_engine.go +++ b/go/vt/vttablet/tabletserver/tx_engine.go @@ -22,10 +22,10 @@ import ( "sync" "time" + "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/pools/smartconnpool" "vitess.io/vitess/go/timer" "vitess.io/vitess/go/trace" - "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/dtids" "vitess.io/vitess/go/vt/log" querypb "vitess.io/vitess/go/vt/proto/query" @@ -411,58 +411,125 @@ func (te *TxEngine) shutdownLocked() { // to ensure there are no future collisions. func (te *TxEngine) prepareFromRedo() error { ctx := tabletenv.LocalContext() - var allErr concurrency.AllErrorRecorder - prepared, failed, err := te.twoPC.ReadAllRedo(ctx) - if err != nil { - return err + + prepared, failed, readErr := te.twoPC.ReadAllRedo(ctx) + if readErr != nil { + return readErr } - maxid := int64(0) + var ( + maxID = int64(0) + preparedCounter = 0 + failedCounter = len(failed) + lastDtid string + lastErr error + allErrs []error + ) + outer: for _, preparedTx := range prepared { - txid, err := dtids.TransactionID(preparedTx.Dtid) - if err != nil { - log.Errorf("Error extracting transaction ID from dtid: %v", err) + var conn *StatefulConnection + + txID, _ := dtids.TransactionID(preparedTx.Dtid) + if txID > maxID { + maxID = txID } - if txid > maxid { - maxid = txid + + // check last error to record failure. + if lastErr != nil { + allErrs = append(allErrs, vterrors.Wrapf(lastErr, "dtid - %v", lastDtid)) + if te.checkErrorAndMarkFailed(ctx, lastDtid, lastErr) { + failedCounter++ + } } + + lastDtid = preparedTx.Dtid + // We need to redo the prepared transactions using a dba user because MySQL might still be in read only mode. - conn, err := te.beginNewDbaConnection(ctx) - if err != nil { - allErr.RecordError(vterrors.Wrapf(err, "dtid - %v", preparedTx.Dtid)) + conn, lastErr = te.beginNewDbaConnection(ctx) + if lastErr != nil { continue } for _, stmt := range preparedTx.Queries { conn.TxProperties().RecordQuery(stmt, te.env.Environment().Parser()) - _, err := conn.Exec(ctx, stmt, 1, false) - if err != nil { - allErr.RecordError(vterrors.Wrapf(err, "dtid - %v", preparedTx.Dtid)) + if _, lastErr = conn.Exec(ctx, stmt, 1, false); lastErr != nil { te.txPool.RollbackAndRelease(ctx, conn) continue outer } } // We should not use the external Prepare because // we don't want to write again to the redo log. - err = te.preparedPool.Put(conn, preparedTx.Dtid) - if err != nil { - allErr.RecordError(vterrors.Wrapf(err, "dtid - %v", preparedTx.Dtid)) + if lastErr = te.preparedPool.Put(conn, preparedTx.Dtid); lastErr != nil { continue } + preparedCounter++ } - for _, preparedTx := range failed { - txid, err := dtids.TransactionID(preparedTx.Dtid) - if err != nil { - log.Errorf("Error extracting transaction ID from dtid: %v", err) + + // check last error to record failure. + if lastErr != nil { + allErrs = append(allErrs, vterrors.Wrapf(lastErr, "dtid - %v", lastDtid)) + if te.checkErrorAndMarkFailed(ctx, lastDtid, lastErr) { + failedCounter++ } - if txid > maxid { - maxid = txid + } + + for _, preparedTx := range failed { + txID, _ := dtids.TransactionID(preparedTx.Dtid) + if txID > maxID { + maxID = txID } te.preparedPool.SetFailed(preparedTx.Dtid) } - te.txPool.AdjustLastID(maxid) - log.Infof("TwoPC: Prepared %d transactions, and registered %d failures.", len(prepared), len(failed)) - return allErr.Error() + te.txPool.AdjustLastID(maxID) + log.Infof("TwoPC: Prepared %d transactions, and registered %d failures.", preparedCounter, failedCounter) + return vterrors.Aggregate(allErrs) +} + +// checkErrorAndMarkFailed check that the error is retryable or non-retryable error. +// If it is a non-retryable error than it marks the dtid as failed in the prepared pool, +// increments the InternalErrors counter, and also changes the state of the transaction in the redo log as failed. +func (te *TxEngine) checkErrorAndMarkFailed(ctx context.Context, dtid string, receivedErr error) (fail bool) { + if isRetryableError(receivedErr) { + log.Infof("retryable error for dtid: %s", dtid) + return + } + + fail = true + te.env.Stats().InternalErrors.Add("TwopcCommit", 1) + te.preparedPool.SetFailed(dtid) + conn, _, _, err := te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) + if err != nil { + log.Errorf("markFailed: Begin failed for dtid %s: %v", dtid, err) + return + } + defer te.txPool.RollbackAndRelease(ctx, conn) + + if err = te.twoPC.UpdateRedo(ctx, conn, dtid, RedoStateFailed, receivedErr.Error()); err != nil { + log.Errorf("markFailed: UpdateRedo failed for dtid %s: %v", dtid, err) + return + } + + if _, err = te.txPool.Commit(ctx, conn); err != nil { + log.Errorf("markFailed: Commit failed for dtid %s: %v", dtid, err) + } + return +} + +func isRetryableError(err error) bool { + switch vterrors.Code(err) { + case vtrpcpb.Code_OK, + vtrpcpb.Code_DEADLINE_EXCEEDED, + vtrpcpb.Code_CANCELED, + vtrpcpb.Code_UNAVAILABLE: + return true + case vtrpcpb.Code_UNKNOWN: + // If the error is unknown, convert to SQL Error. + sqlErr := sqlerror.NewSQLErrorFromError(err) + // Connection errors are retryable + return sqlerror.IsConnErr(sqlErr) + default: + return false + } } // shutdownTransactions rolls back all open transactions that are idol. From 346aaf0a190fbc1c1c30b31427c7b1830f0d8148 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Wed, 18 Sep 2024 22:51:28 +0530 Subject: [PATCH 02/10] test: updated test with new fail state expectation Signed-off-by: Harshit Gangal --- go/vt/vttablet/tabletserver/tabletserver_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index 66ad2248a03..443ab9cbe09 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -282,7 +282,10 @@ func TestTabletServerRedoLogIsKeptBetweenRestarts(t *testing.T) { Sql: "update test_table set `name` = 2 where pk = 1 limit 10001", Tables: []string{"test_table"}}} utils.MustMatch(t, want, got, "Prepared queries") - wantFailed := map[string]error{"a:b:20": errPrepFailed} + wantFailed := map[string]error{ + "bogus": errPrepFailed, // The query is rejected by database so added to failed list. + "a:b:20": errPrepFailed, // The DTID is already in failed state. + } utils.MustMatch(t, tsv.te.preparedPool.reserved, wantFailed, fmt.Sprintf("Failed dtids: %v, want %v", tsv.te.preparedPool.reserved, wantFailed)) // Verify last id got adjusted. assert.EqualValues(t, 20, tsv.te.txPool.scp.lastID.Load(), "tsv.te.txPool.lastID.Get()") From 4fecbcc8151db01a3f7db8720b74c4f50822192e Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Thu, 19 Sep 2024 15:52:12 +0530 Subject: [PATCH 03/10] test: added check received error test Signed-off-by: Harshit Gangal --- go/vt/vttablet/tabletserver/tx_engine_test.go | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/go/vt/vttablet/tabletserver/tx_engine_test.go b/go/vt/vttablet/tabletserver/tx_engine_test.go index 95057d754fb..be840c2a683 100644 --- a/go/vt/vttablet/tabletserver/tx_engine_test.go +++ b/go/vt/vttablet/tabletserver/tx_engine_test.go @@ -25,7 +25,10 @@ import ( "testing" "time" + "vitess.io/vitess/go/mysql/sqlerror" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vtenv" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletserver/tx" "github.com/stretchr/testify/assert" @@ -603,3 +606,58 @@ func TestTxEngineFailReserve(t *testing.T) { require.Error(t, err) assert.Zero(t, connID) } + +func TestCheckReceivedError(t *testing.T) { + db := setUpQueryExecutorTest(t) + defer db.Close() + db.AddQueryPattern(".*", &sqltypes.Result{}) + cfg := tabletenv.NewDefaultConfig() + cfg.DB = newDBConfigs(db) + env := tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "TabletServerTest") + env.Config().TwoPCEnable = true + env.Config().TwoPCAbandonAge = 5 + te := NewTxEngine(env, nil) + te.AcceptReadWrite() + + tcases := []struct { + receivedErr error + nonRetryable bool + }{{ + receivedErr: vterrors.New(vtrpcpb.Code_DEADLINE_EXCEEDED, "deadline exceeded"), + nonRetryable: false, + }, { + receivedErr: vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "invalid argument"), + nonRetryable: true, + }, { + receivedErr: sqlerror.NewSQLError(sqlerror.ERLockDeadlock, sqlerror.SSLockDeadlock, "Deadlock found when trying to get lock; try restarting transaction"), + nonRetryable: true, + }, { + receivedErr: context.DeadlineExceeded, + nonRetryable: false, + }, { + receivedErr: context.Canceled, + nonRetryable: false, + }, { + receivedErr: sqlerror.NewSQLError(sqlerror.CRServerLost, sqlerror.SSUnknownSQLState, "Lost connection to MySQL server during query"), + nonRetryable: false, + }, { + receivedErr: sqlerror.NewSQLError(sqlerror.CRMalformedPacket, sqlerror.SSUnknownSQLState, "Malformed packet"), + nonRetryable: true, + }, { + receivedErr: sqlerror.NewSQLError(sqlerror.CRServerGone, sqlerror.SSUnknownSQLState, "Server has gone away"), + nonRetryable: false, + }, { + receivedErr: vterrors.New(vtrpcpb.Code_ABORTED, "Row count exceeded"), + nonRetryable: true, + }, { + receivedErr: errors.New("(errno 2013) (sqlstate HY000) lost connection"), + nonRetryable: false, + }} + + for _, tc := range tcases { + t.Run(tc.receivedErr.Error(), func(t *testing.T) { + nonRetryable := te.checkErrorAndMarkFailed(context.Background(), "aa", tc.receivedErr) + require.Equal(t, tc.nonRetryable, nonRetryable) + }) + } +} From 21d6016d45fa6d7a6d1f149b92a2163c6bdaf9fa Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Fri, 20 Sep 2024 10:53:04 +0530 Subject: [PATCH 04/10] retrieve message from redo log state Signed-off-by: Harshit Gangal --- go/vt/vttablet/tabletserver/dt_executor_test.go | 2 ++ go/vt/vttablet/tabletserver/tabletserver_test.go | 6 ++++++ go/vt/vttablet/tabletserver/twopc.go | 7 ++++--- go/vt/vttablet/tabletserver/tx/twopc.go | 1 + go/vt/vttablet/tabletserver/tx_engine_test.go | 4 ++++ 5 files changed, 17 insertions(+), 3 deletions(-) diff --git a/go/vt/vttablet/tabletserver/dt_executor_test.go b/go/vt/vttablet/tabletserver/dt_executor_test.go index a3486c4370e..9aec50ff59f 100644 --- a/go/vt/vttablet/tabletserver/dt_executor_test.go +++ b/go/vt/vttablet/tabletserver/dt_executor_test.go @@ -26,6 +26,7 @@ import ( "time" "vitess.io/vitess/go/event/syslogger" + "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/vt/vtenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/rules" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" @@ -683,6 +684,7 @@ func newTestTxExecutor(t *testing.T, ctx context.Context) (txe *DTExecutor, tsv db.AddQuery("delete from _vt.redo_state where dtid = 'aa'", &sqltypes.Result{}) db.AddQuery("delete from _vt.redo_statement where dtid = 'aa'", &sqltypes.Result{}) db.AddQuery("update test_table set `name` = 2 where pk = 1 limit 10001", &sqltypes.Result{}) + db.AddRejectedQuery("bogus", sqlerror.NewSQLError(sqlerror.ERUnknownError, sqlerror.SSUnknownSQLState, "bogus query")) return &DTExecutor{ ctx: ctx, logStats: logStats, diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index 443ab9cbe09..f4119aabd20 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -231,12 +231,14 @@ func TestTabletServerRedoLogIsKeptBetweenRestarts(t *testing.T) { {Type: sqltypes.Uint64}, {Type: sqltypes.Uint64}, {Type: sqltypes.VarBinary}, + {Type: sqltypes.Text}, }, Rows: [][]sqltypes.Value{{ sqltypes.NewVarBinary("dtid0"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary(""), sqltypes.NewVarBinary("update test_table set `name` = 2 where pk = 1 limit 10001"), + sqltypes.NULL, }}, }) turnOnTxEngine() @@ -257,22 +259,26 @@ func TestTabletServerRedoLogIsKeptBetweenRestarts(t *testing.T) { {Type: sqltypes.Uint64}, {Type: sqltypes.Uint64}, {Type: sqltypes.VarBinary}, + {Type: sqltypes.Text}, }, Rows: [][]sqltypes.Value{{ sqltypes.NewVarBinary("bogus"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary(""), sqltypes.NewVarBinary("bogus"), + sqltypes.NULL, }, { sqltypes.NewVarBinary("a:b:10"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary(""), sqltypes.NewVarBinary("update test_table set `name` = 2 where pk = 1 limit 10001"), + sqltypes.NULL, }, { sqltypes.NewVarBinary("a:b:20"), sqltypes.NewInt64(RedoStateFailed), sqltypes.NewVarBinary(""), sqltypes.NewVarBinary("unused"), + sqltypes.TestValue(sqltypes.Text, "deadlock detected, transaction rolled back"), }}, }) turnOnTxEngine() diff --git a/go/vt/vttablet/tabletserver/twopc.go b/go/vt/vttablet/tabletserver/twopc.go index 195b9bc09df..5eff30ce07e 100644 --- a/go/vt/vttablet/tabletserver/twopc.go +++ b/go/vt/vttablet/tabletserver/twopc.go @@ -47,7 +47,7 @@ const ( // DTStateRollback represents the ROLLBACK state for dt_state. DTStateRollback = querypb.TransactionState_ROLLBACK - readAllRedo = `select t.dtid, t.state, t.time_created, s.statement + readAllRedo = `select t.dtid, t.state, t.time_created, s.statement, t.message from %s.redo_state t join %s.redo_statement s on t.dtid = s.dtid order by t.dtid, s.id` @@ -245,8 +245,9 @@ func (tpc *TwoPC) ReadAllRedo(ctx context.Context) (prepared, failed []*tx.Prepa // which is harmless. tm, _ := row[2].ToCastInt64() curTx = &tx.PreparedTx{ - Dtid: dtid, - Time: time.Unix(0, tm), + Dtid: dtid, + Time: time.Unix(0, tm), + Message: row[4].ToString(), } st, err := row[1].ToCastInt64() if err != nil { diff --git a/go/vt/vttablet/tabletserver/tx/twopc.go b/go/vt/vttablet/tabletserver/tx/twopc.go index 56cfbd1a51f..6412fc53b4d 100644 --- a/go/vt/vttablet/tabletserver/tx/twopc.go +++ b/go/vt/vttablet/tabletserver/tx/twopc.go @@ -36,4 +36,5 @@ type PreparedTx struct { Dtid string Queries []string Time time.Time + Message string } diff --git a/go/vt/vttablet/tabletserver/tx_engine_test.go b/go/vt/vttablet/tabletserver/tx_engine_test.go index be840c2a683..a46488b92cf 100644 --- a/go/vt/vttablet/tabletserver/tx_engine_test.go +++ b/go/vt/vttablet/tabletserver/tx_engine_test.go @@ -658,6 +658,10 @@ func TestCheckReceivedError(t *testing.T) { t.Run(tc.receivedErr.Error(), func(t *testing.T) { nonRetryable := te.checkErrorAndMarkFailed(context.Background(), "aa", tc.receivedErr) require.Equal(t, tc.nonRetryable, nonRetryable) + if tc.nonRetryable { + require.Equal(t, errPrepFailed, te.preparedPool.reserved["aa"]) + } + delete(te.preparedPool.reserved, "aa") }) } } From c9d102ed0078022f0e115731e67397fa5a659aeb Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Fri, 20 Sep 2024 16:01:49 +0530 Subject: [PATCH 05/10] fix test setup and expectation Signed-off-by: Harshit Gangal --- go/vt/vttablet/tabletserver/twopc_test.go | 32 ++++++++++++++++------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/go/vt/vttablet/tabletserver/twopc_test.go b/go/vt/vttablet/tabletserver/twopc_test.go index 11321bd75fd..3fec16e91a0 100644 --- a/go/vt/vttablet/tabletserver/twopc_test.go +++ b/go/vt/vttablet/tabletserver/twopc_test.go @@ -66,12 +66,14 @@ func TestReadAllRedo(t *testing.T) { {Type: sqltypes.Int64}, {Type: sqltypes.Int64}, {Type: sqltypes.VarChar}, + {Type: sqltypes.Text}, }, Rows: [][]sqltypes.Value{{ sqltypes.NewVarBinary("dtid0"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("stmt01"), + sqltypes.NULL, }}, }) prepared, failed, err = tpc.ReadAllRedo(ctx) @@ -96,17 +98,20 @@ func TestReadAllRedo(t *testing.T) { {Type: sqltypes.Int64}, {Type: sqltypes.Int64}, {Type: sqltypes.VarChar}, + {Type: sqltypes.Text}, }, Rows: [][]sqltypes.Value{{ sqltypes.NewVarBinary("dtid0"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("stmt01"), + sqltypes.NULL, }, { sqltypes.NewVarBinary("dtid0"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("stmt02"), + sqltypes.NULL, }}, }) prepared, failed, err = tpc.ReadAllRedo(ctx) @@ -131,22 +136,26 @@ func TestReadAllRedo(t *testing.T) { {Type: sqltypes.Int64}, {Type: sqltypes.Int64}, {Type: sqltypes.VarChar}, + {Type: sqltypes.Text}, }, Rows: [][]sqltypes.Value{{ sqltypes.NewVarBinary("dtid0"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("stmt01"), + sqltypes.NULL, }, { sqltypes.NewVarBinary("dtid0"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("stmt02"), + sqltypes.NULL, }, { sqltypes.NewVarBinary("dtid1"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("stmt11"), + sqltypes.NULL, }}, }) prepared, failed, err = tpc.ReadAllRedo(ctx) @@ -175,37 +184,44 @@ func TestReadAllRedo(t *testing.T) { {Type: sqltypes.Int64}, {Type: sqltypes.Int64}, {Type: sqltypes.VarChar}, + {Type: sqltypes.Text}, }, Rows: [][]sqltypes.Value{{ sqltypes.NewVarBinary("dtid0"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("stmt01"), + sqltypes.NULL, }, { sqltypes.NewVarBinary("dtid0"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("stmt02"), + sqltypes.NULL, }, { sqltypes.NewVarBinary("dtid1"), - sqltypes.NewVarBinary("Failed"), + sqltypes.NewInt64(RedoStateFailed), sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("stmt11"), + sqltypes.TestValue(sqltypes.Text, "error1"), }, { sqltypes.NewVarBinary("dtid2"), - sqltypes.NewVarBinary("Failed"), + sqltypes.NewInt64(RedoStateFailed), sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("stmt21"), + sqltypes.TestValue(sqltypes.Text, "error2"), }, { sqltypes.NewVarBinary("dtid2"), - sqltypes.NewVarBinary("Failed"), + sqltypes.NewInt64(RedoStateFailed), sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("stmt22"), + sqltypes.TestValue(sqltypes.Text, "error2"), }, { sqltypes.NewVarBinary("dtid3"), sqltypes.NewInt64(RedoStatePrepared), sqltypes.NewVarBinary("1"), sqltypes.NewVarBinary("stmt31"), + sqltypes.NULL, }}, }) prepared, failed, err = tpc.ReadAllRedo(ctx) @@ -221,21 +237,19 @@ func TestReadAllRedo(t *testing.T) { Queries: []string{"stmt31"}, Time: time.Unix(0, 1), }} - if !reflect.DeepEqual(prepared, want) { - t.Errorf("ReadAllRedo: %s, want %s", jsonStr(prepared), jsonStr(want)) - } + utils.MustMatch(t, want, prepared) wantFailed := []*tx.PreparedTx{{ Dtid: "dtid1", Queries: []string{"stmt11"}, Time: time.Unix(0, 1), + Message: "error1", }, { Dtid: "dtid2", Queries: []string{"stmt21", "stmt22"}, Time: time.Unix(0, 1), + Message: "error2", }} - if !reflect.DeepEqual(failed, wantFailed) { - t.Errorf("ReadAllRedo failed): %s, want %s", jsonStr(failed), jsonStr(wantFailed)) - } + utils.MustMatch(t, wantFailed, failed) } func TestReadAllTransactions(t *testing.T) { From b2cd8d1004915d7d90050523d9e44c9226d2f5f8 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Fri, 20 Sep 2024 16:29:16 +0530 Subject: [PATCH 06/10] add test query expectation Signed-off-by: Harshit Gangal --- go/vt/vttablet/tabletserver/tx_engine_test.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletserver/tx_engine_test.go b/go/vt/vttablet/tabletserver/tx_engine_test.go index a46488b92cf..3c666701539 100644 --- a/go/vt/vttablet/tabletserver/tx_engine_test.go +++ b/go/vt/vttablet/tabletserver/tx_engine_test.go @@ -610,7 +610,7 @@ func TestTxEngineFailReserve(t *testing.T) { func TestCheckReceivedError(t *testing.T) { db := setUpQueryExecutorTest(t) defer db.Close() - db.AddQueryPattern(".*", &sqltypes.Result{}) + // db.AddQueryPattern(".*", &sqltypes.Result{}) cfg := tabletenv.NewDefaultConfig() cfg.DB = newDBConfigs(db) env := tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "TabletServerTest") @@ -622,15 +622,18 @@ func TestCheckReceivedError(t *testing.T) { tcases := []struct { receivedErr error nonRetryable bool + expQuery string }{{ receivedErr: vterrors.New(vtrpcpb.Code_DEADLINE_EXCEEDED, "deadline exceeded"), nonRetryable: false, }, { receivedErr: vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "invalid argument"), nonRetryable: true, + expQuery: `update _vt.redo_state set state = 0, message = 'invalid argument' where dtid = 'aa'`, }, { receivedErr: sqlerror.NewSQLError(sqlerror.ERLockDeadlock, sqlerror.SSLockDeadlock, "Deadlock found when trying to get lock; try restarting transaction"), nonRetryable: true, + expQuery: `update _vt.redo_state set state = 0, message = 'Deadlock found when trying to get lock; try restarting transaction (errno 1213) (sqlstate 40001)' where dtid = 'aa'`, }, { receivedErr: context.DeadlineExceeded, nonRetryable: false, @@ -643,12 +646,14 @@ func TestCheckReceivedError(t *testing.T) { }, { receivedErr: sqlerror.NewSQLError(sqlerror.CRMalformedPacket, sqlerror.SSUnknownSQLState, "Malformed packet"), nonRetryable: true, + expQuery: `update _vt.redo_state set state = 0, message = 'Malformed packet (errno 2027) (sqlstate HY000)' where dtid = 'aa'`, }, { receivedErr: sqlerror.NewSQLError(sqlerror.CRServerGone, sqlerror.SSUnknownSQLState, "Server has gone away"), nonRetryable: false, }, { receivedErr: vterrors.New(vtrpcpb.Code_ABORTED, "Row count exceeded"), nonRetryable: true, + expQuery: `update _vt.redo_state set state = 0, message = 'Row count exceeded' where dtid = 'aa'`, }, { receivedErr: errors.New("(errno 2013) (sqlstate HY000) lost connection"), nonRetryable: false, @@ -656,6 +661,9 @@ func TestCheckReceivedError(t *testing.T) { for _, tc := range tcases { t.Run(tc.receivedErr.Error(), func(t *testing.T) { + if tc.expQuery != "" { + db.AddQuery(tc.expQuery, &sqltypes.Result{}) + } nonRetryable := te.checkErrorAndMarkFailed(context.Background(), "aa", tc.receivedErr) require.Equal(t, tc.nonRetryable, nonRetryable) if tc.nonRetryable { From a7f6392201486b216a23685ba4449559bc832283 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 23 Sep 2024 16:59:31 +0530 Subject: [PATCH 07/10] save error message on retryable error to redo log state table Signed-off-by: Harshit Gangal --- go/vt/vttablet/tabletserver/dt_executor.go | 2 +- go/vt/vttablet/tabletserver/tx_engine.go | 42 ++++++++++--------- go/vt/vttablet/tabletserver/tx_engine_test.go | 9 +++- 3 files changed, 31 insertions(+), 22 deletions(-) diff --git a/go/vt/vttablet/tabletserver/dt_executor.go b/go/vt/vttablet/tabletserver/dt_executor.go index 9a1c50bcd65..fa79bd44672 100644 --- a/go/vt/vttablet/tabletserver/dt_executor.go +++ b/go/vt/vttablet/tabletserver/dt_executor.go @@ -158,7 +158,7 @@ func (dte *DTExecutor) CommitPrepared(dtid string) (err error) { defer func() { if err != nil { log.Warningf("failed to commit the prepared transaction '%s' with error: %v", dtid, err) - dte.te.checkErrorAndMarkFailed(ctx, dtid, err) + dte.te.checkErrorAndMarkFailed(ctx, dtid, err, "TwopcCommit") } dte.te.txPool.RollbackAndRelease(ctx, conn) }() diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go index f228f09eb73..60f02e50d75 100644 --- a/go/vt/vttablet/tabletserver/tx_engine.go +++ b/go/vt/vttablet/tabletserver/tx_engine.go @@ -426,6 +426,15 @@ func (te *TxEngine) prepareFromRedo() error { allErrs []error ) + checkErr := func(dtid string, err error) { + if err != nil { + allErrs = append(allErrs, vterrors.Wrapf(err, "dtid - %v", dtid)) + if te.checkErrorAndMarkFailed(ctx, dtid, err, "TwopcPrepareRedo") { + failedCounter++ + } + } + } + outer: for _, preparedTx := range prepared { var conn *StatefulConnection @@ -436,12 +445,7 @@ outer: } // check last error to record failure. - if lastErr != nil { - allErrs = append(allErrs, vterrors.Wrapf(lastErr, "dtid - %v", lastDtid)) - if te.checkErrorAndMarkFailed(ctx, lastDtid, lastErr) { - failedCounter++ - } - } + checkErr(lastDtid, lastErr) lastDtid = preparedTx.Dtid @@ -466,12 +470,7 @@ outer: } // check last error to record failure. - if lastErr != nil { - allErrs = append(allErrs, vterrors.Wrapf(lastErr, "dtid - %v", lastDtid)) - if te.checkErrorAndMarkFailed(ctx, lastDtid, lastErr) { - failedCounter++ - } - } + checkErr(lastDtid, lastErr) for _, preparedTx := range failed { txID, _ := dtids.TransactionID(preparedTx.Dtid) @@ -487,16 +486,21 @@ outer: // checkErrorAndMarkFailed check that the error is retryable or non-retryable error. // If it is a non-retryable error than it marks the dtid as failed in the prepared pool, -// increments the InternalErrors counter, and also changes the state of the transaction in the redo log as failed. -func (te *TxEngine) checkErrorAndMarkFailed(ctx context.Context, dtid string, receivedErr error) (fail bool) { +// increments the InternalErrors counter, and also changes the state of the transaction in the redo log as failed. +func (te *TxEngine) checkErrorAndMarkFailed(ctx context.Context, dtid string, receivedErr error, metricName string) (fail bool) { + state := RedoStateFailed if isRetryableError(receivedErr) { log.Infof("retryable error for dtid: %s", dtid) - return + state = RedoStatePrepared + } else { + fail = true + te.env.Stats().InternalErrors.Add(metricName, 1) + te.preparedPool.SetFailed(dtid) } - fail = true - te.env.Stats().InternalErrors.Add("TwopcCommit", 1) - te.preparedPool.SetFailed(dtid) + // Update the state of the transaction in the redo log. + // Retryable Error: Update the message with error message. + // Non-retryable Error: Along with message, update the state as RedoStateFailed. conn, _, _, err := te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) if err != nil { log.Errorf("markFailed: Begin failed for dtid %s: %v", dtid, err) @@ -504,7 +508,7 @@ func (te *TxEngine) checkErrorAndMarkFailed(ctx context.Context, dtid string, re } defer te.txPool.RollbackAndRelease(ctx, conn) - if err = te.twoPC.UpdateRedo(ctx, conn, dtid, RedoStateFailed, receivedErr.Error()); err != nil { + if err = te.twoPC.UpdateRedo(ctx, conn, dtid, state, receivedErr.Error()); err != nil { log.Errorf("markFailed: UpdateRedo failed for dtid %s: %v", dtid, err) return } diff --git a/go/vt/vttablet/tabletserver/tx_engine_test.go b/go/vt/vttablet/tabletserver/tx_engine_test.go index 3c666701539..5dfb7235c0d 100644 --- a/go/vt/vttablet/tabletserver/tx_engine_test.go +++ b/go/vt/vttablet/tabletserver/tx_engine_test.go @@ -610,7 +610,6 @@ func TestTxEngineFailReserve(t *testing.T) { func TestCheckReceivedError(t *testing.T) { db := setUpQueryExecutorTest(t) defer db.Close() - // db.AddQueryPattern(".*", &sqltypes.Result{}) cfg := tabletenv.NewDefaultConfig() cfg.DB = newDBConfigs(db) env := tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "TabletServerTest") @@ -626,6 +625,7 @@ func TestCheckReceivedError(t *testing.T) { }{{ receivedErr: vterrors.New(vtrpcpb.Code_DEADLINE_EXCEEDED, "deadline exceeded"), nonRetryable: false, + expQuery: `update _vt.redo_state set state = 1, message = 'deadline exceeded' where dtid = 'aa'`, }, { receivedErr: vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "invalid argument"), nonRetryable: true, @@ -637,12 +637,15 @@ func TestCheckReceivedError(t *testing.T) { }, { receivedErr: context.DeadlineExceeded, nonRetryable: false, + expQuery: `update _vt.redo_state set state = 1, message = 'context deadline exceeded' where dtid = 'aa'`, }, { receivedErr: context.Canceled, nonRetryable: false, + expQuery: `update _vt.redo_state set state = 1, message = 'context canceled' where dtid = 'aa'`, }, { receivedErr: sqlerror.NewSQLError(sqlerror.CRServerLost, sqlerror.SSUnknownSQLState, "Lost connection to MySQL server during query"), nonRetryable: false, + expQuery: `update _vt.redo_state set state = 1, message = 'Lost connection to MySQL server during query (errno 2013) (sqlstate HY000)' where dtid = 'aa'`, }, { receivedErr: sqlerror.NewSQLError(sqlerror.CRMalformedPacket, sqlerror.SSUnknownSQLState, "Malformed packet"), nonRetryable: true, @@ -650,6 +653,7 @@ func TestCheckReceivedError(t *testing.T) { }, { receivedErr: sqlerror.NewSQLError(sqlerror.CRServerGone, sqlerror.SSUnknownSQLState, "Server has gone away"), nonRetryable: false, + expQuery: `update _vt.redo_state set state = 1, message = 'Server has gone away (errno 2006) (sqlstate HY000)' where dtid = 'aa'`, }, { receivedErr: vterrors.New(vtrpcpb.Code_ABORTED, "Row count exceeded"), nonRetryable: true, @@ -657,6 +661,7 @@ func TestCheckReceivedError(t *testing.T) { }, { receivedErr: errors.New("(errno 2013) (sqlstate HY000) lost connection"), nonRetryable: false, + expQuery: `update _vt.redo_state set state = 1, message = '(errno 2013) (sqlstate HY000) lost connection' where dtid = 'aa'`, }} for _, tc := range tcases { @@ -664,7 +669,7 @@ func TestCheckReceivedError(t *testing.T) { if tc.expQuery != "" { db.AddQuery(tc.expQuery, &sqltypes.Result{}) } - nonRetryable := te.checkErrorAndMarkFailed(context.Background(), "aa", tc.receivedErr) + nonRetryable := te.checkErrorAndMarkFailed(context.Background(), "aa", tc.receivedErr, "") require.Equal(t, tc.nonRetryable, nonRetryable) if tc.nonRetryable { require.Equal(t, errPrepFailed, te.preparedPool.reserved["aa"]) From 6ecacacdd8b476d5c708c87f6b7a5a9845857d76 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 23 Sep 2024 19:28:27 +0530 Subject: [PATCH 08/10] address review comments Signed-off-by: Harshit Gangal --- go/vt/vttablet/tabletserver/tx_engine.go | 74 +++++++++---------- go/vt/vttablet/tabletserver/tx_engine_test.go | 70 +++++++++--------- 2 files changed, 72 insertions(+), 72 deletions(-) diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go index 60f02e50d75..549851790ed 100644 --- a/go/vt/vttablet/tabletserver/tx_engine.go +++ b/go/vt/vttablet/tabletserver/tx_engine.go @@ -421,57 +421,30 @@ func (te *TxEngine) prepareFromRedo() error { maxID = int64(0) preparedCounter = 0 failedCounter = len(failed) - lastDtid string - lastErr error allErrs []error ) - checkErr := func(dtid string, err error) { - if err != nil { - allErrs = append(allErrs, vterrors.Wrapf(err, "dtid - %v", dtid)) - if te.checkErrorAndMarkFailed(ctx, dtid, err, "TwopcPrepareRedo") { - failedCounter++ - } - } - } + // While going through the prepared transaction. + // We will extract the transaction ID from the dtid and + // update the last transaction ID to max value to avoid any collision with the new transactions. -outer: for _, preparedTx := range prepared { - var conn *StatefulConnection - txID, _ := dtids.TransactionID(preparedTx.Dtid) if txID > maxID { maxID = txID } - // check last error to record failure. - checkErr(lastDtid, lastErr) - - lastDtid = preparedTx.Dtid - - // We need to redo the prepared transactions using a dba user because MySQL might still be in read only mode. - conn, lastErr = te.beginNewDbaConnection(ctx) - if lastErr != nil { - continue - } - for _, stmt := range preparedTx.Queries { - conn.TxProperties().RecordQuery(stmt, te.env.Environment().Parser()) - if _, lastErr = conn.Exec(ctx, stmt, 1, false); lastErr != nil { - te.txPool.RollbackAndRelease(ctx, conn) - continue outer + prepFailed, err := te.prepareTx(ctx, preparedTx) + if err != nil { + allErrs = append(allErrs, vterrors.Wrapf(err, "dtid - %v", preparedTx.Dtid)) + if prepFailed { + failedCounter++ } + } else { + preparedCounter++ } - // We should not use the external Prepare because - // we don't want to write again to the redo log. - if lastErr = te.preparedPool.Put(conn, preparedTx.Dtid); lastErr != nil { - continue - } - preparedCounter++ } - // check last error to record failure. - checkErr(lastDtid, lastErr) - for _, preparedTx := range failed { txID, _ := dtids.TransactionID(preparedTx.Dtid) if txID > maxID { @@ -479,11 +452,38 @@ outer: } te.preparedPool.SetFailed(preparedTx.Dtid) } + te.txPool.AdjustLastID(maxID) log.Infof("TwoPC: Prepared %d transactions, and registered %d failures.", preparedCounter, failedCounter) return vterrors.Aggregate(allErrs) } +func (te *TxEngine) prepareTx(ctx context.Context, preparedTx *tx.PreparedTx) (failed bool, err error) { + defer func() { + if err != nil { + failed = te.checkErrorAndMarkFailed(ctx, preparedTx.Dtid, err, "TwopcPrepareRedo") + } + }() + + // We need to redo the prepared transactions using a dba user because MySQL might still be in read only mode. + var conn *StatefulConnection + if conn, err = te.beginNewDbaConnection(ctx); err != nil { + return + } + + for _, stmt := range preparedTx.Queries { + conn.TxProperties().RecordQuery(stmt, te.env.Environment().Parser()) + if _, err = conn.Exec(ctx, stmt, 1, false); err != nil { + te.txPool.RollbackAndRelease(ctx, conn) + return + } + } + // We should not use the external Prepare because + // we don't want to write again to the redo log. + err = te.preparedPool.Put(conn, preparedTx.Dtid) + return +} + // checkErrorAndMarkFailed check that the error is retryable or non-retryable error. // If it is a non-retryable error than it marks the dtid as failed in the prepared pool, // increments the InternalErrors counter, and also changes the state of the transaction in the redo log as failed. diff --git a/go/vt/vttablet/tabletserver/tx_engine_test.go b/go/vt/vttablet/tabletserver/tx_engine_test.go index 5dfb7235c0d..43916dab3c2 100644 --- a/go/vt/vttablet/tabletserver/tx_engine_test.go +++ b/go/vt/vttablet/tabletserver/tx_engine_test.go @@ -619,49 +619,49 @@ func TestCheckReceivedError(t *testing.T) { te.AcceptReadWrite() tcases := []struct { - receivedErr error - nonRetryable bool - expQuery string + receivedErr error + retryable bool + expQuery string }{{ - receivedErr: vterrors.New(vtrpcpb.Code_DEADLINE_EXCEEDED, "deadline exceeded"), - nonRetryable: false, - expQuery: `update _vt.redo_state set state = 1, message = 'deadline exceeded' where dtid = 'aa'`, + receivedErr: vterrors.New(vtrpcpb.Code_DEADLINE_EXCEEDED, "deadline exceeded"), + retryable: true, + expQuery: `update _vt.redo_state set state = 1, message = 'deadline exceeded' where dtid = 'aa'`, }, { - receivedErr: vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "invalid argument"), - nonRetryable: true, - expQuery: `update _vt.redo_state set state = 0, message = 'invalid argument' where dtid = 'aa'`, + receivedErr: vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "invalid argument"), + retryable: false, + expQuery: `update _vt.redo_state set state = 0, message = 'invalid argument' where dtid = 'aa'`, }, { - receivedErr: sqlerror.NewSQLError(sqlerror.ERLockDeadlock, sqlerror.SSLockDeadlock, "Deadlock found when trying to get lock; try restarting transaction"), - nonRetryable: true, - expQuery: `update _vt.redo_state set state = 0, message = 'Deadlock found when trying to get lock; try restarting transaction (errno 1213) (sqlstate 40001)' where dtid = 'aa'`, + receivedErr: sqlerror.NewSQLError(sqlerror.ERLockDeadlock, sqlerror.SSLockDeadlock, "Deadlock found when trying to get lock; try restarting transaction"), + retryable: false, + expQuery: `update _vt.redo_state set state = 0, message = 'Deadlock found when trying to get lock; try restarting transaction (errno 1213) (sqlstate 40001)' where dtid = 'aa'`, }, { - receivedErr: context.DeadlineExceeded, - nonRetryable: false, - expQuery: `update _vt.redo_state set state = 1, message = 'context deadline exceeded' where dtid = 'aa'`, + receivedErr: context.DeadlineExceeded, + retryable: true, + expQuery: `update _vt.redo_state set state = 1, message = 'context deadline exceeded' where dtid = 'aa'`, }, { - receivedErr: context.Canceled, - nonRetryable: false, - expQuery: `update _vt.redo_state set state = 1, message = 'context canceled' where dtid = 'aa'`, + receivedErr: context.Canceled, + retryable: true, + expQuery: `update _vt.redo_state set state = 1, message = 'context canceled' where dtid = 'aa'`, }, { - receivedErr: sqlerror.NewSQLError(sqlerror.CRServerLost, sqlerror.SSUnknownSQLState, "Lost connection to MySQL server during query"), - nonRetryable: false, - expQuery: `update _vt.redo_state set state = 1, message = 'Lost connection to MySQL server during query (errno 2013) (sqlstate HY000)' where dtid = 'aa'`, + receivedErr: sqlerror.NewSQLError(sqlerror.CRServerLost, sqlerror.SSUnknownSQLState, "Lost connection to MySQL server during query"), + retryable: true, + expQuery: `update _vt.redo_state set state = 1, message = 'Lost connection to MySQL server during query (errno 2013) (sqlstate HY000)' where dtid = 'aa'`, }, { - receivedErr: sqlerror.NewSQLError(sqlerror.CRMalformedPacket, sqlerror.SSUnknownSQLState, "Malformed packet"), - nonRetryable: true, - expQuery: `update _vt.redo_state set state = 0, message = 'Malformed packet (errno 2027) (sqlstate HY000)' where dtid = 'aa'`, + receivedErr: sqlerror.NewSQLError(sqlerror.CRMalformedPacket, sqlerror.SSUnknownSQLState, "Malformed packet"), + retryable: false, + expQuery: `update _vt.redo_state set state = 0, message = 'Malformed packet (errno 2027) (sqlstate HY000)' where dtid = 'aa'`, }, { - receivedErr: sqlerror.NewSQLError(sqlerror.CRServerGone, sqlerror.SSUnknownSQLState, "Server has gone away"), - nonRetryable: false, - expQuery: `update _vt.redo_state set state = 1, message = 'Server has gone away (errno 2006) (sqlstate HY000)' where dtid = 'aa'`, + receivedErr: sqlerror.NewSQLError(sqlerror.CRServerGone, sqlerror.SSUnknownSQLState, "Server has gone away"), + retryable: true, + expQuery: `update _vt.redo_state set state = 1, message = 'Server has gone away (errno 2006) (sqlstate HY000)' where dtid = 'aa'`, }, { - receivedErr: vterrors.New(vtrpcpb.Code_ABORTED, "Row count exceeded"), - nonRetryable: true, - expQuery: `update _vt.redo_state set state = 0, message = 'Row count exceeded' where dtid = 'aa'`, + receivedErr: vterrors.New(vtrpcpb.Code_ABORTED, "Row count exceeded"), + retryable: false, + expQuery: `update _vt.redo_state set state = 0, message = 'Row count exceeded' where dtid = 'aa'`, }, { - receivedErr: errors.New("(errno 2013) (sqlstate HY000) lost connection"), - nonRetryable: false, - expQuery: `update _vt.redo_state set state = 1, message = '(errno 2013) (sqlstate HY000) lost connection' where dtid = 'aa'`, + receivedErr: errors.New("(errno 2013) (sqlstate HY000) lost connection"), + retryable: true, + expQuery: `update _vt.redo_state set state = 1, message = '(errno 2013) (sqlstate HY000) lost connection' where dtid = 'aa'`, }} for _, tc := range tcases { @@ -670,8 +670,8 @@ func TestCheckReceivedError(t *testing.T) { db.AddQuery(tc.expQuery, &sqltypes.Result{}) } nonRetryable := te.checkErrorAndMarkFailed(context.Background(), "aa", tc.receivedErr, "") - require.Equal(t, tc.nonRetryable, nonRetryable) - if tc.nonRetryable { + require.NotEqual(t, tc.retryable, nonRetryable) + if !tc.retryable { require.Equal(t, errPrepFailed, te.preparedPool.reserved["aa"]) } delete(te.preparedPool.reserved, "aa") From c2a2b80b87a806081bebdee1ee6afec1b528e952 Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Mon, 23 Sep 2024 23:24:05 +0530 Subject: [PATCH 09/10] test: added e2e test for commit prepared fail Signed-off-by: Harshit Gangal --- go/vt/vttablet/endtoend/main_test.go | 2 +- go/vt/vttablet/endtoend/transaction_test.go | 41 +++++++++++++++++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/go/vt/vttablet/endtoend/main_test.go b/go/vt/vttablet/endtoend/main_test.go index 1284f790b93..eedd893f3eb 100644 --- a/go/vt/vttablet/endtoend/main_test.go +++ b/go/vt/vttablet/endtoend/main_test.go @@ -343,7 +343,7 @@ var tableACLConfig = `{ }, { "name": "vitess_twopc", - "table_names_or_prefixes": ["dt_state"], + "table_names_or_prefixes": ["dt_state", "redo_state"], "readers": ["dev"], "writers": ["dev"], "admins": ["dev"] diff --git a/go/vt/vttablet/endtoend/transaction_test.go b/go/vt/vttablet/endtoend/transaction_test.go index 58a1cacbe10..21e67f41012 100644 --- a/go/vt/vttablet/endtoend/transaction_test.go +++ b/go/vt/vttablet/endtoend/transaction_test.go @@ -873,3 +873,44 @@ func TestUnresolvedTransactionsOrdering(t *testing.T) { assert.Equal(t, want[i].Participants, transaction.Participants) } } + +// TestCommitPreparedFail tests the case where the commit_prepared fails when the query is killed. +// The transaction remains in the prepare state. +func TestCommitPreparedFail(t *testing.T) { + client := framework.NewClient() + defer client.RollbackPrepared("aa", 0) + + _, err := client.BeginExecute(`insert into vitess_test (intval, floatval, charval, binval) values(4, null, null, null)`, nil, nil) + require.NoError(t, err) + connRes, err := client.Execute(`select connection_id()`, nil) + require.NoError(t, err) + err = client.Prepare("aa") + require.NoError(t, err) + + client2 := framework.NewClient() + _, err = client2.BeginExecute(`select * from _vt.redo_state for update`, nil, nil) + require.NoError(t, err) + + ch := make(chan any) + go func() { + err = client.CommitPrepared("aa") + require.ErrorContains(t, err, "commit_prepared") + ch <- nil + }() + time.Sleep(100 * time.Millisecond) + + dbaConnector := framework.Server.Config().DB.DbaWithDB() + conn, err := dbaConnector.Connect(context.Background()) + require.NoError(t, err) + defer conn.Close() + + _, err = conn.ExecuteFetch(fmt.Sprintf("kill query %s", connRes.Rows[0][0].ToString()), 1, false) + require.NoError(t, err) + + client2.Release() + <-ch + + qr, err := client.Execute("select dtid, state, message from _vt.redo_state where dtid = 'aa'", nil) + require.NoError(t, err) + require.Equal(t, `[[VARBINARY("aa") INT64(1) TEXT("Query execution was interrupted (errno 1317) (sqlstate 70100) during query: delete from _vt.redo_state where dtid = 'aa'")]]`, fmt.Sprintf("%v", qr.Rows)) +} From 0dbcc31fa06e66282c170a1c1c0c38f7c0fcdbaf Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Tue, 24 Sep 2024 17:02:37 +0530 Subject: [PATCH 10/10] added a non retryable e2e test Signed-off-by: Harshit Gangal --- go/vt/vttablet/endtoend/transaction_test.go | 41 ------- go/vt/vttablet/endtoend/twopc/main_test.go | 100 ++++++++++++++++ go/vt/vttablet/endtoend/twopc/prepare_test.go | 109 ++++++++++++++++++ 3 files changed, 209 insertions(+), 41 deletions(-) create mode 100644 go/vt/vttablet/endtoend/twopc/main_test.go create mode 100644 go/vt/vttablet/endtoend/twopc/prepare_test.go diff --git a/go/vt/vttablet/endtoend/transaction_test.go b/go/vt/vttablet/endtoend/transaction_test.go index 21e67f41012..58a1cacbe10 100644 --- a/go/vt/vttablet/endtoend/transaction_test.go +++ b/go/vt/vttablet/endtoend/transaction_test.go @@ -873,44 +873,3 @@ func TestUnresolvedTransactionsOrdering(t *testing.T) { assert.Equal(t, want[i].Participants, transaction.Participants) } } - -// TestCommitPreparedFail tests the case where the commit_prepared fails when the query is killed. -// The transaction remains in the prepare state. -func TestCommitPreparedFail(t *testing.T) { - client := framework.NewClient() - defer client.RollbackPrepared("aa", 0) - - _, err := client.BeginExecute(`insert into vitess_test (intval, floatval, charval, binval) values(4, null, null, null)`, nil, nil) - require.NoError(t, err) - connRes, err := client.Execute(`select connection_id()`, nil) - require.NoError(t, err) - err = client.Prepare("aa") - require.NoError(t, err) - - client2 := framework.NewClient() - _, err = client2.BeginExecute(`select * from _vt.redo_state for update`, nil, nil) - require.NoError(t, err) - - ch := make(chan any) - go func() { - err = client.CommitPrepared("aa") - require.ErrorContains(t, err, "commit_prepared") - ch <- nil - }() - time.Sleep(100 * time.Millisecond) - - dbaConnector := framework.Server.Config().DB.DbaWithDB() - conn, err := dbaConnector.Connect(context.Background()) - require.NoError(t, err) - defer conn.Close() - - _, err = conn.ExecuteFetch(fmt.Sprintf("kill query %s", connRes.Rows[0][0].ToString()), 1, false) - require.NoError(t, err) - - client2.Release() - <-ch - - qr, err := client.Execute("select dtid, state, message from _vt.redo_state where dtid = 'aa'", nil) - require.NoError(t, err) - require.Equal(t, `[[VARBINARY("aa") INT64(1) TEXT("Query execution was interrupted (errno 1317) (sqlstate 70100) during query: delete from _vt.redo_state where dtid = 'aa'")]]`, fmt.Sprintf("%v", qr.Rows)) -} diff --git a/go/vt/vttablet/endtoend/twopc/main_test.go b/go/vt/vttablet/endtoend/twopc/main_test.go new file mode 100644 index 00000000000..090751503d4 --- /dev/null +++ b/go/vt/vttablet/endtoend/twopc/main_test.go @@ -0,0 +1,100 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package endtoend + +import ( + "context" + "flag" + "fmt" + "os" + "testing" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/vt/vttablet/endtoend/framework" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttest" + + vttestpb "vitess.io/vitess/go/vt/proto/vttest" +) + +var ( + connParams mysql.ConnParams + connAppDebugParams mysql.ConnParams + cluster vttest.LocalCluster +) + +func TestMain(m *testing.M) { + flag.Parse() // Do not remove this comment, import into google3 depends on it + tabletenv.Init() + + exitCode := func() int { + // Launch MySQL. + // We need a Keyspace in the topology, so the DbName is set. + // We need a Shard too, so the database 'vttest' is created. + cfg := vttest.Config{ + Topology: &vttestpb.VTTestTopology{ + Keyspaces: []*vttestpb.Keyspace{ + { + Name: "vttest", + Shards: []*vttestpb.Shard{ + { + Name: "0", + DbNameOverride: "vttest", + }, + }, + }, + }, + }, + OnlyMySQL: true, + Charset: "utf8mb4_general_ci", + } + if err := cfg.InitSchemas("vttest", testSchema, nil); err != nil { + fmt.Fprintf(os.Stderr, "InitSchemas failed: %v\n", err) + return 1 + } + defer os.RemoveAll(cfg.SchemaDir) + cluster = vttest.LocalCluster{ + Config: cfg, + } + if err := cluster.Setup(); err != nil { + fmt.Fprintf(os.Stderr, "could not launch mysql: %v\n", err) + return 1 + } + + defer cluster.TearDown() + + connParams = cluster.MySQLConnParams() + connAppDebugParams = cluster.MySQLAppDebugConnParams() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + config := tabletenv.NewDefaultConfig() + config.TwoPCEnable = true + config.TwoPCAbandonAge = 1 + err := framework.StartCustomServer(ctx, connParams, connAppDebugParams, cluster.DbName(), config) + if err != nil { + fmt.Fprintf(os.Stderr, "%v", err) + return 1 + } + defer framework.StopServer() + + return m.Run() + }() + os.Exit(exitCode) +} + +var testSchema = `create table vitess_test(intval int default 0 primary key);` diff --git a/go/vt/vttablet/endtoend/twopc/prepare_test.go b/go/vt/vttablet/endtoend/twopc/prepare_test.go new file mode 100644 index 00000000000..ae07356d6dc --- /dev/null +++ b/go/vt/vttablet/endtoend/twopc/prepare_test.go @@ -0,0 +1,109 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package endtoend + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/vttablet/endtoend/framework" +) + +// TestCommitPreparedFailNonRetryable tests the case where the commit_prepared fails trying to acquire update lock. +// The transaction updates to failed state. +func TestCommitPreparedFailNonRetryable(t *testing.T) { + dbaConnector := framework.Server.Config().DB.DbaWithDB() + conn, err := dbaConnector.Connect(context.Background()) + require.NoError(t, err) + defer conn.Close() + + _, err = conn.ExecuteFetch("set global innodb_lock_wait_timeout = 1", 1, false) + require.NoError(t, err) + defer conn.ExecuteFetch("set global innodb_lock_wait_timeout = default", 1, false) + + client := framework.NewClient() + defer client.RollbackPrepared("bb", client.TransactionID()) + + _, err = client.BeginExecute(`insert into vitess_test (intval) values(50)`, nil, nil) + require.NoError(t, err) + err = client.Prepare("bb") + require.NoError(t, err) + + client2 := framework.NewClient() + _, err = client2.BeginExecute(`select * from _vt.redo_state where dtid = 'bb' for update`, nil, nil) + require.NoError(t, err) + + ch := make(chan any) + go func() { + err := client.CommitPrepared("bb") + ch <- nil + require.ErrorContains(t, err, "commit_prepared") + }() + time.Sleep(1500 * time.Millisecond) + + client2.Release() + <-ch + + qr, err := client2.Execute("select dtid, state, message from _vt.redo_state where dtid = 'bb'", nil) + require.NoError(t, err) + require.Equal(t, `[[VARBINARY("bb") INT64(0) TEXT("Lock wait timeout exceeded; try restarting transaction (errno 1205) (sqlstate HY000) during query: delete from _vt.redo_state where dtid = 'bb'")]]`, fmt.Sprintf("%v", qr.Rows)) +} + +// TestCommitPreparedFailRetryable tests the case where the commit_prepared fails when the query is killed. +// The transaction remains in the prepare state. +func TestCommitPreparedFailRetryable(t *testing.T) { + client := framework.NewClient() + defer client.RollbackPrepared("aa", client.TransactionID()) + + _, err := client.BeginExecute(`insert into vitess_test (intval) values(40)`, nil, nil) + require.NoError(t, err) + connRes, err := client.Execute(`select connection_id()`, nil) + require.NoError(t, err) + err = client.Prepare("aa") + require.NoError(t, err) + + client2 := framework.NewClient() + _, err = client2.BeginExecute(`select * from _vt.redo_state where dtid = 'aa' for update`, nil, nil) + require.NoError(t, err) + + ch := make(chan any) + go func() { + err := client.CommitPrepared("aa") + ch <- nil + require.ErrorContains(t, err, "commit_prepared") + }() + time.Sleep(100 * time.Millisecond) + + dbaConnector := framework.Server.Config().DB.DbaWithDB() + conn, err := dbaConnector.Connect(context.Background()) + require.NoError(t, err) + defer conn.Close() + + _, err = conn.ExecuteFetch(fmt.Sprintf("kill query %s", connRes.Rows[0][0].ToString()), 1, false) + require.NoError(t, err) + + client2.Release() + <-ch + + qr, err := client2.Execute("select dtid, state, message from _vt.redo_state where dtid = 'aa'", nil) + require.NoError(t, err) + require.Equal(t, `[[VARBINARY("aa") INT64(1) TEXT("Query execution was interrupted (errno 1317) (sqlstate 70100) during query: delete from _vt.redo_state where dtid = 'aa'")]]`, fmt.Sprintf("%v", qr.Rows)) +}