Skip to content

Commit

Permalink
feat: PRT - Add provider freeze and jailed status metric (#1729)
Browse files Browse the repository at this point in the history
* add AvailabilityStateUpdater  per chain to rpcprovider

* added provider availability updater

* fix lint

* added correct metrics

* remove redundant comment

* update latest epoch

* wip - adding unit tests

* wip - availability updater mock and tests

* fix frozen metric help description

* revert init lava

* fix pr - freeze metric creation logic and split jailed metric data

* fix lint

* fix make lint

* set updater key with chainid only

* fix public address setup for freeze updater

* add unit tests for freeze updater epoch updates

* change query to provider instead of providers

* fix jailed status metric labels

* Update protocol/metrics/metrics_provider_manager.go

Co-authored-by: Elad Gildnur <[email protected]>

* Fix after merge

* Numerous changes

* Fix lint

---------

Co-authored-by: leon mandel <[email protected]>
Co-authored-by: Ran Mishael <[email protected]>
Co-authored-by: Elad Gildnur <[email protected]>
Co-authored-by: Elad Gildnur <[email protected]>
Co-authored-by: Ran Mishael <[email protected]>
Co-authored-by: omerlavanet <[email protected]>
  • Loading branch information
7 people authored Oct 30, 2024
1 parent da37d59 commit a0b0de8
Show file tree
Hide file tree
Showing 6 changed files with 349 additions and 0 deletions.
51 changes: 51 additions & 0 deletions protocol/metrics/provider_metrics_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type ProviderMetricsManager struct {
endpointsHealthChecksOk uint64
relaysMonitors map[string]*RelaysMonitor
relaysMonitorsLock sync.RWMutex
frozenStatusMetric *prometheus.GaugeVec
jailStatusMetric *prometheus.GaugeVec
jailedCountMetric *prometheus.GaugeVec
loadRateMetric *prometheus.GaugeVec
}

Expand Down Expand Up @@ -117,20 +120,38 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager {
Name: "lava_provider_fetch_block_success",
Help: "The total number of get specific block queries that succeeded by chainfetcher",
}, []string{"spec"})

virtualEpochMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "virtual_epoch",
Help: "The current virtual epoch measured",
}, []string{"spec"})

endpointsHealthChecksOkMetric := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "lava_provider_overall_health",
Help: "At least one endpoint is healthy",
})
endpointsHealthChecksOkMetric.Set(1)

frozenStatusMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_provider_frozen_status",
Help: "Frozen: 1, Not Frozen: 0",
}, []string{"chainID"})

jailStatusMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_provider_jail_status",
Help: "Jailed: 1, Not Jailed: 0",
}, []string{"chainID"})

jailedCountMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_provider_jailed_count",
Help: "The amount of times the provider was jailed in the last 24 hours",
}, []string{"chainID"})

protocolVersionMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_provider_protocol_version",
Help: "The current running lavap version for the process. major := version / 1000000, minor := (version / 1000) % 1000 patch := version % 1000",
}, []string{"version"})

