Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

failover that retries rpc #346

Merged
merged 25 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 18 additions & 5 deletions common/geth/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,21 @@ var (
rpcUrlFlagName = "chain.rpc"
privateKeyFlagName = "chain.private-key"
numConfirmationsFlagName = "chain.num-confirmations"
numRetriesFlagName = "chain.num-retries"
)

// EthClientConfig holds the chain-client settings that ReadEthClientConfig and
// ReadEthClientConfigRPCOnly populate from CLI flags.
//
// NOTE(review): this diff view lists both RPCURL and RPCURLs; RPCURL looks
// like the line that RPCURLs replaces — confirm against the merged file.
type EthClientConfig struct {
	// RPCURL is read from the chain.rpc flag as a single string.
	RPCURL string
	// RPCURLs is read from the chain.rpc flag as a string slice.
	RPCURLs []string
	// PrivateKeyString is read from the chain.private-key flag;
	// ReadEthClientConfigRPCOnly does not set it.
	PrivateKeyString string
	// NumConfirmations is read from the chain.num-confirmations flag.
	NumConfirmations int
	// NumRetries is read from the chain.num-retries flag, whose usage text
	// describes it as the maximal number of retries for each rpc call after failure.
	NumRetries int
}

func EthClientFlags(envPrefix string) []cli.Flag {
return []cli.Flag{
cli.StringFlag{
cli.StringSliceFlag{
ian-shim marked this conversation as resolved.
Show resolved Hide resolved
Name: rpcUrlFlagName,
Usage: "Chain rpc",
Usage: "Chain rpc. Disperser/Batcher can accept multiple comma separated rpc url. Node only uses the first one",
Required: true,
EnvVar: common.PrefixEnvVar(envPrefix, "CHAIN_RPC"),
},
Expand All @@ -38,22 +40,33 @@ func EthClientFlags(envPrefix string) []cli.Flag {
Value: 0,
EnvVar: common.PrefixEnvVar(envPrefix, "NUM_CONFIRMATIONS"),
},
cli.IntFlag{
Name: numRetriesFlagName,
Usage: "Number of maximal retry for each rpc call after failure",
Required: false,
Value: 2,
EnvVar: common.PrefixEnvVar(envPrefix, "NUM_RETRIES"),
},
}
}

// ReadEthClientConfig builds an EthClientConfig from the global CLI flags:
// the rpc url(s), the private key, the number of confirmations and the number
// of retries.
//
// NOTE(review): both the RPCURL and the RPCURLs assignment are shown in this
// diff view; the RPCURL line looks like the one being removed — confirm.
func ReadEthClientConfig(ctx *cli.Context) EthClientConfig {
	cfg := EthClientConfig{}
	cfg.RPCURL = ctx.GlobalString(rpcUrlFlagName)
	cfg.RPCURLs = ctx.GlobalStringSlice(rpcUrlFlagName)
	cfg.PrivateKeyString = ctx.GlobalString(privateKeyFlagName)
	cfg.NumConfirmations = ctx.GlobalInt(numConfirmationsFlagName)
	cfg.NumRetries = ctx.GlobalInt(numRetriesFlagName)

	return cfg
}

// ReadEthClientConfigRPCOnly doesn't read private key from flag.
// The private key for Node should be read from encrypted key file.
//
// It fills the rpc url(s), the number of confirmations and the number of
// retries from the global CLI flags and leaves PrivateKeyString at its zero
// value.
//
// NOTE(review): both the RPCURL and the RPCURLs assignment are shown in this
// diff view; the RPCURL line looks like the one being removed — confirm.
func ReadEthClientConfigRPCOnly(ctx *cli.Context) EthClientConfig {
	cfg := EthClientConfig{}
	cfg.RPCURL = ctx.GlobalString(rpcUrlFlagName)
	cfg.RPCURLs = ctx.GlobalStringSlice(rpcUrlFlagName)
	cfg.NumConfirmations = ctx.GlobalInt(numConfirmationsFlagName)
	cfg.NumRetries = ctx.GlobalInt(numRetriesFlagName)

	return cfg
}
11 changes: 8 additions & 3 deletions common/geth/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,13 @@ var _ common.EthClient = (*EthClient)(nil)
// NewClient creates a new Ethereum client.
// If PrivateKeyString in the config is empty, the client will not be able to send transactions, and it will use the senderAddress to create transactions.
// If PrivateKeyString in the config is not empty, the client will be able to send transactions, and the senderAddress is ignored.
func NewClient(config EthClientConfig, senderAddress gethcommon.Address, logger logging.Logger) (*EthClient, error) {
chainClient, err := ethclient.Dial(config.RPCURL)
func NewClient(config EthClientConfig, senderAddress gethcommon.Address, rpcIndex int, logger logging.Logger) (*EthClient, error) {
if rpcIndex >= len(config.RPCURLs) {
return nil, fmt.Errorf("NewClient: index out of bound, array size is %v, requested is %v", len(config.RPCURLs), rpcIndex)
}

rpcUrl := config.RPCURLs[rpcIndex]
chainClient, err := ethclient.Dial(rpcUrl)
if err != nil {
return nil, fmt.Errorf("NewClient: cannot connect to provider: %w", err)
}
Expand Down Expand Up @@ -70,7 +75,7 @@ func NewClient(config EthClientConfig, senderAddress gethcommon.Address, logger
}

c := &EthClient{
RPCURL: config.RPCURL,
RPCURL: rpcUrl,
privateKey: privateKey,
chainID: chainIDBigInt,
AccountAddress: accountAddress,
Expand Down
44 changes: 44 additions & 0 deletions common/geth/failover.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package geth

import (
"sync"

"github.com/Layr-Labs/eigensdk-go/logging"
)

type RPCStatistics struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why you change to this name, this does more than statistics like counting num errors, but actually handle and make failover decisions. The old name may fit better.

numberRpcFault uint64
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: group private members and public members separately

{
  mu *sync.RWMutex
  numberRpcFault uint64

  Logger logging.Logger
}

Logger logging.Logger
mu *sync.RWMutex
}

// NewRPCStatistics returns an RPCStatistics that logs through logger, with a
// freshly allocated lock and a fault counter starting at zero.
func NewRPCStatistics(logger logging.Logger) *RPCStatistics {
	stats := &RPCStatistics{mu: new(sync.RWMutex)}
	stats.Logger = logger
	return stats
}

// ProcessError classifies err via handleError and, when the verdict is to
// move to a new RPC endpoint, increments the total RPC fault counter.
//
// It reports true when the caller should give up immediately (the resulting
// action is Return) and false otherwise; a nil err always yields false and
// leaves the counter untouched. The whole call runs under the write lock.
func (f *RPCStatistics) ProcessError(err error) bool {
	f.mu.Lock()
	defer f.mu.Unlock()

	if err == nil {
		return false
	}

	endpoint, verdict := f.handleError(err)
	if endpoint == NewRPC {
		// The error was attributed to the endpoint, so count it as a fault.
		f.numberRpcFault++
	}

	giveUp := verdict == Return
	return giveUp
}

// GetTotalNumberRpcFault returns the number of RPC faults counted so far,
// read under the read lock.
func (f *RPCStatistics) GetTotalNumberRpcFault() uint64 {
	f.mu.RLock()
	total := f.numberRpcFault
	f.mu.RUnlock()
	return total
}
80 changes: 80 additions & 0 deletions common/geth/handle_error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package geth

import (
"errors"

"github.com/ethereum/go-ethereum/rpc"
)

// ImmediateAction is the second half of an error-handling verdict: what the
// caller should do right after an RPC error has been classified.
type ImmediateAction int

const (
	// Return means the caller should give up immediately; ProcessError
	// reports true for this action.
	Return ImmediateAction = iota
	// Retry means the call may be attempted again.
	Retry
)

// NextEndpoint is the first half of an error-handling verdict: which RPC
// endpoint the next attempt should target.
type NextEndpoint int

const (
	// NewRPC means the error is attributed to the current endpoint and the
	// next attempt should use a different one. ProcessError counts these as
	// RPC faults.
	//
	// The constants are explicitly typed so that they are NextEndpoint values
	// rather than untyped integers.
	NewRPC NextEndpoint = iota
	// CurrentRPC means the current endpoint should be kept.
	CurrentRPC
)

// handleHttpError maps the status code of an HTTP response error to a verdict:
// which endpoint the next attempt should use, and whether the caller should
// retry or return immediately.
//
//   - 2xx: CurrentRPC, Return (not expected to be reachable)
//   - 403 Forbidden, 429 Too Many Requests: NewRPC, Retry
//   - any other 4xx: CurrentRPC, Retry
//   - everything else (e.g. 5xx): NewRPC, Retry
func (f *RPCStatistics) handleHttpError(httpRespError rpc.HTTPError) (NextEndpoint, ImmediateAction) {
	status := httpRespError.StatusCode
	f.Logger.Info("[HTTP Response Error]", "Status Code", status, "Error", httpRespError)

	switch {
	case status >= 200 && status < 300:
		// A 2xx status should never surface as an HTTP error.
		return CurrentRPC, Return
	case status == 403 || status == 429:
		// Forbidden or rate-limited by this endpoint: rotate.
		return NewRPC, Retry
	case status >= 400 && status < 500:
		// Remaining client errors: keep the endpoint and try again.
		return CurrentRPC, Retry
	default:
		// Rotating is the default because it gives the query a higher
		// chance of completing.
		return NewRPC, Retry
	}
}

// handleError returns a boolean indicating if the current connection should be rotated.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be updated with the new return type

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

still to be done

// Because library of the sender uses geth, which supports only 3 types of connections,
// we can categorize the error as HTTP error, Websocket error and IPC error.
//
// If the error is http, non2xx error would generate HTTP error, https://github.com/ethereum/go-ethereum/blob/master/rpc/http.go#L233
// but a 2xx http response could contain JSON RPC error, https://github.com/ethereum/go-ethereum/blob/master/rpc/http.go#L181
// If the error is Websocket or IPC, we only look for JSON error, https://github.com/ethereum/go-ethereum/blob/master/rpc/json.go#L67

func (f *RPCStatistics) handleError(err error) (NextEndpoint, ImmediateAction) {

var httpRespError rpc.HTTPError
if errors.As(err, &httpRespError) {
// if error is http error, i.e. non 2xx error, it is handled here
// if it is 2xx error, the error message is nil, https://github.com/ethereum/go-ethereum/blob/master/rpc/http.go,
// execution does not entere here.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

entere -> enter

return f.handleHttpError(httpRespError)
} else {
// it might be http2xx error, websocket error or ipc error. Parse json error code
var rpcError rpc.Error
if errors.As(err, &rpcError) {
ec := rpcError.ErrorCode()
f.Logger.Info("[JSON RPC Response Error]", "Error Code", ec, "Error", rpcError)
// we always attribute JSON RPC error as sender's fault, i.e no connection rotation
return CurrentRPC, Return
}

// If no http response or no rpc response is returned, it is a connection issue,
// since we can't accurately attribute the network issue to neither sender nor receiver
// side. Optimistically, switch rpc client
f.Logger.Info("[Default Response Error]", err)
return NewRPC, Retry
}

}
2 changes: 1 addition & 1 deletion common/geth/instrumented_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ type InstrumentedEthClient struct {
var _ common.EthClient = (*InstrumentedEthClient)(nil)

func NewInstrumentedEthClient(config EthClientConfig, rpcCallsCollector *rpccalls.Collector, logger logging.Logger) (*InstrumentedEthClient, error) {
ethClient, err := NewClient(config, gethcommon.Address{}, logger)
ethClient, err := NewClient(config, gethcommon.Address{}, 0, logger)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading