From 206bedecafad76c42eddf801f13d282c869c2eb4 Mon Sep 17 00:00:00 2001 From: Ran Mishael Date: Sun, 14 Apr 2024 15:11:04 +0200 Subject: [PATCH] add metrics to rpcconsumer --- ecosystem/lavajs/package.json | 2 +- ecosystem/lavajs/scripts/codegen.js | 12 +- .../lavasession/consumer_session_manager.go | 27 ++-- protocol/metrics/metrics_consumer_manager.go | 147 ++++++++++++++---- protocol/metrics/rpcconsumerlogs.go | 10 ++ protocol/rpcconsumer/rpcconsumer_server.go | 26 +++- .../pre_setups/init_lava_only_with_node.sh | 2 +- 7 files changed, 176 insertions(+), 50 deletions(-) diff --git a/ecosystem/lavajs/package.json b/ecosystem/lavajs/package.json index d64a99875b..5bf709e11b 100644 --- a/ecosystem/lavajs/package.json +++ b/ecosystem/lavajs/package.json @@ -1,6 +1,6 @@ { "name": "@lavanet/lavajs", - "version": "1.0.4", + "version": "1.2.2", "description": "lavajs", "author": "Lava Network", "homepage": "https://github.com/lavanet/lava/tree/main/ecosystem/lavajs#readme", diff --git a/ecosystem/lavajs/scripts/codegen.js b/ecosystem/lavajs/scripts/codegen.js index 5b26bf7cf5..71f140f4c6 100644 --- a/ecosystem/lavajs/scripts/codegen.js +++ b/ecosystem/lavajs/scripts/codegen.js @@ -18,6 +18,14 @@ telescope({ 'cosmos/authz/v1beta1/authz.ts', 'cosmos/gov/v1beta1/tx.ts', 'cosmos/gov/v1beta1/gov.ts', + 'cosmos/staking/v1beta1/staking.ts', + 'tendermint/types/evidence.ts', + 'cosmos/staking/v1beta1/tx.ts', + 'cosmos/orm/query/v1alpha1/query.ts', + 'tendermint/types/types.ts', + 'tendermint/abci/types.ts', + 'lavanet/lava/downtime/v1/genesis.ts', + 'cosmos/upgrade/v1beta1/upgrade.ts', 'cosmos/staking/v1beta1/tx.amino.ts' ], patterns: ['**/*amino.ts', '**/*registry.ts'] @@ -64,8 +72,8 @@ telescope({ ] }, methods: { - fromJSON: false, - toJSON: false, + fromJSON: true, + toJSON: true, encode: true, decode: true, fromPartial: true, diff --git a/protocol/lavasession/consumer_session_manager.go b/protocol/lavasession/consumer_session_manager.go index d552ff80ea..9e03b71b80 100644 --- a/protocol/lavasession/consumer_session_manager.go +++ b/protocol/lavasession/consumer_session_manager.go @@ -97,7 +97,8 @@ func (csm *ConsumerSessionManager) UpdateAllProviders(epoch uint64, pairingList csm.pairing[provider.PublicLavaAddress] = provider } csm.setValidAddressesToDefaultValue("", nil) // the starting point is that valid addresses are equal to pairing addresses. - csm.resetMetricsManager() + // reset session related metrics + csm.consumerMetricsManager.ResetSessionRelatedMetrics() utils.LavaFormatDebug("updated providers", utils.Attribute{Key: "epoch", Value: epoch}, utils.Attribute{Key: "spec", Value: csm.rpcEndpoint.Key()}) return nil } @@ -440,6 +441,11 @@ func (csm *ConsumerSessionManager) GetSessions(ctx context.Context, cuNeededForS } else { // consumer session is locked and valid, we need to set the relayNumber and the relay cu. before returning. + // add metric to currently open sessions metric + info := csm.RPCEndpoint() + apiInterface := info.ApiInterface + chainId := info.ChainID + go csm.consumerMetricsManager.AddOpenSessionMetric(chainId, apiInterface, providerAddress) // Successfully created/got a consumerSession. if debug { utils.LavaFormatDebug("Consumer get session", @@ -921,16 +927,17 @@ func (csm *ConsumerSessionManager) updateMetricsManager(consumerSession *SingleC qosEx := *consumerSession.QoSInfo.LastExcellenceQoSReport lastQosExcellence = &qosEx } + blockedSession := consumerSession.BlockListed + publicProviderAddress := consumerSession.Parent.PublicLavaAddress - go csm.consumerMetricsManager.SetQOSMetrics(chainId, apiInterface, consumerSession.Parent.PublicLavaAddress, lastQos, lastQosExcellence, consumerSession.LatestBlock, consumerSession.RelayNum) -} - -// consumerSession should still be locked when accessing this method as it fetches information from the session it self -func (csm *ConsumerSessionManager) resetMetricsManager() { - if csm.consumerMetricsManager == nil { - return - } - csm.consumerMetricsManager.ResetQOSMetrics() + go func() { + csm.consumerMetricsManager.SetQOSMetrics(chainId, apiInterface, publicProviderAddress, lastQos, lastQosExcellence, consumerSession.LatestBlock, consumerSession.RelayNum) + // in case we blocked the session add it to our block sessions metric + if blockedSession { + csm.consumerMetricsManager.AddNumberOfBlockedSessionMetric(chainId, apiInterface, publicProviderAddress) + } + csm.consumerMetricsManager.DecrementOpenSessionMetric(chainId, apiInterface, publicProviderAddress) + }() } // Get the reported providers currently stored in the session manager. diff --git a/protocol/metrics/metrics_consumer_manager.go b/protocol/metrics/metrics_consumer_manager.go index e912326be3..3252dcdc9a 100644 --- a/protocol/metrics/metrics_consumer_manager.go +++ b/protocol/metrics/metrics_consumer_manager.go @@ -13,21 +13,26 @@ import ( ) type ConsumerMetricsManager struct { - totalCURequestedMetric *prometheus.CounterVec - totalRelaysRequestedMetric *prometheus.CounterVec - totalErroredMetric *prometheus.CounterVec - blockMetric *prometheus.GaugeVec - latencyMetric *prometheus.GaugeVec - qosMetric *prometheus.GaugeVec - qosExcellenceMetric *prometheus.GaugeVec - LatestBlockMetric *prometheus.GaugeVec - LatestProviderRelay *prometheus.GaugeVec - virtualEpochMetric *prometheus.GaugeVec - endpointsHealthChecksOkMetric prometheus.Gauge - endpointsHealthChecksOk uint64 - lock sync.Mutex - protocolVersionMetric *prometheus.GaugeVec - providerRelays map[string]uint64 + totalCURequestedMetric *prometheus.CounterVec + totalRelaysRequestedMetric *prometheus.CounterVec + totalErroredMetric *prometheus.CounterVec + totalRelaysSentToProvidersMetric *prometheus.CounterVec + totalRelaysReturnedFromProvidersMetric *prometheus.CounterVec + totalRelaysSentByNewBatchTickerMetric *prometheus.CounterVec + currentNumberOfOpenSessionsMetric *prometheus.GaugeVec + currentNumberOfBlockedSessionsMetric *prometheus.GaugeVec + blockMetric *prometheus.GaugeVec + latencyMetric *prometheus.GaugeVec + qosMetric *prometheus.GaugeVec + qosExcellenceMetric *prometheus.GaugeVec + LatestBlockMetric *prometheus.GaugeVec + LatestProviderRelay *prometheus.GaugeVec + virtualEpochMetric *prometheus.GaugeVec + endpointsHealthChecksOkMetric prometheus.Gauge + endpointsHealthChecksOk uint64 + lock sync.Mutex + protocolVersionMetric *prometheus.GaugeVec + providerRelays map[string]uint64 } func NewConsumerMetricsManager(networkAddress string) *ConsumerMetricsManager { @@ -46,6 +51,27 @@ func NewConsumerMetricsManager(networkAddress string) *ConsumerMetricsManager { Help: "The total number of relays serviced by the consumer over time.", }, []string{"spec", "apiInterface"}) + totalRelaysSentToProvidersMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "lava_consumer_total_relays_sent_to_providers", + Help: "The total number of relays sent to providers", + }, []string{"spec", "apiInterface"}) + totalRelaysReturnedFromProvidersMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "lava_consumer_total_relays_returned_from_providers", + Help: "The total number of relays returned from providers", + }, []string{"spec", "apiInterface"}) + totalRelaysSentByNewBatchTickerMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "lava_consumer_total_relays_sent_by_batch_ticker", + Help: "The total number of relays sent by the batch ticker", + }, []string{"spec", "apiInterface"}) + currentNumberOfOpenSessionsMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "lava_consumer_current_number_of_open_sessions", + Help: "The total number of currently open sessions", + }, []string{"spec", "apiInterface", "provider"}) + currentNumberOfBlockedSessionsMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "lava_consumer_current_number_of_blocked_sessions", + Help: "The total number of currently blocked sessions", + }, []string{"spec", "apiInterface", "provider"}) + // Create a new GaugeVec metric to represent the TotalErrored over time. totalErroredMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "lava_consumer_total_errored", @@ -106,22 +132,33 @@ func NewConsumerMetricsManager(networkAddress string) *ConsumerMetricsManager { prometheus.MustRegister(virtualEpochMetric) prometheus.MustRegister(endpointsHealthChecksOkMetric) prometheus.MustRegister(protocolVersionMetric) + // metrics related to session management + prometheus.MustRegister(totalRelaysSentToProvidersMetric) + prometheus.MustRegister(totalRelaysReturnedFromProvidersMetric) + prometheus.MustRegister(totalRelaysSentByNewBatchTickerMetric) + prometheus.MustRegister(currentNumberOfOpenSessionsMetric) + prometheus.MustRegister(currentNumberOfBlockedSessionsMetric) consumerMetricsManager := &ConsumerMetricsManager{ - totalCURequestedMetric: totalCURequestedMetric, - totalRelaysRequestedMetric: totalRelaysRequestedMetric, - totalErroredMetric: totalErroredMetric, - blockMetric: blockMetric, - latencyMetric: latencyMetric, - qosMetric: qosMetric, - qosExcellenceMetric: qosExcellenceMetric, - LatestBlockMetric: latestBlockMetric, - LatestProviderRelay: latestProviderRelay, - providerRelays: map[string]uint64{}, - virtualEpochMetric: virtualEpochMetric, - endpointsHealthChecksOkMetric: endpointsHealthChecksOkMetric, - endpointsHealthChecksOk: 1, - protocolVersionMetric: protocolVersionMetric, + totalCURequestedMetric: totalCURequestedMetric, + totalRelaysRequestedMetric: totalRelaysRequestedMetric, + totalErroredMetric: totalErroredMetric, + blockMetric: blockMetric, + latencyMetric: latencyMetric, + qosMetric: qosMetric, + qosExcellenceMetric: qosExcellenceMetric, + LatestBlockMetric: latestBlockMetric, + LatestProviderRelay: latestProviderRelay, + providerRelays: map[string]uint64{}, + virtualEpochMetric: virtualEpochMetric, + endpointsHealthChecksOkMetric: endpointsHealthChecksOkMetric, + endpointsHealthChecksOk: 1, + protocolVersionMetric: protocolVersionMetric, + totalRelaysSentToProvidersMetric: totalRelaysSentToProvidersMetric, + totalRelaysReturnedFromProvidersMetric: totalRelaysReturnedFromProvidersMetric, + totalRelaysSentByNewBatchTickerMetric: totalRelaysSentByNewBatchTickerMetric, + currentNumberOfOpenSessionsMetric: currentNumberOfOpenSessionsMetric, + currentNumberOfBlockedSessionsMetric: currentNumberOfBlockedSessionsMetric, } http.Handle("/metrics", promhttp.Handler()) @@ -170,6 +207,54 @@ func (pme *ConsumerMetricsManager) SetRelayMetrics(relayMetric *RelayMetrics, er } } +func (pme *ConsumerMetricsManager) SetRelaySentToProviderMetric(chainId string, apiInterface string) { + if pme == nil { + return + } + pme.totalRelaysSentToProvidersMetric.WithLabelValues(chainId, apiInterface).Inc() +} + +func (pme *ConsumerMetricsManager) SetRelayReturnedFromProviderMetric(chainId string, apiInterface string) { + if pme == nil { + return + } + pme.totalRelaysReturnedFromProvidersMetric.WithLabelValues(chainId, apiInterface).Inc() +} + +func (pme *ConsumerMetricsManager) SetRelaySentByNewBatchTickerMetric(chainId string, apiInterface string) { + if pme == nil { + return + } + pme.totalRelaysSentByNewBatchTickerMetric.WithLabelValues(chainId, apiInterface).Inc() +} + +func (pme *ConsumerMetricsManager) AddOpenSessionMetric(chainId string, apiInterface string, provider string) { + if pme == nil { + return + } + pme.lock.Lock() + defer pme.lock.Unlock() + pme.currentNumberOfOpenSessionsMetric.WithLabelValues(chainId, apiInterface, provider).Inc() +} + +func (pme *ConsumerMetricsManager) DecrementOpenSessionMetric(chainId string, apiInterface string, provider string) { + if pme == nil { + return + } + pme.lock.Lock() + defer pme.lock.Unlock() + pme.currentNumberOfOpenSessionsMetric.WithLabelValues(chainId, apiInterface, provider).Dec() +} + +func (pme *ConsumerMetricsManager) AddNumberOfBlockedSessionMetric(chainId string, apiInterface string, provider string) { + if pme == nil { + return + } + pme.lock.Lock() + defer pme.lock.Unlock() + pme.currentNumberOfBlockedSessionsMetric.WithLabelValues(chainId, apiInterface, provider).Inc() +} + func (pme *ConsumerMetricsManager) SetQOSMetrics(chainId string, apiInterface string, providerAddress string, qos *pairingtypes.QualityOfServiceReport, qosExcellence *pairingtypes.QualityOfServiceReport, latestBlock int64, relays uint64) { if pme == nil { return @@ -239,7 +324,7 @@ func (pme *ConsumerMetricsManager) UpdateHealthCheckStatus(status bool) { atomic.StoreUint64(&pme.endpointsHealthChecksOk, uint64(value)) } -func (pme *ConsumerMetricsManager) ResetQOSMetrics() { +func (pme *ConsumerMetricsManager) ResetSessionRelatedMetrics() { if pme == nil { return } @@ -247,6 +332,8 @@ func (pme *ConsumerMetricsManager) ResetQOSMetrics() { defer pme.lock.Unlock() pme.qosMetric.Reset() pme.qosExcellenceMetric.Reset() + pme.currentNumberOfBlockedSessionsMetric.Reset() + pme.currentNumberOfOpenSessionsMetric.Reset() pme.providerRelays = map[string]uint64{} } diff --git a/protocol/metrics/rpcconsumerlogs.go b/protocol/metrics/rpcconsumerlogs.go index 746d95e1a2..eec377aa9b 100644 --- a/protocol/metrics/rpcconsumerlogs.go +++ b/protocol/metrics/rpcconsumerlogs.go @@ -211,6 +211,16 @@ func (rpccl *RPCConsumerLogs) shouldCountMetrics(refererHeaderValue string, user return true } +func (rpccl *RPCConsumerLogs) SetRelaySentToProviderMetric(chainid string, apiInterface string) { + rpccl.consumerMetricsManager.SetRelaySentToProviderMetric(chainid, apiInterface) +} +func (rpccl *RPCConsumerLogs) SetRelayReturnedFromProviderMetric(chainid string, apiInterface string) { + rpccl.consumerMetricsManager.SetRelayReturnedFromProviderMetric(chainid, apiInterface) +} +func (rpccl *RPCConsumerLogs) SetRelaySentByNewBatchTickerMetric(chainid string, apiInterface string) { + rpccl.consumerMetricsManager.SetRelaySentByNewBatchTickerMetric(chainid, apiInterface) +} + func (rpccl *RPCConsumerLogs) SendMetrics(data *RelayMetrics, err error, origin string) { data.Success = err == nil data.Origin = origin diff --git a/protocol/rpcconsumer/rpcconsumer_server.go b/protocol/rpcconsumer/rpcconsumer_server.go index 0799e93c5c..b325d00ceb 100644 --- a/protocol/rpcconsumer/rpcconsumer_server.go +++ b/protocol/rpcconsumer/rpcconsumer_server.go @@ -325,6 +325,10 @@ func (rpccs *RPCConsumerServer) SendRelay( return returnedResult, nil } +func (rpccs *RPCConsumerServer) getChainIdAndApiInterface() (string, string) { + return rpccs.listenEndpoint.ChainID, rpccs.listenEndpoint.ApiInterface +} + func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, directiveHeaders map[string]string, chainMessage chainlib.ChainMessage, relayRequestData *pairingtypes.RelayPrivateData, dappID string, consumerIp string) (*RelayProcessor, error) { // make sure all of the child contexts are cancelled when we exit ctx, cancel := context.WithCancel(ctx) @@ -392,6 +396,8 @@ func (rpccs *RPCConsumerServer) ProcessRelaySend(ctx context.Context, directiveH if relayProcessor.selection != BestResult { err := rpccs.sendRelayToProvider(ctx, chainMessage, relayRequestData, dappID, consumerIp, relayProcessor) go validateReturnCondition(err) + // add ticker launch metrics + go rpccs.rpcConsumerLogs.SetRelaySentByNewBatchTickerMetric(rpccs.getChainIdAndApiInterface()) } case returnErr := <-returnCondition: // we use this channel because there could be a race condition between us releasing the provider and about to send the return @@ -443,7 +449,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( } privKey := rpccs.privKey - chainID := rpccs.listenEndpoint.ChainID + chainId, apiInterface := rpccs.getChainIdAndApiInterface() lavaChainID := rpccs.lavaChainID // Get Session. we get session here so we can use the epoch in the callbacks @@ -454,7 +460,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( if rpccs.cache.CacheActive() { // use cache only if its defined. if reqBlock != spectypes.NOT_APPLICABLE || !chainMessage.GetForceCacheRefresh() { var cacheReply *pairingtypes.CacheRelayReply - hashKey, outputFormatter, err := chainlib.HashCacheRequest(relayRequestData, chainID) + hashKey, outputFormatter, err := chainlib.HashCacheRequest(relayRequestData, chainId) if err != nil { utils.LavaFormatError("sendRelayToProvider Failed getting Hash for cache request", err) } else { @@ -462,7 +468,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( cacheReply, cacheError = rpccs.cache.GetEntry(cacheCtx, &pairingtypes.RelayCacheGet{ RequestHash: hashKey, RequestedBlock: relayRequestData.RequestBlock, - ChainId: chainID, + ChainId: chainId, BlockHash: nil, Finalized: false, SharedStateId: sharedStateId, @@ -532,8 +538,10 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( // Iterate over the sessions map for providerPublicAddress, sessionInfo := range sessions { + // Launch a separate goroutine for each session go func(providerPublicAddress string, sessionInfo *lavasession.SessionInfo) { + // add ticker launch metrics localRelayResult := &common.RelayResult{ ProviderInfo: common.ProviderInfo{ProviderAddress: providerPublicAddress, ProviderStake: sessionInfo.StakeSize, ProviderQoSExcellenceSummery: sessionInfo.QoSSummeryResult}, Finalized: false, @@ -549,6 +557,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( if found { goroutineCtx = utils.WithUniqueIdentifier(goroutineCtx, guid) } + defer func() { // Return response relayProcessor.SetResponse(&relayResponse{ @@ -567,7 +576,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( epoch := sessionInfo.Epoch reportedProviders := sessionInfo.ReportedProviders - relayRequest, errResponse := lavaprotocol.ConstructRelayRequest(goroutineCtx, privKey, lavaChainID, chainID, &localRelayRequestData, providerPublicAddress, singleConsumerSession, int64(epoch), reportedProviders) + relayRequest, errResponse := lavaprotocol.ConstructRelayRequest(goroutineCtx, privKey, lavaChainID, chainId, &localRelayRequestData, providerPublicAddress, singleConsumerSession, int64(epoch), reportedProviders) if errResponse != nil { utils.LavaFormatError("Failed ConstructRelayRequest", errResponse, utils.LogAttr("Request data", localRelayRequestData)) return @@ -575,6 +584,10 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( localRelayResult.Request = relayRequest endpointClient := *singleConsumerSession.Endpoint.Client + // add metrics (send and receive) + go rpccs.rpcConsumerLogs.SetRelaySentToProviderMetric(chainId, apiInterface) + defer func() { go rpccs.rpcConsumerLogs.SetRelayReturnedFromProviderMetric(chainId, apiInterface) }() + if isSubscription { errResponse = rpccs.relaySubscriptionInner(goroutineCtx, endpointClient, singleConsumerSession, localRelayResult) if errResponse != nil { @@ -586,6 +599,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( // unique per dappId and ip consumerToken := common.GetUniqueToken(dappID, consumerIp) processingTimeout, relayTimeout := rpccs.getProcessingTimeout(chainMessage) + // send relay relayLatency, errResponse, backoff := rpccs.relayInner(goroutineCtx, singleConsumerSession, localRelayResult, processingTimeout, chainMessage, consumerToken) if errResponse != nil { failRelaySession := func(origErr error, backoff_ bool) { @@ -642,7 +656,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( requestedBlock := localRelayResult.Request.RelayData.RequestBlock // get requested block before removing it from the data seenBlock := localRelayResult.Request.RelayData.SeenBlock // get seen block before removing it from the data - hashKey, _, hashErr := chainlib.HashCacheRequest(localRelayResult.Request.RelayData, chainID) // get the hash (this changes the data) + hashKey, _, hashErr := chainlib.HashCacheRequest(localRelayResult.Request.RelayData, chainId) // get the hash (this changes the data) go func() { // deal with copying error. @@ -665,7 +679,7 @@ func (rpccs *RPCConsumerServer) sendRelayToProvider( err2 := rpccs.cache.SetEntry(new_ctx, &pairingtypes.RelayCacheSet{ RequestHash: hashKey, - ChainId: chainID, + ChainId: chainId, RequestedBlock: requestedBlock, SeenBlock: seenBlock, BlockHash: nil, // consumer cache doesn't care about block hashes diff --git a/scripts/pre_setups/init_lava_only_with_node.sh b/scripts/pre_setups/init_lava_only_with_node.sh index ee1ceb2099..883730c934 100755 --- a/scripts/pre_setups/init_lava_only_with_node.sh +++ b/scripts/pre_setups/init_lava_only_with_node.sh @@ -57,7 +57,7 @@ wait_next_block screen -d -m -S consumers bash -c "source ~/.bashrc; lavap rpcconsumer \ 127.0.0.1:3360 LAV1 rest 127.0.0.1:3361 LAV1 tendermintrpc 127.0.0.1:3362 LAV1 grpc \ -$EXTRA_PORTAL_FLAGS --geolocation 1 --log_level debug --from user1 --chain-id lava --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25 +$EXTRA_PORTAL_FLAGS --geolocation 1 --log_level debug --from user1 --chain-id lava --pprof-address "127.0.0.1:6060" --allow-insecure-provider-dialing --metrics-listen-address ":7779" 2>&1 | tee $LOGS_DIR/CONSUMERS.log" && sleep 0.25 echo "--- setting up screens done ---" screen -ls \ No newline at end of file