Skip to content

Commit

Permalink
Merge branch 'main' into PRT-replace-cookbook-with-lava-config
Browse files Browse the repository at this point in the history
  • Loading branch information
oren-lava authored Dec 9, 2024
2 parents cd80de8 + 4631607 commit 9645592
Show file tree
Hide file tree
Showing 94 changed files with 529 additions and 23,903 deletions.
30 changes: 1 addition & 29 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
"github.com/cosmos/cosmos-sdk/x/auth"
"github.com/cosmos/cosmos-sdk/x/auth/ante"
authkeeper "github.com/cosmos/cosmos-sdk/x/auth/keeper"
authsims "github.com/cosmos/cosmos-sdk/x/auth/simulation"
authtx "github.com/cosmos/cosmos-sdk/x/auth/tx"
authtypes "github.com/cosmos/cosmos-sdk/x/auth/types"
"github.com/cosmos/cosmos-sdk/x/auth/vesting"
Expand Down Expand Up @@ -873,34 +872,7 @@ func New(
app.setupUpgradeHandlers()

// create the simulation manager and define the order of the modules for deterministic simulations
app.sm = module.NewSimulationManager(
auth.NewAppModule(appCodec, app.AccountKeeper, authsims.RandomGenesisAccounts, app.GetSubspace(authtypes.ModuleName)),
bank.NewAppModule(appCodec, app.BankKeeper, app.AccountKeeper, app.GetSubspace(banktypes.ModuleName)),
capability.NewAppModule(appCodec, *app.CapabilityKeeper, false),
feegrantmodule.NewAppModule(appCodec, app.AccountKeeper, app.BankKeeper, app.FeeGrantKeeper, app.interfaceRegistry),
gov.NewAppModule(appCodec, &app.GovKeeper, app.AccountKeeper, app.BankKeeper, app.GetSubspace(govtypes.ModuleName)),
staking.NewAppModule(appCodec, app.StakingKeeper, app.AccountKeeper, app.BankKeeper, app.GetSubspace(stakingtypes.ModuleName)),
distr.NewAppModule(appCodec, app.DistrKeeper, app.AccountKeeper, app.BankKeeper, app.StakingKeeper, app.GetSubspace(distrtypes.ModuleName)),
slashing.NewAppModule(appCodec, app.SlashingKeeper, app.AccountKeeper, app.BankKeeper, app.StakingKeeper, app.GetSubspace(slashingtypes.ModuleName)),
params.NewAppModule(app.ParamsKeeper),
evidence.NewAppModule(app.EvidenceKeeper),
ibc.NewAppModule(app.IBCKeeper),
groupmodule.NewAppModule(appCodec, app.GroupKeeper, app.AccountKeeper, app.BankKeeper, app.interfaceRegistry),
authzmodule.NewAppModule(appCodec, app.AuthzKeeper, app.AccountKeeper, app.BankKeeper, app.interfaceRegistry),
transferModule,
specModule,
epochstorageModule,
dualstakingModule,
subscriptionModule,
pairingModule,
conflictModule,
projectsModule,
protocolModule,
plansModule,
rewardsModule,
// this line is used by starport scaffolding # stargate/app/appModule
)
app.sm.RegisterStoreDecoders()
app.sm = module.NewSimulationManager()

// initialize stores
app.MountKVStores(keys)
Expand Down
92 changes: 0 additions & 92 deletions app/simulation_test.go

This file was deleted.

34 changes: 34 additions & 0 deletions protocol/chainlib/base_chain_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
spectypes "github.com/lavanet/lava/v4/x/spec/types"
)

var AllowMissingApisByDefault = true

