Skip to content

Commit

Permalink
L1 Publisher: resend stuck tx and manage nonce
Browse files Browse the repository at this point in the history
  • Loading branch information
BedrockSquirrel committed Sep 14, 2023
1 parent 1e6ba68 commit 8948dab
Show file tree
Hide file tree
Showing 15 changed files with 135 additions and 188 deletions.
9 changes: 4 additions & 5 deletions go/ethadapter/erc20contractlib/erc20_contract_lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type ERC20ContractLib interface {
DecodeTx(tx *types.Transaction) ethadapter.L1Transaction

// CreateDepositTx receives an common.L1Transaction and converts it to an eth transaction
CreateDepositTx(tx *ethadapter.L1DepositTx, nonce uint64) types.TxData
CreateDepositTx(tx *ethadapter.L1DepositTx) types.TxData
}

// erc20ContractLibImpl takes a mgmtContractAddr and processes multiple erc20ContractAddrs
Expand All @@ -44,16 +44,15 @@ func NewERC20ContractLib(mgmtContractAddr *gethcommon.Address, contractAddrs ...
}
}

func (c *erc20ContractLibImpl) CreateDepositTx(tx *ethadapter.L1DepositTx, nonce uint64) types.TxData {
func (c *erc20ContractLibImpl) CreateDepositTx(tx *ethadapter.L1DepositTx) types.TxData {
data, err := c.contractABI.Pack("transfer", &tx.To, tx.Amount)
if err != nil {
panic(err)
}

return &types.LegacyTx{
Nonce: nonce,
To: tx.TokenContract,
Data: data,
To: tx.TokenContract,
Data: data,
}
}

Expand Down
6 changes: 3 additions & 3 deletions go/ethadapter/geth_rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ func (e *gethRPCClient) FetchLastBatchSeqNo(address gethcommon.Address) (*big.In
return contract.LastBatchSeqNo(&bind.CallOpts{})
}

// EstimateGasAndGasPrice takes a txData type and overrides the Gas and Gas Price field with estimated values
func (e *gethRPCClient) EstimateGasAndGasPrice(txData types.TxData, from gethcommon.Address) (types.TxData, error) {
// PrepareTransactionToSend takes a txData type and overrides the From, Nonce, Gas and Gas Price field with current values
func (e *gethRPCClient) PrepareTransactionToSend(txData types.TxData, from gethcommon.Address, nonce uint64) (types.TxData, error) {
unEstimatedTx := types.NewTx(txData)
gasPrice, err := e.EthClient().SuggestGasPrice(context.Background())
if err != nil {
Expand All @@ -241,7 +241,7 @@ func (e *gethRPCClient) EstimateGasAndGasPrice(txData types.TxData, from gethcom
}

return &types.LegacyTx{
Nonce: unEstimatedTx.Nonce(),
Nonce: nonce,
GasPrice: gasPrice,
Gas: gasLimit,
To: unEstimatedTx.To(),
Expand Down
3 changes: 2 additions & 1 deletion go/ethadapter/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ type EthClient interface {

CallContract(msg ethereum.CallMsg) ([]byte, error) // Runs the provided call message on the latest block.

EstimateGasAndGasPrice(txData types.TxData, from gethcommon.Address) (types.TxData, error) // Estimates the gas and the gas price for a given tx payload
// PrepareTransactionToSend updates the tx with from address, current nonce and current estimates for the gas and the gas price
PrepareTransactionToSend(txData types.TxData, from gethcommon.Address, nonce uint64) (types.TxData, error)

FetchLastBatchSeqNo(address gethcommon.Address) (*big.Int, error)

Expand Down
36 changes: 16 additions & 20 deletions go/ethadapter/mgmtcontractlib/mgmt_contract_lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ const methodBytesLen = 4
// MgmtContractLib provides methods for creating ethereum transactions by providing an L1Transaction, creating call
// messages for call requests, and converting ethereum transactions into L1Transactions.
type MgmtContractLib interface {
CreateRollup(t *ethadapter.L1RollupTx, nonce uint64) types.TxData
CreateRequestSecret(tx *ethadapter.L1RequestSecretTx, nonce uint64) types.TxData
CreateRespondSecret(tx *ethadapter.L1RespondSecretTx, nonce uint64, verifyAttester bool) types.TxData
CreateInitializeSecret(tx *ethadapter.L1InitializeSecretTx, nonce uint64) types.TxData
CreateRollup(t *ethadapter.L1RollupTx) types.TxData
CreateRequestSecret(tx *ethadapter.L1RequestSecretTx) types.TxData
CreateRespondSecret(tx *ethadapter.L1RespondSecretTx, verifyAttester bool) types.TxData
CreateInitializeSecret(tx *ethadapter.L1InitializeSecretTx) types.TxData
GetHostAddresses() (ethereum.CallMsg, error)

// DecodeTx receives a *types.Transaction and converts it to an common.L1Transaction
Expand Down Expand Up @@ -105,7 +105,7 @@ func (c *contractLibImpl) DecodeTx(tx *types.Transaction) ethadapter.L1Transacti
return nil
}

func (c *contractLibImpl) CreateRollup(t *ethadapter.L1RollupTx, nonce uint64) types.TxData {
func (c *contractLibImpl) CreateRollup(t *ethadapter.L1RollupTx) types.TxData {
decodedRollup, err := common.DecodeRollup(t.Rollup)
if err != nil {
panic(err)
Expand Down Expand Up @@ -141,26 +141,24 @@ func (c *contractLibImpl) CreateRollup(t *ethadapter.L1RollupTx, nonce uint64) t
}

return &types.LegacyTx{
Nonce: nonce,
To: c.addr,
Data: data,
To: c.addr,
Data: data,
}
}

func (c *contractLibImpl) CreateRequestSecret(tx *ethadapter.L1RequestSecretTx, nonce uint64) types.TxData {
func (c *contractLibImpl) CreateRequestSecret(tx *ethadapter.L1RequestSecretTx) types.TxData {
data, err := c.contractABI.Pack(RequestSecretMethod, base64EncodeToString(tx.Attestation))
if err != nil {
panic(err)
}

return &types.LegacyTx{
Nonce: nonce,
To: c.addr,
Data: data,
To: c.addr,
Data: data,
}
}

func (c *contractLibImpl) CreateRespondSecret(tx *ethadapter.L1RespondSecretTx, nonce uint64, verifyAttester bool) types.TxData {
func (c *contractLibImpl) CreateRespondSecret(tx *ethadapter.L1RespondSecretTx, verifyAttester bool) types.TxData {
data, err := c.contractABI.Pack(
RespondSecretMethod,
tx.AttesterID,
Expand All @@ -174,13 +172,12 @@ func (c *contractLibImpl) CreateRespondSecret(tx *ethadapter.L1RespondSecretTx,
panic(err)
}
return &types.LegacyTx{
Nonce: nonce,
To: c.addr,
Data: data,
To: c.addr,
Data: data,
}
}

func (c *contractLibImpl) CreateInitializeSecret(tx *ethadapter.L1InitializeSecretTx, nonce uint64) types.TxData {
func (c *contractLibImpl) CreateInitializeSecret(tx *ethadapter.L1InitializeSecretTx) types.TxData {
data, err := c.contractABI.Pack(
InitializeSecretMethod,
tx.AggregatorID,
Expand All @@ -192,9 +189,8 @@ func (c *contractLibImpl) CreateInitializeSecret(tx *ethadapter.L1InitializeSecr
panic(err)
}
return &types.LegacyTx{
Nonce: nonce,
To: c.addr,
Data: data,
To: c.addr,
Data: data,
}
}

Expand Down
174 changes: 72 additions & 102 deletions go/host/l1/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,9 @@ import (
)

const (
// Attempts to broadcast the rollup transaction to the L1. Worst-case, equates to 7 seconds, plus time per request.
l1TxTriesRollup = 3
// Attempts to send secret initialisation, request or response transactions to the L1. Worst-case, equates to 63 seconds, plus time per request.
l1TxTriesSecret = 7

// todo - these values have to be configurable
maxWaitForL1Receipt = 100 * time.Second
retryIntervalForL1Receipt = 10 * time.Second
maxWaitForSecretResponse = 120 * time.Second
maxWaitForL1Receipt = 6 * time.Second // just over 3 blocks at 15s per block
retryIntervalForL1Receipt = 1 * time.Second
)

type Publisher struct {
Expand Down Expand Up @@ -85,18 +79,9 @@ func (p *Publisher) InitializeSecret(attestation *common.AttestationReport, encS
InitialSecret: encSecret,
HostAddress: p.hostData.P2PPublicAddress,
}
initialiseSecretTx := p.mgmtContractLib.CreateInitializeSecret(l1tx, p.hostWallet.GetNonceAndIncrement())
initialiseSecretTx, err = p.ethClient.EstimateGasAndGasPrice(initialiseSecretTx, p.hostWallet.Address())
if err != nil {
p.hostWallet.SetNonce(p.hostWallet.GetNonce() - 1)
return err
}
initialiseSecretTx := p.mgmtContractLib.CreateInitializeSecret(l1tx)
// we block here until we confirm a successful receipt. It is important this is published before the initial rollup.
err = p.signAndBroadcastL1Tx(initialiseSecretTx, l1TxTriesSecret, true)
if err != nil {
return err
}
return nil
return p.publishTransaction(initialiseSecretTx)
}

func (p *Publisher) RequestSecret(attestation *common.AttestationReport) (gethcommon.Hash, error) {
Expand All @@ -119,14 +104,9 @@ func (p *Publisher) RequestSecret(attestation *common.AttestationReport) (gethco
panic(errors.Wrap(err, "could not fetch head block"))
}
}
requestSecretTx := p.mgmtContractLib.CreateRequestSecret(l1tx, p.hostWallet.GetNonceAndIncrement())
requestSecretTx, err = p.ethClient.EstimateGasAndGasPrice(requestSecretTx, p.hostWallet.Address())
if err != nil {
p.hostWallet.SetNonce(p.hostWallet.GetNonce() - 1)
return gethcommon.Hash{}, err
}
requestSecretTx := p.mgmtContractLib.CreateRequestSecret(l1tx)
// we wait until the secret req transaction has succeeded before we start polling for the secret
err = p.signAndBroadcastL1Tx(requestSecretTx, l1TxTriesSecret, true)
err = p.publishTransaction(requestSecretTx)
if err != nil {
return gethcommon.Hash{}, err
}
Expand All @@ -142,18 +122,17 @@ func (p *Publisher) PublishSecretResponse(secretResponse *common.ProducedSecretR
HostAddress: secretResponse.HostAddress,
}
// todo (#1624) - l1tx.Sign(a.attestationPubKey) doesn't matter as the waitSecret will process a tx that was reverted
respondSecretTx := p.mgmtContractLib.CreateRespondSecret(l1tx, p.hostWallet.GetNonceAndIncrement(), false)
respondSecretTx, err := p.ethClient.EstimateGasAndGasPrice(respondSecretTx, p.hostWallet.Address())
if err != nil {
p.hostWallet.SetNonce(p.hostWallet.GetNonce() - 1)
return err
}
respondSecretTx := p.mgmtContractLib.CreateRespondSecret(l1tx, false)
p.logger.Info("Broadcasting secret response L1 tx.", "requester", secretResponse.RequesterID)

// fire-and-forget (track the receipt asynchronously)
err = p.signAndBroadcastL1Tx(respondSecretTx, l1TxTriesSecret, false)
if err != nil {
return errors.Wrap(err, "could not broadcast secret response L1 tx")
}
go func() {
err := p.publishTransaction(respondSecretTx)
if err != nil {
p.logger.Error("could not broadcast secret response L1 tx", log.ErrKey, err)
}
}()

return nil
}

Expand Down Expand Up @@ -209,16 +188,9 @@ func (p *Publisher) PublishRollup(producedRollup *common.ExtRollup) {
return string(header)
}}, "rollup_hash", producedRollup.Header.Hash().Hex(), "batches_len", len(producedRollup.BatchPayloads))

rollupTx := p.mgmtContractLib.CreateRollup(tx, p.hostWallet.GetNonceAndIncrement())
rollupTx, err = p.ethClient.EstimateGasAndGasPrice(rollupTx, p.hostWallet.Address())
if err != nil {
// todo (#1624) - make rollup submission a separate workflow (design and implement the flow etc)
p.hostWallet.SetNonce(p.hostWallet.GetNonce() - 1)
p.logger.Error("could not estimate rollup tx", log.ErrKey, err)
return
}
rollupTx := p.mgmtContractLib.CreateRollup(tx)

err = p.signAndBroadcastL1Tx(rollupTx, l1TxTriesRollup, true)
err = p.publishTransaction(rollupTx)
if err != nil {
p.logger.Error("could not issue rollup tx", log.ErrKey, err)
} else {
Expand Down Expand Up @@ -258,69 +230,67 @@ func (p *Publisher) FetchLatestPeersList() ([]string, error) {
return filteredHostAddresses, nil
}

// `tries` is the number of times to attempt broadcasting the transaction.
// if awaitReceipt is true then this method will block and synchronously wait to check the receipt, otherwise it is fire
// and forget and the receipt tracking will happen in a separate go-routine
func (p *Publisher) signAndBroadcastL1Tx(tx types.TxData, tries uint64, awaitReceipt bool) error {
var err error
tx, err = p.ethClient.EstimateGasAndGasPrice(tx, p.hostWallet.Address())
if err != nil {
return errors.Wrap(err, "could not estimate gas/gas price for L1 tx")
}

signedTx, err := p.hostWallet.SignTransaction(tx)
if err != nil {
return err
}

p.logger.Info("Host issuing l1 tx", log.TxKey, signedTx.Hash(), "size", signedTx.Size()/1024)

err = retry.Do(func() error {
return p.ethClient.SendTransaction(signedTx)
}, retry.NewDoublingBackoffStrategy(time.Second, tries)) // doubling retry wait (3 tries = 7sec, 7 tries = 63sec)
if err != nil {
return fmt.Errorf("could not broadcast L1 tx after %d tries: %w", tries, err)
}
p.logger.Info("Successfully submitted tx to L1", "txHash", signedTx.Hash())

if awaitReceipt {
// block until receipt is found and then return
return p.waitForReceipt(signedTx.Hash())
}
// publishTransaction will keep trying unless the L1 seems to be unavailable or the tx is otherwise rejected
// It is responsible for keeping the nonce accurate, according to the following rules:
// - Caller should not increment the wallet nonce before this method is called
// - This method will increment the wallet nonce only if the transaction is successfully broadcast
// - This method will continue to resend the tx using latest gas price until it is successfully broadcast or the L1 is unavailable/this service is shutdown
// - **ONLY** the L1 publisher service is publishing transactions for this wallet (to avoid nonce conflicts)
func (p *Publisher) publishTransaction(tx types.TxData) error {
// the nonce to be used for this tx attempt
nonce := p.hostWallet.GetNonceAndIncrement()

// while the publisher service is still alive we keep trying to get the transaction into the L1
for p.running.Load() {
// make sure an earlier tx hasn't been abandoned
if nonce > p.hostWallet.GetNonce() {
return errors.New("earlier transaction has failed to complete, we need to abort this transaction")
}
// update the tx gas price before each attempt
tx, err := p.ethClient.PrepareTransactionToSend(tx, p.hostWallet.Address(), nonce)
if err != nil {
p.hostWallet.SetNonce(nonce) // revert the wallet nonce because we failed to complete the transaction
return errors.Wrap(err, "could not estimate gas/gas price for L1 tx")
}

// else just watch for receipt asynchronously and log if it fails
go func() {
// todo (#1624) - consider how to handle the various ways that L1 transactions could fail to improve node operator QoL
err = p.waitForReceipt(signedTx.Hash())
signedTx, err := p.hostWallet.SignTransaction(tx)
if err != nil {
p.logger.Error("L1 transaction failed", log.ErrKey, err)
p.hostWallet.SetNonce(nonce) // revert the wallet nonce because we failed to complete the transaction
return errors.Wrap(err, "could not sign L1 tx")
}
}()

return nil
}
p.logger.Info("Host issuing l1 tx", log.TxKey, signedTx.Hash(), "size", signedTx.Size()/1024)
err = p.ethClient.SendTransaction(signedTx)
if err != nil {
p.hostWallet.SetNonce(nonce) // revert the wallet nonce because we failed to complete the transaction
return errors.Wrap(err, "could not broadcast L1 tx")
}
p.logger.Info("Successfully submitted tx to L1", "txHash", signedTx.Hash())

var receipt *types.Receipt
// retry until receipt is found
err = retry.Do(
func() error {
receipt, err = p.ethClient.TransactionReceipt(signedTx.Hash())
if err != nil {
return fmt.Errorf("could not get receipt for L1 tx=%s: %w", signedTx.Hash(), err)
}
return err
},
retry.NewTimeoutStrategy(maxWaitForL1Receipt, retryIntervalForL1Receipt),
)
if err != nil {
p.logger.Info("Receipt not found for transaction, we will re-attempt", log.ErrKey, err)
continue // try again on the same nonce, with updated gas price
}

func (p *Publisher) waitForReceipt(txHash common.TxHash) error {
var receipt *types.Receipt
var err error
err = retry.Do(
func() error {
receipt, err = p.ethClient.TransactionReceipt(txHash)
if err != nil {
// adds more info on the error
return fmt.Errorf("could not get receipt for L1 tx=%s: %w", txHash, err)
}
return err
},
retry.NewTimeoutStrategy(maxWaitForL1Receipt, retryIntervalForL1Receipt),
)
if err != nil {
return errors.Wrap(err, "receipt for L1 tx not found despite successful broadcast")
}
if err == nil && receipt.Status != types.ReceiptStatusSuccessful {
return fmt.Errorf("unsuccessful receipt found for published L1 transaction, status=%d", receipt.Status)
}

if err == nil && receipt.Status != types.ReceiptStatusSuccessful {
return fmt.Errorf("unsuccessful receipt found for published L1 transaction, status=%d", receipt.Status)
p.logger.Debug("L1 transaction successful receipt found.", log.TxKey, signedTx.Hash(),
log.BlockHeightKey, receipt.BlockNumber, log.BlockHashKey, receipt.BlockHash)
break
}
p.logger.Debug("L1 transaction receipt found.", log.TxKey, txHash, log.BlockHeightKey, receipt.BlockNumber, log.BlockHashKey, receipt.BlockHash)
return nil
}
5 changes: 2 additions & 3 deletions integration/eth2network/eth2_network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,10 @@ func txsAreMinted(t *testing.T, wallets []wallet.Wallet) {
w := wallets[i]

toAddr := datagenerator.RandomAddress()
estimatedTx, err := ethClient.EstimateGasAndGasPrice(&types.LegacyTx{
Nonce: w.GetNonceAndIncrement(),
estimatedTx, err := ethClient.PrepareTransactionToSend(&types.LegacyTx{
To: &toAddr,
Value: big.NewInt(100),
}, w.Address())
}, w.Address(), w.GetNonceAndIncrement())
assert.Nil(t, err)

signedTx, err := w.SignTransaction(estimatedTx)
Expand Down
4 changes: 2 additions & 2 deletions integration/ethereummock/erc20_contract_lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (

type contractLib struct{}

func (c *contractLib) CreateDepositTx(tx *ethadapter.L1DepositTx, nonce uint64) types.TxData {
return encodeTx(tx, nonce, depositTxAddr)
func (c *contractLib) CreateDepositTx(tx *ethadapter.L1DepositTx) types.TxData {
return encodeTx(tx, depositTxAddr)
}

// Return only deposit transactions to the management contract
Expand Down
Loading

0 comments on commit 8948dab

Please sign in to comment.