diff --git a/protocol/metrics/provider_metrics_manager.go b/protocol/metrics/provider_metrics_manager.go index a4578df233..8481b937b4 100644 --- a/protocol/metrics/provider_metrics_manager.go +++ b/protocol/metrics/provider_metrics_manager.go @@ -41,6 +41,9 @@ type ProviderMetricsManager struct { endpointsHealthChecksOk uint64 relaysMonitors map[string]*RelaysMonitor relaysMonitorsLock sync.RWMutex + frozenStatusMetric *prometheus.GaugeVec + jailStatusMetric *prometheus.GaugeVec + jailedCountMetric *prometheus.GaugeVec loadRateMetric *prometheus.GaugeVec } @@ -117,20 +120,38 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager { Name: "lava_provider_fetch_block_success", Help: "The total number of get specific block queries that succeeded by chainfetcher", }, []string{"spec"}) + virtualEpochMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "virtual_epoch", Help: "The current virtual epoch measured", }, []string{"spec"}) + endpointsHealthChecksOkMetric := prometheus.NewGauge(prometheus.GaugeOpts{ Name: "lava_provider_overall_health", Help: "At least one endpoint is healthy", }) endpointsHealthChecksOkMetric.Set(1) + frozenStatusMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "lava_provider_frozen_status", + Help: "Frozen: 1, Not Frozen: 0", + }, []string{"chainID"}) + + jailStatusMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "lava_provider_jail_status", + Help: "Jailed: 1, Not Jailed: 0", + }, []string{"chainID"}) + + jailedCountMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "lava_provider_jailed_count", + Help: "The amount of times the provider was jailed in the last 24 hours", + }, []string{"chainID"}) + protocolVersionMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "lava_provider_protocol_version", Help: "The current running lavap version for the process. major := version / 1000000, minor := (version / 1000) % 1000 patch := version % 1000", }, []string{"version"}) + // Register the metrics with the Prometheus registry. prometheus.MustRegister(totalCUServicedMetric) prometheus.MustRegister(totalCUPaidMetric) @@ -147,6 +168,9 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager { prometheus.MustRegister(virtualEpochMetric) prometheus.MustRegister(endpointsHealthChecksOkMetric) prometheus.MustRegister(protocolVersionMetric) + prometheus.MustRegister(frozenStatusMetric) + prometheus.MustRegister(jailStatusMetric) + prometheus.MustRegister(jailedCountMetric) prometheus.MustRegister(loadRateMetric) providerMetricsManager := &ProviderMetricsManager{ @@ -168,6 +192,9 @@ func NewProviderMetricsManager(networkAddress string) *ProviderMetricsManager { endpointsHealthChecksOk: 1, protocolVersionMetric: protocolVersionMetric, relaysMonitors: map[string]*RelaysMonitor{}, + frozenStatusMetric: frozenStatusMetric, + jailStatusMetric: jailStatusMetric, + jailedCountMetric: jailedCountMetric, loadRateMetric: loadRateMetric, } @@ -358,3 +385,27 @@ func (pme *ProviderMetricsManager) RegisterRelaysMonitor(chainID, apiInterface s defer pme.relaysMonitorsLock.Unlock() pme.relaysMonitors[chainID+apiInterface] = relaysMonitor } + +func (pme *ProviderMetricsManager) SetFrozenStatus(chain string, frozen bool) { + if pme == nil { + return + } + + pme.frozenStatusMetric.WithLabelValues(chain).Set(utils.Btof(frozen)) +} + +func (pme *ProviderMetricsManager) SetJailStatus(chain string, jailed bool) { + if pme == nil { + return + } + + pme.jailStatusMetric.WithLabelValues(chain).Set(utils.Btof(jailed)) +} + +func (pme *ProviderMetricsManager) SetJailedCount(chain string, jailedCount uint64) { + if pme == nil { + return + } + + pme.jailedCountMetric.WithLabelValues(chain).Set(float64(jailedCount)) +} diff --git a/protocol/rpcprovider/rpcprovider.go b/protocol/rpcprovider/rpcprovider.go index 22b9ed8b65..89a99d5f98 100644 --- a/protocol/rpcprovider/rpcprovider.go +++ b/protocol/rpcprovider/rpcprovider.go @@ -193,6 +193,7 @@ func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) { rpcp.providerStateTracker.RegisterForEpochUpdates(ctx, rpcp.rewardServer) rpcp.providerStateTracker.RegisterPaymentUpdatableForPayments(ctx, rpcp.rewardServer) } + keyName, err := sigs.GetKeyName(options.clientCtx) if err != nil { utils.LavaFormatFatal("failed getting key name from clientCtx", err) @@ -214,8 +215,13 @@ func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) { if err != nil { utils.LavaFormatFatal("failed unmarshaling public address", err, utils.Attribute{Key: "keyName", Value: keyName}, utils.Attribute{Key: "pubkey", Value: pubKey.Address()}) } + utils.LavaFormatInfo("RPCProvider pubkey: " + rpcp.addr.String()) + + rpcp.createAndRegisterFreezeUpdatersByOptions(ctx, options.clientCtx, rpcp.addr.String()) + utils.LavaFormatInfo("RPCProvider setting up endpoints", utils.Attribute{Key: "count", Value: strconv.Itoa(len(options.rpcProviderEndpoints))}) + blockMemorySize, err := rpcp.providerStateTracker.GetEpochSizeMultipliedByRecommendedEpochNumToCollectPayment(ctx) // get the number of blocks to keep in PSM. if err != nil { utils.LavaFormatFatal("Failed fetching GetEpochSizeMultipliedByRecommendedEpochNumToCollectPayment in RPCProvider Start", err) @@ -275,6 +281,12 @@ func (rpcp *RPCProvider) Start(options *rpcProviderStartOptions) (err error) { return nil } +func (rpcp *RPCProvider) createAndRegisterFreezeUpdatersByOptions(ctx context.Context, clientCtx client.Context, publicAddress string) { + queryClient := pairingtypes.NewQueryClient(clientCtx) + freezeJailUpdater := updaters.NewProviderFreezeJailUpdater(queryClient, publicAddress, rpcp.providerMetricsManager) + rpcp.providerStateTracker.RegisterForEpochUpdates(ctx, freezeJailUpdater) +} + func getActiveEndpoints(rpcProviderEndpoints []*lavasession.RPCProviderEndpoint, disabledEndpointsList []*lavasession.RPCProviderEndpoint) []*lavasession.RPCProviderEndpoint { activeEndpoints := map[*lavasession.RPCProviderEndpoint]struct{}{} for _, endpoint := range rpcProviderEndpoints { diff --git a/protocol/statetracker/updaters/provider_freeze_jail_updater.go b/protocol/statetracker/updaters/provider_freeze_jail_updater.go new file mode 100644 index 0000000000..69f24ad1ab --- /dev/null +++ b/protocol/statetracker/updaters/provider_freeze_jail_updater.go @@ -0,0 +1,71 @@ +package updaters + +import ( + "context" + "time" + + "github.com/lavanet/lava/v4/utils" + pairingtypes "github.com/lavanet/lava/v4/x/pairing/types" + "google.golang.org/grpc" +) + +const ( + CallbackKeyForFreezeUpdate = "freeze-update" +) + +type ProviderPairingStatusStateQueryInf interface { + Provider(ctx context.Context, in *pairingtypes.QueryProviderRequest, opts ...grpc.CallOption) (*pairingtypes.QueryProviderResponse, error) +} + +type ProviderMetricsManagerInf interface { + SetFrozenStatus(string, bool) + SetJailStatus(string, bool) + SetJailedCount(string, uint64) +} + +type FrozenStatus uint64 + +const ( + AVAILABLE FrozenStatus = iota + FROZEN +) + +type ProviderFreezeJailUpdater struct { + pairingQueryClient ProviderPairingStatusStateQueryInf + metricsManager ProviderMetricsManagerInf + publicAddress string +} + +func NewProviderFreezeJailUpdater( + pairingQueryClient ProviderPairingStatusStateQueryInf, + publicAddress string, + metricsManager ProviderMetricsManagerInf, +) *ProviderFreezeJailUpdater { + return &ProviderFreezeJailUpdater{ + pairingQueryClient: pairingQueryClient, + publicAddress: publicAddress, + metricsManager: metricsManager, + } +} + +func (pfu *ProviderFreezeJailUpdater) UpdateEpoch(epoch uint64) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + response, err := pfu.pairingQueryClient.Provider(ctx, &pairingtypes.QueryProviderRequest{Address: pfu.publicAddress}) + cancel() + + if err != nil { + utils.LavaFormatError("Failed querying pairing client for provider", err) + return + } + + for _, provider := range response.StakeEntries { + if provider.Address != pfu.publicAddress || !provider.IsAddressVaultOrProvider(provider.Address) { + // should never happen, but just in case + continue + } + + pfu.metricsManager.SetJailedCount(provider.Chain, provider.Jails) + pfu.metricsManager.SetJailStatus(provider.Chain, provider.IsJailed(time.Now().UTC().Unix())) + pfu.metricsManager.SetFrozenStatus(provider.Chain, provider.IsFrozen() || provider.StakeAppliedBlock > epoch) + } +} diff --git a/protocol/statetracker/updaters/provider_freeze_jail_updater_mocks.go b/protocol/statetracker/updaters/provider_freeze_jail_updater_mocks.go new file mode 100644 index 0000000000..24b0738393 --- /dev/null +++ b/protocol/statetracker/updaters/provider_freeze_jail_updater_mocks.go @@ -0,0 +1,121 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: protocol/statetracker/updaters/provider_freeze_updater.go +// +// Generated by this command: +// +// mockgen -source=protocol/statetracker/updaters/provider_freeze_updater.go -destination protocol/statetracker/updaters/provider_freeze_updater_mocks.go -package updaters +// + +// Package updaters is a generated GoMock package. +package updaters + +import ( + context "context" + reflect "reflect" + + types "github.com/lavanet/lava/v4/x/pairing/types" + gomock "go.uber.org/mock/gomock" + grpc "google.golang.org/grpc" +) + +// MockProviderPairingStatusStateQueryInf is a mock of ProviderPairingStatusStateQueryInf interface. +type MockProviderPairingStatusStateQueryInf struct { + ctrl *gomock.Controller + recorder *MockProviderPairingStatusStateQueryInfMockRecorder +} + +// MockProviderPairingStatusStateQueryInfMockRecorder is the mock recorder for MockProviderPairingStatusStateQueryInf. +type MockProviderPairingStatusStateQueryInfMockRecorder struct { + mock *MockProviderPairingStatusStateQueryInf +} + +// NewMockProviderPairingStatusStateQueryInf creates a new mock instance. +func NewMockProviderPairingStatusStateQueryInf(ctrl *gomock.Controller) *MockProviderPairingStatusStateQueryInf { + mock := &MockProviderPairingStatusStateQueryInf{ctrl: ctrl} + mock.recorder = &MockProviderPairingStatusStateQueryInfMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockProviderPairingStatusStateQueryInf) EXPECT() *MockProviderPairingStatusStateQueryInfMockRecorder { + return m.recorder +} + +// Provider mocks base method. +func (m *MockProviderPairingStatusStateQueryInf) Provider(ctx context.Context, in *types.QueryProviderRequest, opts ...grpc.CallOption) (*types.QueryProviderResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Provider", varargs...) + ret0, _ := ret[0].(*types.QueryProviderResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Provider indicates an expected call of Provider. +func (mr *MockProviderPairingStatusStateQueryInfMockRecorder) Provider(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Provider", reflect.TypeOf((*MockProviderPairingStatusStateQueryInf)(nil).Provider), varargs...) +} + +// MockProviderMetricsManagerInf is a mock of ProviderMetricsManagerInf interface. +type MockProviderMetricsManagerInf struct { + ctrl *gomock.Controller + recorder *MockProviderMetricsManagerInfMockRecorder +} + +// MockProviderMetricsManagerInfMockRecorder is the mock recorder for MockProviderMetricsManagerInf. +type MockProviderMetricsManagerInfMockRecorder struct { + mock *MockProviderMetricsManagerInf +} + +// NewMockProviderMetricsManagerInf creates a new mock instance. +func NewMockProviderMetricsManagerInf(ctrl *gomock.Controller) *MockProviderMetricsManagerInf { + mock := &MockProviderMetricsManagerInf{ctrl: ctrl} + mock.recorder = &MockProviderMetricsManagerInfMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockProviderMetricsManagerInf) EXPECT() *MockProviderMetricsManagerInfMockRecorder { + return m.recorder +} + +// SetFrozenStatus mocks base method. +func (m *MockProviderMetricsManagerInf) SetFrozenStatus(arg0 string, arg1 bool) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetFrozenStatus", arg0, arg1) +} + +// SetFrozenStatus indicates an expected call of SetFrozenStatus. +func (mr *MockProviderMetricsManagerInfMockRecorder) SetFrozenStatus(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetFrozenStatus", reflect.TypeOf((*MockProviderMetricsManagerInf)(nil).SetFrozenStatus), arg0, arg1) +} + +// SetJailStatus mocks base method. +func (m *MockProviderMetricsManagerInf) SetJailStatus(arg0 string, arg1 bool) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetJailStatus", arg0, arg1) +} + +// SetJailStatus indicates an expected call of SetJailStatus. +func (mr *MockProviderMetricsManagerInfMockRecorder) SetJailStatus(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetJailStatus", reflect.TypeOf((*MockProviderMetricsManagerInf)(nil).SetJailStatus), arg0, arg1) +} + +// SetJailedCount mocks base method. +func (m *MockProviderMetricsManagerInf) SetJailedCount(arg0 string, arg1 uint64) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetJailedCount", arg0, arg1) +} + +// SetJailedCount indicates an expected call of SetJailedCount. +func (mr *MockProviderMetricsManagerInfMockRecorder) SetJailedCount(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetJailedCount", reflect.TypeOf((*MockProviderMetricsManagerInf)(nil).SetJailedCount), arg0, arg1) +} diff --git a/protocol/statetracker/updaters/provider_freeze_jail_updater_test.go b/protocol/statetracker/updaters/provider_freeze_jail_updater_test.go new file mode 100644 index 0000000000..ce6c6f68be --- /dev/null +++ b/protocol/statetracker/updaters/provider_freeze_jail_updater_test.go @@ -0,0 +1,86 @@ +package updaters + +import ( + "testing" + "time" + + "github.com/lavanet/lava/v4/utils/rand" + epochstoragetypes "github.com/lavanet/lava/v4/x/epochstorage/types" + pairingtypes "github.com/lavanet/lava/v4/x/pairing/types" + gomock "go.uber.org/mock/gomock" +) + +func TestFreezeJailMetricsOnEpochUpdate(t *testing.T) { + rand.InitRandomSeed() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + specID := "test-spec" + address := "initial1" + epoch := uint64(100) + stakeAppliedBlock := uint64(10) + + stakeEntryList := []epochstoragetypes.StakeEntry{ + { + Address: address, + Chain: specID, + Endpoints: []epochstoragetypes.Endpoint{ + { + IPPORT: "1234567", + Geolocation: 1, + Addons: []string{}, + ApiInterfaces: []string{"banana"}, + Extensions: []string{}, + }, + }, + StakeAppliedBlock: stakeAppliedBlock, + }, + } + + response := &pairingtypes.QueryProviderResponse{StakeEntries: stakeEntryList} + + stateQuery := NewMockProviderPairingStatusStateQueryInf(ctrl) + metricManager := NewMockProviderMetricsManagerInf(ctrl) + + freezeUpdater := NewProviderFreezeJailUpdater(stateQuery, address, metricManager) + + expectAndRun := func(stakeAppliedBlock, jailedCount uint64, frozen bool, jailed bool) { + stakeEntryList[0].StakeAppliedBlock = stakeAppliedBlock + stakeEntryList[0].Jails = jailedCount + if jailed { + stakeEntryList[0].JailEndTime = time.Now().Add(time.Hour).UTC().Unix() + } + response = &pairingtypes.QueryProviderResponse{StakeEntries: stakeEntryList} + stateQuery. + EXPECT(). + Provider(gomock.Any(), gomock.Any(), gomock.Any()). + Return(response, nil). + AnyTimes() + + metricManager. + EXPECT(). + SetJailStatus(specID, jailed). + Times(1) + + metricManager. + EXPECT(). + SetFrozenStatus(specID, frozen). + Times(1) + + metricManager. + EXPECT(). + SetJailedCount(specID, jailedCount). + Times(1) + + freezeUpdater.UpdateEpoch(epoch) + } + + // Normal - no freeze, no jail + expectAndRun(stakeAppliedBlock, 0, false, false) + + // StakeAppliedBlock > epoch - frozen + expectAndRun(epoch+1, 0, true, false) + + // Jail status changed + jail count + expectAndRun(epoch-1, 1, false, true) +} diff --git a/utils/convert.go b/utils/convert.go new file mode 100644 index 0000000000..0a39cc7093 --- /dev/null +++ b/utils/convert.go @@ -0,0 +1,8 @@ +package utils + +func Btof(b bool) float64 { + if b { + return 1 + } + return 0 +}