Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: PRT-add metrics to rpcconsumer sessions #1374

Merged
merged 3 commits into from
Apr 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ecosystem/lavajs/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@lavanet/lavajs",
"version": "1.0.4",
"version": "1.2.2",
"description": "lavajs",
"author": "Lava Network",
"homepage": "https://github.com/lavanet/lava/tree/main/ecosystem/lavajs#readme",
Expand Down
12 changes: 10 additions & 2 deletions ecosystem/lavajs/scripts/codegen.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ telescope({
'cosmos/authz/v1beta1/authz.ts',
'cosmos/gov/v1beta1/tx.ts',
'cosmos/gov/v1beta1/gov.ts',
'cosmos/staking/v1beta1/staking.ts',
'tendermint/types/evidence.ts',
'cosmos/staking/v1beta1/tx.ts',
'cosmos/orm/query/v1alpha1/query.ts',
'tendermint/types/types.ts',
'tendermint/abci/types.ts',
'lavanet/lava/downtime/v1/genesis.ts',
'cosmos/upgrade/v1beta1/upgrade.ts',
'cosmos/staking/v1beta1/tx.amino.ts'
],
patterns: ['**/*amino.ts', '**/*registry.ts']
Expand Down Expand Up @@ -64,8 +72,8 @@ telescope({
]
},
methods: {
fromJSON: false,
toJSON: false,
fromJSON: true,
toJSON: true,
encode: true,
decode: true,
fromPartial: true,
Expand Down
27 changes: 17 additions & 10 deletions protocol/lavasession/consumer_session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ func (csm *ConsumerSessionManager) UpdateAllProviders(epoch uint64, pairingList
csm.pairing[provider.PublicLavaAddress] = provider
}
csm.setValidAddressesToDefaultValue("", nil) // the starting point is that valid addresses are equal to pairing addresses.
csm.resetMetricsManager()
// reset session related metrics
csm.consumerMetricsManager.ResetSessionRelatedMetrics()
utils.LavaFormatDebug("updated providers", utils.Attribute{Key: "epoch", Value: epoch}, utils.Attribute{Key: "spec", Value: csm.rpcEndpoint.Key()})
return nil
}
Expand Down Expand Up @@ -440,6 +441,11 @@ func (csm *ConsumerSessionManager) GetSessions(ctx context.Context, cuNeededForS
} else {
// consumer session is locked and valid, we need to set the relayNumber and the relay cu. before returning.

// add metric to currently open sessions metric
info := csm.RPCEndpoint()
apiInterface := info.ApiInterface
chainId := info.ChainID
go csm.consumerMetricsManager.AddOpenSessionMetric(chainId, apiInterface, providerAddress)
// Successfully created/got a consumerSession.
if debug {
utils.LavaFormatDebug("Consumer get session",
Expand Down Expand Up @@ -921,16 +927,17 @@ func (csm *ConsumerSessionManager) updateMetricsManager(consumerSession *SingleC
qosEx := *consumerSession.QoSInfo.LastExcellenceQoSReport
lastQosExcellence = &qosEx
}
blockedSession := consumerSession.BlockListed
publicProviderAddress := consumerSession.Parent.PublicLavaAddress

go csm.consumerMetricsManager.SetQOSMetrics(chainId, apiInterface, consumerSession.Parent.PublicLavaAddress, lastQos, lastQosExcellence, consumerSession.LatestBlock, consumerSession.RelayNum)
}

// consumerSession should still be locked when accessing this method as it fetches information from the session it self
func (csm *ConsumerSessionManager) resetMetricsManager() {
if csm.consumerMetricsManager == nil {
return
}
csm.consumerMetricsManager.ResetQOSMetrics()
go func() {
csm.consumerMetricsManager.SetQOSMetrics(chainId, apiInterface, publicProviderAddress, lastQos, lastQosExcellence, consumerSession.LatestBlock, consumerSession.RelayNum)
// in case we blocked the session add it to our block sessions metric
if blockedSession {
csm.consumerMetricsManager.AddNumberOfBlockedSessionMetric(chainId, apiInterface, publicProviderAddress)
}
csm.consumerMetricsManager.DecrementOpenSessionMetric(chainId, apiInterface, publicProviderAddress)
}()
}

// Get the reported providers currently stored in the session manager.
Expand Down
147 changes: 117 additions & 30 deletions protocol/metrics/metrics_consumer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,26 @@ import (
)

type ConsumerMetricsManager struct {
totalCURequestedMetric *prometheus.CounterVec
totalRelaysRequestedMetric *prometheus.CounterVec
totalErroredMetric *prometheus.CounterVec
blockMetric *prometheus.GaugeVec
latencyMetric *prometheus.GaugeVec
qosMetric *prometheus.GaugeVec
qosExcellenceMetric *prometheus.GaugeVec
LatestBlockMetric *prometheus.GaugeVec
LatestProviderRelay *prometheus.GaugeVec
virtualEpochMetric *prometheus.GaugeVec
endpointsHealthChecksOkMetric prometheus.Gauge
endpointsHealthChecksOk uint64
lock sync.Mutex
protocolVersionMetric *prometheus.GaugeVec
providerRelays map[string]uint64
totalCURequestedMetric *prometheus.CounterVec
totalRelaysRequestedMetric *prometheus.CounterVec
totalErroredMetric *prometheus.CounterVec
totalRelaysSentToProvidersMetric *prometheus.CounterVec
totalRelaysReturnedFromProvidersMetric *prometheus.CounterVec
totalRelaysSentByNewBatchTickerMetric *prometheus.CounterVec
currentNumberOfOpenSessionsMetric *prometheus.GaugeVec
currentNumberOfBlockedSessionsMetric *prometheus.GaugeVec
blockMetric *prometheus.GaugeVec
latencyMetric *prometheus.GaugeVec
qosMetric *prometheus.GaugeVec
qosExcellenceMetric *prometheus.GaugeVec
LatestBlockMetric *prometheus.GaugeVec
LatestProviderRelay *prometheus.GaugeVec
virtualEpochMetric *prometheus.GaugeVec
endpointsHealthChecksOkMetric prometheus.Gauge
endpointsHealthChecksOk uint64
lock sync.Mutex
protocolVersionMetric *prometheus.GaugeVec
providerRelays map[string]uint64
}

func NewConsumerMetricsManager(networkAddress string) *ConsumerMetricsManager {
Expand All @@ -46,6 +51,27 @@ func NewConsumerMetricsManager(networkAddress string) *ConsumerMetricsManager {
Help: "The total number of relays serviced by the consumer over time.",
}, []string{"spec", "apiInterface"})

totalRelaysSentToProvidersMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "lava_consumer_total_relays_sent_to_providers",
Help: "The total number of relays sent to providers",
}, []string{"spec", "apiInterface"})
totalRelaysReturnedFromProvidersMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "lava_consumer_total_relays_returned_from_providers",
Help: "The total number of relays returned from providers",
}, []string{"spec", "apiInterface"})
totalRelaysSentByNewBatchTickerMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "lava_consumer_total_relays_sent_by_batch_ticker",
Help: "The total number of relays sent by the batch ticker",
}, []string{"spec", "apiInterface"})
currentNumberOfOpenSessionsMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_consumer_current_number_of_open_sessions",
Help: "The total number of currently open sessions",
}, []string{"spec", "apiInterface", "provider"})
currentNumberOfBlockedSessionsMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_consumer_current_number_of_blocked_sessions",
Help: "The total number of currently blocked sessions",
}, []string{"spec", "apiInterface", "provider"})

