Skip to content

Commit

Permalink
feat: remove spec requirement to contain all apis (#1825)
Browse files Browse the repository at this point in the history
* remove spec requirement to contain all apis

* fix unitests

* fix consumer race

* fix unitests

* fix unitests

* Small fix to tests

---------

Co-authored-by: Elad Gildnur <[email protected]>
  • Loading branch information
omerlavanet and shleikes authored Dec 8, 2024
1 parent ce4332c commit 6a7cf7b
Show file tree
Hide file tree
Showing 11 changed files with 88 additions and 22 deletions.
34 changes: 34 additions & 0 deletions protocol/chainlib/base_chain_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
spectypes "github.com/lavanet/lava/v4/x/spec/types"
)

var AllowMissingApisByDefault = true

type PolicyInf interface {
GetSupportedAddons(specID string) (addons []string, err error)
GetSupportedExtensions(specID string) (extensions []epochstorage.EndpointService, err error)
Expand Down Expand Up @@ -323,6 +325,35 @@ func (bcp *BaseChainParser) extensionParsingInner(addon string, parsedMessageArg
bcp.extensionParser.ExtensionParsing(addon, parsedMessageArg, latestBlock)
}

func (apip *BaseChainParser) defaultApiContainer(apiKey ApiKey) (*ApiContainer, error) {
// Guard that the GrpcChainParser instance exists
if apip == nil {
return nil, errors.New("ChainParser not defined")
}
utils.LavaFormatDebug("api not supported", utils.Attribute{Key: "apiKey", Value: apiKey})
apiCont := &ApiContainer{
api: &spectypes.Api{
Enabled: true,
Name: "Default-" + apiKey.Name,
ComputeUnits: 20, // set 20 compute units by default
ExtraComputeUnits: 0,
Category: spectypes.SpecCategory{},
BlockParsing: spectypes.BlockParser{
ParserFunc: spectypes.PARSER_FUNC_EMPTY,
},
TimeoutMs: 0,
Parsers: []spectypes.GenericParser{},
},
collectionKey: CollectionKey{
ConnectionType: apiKey.ConnectionType,
InternalPath: apiKey.InternalPath,
Addon: "",
},
}

return apiCont, nil
}

// getSupportedApi fetches service api from spec by name
func (apip *BaseChainParser) getSupportedApi(apiKey ApiKey) (*ApiContainer, error) {
// Guard that the GrpcChainParser instance exists
Expand All @@ -339,6 +370,9 @@ func (apip *BaseChainParser) getSupportedApi(apiKey ApiKey) (*ApiContainer, erro

// Return an error if spec does not exist
if !ok {
if AllowMissingApisByDefault {
return apip.defaultApiContainer(apiKey)
}
return nil, common.APINotSupportedError
}

Expand Down
14 changes: 7 additions & 7 deletions protocol/chainlib/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"net/http"
"strconv"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -92,10 +91,12 @@ func TestGRPCGetSupportedApi(t *testing.T) {
serverApis: map[ApiKey]ApiContainer{{Name: "API1", ConnectionType: connectionType_test}: {api: &spectypes.Api{Name: "API1", Enabled: true}, collectionKey: CollectionKey{ConnectionType: connectionType_test}}},
},
}
_, err = apip.getSupportedApi("API2", connectionType_test)
assert.Error(t, err)
found := strings.Contains(err.Error(), "api not supported")
require.True(t, found)
apiCont, err = apip.getSupportedApi("API2", connectionType_test)
if err == nil {
require.Equal(t, "Default-API2", apiCont.api.Name)
} else {
require.Contains(t, err.Error(), "api not supported")
}

// Test case 3: Returns error if the API is disabled
apip = &GrpcChainParser{
Expand All @@ -105,8 +106,7 @@ func TestGRPCGetSupportedApi(t *testing.T) {
}
_, err = apip.getSupportedApi("API1", connectionType_test)
assert.Error(t, err)
found = strings.Contains(err.Error(), "api is disabled")
require.True(t, found)
require.Contains(t, err.Error(), "api is disabled")
}

func TestGRPCParseMessage(t *testing.T) {
Expand Down
17 changes: 12 additions & 5 deletions protocol/chainlib/jsonRPC_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,12 @@ func TestJSONGetSupportedApi(t *testing.T) {
serverApis: map[ApiKey]ApiContainer{{Name: "API1", ConnectionType: connectionType_test}: {api: &spectypes.Api{Name: "API1", Enabled: true}, collectionKey: CollectionKey{ConnectionType: connectionType_test}}},
},
}
_, err = apip.getSupportedApi("API2", connectionType_test, "")
assert.Error(t, err)
apiCont, err := apip.getSupportedApi("API2", connectionType_test, "")
if err == nil {
assert.Equal(t, "Default-API2", apiCont.api.Name)
} else {
assert.ErrorIs(t, err, common.APINotSupportedError)
}

// Test case 3: Returns error if the API is disabled
apip = &JsonRPCChainParser{
Expand Down Expand Up @@ -499,9 +503,12 @@ func TestJsonRpcInternalPathsMultipleVersionsAvalanche(t *testing.T) {
require.Equal(t, reqDataWithApiName.apiName, api.Name)
require.Equal(t, correctPath, collection.CollectionData.InternalPath)
} else {
require.Error(t, err)
require.ErrorIs(t, err, common.APINotSupportedError)
require.Nil(t, chainMessage)
if err == nil {
require.Contains(t, chainMessage.GetApi().Name, "Default-")
} else {
require.ErrorIs(t, err, common.APINotSupportedError)
require.Nil(t, chainMessage)
}
}
})
}
Expand Down
4 changes: 4 additions & 0 deletions protocol/chainlib/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ func (apip *RestChainParser) getSupportedApi(name, connectionType string) (*ApiC

// Return an error if spec does not exist
if !ok {
if AllowMissingApisByDefault {
apiKey := ApiKey{Name: name, ConnectionType: connectionType, InternalPath: ""}
return apip.defaultApiContainer(apiKey)
}
utils.LavaFormatDebug("rest api not supported",
utils.LogAttr("name", name),
utils.LogAttr("connectionType", connectionType),
Expand Down
17 changes: 12 additions & 5 deletions protocol/chainlib/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,12 @@ func TestRestGetSupportedApi(t *testing.T) {
serverApis: map[ApiKey]ApiContainer{{Name: "API1", ConnectionType: connectionType_test}: {api: &spectypes.Api{Name: "API1", Enabled: true}, collectionKey: CollectionKey{ConnectionType: connectionType_test}}},
},
}
_, err = apip.getSupportedApi("API2", connectionType_test)
assert.Error(t, err)
assert.ErrorIs(t, err, common.APINotSupportedError)
apiCont, err := apip.getSupportedApi("API2", connectionType_test)
if err == nil {
assert.Equal(t, "Default-API2", apiCont.api.Name)
} else {
assert.ErrorIs(t, err, common.APINotSupportedError)
}

// Test case 3: Returns error if the API is disabled
apip = &RestChainParser{
Expand Down Expand Up @@ -313,7 +316,11 @@ func TestRegexParsing(t *testing.T) {
for _, api := range []string{
"/cosmos/staking/v1beta1/delegations/lava@17ym998u666u8w2qgjd5m7w7ydjqmu3mlgl7ua2/",
} {
_, err := chainParser.ParseMsg(api, nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
require.Error(t, err)
chainMessage, err := chainParser.ParseMsg(api, nil, http.MethodGet, nil, extensionslib.ExtensionInfo{LatestBlock: 0})
if err == nil {
require.Equal(t, "Default-"+api, chainMessage.GetApi().GetName())
} else {
assert.ErrorIs(t, err, common.APINotSupportedError)
}
}
}
9 changes: 7 additions & 2 deletions protocol/chainlib/tendermintRPC_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/lavanet/lava/v4/protocol/chainlib/chainproxy/rpcInterfaceMessages"
"github.com/lavanet/lava/v4/protocol/chainlib/extensionslib"
"github.com/lavanet/lava/v4/protocol/common"
spectypes "github.com/lavanet/lava/v4/x/spec/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -84,8 +85,12 @@ func TestTendermintGetSupportedApi(t *testing.T) {
serverApis: map[ApiKey]ApiContainer{{Name: "API1", ConnectionType: connectionType_test}: {api: &spectypes.Api{Name: "API1", Enabled: true}, collectionKey: CollectionKey{ConnectionType: connectionType_test}}},
},
}
_, err = apip.getSupportedApi("API2", connectionType_test)
assert.Error(t, err)
apiCont, err := apip.getSupportedApi("API2", connectionType_test)
if err == nil {
assert.Equal(t, "Default-API2", apiCont.api.Name)
} else {
assert.ErrorIs(t, err, common.APINotSupportedError)
}

// Test case 3: Returns error if the API is disabled
apip = &TendermintChainParser{
Expand Down
2 changes: 2 additions & 0 deletions protocol/common/cobra_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ const (
LimitParallelWebsocketConnectionsPerIpFlag = "limit-parallel-websocket-connections-per-ip"
LimitWebsocketIdleTimeFlag = "limit-websocket-connection-idle-time"
RateLimitRequestPerSecondFlag = "rate-limit-requests-per-second"
// specification default flags
AllowMissingApisByDefaultFlagName = "allow-missing-apis-by-default"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (csm *ConsumerSessionManager) UpdateAllProviders(epoch uint64, pairingList
}
csm.setValidAddressesToDefaultValue("", nil) // the starting point is that valid addresses are equal to pairing addresses.
// reset session related metrics
csm.consumerMetricsManager.ResetSessionRelatedMetrics()
go csm.consumerMetricsManager.ResetSessionRelatedMetrics()
go csm.providerOptimizer.UpdateWeights(CalcWeightsByStake(pairingList), epoch)

utils.LavaFormatDebug("updated providers", utils.Attribute{Key: "epoch", Value: epoch}, utils.Attribute{Key: "spec", Value: csm.rpcEndpoint.Key()})
Expand Down
9 changes: 7 additions & 2 deletions protocol/provideroptimizer/selection_weight.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ func NewSelectionWeighter() SelectionWeighter {
func (sw *selectionWeighterInst) Weight(address string) int64 {
sw.lock.RLock()
defer sw.lock.RUnlock()
return sw.weightInner(address)
}

// assumes lock is held
func (sw *selectionWeighterInst) weightInner(address string) int64 {
weight, ok := sw.weights[address]
if !ok {
// default weight is 1
Expand All @@ -52,12 +57,12 @@ func (sw *selectionWeighterInst) WeightedChoice(entries []Entry) string {
defer sw.lock.RUnlock()
totalWeight := int64(0)
for _, entry := range entries {
totalWeight += int64(float64(sw.Weight(entry.Address)) * entry.Part)
totalWeight += int64(float64(sw.weightInner(entry.Address)) * entry.Part)
}
randWeight := rand.Int63n(totalWeight)
currentWeight := int64(0)
for _, entry := range entries {
currentWeight += int64(float64(sw.Weight(entry.Address)) * entry.Part)
currentWeight += int64(float64(sw.weightInner(entry.Address)) * entry.Part)
if currentWeight > randWeight {
return entry.Address
}
Expand Down
1 change: 1 addition & 0 deletions protocol/rpcconsumer/rpcconsumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -770,6 +770,7 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77
cmdRPCConsumer.Flags().Int64Var(&chainlib.MaxIdleTimeInSeconds, common.LimitWebsocketIdleTimeFlag, chainlib.MaxIdleTimeInSeconds, "limit the idle time in seconds for a websocket connection, default is 20 minutes ( 20 * 60 )")
cmdRPCConsumer.Flags().DurationVar(&chainlib.WebSocketBanDuration, common.BanDurationForWebsocketRateLimitExceededFlag, chainlib.WebSocketBanDuration, "once websocket rate limit is reached, user will be banned Xfor a duration, default no ban")
cmdRPCConsumer.Flags().Bool(LavaOverLavaBackupFlagName, true, "enable lava over lava backup to regular rpc calls")
cmdRPCConsumer.Flags().BoolVar(&chainlib.AllowMissingApisByDefault, common.AllowMissingApisByDefaultFlagName, true, "allows missing apis to be proxied to the provider by default, set flase to block missing apis in the spec")
common.AddRollingLogConfig(cmdRPCConsumer)
return cmdRPCConsumer
}
Expand Down
1 change: 1 addition & 0 deletions protocol/rpcprovider/rpcprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt
cmdRPCProvider.Flags().IntVar(&numberOfRetriesAllowedOnNodeErrors, common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors")
cmdRPCProvider.Flags().String(common.UseStaticSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain, example for spec with inheritance: --use-static-spec ./cookbook/specs/ibc.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/evmos.json")
cmdRPCProvider.Flags().Uint64(common.RateLimitRequestPerSecondFlag, 0, "Measuring the load relative to this number for feedback - per second - per chain - default unlimited. Given Y simultaneous relay calls, a value of X and will measure Y/X load rate.")
cmdRPCProvider.Flags().BoolVar(&chainlib.AllowMissingApisByDefault, common.AllowMissingApisByDefaultFlagName, true, "allows missing apis to be proxied to the node by default, set false to block missing apis in the spec, might result in degraded performance if spec is misconfigured")
common.AddRollingLogConfig(cmdRPCProvider)
return cmdRPCProvider
}

0 comments on commit 6a7cf7b

Please sign in to comment.