// Register the metrics with the Prometheus registry.
prometheus.MustRegister(totalCUServicedMetric)
prometheus.MustRegister(totalCUPaidMetric)
Expand All @@ -147,6 +168,9 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager {
prometheus.MustRegister(virtualEpochMetric)
prometheus.MustRegister(endpointsHealthChecksOkMetric)
prometheus.MustRegister(protocolVersionMetric)
prometheus.MustRegister(frozenStatusMetric)
prometheus.MustRegister(jailStatusMetric)
prometheus.MustRegister(jailedCountMetric)
prometheus.MustRegister(loadRateMetric)

providerMetricsManager := &ProviderMetricsManager{
Expand All @@ -168,6 +192,9 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager {
endpointsHealthChecksOk: 1,
protocolVersionMetric: protocolVersionMetric,
relaysMonitors: map[string]*RelaysMonitor{},
frozenStatusMetric: frozenStatusMetric,
jailStatusMetric: jailStatusMetric,
jailedCountMetric: jailedCountMetric,
loadRateMetric: loadRateMetric,
}

Expand Down Expand Up @@ -358,3 +385,27 @@ func (pme *ProviderMetricsManager) RegisterRelaysMonitor(chainID, apiInterface s
defer pme.relaysMonitorsLock.Unlock()
pme.relaysMonitors[chainID+apiInterface] = relaysMonitor
}

// SetFrozenStatus publishes the frozen state of the provider on the given
// chain to the lava_provider_frozen_status gauge, using chain as the value of
// its "chainID" label. The gauge's help text documents 1 for frozen and 0 for
// not frozen; the bool-to-float conversion itself is delegated to utils.Btof.
//
// Calling it on a nil *ProviderMetricsManager is a no-op.
func (pme *ProviderMetricsManager) SetFrozenStatus(chain string, frozen bool) {
	if pme != nil {
		frozenGauge := pme.frozenStatusMetric.WithLabelValues(chain)
		frozenGauge.Set(utils.Btof(frozen))
	}
}

// SetJailStatus publishes the jailed state of the provider on the given chain
// to the lava_provider_jail_status gauge, using chain as the value of its
// "chainID" label. The gauge's help text documents 1 for jailed and 0 for not
// jailed; the bool-to-float conversion itself is delegated to utils.Btof.
//
// Calling it on a nil *ProviderMetricsManager is a no-op.
func (pme *ProviderMetricsManager) SetJailStatus(chain string, jailed bool) {
	if pme != nil {
		jailGauge := pme.jailStatusMetric.WithLabelValues(chain)
		jailGauge.Set(utils.Btof(jailed))
	}
}

// SetJailedCount publishes jailedCount to the lava_provider_jailed_count
// gauge, using chain as the value of its "chainID" label. The gauge's help
// text describes the value as the number of times the provider was jailed in
// the last 24 hours; this method stores whatever count the caller supplies.
//
// Calling it on a nil *ProviderMetricsManager is a no-op.
func (pme *ProviderMetricsManager) SetJailedCount(chain string, jailedCount uint64) {
	if pme != nil {
		countGauge := pme.jailedCountMetric.WithLabelValues(chain)
		countGauge.Set(float64(jailedCount))
	}
}
12 changes: 12 additions & 0 deletions protocol/rpcprovider/rpcprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) {
rpcp.providerStateTracker.RegisterForEpochUpdates(ctx, rpcp.rewardServer)
rpcp.providerStateTracker.RegisterPaymentUpdatableForPayments(ctx, rpcp.rewardServer)
}

keyName, err := sigs.GetKeyName(options.clientCtx)
if err != nil {
utils.LavaFormatFatal("failed getting key name from clientCtx", err)
Expand All @@ -214,8 +215,13 @@ func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) {
if err != nil {
utils.LavaFormatFatal("failed unmarshaling public address", err, utils.Attribute{Key: "keyName", Value: keyName}, utils.Attribute{Key: "pubkey", Value: pubKey.Address()})
}

utils.LavaFormatInfo("RPCProvider pubkey: " + rpcp.addr.String())

rpcp.createAndRegisterFreezeUpdatersByOptions(ctx, options.clientCtx, rpcp.addr.String())

utils.LavaFormatInfo("RPCProvider setting up endpoints", utils.Attribute{Key: "count", Value: strconv.Itoa(len(options.rpcProviderEndpoints))})

blockMemorySize, err := rpcp.providerStateTracker.GetEpochSizeMultipliedByRecommendedEpochNumToCollectPayment(ctx) // get the number of blocks to keep in PSM.
if err != nil {
utils.LavaFormatFatal("Failed fetching GetEpochSizeMultipliedByRecommendedEpochNumToCollectPayment in RPCProvider Start", err)
Expand Down Expand Up @@ -275,6 +281,12 @@ func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) {
return nil
}

// createAndRegisterFreezeUpdatersByOptions builds a ProviderFreezeJailUpdater
// for publicAddress and registers it with the provider state tracker for
// epoch updates. The updater is wired to a pairing query client created from
// clientCtx and to this provider's metrics manager.
func (rpcp *RPCProvider) createAndRegisterFreezeUpdatersByOptions(ctx context.Context, clientCtx client.Context, publicAddress string) {
	updater := updaters.NewProviderFreezeJailUpdater(
		pairingtypes.NewQueryClient(clientCtx),
		publicAddress,
		rpcp.providerMetricsManager,
	)
	rpcp.providerStateTracker.RegisterForEpochUpdates(ctx, updater)
}

func getActiveEndpoints(rpcProviderEndpoints []*lavasession.RPCProviderEndpoint, disabledEndpointsList []*lavasession.RPCProviderEndpoint) []*lavasession.RPCProviderEndpoint {
activeEndpoints := map[*lavasession.RPCProviderEndpoint]struct{}{}
for _, endpoint := range rpcProviderEndpoints {
Expand Down
71 changes: 71 additions & 0 deletions protocol/statetracker/updaters/provider_freeze_jail_updater.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package updaters

import (
"context"
"time"

"github.com/lavanet/lava/v4/utils"
pairingtypes "github.com/lavanet/lava/v4/x/pairing/types"
"google.golang.org/grpc"
)

// CallbackKeyForFreezeUpdate is the string key "freeze-update".
// NOTE(review): presumably the callback key under which the freeze updater is
// registered; it is not referenced anywhere in this file — confirm its use.
const CallbackKeyForFreezeUpdate = "freeze-update"

// ProviderPairingStatusStateQueryInf is the subset of the pairing query client
// that ProviderFreezeJailUpdater depends on. Declaring it here lets the
// updater accept any implementation of the single Provider query.
type ProviderPairingStatusStateQueryInf interface {
	// Provider runs the pairing "provider" query for the address carried in
	// req and returns the query response or an error.
	Provider(
		ctx context.Context,
		req *pairingtypes.QueryProviderRequest,
		callOpts ...grpc.CallOption,
	) (*pairingtypes.QueryProviderResponse, error)
}

// ProviderMetricsManagerInf is the metrics sink ProviderFreezeJailUpdater
// reports to. Each setter takes a chain identifier as its first argument and
// the value to record for that chain as its second.
type ProviderMetricsManagerInf interface {
	// SetFrozenStatus records whether the provider is frozen on chainID.
	SetFrozenStatus(chainID string, frozen bool)
	// SetJailStatus records whether the provider is jailed on chainID.
	SetJailStatus(chainID string, jailed bool)
	// SetJailedCount records the provider's jail count on chainID.
	SetJailedCount(chainID string, jailedCount uint64)
}

// FrozenStatus is a uint64-backed enumeration with the values AVAILABLE and
// FROZEN.
// NOTE(review): nothing in this file uses the type or its constants — confirm
// whether they are consumed elsewhere.
type FrozenStatus uint64

// Values of FrozenStatus.
const (
	// AVAILABLE is the zero value of FrozenStatus.
	AVAILABLE FrozenStatus = 0
	// FROZEN has the value 1.
	FROZEN FrozenStatus = 1
)

// ProviderFreezeJailUpdater queries the pairing module for a single provider
// address and reports that provider's jailed count, jail status and frozen
// status to a metrics manager (see UpdateEpoch).
type ProviderFreezeJailUpdater struct {
// pairingQueryClient performs the Provider query.
pairingQueryClient ProviderPairingStatusStateQueryInf
// metricsManager receives the jailed-count, jail-status and frozen-status values.
metricsManager ProviderMetricsManagerInf
// publicAddress is sent as the query address and matched against each returned stake entry.
publicAddress string
}

// NewProviderFreezeJailUpdater returns a ProviderFreezeJailUpdater that uses
// pairingQueryClient to query the provider identified by publicAddress and
// reports the results to metricsManager. The arguments are stored as given;
// no query is issued at construction time.
func NewProviderFreezeJailUpdater(
	pairingQueryClient ProviderPairingStatusStateQueryInf,
	publicAddress string,
	metricsManager ProviderMetricsManagerInf,
) *ProviderFreezeJailUpdater {
	updater := new(ProviderFreezeJailUpdater)
	updater.pairingQueryClient = pairingQueryClient
	updater.metricsManager = metricsManager
	updater.publicAddress = publicAddress
	return updater
}

// UpdateEpoch refreshes the jail and freeze metrics for this updater's
// provider address.
//
// It issues a Provider query for pfu.publicAddress with a 10-second timeout.
// If the query fails, the error is logged and no metric is touched. Otherwise,
// for every returned stake entry whose address matches pfu.publicAddress, it
// reports, keyed by the entry's chain:
//   - the entry's Jails value as the jailed count,
//   - whether the entry is jailed at the current UTC Unix time,
//   - whether the entry is frozen, which here also covers a StakeAppliedBlock
//     greater than epoch.
func (pfu *ProviderFreezeJailUpdater) UpdateEpoch(epoch uint64) {
	request := &pairingtypes.QueryProviderRequest{Address: pfu.publicAddress}

	queryCtx, release := context.WithTimeout(context.Background(), 10*time.Second)
	reply, queryErr := pfu.pairingQueryClient.Provider(queryCtx, request)
	// The query has returned, so release the timeout's resources right away.
	release()

	if queryErr != nil {
		utils.LavaFormatError("Failed querying pairing client for provider", queryErr)
		return
	}

	for _, entry := range reply.StakeEntries {
		// NOTE(review): the entry's own address is passed to
		// IsAddressVaultOrProvider — confirm this is intended rather than
		// pfu.publicAddress.
		ownEntry := entry.Address == pfu.publicAddress && entry.IsAddressVaultOrProvider(entry.Address)
		if !ownEntry {
			// should never happen, but just in case
			continue
		}

		chainID := entry.Chain
		pfu.metricsManager.SetJailedCount(chainID, entry.Jails)
		pfu.metricsManager.SetJailStatus(chainID, entry.IsJailed(time.Now().UTC().Unix()))
		pfu.metricsManager.SetFrozenStatus(chainID, entry.IsFrozen() || entry.StakeAppliedBlock > epoch)
	}
}
121 changes: 121 additions & 0 deletions protocol/statetracker/updaters/provider_freeze_jail_updater_mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit a0b0de8

Please sign in to comment.