// Create a new GaugeVec metric to represent the TotalErrored over time.
totalErroredMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "lava_consumer_total_errored",
Expand Down Expand Up @@ -106,22 +132,33 @@ func NewConsumerMetricsManager(networkAddress string) *ConsumerMetricsManager {
prometheus.MustRegister(virtualEpochMetric)
prometheus.MustRegister(endpointsHealthChecksOkMetric)
prometheus.MustRegister(protocolVersionMetric)
// metrics related to session management
prometheus.MustRegister(totalRelaysSentToProvidersMetric)
prometheus.MustRegister(totalRelaysReturnedFromProvidersMetric)
prometheus.MustRegister(totalRelaysSentByNewBatchTickerMetric)
prometheus.MustRegister(currentNumberOfOpenSessionsMetric)
prometheus.MustRegister(currentNumberOfBlockedSessionsMetric)

consumerMetricsManager := &ConsumerMetricsManager{
totalCURequestedMetric: totalCURequestedMetric,
totalRelaysRequestedMetric: totalRelaysRequestedMetric,
totalErroredMetric: totalErroredMetric,
blockMetric: blockMetric,
latencyMetric: latencyMetric,
qosMetric: qosMetric,
qosExcellenceMetric: qosExcellenceMetric,
LatestBlockMetric: latestBlockMetric,
LatestProviderRelay: latestProviderRelay,
providerRelays: map[string]uint64{},
virtualEpochMetric: virtualEpochMetric,
endpointsHealthChecksOkMetric: endpointsHealthChecksOkMetric,
endpointsHealthChecksOk: 1,
protocolVersionMetric: protocolVersionMetric,
totalCURequestedMetric: totalCURequestedMetric,
totalRelaysRequestedMetric: totalRelaysRequestedMetric,
totalErroredMetric: totalErroredMetric,
blockMetric: blockMetric,
latencyMetric: latencyMetric,
qosMetric: qosMetric,
qosExcellenceMetric: qosExcellenceMetric,
LatestBlockMetric: latestBlockMetric,
LatestProviderRelay: latestProviderRelay,
providerRelays: map[string]uint64{},
virtualEpochMetric: virtualEpochMetric,
endpointsHealthChecksOkMetric: endpointsHealthChecksOkMetric,
endpointsHealthChecksOk: 1,
protocolVersionMetric: protocolVersionMetric,
totalRelaysSentToProvidersMetric: totalRelaysSentToProvidersMetric,
totalRelaysReturnedFromProvidersMetric: totalRelaysReturnedFromProvidersMetric,
totalRelaysSentByNewBatchTickerMetric: totalRelaysSentByNewBatchTickerMetric,
currentNumberOfOpenSessionsMetric: currentNumberOfOpenSessionsMetric,
currentNumberOfBlockedSessionsMetric: currentNumberOfBlockedSessionsMetric,
}

http.Handle("/metrics", promhttp.Handler())
Expand Down Expand Up @@ -170,6 +207,54 @@ func (pme *ConsumerMetricsManager) SetRelayMetrics(relayMetric *RelayMetrics, er
}
}

