Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support multiple ethereum rpcs #215

Closed
wants to merge 14 commits into from
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,19 @@ Ref: https://keepachangelog.com/en/1.0.0/

## [Unreleased]

### Features

- [#215](https://github.com/umee-network/peggo/pull/215) Add the flag `--eth-rpcs` and support multiple ethereum rpc endpoints

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- [#215](https://github.com/umee-network/peggo/pull/215) Add the flag `--eth-rpcs` and support multiple ethereum rpc endpoints
- [#215](https://github.com/umee-network/peggo/pull/215) Add the flag `--eth-rpcs` and support multiple ethereum rpc endpoints.


### Bug Fixes

- [#209](https://github.com/umee-network/peggo/pull/209) Fix the `version` command to display correctly.
- [#205](https://github.com/umee-network/peggo/pull/205) Make sure users are warned when using unencrypted non-local urls in flags.

### Deprecated

- [#215](https://github.com/umee-network/peggo/pull/215) Deprecate the `--eth-rpc` flag.

## [v0.2.5](https://github.com/umee-network/peggo/releases/tag/v0.2.5) - 2022-02-21

### Features
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ $ umeed tx gravity set-orchestrator-address \
```shell
export PEGGO_ETH_PK={ethereum private key}
$ peggo orchestrator {gravityAddress} \
--eth-rpc=$ETH_RPC \
--eth-rpcs=$ETH_RPCS \
--relay-batches=true \
--valset-relay-mode=minimum \
--cosmos-chain-id=... \
Expand Down
68 changes: 33 additions & 35 deletions cmd/peggo/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind"
ethcmn "github.com/ethereum/go-ethereum/common"
ethcrypto "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/knadh/koanf"
"github.com/pkg/errors"
"github.com/spf13/cobra"
rpchttp "github.com/tendermint/tendermint/rpc/client/http"
Expand Down Expand Up @@ -64,6 +62,8 @@ func deployGravityCmd() *cobra.Command {
return err
}

em := NewEthRPCManager(konfig)

// COSMOS RPC

cosmosChainID := konfig.String(flagCosmosChainID)
Expand Down Expand Up @@ -119,13 +119,12 @@ func deployGravityCmd() *cobra.Command {
}

// ETH RPC
ethRPCEndpoint := konfig.String(flagEthRPC)
ethRPC, err := ethclient.Dial(ethRPCEndpoint)
ethRPC, err := em.GetEthClient()
if err != nil {
return fmt.Errorf("failed to dial Ethereum RPC node: %w", err)
return err
}

auth, err := buildTransactOpts(konfig, ethRPC)
auth, err := buildTransactOpts(em)
if err != nil {
return err
}
Expand Down Expand Up @@ -200,13 +199,9 @@ func deployERC20Cmd() *cobra.Command {
return err
}

ethRPCEndpoint := konfig.String(flagEthRPC)
ethRPC, err := ethclient.Dial(ethRPCEndpoint)
if err != nil {
return fmt.Errorf("failed to dial Ethereum RPC node: %w", err)
}
em := NewEthRPCManager(konfig)

auth, err := buildTransactOpts(konfig, ethRPC)
auth, err := buildTransactOpts(em)
if err != nil {
return err
}
Expand Down Expand Up @@ -261,7 +256,7 @@ func deployERC20Cmd() *cobra.Command {

gravityAddr := args[0]

gravityContract, err := getGravityContract(ethRPC, gravityAddr)
gravityContract, err := getGravityContract(em, gravityAddr)
if err != nil {
return err
}
Expand Down Expand Up @@ -341,20 +336,16 @@ network starting.`,
return err
}

ethRPCEndpoint := konfig.String(flagEthRPC)
ethRPC, err := ethclient.Dial(ethRPCEndpoint)
if err != nil {
return fmt.Errorf("failed to dial Ethereum RPC node: %w", err)
}
em := NewEthRPCManager(konfig)

auth, err := buildTransactOpts(konfig, ethRPC)
auth, err := buildTransactOpts(em)
if err != nil {
return err
}

gravityAddr := args[0]

gravityContract, err := getGravityContract(ethRPC, gravityAddr)
gravityContract, err := getGravityContract(em, gravityAddr)
if err != nil {
return err
}
Expand Down Expand Up @@ -403,15 +394,11 @@ func sendToCosmosCmd() *cobra.Command {
return err
}

ethRPCEndpoint := konfig.String(flagEthRPC)
ethRPC, err := ethclient.Dial(ethRPCEndpoint)
if err != nil {
return fmt.Errorf("failed to dial Ethereum RPC node: %w", err)
}
em := NewEthRPCManager(konfig)

gravityAddr := args[0]

gravityContract, err := getGravityContract(ethRPC, gravityAddr)
gravityContract, err := getGravityContract(em, gravityAddr)
if err != nil {
return err
}
Expand All @@ -420,12 +407,12 @@ func sendToCosmosCmd() *cobra.Command {
tokenAddr := ethcmn.HexToAddress(tokenAddrStr)

if konfig.Bool(flagAutoApprove) {
if err := approveERC20(konfig, ethRPC, tokenAddrStr, gravityAddr); err != nil {
if err := approveERC20(em, tokenAddrStr, gravityAddr); err != nil {
return err
}
}

auth, err := buildTransactOpts(konfig, ethRPC)
auth, err := buildTransactOpts(em)
if err != nil {
return err
}
Expand Down Expand Up @@ -468,7 +455,8 @@ Transaction: %s
return cmd
}

func buildTransactOpts(konfig *koanf.Koanf, ethClient *ethclient.Client) (*bind.TransactOpts, error) {
func buildTransactOpts(em *EthRPCManager) (*bind.TransactOpts, error) {
konfig := em.konfig
ethPrivKeyHexStr := konfig.String(flagEthPK)

privKey, err := ethcrypto.ToECDSA(ethcmn.FromHex(ethPrivKeyHexStr))
Expand All @@ -487,15 +475,15 @@ func buildTransactOpts(konfig *koanf.Koanf, ethClient *ethclient.Client) (*bind.

fromAddress := ethcrypto.PubkeyToAddress(*publicKeyECDSA)

nonce, err := ethClient.PendingNonceAt(goCtx, fromAddress)
nonce, err := em.PendingNonceAt(goCtx, fromAddress)
if err != nil {
return nil, err
}

goCtx, cancel = context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()

ethChainID, err := ethClient.ChainID(goCtx)
ethChainID, err := em.ChainID(goCtx)
if err != nil {
return nil, fmt.Errorf("failed to get Ethereum chain ID: %w", err)
}
Expand All @@ -516,7 +504,7 @@ func buildTransactOpts(konfig *koanf.Koanf, ethClient *ethclient.Client) (*bind.
gasPrice = big.NewInt(gasPriceInt)

default:
gasPrice, err = ethClient.SuggestGasPrice(context.Background())
gasPrice, err = em.SuggestGasPrice(context.Background())
if err != nil {
return nil, fmt.Errorf("failed to get Ethereum gas estimate: %w", err)
}
Expand Down Expand Up @@ -549,7 +537,12 @@ func getGravityParams(gRPCConn *grpc.ClientConn) (*gravitytypes.Params, error) {
return &gravityParamsResp.Params, nil
}

func getGravityContract(ethRPC *ethclient.Client, gravityAddr string) (*wrappers.Gravity, error) {
func getGravityContract(em *EthRPCManager, gravityAddr string) (*wrappers.Gravity, error) {
ethRPC, err := em.GetEthClient()
if err != nil {
return nil, fmt.Errorf("failed to create Gravity contract instance: %w", err)
}

contract, err := wrappers.NewGravity(ethcmn.HexToAddress(gravityAddr), ethRPC)
if err != nil {
return nil, fmt.Errorf("failed to create Gravity contract instance: %w", err)
Expand All @@ -558,13 +551,18 @@ func getGravityContract(ethRPC *ethclient.Client, gravityAddr string) (*wrappers
return contract, nil
}

func approveERC20(konfig *koanf.Koanf, ethRPC *ethclient.Client, erc20AddrStr, gravityAddrStr string) error {
func approveERC20(em *EthRPCManager, erc20AddrStr, gravityAddrStr string) error {
ethRPC, err := em.GetEthClient()
if err != nil {
return fmt.Errorf("failed to create ERC20 contract instance: %w", err)
}

contract, err := wrappers.NewERC20(ethcmn.HexToAddress(erc20AddrStr), ethRPC)
if err != nil {
return fmt.Errorf("failed to create ERC20 contract instance: %w", err)
}

auth, err := buildTransactOpts(konfig, ethRPC)
auth, err := buildTransactOpts(em)
if err != nil {
return err
}
Expand Down
138 changes: 138 additions & 0 deletions cmd/peggo/ethrpc_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package peggo

import (
"context"
"fmt"
"math/big"
"os"
"strings"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/rpc"
"github.com/knadh/koanf"
"github.com/pkg/errors"
)

type EthRPCManager struct {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add a godoc :)

currentEndpoint int // index (in the slice of configured RPC endpoints) of most recent endpoint used
client *rpc.Client
konfig *koanf.Koanf
}

// creates an instance of EthRPCManager with a given konfig.
func NewEthRPCManager(konfig *koanf.Koanf) *EthRPCManager {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible for there to be concurrent use of an EthRPCManager instance?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use of the embedded clients themselves should be concurrency-safe, but concurrent redialing / closing would need to be worked on - for example if one goroutine closed the client (perhaps as a result of an error in a grpc call) while another was doing something, the latter could geta very unexpected "connection was closed" type of error.

Or if two concurrent redialing loops were active, a whole lot could happen. It should be possible to add a lock on DialNext() in particular before merging if we move forward with this.

ethManager := &EthRPCManager{
konfig: konfig,
}
return ethManager
}

// closes and sets to nil the stored eth RPC client
func (em *EthRPCManager) CloseClient() {
if em.client != nil {
em.client.Close()
em.client = nil
}
}

// closes the current client and dials configured ethereum rpc endpoints in a roundrobin fashion until one
// is connected. returns an error if no endpoints ar configured or all dials failed
func (em *EthRPCManager) DialNext() error {
if em.konfig == nil {
return errors.New("ethRPCManager konfig is nil")
}

rpcs := strings.Split(strings.ReplaceAll(em.konfig.String(flagEthRPCs), " ", ""), ",")

em.CloseClient()

dialIndex := func(i int) bool {
if cli, err := rpc.Dial(rpcs[i]); err == nil {
em.currentEndpoint = i
em.client = cli
return true
}
fmt.Fprintf(os.Stderr, "Failed to dial to Ethereum RPC: %s\n", rpcs[i])

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably pass the core logger to NewEthRPCManager instead of using fmt.*.

return false
}

// first tries all endpoints in the slice after the current index

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could probably combine both these loops into one using the modulo operator.

for i := range rpcs {
if i > em.currentEndpoint && dialIndex(i) {
fmt.Fprintf(os.Stderr, "Connected to Ethereum RPC: %s\n", rpcs[i])
return nil
}
}

// then tries remaining endpoints from the beginning of the slice
for i := range rpcs {
if i <= em.currentEndpoint && dialIndex(i) {
fmt.Fprintf(os.Stderr, "Connected to Ethereum RPC: %s\n", rpcs[i])
return nil
}
}

return errors.New(fmt.Sprintf("failed to dial any of the %d Ethereum RPC endpoints configured", len(rpcs)))
}

// returns the current eth RPC client, dialing one first if nonexistent
func (em *EthRPCManager) GetClient() (*rpc.Client, error) {
if em.client == nil {
if err := em.DialNext(); err != nil {
return nil, err
}
}
return em.client, nil
}

// returns the current eth RPC client, dialing one first if nonexistent
func (em *EthRPCManager) GetEthClient() (*ethclient.Client, error) {
cli, err := em.GetClient()
if err != nil {
return nil, err
}
return ethclient.NewClient(cli), nil
}

// wraps ethclient.PendingNonceAt, also closing client if PendingNonceAt returns an error
func (em *EthRPCManager) PendingNonceAt(ctx context.Context, addr common.Address) (uint64, error) {
cli, err := em.GetEthClient()
if err != nil {
return 0, err
}
nonce, err := cli.PendingNonceAt(ctx, addr)
if err != nil {
em.CloseClient()
return 0, err
}
return nonce, nil
}

// wraps ethclient.ChainID, also closing client if ChainID returns an error
func (em *EthRPCManager) ChainID(ctx context.Context) (*big.Int, error) {
cli, err := em.GetEthClient()
if err != nil {
return nil, err
}
id, err := cli.ChainID(ctx)
if err != nil {
em.CloseClient()
return nil, err
}
return id, nil
}

// wraps ethclient.SuggestGasPrice, also closing client if SuggestGasPrice returns an error
func (em *EthRPCManager) SuggestGasPrice(ctx context.Context) (*big.Int, error) {
cli, err := em.GetEthClient()
if err != nil {
return nil, err
}
price, err := cli.SuggestGasPrice(ctx)
if err != nil {
em.CloseClient()
return nil, err
}
return price, nil
}
8 changes: 8 additions & 0 deletions cmd/peggo/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
package peggo

import (
"fmt"
"net/url"
"strings"

Expand Down Expand Up @@ -37,6 +38,7 @@ const (
flagEthPK = "eth-pk"
flagEthUseLedger = "eth-use-ledger"
flagEthRPC = "eth-rpc"
flagEthRPCs = "eth-rpcs"
flagEthGasAdjustment = "eth-gas-price-adjustment"
flagEthGasLimitAdjustment = "eth-gas-limit-adjustment"
flagEthAlchemyWS = "eth-alchemy-ws"
Expand Down Expand Up @@ -93,19 +95,25 @@ func ethereumOptsFlagSet() *pflag.FlagSet {
fs := pflag.NewFlagSet("", pflag.ContinueOnError)

fs.String(flagEthRPC, "http://localhost:8545", "Specify the RPC address of an Ethereum node")
fs.String(flagEthRPCs, "http://localhost:8545", "Specify comma-separated RPC addresses of one or more Ethereum nodes")
fs.Float64(flagEthGasAdjustment, float64(1.3), "Specify a gas price adjustment for Ethereum transactions")
fs.Float64(flagEthGasLimitAdjustment, float64(1.2), "Specify a gas limit adjustment for Ethereum transactions")

_ = fs.MarkDeprecated(flagEthRPC, fmt.Sprintf("Use the '%s' flag instead to provide one or more Ethereum RPC instances", flagEthRPCs))

return fs
}

func bridgeFlagSet() *pflag.FlagSet {
fs := pflag.NewFlagSet("", pflag.ContinueOnError)

fs.String(flagEthRPC, "http://localhost:8545", "Specify the RPC address of an Ethereum node")
fs.String(flagEthRPCs, "http://localhost:8545", "Specify comma-separated RPC addresses of one or more Ethereum nodes")
fs.Int64(flagEthGasPrice, 0, "The Ethereum gas price to include in the transaction; If zero, gas price will be estimated")
fs.Int64(flagEthGasLimit, 6000000, "The Ethereum gas limit to include in the transaction")

_ = fs.MarkDeprecated(flagEthRPC, fmt.Sprintf("Use the '%s' flag instead to provide one or more Ethereum RPC instances", flagEthRPCs))

return fs
}

Expand Down
Loading