Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

L1 Publisher: resend stuck tx and manage nonce #1518

Merged
merged 3 commits into from
Sep 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions go/ethadapter/erc20contractlib/erc20_contract_lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type ERC20ContractLib interface {
DecodeTx(tx *types.Transaction) ethadapter.L1Transaction

// CreateDepositTx receives an common.L1Transaction and converts it to an eth transaction
CreateDepositTx(tx *ethadapter.L1DepositTx, nonce uint64) types.TxData
CreateDepositTx(tx *ethadapter.L1DepositTx) types.TxData
}

// erc20ContractLibImpl takes a mgmtContractAddr and processes multiple erc20ContractAddrs
Expand All @@ -44,16 +44,15 @@ func NewERC20ContractLib(mgmtContractAddr *gethcommon.Address, contractAddrs ...
}
}

func (c *erc20ContractLibImpl) CreateDepositTx(tx *ethadapter.L1DepositTx, nonce uint64) types.TxData {
func (c *erc20ContractLibImpl) CreateDepositTx(tx *ethadapter.L1DepositTx) types.TxData {
data, err := c.contractABI.Pack("transfer", &tx.To, tx.Amount)
if err != nil {
panic(err)
}

return &types.LegacyTx{
Nonce: nonce,
To: tx.TokenContract,
Data: data,
To: tx.TokenContract,
Data: data,
}
}

Expand Down
6 changes: 3 additions & 3 deletions go/ethadapter/geth_rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,8 @@ func (e *gethRPCClient) FetchLastBatchSeqNo(address gethcommon.Address) (*big.In
return contract.LastBatchSeqNo(&bind.CallOpts{})
}

// EstimateGasAndGasPrice takes a txData type and overrides the Gas and Gas Price field with estimated values
func (e *gethRPCClient) EstimateGasAndGasPrice(txData types.TxData, from gethcommon.Address) (types.TxData, error) {
// PrepareTransactionToSend takes a txData type and overrides the From, Nonce, Gas and Gas Price field with current values
func (e *gethRPCClient) PrepareTransactionToSend(txData types.TxData, from gethcommon.Address, nonce uint64) (types.TxData, error) {
unEstimatedTx := types.NewTx(txData)
gasPrice, err := e.EthClient().SuggestGasPrice(context.Background())
if err != nil {
Expand All @@ -241,7 +241,7 @@ func (e *gethRPCClient) EstimateGasAndGasPrice(txData types.TxData, from gethcom
}

return &types.LegacyTx{
Nonce: unEstimatedTx.Nonce(),
Nonce: nonce,
GasPrice: gasPrice,
Gas: gasLimit,
To: unEstimatedTx.To(),
Expand Down
3 changes: 2 additions & 1 deletion go/ethadapter/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ type EthClient interface {

CallContract(msg ethereum.CallMsg) ([]byte, error) // Runs the provided call message on the latest block.

EstimateGasAndGasPrice(txData types.TxData, from gethcommon.Address) (types.TxData, error) // Estimates the gas and the gas price for a given tx payload
// PrepareTransactionToSend updates the tx with from address, current nonce and current estimates for the gas and the gas price
PrepareTransactionToSend(txData types.TxData, from gethcommon.Address, nonce uint64) (types.TxData, error)

FetchLastBatchSeqNo(address gethcommon.Address) (*big.Int, error)

Expand Down
36 changes: 16 additions & 20 deletions go/ethadapter/mgmtcontractlib/mgmt_contract_lib.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ const methodBytesLen = 4
// MgmtContractLib provides methods for creating ethereum transactions by providing an L1Transaction, creating call
// messages for call requests, and converting ethereum transactions into L1Transactions.
type MgmtContractLib interface {
CreateRollup(t *ethadapter.L1RollupTx, nonce uint64) types.TxData
CreateRequestSecret(tx *ethadapter.L1RequestSecretTx, nonce uint64) types.TxData
CreateRespondSecret(tx *ethadapter.L1RespondSecretTx, nonce uint64, verifyAttester bool) types.TxData
CreateInitializeSecret(tx *ethadapter.L1InitializeSecretTx, nonce uint64) types.TxData
CreateRollup(t *ethadapter.L1RollupTx) types.TxData
CreateRequestSecret(tx *ethadapter.L1RequestSecretTx) types.TxData
CreateRespondSecret(tx *ethadapter.L1RespondSecretTx, verifyAttester bool) types.TxData
CreateInitializeSecret(tx *ethadapter.L1InitializeSecretTx) types.TxData
GetHostAddresses() (ethereum.CallMsg, error)

// DecodeTx receives a *types.Transaction and converts it to an common.L1Transaction
Expand Down Expand Up @@ -98,7 +98,7 @@ func (c *contractLibImpl) DecodeTx(tx *types.Transaction) ethadapter.L1Transacti
return nil
}

func (c *contractLibImpl) CreateRollup(t *ethadapter.L1RollupTx, nonce uint64) types.TxData {
func (c *contractLibImpl) CreateRollup(t *ethadapter.L1RollupTx) types.TxData {
decodedRollup, err := common.DecodeRollup(t.Rollup)
if err != nil {
panic(err)
Expand Down Expand Up @@ -127,26 +127,24 @@ func (c *contractLibImpl) CreateRollup(t *ethadapter.L1RollupTx, nonce uint64) t
}

return &types.LegacyTx{
Nonce: nonce,
To: c.addr,
Data: data,
To: c.addr,
Data: data,
}
}

func (c *contractLibImpl) CreateRequestSecret(tx *ethadapter.L1RequestSecretTx, nonce uint64) types.TxData {
func (c *contractLibImpl) CreateRequestSecret(tx *ethadapter.L1RequestSecretTx) types.TxData {
data, err := c.contractABI.Pack(RequestSecretMethod, base64EncodeToString(tx.Attestation))
if err != nil {
panic(err)
}

return &types.LegacyTx{
Nonce: nonce,
To: c.addr,
Data: data,
To: c.addr,
Data: data,
}
}

func (c *contractLibImpl) CreateRespondSecret(tx *ethadapter.L1RespondSecretTx, nonce uint64, verifyAttester bool) types.TxData {
func (c *contractLibImpl) CreateRespondSecret(tx *ethadapter.L1RespondSecretTx, verifyAttester bool) types.TxData {
data, err := c.contractABI.Pack(
RespondSecretMethod,
tx.AttesterID,
Expand All @@ -160,13 +158,12 @@ func (c *contractLibImpl) CreateRespondSecret(tx *ethadapter.L1RespondSecretTx,
panic(err)
}
return &types.LegacyTx{
Nonce: nonce,
To: c.addr,
Data: data,
To: c.addr,
Data: data,
}
}

func (c *contractLibImpl) CreateInitializeSecret(tx *ethadapter.L1InitializeSecretTx, nonce uint64) types.TxData {
func (c *contractLibImpl) CreateInitializeSecret(tx *ethadapter.L1InitializeSecretTx) types.TxData {
data, err := c.contractABI.Pack(
InitializeSecretMethod,
tx.AggregatorID,
Expand All @@ -178,9 +175,8 @@ func (c *contractLibImpl) CreateInitializeSecret(tx *ethadapter.L1InitializeSecr
panic(err)
}
return &types.LegacyTx{
Nonce: nonce,
To: c.addr,
Data: data,
To: c.addr,
Data: data,
}
}

Expand Down
4 changes: 4 additions & 0 deletions go/host/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,4 +221,8 @@ func (h *host) validateConfig() {
if h.config.P2PPublicAddress == "" {
h.logger.Crit("the host must specify a public P2P address")
}

if h.config.L1BlockTime == 0 {
h.logger.Crit("the host must specify an L1 block time")
}
}
172 changes: 71 additions & 101 deletions go/host/l1/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,6 @@ import (
"github.com/pkg/errors"
)

const (
// Attempts to broadcast the rollup transaction to the L1. Worst-case, equates to 7 seconds, plus time per request.
l1TxTriesRollup = 3
// Attempts to send secret initialisation, request or response transactions to the L1. Worst-case, equates to 63 seconds, plus time per request.
l1TxTriesSecret = 7
)

type Publisher struct {
hostData host.Identity
hostWallet wallet.Wallet // Wallet used to issue ethereum transactions
Expand Down Expand Up @@ -95,18 +88,9 @@ func (p *Publisher) InitializeSecret(attestation *common.AttestationReport, encS
InitialSecret: encSecret,
HostAddress: p.hostData.P2PPublicAddress,
}
initialiseSecretTx := p.mgmtContractLib.CreateInitializeSecret(l1tx, p.hostWallet.GetNonceAndIncrement())
initialiseSecretTx, err = p.ethClient.EstimateGasAndGasPrice(initialiseSecretTx, p.hostWallet.Address())
if err != nil {
p.hostWallet.SetNonce(p.hostWallet.GetNonce() - 1)
return err
}
initialiseSecretTx := p.mgmtContractLib.CreateInitializeSecret(l1tx)
// we block here until we confirm a successful receipt. It is important this is published before the initial rollup.
err = p.signAndBroadcastL1Tx(initialiseSecretTx, l1TxTriesSecret, true)
if err != nil {
return err
}
return nil
return p.publishTransaction(initialiseSecretTx)
}

func (p *Publisher) RequestSecret(attestation *common.AttestationReport) (gethcommon.Hash, error) {
Expand All @@ -129,14 +113,9 @@ func (p *Publisher) RequestSecret(attestation *common.AttestationReport) (gethco
panic(errors.Wrap(err, "could not fetch head block"))
}
}
requestSecretTx := p.mgmtContractLib.CreateRequestSecret(l1tx, p.hostWallet.GetNonceAndIncrement())
requestSecretTx, err = p.ethClient.EstimateGasAndGasPrice(requestSecretTx, p.hostWallet.Address())
if err != nil {
p.hostWallet.SetNonce(p.hostWallet.GetNonce() - 1)
return gethcommon.Hash{}, err
}
requestSecretTx := p.mgmtContractLib.CreateRequestSecret(l1tx)
// we wait until the secret req transaction has succeeded before we start polling for the secret
err = p.signAndBroadcastL1Tx(requestSecretTx, l1TxTriesSecret, true)
err = p.publishTransaction(requestSecretTx)
if err != nil {
return gethcommon.Hash{}, err
}
Expand All @@ -152,18 +131,17 @@ func (p *Publisher) PublishSecretResponse(secretResponse *common.ProducedSecretR
HostAddress: secretResponse.HostAddress,
}
// todo (#1624) - l1tx.Sign(a.attestationPubKey) doesn't matter as the waitSecret will process a tx that was reverted
respondSecretTx := p.mgmtContractLib.CreateRespondSecret(l1tx, p.hostWallet.GetNonceAndIncrement(), false)
respondSecretTx, err := p.ethClient.EstimateGasAndGasPrice(respondSecretTx, p.hostWallet.Address())
if err != nil {
p.hostWallet.SetNonce(p.hostWallet.GetNonce() - 1)
return err
}
respondSecretTx := p.mgmtContractLib.CreateRespondSecret(l1tx, false)
p.logger.Info("Broadcasting secret response L1 tx.", "requester", secretResponse.RequesterID)

// fire-and-forget (track the receipt asynchronously)
err = p.signAndBroadcastL1Tx(respondSecretTx, l1TxTriesSecret, false)
if err != nil {
return errors.Wrap(err, "could not broadcast secret response L1 tx")
}
go func() {
err := p.publishTransaction(respondSecretTx)
if err != nil {
p.logger.Error("could not broadcast secret response L1 tx", log.ErrKey, err)
}
}()

return nil
}

Expand Down Expand Up @@ -219,16 +197,9 @@ func (p *Publisher) PublishRollup(producedRollup *common.ExtRollup) {
return string(header)
}}, log.RollupHashKey, producedRollup.Header.Hash(), "batches_len", len(producedRollup.BatchPayloads))

rollupTx := p.mgmtContractLib.CreateRollup(tx, p.hostWallet.GetNonceAndIncrement())
rollupTx, err = p.ethClient.EstimateGasAndGasPrice(rollupTx, p.hostWallet.Address())
if err != nil {
// todo (#1624) - make rollup submission a separate workflow (design and implement the flow etc)
p.hostWallet.SetNonce(p.hostWallet.GetNonce() - 1)
p.logger.Error("could not estimate rollup tx", log.ErrKey, err)
return
}
rollupTx := p.mgmtContractLib.CreateRollup(tx)

err = p.signAndBroadcastL1Tx(rollupTx, l1TxTriesRollup, true)
err = p.publishTransaction(rollupTx)
if err != nil {
p.logger.Error("could not issue rollup tx", log.ErrKey, err)
} else {
Expand Down Expand Up @@ -268,69 +239,68 @@ func (p *Publisher) FetchLatestPeersList() ([]string, error) {
return filteredHostAddresses, nil
}

// `tries` is the number of times to attempt broadcasting the transaction.
// if awaitReceipt is true then this method will block and synchronously wait to check the receipt, otherwise it is fire
// and forget and the receipt tracking will happen in a separate go-routine
func (p *Publisher) signAndBroadcastL1Tx(tx types.TxData, tries uint64, awaitReceipt bool) error {
var err error
tx, err = p.ethClient.EstimateGasAndGasPrice(tx, p.hostWallet.Address())
if err != nil {
return errors.Wrap(err, "could not estimate gas/gas price for L1 tx")
}

signedTx, err := p.hostWallet.SignTransaction(tx)
if err != nil {
return err
}

p.logger.Info("Host issuing l1 tx", log.TxKey, signedTx.Hash(), "size", signedTx.Size()/1024)

err = retry.Do(func() error {
return p.ethClient.SendTransaction(signedTx)
}, retry.NewDoublingBackoffStrategy(time.Second, tries)) // doubling retry wait (3 tries = 7sec, 7 tries = 63sec)
if err != nil {
return fmt.Errorf("could not broadcast L1 tx after %d tries: %w", tries, err)
}
p.logger.Info("Successfully submitted tx to L1", "txHash", signedTx.Hash())

if awaitReceipt {
// block until receipt is found and then return
return p.waitForReceipt(signedTx.Hash())
}
// publishTransaction will keep trying unless the L1 seems to be unavailable or the tx is otherwise rejected
// It is responsible for keeping the nonce accurate, according to the following rules:
// - Caller should not increment the wallet nonce before this method is called
// - This method will increment the wallet nonce only if the transaction is successfully broadcast
// - This method will continue to resend the tx using latest gas price until it is successfully broadcast or the L1 is unavailable/this service is shutdown
// - **ONLY** the L1 publisher service is publishing transactions for this wallet (to avoid nonce conflicts)
// todo (@matt) this method should take a context so we can try to cancel if the tx is no longer required
func (p *Publisher) publishTransaction(tx types.TxData) error {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as an improvement for the future, this could receive a cancelable context, so you can cancel the tx from the caller if circumstances change (for ex when it's time to build another rollup).
This could be a todo

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah good shout, we should have that control, added todo

// the nonce to be used for this tx attempt
nonce := p.hostWallet.GetNonceAndIncrement()

// while the publisher service is still alive we keep trying to get the transaction into the L1
for !p.hostStopper.IsStopping() {
// make sure an earlier tx hasn't been abandoned
if nonce > p.hostWallet.GetNonce() {
return errors.New("earlier transaction has failed to complete, we need to abort this transaction")
}
// update the tx gas price before each attempt
tx, err := p.ethClient.PrepareTransactionToSend(tx, p.hostWallet.Address(), nonce)
if err != nil {
p.hostWallet.SetNonce(nonce) // revert the wallet nonce because we failed to complete the transaction
return errors.Wrap(err, "could not estimate gas/gas price for L1 tx")
}

// else just watch for receipt asynchronously and log if it fails
go func() {
// todo (#1624) - consider how to handle the various ways that L1 transactions could fail to improve node operator QoL
err = p.waitForReceipt(signedTx.Hash())
signedTx, err := p.hostWallet.SignTransaction(tx)
if err != nil {
p.logger.Error("L1 transaction failed", log.ErrKey, err)
p.hostWallet.SetNonce(nonce) // revert the wallet nonce because we failed to complete the transaction
return errors.Wrap(err, "could not sign L1 tx")
}
}()

return nil
}
p.logger.Info("Host issuing l1 tx", log.TxKey, signedTx.Hash(), "size", signedTx.Size()/1024)
err = p.ethClient.SendTransaction(signedTx)
if err != nil {
p.hostWallet.SetNonce(nonce) // revert the wallet nonce because we failed to complete the transaction
return errors.Wrap(err, "could not broadcast L1 tx")
}
p.logger.Info("Successfully submitted tx to L1", "txHash", signedTx.Hash())

var receipt *types.Receipt
// retry until receipt is found
err = retry.Do(
func() error {
receipt, err = p.ethClient.TransactionReceipt(signedTx.Hash())
if err != nil {
return fmt.Errorf("could not get receipt for L1 tx=%s: %w", signedTx.Hash(), err)
}
return err
},
retry.NewTimeoutStrategy(p.maxWaitForL1Receipt, p.retryIntervalForL1Receipt),
)
if err != nil {
p.logger.Info("Receipt not found for transaction, we will re-attempt", log.ErrKey, err)
continue // try again on the same nonce, with updated gas price
}

func (p *Publisher) waitForReceipt(txHash common.TxHash) error {
var receipt *types.Receipt
var err error
err = retry.Do(
func() error {
receipt, err = p.ethClient.TransactionReceipt(txHash)
if err != nil {
// adds more info on the error
return fmt.Errorf("could not get receipt for L1 tx=%s: %w", txHash, err)
}
return err
},
retry.NewTimeoutStrategy(p.maxWaitForL1Receipt, p.retryIntervalForL1Receipt),
)
if err != nil {
return errors.Wrap(err, "receipt for L1 tx not found despite successful broadcast")
}
if err == nil && receipt.Status != types.ReceiptStatusSuccessful {
return fmt.Errorf("unsuccessful receipt found for published L1 transaction, status=%d", receipt.Status)
}

if err == nil && receipt.Status != types.ReceiptStatusSuccessful {
return fmt.Errorf("unsuccessful receipt found for published L1 transaction, status=%d", receipt.Status)
p.logger.Debug("L1 transaction successful receipt found.", log.TxKey, signedTx.Hash(),
log.BlockHeightKey, receipt.BlockNumber, log.BlockHashKey, receipt.BlockHash)
break
}
p.logger.Debug("L1 transaction receipt found.", log.TxKey, txHash, log.BlockHeightKey, receipt.BlockNumber, log.BlockHashKey, receipt.BlockHash)
return nil
}
5 changes: 2 additions & 3 deletions integration/eth2network/eth2_network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,10 @@ func txsAreMinted(t *testing.T, wallets []wallet.Wallet) {
w := wallets[i]

toAddr := datagenerator.RandomAddress()
estimatedTx, err := ethClient.EstimateGasAndGasPrice(&types.LegacyTx{
Nonce: w.GetNonceAndIncrement(),
estimatedTx, err := ethClient.PrepareTransactionToSend(&types.LegacyTx{
To: &toAddr,
Value: big.NewInt(100),
}, w.Address())
}, w.Address(), w.GetNonceAndIncrement())
assert.Nil(t, err)

signedTx, err := w.SignTransaction(estimatedTx)
Expand Down
Loading
Loading