// SetRelaySentToProviderMetric adds one to the "relays sent to providers"
// counter for the given spec (chainId) and API interface.
// Calling it on a nil manager does nothing.
func (pme *ConsumerMetricsManager) SetRelaySentToProviderMetric(chainId string, apiInterface string) {
	if pme != nil {
		sentCounter := pme.totalRelaysSentToProvidersMetric.WithLabelValues(chainId, apiInterface)
		sentCounter.Inc()
	}
}

// SetRelayReturnedFromProviderMetric adds one to the "relays returned from
// providers" counter for the given spec (chainId) and API interface.
// Calling it on a nil manager does nothing.
func (pme *ConsumerMetricsManager) SetRelayReturnedFromProviderMetric(chainId string, apiInterface string) {
	if pme != nil {
		returnedCounter := pme.totalRelaysReturnedFromProvidersMetric.WithLabelValues(chainId, apiInterface)
		returnedCounter.Inc()
	}
}

// SetRelaySentByNewBatchTickerMetric adds one to the "relays sent by the
// batch ticker" counter for the given spec (chainId) and API interface.
// Calling it on a nil manager does nothing.
func (pme *ConsumerMetricsManager) SetRelaySentByNewBatchTickerMetric(chainId string, apiInterface string) {
	if pme != nil {
		tickerCounter := pme.totalRelaysSentByNewBatchTickerMetric.WithLabelValues(chainId, apiInterface)
		tickerCounter.Inc()
	}
}

// AddOpenSessionMetric increments the open-sessions gauge for the given
// spec (chainId), API interface and provider. The gauge is updated while
// pme.lock is held. Calling it on a nil manager does nothing.
func (pme *ConsumerMetricsManager) AddOpenSessionMetric(chainId string, apiInterface string, provider string) {
	if pme != nil {
		pme.lock.Lock()
		defer pme.lock.Unlock()
		openSessions := pme.currentNumberOfOpenSessionsMetric.WithLabelValues(chainId, apiInterface, provider)
		openSessions.Inc()
	}
}

