
Commit

Merge branch 'PRT-add-relay-retry-atttempts-flag' into v2-5-4-hf
ranlavanet committed Aug 26, 2024
2 parents 0459d21 + a2f8eef commit 4ef940c
Showing 5 changed files with 54 additions and 29 deletions.
6 changes: 5 additions & 1 deletion protocol/common/cobra_common.go
@@ -32,6 +32,8 @@ const (
// Disable relay retries when we get node errors.
// This feature is supposed to help achieve successful relays on chains that occasionally return node errors due to rare race conditions on the serviced chains.
DisableRetryOnNodeErrorsFlag = "disable-retry-on-node-error"
SetRelayCountOnNodeErrorFlag = "set-retry-count-on-node-error"
DisableCacheOnNodeErrorFlag = "disable-cache-on-node-error"
UseOfflineSpecFlag = "use-offline-spec" // allows the user to manually load a spec providing a path, this is useful to test spec changes before they hit the blockchain
)

@@ -56,7 +58,9 @@ type ConsumerCmdFlags struct {
DebugRelays bool // enables debug mode for relays
DisableConflictTransactions bool // disable conflict transactions
DisableRetryOnNodeErrors bool // disable retries on node errors
OfflineSpecPath string // path to the spec file, works only when bootstrapping a single chain.
SetRelayCountOnNodeError int // number of retry attempts allowed on node errors
DisableCacheOnNodeError bool // do not consult the failed-relays cache when retrying on node errors
OfflineSpecPath string // path to the spec file, works only when bootstrapping a single chain.
}

// default rolling logs behavior (if enabled): store up to 3 files of 100MB each, retained for up to 1 day.
19 changes: 13 additions & 6 deletions protocol/rpcconsumer/relay_processor.go
@@ -59,8 +59,14 @@ type RelayProcessor struct {
allowSessionDegradation uint32 // used in the scenario where extension was previously used.
metricsInf MetricsInterface
chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter
disableRelayRetry bool
relayRetriesManager *RelayRetriesManager
retryOptions relayProcessorRetryOptions
}

type relayProcessorRetryOptions struct {
relayCountOnNodeError int // maximum number of node-error retries allowed
disableCacheOnNodeError bool // skip the failed-relays cache check before retrying
disableRelayRetry bool // disable the retry-on-node-error mechanism entirely
}

func NewRelayProcessor(
@@ -74,7 +80,7 @@ func NewRelayProcessor(
debugRelay bool,
metricsInf MetricsInterface,
chainIdAndApiInterfaceGetter chainIdAndApiInterfaceGetter,
disableRelayRetry bool,
retryOptions relayProcessorRetryOptions,
relayRetriesManager *RelayRetriesManager,
) *RelayProcessor {
guid, _ := utils.GetUniqueIdentifier(ctx)
@@ -100,7 +106,7 @@ func NewRelayProcessor(
debugRelay: debugRelay,
metricsInf: metricsInf,
chainIdAndApiInterfaceGetter: chainIdAndApiInterfaceGetter,
disableRelayRetry: disableRelayRetry,
retryOptions: retryOptions,
relayRetriesManager: relayRetriesManager,
}
}
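
The call sites that build these options from the consumer flags are not among the loaded hunks, so the following is only a hypothetical sketch (the cmdFlags variable name is an assumption) of how the retry options could be assembled inside the rpcconsumer package before being passed as the retryOptions argument of NewRelayProcessor:

// Hypothetical wiring (not part of this diff): map the consumer command flags
// onto the retry options consumed by the RelayProcessor.
retryOptions := relayProcessorRetryOptions{
	relayCountOnNodeError:   cmdFlags.SetRelayCountOnNodeError,  // --set-retry-count-on-node-error
	disableCacheOnNodeError: cmdFlags.DisableCacheOnNodeError,   // --disable-cache-on-node-error
	disableRelayRetry:       cmdFlags.DisableRetryOnNodeErrors,  // --disable-retry-on-node-error
}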
@@ -320,12 +326,13 @@ func (rp *RelayProcessor) shouldRetryRelay(resultsCount int, hashErr error, node
// 2. If we have 0 successful relays and we have only node errors.
// 3. Hash calculation was successful.
// 4. Number of node errors so far is within the configured relayCountOnNodeError budget.
if !rp.disableRelayRetry && resultsCount == 0 && hashErr == nil {
if nodeErrors <= NumberOfRetriesAllowedOnNodeErrors {
if !rp.retryOptions.disableRelayRetry && resultsCount == 0 && hashErr == nil {
if nodeErrors <= rp.retryOptions.relayCountOnNodeError {
// TODO: check chain message retry on archive. (this feature will be added in the generic parsers feature)

// Retry if the user specified to disable caching - OR -
// the hash does not already exist in the cache (if it does, we don't want to retry)
if !rp.relayRetriesManager.CheckHashInCache(hash) {
if rp.retryOptions.disableCacheOnNodeError || !rp.relayRetriesManager.CheckHashInCache(hash) {
// If we didn't find the hash in the hash map we can retry
utils.LavaFormatTrace("retrying on relay error", utils.LogAttr("retry_number", nodeErrors), utils.LogAttr("hash", hash))
go rp.metricsInf.SetNodeErrorAttemptMetric(rp.chainIdAndApiInterfaceGetter.GetChainIdAndApiInterface())
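To make the retry decision above easier to follow outside the diff, here is a small self-contained Go sketch of the same conditions (names are shortened and the cache lookup is replaced by a boolean parameter; this is an illustration, not the actual implementation):

package main

import "fmt"

type retryOptions struct {
	relayCountOnNodeError   int
	disableCacheOnNodeError bool
	disableRelayRetry       bool
}

// shouldRetry mirrors the conditions used by shouldRetryRelay: retries are only
// considered when they are enabled, no successful results exist, the hash was
// computed, the node-error count is within the configured budget, and either
// the failed-relays cache is disabled or the hash was not seen failing before.
func shouldRetry(opts retryOptions, resultsCount, nodeErrors int, hashErr error, hashSeenBefore bool) bool {
	if opts.disableRelayRetry || resultsCount != 0 || hashErr != nil {
		return false
	}
	if nodeErrors > opts.relayCountOnNodeError {
		return false // retry budget exhausted
	}
	return opts.disableCacheOnNodeError || !hashSeenBefore
}

func main() {
	opts := retryOptions{relayCountOnNodeError: 2}
	fmt.Println(shouldRetry(opts, 0, 1, nil, false)) // true: first node error, hash unseen
	fmt.Println(shouldRetry(opts, 0, 3, nil, false)) // false: over the configured retry count
	fmt.Println(shouldRetry(opts, 0, 1, nil, true))  // false: this request already failed before
}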
31 changes: 18 additions & 13 deletions protocol/rpcconsumer/relay_processor_test.go
@@ -16,6 +16,12 @@ import (
"github.com/stretchr/testify/require"
)

var retryOptionsTest = relayProcessorRetryOptions{
disableRelayRetry: false,
relayCountOnNodeError: 2,
disableCacheOnNodeError: false,
}

type relayProcessorMetricsMock struct{}

func (romm *relayProcessorMetricsMock) SetRelayNodeErrorMetric(chainId string, apiInterface string) {}
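
The shared retryOptionsTest above keeps retries enabled with a budget of two node errors and the cache check active. A test that needs to exercise the disabled-retry path could construct its own variant, for example (illustrative only):

// Hypothetical per-test options with retries turned off entirely.
disabledRetryOptionsTest := relayProcessorRetryOptions{
	disableRelayRetry:       true,
	relayCountOnNodeError:   2,
	disableCacheOnNodeError: false,
}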
@@ -104,7 +110,7 @@ func TestRelayProcessorHappyFlow(t *testing.T) {
require.NoError(t, err)
chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance)

usedProviders := relayProcessor.GetUsedProviders()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
@@ -146,7 +152,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) {
require.NoError(t, err)
chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance)

usedProviders := relayProcessor.GetUsedProviders()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
@@ -188,7 +194,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) {
// check hash map flow:
chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance)
relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance)
usedProviders = relayProcessor.GetUsedProviders()
ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
@@ -212,7 +218,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) {
// check hash map flow:
chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/18", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance)
relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance)
usedProviders = relayProcessor.GetUsedProviders()
ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
@@ -236,7 +242,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) {
// 4th relay, same inputs, this time a successful relay, should remove the hash from the map
chainMsg, err = chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance)
relayProcessor = NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance)
usedProviders = relayProcessor.GetUsedProviders()
ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
@@ -285,8 +291,7 @@ func TestRelayProcessorNodeErrorRetryFlow(t *testing.T) {
require.NoError(t, err)
chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance)
relayProcessor.disableRelayRetry = true
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance)
usedProviders := relayProcessor.GetUsedProviders()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
@@ -323,7 +328,7 @@ func TestRelayProcessorTimeout(t *testing.T) {
require.NoError(t, err)
chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance)

usedProviders := relayProcessor.GetUsedProviders()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
@@ -375,7 +380,7 @@ func TestRelayProcessorRetry(t *testing.T) {
require.NoError(t, err)
chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance)

usedProviders := relayProcessor.GetUsedProviders()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
@@ -419,7 +424,7 @@ func TestRelayProcessorRetryNodeError(t *testing.T) {
require.NoError(t, err)
chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/17", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance)

usedProviders := relayProcessor.GetUsedProviders()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
@@ -464,7 +469,7 @@ func TestRelayProcessorStatefulApi(t *testing.T) {
require.NoError(t, err)
chainMsg, err := chainParser.ParseMsg("/cosmos/tx/v1beta1/txs", []byte("data"), http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance)
usedProviders := relayProcessor.GetUsedProviders()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
@@ -509,7 +514,7 @@ func TestRelayProcessorStatefulApiErr(t *testing.T) {
require.NoError(t, err)
chainMsg, err := chainParser.ParseMsg("/cosmos/tx/v1beta1/txs", []byte("data"), http.MethodPost, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance)
usedProviders := relayProcessor.GetUsedProviders()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
@@ -553,7 +558,7 @@ func TestRelayProcessorLatest(t *testing.T) {
require.NoError(t, err)
chainMsg, err := chainParser.ParseMsg("/cosmos/base/tendermint/v1beta1/blocks/latest", nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.NoError(t, err)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, false, relayRetriesManagerInstance)
relayProcessor := NewRelayProcessor(ctx, lavasession.NewUsedProviders(nil), 1, chainMsg, nil, "", "", false, relayProcessorMetrics, relayProcessorMetrics, retryOptionsTest, relayRetriesManagerInstance)
usedProviders := relayProcessor.GetUsedProviders()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10)
defer cancel()
5 changes: 5 additions & 0 deletions protocol/rpcconsumer/rpcconsumer.go
@@ -584,6 +584,8 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77
DebugRelays: viper.GetBool(DebugRelaysFlagName),
DisableConflictTransactions: viper.GetBool(common.DisableConflictTransactionsFlag),
DisableRetryOnNodeErrors: viper.GetBool(common.DisableRetryOnNodeErrorsFlag),
SetRelayCountOnNodeError: viper.GetInt(common.SetRelayCountOnNodeErrorFlag),
DisableCacheOnNodeError: viper.GetBool(common.DisableCacheOnNodeErrorFlag),
OfflineSpecPath: viper.GetString(common.UseOfflineSpecFlag),
}

@@ -634,6 +636,9 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77
cmdRPCConsumer.Flags().Bool(common.DisableRetryOnNodeErrorsFlag, false, "Disable relay retries on node errors, preventing the rpcconsumer from trying a different provider")
cmdRPCConsumer.Flags().String(common.UseOfflineSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain")

cmdRPCConsumer.Flags().Int(common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retry attempts on node errors")
cmdRPCConsumer.Flags().Bool(common.DisableCacheOnNodeErrorFlag, false, "disable the cache that blocks retry attempts which already failed in the past; every node error will then keep triggering retry relays, including spam requests")

common.AddRollingLogConfig(cmdRPCConsumer)
return cmdRPCConsumer
}
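For reference, a hypothetical invocation combining the two new flags with the example config file already used in this command's help text (the values are illustrative):

rpcconsumer consumer_examples/full_consumer_example.yml --set-retry-count-on-node-error 3 --disable-cache-on-node-error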
