Skip to content

Commit

Permalink
Txmv2 stuck tx detection (#15436)
Browse files Browse the repository at this point in the history
* Stuck tx detector alpha

* Update stuck tx detection

* Add stuck_tx_detection and dual broadcast client

* Add support for TXMv2

* Fix orchestrator's monitoring call

* Fix AttemptBuilder

* Enable DualBroadcast client

* Switch DualBroadcast params to pointers

* Add context to client

* Fix lint

* Fix DualBroadcast client

* More lint fixes

* Fix lint
  • Loading branch information
dimriou authored Nov 27, 2024
1 parent e331d81 commit cecb0e1
Show file tree
Hide file tree
Showing 22 changed files with 801 additions and 65 deletions.
4 changes: 4 additions & 0 deletions core/chains/evm/config/chain_scoped.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ func (e *EVMConfig) BalanceMonitor() BalanceMonitor {
return &balanceMonitorConfig{c: e.C.BalanceMonitor}
}

func (e *EVMConfig) TxmV2() TxmV2 {
return &txmv2Config{c: e.C.TxmV2}
}

func (e *EVMConfig) Transactions() Transactions {
return &transactionsConfig{c: e.C.Transactions}
}
Expand Down
25 changes: 25 additions & 0 deletions core/chains/evm/config/chain_scoped_txmv2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package config

import (
"net/url"
"time"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/config/toml"
)

type txmv2Config struct {
c toml.TxmV2
}

func (t *txmv2Config) Enabled() bool {
return *t.c.Enabled
}

func (t *txmv2Config) BlockTime() *time.Duration {
d := t.c.BlockTime.Duration()
return &d
}

func (t *txmv2Config) CustomURL() *url.URL {
return t.c.CustomURL.URL()
}
6 changes: 5 additions & 1 deletion core/chains/evm/config/chaintype/chaintype.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
ChainZkEvm ChainType = "zkevm"
ChainZkSync ChainType = "zksync"
ChainZircuit ChainType = "zircuit"
ChainDualBroadcast ChainType = "dualBroadcast"
)

