Skip to content

Commit

Permalink
Merge pull request #299 from onflow/gregor/tx-sender-pool
Browse files Browse the repository at this point in the history
Transaction pool
  • Loading branch information
sideninja authored Jun 27, 2024
2 parents f97114c + bddcb6b commit d69a16c
Show file tree
Hide file tree
Showing 9 changed files with 261 additions and 121 deletions.
13 changes: 9 additions & 4 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,17 @@ func (b *BlockChainAPI) SendRawTransaction(

id, err := b.evm.SendRawTransaction(ctx, input)
if err != nil {
b.logger.Error().Err(err).Msg("failed to send raw transaction")
var errGasPriceTooLow *errs.ErrGasPriceTooLow
if errors.As(err, &errGasPriceTooLow) {
var errGasPriceTooLow *errs.GasPriceTooLowError

// handle typed errors
switch {
case errors.As(err, &errGasPriceTooLow):
return common.Hash{}, errGasPriceTooLow
case errors.Is(err, models.ErrInvalidEVMTransaction):
return common.Hash{}, err
default:
return common.Hash{}, errs.ErrInternal
}
return common.Hash{}, err
}

return id, nil
Expand Down
8 changes: 4 additions & 4 deletions api/errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@ var (
ErrRateLimit = errors.New("limit of requests per second reached")
)

type ErrGasPriceTooLow struct {
type GasPriceTooLowError struct {
GasPrice *big.Int
}

func (e *ErrGasPriceTooLow) Error() string {
func (e *GasPriceTooLowError) Error() string {
return fmt.Sprintf(
"the minimum accepted gas price for transactions is: %d",
e.GasPrice,
)
}

func NewErrGasPriceTooLow(gasPrice *big.Int) *ErrGasPriceTooLow {
return &ErrGasPriceTooLow{
func NewErrGasPriceTooLow(gasPrice *big.Int) *GasPriceTooLowError {
return &GasPriceTooLowError{
GasPrice: gasPrice,
}
}
Expand Down
3 changes: 2 additions & 1 deletion models/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (
)

var (
ErrDisconnected = NewRecoverableError(errors.New("disconnected"))
ErrDisconnected = NewRecoverableError(errors.New("disconnected"))
ErrInvalidEVMTransaction = errors.New("invalid evm transaction")
)

func NewRecoverableError(err error) RecoverableError {
Expand Down
2 changes: 1 addition & 1 deletion services/requester/cadence/run.cdc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ transaction(hexEncodedTx: String) {
)
assert(
txResult.status == EVM.Status.failed || txResult.status == EVM.Status.successful,
message: "failed to execute evm transaction: ".concat(txResult.errorCode.toString())
message: "evm_error=".concat(txResult.errorMessage).concat("\n")
)
}
}
102 changes: 102 additions & 0 deletions services/requester/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package requester

import (
"context"
"fmt"
"regexp"
"sync"
"time"

"github.com/onflow/flow-go-sdk"
gethTypes "github.com/onflow/go-ethereum/core/types"
"github.com/rs/zerolog"
"github.com/sethvargo/go-retry"

"github.com/onflow/flow-evm-gateway/models"
)

const (
evmErrorRegex = `evm_error=(.*)\n`
)

// todo this is a simple implementation of the transaction pool that is mostly used
// to track the status of submitted transaction, but transactions will always be submitted
// right away, future improvements can make it so the transactions are collected in the pool
// and after submitted based on different strategies.

type TxPool struct {
logger zerolog.Logger
client *CrossSporkClient
pool *sync.Map
// todo add a broadcaster for pending transaction streaming
// todo add methods to inspect transaction pool state
}

func NewTxPool(client *CrossSporkClient, logger zerolog.Logger) *TxPool {
return &TxPool{
logger: logger.With().Str("component", "tx-pool").Logger(),
client: client,
pool: &sync.Map{},
}
}

// Send flow transaction that executes EVM run function which takes in the encoded EVM transaction.
// The flow transaction status is awaited and an error is returned in case of a failure in submission,
// or an EVM validation error.
// Until the flow transaction is sealed the transaction will stay in the transaction pool marked as pending.
func (t *TxPool) Send(
ctx context.Context,
flowTx *flow.Transaction,
evmTx *gethTypes.Transaction,
) error {
if err := t.client.SendTransaction(ctx, *flowTx); err != nil {
return err
}

// add to pool and delete after transaction is sealed or errored out
t.pool.Store(evmTx.Hash(), evmTx)
defer t.pool.Delete(evmTx.Hash())

backoff := retry.WithMaxDuration(time.Minute*3, retry.NewFibonacci(time.Millisecond*100))

return retry.Do(ctx, backoff, func(ctx context.Context) error {
res, err := t.client.GetTransactionResult(ctx, flowTx.ID())
if err != nil {
return fmt.Errorf("failed to retrieve flow transaction result %s: %w", flowTx.ID(), err)
}
// retry until transaction is sealed
if res.Status < flow.TransactionStatusSealed {
return retry.RetryableError(fmt.Errorf("transaction not sealed"))
}

if res.Error != nil {
t.logger.Error().Err(res.Error).
Str("flow-id", flowTx.ID().String()).
Str("evm-id", evmTx.Hash().Hex()).
Msg("flow transaction error")

if err, ok := parseInvalidError(res.Error); ok {
return err
}

// hide specific cause since it's an implementation issue
return fmt.Errorf("failed to submit flow evm transaction %s", evmTx.Hash())
}

return nil
})
}

// this will extract the evm specific error from the Flow transaction error message
// the run.cdc script panics with the evm specific error as the message which we
// extract and return to the client. Any error returned that is evm specific
// is a validation error due to assert statement in the run.cdc script.
func parseInvalidError(err error) (error, bool) {
r := regexp.MustCompile(evmErrorRegex)
matches := r.FindStringSubmatch(err.Error())
if len(matches) != 2 {
return nil, false
}

return fmt.Errorf("%w: %s", models.ErrInvalidEVMTransaction, matches[1]), true
}
Loading

0 comments on commit d69a16c

Please sign in to comment.