Skip to content

Commit

Permalink
add metrics to rpcconsumer
Browse files Browse the repository at this point in the history
  • Loading branch information
ranlavanet committed Apr 14, 2024
1 parent 13cecd9 commit 206bede
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 50 deletions.
2 changes: 1 addition & 1 deletion ecosystem/lavajs/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@lavanet/lavajs",
"version": "1.0.4",
"version": "1.2.2",
"description": "lavajs",
"author": "Lava Network",
"homepage": "https://github.com/lavanet/lava/tree/main/ecosystem/lavajs#readme",
Expand Down
12 changes: 10 additions & 2 deletions ecosystem/lavajs/scripts/codegen.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ telescope({
'cosmos/authz/v1beta1/authz.ts',
'cosmos/gov/v1beta1/tx.ts',
'cosmos/gov/v1beta1/gov.ts',
'cosmos/staking/v1beta1/staking.ts',
'tendermint/types/evidence.ts',
'cosmos/staking/v1beta1/tx.ts',
'cosmos/orm/query/v1alpha1/query.ts',
'tendermint/types/types.ts',
'tendermint/abci/types.ts',
'lavanet/lava/downtime/v1/genesis.ts',
'cosmos/upgrade/v1beta1/upgrade.ts',
'cosmos/staking/v1beta1/tx.amino.ts'
],
patterns: ['**/*amino.ts', '**/*registry.ts']
Expand Down Expand Up @@ -64,8 +72,8 @@ telescope({
]
},
methods: {
fromJSON: false,
toJSON: false,
fromJSON: true,
toJSON: true,
encode: true,
decode: true,
fromPartial: true,
Expand Down
27 changes: 17 additions & 10 deletions protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ func (csm *ConsumerSessionManager) UpdateAllProviders(epoch uint64, pairingList
csm.pairing[provider.PublicLavaAddress] = provider
}
csm.setValidAddressesToDefaultValue("", nil) // the starting point is that valid addresses are equal to pairing addresses.
csm.resetMetricsManager()
// reset session related metrics
csm.consumerMetricsManager.ResetSessionRelatedMetrics()
utils.LavaFormatDebug("updated providers", utils.Attribute{Key: "epoch", Value: epoch}, utils.Attribute{Key: "spec", Value: csm.rpcEndpoint.Key()})
return nil
}
Expand Down Expand Up @@ -440,6 +441,11 @@ func (csm *ConsumerSessionManager) GetSessions(ctx context.Context, cuNeededForS
} else {
// consumer session is locked and valid, we need to set the relayNumber and the relay cu. before returning.

// add metric to currently open sessions metric
info := csm.RPCEndpoint()
apiInterface := info.ApiInterface
chainId := info.ChainID
go csm.consumerMetricsManager.AddOpenSessionMetric(chainId, apiInterface, providerAddress)
// Successfully created/got a consumerSession.
if debug {
utils.LavaFormatDebug("Consumer get session",
Expand Down Expand Up @@ -921,16 +927,17 @@ func (csm *ConsumerSessionManager) updateMetricsManager(consumerSession *SingleC
qosEx := *consumerSession.QoSInfo.LastExcellenceQoSReport
lastQosExcellence = &qosEx
}
blockedSession := consumerSession.BlockListed
publicProviderAddress := consumerSession.Parent.PublicLavaAddress

go csm.consumerMetricsManager.SetQOSMetrics(chainId, apiInterface, consumerSession.Parent.PublicLavaAddress, lastQos, lastQosExcellence, consumerSession.LatestBlock, consumerSession.RelayNum)
}

// consumerSession should still be locked when accessing this method as it fetches information from the session it self
func (csm *ConsumerSessionManager) resetMetricsManager() {
if csm.consumerMetricsManager == nil {
return
}
csm.consumerMetricsManager.ResetQOSMetrics()
go func() {
csm.consumerMetricsManager.SetQOSMetrics(chainId, apiInterface, publicProviderAddress, lastQos, lastQosExcellence, consumerSession.LatestBlock, consumerSession.RelayNum)
// in case we blocked the session add it to our block sessions metric
if blockedSession {
csm.consumerMetricsManager.AddNumberOfBlockedSessionMetric(chainId, apiInterface, publicProviderAddress)
}
csm.consumerMetricsManager.DecrementOpenSessionMetric(chainId, apiInterface, publicProviderAddress)
}()
}

// Get the reported providers currently stored in the session manager.
Expand Down
147 changes: 117 additions & 30 deletions protocol/metrics/metrics_consumer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,26 @@ import (
)

type ConsumerMetricsManager struct {
totalCURequestedMetric *prometheus.CounterVec
totalRelaysRequestedMetric *prometheus.CounterVec
totalErroredMetric *prometheus.CounterVec
blockMetric *prometheus.GaugeVec
latencyMetric *prometheus.GaugeVec
qosMetric *prometheus.GaugeVec
qosExcellenceMetric *prometheus.GaugeVec
LatestBlockMetric *prometheus.GaugeVec
LatestProviderRelay *prometheus.GaugeVec
virtualEpochMetric *prometheus.GaugeVec
endpointsHealthChecksOkMetric prometheus.Gauge
endpointsHealthChecksOk uint64
lock sync.Mutex
protocolVersionMetric *prometheus.GaugeVec
providerRelays map[string]uint64
totalCURequestedMetric *prometheus.CounterVec
totalRelaysRequestedMetric *prometheus.CounterVec
totalErroredMetric *prometheus.CounterVec
totalRelaysSentToProvidersMetric *prometheus.CounterVec
totalRelaysReturnedFromProvidersMetric *prometheus.CounterVec
totalRelaysSentByNewBatchTickerMetric *prometheus.CounterVec
currentNumberOfOpenSessionsMetric *prometheus.GaugeVec
currentNumberOfBlockedSessionsMetric *prometheus.GaugeVec
blockMetric *prometheus.GaugeVec
latencyMetric *prometheus.GaugeVec
qosMetric *prometheus.GaugeVec
qosExcellenceMetric *prometheus.GaugeVec
LatestBlockMetric *prometheus.GaugeVec
LatestProviderRelay *prometheus.GaugeVec
virtualEpochMetric *prometheus.GaugeVec
endpointsHealthChecksOkMetric prometheus.Gauge
endpointsHealthChecksOk uint64
lock sync.Mutex
protocolVersionMetric *prometheus.GaugeVec
providerRelays map[string]uint64
}

func NewConsumerMetricsManager(networkAddress string) *ConsumerMetricsManager {
Expand All @@ -46,6 +51,27 @@ func NewConsumerMetricsManager(networkAddress string) *ConsumerMetricsManager {
Help: "The total number of relays serviced by the consumer over time.",
}, []string{"spec", "apiInterface"})

totalRelaysSentToProvidersMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "lava_consumer_total_relays_sent_to_providers",
Help: "The total number of relays sent to providers",
}, []string{"spec", "apiInterface"})
totalRelaysReturnedFromProvidersMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "lava_consumer_total_relays_returned_from_providers",
Help: "The total number of relays returned from providers",
}, []string{"spec", "apiInterface"})
totalRelaysSentByNewBatchTickerMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "lava_consumer_total_relays_sent_by_batch_ticker",
Help: "The total number of relays sent by the batch ticker",
}, []string{"spec", "apiInterface"})
currentNumberOfOpenSessionsMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_consumer_current_number_of_open_sessions",
Help: "The total number of currently open sessions",
}, []string{"spec", "apiInterface", "provider"})
currentNumberOfBlockedSessionsMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_consumer_current_number_of_blocked_sessions",
Help: "The total number of currently blocked sessions",
}, []string{"spec", "apiInterface", "provider"})