// IsL2 returns true if this chain is a Layer 2 chain. Notably:
Expand All @@ -39,7 +40,7 @@ func (c ChainType) IsL2() bool {

func (c ChainType) IsValid() bool {
switch c {
case "", ChainArbitrum, ChainAstar, ChainCelo, ChainGnosis, ChainHedera, ChainKroma, ChainMantle, ChainMetis, ChainOptimismBedrock, ChainScroll, ChainWeMix, ChainXLayer, ChainZkEvm, ChainZkSync, ChainZircuit:
case "", ChainArbitrum, ChainAstar, ChainCelo, ChainGnosis, ChainHedera, ChainKroma, ChainMantle, ChainMetis, ChainOptimismBedrock, ChainScroll, ChainWeMix, ChainXLayer, ChainZkEvm, ChainZkSync, ChainZircuit, ChainDualBroadcast:
return true
}
return false
Expand Down Expand Up @@ -77,6 +78,8 @@ func FromSlug(slug string) ChainType {
return ChainZkSync
case "zircuit":
return ChainZircuit
case "dualBroadcast":
return ChainDualBroadcast
default:
return ChainType(slug)
}
Expand Down Expand Up @@ -144,4 +147,5 @@ var ErrInvalid = fmt.Errorf("must be one of %s or omitted", strings.Join([]strin
string(ChainZkEvm),
string(ChainZkSync),
string(ChainZircuit),
string(ChainDualBroadcast),
}, ", "))
7 changes: 7 additions & 0 deletions core/chains/evm/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
type EVM interface {
HeadTracker() HeadTracker
BalanceMonitor() BalanceMonitor
TxmV2() TxmV2
Transactions() Transactions
GasEstimator() GasEstimator
OCR() OCR
Expand Down Expand Up @@ -102,6 +103,12 @@ type ClientErrors interface {
TooManyResults() string
}

type TxmV2 interface {
Enabled() bool
BlockTime() *time.Duration
CustomURL() *url.URL
}

type Transactions interface {
ForwardersEnabled() bool
ReaperInterval() time.Duration
Expand Down
27 changes: 25 additions & 2 deletions core/chains/evm/config/toml/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,8 +300,10 @@ func (c *EVMConfig) ValidateConfig() (err error) {
is := c.ChainType.ChainType()
if is != must {
if must == "" {
err = multierr.Append(err, commonconfig.ErrInvalid{Name: "ChainType", Value: c.ChainType.ChainType(),
Msg: "must not be set with this chain id"})
if c.ChainType.ChainType() != chaintype.ChainDualBroadcast {
err = multierr.Append(err, commonconfig.ErrInvalid{Name: "ChainType", Value: c.ChainType.ChainType(),
Msg: "must not be set with this chain id"})
}
} else {
err = multierr.Append(err, commonconfig.ErrInvalid{Name: "ChainType", Value: c.ChainType.ChainType(),
Msg: fmt.Sprintf("only %q can be used with this chain id", must)})
Expand Down Expand Up @@ -387,6 +389,7 @@ type Chain struct {
FinalizedBlockOffset *uint32
NoNewFinalizedHeadsThreshold *commonconfig.Duration

TxmV2 TxmV2 `toml:",omitempty"`
Transactions Transactions `toml:",omitempty"`
BalanceMonitor BalanceMonitor `toml:",omitempty"`
GasEstimator GasEstimator `toml:",omitempty"`
Expand Down Expand Up @@ -471,6 +474,26 @@ func (c *Chain) ValidateConfig() (err error) {
return
}

type TxmV2 struct {
Enabled *bool `toml:",omitempty"`
BlockTime *commonconfig.Duration `toml:",omitempty"`
CustomURL *commonconfig.URL `toml:",omitempty"`
}

func (t *TxmV2) setFrom(f *TxmV2) {
if v := f.Enabled; v != nil {
t.Enabled = f.Enabled
}

if v := f.BlockTime; v != nil {
t.BlockTime = f.BlockTime
}

if v := f.CustomURL; v != nil {
t.CustomURL = f.CustomURL
}
}

type Transactions struct {
ForwardersEnabled *bool
MaxInFlight *uint32
Expand Down
1 change: 1 addition & 0 deletions core/chains/evm/config/toml/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ func (c *Chain) SetFrom(f *Chain) {
c.NoNewFinalizedHeadsThreshold = v
}

c.TxmV2.setFrom(&f.TxmV2)
c.Transactions.setFrom(&f.Transactions)
c.BalanceMonitor.setFrom(&f.BalanceMonitor)
c.GasEstimator.setFrom(&f.GasEstimator)
Expand Down
3 changes: 3 additions & 0 deletions core/chains/evm/config/toml/defaults/fallback.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ FinalizedBlockOffset = 0
NoNewFinalizedHeadsThreshold = '0'
LogBroadcasterEnabled = true

[TxmV2]
Enabled = false

[Transactions]
ForwardersEnabled = false
MaxInFlight = 16
Expand Down
1 change: 1 addition & 0 deletions core/chains/evm/keystore/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,6 @@ type Eth interface {
CheckEnabled(ctx context.Context, address common.Address, chainID *big.Int) error
EnabledAddressesForChain(ctx context.Context, chainID *big.Int) (addresses []common.Address, err error)
SignTx(ctx context.Context, fromAddress common.Address, tx *types.Transaction, chainID *big.Int) (*types.Transaction, error)
SignMessage(ctx context.Context, address common.Address, message []byte) ([]byte, error)
SubscribeToKeyChanges(ctx context.Context) (ch chan struct{}, unsub func())
}
60 changes: 60 additions & 0 deletions core/chains/evm/keystore/mocks/eth.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 31 additions & 0 deletions core/chains/evm/txm/clientwrappers/chain_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package clientwrappers

import (
"context"
"math/big"

"github.com/ethereum/go-ethereum/common"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txm/types"
)

type ChainClient struct {
c client.Client
}

func NewChainClient(client client.Client) *ChainClient {
return &ChainClient{c: client}
}

func (c *ChainClient) NonceAt(ctx context.Context, address common.Address, blockNumber *big.Int) (uint64, error) {
return c.c.NonceAt(ctx, address, blockNumber)
}

func (c *ChainClient) PendingNonceAt(ctx context.Context, address common.Address) (uint64, error) {
return c.c.PendingNonceAt(ctx, address)
}

func (c *ChainClient) SendTransaction(ctx context.Context, _ *types.Transaction, attempt *types.Attempt) error {
return c.c.SendTransaction(ctx, attempt.SignedTransaction)
}
130 changes: 130 additions & 0 deletions core/chains/evm/txm/clientwrappers/dual_broadcast_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package clientwrappers

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"math/big"
"net/http"
"net/url"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txm/types"
)

type DualBroadcastClientKeystore interface {
SignMessage(ctx context.Context, address common.Address, message []byte) ([]byte, error)
}

type DualBroadcastClient struct {
c client.Client
keystore DualBroadcastClientKeystore
customURL *url.URL
}

func NewDualBroadcastClient(c client.Client, keystore DualBroadcastClientKeystore, customURL *url.URL) *DualBroadcastClient {
return &DualBroadcastClient{
c: c,
keystore: keystore,
customURL: customURL,
}
}

func (d *DualBroadcastClient) NonceAt(ctx context.Context, address common.Address, blockNumber *big.Int) (uint64, error) {
return d.c.NonceAt(ctx, address, blockNumber)
}

func (d *DualBroadcastClient) PendingNonceAt(ctx context.Context, address common.Address) (uint64, error) {
body := []byte(fmt.Sprintf(`{"jsonrpc":"2.0","method":"eth_getTransactionCount","params":["%s","pending"]}`, address.String()))
response, err := d.signAndPostMessage(ctx, address, body, "")
if err != nil {
return 0, err
}

nonce, err := hexutil.DecodeUint64(response)
if err != nil {
return 0, fmt.Errorf("failed to decode response %v into uint64: %w", response, err)
}
return nonce, nil
}

func (d *DualBroadcastClient) SendTransaction(ctx context.Context, tx *types.Transaction, attempt *types.Attempt) error {
meta, err := tx.GetMeta()
if err != nil {
return err
}
//nolint:revive //linter nonsense
if meta != nil && meta.DualBroadcast != nil && *meta.DualBroadcast && !tx.IsPurgeable {
data, err := attempt.SignedTransaction.MarshalBinary()
if err != nil {
return err
}
params := ""
if meta.DualBroadcastParams != nil {
params = *meta.DualBroadcastParams
}
body := []byte(fmt.Sprintf(`{"jsonrpc":"2.0","method":"eth_sendRawTransaction","params":["%s"]}`, hexutil.Encode(data)))
if _, err = d.signAndPostMessage(ctx, tx.FromAddress, body, params); err != nil {
return err
}
return nil
} else {
return d.c.SendTransaction(ctx, attempt.SignedTransaction)
}
}

func (d *DualBroadcastClient) signAndPostMessage(ctx context.Context, address common.Address, body []byte, urlParams string) (result string, err error) {
bodyReader := bytes.NewReader(body)
postReq, err := http.NewRequestWithContext(ctx, http.MethodPost, d.customURL.String()+"?"+urlParams, bodyReader)
if err != nil {
return
}

hashedBody := crypto.Keccak256Hash(body).Hex()
signedMessage, err := d.keystore.SignMessage(ctx, address, []byte(hashedBody))
if err != nil {
return
}

postReq.Header.Add("X-Flashbots-signature", address.String()+":"+hexutil.Encode(signedMessage))
postReq.Header.Add("Content-Type", "application/json")

resp, err := http.DefaultClient.Do(postReq)
if err != nil {
return result, fmt.Errorf("request %v failed: %w", postReq, err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return result, fmt.Errorf("request %v failed with status: %d", postReq, resp.StatusCode)
}

keyJSON, err := io.ReadAll(resp.Body)
if err != nil {
return
}
var response postResponse
err = json.Unmarshal(keyJSON, &response)
if err != nil {
return result, fmt.Errorf("failed to unmarshal response into struct: %w: %s", err, string(keyJSON))
}
if response.Error.Message != "" {
return result, errors.New(response.Error.Message)
}
return response.Result, nil
}

type postResponse struct {
Result string `json:"result,omitempty"`
Error postError
}

type postError struct {
Message string `json:"message,omitempty"`
}
Loading

0 comments on commit cecb0e1

Please sign in to comment.