type PolicyInf interface {
GetSupportedAddons(specID string) (addons []string, err error)
GetSupportedExtensions(specID string) (extensions []epochstorage.EndpointService, err error)
Expand Down Expand Up @@ -323,6 +325,35 @@ func (bcp *BaseChainParser) extensionParsingInner(addon string, parsedMessageArg
bcp.extensionParser.ExtensionParsing(addon, parsedMessageArg, latestBlock)
}

func (apip *BaseChainParser) defaultApiContainer(apiKey ApiKey) (*ApiContainer, error) {
// Guard that the GrpcChainParser instance exists
if apip == nil {
return nil, errors.New("ChainParser not defined")
}
utils.LavaFormatDebug("api not supported", utils.Attribute{Key: "apiKey", Value: apiKey})
apiCont := &ApiContainer{
api: &spectypes.Api{
Enabled: true,
Name: "Default-" + apiKey.Name,
ComputeUnits: 20, // set 20 compute units by default
ExtraComputeUnits: 0,
Category: spectypes.SpecCategory{},
BlockParsing: spectypes.BlockParser{
ParserFunc: spectypes.PARSER_FUNC_EMPTY,
},
TimeoutMs: 0,
Parsers: []spectypes.GenericParser{},
},
collectionKey: CollectionKey{
ConnectionType: apiKey.ConnectionType,
InternalPath: apiKey.InternalPath,
Addon: "",
},
}

return apiCont, nil
}

// getSupportedApi fetches service api from spec by name
func (apip *BaseChainParser) getSupportedApi(apiKey ApiKey) (*ApiContainer, error) {
// Guard that the GrpcChainParser instance exists
Expand All @@ -339,6 +370,9 @@ func (apip *BaseChainParser) getSupportedApi(apiKey ApiKey) (*ApiContainer, erro

// Return an error if spec does not exist
if !ok {
if AllowMissingApisByDefault {
return apip.defaultApiContainer(apiKey)
}
return nil, common.APINotSupportedError
}

Expand Down
48 changes: 38 additions & 10 deletions protocol/chainlib/consumer_websocket_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chainlib

import (
"context"
"fmt"
"strconv"
"sync/atomic"
"time"
Expand All @@ -20,6 +21,12 @@ import (
var (
WebSocketRateLimit = -1 // rate limit requests per second on websocket connection
WebSocketBanDuration = time.Duration(0) // once rate limit is reached, will not allow new incoming message for a duration
MaxIdleTimeInSeconds = int64(20 * 60) // 20 minutes of idle time will disconnect the websocket connection
)

const (
WebSocketRateLimitHeader = "x-lava-websocket-rate-limit"
WebSocketOpenConnectionsLimitHeader = "x-lava-websocket-open-connections-limit"
)

type ConsumerWebsocketManager struct {
Expand All @@ -35,6 +42,7 @@ type ConsumerWebsocketManager struct {
relaySender RelaySender
consumerWsSubscriptionManager *ConsumerWSSubscriptionManager
WebsocketConnectionUID string
headerRateLimit uint64
}

type ConsumerWebsocketManagerOptions struct {
Expand All @@ -50,6 +58,7 @@ type ConsumerWebsocketManagerOptions struct {
RelaySender RelaySender
ConsumerWsSubscriptionManager *ConsumerWSSubscriptionManager
WebsocketConnectionUID string
headerRateLimit uint64
}

func NewConsumerWebsocketManager(options ConsumerWebsocketManagerOptions) *ConsumerWebsocketManager {
Expand All @@ -66,6 +75,7 @@ func NewConsumerWebsocketManager(options ConsumerWebsocketManagerOptions) *Consu
refererData: options.RefererData,
consumerWsSubscriptionManager: options.ConsumerWsSubscriptionManager,
WebsocketConnectionUID: options.WebsocketConnectionUID,
headerRateLimit: options.headerRateLimit,
}
return cwm
}
Expand Down Expand Up @@ -142,34 +152,49 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
}
}()

// rate limit routine
// set up a routine to check for rate limits or idle time
idleFor := atomic.Int64{}
idleFor.Store(time.Now().Unix())
requestsPerSecond := &atomic.Uint64{}
go func() {
if WebSocketRateLimit <= 0 {
if WebSocketRateLimit <= 0 && cwm.headerRateLimit <= 0 && MaxIdleTimeInSeconds <= 0 {
return
}
ticker := time.NewTicker(time.Second) // rate limit per second.
defer ticker.Stop()
for {
select {
case <-webSocketCtx.Done():
utils.LavaFormatDebug("ctx done in time checker")
return
case <-ticker.C:
// check if rate limit reached, and ban is required
if WebSocketBanDuration > 0 && requestsPerSecond.Load() > uint64(WebSocketRateLimit) {
// wait the ban duration before resetting the store.
select {
case <-webSocketCtx.Done():
if MaxIdleTimeInSeconds > 0 {
utils.LavaFormatDebug("checking idle time", utils.LogAttr("idleFor", idleFor.Load()), utils.LogAttr("maxIdleTime", MaxIdleTimeInSeconds), utils.LogAttr("now", time.Now().Unix()))
idleDuration := idleFor.Load() + MaxIdleTimeInSeconds
if time.Now().Unix() > idleDuration {
websocketConn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, fmt.Sprintf("Connection idle for too long, closing connection. Idle time: %d", idleDuration)))
return
case <-time.After(WebSocketBanDuration): // just continue
}
}
requestsPerSecond.Store(0)
if cwm.headerRateLimit > 0 || WebSocketRateLimit > 0 {
// check if rate limit reached, and ban is required
currentRequestsPerSecondLoad := requestsPerSecond.Load()
if WebSocketBanDuration > 0 && (currentRequestsPerSecondLoad > cwm.headerRateLimit || currentRequestsPerSecondLoad > uint64(WebSocketRateLimit)) {
// wait the ban duration before resetting the store.
select {
case <-webSocketCtx.Done():
return
case <-time.After(WebSocketBanDuration): // just continue
}
}
requestsPerSecond.Store(0)
}
}
}
}()

for {
idleFor.Store(time.Now().Unix())
startTime := time.Now()
msgSeed := guidString + "_" + strconv.Itoa(rand.Intn(10000000000)) // use message seed with original guid and new int

Expand All @@ -185,7 +210,9 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
}

// Check rate limit is met
if WebSocketRateLimit > 0 && requestsPerSecond.Add(1) > uint64(WebSocketRateLimit) {
currentRequestsPerSecond := requestsPerSecond.Add(1)
if (cwm.headerRateLimit > 0 && currentRequestsPerSecond > cwm.headerRateLimit) ||
(WebSocketRateLimit > 0 && currentRequestsPerSecond > uint64(WebSocketRateLimit)) {
rateLimitResponse, err := cwm.handleRateLimitReached(msg)
if err == nil {
websocketConnWriteChan <- webSocketMsgWithType{messageType: messageType, msg: rateLimitResponse}
Expand Down Expand Up @@ -313,6 +340,7 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
)

for subscriptionMsgReply := range subscriptionMsgsChan {
idleFor.Store(time.Now().Unix())
websocketConnWriteChan <- webSocketMsgWithType{messageType: messageType, msg: outputFormatter(subscriptionMsgReply.Data)}
}

Expand Down
Loading

0 comments on commit 9645592

Please sign in to comment.