Skip to content

Commit

Permalink
feat: PRT - Retry node errors mechanism (#1598)
Browse files Browse the repository at this point in the history
* adding retry on archive to api collection

* remove node errors from cache.

* implement rpc consumer relay retry flow

* fix test

* add flags to disable feature

* add hash calculations on chain messages

* add interfaces messages

* remove collection changes

* add more test flows

* fix comment on batches

* lint baby

* fix bug and fix tendermint URI hash

* rename GetRawRequestHash

* change cache to ristretto

* feat: PRT - Add metrics to relay retry and fix a bug in relay process hash conversion (#1600)

* Add metrics to relay retry and fix a bug in relay process hash conversion

* change name to match cache
  • Loading branch information
ranlavanet authored Aug 1, 2024
1 parent 5864b7a commit 8a455e5
Show file tree
Hide file tree
Showing 16 changed files with 511 additions and 64 deletions.
15 changes: 15 additions & 0 deletions protocol/chainlib/chain_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type updatableRPCInput interface {
rpcInterfaceMessages.GenericMessage
UpdateLatestBlockInMessage(latestBlock uint64, modifyContent bool) (success bool)
AppendHeader(metadata []pairingtypes.Metadata)
GetRawRequestHash() ([]byte, error)
}

type baseChainMessageContainer struct {
Expand All @@ -26,11 +27,25 @@ type baseChainMessageContainer struct {
extensions []*spectypes.Extension
timeoutOverride time.Duration
forceCacheRefresh bool
inputHashCache []byte
// resultErrorParsingMethod passed by each api interface message to parse the result of the message
// and validate it doesn't contain a node error
resultErrorParsingMethod func(data []byte, httpStatusCode int) (hasError bool, errorMessage string)
}

func (pm *baseChainMessageContainer) GetRawRequestHash() ([]byte, error) {
if pm.inputHashCache != nil && len(pm.inputHashCache) > 0 {
// Get the cached value
return pm.inputHashCache, nil
}
hash, err := pm.msg.GetRawRequestHash()
if err == nil {
// Now we have the hash cached so we call it only once.
pm.inputHashCache = hash
}
return hash, err
}

// not necessary for base chain message.
func (pm *baseChainMessageContainer) CheckResponseError(data []byte, httpStatusCode int) (hasError bool, errorMessage string) {
if pm.resultErrorParsingMethod == nil {
Expand Down
1 change: 1 addition & 0 deletions protocol/chainlib/chainlib.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type ChainMessage interface {
GetForceCacheRefresh() bool
SetForceCacheRefresh(force bool) bool
CheckResponseError(data []byte, httpStatusCode int) (hasError bool, errorMessage string)
GetRawRequestHash() ([]byte, error)

ChainMessageForSend
}
Expand Down
3 changes: 3 additions & 0 deletions protocol/chainlib/chainproxy/rpcInterfaceMessages/common.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package rpcInterfaceMessages

import (
sdkerrors "cosmossdk.io/errors"
"github.com/goccy/go-json"

"github.com/lavanet/lava/v2/protocol/chainlib/chainproxy"
"github.com/lavanet/lava/v2/protocol/parser"
pairingtypes "github.com/lavanet/lava/v2/x/pairing/types"
)

var WontCalculateBatchHash = sdkerrors.New("Wont calculate batch hash", 892, "wont calculate batch message hash") // on batches we just wont calculate hashes, meaning we wont retry.

type ParsableRPCInput struct {
Result json.RawMessage
chainproxy.BaseMessage
Expand Down
13 changes: 13 additions & 0 deletions protocol/chainlib/chainproxy/rpcInterfaceMessages/grpcMessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
dyncodec "github.com/lavanet/lava/v2/protocol/chainlib/grpcproxy/dyncodec"
"github.com/lavanet/lava/v2/protocol/parser"
"github.com/lavanet/lava/v2/utils"
"github.com/lavanet/lava/v2/utils/sigs"
"google.golang.org/grpc/codes"
"google.golang.org/protobuf/reflect/protoreflect"
"google.golang.org/protobuf/types/dynamicpb"
Expand All @@ -33,6 +34,18 @@ type GrpcMessage struct {
chainproxy.BaseMessage
}

// get msg hash byte array containing all the relevant information for a unique request. (headers / api / params)
func (gm *GrpcMessage) GetRawRequestHash() ([]byte, error) {
headers := gm.GetHeaders()
headersByteArray, err := json.Marshal(headers)
if err != nil {
utils.LavaFormatError("Failed marshalling headers on jsonRpc message", err, utils.LogAttr("headers", headers))
return []byte{}, err
}
pathByteArray := []byte(gm.Path)
return sigs.HashMsg(append(append(pathByteArray, gm.Msg...), headersByteArray...)), nil
}

func (jm GrpcMessage) CheckResponseError(data []byte, httpStatusCode int) (hasError bool, errorMessage string) {
// grpc status code different than OK or 0 is a node error.
if httpStatusCode != 0 && httpStatusCode != http.StatusOK {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/lavanet/lava/v2/protocol/chainlib/chainproxy/rpcclient"
"github.com/lavanet/lava/v2/protocol/parser"
"github.com/lavanet/lava/v2/utils"
"github.com/lavanet/lava/v2/utils/sigs"
)

var ErrFailedToConvertMessage = sdkerrors.New("RPC error", 1000, "failed to convert a message")
Expand All @@ -24,6 +25,25 @@ type JsonrpcMessage struct {
chainproxy.BaseMessage `json:"-"`
}

// get msg hash byte array containing all the relevant information for a unique request. (headers / api / params)
func (jm *JsonrpcMessage) GetRawRequestHash() ([]byte, error) {
headers := jm.GetHeaders()
headersByteArray, err := json.Marshal(headers)
if err != nil {
utils.LavaFormatError("Failed marshalling headers on jsonRpc message", err, utils.LogAttr("headers", headers))
return []byte{}, err
}

methodByteArray := []byte(jm.Method)

paramsByteArray, err := json.Marshal(jm.Params)
if err != nil {
utils.LavaFormatError("Failed marshalling params on jsonRpc message", err, utils.LogAttr("headers", jm.Params))
return []byte{}, err
}
return sigs.HashMsg(append(append(methodByteArray, paramsByteArray...), headersByteArray...)), nil
}

// returns if error exists and
func (jm JsonrpcMessage) CheckResponseError(data []byte, httpStatusCode int) (hasError bool, errorMessage string) {
result := &JsonrpcMessage{}
Expand Down Expand Up @@ -150,6 +170,12 @@ type JsonrpcBatchMessage struct {
chainproxy.BaseMessage
}

// on batches we don't want to calculate the batch hash as its impossible to get the args
// we will just return false so retry wont trigger.
func (jbm JsonrpcBatchMessage) GetRawRequestHash() ([]byte, error) {
return nil, WontCalculateBatchHash
}

func (jbm *JsonrpcBatchMessage) UpdateLatestBlockInMessage(latestBlock uint64, modifyContent bool) (success bool) {
return false
}
Expand Down
13 changes: 13 additions & 0 deletions protocol/chainlib/chainproxy/rpcInterfaceMessages/restMessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/lavanet/lava/v2/protocol/chainlib/chainproxy"
"github.com/lavanet/lava/v2/protocol/parser"
"github.com/lavanet/lava/v2/utils"
"github.com/lavanet/lava/v2/utils/sigs"
)

type RestMessage struct {
Expand All @@ -18,6 +19,18 @@ type RestMessage struct {
chainproxy.BaseMessage
}

// get msg hash byte array containing all the relevant information for a unique request. (headers / api / params)
func (rm *RestMessage) GetRawRequestHash() ([]byte, error) {
headers := rm.GetHeaders()
headersByteArray, err := json.Marshal(headers)
if err != nil {
utils.LavaFormatError("Failed marshalling headers on jsonRpc message", err, utils.LogAttr("headers", headers))
return []byte{}, err
}
pathByteArray := []byte(rm.Path)
return sigs.HashMsg(append(append(pathByteArray, rm.Msg...), headersByteArray...)), nil
}

func (jm RestMessage) CheckResponseError(data []byte, httpStatusCode int) (hasError bool, errorMessage string) {
if httpStatusCode >= 200 && httpStatusCode <= 300 { // valid code
return false, ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,33 @@ import (
"github.com/lavanet/lava/v2/protocol/chainlib/chainproxy/rpcclient"
"github.com/lavanet/lava/v2/protocol/parser"
"github.com/lavanet/lava/v2/utils"

"github.com/lavanet/lava/v2/utils/sigs"
)

type TendermintrpcMessage struct {
JsonrpcMessage
Path string
}

// get msg hash byte array containing all the relevant information for a unique request. (headers / api / params)
func (tm *TendermintrpcMessage) GetRawRequestHash() ([]byte, error) {
headers := tm.GetHeaders()
headersByteArray, err := json.Marshal(headers)
if err != nil {
utils.LavaFormatError("Failed marshalling headers on jsonRpc message", err, utils.LogAttr("headers", headers))
return []byte{}, err
}
methodByteArray := []byte(tm.Method + tm.Path)

paramsByteArray, err := json.Marshal(tm.Params)
if err != nil {
utils.LavaFormatError("Failed marshalling params on jsonRpc message", err, utils.LogAttr("headers", tm.Params))
return []byte{}, err
}
return sigs.HashMsg(append(append(methodByteArray, paramsByteArray...), headersByteArray...)), nil
}

func (cp TendermintrpcMessage) GetParams() interface{} {
return cp.Params
}
Expand Down
5 changes: 5 additions & 0 deletions protocol/chainlib/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chainlib

import (
"encoding/json"
"fmt"
"io"
"net/http/httptest"
"testing"
Expand Down Expand Up @@ -312,6 +313,10 @@ type mockRPCInput struct {
chainproxy.BaseMessage
}

func (m *mockRPCInput) GetRawRequestHash() ([]byte, error) {
return nil, fmt.Errorf("test")
}

func (m *mockRPCInput) GetParams() interface{} {
return nil
}
Expand Down
4 changes: 4 additions & 0 deletions protocol/common/cobra_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ const (
RelayHealthIntervalFlag = "relays-health-interval" // interval between each relay health check, default 5m
SharedStateFlag = "shared-state"
DisableConflictTransactionsFlag = "disable-conflict-transactions" // disable conflict transactions, this will hard the network's data reliability and therefore will harm the service.
// Disable relay retries when we get node errors.
// This feature is suppose to help with successful relays in some chains that return node errors on rare race conditions on the serviced chains.
DisableRetryOnNodeErrorsFlag = "disable-retry-on-node-error"
)

const (
Expand All @@ -51,6 +54,7 @@ type ConsumerCmdFlags struct {
RelaysHealthIntervalFlag time.Duration // interval for relay health check
DebugRelays bool // enables debug mode for relays
DisableConflictTransactions bool // disable conflict transactions
DisableRetryOnNodeErrors bool // disable retries on node errors
}

// default rolling logs behavior (if enabled) will store 3 files each 100MB for up to 1 day every time.
Expand Down
Loading

0 comments on commit 8a455e5

Please sign in to comment.