diff --git a/protocol/chainlib/common.go b/protocol/chainlib/common.go index 29a25401e6..bbb4fe11d0 100644 --- a/protocol/chainlib/common.go +++ b/protocol/chainlib/common.go @@ -30,11 +30,13 @@ const ( relayMsgLogMaxChars = 200 RPCProviderNodeAddressHash = "Lava-Provider-Node-Address-Hash" RPCProviderNodeExtension = "Lava-Provider-Node-Extension" + RpcProviderLoadRateHeader = "Lava-Provider-Load-Rate" RpcProviderUniqueIdHeader = "Lava-Provider-Unique-Id" WebSocketExtension = "websocket" ) var ( + TrailersToAddToHeaderResponse = []string{RPCProviderNodeExtension, RpcProviderLoadRateHeader} InvalidResponses = []string{"null", "", "nil", "undefined"} FailedSendingSubscriptionToClients = sdkerrors.New("failed Sending Subscription To Clients", 1015, "Failed Sending Subscription To Clients connection might have been closed by the user") NoActiveSubscriptionFound = sdkerrors.New("failed finding an active subscription on provider side", 1016, "no active subscriptions for hashed params.") diff --git a/protocol/chainlib/grpc.go b/protocol/chainlib/grpc.go index b026e8b9d8..37e4d539d8 100644 --- a/protocol/chainlib/grpc.go +++ b/protocol/chainlib/grpc.go @@ -128,7 +128,7 @@ func (apip *GrpcChainParser) ParseMsg(url string, data []byte, connectionType st // Check API is supported and save it in nodeMsg. apiCont, err := apip.getSupportedApi(url, connectionType) if err != nil { - return nil, utils.LavaFormatError("failed to getSupportedApi gRPC", err) + return nil, utils.LavaFormatError("failed to getSupportedApi gRPC", err, utils.LogAttr("url", url)) } apiCollection, err := apip.getApiCollection(connectionType, apiCont.collectionKey.InternalPath, apiCont.collectionKey.Addon) diff --git a/protocol/common/cobra_common.go b/protocol/common/cobra_common.go index 17a2bef31b..338b003f67 100644 --- a/protocol/common/cobra_common.go +++ b/protocol/common/cobra_common.go @@ -46,6 +46,7 @@ const ( // websocket flags RateLimitWebSocketFlag = "rate-limit-websocket-requests-per-connection" BanDurationForWebsocketRateLimitExceededFlag = "ban-duration-for-websocket-rate-limit-exceeded" + RateLimitRequestPerSecondFlag = "rate-limit-requests-per-second" ) const ( diff --git a/protocol/integration/protocol_test.go b/protocol/integration/protocol_test.go index ee7823f669..03ef7cdd2f 100644 --- a/protocol/integration/protocol_test.go +++ b/protocol/integration/protocol_test.go @@ -344,7 +344,7 @@ func createRpcProvider(t *testing.T, ctx context.Context, rpcProviderOptions rpc chainTracker.StartAndServe(ctx) reliabilityManager := reliabilitymanager.NewReliabilityManager(chainTracker, &mockProviderStateTracker, rpcProviderOptions.account.Addr.String(), chainRouter, chainParser) mockReliabilityManager := NewMockReliabilityManager(reliabilityManager) - rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, rpcProviderOptions.account.SK, cache, chainRouter, &mockProviderStateTracker, rpcProviderOptions.account.Addr, rpcProviderOptions.lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false) + rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rws, providerSessionManager, mockReliabilityManager, rpcProviderOptions.account.SK, cache, chainRouter, &mockProviderStateTracker, rpcProviderOptions.account.Addr, rpcProviderOptions.lavaChainID, rpcprovider.DEFAULT_ALLOWED_MISSING_CU, nil, nil, nil, false, nil) listener := rpcprovider.NewProviderListener(ctx, rpcProviderEndpoint.NetworkAddress, "/health") err = 
listener.RegisterReceiver(rpcProviderServer, rpcProviderEndpoint) require.NoError(t, err) diff --git a/protocol/lavasession/consumer_session_manager.go b/protocol/lavasession/consumer_session_manager.go index 8ca37ff6fe..8efeb829f2 100644 --- a/protocol/lavasession/consumer_session_manager.go +++ b/protocol/lavasession/consumer_session_manager.go @@ -959,7 +959,8 @@ func (csm *ConsumerSessionManager) OnSessionFailure(consumerSession *SingleConsu } cuToDecrease := consumerSession.LatestRelayCu // latency, isHangingApi, syncScore aren't updated when there is a failure - go csm.providerOptimizer.AppendRelayFailure(consumerSession.Parent.PublicLavaAddress) + + go csm.providerOptimizer.AppendRelayFailure(consumerSession.Parent.PublicLavaAddress, consumerSession.GetProviderLoad()) consumerSession.LatestRelayCu = 0 // making sure no one uses it in a wrong way consecutiveErrors := uint64(len(consumerSession.ConsecutiveErrors)) parentConsumerSessionsWithProvider := consumerSession.Parent // must read this pointer before unlocking @@ -1042,7 +1043,8 @@ func (csm *ConsumerSessionManager) OnSessionDone( consumerSession.LatestBlock = latestServicedBlock // update latest serviced block // calculate QoS consumerSession.CalculateQoS(currentLatency, expectedLatency, expectedBH-latestServicedBlock, numOfProviders, int64(providersCount)) - go csm.providerOptimizer.AppendRelayData(consumerSession.Parent.PublicLavaAddress, currentLatency, isHangingApi, specComputeUnits, uint64(latestServicedBlock)) + + go csm.providerOptimizer.AppendRelayData(consumerSession.Parent.PublicLavaAddress, currentLatency, isHangingApi, specComputeUnits, uint64(latestServicedBlock), consumerSession.GetProviderLoad()) csm.updateMetricsManager(consumerSession, currentLatency, !isHangingApi) // apply latency only for non hanging apis return nil } diff --git a/protocol/lavasession/consumer_types.go b/protocol/lavasession/consumer_types.go index deb11f7994..7fe373df20 100644 --- a/protocol/lavasession/consumer_types.go +++ b/protocol/lavasession/consumer_types.go @@ -71,8 +71,8 @@ type ConsumerSessionsMap map[string]*SessionInfo type ProviderOptimizer interface { AppendProbeRelayData(providerAddress string, latency time.Duration, success bool) - AppendRelayFailure(providerAddress string) - AppendRelayData(providerAddress string, latency time.Duration, isHangingApi bool, cu, syncBlock uint64) + AppendRelayFailure(providerAddress string, providerLoad *provideroptimizer.ProviderLoadReport) + AppendRelayData(providerAddress string, latency time.Duration, isHangingApi bool, cu, syncBlock uint64, providerLoad *provideroptimizer.ProviderLoadReport) ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (addresses []string, tier int) GetExcellenceQoSReportForProvider(string) (*pairingtypes.QualityOfServiceReport, *pairingtypes.QualityOfServiceReport) Strategy() provideroptimizer.Strategy diff --git a/protocol/lavasession/single_consumer_session.go b/protocol/lavasession/single_consumer_session.go index 4fc8b1b67d..723e4db34e 100644 --- a/protocol/lavasession/single_consumer_session.go +++ b/protocol/lavasession/single_consumer_session.go @@ -7,6 +7,7 @@ import ( "time" sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/lavanet/lava/v4/protocol/provideroptimizer" "github.com/lavanet/lava/v4/utils" pairingtypes "github.com/lavanet/lava/v4/x/pairing/types" ) @@ -21,13 +22,49 @@ type SingleConsumerSession struct { RelayNum uint64 LatestBlock int64 // Each session will holds a pointer to a 
connection, if the connection is lost, this session will be banned (wont be picked) - EndpointConnection *EndpointConnection - BlockListed bool // if session lost sync we blacklist it. - ConsecutiveErrors []error - errorsCount uint64 - relayProcessor UsedProvidersInf - providerUniqueId string - StaticProvider bool + EndpointConnection *EndpointConnection + BlockListed bool // if session lost sync we blacklist it. + ConsecutiveErrors []error + errorsCount uint64 + relayProcessor UsedProvidersInf + providerUniqueId string + StaticProvider bool + latestKnownLoadReport *provideroptimizer.ProviderLoadReport +} + +// must only be called while the session is locked; returns a copy of the latest load report +func (cs *SingleConsumerSession) GetProviderLoad() *provideroptimizer.ProviderLoadReport { + // copy into a fresh pointer so callers can read it later without holding the lock + var providerLoadReport *provideroptimizer.ProviderLoadReport + if cs.latestKnownLoadReport != nil { + providerLoadReport = &provideroptimizer.ProviderLoadReport{ + ProviderLoad: cs.latestKnownLoadReport.ProviderLoad, + TimeStamp: cs.latestKnownLoadReport.TimeStamp, + } + } + return providerLoadReport +} + +// must only be called while the session is locked. +func (cs *SingleConsumerSession) SetLoadReport(loadReport []string) { + if len(loadReport) == 0 { + // no load report was attached + return + } + load := loadReport[0] + floatLoad, err := strconv.ParseFloat(load, 64) + if err != nil { + utils.LavaFormatWarning("Failed parsing load report from provider", err, utils.LogAttr("load_reported", loadReport)) + return + } + if floatLoad == 0 { + // the provider did not configure a max load or reported zero load. + return + } + cs.latestKnownLoadReport = &provideroptimizer.ProviderLoadReport{ + TimeStamp: time.Now(), + ProviderLoad: floatLoad, + } +} // returns the expected latency to a threshold. 
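
Note on the mechanism above: the load rate crosses the wire as a stringified float in the `Lava-Provider-Load-Rate` gRPC trailer, so the consumer-side parsing in `SetLoadReport` is just `strconv.ParseFloat` plus a timestamp. A minimal standalone sketch of that round trip (the `loadReport` type and the helper names are illustrative, not part of this diff):

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// loadReport mirrors provideroptimizer.ProviderLoadReport from this diff.
type loadReport struct {
	load      float64
	timeStamp time.Time
}

// encodeLoad formats the load the way addAndSetRelayLoadToContextTrailer does
// before it is attached as the Lava-Provider-Load-Rate trailer.
func encodeLoad(load float64) string {
	return strconv.FormatFloat(load, 'f', -1, 64)
}

// decodeLoad parses the first trailer value, following SetLoadReport:
// a missing, malformed, or zero value yields no report.
func decodeLoad(trailerValues []string) *loadReport {
	if len(trailerValues) == 0 {
		return nil
	}
	load, err := strconv.ParseFloat(trailerValues[0], 64)
	if err != nil || load == 0 {
		return nil
	}
	return &loadReport{load: load, timeStamp: time.Now()}
}

func main() {
	trailer := []string{encodeLoad(0.35)} // e.g. 7 active relays against a limit of 20
	if report := decodeLoad(trailer); report != nil {
		fmt.Printf("provider load %.2f observed at %s\n", report.load, report.timeStamp.Format(time.RFC3339))
	}
}
```

Treating a parsed value of 0 as "no report" matches the provider side, which skips the trailer entirely when no rate limit is configured.
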
diff --git a/protocol/metrics/provider_metrics.go b/protocol/metrics/provider_metrics.go index e4eadbe4dc..3b5f863c50 100644 --- a/protocol/metrics/provider_metrics.go +++ b/protocol/metrics/provider_metrics.go @@ -22,6 +22,7 @@ type ProviderMetrics struct { totalRelaysServicedMetric *prometheus.CounterVec totalErroredMetric *prometheus.CounterVec consumerQoSMetric *prometheus.GaugeVec + loadRateMetric *prometheus.GaugeVec } func (pm *ProviderMetrics) AddRelay(consumerAddress string, cu uint64, qos *pairingtypes.QualityOfServiceReport) { @@ -49,6 +50,13 @@ func (pm *ProviderMetrics) AddRelay(consumerAddress string, cu uint64, qos *pair } } +func (pm *ProviderMetrics) SetLoadRate(loadRate float64) { + if pm == nil { + return + } + pm.loadRateMetric.WithLabelValues(pm.specID).Set(loadRate) +} + func (pm *ProviderMetrics) AddPayment(cu uint64) { if pm == nil { return @@ -72,6 +80,7 @@ func NewProviderMetrics(specID, apiInterface string, totalCUServicedMetric *prom totalRelaysServicedMetric *prometheus.CounterVec, totalErroredMetric *prometheus.CounterVec, consumerQoSMetric *prometheus.GaugeVec, + loadRateMetric *prometheus.GaugeVec, ) *ProviderMetrics { pm := &ProviderMetrics{ specID: specID, @@ -82,6 +91,7 @@ func NewProviderMetrics(specID, apiInterface string, totalCUServicedMetric *prom totalRelaysServicedMetric: totalRelaysServicedMetric, totalErroredMetric: totalErroredMetric, consumerQoSMetric: consumerQoSMetric, + loadRateMetric: loadRateMetric, } return pm } diff --git a/protocol/metrics/provider_metrics_manager.go b/protocol/metrics/provider_metrics_manager.go index f6734f69d2..a4578df233 100644 --- a/protocol/metrics/provider_metrics_manager.go +++ b/protocol/metrics/provider_metrics_manager.go @@ -41,6 +41,7 @@ type ProviderMetricsManager struct { endpointsHealthChecksOk uint64 relaysMonitors map[string]*RelaysMonitor relaysMonitorsLock sync.RWMutex + loadRateMetric *prometheus.GaugeVec } func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager { @@ -107,6 +108,11 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager { Help: "The total number of get latest block queries that succeeded by chainfetcher", }, []string{"spec"}) + loadRateMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "lava_provider_load_rate", + Help: "The load rate relative to the configured rate limit: given Y simultaneous relay calls and a limit of X, the reported load rate is Y/X.", + }, []string{"spec"}) + fetchBlockSuccessMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "lava_provider_fetch_block_success", Help: "The total number of get specific block queries that succeeded by chainfetcher", @@ -141,6 +147,7 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager { prometheus.MustRegister(virtualEpochMetric) prometheus.MustRegister(endpointsHealthChecksOkMetric) prometheus.MustRegister(protocolVersionMetric) + prometheus.MustRegister(loadRateMetric) providerMetricsManager := &ProviderMetricsManager{ providerMetrics: map[string]*ProviderMetrics{}, @@ -161,6 +168,7 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager { endpointsHealthChecksOk: 1, protocolVersionMetric: protocolVersionMetric, relaysMonitors: map[string]*RelaysMonitor{}, + loadRateMetric: loadRateMetric, } http.Handle("/metrics", promhttp.Handler()) @@ -209,7 +217,7 @@ func (pme *ProviderMetricsManager) AddProviderMetrics(specID, apiInterface strin } if pme.getProviderMetric(specID, apiInterface) == nil { - providerMetric := 
NewProviderMetrics(specID, apiInterface, pme.totalCUServicedMetric, pme.totalCUPaidMetric, pme.totalRelaysServicedMetric, pme.totalErroredMetric, pme.consumerQoSMetric) + providerMetric := NewProviderMetrics(specID, apiInterface, pme.totalCUServicedMetric, pme.totalCUPaidMetric, pme.totalRelaysServicedMetric, pme.totalErroredMetric, pme.consumerQoSMetric, pme.loadRateMetric) pme.setProviderMetric(providerMetric) endpoint := fmt.Sprintf("/metrics/%s/%s/health", specID, apiInterface) diff --git a/protocol/provideroptimizer/provider_optimizer.go b/protocol/provideroptimizer/provider_optimizer.go index 6f9198a9e9..1c2cd02831 100644 --- a/protocol/provideroptimizer/provider_optimizer.go +++ b/protocol/provideroptimizer/provider_optimizer.go @@ -71,6 +71,11 @@ type Exploration struct { time time.Time } +type ProviderLoadReport struct { + ProviderLoad float64 // load reported by the provider; may exceed 1.0. + TimeStamp time.Time +} + type ProviderData struct { Availability score.ScoreStore // will be used to calculate the probability of error Latency score.ScoreStore // will be used to calculate the latency score @@ -78,6 +83,7 @@ type ProviderData struct { SyncBlock uint64 // will be used to calculate the probability of block error LatencyRaw score.ScoreStore // will be used when reporting reputation to the node (Latency = LatencyRaw / baseLatency) SyncRaw score.ScoreStore // will be used when reporting reputation to the node (Sync = SyncRaw / baseSync) + ProviderLoad *ProviderLoadReport } type Strategy int @@ -101,17 +107,22 @@ func (po *ProviderOptimizer) UpdateWeights(weights map[string]int64, epoch uint6 } } -func (po *ProviderOptimizer) AppendRelayFailure(providerAddress string) { - po.appendRelayData(providerAddress, 0, false, false, 0, 0, time.Now()) +// TODO forward load also on relay failure +func (po *ProviderOptimizer) AppendRelayFailure(providerAddress string, providerLoad *ProviderLoadReport) { + po.appendRelayData(providerAddress, 0, false, false, 0, 0, time.Now(), providerLoad) } -func (po *ProviderOptimizer) AppendRelayData(providerAddress string, latency time.Duration, isHangingApi bool, cu, syncBlock uint64) { - po.appendRelayData(providerAddress, latency, isHangingApi, true, cu, syncBlock, time.Now()) +func (po *ProviderOptimizer) AppendRelayData(providerAddress string, latency time.Duration, isHangingApi bool, cu, syncBlock uint64, providerLoad *ProviderLoadReport) { + po.appendRelayData(providerAddress, latency, isHangingApi, true, cu, syncBlock, time.Now(), providerLoad) } -func (po *ProviderOptimizer) appendRelayData(providerAddress string, latency time.Duration, isHangingApi, success bool, cu, syncBlock uint64, sampleTime time.Time) { +func (po *ProviderOptimizer) appendRelayData(providerAddress string, latency time.Duration, isHangingApi, success bool, cu, syncBlock uint64, sampleTime time.Time, providerLoad *ProviderLoadReport) { latestSync, timeSync := po.updateLatestSyncData(syncBlock, sampleTime) providerData, _ := po.getProviderData(providerAddress) + // store the incoming provider load only if it is fresher than the previously stored report + if providerLoad != nil && (providerData.ProviderLoad == nil || providerLoad.TimeStamp.After(providerData.ProviderLoad.TimeStamp)) { + providerData.ProviderLoad = providerLoad + } halfTime := po.calculateHalfTime(providerAddress, sampleTime) providerData = po.updateProbeEntryAvailability(providerData, success, RELAY_UPDATE_WEIGHT, halfTime, sampleTime) if success { diff --git 
a/protocol/provideroptimizer/provider_optimizer_test.go b/protocol/provideroptimizer/provider_optimizer_test.go index 61e7a9c0e2..3d03624240 100644 --- a/protocol/provideroptimizer/provider_optimizer_test.go +++ b/protocol/provideroptimizer/provider_optimizer_test.go @@ -191,9 +191,9 @@ func TestProviderOptimizerBasicRelayData(t *testing.T) { syncBlock := uint64(requestBlock) - providerOptimizer.AppendRelayData(providersGen.providersAddresses[5], TEST_BASE_WORLD_LATENCY*4, false, requestCU, syncBlock) - providerOptimizer.AppendRelayData(providersGen.providersAddresses[6], TEST_BASE_WORLD_LATENCY*4, false, requestCU, syncBlock) - providerOptimizer.AppendRelayData(providersGen.providersAddresses[7], TEST_BASE_WORLD_LATENCY*4, false, requestCU, syncBlock) + providerOptimizer.AppendRelayData(providersGen.providersAddresses[5], TEST_BASE_WORLD_LATENCY*4, false, requestCU, syncBlock, nil) + providerOptimizer.AppendRelayData(providersGen.providersAddresses[6], TEST_BASE_WORLD_LATENCY*4, false, requestCU, syncBlock, nil) + providerOptimizer.AppendRelayData(providersGen.providersAddresses[7], TEST_BASE_WORLD_LATENCY*4, false, requestCU, syncBlock, nil) time.Sleep(4 * time.Millisecond) returnedProviders, tier := providerOptimizer.ChooseProvider(providersGen.providersAddresses, nil, requestCU, requestBlock) require.Equal(t, 1, len(returnedProviders)) @@ -203,9 +203,9 @@ func TestProviderOptimizerBasicRelayData(t *testing.T) { require.NotEqual(t, returnedProviders[0], providersGen.providersAddresses[6], tier) require.NotEqual(t, returnedProviders[0], providersGen.providersAddresses[7], tier) - providerOptimizer.AppendRelayData(providersGen.providersAddresses[0], TEST_BASE_WORLD_LATENCY/4, false, requestCU, syncBlock) - providerOptimizer.AppendRelayData(providersGen.providersAddresses[1], TEST_BASE_WORLD_LATENCY/4, false, requestCU, syncBlock) - providerOptimizer.AppendRelayData(providersGen.providersAddresses[2], TEST_BASE_WORLD_LATENCY/4, false, requestCU, syncBlock) + providerOptimizer.AppendRelayData(providersGen.providersAddresses[0], TEST_BASE_WORLD_LATENCY/4, false, requestCU, syncBlock, nil) + providerOptimizer.AppendRelayData(providersGen.providersAddresses[1], TEST_BASE_WORLD_LATENCY/4, false, requestCU, syncBlock, nil) + providerOptimizer.AppendRelayData(providersGen.providersAddresses[2], TEST_BASE_WORLD_LATENCY/4, false, requestCU, syncBlock, nil) time.Sleep(4 * time.Millisecond) results, tierResults := runChooseManyTimesAndReturnResults(t, providerOptimizer, providersGen.providersAddresses, nil, requestCU, requestBlock, 1000) @@ -261,7 +261,7 @@ func TestProviderOptimizerAvailabilityRelayData(t *testing.T) { // skip 0 continue } - providerOptimizer.AppendRelayFailure(providersGen.providersAddresses[i]) + providerOptimizer.AppendRelayFailure(providersGen.providersAddresses[i], nil) } time.Sleep(4 * time.Millisecond) results, tierResults := runChooseManyTimesAndReturnResults(t, providerOptimizer, providersGen.providersAddresses, nil, requestCU, requestBlock, 1000) @@ -287,10 +287,10 @@ func TestProviderOptimizerAvailabilityBlockError(t *testing.T) { time.Sleep(4 * time.Millisecond) if i == chosenIndex || i == chosenIndex+1 || i == chosenIndex+2 { // give better syncBlock, worse latency by a little - providerOptimizer.AppendRelayData(providersGen.providersAddresses[i], TEST_BASE_WORLD_LATENCY+10*time.Millisecond, false, requestCU, syncBlock) + providerOptimizer.AppendRelayData(providersGen.providersAddresses[i], TEST_BASE_WORLD_LATENCY+10*time.Millisecond, false, requestCU, syncBlock, nil) 
continue } - providerOptimizer.AppendRelayData(providersGen.providersAddresses[i], TEST_BASE_WORLD_LATENCY, false, requestCU, syncBlock-1) // update that he doesn't have the latest requested block + providerOptimizer.AppendRelayData(providersGen.providersAddresses[i], TEST_BASE_WORLD_LATENCY, false, requestCU, syncBlock-1, nil) // update that he doesn't have the latest requested block } time.Sleep(4 * time.Millisecond) selectionTier, _, _ := providerOptimizer.CalculateSelectionTiers(providersGen.providersAddresses, nil, requestCU, requestBlock) @@ -378,7 +378,7 @@ func TestProviderOptimizerExploration(t *testing.T) { chosenIndex = rand.Intn(providersCount - 2) // set chosen index with a value in the past so it can be selected for exploration - providerOptimizer.appendRelayData(providersGen.providersAddresses[chosenIndex], TEST_BASE_WORLD_LATENCY*2, false, true, requestCU, syncBlock, time.Now().Add(-35*time.Second)) + providerOptimizer.appendRelayData(providersGen.providersAddresses[chosenIndex], TEST_BASE_WORLD_LATENCY*2, false, true, requestCU, syncBlock, time.Now().Add(-35*time.Second), nil) // set a basic state for all other provider, with a recent time (so they can't be selected for exploration) for i := 0; i < 10; i++ { for index, address := range providersGen.providersAddresses { @@ -387,7 +387,7 @@ func TestProviderOptimizerExploration(t *testing.T) { continue } // set samples in the future so they are never a candidate for exploration - providerOptimizer.appendRelayData(address, TEST_BASE_WORLD_LATENCY*2, false, true, requestCU, syncBlock, time.Now().Add(1*time.Second)) + providerOptimizer.appendRelayData(address, TEST_BASE_WORLD_LATENCY*2, false, true, requestCU, syncBlock, time.Now().Add(1*time.Second), nil) } time.Sleep(4 * time.Millisecond) } @@ -428,10 +428,10 @@ func TestProviderOptimizerSyncScore(t *testing.T) { time.Sleep(4 * time.Millisecond) if i == chosenIndex { // give better syncBlock, latency is a tiny bit worse for the second check - providerOptimizer.appendRelayData(providersGen.providersAddresses[i], TEST_BASE_WORLD_LATENCY*2+1*time.Microsecond, false, true, requestCU, syncBlock+5, sampleTime) + providerOptimizer.appendRelayData(providersGen.providersAddresses[i], TEST_BASE_WORLD_LATENCY*2+1*time.Microsecond, false, true, requestCU, syncBlock+5, sampleTime, nil) continue } - providerOptimizer.appendRelayData(providersGen.providersAddresses[i], TEST_BASE_WORLD_LATENCY*2, false, true, requestCU, syncBlock, sampleTime) // update that he doesn't have the latest requested block + providerOptimizer.appendRelayData(providersGen.providersAddresses[i], TEST_BASE_WORLD_LATENCY*2, false, true, requestCU, syncBlock, sampleTime, nil) // update that he doesn't have the latest requested block } sampleTime = sampleTime.Add(time.Millisecond * 5) } @@ -464,7 +464,7 @@ func TestProviderOptimizerStrategiesScoring(t *testing.T) { sampleTime := time.Now() for i := 0; i < 10; i++ { for _, address := range providersGen.providersAddresses { - providerOptimizer.appendRelayData(address, TEST_BASE_WORLD_LATENCY*2, false, true, requestCU, syncBlock, sampleTime) + providerOptimizer.appendRelayData(address, TEST_BASE_WORLD_LATENCY*2, false, true, requestCU, syncBlock, sampleTime, nil) } time.Sleep(4 * time.Millisecond) } @@ -489,21 +489,21 @@ func TestProviderOptimizerStrategiesScoring(t *testing.T) { normalLatency := TEST_BASE_WORLD_LATENCY * 2 improvedBlock := syncBlock + 1 // provider 0 gets a good latency - providerOptimizer.appendRelayData(providersGen.providersAddresses[0], 
improvedLatency, false, true, requestCU, syncBlock, sampleTime) + providerOptimizer.appendRelayData(providersGen.providersAddresses[0], improvedLatency, false, true, requestCU, syncBlock, sampleTime, nil) // providers 3,4 get a regular entry - providerOptimizer.appendRelayData(providersGen.providersAddresses[3], normalLatency, false, true, requestCU, syncBlock, sampleTime) - providerOptimizer.appendRelayData(providersGen.providersAddresses[4], normalLatency, false, true, requestCU, syncBlock, sampleTime) + providerOptimizer.appendRelayData(providersGen.providersAddresses[3], normalLatency, false, true, requestCU, syncBlock, sampleTime, nil) + providerOptimizer.appendRelayData(providersGen.providersAddresses[4], normalLatency, false, true, requestCU, syncBlock, sampleTime, nil) // provider 1 gets a good sync - providerOptimizer.appendRelayData(providersGen.providersAddresses[1], normalLatency, false, true, requestCU, improvedBlock, sampleTime) + providerOptimizer.appendRelayData(providersGen.providersAddresses[1], normalLatency, false, true, requestCU, improvedBlock, sampleTime, nil) sampleTime = sampleTime.Add(10 * time.Millisecond) // now repeat to modify all providers scores across sync calculation - providerOptimizer.appendRelayData(providersGen.providersAddresses[0], improvedLatency, false, true, requestCU, syncBlock, sampleTime) - providerOptimizer.appendRelayData(providersGen.providersAddresses[3], normalLatency, false, true, requestCU, syncBlock, sampleTime) - providerOptimizer.appendRelayData(providersGen.providersAddresses[4], normalLatency, false, true, requestCU, syncBlock, sampleTime) - providerOptimizer.appendRelayData(providersGen.providersAddresses[1], normalLatency, false, true, requestCU, improvedBlock, sampleTime) + providerOptimizer.appendRelayData(providersGen.providersAddresses[0], improvedLatency, false, true, requestCU, syncBlock, sampleTime, nil) + providerOptimizer.appendRelayData(providersGen.providersAddresses[3], normalLatency, false, true, requestCU, syncBlock, sampleTime, nil) + providerOptimizer.appendRelayData(providersGen.providersAddresses[4], normalLatency, false, true, requestCU, syncBlock, sampleTime, nil) + providerOptimizer.appendRelayData(providersGen.providersAddresses[1], normalLatency, false, true, requestCU, improvedBlock, sampleTime, nil) time.Sleep(4 * time.Millisecond) providerOptimizer.strategy = STRATEGY_BALANCED @@ -559,7 +559,7 @@ func TestExcellence(t *testing.T) { sampleTime := time.Now() for i := 0; i < 10; i++ { for _, address := range providersGen.providersAddresses { - providerOptimizer.appendRelayData(address, TEST_BASE_WORLD_LATENCY*2, false, true, requestCU, syncBlock, sampleTime) + providerOptimizer.appendRelayData(address, TEST_BASE_WORLD_LATENCY*2, false, true, requestCU, syncBlock, sampleTime, nil) } time.Sleep(4 * time.Millisecond) } @@ -616,7 +616,7 @@ func TestProviderOptimizerProvidersCount(t *testing.T) { sampleTime := time.Now() for i := 0; i < 10; i++ { for _, address := range providersGen.providersAddresses { - providerOptimizer.appendRelayData(address, TEST_BASE_WORLD_LATENCY*2, false, true, requestCU, syncBlock, sampleTime) + providerOptimizer.appendRelayData(address, TEST_BASE_WORLD_LATENCY*2, false, true, requestCU, syncBlock, sampleTime, nil) } time.Sleep(4 * time.Millisecond) } @@ -695,9 +695,9 @@ func TestProviderOptimizerWeights(t *testing.T) { for i := 0; i < 10; i++ { for idx, address := range providersGen.providersAddresses { if idx == 0 { - providerOptimizer.appendRelayData(address, normalLatency, false, 
true, requestCU, improvedBlock, sampleTime) + providerOptimizer.appendRelayData(address, normalLatency, false, true, requestCU, improvedBlock, sampleTime, nil) } else { - providerOptimizer.appendRelayData(address, improvedLatency, false, true, requestCU, syncBlock, sampleTime) + providerOptimizer.appendRelayData(address, improvedLatency, false, true, requestCU, syncBlock, sampleTime, nil) } sampleTime = sampleTime.Add(5 * time.Millisecond) time.Sleep(4 * time.Millisecond) @@ -739,7 +739,7 @@ func TestProviderOptimizerTiers(t *testing.T) { for _, address := range providersGen.providersAddresses { modifierLatency := rand.Int63n(3) - 1 modifierSync := rand.Int63n(3) - 1 - providerOptimizer.appendRelayData(address, normalLatency+time.Duration(modifierLatency)*time.Millisecond, false, true, requestCU, syncBlock+uint64(modifierSync), sampleTime) + providerOptimizer.appendRelayData(address, normalLatency+time.Duration(modifierLatency)*time.Millisecond, false, true, requestCU, syncBlock+uint64(modifierSync), sampleTime, nil) sampleTime = sampleTime.Add(5 * time.Millisecond) time.Sleep(4 * time.Millisecond) } @@ -798,7 +798,7 @@ func TestProviderOptimizerWithOptimizerQoSClient(t *testing.T) { requestCU := uint64(10) normalLatency := TEST_BASE_WORLD_LATENCY * 2 - providerOptimizer.appendRelayData(providerAddr, normalLatency, false, true, requestCU, syncBlock, time.Now()) + providerOptimizer.appendRelayData(providerAddr, normalLatency, false, true, requestCU, syncBlock, time.Now(), nil) wg.Wait() } diff --git a/protocol/provideroptimizer/selection_weight.go b/protocol/provideroptimizer/selection_weight.go index e0fdc30f38..7094caa20f 100644 --- a/protocol/provideroptimizer/selection_weight.go +++ b/protocol/provideroptimizer/selection_weight.go @@ -44,6 +44,8 @@ func (sw *selectionWeighterInst) SetWeights(weights map[string]int64) { } } +// planned load penalty: excess := providerLoad - 0.7; if excess < 0 { excess = 0 } +// adjustedWeight := weight / (1 + excess) func (sw *selectionWeighterInst) WeightedChoice(entries []Entry) string { if len(entries) == 0 { return "" diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index 7f8989b509..106f8b7584 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -886,6 +886,8 @@ func (rpccs *RPCConsumerServer) relayInner(ctx context.Context, singleConsumerSe } statuses := relayResult.ProviderTrailer.Get(common.StatusCodeMetadataKey) + // store the provider's reported load on the consumer session + singleConsumerSession.SetLoadReport(relayResult.ProviderTrailer.Get(chainlib.RpcProviderLoadRateHeader)) if len(statuses) > 0 { codeNum, errStatus := strconv.Atoi(statuses[0]) @@ -1261,6 +1263,20 @@ func (rpccs *RPCConsumerServer) HandleDirectiveHeadersForMessage(chainMessage ch chainMessage.SetForceCacheRefresh(ok) } +// Iterate over metadataHeaders; for every header present in the provider trailer, append its first value to relayResult.Reply.Metadata +func (rpccs *RPCConsumerServer) getMetadataFromRelayTrailer(metadataHeaders []string, relayResult *common.RelayResult) { + for _, metadataHeader := range metadataHeaders { + trailerValue := relayResult.ProviderTrailer.Get(metadataHeader) + if len(trailerValue) > 0 { + extensionMD := pairingtypes.Metadata{ + Name: metadataHeader, + Value: trailerValue[0], + } + relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, extensionMD) + } + } +} + func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context, relayResult *common.RelayResult, protocolErrors 
uint64, relayProcessor *RelayProcessor, protocolMessage chainlib.ProtocolMessage, apiName string) { if relayResult == nil { return @@ -1333,14 +1349,7 @@ func (rpccs *RPCConsumerServer) appendHeadersToRelayResult(ctx context.Context, } // fetch trailer information from the provider by using the provider trailer field. - providerNodeExtensions := relayResult.ProviderTrailer.Get(chainlib.RPCProviderNodeExtension) - if len(providerNodeExtensions) > 0 { - extensionMD := pairingtypes.Metadata{ - Name: chainlib.RPCProviderNodeExtension, - Value: providerNodeExtensions[0], - } - relayResult.Reply.Metadata = append(relayResult.Reply.Metadata, extensionMD) - } + rpccs.getMetadataFromRelayTrailer(chainlib.TrailersToAddToHeaderResponse, relayResult) directiveHeaders := protocolMessage.GetDirectiveHeaders() _, debugRelays := directiveHeaders[common.LAVA_DEBUG_RELAY] diff --git a/protocol/rpcprovider/provider_load_manager.go b/protocol/rpcprovider/provider_load_manager.go new file mode 100644 index 0000000000..f21b221ad9 --- /dev/null +++ b/protocol/rpcprovider/provider_load_manager.go @@ -0,0 +1,55 @@ +package rpcprovider + +import ( + "context" + "strconv" + "sync/atomic" + + "github.com/lavanet/lava/v4/protocol/chainlib" + grpc "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +type ProviderLoadManager struct { + rateLimitThreshold uint64 + activeRequestsPerSecond atomic.Uint64 +} + +func NewProviderLoadManager(rateLimitThreshold uint64) *ProviderLoadManager { + if rateLimitThreshold == 0 { + return nil + } + loadManager := &ProviderLoadManager{rateLimitThreshold: rateLimitThreshold} + return loadManager +} + +func (loadManager *ProviderLoadManager) subtractRelayCall() { + if loadManager == nil { + return + } + loadManager.activeRequestsPerSecond.Add(^uint64(0)) // adding ^uint64(0) is an atomic decrement +} + +func (loadManager *ProviderLoadManager) getProviderLoad(activeRequests uint64) float64 { + rateLimitThreshold := loadManager.rateLimitThreshold + if rateLimitThreshold == 0 { + return 0 + } + return float64(activeRequests) / float64(rateLimitThreshold) +} + +// Increment the active relay count, compute the current load, and attach it to the gRPC trailer +func (loadManager *ProviderLoadManager) addAndSetRelayLoadToContextTrailer(ctx context.Context) float64 { + if loadManager == nil { + return 0 + } + activeRequestsPerSecond := loadManager.activeRequestsPerSecond.Add(1) + providerRelayLoad := loadManager.getProviderLoad(activeRequestsPerSecond) + if providerRelayLoad == 0 { + return providerRelayLoad + } + formattedProviderLoad := strconv.FormatFloat(providerRelayLoad, 'f', -1, 64) + trailerMd := metadata.Pairs(chainlib.RpcProviderLoadRateHeader, formattedProviderLoad) + grpc.SetTrailer(ctx, trailerMd) + return providerRelayLoad +} diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index 3c2a379b9e..22b9ed8b65 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -110,6 +110,7 @@ type rpcProviderStartOptions struct { healthCheckMetricsOptions *rpcProviderHealthCheckMetricsOptions staticProvider bool staticSpecPath string + relayLoadLimit uint64 } type rpcProviderHealthCheckMetricsOptions struct { @@ -123,24 +124,26 @@ type RPCProvider struct { rpcProviderListeners map[string]*ProviderListener lock sync.Mutex // all of the following members need to be concurrency proof - providerMetricsManager *metrics.ProviderMetricsManager - rewardServer *rewardserver.RewardServer - privKey *btcec.PrivateKey - lavaChainID string - addr sdk.AccAddress - blockMemorySize uint64 - chainMutexes map[string]*sync.Mutex - 
parallelConnections uint - cache *performance.Cache - shardID uint // shardID is a flag that allows setting up multiple provider databases of the same chain - chainTrackers *common.SafeSyncMap[string, *chaintracker.ChainTracker] - relaysMonitorAggregator *metrics.RelaysMonitorAggregator - relaysHealthCheckEnabled bool - relaysHealthCheckInterval time.Duration - grpcHealthCheckEndpoint string - providerUniqueId string - staticProvider bool - staticSpecPath string + providerMetricsManager *metrics.ProviderMetricsManager + rewardServer *rewardserver.RewardServer + privKey *btcec.PrivateKey + lavaChainID string + addr sdk.AccAddress + blockMemorySize uint64 + chainMutexes map[string]*sync.Mutex + parallelConnections uint + cache *performance.Cache + shardID uint // shardID is a flag that allows setting up multiple provider databases of the same chain + chainTrackers *common.SafeSyncMap[string, *chaintracker.ChainTracker] + relaysMonitorAggregator *metrics.RelaysMonitorAggregator + relaysHealthCheckEnabled bool + relaysHealthCheckInterval time.Duration + grpcHealthCheckEndpoint string + providerUniqueId string + staticProvider bool + staticSpecPath string + relayLoadLimit uint64 + providerLoadManagersPerChain *common.SafeSyncMap[string, *ProviderLoadManager] } func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) { @@ -165,7 +168,8 @@ func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) { rpcp.grpcHealthCheckEndpoint = options.healthCheckMetricsOptions.grpcHealthCheckEndpoint rpcp.staticProvider = options.staticProvider rpcp.staticSpecPath = options.staticSpecPath - + rpcp.relayLoadLimit = options.relayLoadLimit + rpcp.providerLoadManagersPerChain = &common.SafeSyncMap[string, *ProviderLoadManager]{} // single state tracker lavaChainFetcher := chainlib.NewLavaChainFetcher(ctx, options.clientCtx) providerStateTracker, err := statetracker.NewProviderStateTracker(ctx, options.txFactory, options.clientCtx, lavaChainFetcher, rpcp.providerMetricsManager) @@ -307,9 +311,7 @@ func (rpcp *RPCProvider) SetupProviderEndpoints(rpcProviderEndpoints []*lavasess wg.Add(parallelJobs) disabledEndpoints := make(chan *lavasession.RPCProviderEndpoint, parallelJobs) // validate static spec configuration is used only on a single chain setup. 
- chainIds := make(map[string]struct{}) for _, rpcProviderEndpoint := range rpcProviderEndpoints { - chainIds[rpcProviderEndpoint.ChainID] = struct{}{} setupEndpoint := func(rpcProviderEndpoint *lavasession.RPCProviderEndpoint, specValidator *SpecValidator) { defer wg.Done() err := rpcp.SetupEndpoint(context.Background(), rpcProviderEndpoint, specValidator) @@ -404,8 +406,8 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint utils.Attribute{Key: "Chain", Value: rpcProviderEndpoint.ChainID}, utils.Attribute{Key: "apiInterface", Value: apiInterface}) } - // in order to utilize shared resources between chains we need go routines with the same chain to wait for one another here + var loadManager *ProviderLoadManager chainCommonSetup := func() error { rpcp.chainMutexes[chainID].Lock() defer rpcp.chainMutexes[chainID].Unlock() @@ -450,6 +452,12 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint chainTracker = chainTrackerLoaded utils.LavaFormatDebug("reusing chain tracker", utils.Attribute{Key: "chain", Value: rpcProviderEndpoint.ChainID}) } + + // create provider load manager per chain ID + loadManager, _, err = rpcp.providerLoadManagersPerChain.LoadOrStore(rpcProviderEndpoint.ChainID, NewProviderLoadManager(rpcp.relayLoadLimit)) + if err != nil { + utils.LavaFormatError("Failed LoadOrStore providerLoadManagersPerChain", err, utils.LogAttr("chainId", rpcProviderEndpoint.ChainID), utils.LogAttr("rpcp.relayLoadLimit", rpcp.relayLoadLimit)) + } return nil } err = chainCommonSetup() @@ -485,8 +493,7 @@ func (rpcp *RPCProvider) SetupEndpoint(ctx context.Context, rpcProviderEndpoint utils.LavaFormatTrace("Creating provider node subscription manager", utils.LogAttr("rpcProviderEndpoint", rpcProviderEndpoint)) providerNodeSubscriptionManager = chainlib.NewProviderNodeSubscriptionManager(chainRouter, chainParser, rpcProviderServer, rpcp.privKey) } - - rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rpcp.rewardServer, providerSessionManager, reliabilityManager, rpcp.privKey, rpcp.cache, chainRouter, rpcp.providerStateTracker, rpcp.addr, rpcp.lavaChainID, DEFAULT_ALLOWED_MISSING_CU, providerMetrics, relaysMonitor, providerNodeSubscriptionManager, rpcp.staticProvider) + rpcProviderServer.ServeRPCRequests(ctx, rpcProviderEndpoint, chainParser, rpcp.rewardServer, providerSessionManager, reliabilityManager, rpcp.privKey, rpcp.cache, chainRouter, rpcp.providerStateTracker, rpcp.addr, rpcp.lavaChainID, DEFAULT_ALLOWED_MISSING_CU, providerMetrics, relaysMonitor, providerNodeSubscriptionManager, rpcp.staticProvider, loadManager) // set up grpc listener var listener *ProviderListener func() { @@ -717,6 +724,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt if stickinessHeaderName != "" { RPCProviderStickinessHeaderName = stickinessHeaderName } + relayLoadLimit := viper.GetUint64(common.RateLimitRequestPerSecondFlag) prometheusListenAddr := viper.GetString(metrics.MetricsListenFlagName) rewardStoragePath := viper.GetString(rewardserver.RewardServerStorageFlagName) rewardTTL := viper.GetDuration(rewardserver.RewardTTLFlagName) @@ -754,6 +762,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt &rpcProviderHealthCheckMetricsOptions, staticProvider, offlineSpecPath, + relayLoadLimit, } rpcProvider := RPCProvider{} @@ -790,7 +799,7 @@ rpcprovider 127.0.0.1:3333 OSMOSIS tendermintrpc "wss://www.node-path.com:80,htt 
cmdRPCProvider.Flags().BoolVar(&chainlib.IgnoreSubscriptionNotConfiguredError, chainlib.IgnoreSubscriptionNotConfiguredErrorFlag, chainlib.IgnoreSubscriptionNotConfiguredError, "ignore webSocket node url not configured error, when subscription is enabled in spec") cmdRPCProvider.Flags().IntVar(&numberOfRetriesAllowedOnNodeErrors, common.SetRelayCountOnNodeErrorFlag, 2, "set the number of retries attempt on node errors") cmdRPCProvider.Flags().String(common.UseStaticSpecFlag, "", "load offline spec provided path to spec file, used to test specs before they are proposed on chain, example for spec with inheritance: --use-static-spec ./cookbook/specs/ibc.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/evmos.json") - + cmdRPCProvider.Flags().Uint64(common.RateLimitRequestPerSecondFlag, 0, "Measure the load relative to this threshold for feedback (per second, per chain; default 0 = unlimited). Given Y simultaneous relay calls and a value of X, the reported load rate is Y/X.") common.AddRollingLogConfig(cmdRPCProvider) return cmdRPCProvider } diff --git a/protocol/rpcprovider/rpcprovider_server.go b/protocol/rpcprovider/rpcprovider_server.go index acd31b9717..64543afa1e 100644 --- a/protocol/rpcprovider/rpcprovider_server.go +++ b/protocol/rpcprovider/rpcprovider_server.go @@ -71,6 +71,7 @@ type RPCProviderServer struct { providerUniqueId string StaticProvider bool providerStateMachine *ProviderStateMachine + providerLoadManager *ProviderLoadManager } type ReliabilityManagerInf interface { @@ -112,6 +113,7 @@ func (rpcps *RPCProviderServer) ServeRPCRequests( relaysMonitor *metrics.RelaysMonitor, providerNodeSubscriptionManager *chainlib.ProviderNodeSubscriptionManager, staticProvider bool, + providerLoadManager *ProviderLoadManager, ) { rpcps.cache = cache rpcps.chainRouter = chainRouter @@ -134,6 +136,7 @@ func (rpcps *RPCProviderServer) ServeRPCRequests( rpcps.relaysMonitor = relaysMonitor rpcps.providerNodeSubscriptionManager = providerNodeSubscriptionManager rpcps.providerStateMachine = NewProviderStateMachine(rpcProviderEndpoint.ChainID, lavaprotocol.NewRelayRetriesManager(), chainRouter) + rpcps.providerLoadManager = providerLoadManager rpcps.initRelaysMonitor(ctx) } @@ -180,7 +183,17 @@ func (rpcps *RPCProviderServer) craftChainMessage() (chainMessage chainlib.Chain // function used to handle relay requests from a consumer, it is called by a provider_listener by calling RegisterReceiver func (rpcps *RPCProviderServer) Relay(ctx context.Context, request *pairingtypes.RelayRequest) (*pairingtypes.RelayReply, error) { - grpc.SetTrailer(ctx, metadata.Pairs(chainlib.RpcProviderUniqueIdHeader, rpcps.providerUniqueId)) + // register this relay and attach the current load rate to the context trailer + currentLoad := rpcps.providerLoadManager.addAndSetRelayLoadToContextTrailer(ctx) + defer func() { + // when the relay completes, update the load metric and decrement the active relay count in a goroutine. 
+ go func() { + rpcps.providerLoadManager.subtractRelayCall() + rpcps.metrics.SetLoadRate(currentLoad) + }() + }() + trailerMd := metadata.Pairs(chainlib.RpcProviderUniqueIdHeader, rpcps.providerUniqueId) + grpc.SetTrailer(ctx, trailerMd) if request.RelayData == nil || request.RelaySession == nil { return nil, utils.LavaFormatWarning("invalid relay request, internal fields are nil", nil) } diff --git a/scripts/pre_setups/init_lava_only_with_node_two_consumers.sh b/scripts/pre_setups/init_lava_only_with_node_rate_limit.sh similarity index 79% rename from scripts/pre_setups/init_lava_only_with_node_two_consumers.sh rename to scripts/pre_setups/init_lava_only_with_node_rate_limit.sh index 2ebffd14ea..4d35705eb0 100755 --- a/scripts/pre_setups/init_lava_only_with_node_two_consumers.sh +++ b/scripts/pre_setups/init_lava_only_with_node_rate_limit.sh @@ -20,7 +20,7 @@ echo "[Test Setup] sleeping 20 seconds for node to finish setup (if its not enou sleep 5 wait_for_lava_node_to_start -GASPRICE="0.000000001ulava" +GASPRICE="0.00002ulava" lavad tx gov submit-legacy-proposal spec-add ./cookbook/specs/ibc.json,./cookbook/specs/cosmoswasm.json,./cookbook/specs/tendermint.json,./cookbook/specs/cosmossdk.json,./cookbook/specs/cosmossdk_45.json,./cookbook/specs/cosmossdk_full.json,./cookbook/specs/ethermint.json,./cookbook/specs/ethereum.json,./cookbook/specs/cosmoshub.json,./cookbook/specs/lava.json,./cookbook/specs/osmosis.json,./cookbook/specs/fantom.json,./cookbook/specs/celo.json,./cookbook/specs/optimism.json,./cookbook/specs/arbitrum.json,./cookbook/specs/starknet.json,./cookbook/specs/aptos.json,./cookbook/specs/juno.json,./cookbook/specs/polygon.json,./cookbook/specs/evmos.json,./cookbook/specs/base.json,./cookbook/specs/canto.json,./cookbook/specs/sui.json,./cookbook/specs/solana.json,./cookbook/specs/bsc.json,./cookbook/specs/axelar.json,./cookbook/specs/avalanche.json,./cookbook/specs/fvm.json --lava-dev-test -y --from alice --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE & wait_next_block wait_next_block @@ -42,8 +42,6 @@ PROVIDER1_LISTENER="127.0.0.1:2220" lavad tx subscription buy DefaultPlan $(lavad keys show user1 -a) -y --from user1 --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE wait_next_block -lavad tx subscription buy DefaultPlan $(lavad keys show user2 -a) -y --from user2 --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE -wait_next_block lavad tx pairing stake-provider "LAV1" $PROVIDERSTAKE "$PROVIDER1_LISTENER,1" 1 $(operator_address) -y --from servicer1 --provider-moniker "dummyMoniker" --gas-adjustment "1.5" --gas "auto" --gas-prices $GASPRICE sleep_until_next_epoch @@ -53,17 +51,13 @@ screen -d -m -S provider1 bash -c "source ~/.bashrc; lavap rpcprovider \ $PROVIDER1_LISTENER LAV1 rest '$LAVA_REST' \ $PROVIDER1_LISTENER LAV1 tendermintrpc '$LAVA_RPC,$LAVA_RPC_WS' \ $PROVIDER1_LISTENER LAV1 grpc '$LAVA_GRPC' \ -$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level trace --from servicer1 --chain-id lava --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 +$EXTRA_PROVIDER_FLAGS --geolocation 1 --log_level debug --from servicer1 --rate-limit-requests-per-second 10 --chain-id lava --metrics-listen-address ":7776" 2>&1 | tee $LOGS_DIR/PROVIDER1.log" && sleep 0.25 wait_next_block screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer \ 127.0.0.1:3360 LAV1 rest 127.0.0.1:3361 LAV1 tendermintrpc 127.0.0.1:3362 LAV1 grpc \ -$EXTRA_PORTAL_FLAGS --geolocation 1 --log_level trace --from user1 --chain-id lava 
--add-api-method-metrics --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25 - -screen -d -m -S consumers2 bash -c "source ~/.bashrc; lavap rpcconsumer \ -127.0.0.1:3350 LAV1 rest 127.0.0.1:3351 LAV1 tendermintrpc 127.0.0.1:3352 LAV1 grpc \ -$EXTRA_PORTAL_FLAGS --geolocation 1 --log_level trace --from user2 --chain-id lava --add-api-method-metrics --allow-insecure-provider-dialing --metrics-listen-address ":7773" 2>&1 | tee $LOGS_DIR/CONSUMERS2.log" && sleep 0.25 +$EXTRA_PORTAL_FLAGS --geolocation 1 --log_level debug --from user1 --chain-id lava --add-api-method-metrics --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25 echo "--- setting up screens done ---" screen -ls \ No newline at end of file
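
End-to-end, the provider side of this PR reduces to an atomic in-flight counter measured against the `--rate-limit-requests-per-second` threshold, and the comment added to `selection_weight.go` hints at how a consumer could discount providers whose reported load exceeds 0.7. A small self-contained sketch under that reading (the `loadTracker` type, the `begin`/`end` names, and the `adjustedWeight` helper are illustrative; only the counter arithmetic and the 0.7 knee come from the diff):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// loadTracker mirrors ProviderLoadManager: an in-flight relay counter
// measured against the configured rate-limit threshold.
type loadTracker struct {
	threshold uint64
	active    atomic.Uint64
}

// begin registers a relay and returns the current load rate (active/threshold).
func (lt *loadTracker) begin() float64 {
	active := lt.active.Add(1)
	if lt.threshold == 0 {
		return 0 // unlimited: load is never reported
	}
	return float64(active) / float64(lt.threshold)
}

// end unregisters a relay; adding ^uint64(0) is an atomic decrement,
// the same trick subtractRelayCall uses.
func (lt *loadTracker) end() {
	lt.active.Add(^uint64(0))
}

// adjustedWeight applies the formula sketched in selection_weight.go:
// weight / (1 + max(load-0.7, 0)), so load only penalizes above 70%.
func adjustedWeight(weight int64, load float64) float64 {
	excess := load - 0.7
	if excess < 0 {
		excess = 0
	}
	return float64(weight) / (1 + excess)
}

func main() {
	lt := &loadTracker{threshold: 10}
	for i := 0; i < 10; i++ {
		lt.begin()
	}
	load := float64(lt.active.Load()) / float64(lt.threshold) // 10 concurrent relays -> 1.0
	fmt.Printf("load=%.1f adjusted weight=%.2f\n", load, adjustedWeight(100, load))
	for i := 0; i < 10; i++ {
		lt.end()
	}
}
```

With the renamed script above, the provider runs with `--rate-limit-requests-per-second 10`, so ten concurrent relays report a load rate of exactly 1.0 in both the `lava_provider_load_rate` gauge and the `Lava-Provider-Load-Rate` trailer.
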