diff --git a/go/vt/vttablet/tabletserver/dt_executor.go b/go/vt/vttablet/tabletserver/dt_executor.go index 9a1c50bcd65..fa79bd44672 100644 --- a/go/vt/vttablet/tabletserver/dt_executor.go +++ b/go/vt/vttablet/tabletserver/dt_executor.go @@ -158,7 +158,7 @@ func (dte *DTExecutor) CommitPrepared(dtid string) (err error) { defer func() { if err != nil { log.Warningf("failed to commit the prepared transaction '%s' with error: %v", dtid, err) - dte.te.checkErrorAndMarkFailed(ctx, dtid, err) + dte.te.checkErrorAndMarkFailed(ctx, dtid, err, "TwopcCommit") } dte.te.txPool.RollbackAndRelease(ctx, conn) }() diff --git a/go/vt/vttablet/tabletserver/tx_engine.go b/go/vt/vttablet/tabletserver/tx_engine.go index f228f09eb73..60f02e50d75 100644 --- a/go/vt/vttablet/tabletserver/tx_engine.go +++ b/go/vt/vttablet/tabletserver/tx_engine.go @@ -426,6 +426,15 @@ func (te *TxEngine) prepareFromRedo() error { allErrs []error ) + checkErr := func(dtid string, err error) { + if err != nil { + allErrs = append(allErrs, vterrors.Wrapf(err, "dtid - %v", dtid)) + if te.checkErrorAndMarkFailed(ctx, dtid, err, "TwopcPrepareRedo") { + failedCounter++ + } + } + } + outer: for _, preparedTx := range prepared { var conn *StatefulConnection @@ -436,12 +445,7 @@ outer: } // check last error to record failure. - if lastErr != nil { - allErrs = append(allErrs, vterrors.Wrapf(lastErr, "dtid - %v", lastDtid)) - if te.checkErrorAndMarkFailed(ctx, lastDtid, lastErr) { - failedCounter++ - } - } + checkErr(lastDtid, lastErr) lastDtid = preparedTx.Dtid @@ -466,12 +470,7 @@ outer: } // check last error to record failure. - if lastErr != nil { - allErrs = append(allErrs, vterrors.Wrapf(lastErr, "dtid - %v", lastDtid)) - if te.checkErrorAndMarkFailed(ctx, lastDtid, lastErr) { - failedCounter++ - } - } + checkErr(lastDtid, lastErr) for _, preparedTx := range failed { txID, _ := dtids.TransactionID(preparedTx.Dtid) @@ -487,16 +486,21 @@ outer: // checkErrorAndMarkFailed check that the error is retryable or non-retryable error. // If it is a non-retryable error than it marks the dtid as failed in the prepared pool, -// increments the InternalErrors counter, and also changes the state of the transaction in the redo log as failed. -func (te *TxEngine) checkErrorAndMarkFailed(ctx context.Context, dtid string, receivedErr error) (fail bool) { +// increments the InternalErrors counter, and also changes the state of the transaction in the redo log as failed. +func (te *TxEngine) checkErrorAndMarkFailed(ctx context.Context, dtid string, receivedErr error, metricName string) (fail bool) { + state := RedoStateFailed if isRetryableError(receivedErr) { log.Infof("retryable error for dtid: %s", dtid) - return + state = RedoStatePrepared + } else { + fail = true + te.env.Stats().InternalErrors.Add(metricName, 1) + te.preparedPool.SetFailed(dtid) } - fail = true - te.env.Stats().InternalErrors.Add("TwopcCommit", 1) - te.preparedPool.SetFailed(dtid) + // Update the state of the transaction in the redo log. + // Retryable Error: Update the message with error message. + // Non-retryable Error: Along with message, update the state as RedoStateFailed. conn, _, _, err := te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil, nil) if err != nil { log.Errorf("markFailed: Begin failed for dtid %s: %v", dtid, err) @@ -504,7 +508,7 @@ func (te *TxEngine) checkErrorAndMarkFailed(ctx context.Context, dtid string, re } defer te.txPool.RollbackAndRelease(ctx, conn) - if err = te.twoPC.UpdateRedo(ctx, conn, dtid, RedoStateFailed, receivedErr.Error()); err != nil { + if err = te.twoPC.UpdateRedo(ctx, conn, dtid, state, receivedErr.Error()); err != nil { log.Errorf("markFailed: UpdateRedo failed for dtid %s: %v", dtid, err) return } diff --git a/go/vt/vttablet/tabletserver/tx_engine_test.go b/go/vt/vttablet/tabletserver/tx_engine_test.go index 3c666701539..5dfb7235c0d 100644 --- a/go/vt/vttablet/tabletserver/tx_engine_test.go +++ b/go/vt/vttablet/tabletserver/tx_engine_test.go @@ -610,7 +610,6 @@ func TestTxEngineFailReserve(t *testing.T) { func TestCheckReceivedError(t *testing.T) { db := setUpQueryExecutorTest(t) defer db.Close() - // db.AddQueryPattern(".*", &sqltypes.Result{}) cfg := tabletenv.NewDefaultConfig() cfg.DB = newDBConfigs(db) env := tabletenv.NewEnv(vtenv.NewTestEnv(), cfg, "TabletServerTest") @@ -626,6 +625,7 @@ func TestCheckReceivedError(t *testing.T) { }{{ receivedErr: vterrors.New(vtrpcpb.Code_DEADLINE_EXCEEDED, "deadline exceeded"), nonRetryable: false, + expQuery: `update _vt.redo_state set state = 1, message = 'deadline exceeded' where dtid = 'aa'`, }, { receivedErr: vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "invalid argument"), nonRetryable: true, @@ -637,12 +637,15 @@ func TestCheckReceivedError(t *testing.T) { }, { receivedErr: context.DeadlineExceeded, nonRetryable: false, + expQuery: `update _vt.redo_state set state = 1, message = 'context deadline exceeded' where dtid = 'aa'`, }, { receivedErr: context.Canceled, nonRetryable: false, + expQuery: `update _vt.redo_state set state = 1, message = 'context canceled' where dtid = 'aa'`, }, { receivedErr: sqlerror.NewSQLError(sqlerror.CRServerLost, sqlerror.SSUnknownSQLState, "Lost connection to MySQL server during query"), nonRetryable: false, + expQuery: `update _vt.redo_state set state = 1, message = 'Lost connection to MySQL server during query (errno 2013) (sqlstate HY000)' where dtid = 'aa'`, }, { receivedErr: sqlerror.NewSQLError(sqlerror.CRMalformedPacket, sqlerror.SSUnknownSQLState, "Malformed packet"), nonRetryable: true, @@ -650,6 +653,7 @@ func TestCheckReceivedError(t *testing.T) { }, { receivedErr: sqlerror.NewSQLError(sqlerror.CRServerGone, sqlerror.SSUnknownSQLState, "Server has gone away"), nonRetryable: false, + expQuery: `update _vt.redo_state set state = 1, message = 'Server has gone away (errno 2006) (sqlstate HY000)' where dtid = 'aa'`, }, { receivedErr: vterrors.New(vtrpcpb.Code_ABORTED, "Row count exceeded"), nonRetryable: true, @@ -657,6 +661,7 @@ func TestCheckReceivedError(t *testing.T) { }, { receivedErr: errors.New("(errno 2013) (sqlstate HY000) lost connection"), nonRetryable: false, + expQuery: `update _vt.redo_state set state = 1, message = '(errno 2013) (sqlstate HY000) lost connection' where dtid = 'aa'`, }} for _, tc := range tcases { @@ -664,7 +669,7 @@ func TestCheckReceivedError(t *testing.T) { if tc.expQuery != "" { db.AddQuery(tc.expQuery, &sqltypes.Result{}) } - nonRetryable := te.checkErrorAndMarkFailed(context.Background(), "aa", tc.receivedErr) + nonRetryable := te.checkErrorAndMarkFailed(context.Background(), "aa", tc.receivedErr, "") require.Equal(t, tc.nonRetryable, nonRetryable) if tc.nonRetryable { require.Equal(t, errPrepFailed, te.preparedPool.reserved["aa"])