// Create a new GaugeVec metric to represent the TotalErrored over time.
totalErroredMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "lava_consumer_total_errored",
Expand Down Expand Up @@ -106,22 +132,33 @@ func NewConsumerMetricsManager(networkAddress string) *ConsumerMetricsManager {
prometheus.MustRegister(virtualEpochMetric)
prometheus.MustRegister(endpointsHealthChecksOkMetric)
prometheus.MustRegister(protocolVersionMetric)
// metrics related to session management
prometheus.MustRegister(totalRelaysSentToProvidersMetric)
prometheus.MustRegister(totalRelaysReturnedFromProvidersMetric)
prometheus.MustRegister(totalRelaysSentByNewBatchTickerMetric)
prometheus.MustRegister(currentNumberOfOpenSessionsMetric)
prometheus.MustRegister(currentNumberOfBlockedSessionsMetric)

consumerMetricsManager := &ConsumerMetricsManager{
totalCURequestedMetric: totalCURequestedMetric,
totalRelaysRequestedMetric: totalRelaysRequestedMetric,
totalErroredMetric: totalErroredMetric,
blockMetric: blockMetric,
latencyMetric: latencyMetric,
qosMetric: qosMetric,
qosExcellenceMetric: qosExcellenceMetric,
LatestBlockMetric: latestBlockMetric,
LatestProviderRelay: latestProviderRelay,
providerRelays: map[string]uint64{},
virtualEpochMetric: virtualEpochMetric,
endpointsHealthChecksOkMetric: endpointsHealthChecksOkMetric,
endpointsHealthChecksOk: 1,
protocolVersionMetric: protocolVersionMetric,
totalCURequestedMetric: totalCURequestedMetric,
totalRelaysRequestedMetric: totalRelaysRequestedMetric,
totalErroredMetric: totalErroredMetric,
blockMetric: blockMetric,
latencyMetric: latencyMetric,
qosMetric: qosMetric,
qosExcellenceMetric: qosExcellenceMetric,
LatestBlockMetric: latestBlockMetric,
LatestProviderRelay: latestProviderRelay,
providerRelays: map[string]uint64{},
virtualEpochMetric: virtualEpochMetric,
endpointsHealthChecksOkMetric: endpointsHealthChecksOkMetric,
endpointsHealthChecksOk: 1,
protocolVersionMetric: protocolVersionMetric,
totalRelaysSentToProvidersMetric: totalRelaysSentToProvidersMetric,
totalRelaysReturnedFromProvidersMetric: totalRelaysReturnedFromProvidersMetric,
totalRelaysSentByNewBatchTickerMetric: totalRelaysSentByNewBatchTickerMetric,
currentNumberOfOpenSessionsMetric: currentNumberOfOpenSessionsMetric,
currentNumberOfBlockedSessionsMetric: currentNumberOfBlockedSessionsMetric,
}

http.Handle("/metrics", promhttp.Handler())
Expand Down Expand Up @@ -170,6 +207,54 @@ func (pme *ConsumerMetricsManager) SetRelayMetrics(relayMetric *RelayMetrics, er
}
}

