From 6a7cf7bfca03cda99553fce33b137e218807c34f Mon Sep 17 00:00:00 2001 From: Omer <100387053+omerlavanet@users.noreply.github.com> Date: Sun, 8 Dec 2024 16:09:03 +0200 Subject: [PATCH] feat: remove spec requirement to contain all apis (#1825) * remove spec requirement to contain all apis * fix unitests * fix consumer race * fix unitests * fix unitests * Small fix to tests --------- Co-authored-by: Elad Gildnur --- protocol/chainlib/base_chain_parser.go | 34 +++++++++++++++++++ protocol/chainlib/grpc_test.go | 14 ++++---- protocol/chainlib/jsonRPC_test.go | 17 +++++++--- protocol/chainlib/rest.go | 4 +++ protocol/chainlib/rest_test.go | 17 +++++++--- protocol/chainlib/tendermintRPC_test.go | 9 +++-- protocol/common/cobra_common.go | 2 ++ .../lavasession/consumer_session_manager.go | 2 +- .../provideroptimizer/selection_weight.go | 9 +++-- protocol/rpcconsumer/rpcconsumer.go | 1 + protocol/rpcprovider/rpcprovider.go | 1 + 11 files changed, 88 insertions(+), 22 deletions(-) diff --git a/protocol/chainlib/base_chain_parser.go b/protocol/chainlib/base_chain_parser.go index 1017fb22c0..00b03ea90d 100644 --- a/protocol/chainlib/base_chain_parser.go +++ b/protocol/chainlib/base_chain_parser.go @@ -20,6 +20,8 @@ import ( spectypes "github.com/lavanet/lava/v4/x/spec/types" ) +var AllowMissingApisByDefault = true + type PolicyInf interface { GetSupportedAddons(specID string) (addons []string, err error) GetSupportedExtensions(specID string) (extensions []epochstorage.EndpointService, err error) @@ -323,6 +325,35 @@ func (bcp *BaseChainParser) extensionParsingInner(addon string, parsedMessageArg bcp.extensionParser.ExtensionParsing(addon, parsedMessageArg, latestBlock) } +func (apip *BaseChainParser) defaultApiContainer(apiKey ApiKey) (*ApiContainer, error) { + // Guard that the GrpcChainParser instance exists + if apip == nil { + return nil, errors.New("ChainParser not defined") + } + utils.LavaFormatDebug("api not supported", utils.Attribute{Key: "apiKey", Value: apiKey}) + apiCont := &ApiContainer{ + api: &spectypes.Api{ + Enabled: true, + Name: "Default-" + apiKey.Name, + ComputeUnits: 20, // set 20 compute units by default + ExtraComputeUnits: 0, + Category: spectypes.SpecCategory{}, + BlockParsing: spectypes.BlockParser{ + ParserFunc: spectypes.PARSER_FUNC_EMPTY, + }, + TimeoutMs: 0, + Parsers: []spectypes.GenericParser{}, + }, + collectionKey: CollectionKey{ + ConnectionType: apiKey.ConnectionType, + InternalPath: apiKey.InternalPath, + Addon: "", + }, + } + + return apiCont, nil +} + // getSupportedApi fetches service api from spec by name func (apip *BaseChainParser) getSupportedApi(apiKey ApiKey) (*ApiContainer, error) { // Guard that the GrpcChainParser instance exists @@ -339,6 +370,9 @@ func (apip *BaseChainParser) getSupportedApi(apiKey ApiKey) (*ApiContainer, erro // Return an error if spec does not exist if !ok { + if AllowMissingApisByDefault { + return apip.defaultApiContainer(apiKey) + } return nil, common.APINotSupportedError } diff --git a/protocol/chainlib/grpc_test.go b/protocol/chainlib/grpc_test.go index 119ba64e84..cd3404b2b6 100644 --- a/protocol/chainlib/grpc_test.go +++ b/protocol/chainlib/grpc_test.go @@ -5,7 +5,6 @@ import ( "fmt" "net/http" "strconv" - "strings" "testing" "time" @@ -92,10 +91,12 @@ func TestGRPCGetSupportedApi(t *testing.T) { serverApis: map[ApiKey]ApiContainer{{Name: "API1", ConnectionType: connectionType_test}: {api: &spectypes.Api{Name: "API1", Enabled: true}, collectionKey: CollectionKey{ConnectionType: connectionType_test}}}, }, } - _, err = apip.getSupportedApi("API2", connectionType_test) - assert.Error(t, err) - found := strings.Contains(err.Error(), "api not supported") - require.True(t, found) + apiCont, err = apip.getSupportedApi("API2", connectionType_test) + if err == nil { + require.Equal(t, "Default-API2", apiCont.api.Name) + } else { + require.Contains(t, err.Error(), "api not supported") + } // Test case 3: Returns error if the API is disabled apip = &GrpcChainParser{ @@ -105,8 +106,7 @@ func TestGRPCGetSupportedApi(t *testing.T) { } _, err = apip.getSupportedApi("API1", connectionType_test) assert.Error(t, err) - found = strings.Contains(err.Error(), "api is disabled") - require.True(t, found) + require.Contains(t, err.Error(), "api is disabled") } func TestGRPCParseMessage(t *testing.T) { diff --git a/protocol/chainlib/jsonRPC_test.go b/protocol/chainlib/jsonRPC_test.go index 27910db1d8..1510458704 100644 --- a/protocol/chainlib/jsonRPC_test.go +++ b/protocol/chainlib/jsonRPC_test.go @@ -115,8 +115,12 @@ func TestJSONGetSupportedApi(t *testing.T) { serverApis: map[ApiKey]ApiContainer{{Name: "API1", ConnectionType: connectionType_test}: {api: &spectypes.Api{Name: "API1", Enabled: true}, collectionKey: CollectionKey{ConnectionType: connectionType_test}}}, }, } - _, err = apip.getSupportedApi("API2", connectionType_test, "") - assert.Error(t, err) + apiCont, err := apip.getSupportedApi("API2", connectionType_test, "") + if err == nil { + assert.Equal(t, "Default-API2", apiCont.api.Name) + } else { + assert.ErrorIs(t, err, common.APINotSupportedError) + } // Test case 3: Returns error if the API is disabled apip = &JsonRPCChainParser{ @@ -499,9 +503,12 @@ func TestJsonRpcInternalPathsMultipleVersionsAvalanche(t *testing.T) { require.Equal(t, reqDataWithApiName.apiName, api.Name) require.Equal(t, correctPath, collection.CollectionData.InternalPath) } else { - require.Error(t, err) - require.ErrorIs(t, err, common.APINotSupportedError) - require.Nil(t, chainMessage) + if err == nil { + require.Contains(t, chainMessage.GetApi().Name, "Default-") + } else { + require.ErrorIs(t, err, common.APINotSupportedError) + require.Nil(t, chainMessage) + } } }) } diff --git a/protocol/chainlib/rest.go b/protocol/chainlib/rest.go index f0646ce9a2..c8d6513964 100644 --- a/protocol/chainlib/rest.go +++ b/protocol/chainlib/rest.go @@ -176,6 +176,10 @@ func (apip *RestChainParser) getSupportedApi(name, connectionType string) (*ApiC // Return an error if spec does not exist if !ok { + if AllowMissingApisByDefault { + apiKey := ApiKey{Name: name, ConnectionType: connectionType, InternalPath: ""} + return apip.defaultApiContainer(apiKey) + } utils.LavaFormatDebug("rest api not supported", utils.LogAttr("name", name), utils.LogAttr("connectionType", connectionType), diff --git a/protocol/chainlib/rest_test.go b/protocol/chainlib/rest_test.go index 1521ac445d..18370af01a 100644 --- a/protocol/chainlib/rest_test.go +++ b/protocol/chainlib/rest_test.go @@ -88,9 +88,12 @@ func TestRestGetSupportedApi(t *testing.T) { serverApis: map[ApiKey]ApiContainer{{Name: "API1", ConnectionType: connectionType_test}: {api: &spectypes.Api{Name: "API1", Enabled: true}, collectionKey: CollectionKey{ConnectionType: connectionType_test}}}, }, } - _, err = apip.getSupportedApi("API2", connectionType_test) - assert.Error(t, err) - assert.ErrorIs(t, err, common.APINotSupportedError) + apiCont, err := apip.getSupportedApi("API2", connectionType_test) + if err == nil { + assert.Equal(t, "Default-API2", apiCont.api.Name) + } else { + assert.ErrorIs(t, err, common.APINotSupportedError) + } // Test case 3: Returns error if the API is disabled apip = &RestChainParser{ @@ -313,7 +316,11 @@ func TestRegexParsing(t *testing.T) { for _, api := range []string{ "/cosmos/staking/v1beta1/delegations/lava@17ym998u666u8w2qgjd5m7w7ydjqmu3mlgl7ua2/", } { - _, err := chainParser.ParseMsg(api, nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) - require.Error(t, err) + chainMessage, err := chainParser.ParseMsg(api, nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0}) + if err == nil { + require.Equal(t, "Default-"+api, chainMessage.GetApi().GetName()) + } else { + assert.ErrorIs(t, err, common.APINotSupportedError) + } } } diff --git a/protocol/chainlib/tendermintRPC_test.go b/protocol/chainlib/tendermintRPC_test.go index 93e4930146..1fd6696ebb 100644 --- a/protocol/chainlib/tendermintRPC_test.go +++ b/protocol/chainlib/tendermintRPC_test.go @@ -10,6 +10,7 @@ import ( "github.com/lavanet/lava/v4/protocol/chainlib/chainproxy/rpcInterfaceMessages" "github.com/lavanet/lava/v4/protocol/chainlib/extensionslib" + "github.com/lavanet/lava/v4/protocol/common" spectypes "github.com/lavanet/lava/v4/x/spec/types" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -84,8 +85,12 @@ func TestTendermintGetSupportedApi(t *testing.T) { serverApis: map[ApiKey]ApiContainer{{Name: "API1", ConnectionType: connectionType_test}: {api: &spectypes.Api{Name: "API1", Enabled: true}, collectionKey: CollectionKey{ConnectionType: connectionType_test}}}, }, } - _, err = apip.getSupportedApi("API2", connectionType_test) - assert.Error(t, err) + apiCont, err := apip.getSupportedApi("API2", connectionType_test) + if err == nil { + assert.Equal(t, "Default-API2", apiCont.api.Name) + } else { + assert.ErrorIs(t, err, common.APINotSupportedError) + } // Test case 3: Returns error if the API is disabled apip = &TendermintChainParser{ diff --git a/protocol/common/cobra_common.go b/protocol/common/cobra_common.go index 05e6259ec6..42392f19a5 100644 --- a/protocol/common/cobra_common.go +++ b/protocol/common/cobra_common.go @@ -50,6 +50,8 @@ const ( LimitParallelWebsocketConnectionsPerIpFlag = "limit-parallel-websocket-connections-per-ip" LimitWebsocketIdleTimeFlag = "limit-websocket-connection-idle-time" RateLimitRequestPerSecondFlag = "rate-limit-requests-per-second" + // specification default flags + AllowMissingApisByDefaultFlagName = "allow-missing-apis-by-default" ) const ( diff --git a/protocol/lavasession/consumer_session_manager.go b/protocol/lavasession/consumer_session_manager.go index 53f6531900..7c06c50bf3 100644 --- a/protocol/lavasession/consumer_session_manager.go +++ b/protocol/lavasession/consumer_session_manager.go @@ -113,7 +113,7 @@ func (csm *ConsumerSessionManager) UpdateAllProviders(epoch uint64, pairingList } csm.setValidAddressesToDefaultValue("", nil) // the starting point is that valid addresses are equal to pairing addresses. // reset session related metrics - csm.consumerMetricsManager.ResetSessionRelatedMetrics() + go csm.consumerMetricsManager.ResetSessionRelatedMetrics() go csm.providerOptimizer.UpdateWeights(CalcWeightsByStake(pairingList), epoch) utils.LavaFormatDebug("updated providers", utils.Attribute{Key: "epoch", Value: epoch}, utils.Attribute{Key: "spec", Value: csm.rpcEndpoint.Key()}) diff --git a/protocol/provideroptimizer/selection_weight.go b/protocol/provideroptimizer/selection_weight.go index e0fdc30f38..f391f5ef91 100644 --- a/protocol/provideroptimizer/selection_weight.go +++ b/protocol/provideroptimizer/selection_weight.go @@ -28,6 +28,11 @@ func NewSelectionWeighter() SelectionWeighter { func (sw *selectionWeighterInst) Weight(address string) int64 { sw.lock.RLock() defer sw.lock.RUnlock() + return sw.weightInner(address) +} + +// assumes lock is held +func (sw *selectionWeighterInst) weightInner(address string) int64 { weight, ok := sw.weights[address] if !ok { // default weight is 1 @@ -52,12 +57,12 @@ func (sw *selectionWeighterInst) WeightedChoice(entries []Entry) string { defer sw.lock.RUnlock() totalWeight := int64(0) for _, entry := range entries { - totalWeight += int64(float64(sw.Weight(entry.Address)) * entry.Part) + totalWeight += int64(float64(sw.weightInner(entry.Address)) * entry.Part) } randWeight := rand.Int63n(totalWeight) currentWeight := int64(0) for _, entry := range entries { - currentWeight += int64(float64(sw.Weight(entry.Address)) * entry.Part) + currentWeight += int64(float64(sw.weightInner(entry.Address)) * entry.Part) if currentWeight > randWeight { return entry.Address } diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go index c69a7a069f..5e84e56511 100644 --- a/protocol/rpcconsumer/rpcconsumer.go +++ b/protocol/rpcconsumer/rpcconsumer.go @@ -770,6 +770,7 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 cmdRPCConsumer.Flags().Int64Var(&chainlib.MaxIdleTimeInSeconds, common.LimitWebsocketIdleTimeFlag, chainlib.MaxIdleTimeInSeconds, "limit the idle time in seconds for a websocket connection, default is 20 minutes ( 20 * 60 )") cmdRPCConsumer.Flags().DurationVar(&chainlib.WebSocketBanDuration, common.BanDurationForWebsocketRateLimitExceededFlag, chainlib.WebSocketBanDuration, "once websocket rate limit is reached, user will be banned Xfor a duration, default no ban") cmdRPCConsumer.Flags().Bool(LavaOverLavaBackupFlagName, true, "enable lava over lava backup to regular rpc calls") + cmdRPCConsumer.Flags().BoolVar(&chainlib.AllowMissingApisByDefault, common.AllowMissingApisByDefaultFlagName, true, "allows missing apis to be proxied to the provider by default, set flase to block missing apis in the spec") common.AddRollingLogConfig(cmdRPCConsumer) return cmdRPCConsumer } diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index 66d99e2519..2bdb3d837c 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -832,6 +832,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt cmdRPCProvider.Flags().IntVar(&numberOfRetriesAllowedOnNodeErrors, common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors") cmdRPCProvider.Flags().String(common.UseStaticSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain, example for spec with inheritance: --use-static-spec ./cookbook/specs/ibc.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/evmos.json") cmdRPCProvider.Flags().Uint64(common.RateLimitRequestPerSecondFlag, 0, "Measuring the load relative to this number for feedback - per second - per chain - default unlimited. Given Y simultaneous relay calls, a value of X and will measure Y/X load rate.") + cmdRPCProvider.Flags().BoolVar(&chainlib.AllowMissingApisByDefault, common.AllowMissingApisByDefaultFlagName, true, "allows missing apis to be proxied to the node by default, set false to block missing apis in the spec, might result in degraded performance if spec is misconfigured") common.AddRollingLogConfig(cmdRPCProvider) return cmdRPCProvider }