// DecrementOpenSessionMetric decrements the open-sessions gauge for the given
// spec (chainId), API interface and provider. The gauge is updated while
// pme.lock is held. Calling it on a nil manager does nothing.
func (pme *ConsumerMetricsManager) DecrementOpenSessionMetric(chainId string, apiInterface string, provider string) {
	if pme != nil {
		pme.lock.Lock()
		defer pme.lock.Unlock()
		openSessions := pme.currentNumberOfOpenSessionsMetric.WithLabelValues(chainId, apiInterface, provider)
		openSessions.Dec()
	}
}

// AddNumberOfBlockedSessionMetric increments the blocked-sessions gauge for
// the given spec (chainId), API interface and provider. The gauge is updated
// while pme.lock is held. Calling it on a nil manager does nothing.
func (pme *ConsumerMetricsManager) AddNumberOfBlockedSessionMetric(chainId string, apiInterface string, provider string) {
	if pme != nil {
		pme.lock.Lock()
		defer pme.lock.Unlock()
		blockedSessions := pme.currentNumberOfBlockedSessionsMetric.WithLabelValues(chainId, apiInterface, provider)
		blockedSessions.Inc()
	}
}

func (pme *ConsumerMetricsManager) SetQOSMetrics(chainId string, apiInterface string, providerAddress string, qos *pairingtypes.QualityOfServiceReport, qosExcellence *pairingtypes.QualityOfServiceReport, latestBlock int64, relays uint64) {
if pme == nil {
return
Expand Down Expand Up @@ -239,14 +324,16 @@ func (pme *ConsumerMetricsManager) UpdateHealthCheckStatus(status bool) {
atomic.StoreUint64(&pme.endpointsHealthChecksOk, uint64(value))
}

func (pme *ConsumerMetricsManager) ResetQOSMetrics() {
// ResetSessionRelatedMetrics clears the per-session metric state: the QoS and
// QoS-excellence gauges, the blocked- and open-session gauges, and the
// providerRelays map. Everything is reset while pme.lock is held.
// Calling it on a nil manager does nothing.
func (pme *ConsumerMetricsManager) ResetSessionRelatedMetrics() {
	if pme == nil {
		return
	}
	pme.lock.Lock()
	defer pme.lock.Unlock()

	// Reset the gauges in the same order as before: QoS first, then session counts.
	gauges := []interface{ Reset() }{
		pme.qosMetric,
		pme.qosExcellenceMetric,
		pme.currentNumberOfBlockedSessionsMetric,
		pme.currentNumberOfOpenSessionsMetric,
	}
	for _, gauge := range gauges {
		gauge.Reset()
	}

	// Replace the map instead of clearing it in place, as the original did.
	pme.providerRelays = make(map[string]uint64)
}

Expand Down
12 changes: 12 additions & 0 deletions protocol/metrics/rpcconsumerlogs.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,18 @@ func (rpccl *RPCConsumerLogs) shouldCountMetrics(refererHeaderValue string, user
return true
}

// SetRelaySentToProviderMetric forwards the chain ID and API interface to the
// method of the same name on the consumer metrics manager held by rpccl.
func (rpccl *RPCConsumerLogs) SetRelaySentToProviderMetric(chainId string, apiInterface string) {
	metricsManager := rpccl.consumerMetricsManager
	metricsManager.SetRelaySentToProviderMetric(chainId, apiInterface)
}

// SetRelayReturnedFromProviderMetric forwards the chain ID and API interface
// to the method of the same name on the consumer metrics manager held by rpccl.
func (rpccl *RPCConsumerLogs) SetRelayReturnedFromProviderMetric(chainId string, apiInterface string) {
	metricsManager := rpccl.consumerMetricsManager
	metricsManager.SetRelayReturnedFromProviderMetric(chainId, apiInterface)
}

// SetRelaySentByNewBatchTickerMetric forwards the chain ID and API interface
// to the method of the same name on the consumer metrics manager held by rpccl.
func (rpccl *RPCConsumerLogs) SetRelaySentByNewBatchTickerMetric(chainId string, apiInterface string) {
	metricsManager := rpccl.consumerMetricsManager
	metricsManager.SetRelaySentByNewBatchTickerMetric(chainId, apiInterface)
}

func (rpccl *RPCConsumerLogs) SendMetrics(data *RelayMetrics, err error, origin string) {
data.Success = err == nil
data.Origin = origin
Expand Down
Loading
Loading