From 530a7f1d2454db3b8f20c569d6b009eb3d823128 Mon Sep 17 00:00:00 2001 From: omerlavanet Date: Sun, 8 Dec 2024 13:18:05 +0200 Subject: [PATCH 1/6] remove spec requirement to contain all apis --- protocol/chainlib/base_chain_parser.go | 34 ++++++++++++++++++++++++++ protocol/chainlib/rest.go | 4 +++ protocol/common/cobra_common.go | 2 ++ protocol/rpcconsumer/rpcconsumer.go | 1 + protocol/rpcprovider/rpcprovider.go | 1 + 5 files changed, 42 insertions(+) diff --git a/protocol/chainlib/base_chain_parser.go b/protocol/chainlib/base_chain_parser.go index 1017fb22c0..00b03ea90d 100644 --- a/protocol/chainlib/base_chain_parser.go +++ b/protocol/chainlib/base_chain_parser.go @@ -20,6 +20,8 @@ import ( spectypes "github.com/lavanet/lava/v4/x/spec/types" ) +var AllowMissingApisByDefault = true + type PolicyInf interface { GetSupportedAddons(specID string) (addons []string, err error) GetSupportedExtensions(specID string) (extensions []epochstorage.EndpointService, err error) @@ -323,6 +325,35 @@ func (bcp *BaseChainParser) extensionParsingInner(addon string, parsedMessageArg bcp.extensionParser.ExtensionParsing(addon, parsedMessageArg, latestBlock) } +func (apip *BaseChainParser) defaultApiContainer(apiKey ApiKey) (*ApiContainer, error) { + // Guard that the GrpcChainParser instance exists + if apip == nil { + return nil, errors.New("ChainParser not defined") + } + utils.LavaFormatDebug("api not supported", utils.Attribute{Key: "apiKey", Value: apiKey}) + apiCont := &ApiContainer{ + api: &spectypes.Api{ + Enabled: true, + Name: "Default-" + apiKey.Name, + ComputeUnits: 20, // set 20 compute units by default + ExtraComputeUnits: 0, + Category: spectypes.SpecCategory{}, + BlockParsing: spectypes.BlockParser{ + ParserFunc: spectypes.PARSER_FUNC_EMPTY, + }, + TimeoutMs: 0, + Parsers: []spectypes.GenericParser{}, + }, + collectionKey: CollectionKey{ + ConnectionType: apiKey.ConnectionType, + InternalPath: apiKey.InternalPath, + Addon: "", + }, + } + + return apiCont, nil +} + // getSupportedApi fetches service api from spec by name func (apip *BaseChainParser) getSupportedApi(apiKey ApiKey) (*ApiContainer, error) { // Guard that the GrpcChainParser instance exists @@ -339,6 +370,9 @@ func (apip *BaseChainParser) getSupportedApi(apiKey ApiKey) (*ApiContainer, erro // Return an error if spec does not exist if !ok { + if AllowMissingApisByDefault { + return apip.defaultApiContainer(apiKey) + } return nil, common.APINotSupportedError } diff --git a/protocol/chainlib/rest.go b/protocol/chainlib/rest.go index f0646ce9a2..c8d6513964 100644 --- a/protocol/chainlib/rest.go +++ b/protocol/chainlib/rest.go @@ -176,6 +176,10 @@ func (apip *RestChainParser) getSupportedApi(name, connectionType string) (*ApiC // Return an error if spec does not exist if !ok { + if AllowMissingApisByDefault { + apiKey := ApiKey{Name: name, ConnectionType: connectionType, InternalPath: ""} + return apip.defaultApiContainer(apiKey) + } utils.LavaFormatDebug("rest api not supported", utils.LogAttr("name", name), utils.LogAttr("connectionType", connectionType), diff --git a/protocol/common/cobra_common.go b/protocol/common/cobra_common.go index 05e6259ec6..42392f19a5 100644 --- a/protocol/common/cobra_common.go +++ b/protocol/common/cobra_common.go @@ -50,6 +50,8 @@ const ( LimitParallelWebsocketConnectionsPerIpFlag = "limit-parallel-websocket-connections-per-ip" LimitWebsocketIdleTimeFlag = "limit-websocket-connection-idle-time" RateLimitRequestPerSecondFlag = "rate-limit-requests-per-second" + // specification default flags + AllowMissingApisByDefaultFlagName = "allow-missing-apis-by-default" ) const ( diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go index c69a7a069f..5e84e56511 100644 --- a/protocol/rpcconsumer/rpcconsumer.go +++ b/protocol/rpcconsumer/rpcconsumer.go @@ -770,6 +770,7 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 cmdRPCConsumer.Flags().Int64Var(&chainlib.MaxIdleTimeInSeconds, common.LimitWebsocketIdleTimeFlag, chainlib.MaxIdleTimeInSeconds, "limit the idle time in seconds for a websocket connection, default is 20 minutes ( 20 * 60 )") cmdRPCConsumer.Flags().DurationVar(&chainlib.WebSocketBanDuration, common.BanDurationForWebsocketRateLimitExceededFlag, chainlib.WebSocketBanDuration, "once websocket rate limit is reached, user will be banned Xfor a duration, default no ban") cmdRPCConsumer.Flags().Bool(LavaOverLavaBackupFlagName, true, "enable lava over lava backup to regular rpc calls") + cmdRPCConsumer.Flags().BoolVar(&chainlib.AllowMissingApisByDefault, common.AllowMissingApisByDefaultFlagName, true, "allows missing apis to be proxied to the provider by default, set flase to block missing apis in the spec") common.AddRollingLogConfig(cmdRPCConsumer) return cmdRPCConsumer } diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index 66d99e2519..2bdb3d837c 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -832,6 +832,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt cmdRPCProvider.Flags().IntVar(&numberOfRetriesAllowedOnNodeErrors, common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors") cmdRPCProvider.Flags().String(common.UseStaticSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain, example for spec with inheritance: --use-static-spec ./cookbook/specs/ibc.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/evmos.json") cmdRPCProvider.Flags().Uint64(common.RateLimitRequestPerSecondFlag, 0, "Measuring the load relative to this number for feedback - per second - per chain - default unlimited. Given Y simultaneous relay calls, a value of X and will measure Y/X load rate.") + cmdRPCProvider.Flags().BoolVar(&chainlib.AllowMissingApisByDefault, common.AllowMissingApisByDefaultFlagName, true, "allows missing apis to be proxied to the node by default, set false to block missing apis in the spec, might result in degraded performance if spec is misconfigured") common.AddRollingLogConfig(cmdRPCProvider) return cmdRPCProvider } From b4afa20a05fe74da797e38496d53699611834690 Mon Sep 17 00:00:00 2001 From: omerlavanet Date: Sun, 8 Dec 2024 14:32:55 +0200 Subject: [PATCH 2/6] fix unitests --- protocol/chainlib/grpc_test.go | 13 ++++++++----- protocol/chainlib/jsonRPC_test.go | 8 ++++++-- protocol/chainlib/rest_test.go | 9 ++++++--- protocol/chainlib/tendermintRPC_test.go | 9 +++++++-- 4 files changed, 27 insertions(+), 12 deletions(-) diff --git a/protocol/chainlib/grpc_test.go b/protocol/chainlib/grpc_test.go index 119ba64e84..1d9a21fcc5 100644 --- a/protocol/chainlib/grpc_test.go +++ b/protocol/chainlib/grpc_test.go @@ -92,10 +92,13 @@ func TestGRPCGetSupportedApi(t *testing.T) { serverApis: map[ApiKey]ApiContainer{{Name: "API1", ConnectionType: connectionType_test}: {api: &spectypes.Api{Name: "API1", Enabled: true}, collectionKey: CollectionKey{ConnectionType: connectionType_test}}}, }, } - _, err = apip.getSupportedApi("API2", connectionType_test) - assert.Error(t, err) - found := strings.Contains(err.Error(), "api not supported") - require.True(t, found) + apiCont, err = apip.getSupportedApi("API2", connectionType_test) + if err == nil { + require.True(t, apiCont.api.Name == "Default-API2") + } else { + found := strings.Contains(err.Error(), "api not supported") + require.True(t, found) + } // Test case 3: Returns error if the API is disabled apip = &GrpcChainParser{ @@ -105,7 +108,7 @@ func TestGRPCGetSupportedApi(t *testing.T) { } _, err = apip.getSupportedApi("API1", connectionType_test) assert.Error(t, err) - found = strings.Contains(err.Error(), "api is disabled") + found := strings.Contains(err.Error(), "api is disabled") require.True(t, found) } diff --git a/protocol/chainlib/jsonRPC_test.go b/protocol/chainlib/jsonRPC_test.go index 27910db1d8..18e8f50bdf 100644 --- a/protocol/chainlib/jsonRPC_test.go +++ b/protocol/chainlib/jsonRPC_test.go @@ -115,8 +115,12 @@ func TestJSONGetSupportedApi(t *testing.T) { serverApis: map[ApiKey]ApiContainer{{Name: "API1", ConnectionType: connectionType_test}: {api: &spectypes.Api{Name: "API1", Enabled: true}, collectionKey: CollectionKey{ConnectionType: connectionType_test}}}, }, } - _, err = apip.getSupportedApi("API2", connectionType_test, "") - assert.Error(t, err) + apiCont, err := apip.getSupportedApi("API2", connectionType_test, "") + if err == nil { + require.True(t, apiCont.api.Name == "Default-API2") + } else { + assert.ErrorIs(t, err, common.APINotSupportedError) + } // Test case 3: Returns error if the API is disabled apip = &JsonRPCChainParser{ diff --git a/protocol/chainlib/rest_test.go b/protocol/chainlib/rest_test.go index 1521ac445d..8b2e69b1eb 100644 --- a/protocol/chainlib/rest_test.go +++ b/protocol/chainlib/rest_test.go @@ -88,9 +88,12 @@ func TestRestGetSupportedApi(t *testing.T) { serverApis: map[ApiKey]ApiContainer{{Name: "API1", ConnectionType: connectionType_test}: {api: &spectypes.Api{Name: "API1", Enabled: true}, collectionKey: CollectionKey{ConnectionType: connectionType_test}}}, }, } - _, err = apip.getSupportedApi("API2", connectionType_test) - assert.Error(t, err) - assert.ErrorIs(t, err, common.APINotSupportedError) + apiCont, err := apip.getSupportedApi("API2", connectionType_test) + if err == nil { + require.True(t, apiCont.api.Name == "Default-API2") + } else { + assert.ErrorIs(t, err, common.APINotSupportedError) + } // Test case 3: Returns error if the API is disabled apip = &RestChainParser{ diff --git a/protocol/chainlib/tendermintRPC_test.go b/protocol/chainlib/tendermintRPC_test.go index 93e4930146..17b2509706 100644 --- a/protocol/chainlib/tendermintRPC_test.go +++ b/protocol/chainlib/tendermintRPC_test.go @@ -10,6 +10,7 @@ import ( "github.com/lavanet/lava/v4/protocol/chainlib/chainproxy/rpcInterfaceMessages" "github.com/lavanet/lava/v4/protocol/chainlib/extensionslib" + "github.com/lavanet/lava/v4/protocol/common" spectypes "github.com/lavanet/lava/v4/x/spec/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -84,8 +85,12 @@ func TestTendermintGetSupportedApi(t *testing.T) { serverApis: map[ApiKey]ApiContainer{{Name: "API1", ConnectionType: connectionType_test}: {api: &spectypes.Api{Name: "API1", Enabled: true}, collectionKey: CollectionKey{ConnectionType: connectionType_test}}}, }, } - _, err = apip.getSupportedApi("API2", connectionType_test) - assert.Error(t, err) + apiCont, err := apip.getSupportedApi("API2", connectionType_test) + if err == nil { + require.True(t, apiCont.api.Name == "Default-API2") + } else { + assert.ErrorIs(t, err, common.APINotSupportedError) + } // Test case 3: Returns error if the API is disabled apip = &TendermintChainParser{ From 16ac6f19314838f874e7b69a79d30e0e7824d1dc Mon Sep 17 00:00:00 2001 From: omerlavanet Date: Sun, 8 Dec 2024 15:00:18 +0200 Subject: [PATCH 3/6] fix consumer race --- protocol/lavasession/consumer_session_manager.go | 2 +- protocol/provideroptimizer/selection_weight.go | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/protocol/lavasession/consumer_session_manager.go b/protocol/lavasession/consumer_session_manager.go index 53f6531900..7c06c50bf3 100644 --- a/protocol/lavasession/consumer_session_manager.go +++ b/protocol/lavasession/consumer_session_manager.go @@ -113,7 +113,7 @@ func (csm *ConsumerSessionManager) UpdateAllProviders(epoch uint64, pairingList } csm.setValidAddressesToDefaultValue("", nil) // the starting point is that valid addresses are equal to pairing addresses. // reset session related metrics - csm.consumerMetricsManager.ResetSessionRelatedMetrics() + go csm.consumerMetricsManager.ResetSessionRelatedMetrics() go csm.providerOptimizer.UpdateWeights(CalcWeightsByStake(pairingList), epoch) utils.LavaFormatDebug("updated providers", utils.Attribute{Key: "epoch", Value: epoch}, utils.Attribute{Key: "spec", Value: csm.rpcEndpoint.Key()}) diff --git a/protocol/provideroptimizer/selection_weight.go b/protocol/provideroptimizer/selection_weight.go index e0fdc30f38..f391f5ef91 100644 --- a/protocol/provideroptimizer/selection_weight.go +++ b/protocol/provideroptimizer/selection_weight.go @@ -28,6 +28,11 @@ func NewSelectionWeighter() SelectionWeighter { func (sw *selectionWeighterInst) Weight(address string) int64 { sw.lock.RLock() defer sw.lock.RUnlock() + return sw.weightInner(address) +} + +// assumes lock is held +func (sw *selectionWeighterInst) weightInner(address string) int64 { weight, ok := sw.weights[address] if !ok { // default weight is 1 @@ -52,12 +57,12 @@ func (sw *selectionWeighterInst) WeightedChoice(entries []Entry) string { defer sw.lock.RUnlock() totalWeight := int64(0) for _, entry := range entries { - totalWeight += int64(float64(sw.Weight(entry.Address)) * entry.Part) + totalWeight += int64(float64(sw.weightInner(entry.Address)) * entry.Part) } randWeight := rand.Int63n(totalWeight) currentWeight := int64(0) for _, entry := range entries { - currentWeight += int64(float64(sw.Weight(entry.Address)) * entry.Part) + currentWeight += int64(float64(sw.weightInner(entry.Address)) * entry.Part) if currentWeight > randWeight { return entry.Address } From f609af828b357d644f6e04fa94155d8f22e47af7 Mon Sep 17 00:00:00 2001 From: omerlavanet Date: Sun, 8 Dec 2024 15:12:35 +0200 Subject: [PATCH 4/6] fix unitests --- protocol/chainlib/jsonRPC_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/protocol/chainlib/jsonRPC_test.go b/protocol/chainlib/jsonRPC_test.go index 18e8f50bdf..bb428bd56a 100644 --- a/protocol/chainlib/jsonRPC_test.go +++ b/protocol/chainlib/jsonRPC_test.go @@ -503,9 +503,12 @@ func TestJsonRpcInternalPathsMultipleVersionsAvalanche(t *testing.T) { require.Equal(t, reqDataWithApiName.apiName, api.Name) require.Equal(t, correctPath, collection.CollectionData.InternalPath) } else { - require.Error(t, err) - require.ErrorIs(t, err, common.APINotSupportedError) - require.Nil(t, chainMessage) + if err == nil { + require.True(t, strings.Contains(chainMessage.GetApi().Name, "Default-")) + } else { + require.ErrorIs(t, err, common.APINotSupportedError) + require.Nil(t, chainMessage) + } } }) } From f6fec7b2de7f052dadb6351b89b220fb7ac4f40c Mon Sep 17 00:00:00 2001 From: omerlavanet Date: Sun, 8 Dec 2024 15:17:26 +0200 Subject: [PATCH 5/6] fix unitests --- protocol/chainlib/rest_test.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/protocol/chainlib/rest_test.go b/protocol/chainlib/rest_test.go index 8b2e69b1eb..c1393eb057 100644 --- a/protocol/chainlib/rest_test.go +++ b/protocol/chainlib/rest_test.go @@ -316,7 +316,11 @@ func TestRegexParsing(t *testing.T) { for _, api := range []string{ "/cosmos/staking/v1beta1/delegations/lava@17ym998u666u8w2qgjd5m7w7ydjqmu3mlgl7ua2/", } { - _, err := chainParser.ParseMsg(api, nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) - require.Error(t, err) + chainMessage, err := chainParser.ParseMsg(api, nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) + if err == nil { + require.True(t, chainMessage.GetApi().GetName() == "Default-"+api) + } else { + assert.ErrorIs(t, err, common.APINotSupportedError) + } } } From 93dc8f0c514553e3e1330ad91566bccabd267c9c Mon Sep 17 00:00:00 2001 From: Elad Gildnur Date: Sun, 8 Dec 2024 15:12:20 +0200 Subject: [PATCH 6/6] Small fix to tests --- protocol/chainlib/grpc_test.go | 9 +++------ protocol/chainlib/jsonRPC_test.go | 4 ++-- protocol/chainlib/rest_test.go | 4 ++-- protocol/chainlib/tendermintRPC_test.go | 2 +- 4 files changed, 8 insertions(+), 11 deletions(-) diff --git a/protocol/chainlib/grpc_test.go b/protocol/chainlib/grpc_test.go index 1d9a21fcc5..cd3404b2b6 100644 --- a/protocol/chainlib/grpc_test.go +++ b/protocol/chainlib/grpc_test.go @@ -5,7 +5,6 @@ import ( "fmt" "net/http" "strconv" - "strings" "testing" "time" @@ -94,10 +93,9 @@ func TestGRPCGetSupportedApi(t *testing.T) { } apiCont, err = apip.getSupportedApi("API2", connectionType_test) if err == nil { - require.True(t, apiCont.api.Name == "Default-API2") + require.Equal(t, "Default-API2", apiCont.api.Name) } else { - found := strings.Contains(err.Error(), "api not supported") - require.True(t, found) + require.Contains(t, err.Error(), "api not supported") } // Test case 3: Returns error if the API is disabled @@ -108,8 +106,7 @@ func TestGRPCGetSupportedApi(t *testing.T) { } _, err = apip.getSupportedApi("API1", connectionType_test) assert.Error(t, err) - found := strings.Contains(err.Error(), "api is disabled") - require.True(t, found) + require.Contains(t, err.Error(), "api is disabled") } func TestGRPCParseMessage(t *testing.T) { diff --git a/protocol/chainlib/jsonRPC_test.go b/protocol/chainlib/jsonRPC_test.go index bb428bd56a..1510458704 100644 --- a/protocol/chainlib/jsonRPC_test.go +++ b/protocol/chainlib/jsonRPC_test.go @@ -117,7 +117,7 @@ func TestJSONGetSupportedApi(t *testing.T) { } apiCont, err := apip.getSupportedApi("API2", connectionType_test, "") if err == nil { - require.True(t, apiCont.api.Name == "Default-API2") + assert.Equal(t, "Default-API2", apiCont.api.Name) } else { assert.ErrorIs(t, err, common.APINotSupportedError) } @@ -504,7 +504,7 @@ func TestJsonRpcInternalPathsMultipleVersionsAvalanche(t *testing.T) { require.Equal(t, correctPath, collection.CollectionData.InternalPath) } else { if err == nil { - require.True(t, strings.Contains(chainMessage.GetApi().Name, "Default-")) + require.Contains(t, chainMessage.GetApi().Name, "Default-") } else { require.ErrorIs(t, err, common.APINotSupportedError) require.Nil(t, chainMessage) diff --git a/protocol/chainlib/rest_test.go b/protocol/chainlib/rest_test.go index c1393eb057..18370af01a 100644 --- a/protocol/chainlib/rest_test.go +++ b/protocol/chainlib/rest_test.go @@ -90,7 +90,7 @@ func TestRestGetSupportedApi(t *testing.T) { } apiCont, err := apip.getSupportedApi("API2", connectionType_test) if err == nil { - require.True(t, apiCont.api.Name == "Default-API2") + assert.Equal(t, "Default-API2", apiCont.api.Name) } else { assert.ErrorIs(t, err, common.APINotSupportedError) } @@ -318,7 +318,7 @@ func TestRegexParsing(t *testing.T) { } { chainMessage, err := chainParser.ParseMsg(api, nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) if err == nil { - require.True(t, chainMessage.GetApi().GetName() == "Default-"+api) + require.Equal(t, "Default-"+api, chainMessage.GetApi().GetName()) } else { assert.ErrorIs(t, err, common.APINotSupportedError) } diff --git a/protocol/chainlib/tendermintRPC_test.go b/protocol/chainlib/tendermintRPC_test.go index 17b2509706..1fd6696ebb 100644 --- a/protocol/chainlib/tendermintRPC_test.go +++ b/protocol/chainlib/tendermintRPC_test.go @@ -87,7 +87,7 @@ func TestTendermintGetSupportedApi(t *testing.T) { } apiCont, err := apip.getSupportedApi("API2", connectionType_test) if err == nil { - require.True(t, apiCont.api.Name == "Default-API2") + assert.Equal(t, "Default-API2", apiCont.api.Name) } else { assert.ErrorIs(t, err, common.APINotSupportedError) }