From 17341c732cfaed0301a5ec93535e40bc09f9e5fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Friedemann=20F=C3=BCrst?= <59653747+friedemannf@users.noreply.github.com> Date: Thu, 15 Aug 2024 16:48:40 +0200 Subject: [PATCH] Cherry-Pick: Handle terminally stuck transactions on send (#1298) Cherry-pick of https://github.com/smartcontractkit/chainlink/pull/14127 Required to successfully handle zk overflows on Polygon zkEVM and X Layer. Co-authored-by: amit-momin <108959691+amit-momin@users.noreply.github.com> Co-authored-by: Rens Rooimans --- .changeset/yellow-cougars-act.md | 5 +++ common/txmgr/broadcaster.go | 14 +++--- common/txmgr/confirmer.go | 17 +++++++- core/chains/evm/client/errors.go | 5 +++ core/chains/evm/client/errors_test.go | 22 +++++++++- core/chains/evm/txmgr/broadcaster_test.go | 19 +++++++++ core/chains/evm/txmgr/confirmer_test.go | 52 +++++++++++++++++++++++ core/chains/evm/txmgr/txmgr_test.go | 22 ++++++++++ 8 files changed, 146 insertions(+), 10 deletions(-) create mode 100644 .changeset/yellow-cougars-act.md diff --git a/.changeset/yellow-cougars-act.md b/.changeset/yellow-cougars-act.md new file mode 100644 index 0000000000..61ed62607a --- /dev/null +++ b/.changeset/yellow-cougars-act.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +Added client error classification for terminally stuck transactions in the TXM #internal diff --git a/common/txmgr/broadcaster.go b/common/txmgr/broadcaster.go index 2a9c1231d7..25b028447f 100644 --- a/common/txmgr/broadcaster.go +++ b/common/txmgr/broadcaster.go @@ -478,16 +478,16 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand lgr.Infow("Sending transaction", "txAttemptID", attempt.ID, "txHash", attempt.Hash, "meta", etx.Meta, "feeLimit", attempt.ChainSpecificFeeLimit, "callerProvidedFeeLimit", etx.FeeLimit, "attempt", attempt, "etx", etx) errType, err := eb.client.SendTransactionReturnCode(ctx, etx, attempt, lgr) - if errType != client.Fatal { - etx.InitialBroadcastAt = &initialBroadcastAt - etx.BroadcastAt = &initialBroadcastAt - } - - switch errType { - case client.Fatal: + if errType == client.Fatal || errType == client.TerminallyStuck { eb.SvcErrBuffer.Append(err) etx.Error = null.StringFrom(err.Error()) return eb.saveFatallyErroredTransaction(lgr, &etx), true + } + + etx.InitialBroadcastAt = &initialBroadcastAt + etx.BroadcastAt = &initialBroadcastAt + + switch errType { case client.TransactionAlreadyKnown: fallthrough case client.Successful: diff --git a/common/txmgr/confirmer.go b/common/txmgr/confirmer.go index a9e30ffff1..0806eeda26 100644 --- a/common/txmgr/confirmer.go +++ b/common/txmgr/confirmer.go @@ -471,7 +471,7 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Pro go func(tx txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { defer wg.Done() lggr := tx.GetLogger(ec.lggr) - // Create an purge attempt for tx + // Create a purge attempt for tx purgeAttempt, err := ec.TxAttemptBuilder.NewPurgeTxAttempt(ctx, tx, lggr) if err != nil { errMu.Lock() @@ -975,6 +975,21 @@ func (ec *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) han ec.SvcErrBuffer.Append(sendError) // This will loop continuously on every new head so it must be handled manually by the node operator! return ec.txStore.DeleteInProgressAttempt(ctx, attempt) + case client.TerminallyStuck: + // A transaction could broadcast successfully but then be considered terminally stuck on another attempt + // Even though the transaction can succeed under different circumstances, we want to purge this transaction as soon as we get this error + lggr.Warnw("terminally stuck transaction detected", "err", sendError.Error()) + ec.SvcErrBuffer.Append(sendError) + // Create a purge attempt for tx + purgeAttempt, err := ec.TxAttemptBuilder.NewPurgeTxAttempt(ctx, etx, lggr) + if err != nil { + return fmt.Errorf("NewPurgeTxAttempt failed: %w", err) + } + // Replace the in progress attempt with the purge attempt + if err := ec.txStore.SaveReplacementInProgressAttempt(ctx, attempt, &purgeAttempt); err != nil { + return fmt.Errorf("saveReplacementInProgressAttempt failed: %w", err) + } + return ec.handleInProgressAttempt(ctx, lggr, etx, purgeAttempt, blockHeight) case client.TransactionAlreadyKnown: // Sequence too low indicated that a transaction at this sequence was confirmed already. // Mark confirmed_missing_receipt and wait for the next cycle to try to get a receipt diff --git a/core/chains/evm/client/errors.go b/core/chains/evm/client/errors.go index 22fac5f728..83a8bce393 100644 --- a/core/chains/evm/client/errors.go +++ b/core/chains/evm/client/errors.go @@ -583,6 +583,11 @@ func ClassifySendError(err error, clientErrors config.ClientErrors, lggr logger. ) return commonclient.ExceedsMaxFee } + if sendError.IsTerminallyStuckConfigError(configErrors) { + lggr.Warnw("Transaction that would have been terminally stuck in the mempool detected on send. Marking as fatal error.", "err", sendError, "etx", tx) + // Attempt is thrown away in this case; we don't need it since it never got accepted by a node + return commonclient.TerminallyStuck + } lggr.Criticalw("Unknown error encountered when sending transaction", "err", err, "etx", tx) return commonclient.Unknown } diff --git a/core/chains/evm/client/errors_test.go b/core/chains/evm/client/errors_test.go index cca54c2a4a..3af6b70265 100644 --- a/core/chains/evm/client/errors_test.go +++ b/core/chains/evm/client/errors_test.go @@ -285,7 +285,7 @@ func Test_Eth_Errors(t *testing.T) { }) t.Run("Metis gas price errors", func(t *testing.T) { - err := evmclient.NewSendErrorS("primary websocket (wss://ws-mainnet.metis.io) call failed: gas price too low: 18000000000 wei, use at least tx.gasPrice = 19500000000 wei") + err = evmclient.NewSendErrorS("primary websocket (wss://ws-mainnet.metis.io) call failed: gas price too low: 18000000000 wei, use at least tx.gasPrice = 19500000000 wei") assert.True(t, err.L2FeeTooLow(clientErrors)) err = newSendErrorWrapped("primary websocket (wss://ws-mainnet.metis.io) call failed: gas price too low: 18000000000 wei, use at least tx.gasPrice = 19500000000 wei") assert.True(t, err.L2FeeTooLow(clientErrors)) @@ -297,7 +297,7 @@ func Test_Eth_Errors(t *testing.T) { }) t.Run("moonriver errors", func(t *testing.T) { - err := evmclient.NewSendErrorS("primary http (http://***REDACTED***:9933) call failed: submit transaction to pool failed: Pool(Stale)") + err = evmclient.NewSendErrorS("primary http (http://***REDACTED***:9933) call failed: submit transaction to pool failed: Pool(Stale)") assert.True(t, err.IsNonceTooLowError(clientErrors)) assert.False(t, err.IsTransactionAlreadyInMempool(clientErrors)) assert.False(t, err.Fatal(clientErrors)) @@ -306,6 +306,24 @@ func Test_Eth_Errors(t *testing.T) { assert.False(t, err.IsNonceTooLowError(clientErrors)) assert.False(t, err.Fatal(clientErrors)) }) + + t.Run("IsTerminallyStuck", func(t *testing.T) { + tests := []errorCase{ + {"failed to add tx to the pool: not enough step counters to continue the execution", true, "zkEVM"}, + {"failed to add tx to the pool: not enough step counters to continue the execution", true, "Xlayer"}, + {"failed to add tx to the pool: not enough keccak counters to continue the execution", true, "zkEVM"}, + {"failed to add tx to the pool: not enough keccak counters to continue the execution", true, "Xlayer"}, + } + + for _, test := range tests { + t.Run(test.network, func(t *testing.T) { + err = evmclient.NewSendErrorS(test.message) + assert.Equal(t, err.IsTerminallyStuckConfigError(clientErrors), test.expect) + err = newSendErrorWrapped(test.message) + assert.Equal(t, err.IsTerminallyStuckConfigError(clientErrors), test.expect) + }) + } + }) } func Test_Eth_Errors_Fatal(t *testing.T) { diff --git a/core/chains/evm/txmgr/broadcaster_test.go b/core/chains/evm/txmgr/broadcaster_test.go index 3559c329de..85fffe42a8 100644 --- a/core/chains/evm/txmgr/broadcaster_test.go +++ b/core/chains/evm/txmgr/broadcaster_test.go @@ -518,6 +518,25 @@ func TestEthBroadcaster_ProcessUnstartedEthTxs_Success(t *testing.T) { assert.True(t, ethTx.Error.Valid) assert.Equal(t, "transaction reverted during simulation: json-rpc error { Code = 42, Message = 'oh no, it reverted', Data = 'KqYi' }", ethTx.Error.String) }) + + t.Run("terminally stuck transaction is marked as fatal", func(t *testing.T) { + terminallyStuckError := "failed to add tx to the pool: not enough step counters to continue the execution" + etx := mustCreateUnstartedTx(t, txStore, fromAddress, toAddress, []byte{42, 42, 0}, gasLimit, big.Int(assets.NewEthValue(243)), testutils.FixtureChainID) + ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *gethTypes.Transaction) bool { + return tx.Nonce() == uint64(346) && tx.Value().Cmp(big.NewInt(243)) == 0 + }), fromAddress).Return(commonclient.Fatal, errors.New(terminallyStuckError)).Once() + + // Start processing unstarted transactions + retryable, err := eb.ProcessUnstartedTxs(tests.Context(t), fromAddress) + assert.NoError(t, err) + assert.False(t, retryable) + + dbTx, err := txStore.FindTxWithAttempts(ctx, etx.ID) + require.NoError(t, err) + assert.Equal(t, txmgrcommon.TxFatalError, dbTx.State) + assert.True(t, dbTx.Error.Valid) + assert.Equal(t, terminallyStuckError, dbTx.Error.String) + }) }) } diff --git a/core/chains/evm/txmgr/confirmer_test.go b/core/chains/evm/txmgr/confirmer_test.go index 6b107b222a..45d7ac9e12 100644 --- a/core/chains/evm/txmgr/confirmer_test.go +++ b/core/chains/evm/txmgr/confirmer_test.go @@ -2656,6 +2656,58 @@ func TestEthConfirmer_RebroadcastWhereNecessary_WhenOutOfEth(t *testing.T) { }) } +func TestEthConfirmer_RebroadcastWhereNecessary_TerminallyStuckError(t *testing.T) { + t.Parallel() + + db := pgtest.NewSqlxDB(t) + cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { + c.EVM[0].GasEstimator.PriceMax = assets.GWei(500) + }) + txStore := cltest.NewTestTxStore(t, db) + ctx := tests.Context(t) + + ethClient := testutils.NewEthClientMockWithDefaultChain(t) + ethKeyStore := cltest.NewKeyStore(t, db).Eth() + _, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore) + + evmcfg := evmtest.NewChainScopedConfig(t, cfg) + + // Use a mock keystore for this test + ec := newEthConfirmer(t, txStore, ethClient, cfg, evmcfg, ethKeyStore, nil) + currentHead := int64(30) + oldEnough := int64(19) + nonce := int64(0) + terminallyStuckError := "failed to add tx to the pool: not enough step counters to continue the execution" + + t.Run("terminally stuck transaction replaced with purge attempt", func(t *testing.T) { + originalBroadcastAt := time.Unix(1616509100, 0) + etx := cltest.MustInsertUnconfirmedEthTxWithBroadcastLegacyAttempt(t, txStore, nonce, fromAddress, originalBroadcastAt) + nonce++ + attempt1_1 := etx.TxAttempts[0] + var dbAttempt txmgr.DbEthTxAttempt + require.NoError(t, db.Get(&dbAttempt, `UPDATE evm.tx_attempts SET broadcast_before_block_num=$1 WHERE id=$2 RETURNING *`, oldEnough, attempt1_1.ID)) + + // Return terminally stuck error on first rebroadcast + ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *types.Transaction) bool { + return tx.Nonce() == uint64(*etx.Sequence) + }), fromAddress).Return(commonclient.TerminallyStuck, errors.New(terminallyStuckError)).Once() + // Return successful for purge attempt + ethClient.On("SendTransactionReturnCode", mock.Anything, mock.MatchedBy(func(tx *types.Transaction) bool { + return tx.Nonce() == uint64(*etx.Sequence) + }), fromAddress).Return(commonclient.Successful, nil).Once() + + // Start processing transactions for rebroadcast + require.NoError(t, ec.RebroadcastWhereNecessary(tests.Context(t), currentHead)) + var err error + etx, err = txStore.FindTxWithAttempts(ctx, etx.ID) + require.NoError(t, err) + + require.Len(t, etx.TxAttempts, 2) + purgeAttempt := etx.TxAttempts[0] + require.True(t, purgeAttempt.IsPurgeAttempt) + }) +} + func TestEthConfirmer_EnsureConfirmedTransactionsInLongestChain(t *testing.T) { t.Parallel() diff --git a/core/chains/evm/txmgr/txmgr_test.go b/core/chains/evm/txmgr/txmgr_test.go index 40df5616c9..e92acd8230 100644 --- a/core/chains/evm/txmgr/txmgr_test.go +++ b/core/chains/evm/txmgr/txmgr_test.go @@ -747,6 +747,7 @@ func TestTxm_GetTransactionStatus(t *testing.T) { t.Run("returns fatal for fatal error state with terminally stuck error", func(t *testing.T) { idempotencyKey := uuid.New().String() _, fromAddress := cltest.MustInsertRandomKey(t, ethKeyStore) + // Test the internal terminally stuck error returns Fatal nonce := evmtypes.Nonce(0) broadcast := time.Now() tx := &txmgr.Tx{ @@ -765,6 +766,27 @@ func TestTxm_GetTransactionStatus(t *testing.T) { state, err := txm.GetTransactionStatus(ctx, idempotencyKey) require.Equal(t, commontypes.Fatal, state) require.Error(t, err, evmclient.TerminallyStuckMsg) + + // Test a terminally stuck client error returns Fatal + nonce = evmtypes.Nonce(1) + idempotencyKey = uuid.New().String() + terminallyStuckClientError := "failed to add tx to the pool: not enough step counters to continue the execution" + tx = &txmgr.Tx{ + Sequence: &nonce, + IdempotencyKey: &idempotencyKey, + FromAddress: fromAddress, + EncodedPayload: []byte{1, 2, 3}, + FeeLimit: feeLimit, + State: txmgrcommon.TxFatalError, + Error: null.NewString(terminallyStuckClientError, true), + BroadcastAt: &broadcast, + InitialBroadcastAt: &broadcast, + } + err = txStore.InsertTx(ctx, tx) + require.NoError(t, err) + state, err = txm.GetTransactionStatus(ctx, idempotencyKey) + require.Equal(t, commontypes.Fatal, state) + require.Error(t, err, evmclient.TerminallyStuckMsg) }) t.Run("returns failed for fatal error state with other error", func(t *testing.T) {