func (pme *ConsumerMetricsManager) SetRelaySentToProviderMetric(chainId string, apiInterface string) {
if pme == nil {
return
}
pme.totalRelaysSentToProvidersMetric.WithLabelValues(chainId, apiInterface).Inc()
}

func (pme *ConsumerMetricsManager) SetRelayReturnedFromProviderMetric(chainId string, apiInterface string) {
if pme == nil {
return
}
pme.totalRelaysReturnedFromProvidersMetric.WithLabelValues(chainId, apiInterface).Inc()
}

func (pme *ConsumerMetricsManager) SetRelaySentByNewBatchTickerMetric(chainId string, apiInterface string) {
if pme == nil {
return
}
pme.totalRelaysSentByNewBatchTickerMetric.WithLabelValues(chainId, apiInterface).Inc()
}

func (pme *ConsumerMetricsManager) AddOpenSessionMetric(chainId string, apiInterface string, provider string) {
if pme == nil {
return
}
pme.lock.Lock()
defer pme.lock.Unlock()
pme.currentNumberOfOpenSessionsMetric.WithLabelValues(chainId, apiInterface, provider).Inc()
}

func (pme *ConsumerMetricsManager) DecrementOpenSessionMetric(chainId string, apiInterface string, provider string) {
if pme == nil {
return
}
pme.lock.Lock()
defer pme.lock.Unlock()
pme.currentNumberOfOpenSessionsMetric.WithLabelValues(chainId, apiInterface, provider).Dec()
}

func (pme *ConsumerMetricsManager) AddNumberOfBlockedSessionMetric(chainId string, apiInterface string, provider string) {
if pme == nil {
return
}
pme.lock.Lock()
defer pme.lock.Unlock()
pme.currentNumberOfBlockedSessionsMetric.WithLabelValues(chainId, apiInterface, provider).Inc()
}

func (pme *ConsumerMetricsManager) SetQOSMetrics(chainId string, apiInterface string, providerAddress string, qos *pairingtypes.QualityOfServiceReport, qosExcellence *pairingtypes.QualityOfServiceReport, latestBlock int64, relays uint64) {
if pme == nil {
return
Expand Down Expand Up @@ -239,14 +324,16 @@ func (pme *ConsumerMetricsManager) UpdateHealthCheckStatus(status bool) {
atomic.StoreUint64(&pme.endpointsHealthChecksOk, uint64(value))
}

func (pme *ConsumerMetricsManager) ResetQOSMetrics() {
func (pme *ConsumerMetricsManager) ResetSessionRelatedMetrics() {
if pme == nil {
return
}
pme.lock.Lock()
defer pme.lock.Unlock()
pme.qosMetric.Reset()
pme.qosExcellenceMetric.Reset()
pme.currentNumberOfBlockedSessionsMetric.Reset()
pme.currentNumberOfOpenSessionsMetric.Reset()
pme.providerRelays = map[string]uint64{}
}

Expand Down
10 changes: 10 additions & 0 deletions protocol/metrics/rpcconsumerlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,16 @@ func (rpccl *RPCConsumerLogs) shouldCountMetrics(refererHeaderValue string, user
return true
}

func (rpccl *RPCConsumerLogs) SetRelaySentToProviderMetric(chainid string, apiInterface string) {
rpccl.consumerMetricsManager.SetRelaySentToProviderMetric(chainid, apiInterface)
}

Check failure on line 216 in protocol/metrics/rpcconsumerlogs.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed (gofumpt)
func (rpccl *RPCConsumerLogs) SetRelayReturnedFromProviderMetric(chainid string, apiInterface string) {
rpccl.consumerMetricsManager.SetRelayReturnedFromProviderMetric(chainid, apiInterface)
}

Check failure on line 219 in protocol/metrics/rpcconsumerlogs.go

View workflow job for this annotation

GitHub Actions / lint

File is not `gofumpt`-ed (gofumpt)
func (rpccl *RPCConsumerLogs) SetRelaySentByNewBatchTickerMetric(chainid string, apiInterface string) {
rpccl.consumerMetricsManager.SetRelaySentByNewBatchTickerMetric(chainid, apiInterface)
}

func (rpccl *RPCConsumerLogs) SendMetrics(data *RelayMetrics, err error, origin string) {
data.Success = err == nil
data.Origin = origin
Expand Down
Loading

0 comments on commit 206bede

Please sign in to comment.