Skip to content

Commit

Permalink
failover that retries rpc (#346)
Browse files Browse the repository at this point in the history
Co-authored-by: Ubuntu <[email protected]>
  • Loading branch information
bxue-l2 and Ubuntu authored Mar 21, 2024
1 parent 2ead9bc commit f9c3c67
Show file tree
Hide file tree
Showing 20 changed files with 1,224 additions and 35 deletions.
23 changes: 18 additions & 5 deletions common/geth/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,21 @@ var (
rpcUrlFlagName = "chain.rpc"
privateKeyFlagName = "chain.private-key"
numConfirmationsFlagName = "chain.num-confirmations"
numRetriesFlagName = "chain.num-retries"
)

type EthClientConfig struct {
RPCURL string
RPCURLs []string
PrivateKeyString string
NumConfirmations int
NumRetries int
}

func EthClientFlags(envPrefix string) []cli.Flag {
return []cli.Flag{
cli.StringFlag{
cli.StringSliceFlag{
Name: rpcUrlFlagName,
Usage: "Chain rpc",
Usage: "Chain rpc. Disperser/Batcher can accept multiple comma separated rpc url. Node only uses the first one",
Required: true,
EnvVar: common.PrefixEnvVar(envPrefix, "CHAIN_RPC"),
},
Expand All @@ -38,22 +40,33 @@ func EthClientFlags(envPrefix string) []cli.Flag {
Value: 0,
EnvVar: common.PrefixEnvVar(envPrefix, "NUM_CONFIRMATIONS"),
},
cli.IntFlag{
Name: numRetriesFlagName,
Usage: "Number of maximal retry for each rpc call after failure",
Required: false,
Value: 2,
EnvVar: common.PrefixEnvVar(envPrefix, "NUM_RETRIES"),
},
}
}

func ReadEthClientConfig(ctx *cli.Context) EthClientConfig {
cfg := EthClientConfig{}
cfg.RPCURL = ctx.GlobalString(rpcUrlFlagName)
cfg.RPCURLs = ctx.GlobalStringSlice(rpcUrlFlagName)
cfg.PrivateKeyString = ctx.GlobalString(privateKeyFlagName)
cfg.NumConfirmations = ctx.GlobalInt(numConfirmationsFlagName)
cfg.NumRetries = ctx.GlobalInt(numRetriesFlagName)

return cfg
}

// ReadEthClientConfigRPCOnly doesn't read private key from flag.
// The private key for Node should be read from encrypted key file.
func ReadEthClientConfigRPCOnly(ctx *cli.Context) EthClientConfig {
cfg := EthClientConfig{}
cfg.RPCURL = ctx.GlobalString(rpcUrlFlagName)
cfg.RPCURLs = ctx.GlobalStringSlice(rpcUrlFlagName)
cfg.NumConfirmations = ctx.GlobalInt(numConfirmationsFlagName)
cfg.NumRetries = ctx.GlobalInt(numRetriesFlagName)

return cfg
}
11 changes: 8 additions & 3 deletions common/geth/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,13 @@ var _ common.EthClient = (*EthClient)(nil)
// NewClient creates a new Ethereum client.
// If PrivateKeyString in the config is empty, the client will not be able to send transactions, and it will use the senderAddress to create transactions.
// If PrivateKeyString in the config is not empty, the client will be able to send transactions, and the senderAddress is ignored.
func NewClient(config EthClientConfig, senderAddress gethcommon.Address, logger logging.Logger) (*EthClient, error) {
chainClient, err := ethclient.Dial(config.RPCURL)
func NewClient(config EthClientConfig, senderAddress gethcommon.Address, rpcIndex int, logger logging.Logger) (*EthClient, error) {
if rpcIndex >= len(config.RPCURLs) {
return nil, fmt.Errorf("NewClient: index out of bound, array size is %v, requested is %v", len(config.RPCURLs), rpcIndex)
}

rpcUrl := config.RPCURLs[rpcIndex]
chainClient, err := ethclient.Dial(rpcUrl)
if err != nil {
return nil, fmt.Errorf("NewClient: cannot connect to provider: %w", err)
}
Expand Down Expand Up @@ -70,7 +75,7 @@ func NewClient(config EthClientConfig, senderAddress gethcommon.Address, logger
}

c := &EthClient{
RPCURL: config.RPCURL,
RPCURL: rpcUrl,
privateKey: privateKey,
chainID: chainIDBigInt,
AccountAddress: accountAddress,
Expand Down
45 changes: 45 additions & 0 deletions common/geth/failover.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package geth

import (
"sync"

"github.com/Layr-Labs/eigensdk-go/logging"
)

type FailoverController struct {
mu *sync.RWMutex
numberRpcFault uint64

Logger logging.Logger
}

func NewFailoverController(logger logging.Logger) *FailoverController {
return &FailoverController{
Logger: logger,
mu: &sync.RWMutex{},
}
}

// ProcessError attributes the error and updates total number of fault for RPC
// It returns if RPC should immediately give up
func (f *FailoverController) ProcessError(err error) bool {
f.mu.Lock()
defer f.mu.Unlock()
if err == nil {
return false
}

nextEndpoint, action := f.handleError(err)

if nextEndpoint == NewRPC {
f.numberRpcFault += 1
}

return action == Return
}

func (f *FailoverController) GetTotalNumberRpcFault() uint64 {
f.mu.RLock()
defer f.mu.RUnlock()
return f.numberRpcFault
}
80 changes: 80 additions & 0 deletions common/geth/handle_error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package geth

import (
"errors"

"github.com/ethereum/go-ethereum/rpc"
)

type ImmediateAction int

const (
Return ImmediateAction = iota
Retry
)

type NextEndpoint int

const (
NewRPC = iota
CurrentRPC
)

// handleHttpError returns a boolean indicating if the current RPC should be rotated
// the second boolean indicating if should giveup immediately
func (f *FailoverController) handleHttpError(httpRespError rpc.HTTPError) (NextEndpoint, ImmediateAction) {
sc := httpRespError.StatusCode
// Default to rotation the current RPC, because it allows a higher chance to get the query completed.
f.Logger.Info("[HTTP Response Error]", "Status Code", sc, "Error", httpRespError)

if sc >= 200 && sc < 300 {
// 2xx error, however it should not be reachable
return CurrentRPC, Return
}

if sc >= 400 && sc < 500 {
// 403 Forbidden, 429 Too many Requests. We should rotate
if sc == 403 || sc == 429 {
return NewRPC, Retry
}
return CurrentRPC, Retry
}

// 500
return NewRPC, Retry
}

// handleError returns a boolean indicating if the current connection should be rotated.
// Because library of the sender uses geth, which supports only 3 types of connections,
// we can categorize the error as HTTP error, Websocket error and IPC error.
//
// If the error is http, non2xx error would generate HTTP error, https://github.com/ethereum/go-ethereum/blob/master/rpc/http.go#L233
// but a 2xx http response could contain JSON RPC error, https://github.com/ethereum/go-ethereum/blob/master/rpc/http.go#L181
// If the error is Websocket or IPC, we only look for JSON error, https://github.com/ethereum/go-ethereum/blob/master/rpc/json.go#L67

func (f *FailoverController) handleError(err error) (NextEndpoint, ImmediateAction) {

var httpRespError rpc.HTTPError
if errors.As(err, &httpRespError) {
// if error is http error, i.e. non 2xx error, it is handled here
// if it is 2xx error, the error message is nil, https://github.com/ethereum/go-ethereum/blob/master/rpc/http.go,
// execution does not enter here.
return f.handleHttpError(httpRespError)
} else {
// it might be http2xx error, websocket error or ipc error. Parse json error code
var rpcError rpc.Error
if errors.As(err, &rpcError) {
ec := rpcError.ErrorCode()
f.Logger.Info("[JSON RPC Response Error]", "Error Code", ec, "Error", rpcError)
// we always attribute JSON RPC error as sender's fault, i.e no connection rotation
return CurrentRPC, Return
}

// If no http response or no rpc response is returned, it is a connection issue,
// since we can't accurately attribute the network issue to neither sender nor receiver
// side. Optimistically, switch rpc client
f.Logger.Info("[Default Response Error]", err)
return NewRPC, Retry
}

}
2 changes: 1 addition & 1 deletion common/geth/instrumented_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type InstrumentedEthClient struct {
var _ common.EthClient = (*InstrumentedEthClient)(nil)

func NewInstrumentedEthClient(config EthClientConfig, rpcCallsCollector *rpccalls.Collector, logger logging.Logger) (*InstrumentedEthClient, error) {
ethClient, err := NewClient(config, gethcommon.Address{}, logger)
ethClient, err := NewClient(config, gethcommon.Address{}, 0, logger)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit f9c3c67

Please sign in to comment.