diff --git a/.k8/yamls/ssv-exporter.yml b/.k8/yamls/ssv-exporter.yml index e0a746ebb3..acca7033ed 100644 --- a/.k8/yamls/ssv-exporter.yml +++ b/.k8/yamls/ssv-exporter.yml @@ -139,6 +139,8 @@ spec: name: port-15000 hostPort: 15000 env: + - name: SHARE_CONFIG + value: "./data/share.yaml" - name: ETH_1_ADDR valueFrom: secretKeyRef: diff --git a/beacon/beacon.go b/beacon/beacon.go index 161503c1f3..7ecc9ef5f9 100644 --- a/beacon/beacon.go +++ b/beacon/beacon.go @@ -27,8 +27,8 @@ type Beacon interface { // GetDuties returns duties for the passed validators indices GetDuties(epoch spec.Epoch, validatorIndices []spec.ValidatorIndex) ([]*Duty, error) - // GetIndices returns indices for each pubkey from the node - GetIndices(validatorPubKeys []spec.BLSPubKey) (map[spec.ValidatorIndex]*api.Validator, error) + // GetValidatorData returns metadata (balance, index, status, more) for each pubkey from the node + GetValidatorData(validatorPubKeys []spec.BLSPubKey) (map[spec.ValidatorIndex]*api.Validator, error) // GetAttestationData returns attestation data by the given slot and committee index GetAttestationData(slot spec.Slot, committeeIndex spec.CommitteeIndex) (*spec.AttestationData, error) diff --git a/beacon/duty.go b/beacon/duty.go index 57138e6b94..8002b4f04d 100644 --- a/beacon/duty.go +++ b/beacon/duty.go @@ -21,4 +21,3 @@ type Duty struct { // ValidatorCommitteeIndex is the index of the validator in the list of validators in the committee. ValidatorCommitteeIndex uint64 } - diff --git a/beacon/duty_data.go b/beacon/duty_data.go index 3ea59533d4..448fa4e357 100644 --- a/beacon/duty_data.go +++ b/beacon/duty_data.go @@ -56,8 +56,9 @@ type IsInputValueSignedData interface { type InputValueAttestation struct { Attestation *phase0.Attestation } + // isInputValueSignedData implementation - func (*InputValueAttestation) isInputValueSignedData() {} +func (*InputValueAttestation) isInputValueSignedData() {} // GetSignedData returns input data func (m *DutyData) GetSignedData() IsInputValueSignedData { @@ -73,4 +74,4 @@ func (m *DutyData) GetAttestation() *phase0.Attestation { return x.Attestation } return nil -} \ No newline at end of file +} diff --git a/beacon/goclient/goclient.go b/beacon/goclient/goclient.go index dfe13defc3..91e587a571 100644 --- a/beacon/goclient/goclient.go +++ b/beacon/goclient/goclient.go @@ -142,7 +142,8 @@ func (gc *goClient) GetDuties(epoch spec.Epoch, validatorIndices []spec.Validato return nil, errors.New("client does not support AttesterDutiesProvider") } -func (gc *goClient) GetIndices(validatorPubKeys []spec.BLSPubKey) (map[spec.ValidatorIndex]*api.Validator, error) { +// GetValidatorData returns metadata (balance, index, status, more) for each pubkey from the node +func (gc *goClient) GetValidatorData(validatorPubKeys []spec.BLSPubKey) (map[spec.ValidatorIndex]*api.Validator, error) { if provider, isProvider := gc.client.(eth2client.ValidatorsProvider); isProvider { validatorsMap, err := provider.ValidatorsByPubKey(gc.ctx, "head", validatorPubKeys) // TODO maybe need to get the chainId (head) as var if err != nil { diff --git a/beacon/test_utils.go b/beacon/test_utils.go new file mode 100644 index 0000000000..900e3ebc89 --- /dev/null +++ b/beacon/test_utils.go @@ -0,0 +1,95 @@ +package beacon + +import ( + v1 "github.com/attestantio/go-eth2-client/api/v1" + spec "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/herumi/bls-eth-go-binary/bls" + "sync" +) + +func init() { + _ = bls.Init(bls.BLS12_381) +} + +// NewMockBeacon creates a new mock 
implementation of beacon client +func NewMockBeacon(dutiesResults map[uint64][]*Duty, validatorsData map[spec.BLSPubKey]*v1.Validator) Beacon { + return &mockBeacon{ + indicesMap: map[spec.BLSPubKey]spec.ValidatorIndex{}, + indicesLock: sync.Mutex{}, + dutiesResults: dutiesResults, + validatorsData: validatorsData, + } +} + +type mockBeacon struct { + indicesMap map[spec.BLSPubKey]spec.ValidatorIndex + indicesLock sync.Mutex + dutiesResults map[uint64][]*Duty + validatorsData map[spec.BLSPubKey]*v1.Validator +} + +func (m *mockBeacon) ExtendIndexMap(index spec.ValidatorIndex, pubKey spec.BLSPubKey) { + m.indicesLock.Lock() + defer m.indicesLock.Unlock() + + m.indicesMap[pubKey] = index +} + +func (m *mockBeacon) GetDuties(epoch spec.Epoch, validatorIndices []spec.ValidatorIndex) ([]*Duty, error) { + return m.dutiesResults[uint64(epoch)], nil +} + +func (m *mockBeacon) GetValidatorData(validatorPubKeys []spec.BLSPubKey) (map[spec.ValidatorIndex]*v1.Validator, error) { + results := map[spec.ValidatorIndex]*v1.Validator{} + for _, pk := range validatorPubKeys { + if data, ok := m.validatorsData[pk]; ok { + results[data.Index] = data + } + } + return results, nil +} + +func (m *mockBeacon) GetAttestationData(slot spec.Slot, committeeIndex spec.CommitteeIndex) (*spec.AttestationData, error) { + return nil, nil +} + +func (m *mockBeacon) SignAttestation(data *spec.AttestationData, duty *Duty, shareKey *bls.SecretKey) (*spec.Attestation, []byte, error) { + return nil, nil, nil +} + +func (m *mockBeacon) SubmitAttestation(attestation *spec.Attestation) error { + return nil +} + +func (m *mockBeacon) SubscribeToCommitteeSubnet(subscription []*v1.BeaconCommitteeSubscription) error { + return nil +} + +// NewMockValidatorMetadataStorage creates a new mock implementation of ValidatorMetadataStorage +func NewMockValidatorMetadataStorage() ValidatorMetadataStorage { + return &mockValidatorMetadataStorage{ + map[string]*ValidatorMetadata{}, + sync.Mutex{}, + } +} + +type mockValidatorMetadataStorage struct { + data map[string]*ValidatorMetadata + lock sync.Mutex +} + +func (m *mockValidatorMetadataStorage) UpdateValidatorMetadata(pk string, metadata *ValidatorMetadata) error { + m.lock.Lock() + defer m.lock.Unlock() + + m.data[pk] = metadata + + return nil +} + +func (m *mockValidatorMetadataStorage) Size() int { + m.lock.Lock() + defer m.lock.Unlock() + + return len(m.data) +} diff --git a/beacon/types.go b/beacon/types.go index 69022e9eab..4e7885f470 100644 --- a/beacon/types.go +++ b/beacon/types.go @@ -25,4 +25,4 @@ const ( RoleTypeAttester RoleTypeAggregator RoleTypeProposer -) \ No newline at end of file +) diff --git a/beacon/validator_metadata_test.go b/beacon/validator_metadata_test.go new file mode 100644 index 0000000000..dbde7c707a --- /dev/null +++ b/beacon/validator_metadata_test.go @@ -0,0 +1,91 @@ +package beacon + +import ( + "encoding/hex" + v1 "github.com/attestantio/go-eth2-client/api/v1" + spec "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/bloxapp/ssv/utils/logex" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "strings" + "sync/atomic" + "testing" +) + +func init() { + logex.Build("test", zap.InfoLevel, nil) +} + +func TestValidatorMetadata_Status(t *testing.T) { + t.Run("ready", func(t *testing.T) { + meta := &ValidatorMetadata{ + Status: v1.ValidatorStateActiveOngoing, + } + require.True(t, meta.Activated()) + require.False(t, meta.Exiting()) + require.False(t, meta.Slashed()) + }) + + t.Run("exited", func(t *testing.T) { + meta := 
&ValidatorMetadata{ + Status: v1.ValidatorStateWithdrawalPossible, + } + require.True(t, meta.Exiting()) + require.True(t, meta.Activated()) + require.False(t, meta.Slashed()) + }) + + t.Run("exitedSlashed", func(t *testing.T) { + meta := &ValidatorMetadata{ + Status: v1.ValidatorStateExitedSlashed, + } + require.True(t, meta.Slashed()) + require.True(t, meta.Exiting()) + require.True(t, meta.Activated()) + }) +} + +func TestUpdateValidatorsMetadata(t *testing.T) { + var updateCount uint64 + pks := []string{ + "a17bb48a3f8f558e29d08ede97d6b7b73823d8dc2e0530fe8b747c93d7d6c2755957b7ffb94a7cec830456fd5492ba19", + "a0cf5642ed5aa82178a5f79e00292c5b700b67fbf59630ce4f542c392495d9835a99c826aa2459a67bc80867245386c6", + "8bafb7165f42e1179f83b7fcd8fe940e60ed5933fac176fdf75a60838c688eaa3b57717be637bde1d5cebdadb8e39865", + } + + decodeds := make([][]byte, len(pks)) + blsPubKeys := make([]spec.BLSPubKey, len(pks)) + for i, pk := range pks { + blsPubKey := spec.BLSPubKey{} + decoded, _ := hex.DecodeString(pk) + copy(blsPubKey[:], decoded[:]) + decodeds[i] = decoded + blsPubKeys[i] = blsPubKey + } + + data := map[spec.BLSPubKey]*v1.Validator{} + data[blsPubKeys[0]] = &v1.Validator{ + Index: spec.ValidatorIndex(210961), + Status: v1.ValidatorStateWithdrawalPossible, + Validator: &spec.Validator{PublicKey: blsPubKeys[0]}, + } + data[blsPubKeys[1]] = &v1.Validator{ + Index: spec.ValidatorIndex(213820), + Status: v1.ValidatorStateActiveOngoing, + Validator: &spec.Validator{PublicKey: blsPubKeys[1]}, + } + bc := NewMockBeacon(map[uint64][]*Duty{}, data) + + storage := NewMockValidatorMetadataStorage() + + onUpdated := func(pk string, meta *ValidatorMetadata) { + joined := strings.Join(pks, ":") + require.True(t, strings.Contains(joined, pk)) + require.True(t, meta.Index == spec.ValidatorIndex(210961) || meta.Index == spec.ValidatorIndex(213820)) + atomic.AddUint64(&updateCount, 1) + } + err := UpdateValidatorsMetadata([][]byte{blsPubKeys[0][:], blsPubKeys[1][:], blsPubKeys[2][:]}, storage, bc, onUpdated) + require.Nil(t, err) + require.Equal(t, uint64(2), updateCount) + require.Equal(t, 2, storage.(*mockValidatorMetadataStorage).Size()) +} diff --git a/beacon/validator_metdata.go b/beacon/validator_metdata.go new file mode 100644 index 0000000000..367b40f3d9 --- /dev/null +++ b/beacon/validator_metdata.go @@ -0,0 +1,107 @@ +package beacon + +import ( + "encoding/hex" + v1 "github.com/attestantio/go-eth2-client/api/v1" + spec "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/bloxapp/ssv/utils/logex" + "github.com/pkg/errors" + "go.uber.org/zap" +) + +// ValidatorMetadataStorage interface for validator metadata +type ValidatorMetadataStorage interface { + UpdateValidatorMetadata(pk string, metadata *ValidatorMetadata) error +} + +// ValidatorMetadata represents validator metdata from beacon +type ValidatorMetadata struct { + Balance spec.Gwei `json:"balance"` + Status v1.ValidatorState `json:"status"` + Index spec.ValidatorIndex `json:"index"` // pointer in order to support nil +} + +// Activated returns true if the validator is not unknown. 
It might be pending activation or active +func (m *ValidatorMetadata) Activated() bool { + return m.Status.HasActivated() +} + +// Exiting returns true if the validator is exiting or exited +func (m *ValidatorMetadata) Exiting() bool { + return m.Status.IsExited() || m.Status.HasExited() +} + +// Slashed returns true if the validator is exiting or exited due to slashing +func (m *ValidatorMetadata) Slashed() bool { + return m.Status == v1.ValidatorStateExitedSlashed || m.Status == v1.ValidatorStateActiveSlashed +} + +// OnUpdated represents a function to be called once validator's metadata was updated +type OnUpdated func(pk string, meta *ValidatorMetadata) + +// UpdateValidatorsMetadata updates validator information for the given public keys +func UpdateValidatorsMetadata(pubKeys [][]byte, collection ValidatorMetadataStorage, bc Beacon, onUpdated OnUpdated) error { + logger := logex.GetLogger(zap.String("who", "UpdateValidatorsMetadata")) + + results, err := FetchValidatorsMetadata(bc, pubKeys) + if err != nil { + return errors.Wrap(err, "failed to get validator data from Beacon") + } + logger.Debug("got validators metadata", zap.Int("pks count", len(pubKeys)), + zap.Int("results count", len(results))) + + var errs []error + for pk, meta := range results { + if err := collection.UpdateValidatorMetadata(pk, meta); err != nil { + logger.Error("failed to update validator metadata", + zap.String("pk", pk), zap.Error(err)) + errs = append(errs, err) + } + if onUpdated != nil { + onUpdated(pk, meta) + } + logger.Debug("managed to update validator metadata", + zap.String("pk", pk), zap.Any("metadata", *meta)) + } + if len(errs) > 0 { + logger.Error("could not process validators returned from beacon", + zap.Int("count", len(errs)), zap.Any("errs", errs)) + return errors.Errorf("could not process %d validators returned from beacon", len(errs)) + } + + return nil +} + +// FetchValidatorsMetadata fetches validators data from the beacon node +func FetchValidatorsMetadata(bc Beacon, pubKeys [][]byte) (map[string]*ValidatorMetadata, error) { + logger := logex.GetLogger(zap.String("who", "FetchValidatorsMetadata")) + if len(pubKeys) == 0 { + return nil, nil + } + var pubkeys []spec.BLSPubKey + for _, pk := range pubKeys { + blsPubKey := spec.BLSPubKey{} + copy(blsPubKey[:], pk) + pubkeys = append(pubkeys, blsPubKey) + } + logger.Debug("fetching metadata for public keys", zap.Int("total", len(pubkeys))) + validatorsIndexMap, err := bc.GetValidatorData(pubkeys) + if err != nil { + return nil, errors.Wrap(err, "failed to get validators data from beacon") + } + logger.Debug("got validators metadata", zap.Int("pks count", len(pubKeys)), + zap.Int("results count", len(validatorsIndexMap))) + ret := make(map[string]*ValidatorMetadata) + for index, v := range validatorsIndexMap { + pk := hex.EncodeToString(v.Validator.PublicKey[:]) + meta := &ValidatorMetadata{ + Balance: v.Balance, + Status: v.Status, + Index: v.Index, + } + ret[pk] = meta + // once fetched, the internal map in go-client should be updated + bc.ExtendIndexMap(index, v.Validator.PublicKey) + } + return ret, nil +} diff --git a/cli/exporter/node.go b/cli/exporter/node.go index d85bc6ed96..4f610d81a1 100644 --- a/cli/exporter/node.go +++ b/cli/exporter/node.go @@ -3,6 +3,8 @@ package exporter import ( "crypto/rsa" "fmt" + "github.com/bloxapp/ssv/beacon" + "github.com/bloxapp/ssv/beacon/goclient" global_config "github.com/bloxapp/ssv/cli/config" "github.com/bloxapp/ssv/eth1" "github.com/bloxapp/ssv/eth1/goeth" @@ -21,6 +23,7 @@ import ( 
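// Illustrative sketch (editor's addition, not part of this diff): wiring the new metadata
// flow end to end — Beacon.GetValidatorData feeds FetchValidatorsMetadata, and
// UpdateValidatorsMetadata persists the result through a ValidatorMetadataStorage while
// invoking the OnUpdated callback. It reuses the mock helpers added above in
// beacon/test_utils.go; the main() wrapper and the sample validator values are
// assumptions for illustration only, mirroring beacon/validator_metadata_test.go.
package main

import (
	"log"

	v1 "github.com/attestantio/go-eth2-client/api/v1"
	spec "github.com/attestantio/go-eth2-client/spec/phase0"
	"github.com/bloxapp/ssv/beacon"
	"github.com/bloxapp/ssv/utils/logex"
	"go.uber.org/zap"
)

func main() {
	// a global logger is expected by UpdateValidatorsMetadata (same call as in the tests)
	logex.Build("sketch", zap.InfoLevel, nil)

	// a mock beacon node that knows about a single validator (sample values)
	pk := spec.BLSPubKey{}
	data := map[spec.BLSPubKey]*v1.Validator{
		pk: {
			Index:     spec.ValidatorIndex(1),
			Balance:   spec.Gwei(32000000000),
			Status:    v1.ValidatorStateActiveOngoing,
			Validator: &spec.Validator{PublicKey: pk},
		},
	}
	bc := beacon.NewMockBeacon(map[uint64][]*beacon.Duty{}, data)
	storage := beacon.NewMockValidatorMetadataStorage()

	// fetch metadata for the given public keys, persist it, and get notified per validator
	onUpdated := func(pubKey string, meta *beacon.ValidatorMetadata) {
		log.Printf("validator %s -> index %d, activated=%t", pubKey, meta.Index, meta.Activated())
	}
	if err := beacon.UpdateValidatorsMetadata([][]byte{pk[:]}, storage, bc, onUpdated); err != nil {
		log.Fatal(err)
	}
}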
"go.uber.org/zap" "log" "net/http" + "time" ) type config struct { @@ -28,12 +31,14 @@ type config struct { DBOptions basedb.Options `yaml:"db"` P2pNetworkConfig p2p.Config `yaml:"p2p"` ETH1Options eth1.Options `yaml:"eth1"` + ETH2Options beacon.Options `yaml:"eth2"` - WsAPIPort int `yaml:"WebSocketAPIPort" env:"WS_API_PORT" env-default:"14000" env-description:"port of exporter WS api"` - MetricsAPIPort int `yaml:"MetricsAPIPort" env:"METRICS_API_PORT" env-description:"port of metrics api"` - EnableProfile bool `yaml:"EnableProfile" env:"ENABLE_PROFILE" env-description:"flag that indicates whether go profiling tools are enabled"` - IbftSyncEnabled bool `yaml:"IbftSyncEnabled" env:"IBFT_SYNC_ENABLED" env-default:"false" env-description:"enable ibft sync for all topics"` - NetworkPrivateKey string `yaml:"NetworkPrivateKey" env:"NETWORK_PRIVATE_KEY" env-description:"private key for network identity"` + WsAPIPort int `yaml:"WebSocketAPIPort" env:"WS_API_PORT" env-default:"14000" env-description:"port of exporter WS api"` + MetricsAPIPort int `yaml:"MetricsAPIPort" env:"METRICS_API_PORT" env-description:"port of metrics api"` + EnableProfile bool `yaml:"EnableProfile" env:"ENABLE_PROFILE" env-description:"flag that indicates whether go profiling tools are enabled"` + IbftSyncEnabled bool `yaml:"IbftSyncEnabled" env:"IBFT_SYNC_ENABLED" env-default:"false" env-description:"enable ibft sync for all topics"` + ValidatorMetaDataUpdateInterval time.Duration `yaml:"ValidatorMetaDataUpdateInterval" env:"VALIDATOR_METADATA_UPDATE_INTERVAL" env-default:"12m" env-description:"set the interval at which validator metadata gets updated"` + NetworkPrivateKey string `yaml:"NetworkPrivateKey" env:"NETWORK_PRIVATE_KEY" env-description:"private key for network identity"` } var cfg config @@ -50,6 +55,11 @@ var StartExporterNodeCmd = &cobra.Command{ if err := cleanenv.ReadConfig(globalArgs.ConfigPath, &cfg); err != nil { log.Fatal(err) } + if globalArgs.ShareConfigPath != "" { + if err := cleanenv.ReadConfig(globalArgs.ShareConfigPath, &cfg); err != nil { + log.Fatal(err) + } + } // configure logger and db loggerLevel, errLogLevel := logex.GetLoggerLevelValue(cfg.LogLevel) Logger := logex.Build(cmd.Parent().Short, loggerLevel, &logex.EncodingConfig{ @@ -95,8 +105,18 @@ var StartExporterNodeCmd = &cobra.Command{ Logger.Fatal("failed to create eth1 client", zap.Error(err)) } + // TODO Not refactored yet Start + cfg.ETH2Options.Context = cmd.Context() + cfg.ETH2Options.Logger = Logger + cfg.ETH2Options.Graffiti = []byte("BloxStaking") + beaconClient, err := goclient.New(cfg.ETH2Options) + if err != nil { + Logger.Fatal("failed to create beacon go-client", zap.Error(err)) + } + exporterOptions := new(exporter.Options) exporterOptions.Eth1Client = eth1Client + exporterOptions.Beacon = beaconClient exporterOptions.Logger = Logger exporterOptions.Network = network exporterOptions.DB = db @@ -105,10 +125,12 @@ var StartExporterNodeCmd = &cobra.Command{ exporterOptions.WsAPIPort = cfg.WsAPIPort exporterOptions.IbftSyncEnabled = cfg.IbftSyncEnabled exporterOptions.CleanRegistryData = cfg.ETH1Options.CleanRegistryData + exporterOptions.ValidatorMetaDataUpdateInterval = cfg.ValidatorMetaDataUpdateInterval exporterNode = exporter.New(*exporterOptions) metrics.WaitUntilHealthy(Logger, eth1Client, "eth1 node") + metrics.WaitUntilHealthy(Logger, beaconClient, "beacon node") if err := exporterNode.StartEth1(eth1.HexStringToSyncOffset(cfg.ETH1Options.ETH1SyncOffset)); err != nil { Logger.Fatal("failed to start eth1", 
zap.Error(err)) diff --git a/cli/operator/node.go b/cli/operator/node.go index e8b602cbe6..a10a0a4b3c 100644 --- a/cli/operator/node.go +++ b/cli/operator/node.go @@ -74,7 +74,7 @@ var StartNodeCmd = &cobra.Command{ eth2Network := core.NetworkFromString(cfg.ETH2Options.Network) - // TODO Not refactored yet Start: + // TODO Not refactored yet Start (refactor in exporter as well): cfg.ETH2Options.Context = cmd.Context() cfg.ETH2Options.Logger = Logger cfg.ETH2Options.Graffiti = []byte("BloxStaking") @@ -92,7 +92,7 @@ var StartNodeCmd = &cobra.Command{ cfg.SSVOptions.Context = ctx cfg.SSVOptions.Logger = Logger cfg.SSVOptions.DB = db - cfg.SSVOptions.Beacon = &beaconClient + cfg.SSVOptions.Beacon = beaconClient cfg.SSVOptions.ETHNetwork = ð2Network cfg.SSVOptions.ValidatorOptions.ETHNetwork = ð2Network diff --git a/eth1/goeth/goETH.go b/eth1/goeth/goETH.go index 6721e8453e..5129dce71d 100644 --- a/eth1/goeth/goETH.go +++ b/eth1/goeth/goETH.go @@ -324,8 +324,6 @@ func (ec *eth1Client) fetchAndProcessEvents(fromBlock, toBlock *big.Int, contrac } func (ec *eth1Client) handleEvent(vLog types.Log, contractAbi abi.ABI) error { - ec.logger.Debug("handling smart contract event", zap.Any("vLog", vLog)) - eventType, err := contractAbi.EventByID(vLog.Topics[0]) if err != nil { // unknown event -> ignored ec.logger.Warn("failed to find event type", zap.Error(err), zap.String("txHash", vLog.TxHash.Hex())) @@ -354,12 +352,10 @@ func (ec *eth1Client) handleEvent(vLog types.Log, contractAbi abi.ABI) error { if err != nil { return errors.Wrap(err, "failed to parse ValidatorAdded event") } - if !isEventBelongsToOperator { - ec.logger.Debug("Validator doesn't belong to operator", + if isEventBelongsToOperator { + ec.logger.Debug("validator is assigned to this operator", zap.String("pubKey", hex.EncodeToString(parsed.PublicKey))) } - ec.logger.Debug("parsed data", - zap.String("pubKey", hex.EncodeToString(parsed.PublicKey)), zap.Any("parsed", parsed)) // if there is no operator-private-key --> assuming that the event should be triggered (e.g. 
exporter) if isEventBelongsToOperator || shareEncryptionKey == nil { ec.fireEvent(vLog, *parsed) diff --git a/exporter/ibft/decided_reader.go b/exporter/ibft/decided_reader.go index a909b4f2f9..e6c3e3d8de 100644 --- a/exporter/ibft/decided_reader.go +++ b/exporter/ibft/decided_reader.go @@ -3,6 +3,7 @@ package ibft import ( "context" "github.com/bloxapp/ssv/beacon" + "github.com/bloxapp/ssv/ibft" "github.com/bloxapp/ssv/ibft/pipeline" "github.com/bloxapp/ssv/ibft/pipeline/auth" "github.com/bloxapp/ssv/ibft/proto" @@ -130,7 +131,7 @@ func (r *decidedReader) handleNewDecidedMessage(msg *proto.SignedMessage) error return errors.Wrap(err, "could not save decided") } logger.Debug("decided saved") - reportDecided(msg, r.validatorShare) + ibft.ReportDecided(r.validatorShare.PublicKey.SerializeToHexStr(), msg) return r.checkHighestDecided(msg) } diff --git a/exporter/ibft/metrics.go b/exporter/ibft/metrics.go deleted file mode 100644 index d08f7eec02..0000000000 --- a/exporter/ibft/metrics.go +++ /dev/null @@ -1,31 +0,0 @@ -package ibft - -import ( - "github.com/bloxapp/ssv/ibft/proto" - "github.com/bloxapp/ssv/validator/storage" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - "log" - "strconv" -) - -var ( - metricsDecidedSignersExp = promauto.NewGaugeVec(prometheus.GaugeOpts{ - Name: "ssv:validator:ibft_decided_signers_exp", - Help: "Signers of the highest decided sequence number", - }, []string{"lambda", "pubKey", "seq", "nodeId"}) -) - -func init() { - if err := prometheus.Register(metricsDecidedSignersExp); err != nil { - log.Println("could not register prometheus collector") - } -} - -func reportDecided(msg *proto.SignedMessage, share *storage.Share) { - for _, nodeID := range msg.SignerIds { - metricsDecidedSignersExp.WithLabelValues(string(msg.Message.GetLambda()), - share.PublicKey.SerializeToHexStr(), strconv.FormatUint(msg.Message.SeqNumber, 10), - strconv.FormatUint(nodeID, 10)) - } -} \ No newline at end of file diff --git a/exporter/node.go b/exporter/node.go index 292cc5c145..9e107660f7 100644 --- a/exporter/node.go +++ b/exporter/node.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "github.com/bloxapp/eth2-key-manager/core" + "github.com/bloxapp/ssv/beacon" "github.com/bloxapp/ssv/eth1" "github.com/bloxapp/ssv/exporter/api" "github.com/bloxapp/ssv/exporter/ibft" @@ -15,6 +16,7 @@ import ( "github.com/bloxapp/ssv/storage/basedb" "github.com/bloxapp/ssv/storage/collections" "github.com/bloxapp/ssv/utils/tasks" + "github.com/bloxapp/ssv/validator" validatorstorage "github.com/bloxapp/ssv/validator/storage" "github.com/herumi/bls-eth-go-binary/bls" "github.com/pkg/errors" @@ -23,8 +25,10 @@ import ( ) const ( - mainQueueInterval = 100 * time.Millisecond - readerQueuesInterval = 10 * time.Millisecond + mainQueueInterval = 100 * time.Millisecond + readerQueuesInterval = 10 * time.Millisecond + metaDataReaderQueuesInterval = 5 * time.Second + metaDataBatchSize = 25 ) var ( @@ -45,32 +49,37 @@ type Options struct { ETHNetwork *core.Network Eth1Client eth1.Client + Beacon beacon.Beacon Network network.Network DB basedb.IDb - WS api.WebSocketServer - WsAPIPort int - IbftSyncEnabled bool - CleanRegistryData bool + WS api.WebSocketServer + WsAPIPort int + IbftSyncEnabled bool + CleanRegistryData bool + ValidatorMetaDataUpdateInterval time.Duration } // exporter is the internal implementation of Exporter interface type exporter struct { - ctx context.Context - storage storage.Storage - validatorStorage validatorstorage.ICollection - 
ibftStorage collections.Iibft - logger *zap.Logger - network network.Network - eth1Client eth1.Client - mainQueue tasks.Queue - decidedReadersQueue tasks.Queue - networkReadersQueue tasks.Queue - ws api.WebSocketServer - wsAPIPort int - ibftSyncEnabled bool + ctx context.Context + storage storage.Storage + validatorStorage validatorstorage.ICollection + ibftStorage collections.Iibft + logger *zap.Logger + network network.Network + eth1Client eth1.Client + beacon beacon.Beacon + mainQueue tasks.Queue + decidedReadersQueue tasks.Queue + networkReadersQueue tasks.Queue + metaDataReadersQueue tasks.Queue + ws api.WebSocketServer + wsAPIPort int + ibftSyncEnabled bool + validatorMetaDataUpdateInterval time.Duration } // New creates a new Exporter instance @@ -86,15 +95,18 @@ func New(opts Options) Exporter { Logger: opts.Logger, }, ), - logger: opts.Logger.With(zap.String("component", "exporter/node")), - network: opts.Network, - eth1Client: opts.Eth1Client, - mainQueue: tasks.NewExecutionQueue(mainQueueInterval), - decidedReadersQueue: tasks.NewExecutionQueue(readerQueuesInterval), - networkReadersQueue: tasks.NewExecutionQueue(readerQueuesInterval), - ws: opts.WS, - wsAPIPort: opts.WsAPIPort, - ibftSyncEnabled: opts.IbftSyncEnabled, + logger: opts.Logger.With(zap.String("component", "exporter/node")), + network: opts.Network, + eth1Client: opts.Eth1Client, + beacon: opts.Beacon, + mainQueue: tasks.NewExecutionQueue(mainQueueInterval), + decidedReadersQueue: tasks.NewExecutionQueue(readerQueuesInterval), + networkReadersQueue: tasks.NewExecutionQueue(readerQueuesInterval), + metaDataReadersQueue: tasks.NewExecutionQueue(metaDataReaderQueuesInterval), + ws: opts.WS, + wsAPIPort: opts.WsAPIPort, + ibftSyncEnabled: opts.IbftSyncEnabled, + validatorMetaDataUpdateInterval: opts.ValidatorMetaDataUpdateInterval, } if err := e.init(opts); err != nil { @@ -121,9 +133,15 @@ func (exp *exporter) init(opts Options) error { func (exp *exporter) Start() error { exp.logger.Info("starting node") + if err := exp.warmupValidatorsMetaData(); err != nil { + exp.logger.Error("failed to warmup validators metadata", zap.Error(err)) + } + go exp.continuouslyUpdateValidatorMetaData() + go exp.mainQueue.Start() go exp.decidedReadersQueue.Start() go exp.networkReadersQueue.Start() + go exp.metaDataReadersQueue.Start() if exp.ws == nil { return nil @@ -154,6 +172,9 @@ func (exp *exporter) healthAgents() []metrics.HealthCheckAgent { if agent, ok := exp.eth1Client.(metrics.HealthCheckAgent); ok { agents = append(agents, agent) } + if agent, ok := exp.beacon.(metrics.HealthCheckAgent); ok { + agents = append(agents, agent) + } return agents } @@ -268,8 +289,12 @@ func (exp *exporter) triggerValidator(validatorPubKey *bls.PublicKey) error { func (exp *exporter) setup(validatorShare *validatorstorage.Share) error { pubKey := validatorShare.PublicKey.SerializeToHexStr() logger := exp.logger.With(zap.String("pubKey", pubKey)) + // report validator status upon setup, doing it with defer as sync might fail + defer func() { + validator.ReportValidatorStatus(pubKey, validatorShare.Metadata, exp.logger) + }() decidedReader := exp.getDecidedReader(validatorShare) - if err := tasks.Retry(func() error { + if err := tasks.Retry(func() error { if err := decidedReader.Sync(); err != nil { logger.Error("could not sync validator", zap.Error(err)) return err @@ -304,3 +329,11 @@ func (exp *exporter) getNetworkReader(validatorPubKey *bls.PublicKey) ibft.Reade PK: validatorPubKey, }) } + +func (exp *exporter) updateMetadataTask(pks [][]byte) 
func() error { + return func() error { + return beacon.UpdateValidatorsMetadata(pks, exp.storage, exp.beacon, func(pk string, meta *beacon.ValidatorMetadata) { + validator.ReportValidatorStatus(pk, meta, exp.logger) + }) + } +} diff --git a/exporter/storage/storage.go b/exporter/storage/storage.go index eecfe9a919..58283f79f3 100644 --- a/exporter/storage/storage.go +++ b/exporter/storage/storage.go @@ -5,6 +5,7 @@ import ( "github.com/bloxapp/ssv/storage/basedb" "go.uber.org/zap" "math" + "sync" ) var ( @@ -23,11 +24,17 @@ type Storage interface { type exporterStorage struct { db basedb.IDb logger *zap.Logger + + validatorsLock sync.RWMutex } // NewExporterStorage creates a new instance of Storage func NewExporterStorage(db basedb.IDb, logger *zap.Logger) Storage { - es := exporterStorage{db, logger.With(zap.String("component", "exporter/storage"))} + es := exporterStorage{ + db: db, + logger: logger.With(zap.String("component", "exporter/storage")), + validatorsLock: sync.RWMutex{}, + } return &es } diff --git a/exporter/storage/validators.go b/exporter/storage/validators.go index c037f29eb3..ee1a33520b 100644 --- a/exporter/storage/validators.go +++ b/exporter/storage/validators.go @@ -3,6 +3,7 @@ package storage import ( "bytes" "encoding/json" + "github.com/bloxapp/ssv/beacon" "github.com/pkg/errors" "go.uber.org/zap" ) @@ -13,13 +14,16 @@ var ( // ValidatorInformation represents a validator type ValidatorInformation struct { - Index int64 `json:"index"` - PublicKey string `json:"publicKey"` - Operators []OperatorNodeLink `json:"operators"` + Index int64 `json:"index"` + PublicKey string `json:"publicKey"` + Metadata *beacon.ValidatorMetadata `json:"metadata"` + Operators []OperatorNodeLink `json:"operators"` } // ValidatorsCollection is the interface for managing validators information type ValidatorsCollection interface { + beacon.ValidatorMetadataStorage + GetValidatorInformation(validatorPubKey string) (*ValidatorInformation, bool, error) SaveValidatorInformation(validatorInformation *ValidatorInformation) error ListValidators(from int64, to int64) ([]ValidatorInformation, error) @@ -34,6 +38,9 @@ type OperatorNodeLink struct { // ListValidators returns information of all the known validators // when 'to' equals zero, all validators will be returned func (es *exporterStorage) ListValidators(from int64, to int64) ([]ValidatorInformation, error) { + es.validatorsLock.RLock() + defer es.validatorsLock.RUnlock() + objs, err := es.db.GetAllByCollection(append(storagePrefix, validatorsPrefix...)) if err != nil { return nil, err @@ -52,6 +59,9 @@ func (es *exporterStorage) ListValidators(from int64, to int64) ([]ValidatorInfo // GetValidatorInformation returns information of the given validator by public key func (es *exporterStorage) GetValidatorInformation(validatorPubKey string) (*ValidatorInformation, bool, error) { + es.validatorsLock.RLock() + defer es.validatorsLock.RUnlock() + obj, found, err := es.db.Get(storagePrefix, validatorKey(validatorPubKey)) if !found { return nil, found, nil @@ -70,6 +80,11 @@ func (es *exporterStorage) SaveValidatorInformation(validatorInformation *Valida if err != nil { return errors.Wrap(err, "could not read information from DB") } + + // lock for writing + es.validatorsLock.Lock() + defer es.validatorsLock.Unlock() + if found { es.logger.Debug("validator already exist", zap.String("pubKey", validatorInformation.PublicKey)) @@ -81,11 +96,34 @@ func (es *exporterStorage) SaveValidatorInformation(validatorInformation *Valida if err != nil { return 
errors.Wrap(err, "could not calculate next validator index") } - raw, err := json.Marshal(validatorInformation) + return es.saveValidatorNotSafe(validatorInformation) +} + +func (es *exporterStorage) UpdateValidatorMetadata(pk string, metadata *beacon.ValidatorMetadata) error { + // find validator + info, found, err := es.GetValidatorInformation(pk) + if err != nil { + return errors.Wrap(err, "could not read information from DB") + } + if !found { + return errors.New("validator not found") + } + + // lock for writing + es.validatorsLock.Lock() + defer es.validatorsLock.Unlock() + + info.Metadata = metadata + // save + return es.saveValidatorNotSafe(info) +} + +func (es *exporterStorage) saveValidatorNotSafe(val *ValidatorInformation) error { + raw, err := json.Marshal(val) if err != nil { return errors.Wrap(err, "could not marshal validator information") } - return es.db.Set(storagePrefix, validatorKey(validatorInformation.PublicKey), raw) + return es.db.Set(storagePrefix, validatorKey(val.PublicKey), raw) } func validatorKey(pubKey string) []byte { diff --git a/exporter/storage/validators_test.go b/exporter/storage/validators_test.go index e4e5fd625c..3385508c3d 100644 --- a/exporter/storage/validators_test.go +++ b/exporter/storage/validators_test.go @@ -2,6 +2,9 @@ package storage import ( "encoding/hex" + v1 "github.com/attestantio/go-eth2-client/api/v1" + spec "github.com/attestantio/go-eth2-client/spec/phase0" + "github.com/bloxapp/ssv/beacon" "github.com/bloxapp/ssv/utils/rsaencryption" "github.com/stretchr/testify/require" "testing" @@ -119,3 +122,38 @@ func TestStorage_ListValidators(t *testing.T) { require.NoError(t, err) require.Equal(t, 5, len(validators)) } + +func TestStorage_UpdateValidator(t *testing.T) { + storage, done := newStorageForTest() + require.NotNil(t, storage) + defer done() + + pk, _, err := rsaencryption.GenerateKeys() + require.NoError(t, err) + validator := ValidatorInformation{ + PublicKey: hex.EncodeToString(pk), + Operators: []OperatorNodeLink{}, + Metadata: &beacon.ValidatorMetadata{ + Status: v1.ValidatorStateUnknown, + Balance: 10000, + Index: spec.ValidatorIndex(12), + }, + } + err = storage.SaveValidatorInformation(&validator) + require.NoError(t, err) + + err = storage.UpdateValidatorMetadata(validator.PublicKey, &beacon.ValidatorMetadata{ + Status: v1.ValidatorStateExitedSlashed, + Balance: 1000001, + Index: spec.ValidatorIndex(1), + }) + require.NoError(t, err) + + // get + gotVal, _, err := storage.GetValidatorInformation(hex.EncodeToString(pk)) + require.NoError(t, err) + require.Len(t, gotVal.Operators, 0) + require.EqualValues(t, 7, gotVal.Metadata.Status) + require.EqualValues(t, 1000001, gotVal.Metadata.Balance) + require.EqualValues(t, 1, gotVal.Metadata.Index) +} diff --git a/exporter/validators_metadata.go b/exporter/validators_metadata.go new file mode 100644 index 0000000000..6d438f36a4 --- /dev/null +++ b/exporter/validators_metadata.go @@ -0,0 +1,54 @@ +package exporter + +import ( + "fmt" + validatorstorage "github.com/bloxapp/ssv/validator/storage" + "go.uber.org/zap" + "math" + "time" +) + +func (exp *exporter) continuouslyUpdateValidatorMetaData() { + for { + time.Sleep(exp.validatorMetaDataUpdateInterval) + + shares, err := exp.validatorStorage.GetAllValidatorsShare() + if err != nil { + exp.logger.Error("could not get validators shares for metadata update", zap.Error(err)) + continue + } + + exp.updateValidatorsMetadata(shares, metaDataBatchSize) + } +} + +func (exp *exporter) warmupValidatorsMetaData() error { + shares, err := 
exp.validatorStorage.GetAllValidatorsShare() + if err != nil { + exp.logger.Error("could not get validators shares for metadata update", zap.Error(err)) + return err + } + exp.updateValidatorsMetadata(shares, 100) + return err +} + +func (exp *exporter) updateValidatorsMetadata(shares []*validatorstorage.Share, batchSize int) { + batches := int(math.Ceil(float64(len(shares)) / float64(batchSize))) + start := 0 + end := int(math.Min(float64(len(shares)), float64(batchSize))) + + for i := 0; i < batches; i++ { + // collect pks + batch := make([][]byte, 0) + for j := start; j < end; j++ { + share := shares[j] + batch = append(batch, share.PublicKey.Serialize()) + } + // run task + exp.metaDataReadersQueue.QueueDistinct(exp.updateMetadataTask(batch), fmt.Sprintf("batch_%d", i)) + + // reset start and end + start = end + end = int(math.Min(float64(len(shares)), float64(start + batchSize))) + } +} \ No newline at end of file diff --git a/ibft/ibft_decided.go b/ibft/ibft_decided.go index 8b3bc6afa7..15ee0687d9 100644 --- a/ibft/ibft_decided.go +++ b/ibft/ibft_decided.go @@ -61,7 +61,7 @@ func (i *ibftImpl) ProcessDecidedMessage(msg *proto.SignedMessage) { return } - reportDecided(msg, i.ValidatorShare) + ReportDecided(i.ValidatorShare.PublicKey.SerializeToHexStr(), msg) // decided for current instance if i.forceDecideCurrentInstance(msg) { diff --git a/ibft/metrics.go b/ibft/metrics.go index cd2fa27b6f..c45b3e4fbe 100644 --- a/ibft/metrics.go +++ b/ibft/metrics.go @@ -2,7 +2,6 @@ package ibft import ( "github.com/bloxapp/ssv/ibft/proto" - "github.com/bloxapp/ssv/validator/storage" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "log" @@ -43,11 +42,11 @@ func init() { } } -func reportDecided(msg *proto.SignedMessage, share *storage.Share) { +// ReportDecided reports on a decided message +func ReportDecided(pk string, msg *proto.SignedMessage) { for _, nodeID := range msg.SignerIds { metricsDecidedSigners.WithLabelValues( - string(msg.Message.GetLambda()), - share.PublicKey.SerializeToHexStr(), + string(msg.Message.GetLambda()), pk, strconv.FormatUint(msg.Message.SeqNumber, 10), strconv.FormatUint(nodeID, 10)).Set(1) } diff --git a/monitoring/grafana/dashboard_ssv_operator.json b/monitoring/grafana/dashboard_ssv_operator.json index b66b693cfb..64f9c48899 100644 --- a/monitoring/grafana/dashboard_ssv_operator.json +++ b/monitoring/grafana/dashboard_ssv_operator.json @@ -366,7 +366,7 @@ "options": { "ssv": { "index": 0, - "text": "https://explorer.stage.ssv.network/images/beaconcha-white.svg" + "text": "Open" } }, "type": "value" @@ -376,7 +376,7 @@ "match": "null+nan", "result": { "index": 1, - "text": "https://explorer.stage.ssv.network/images/beaconcha-white.svg" + "text": "Open" } }, "type": "special" @@ -385,7 +385,7 @@ }, { "id": "custom.displayMode", - "value": "image" + "value": "auto" }, { "id": "custom.width", @@ -393,6 +393,10 @@ }, { "id": "custom.filterable" + }, + { + "id": "custom.align", + "value": 
"center" } ] }, @@ -515,6 +523,26 @@ "color": "green", "index": 3, "text": "Ready" + }, + "4": { + "color": "grey", + "index": 4, + "text": "Not Activated" + }, + "5": { + "color": "gray", + "index": 5, + "text": "Exited" + }, + "6": { + "color": "red", + "index": 6, + "text": "Slashed" + }, + "7": { + "color": "red", + "index": 7, + "text": "Not Found" } }, "type": "value" diff --git a/monitoring/grafana/dashboard_ssv_validator.json b/monitoring/grafana/dashboard_ssv_validator.json index 5eeb167bf4..2cd39c9e88 100644 --- a/monitoring/grafana/dashboard_ssv_validator.json +++ b/monitoring/grafana/dashboard_ssv_validator.json @@ -72,7 +72,7 @@ { "options": { "0": { - "color": "red", + "color": "grey", "index": 0, "text": "Inactive" }, @@ -90,6 +90,26 @@ "color": "green", "index": 3, "text": "Ready" + }, + "4": { + "color": "gray", + "index": 4, + "text": "Not Activated" + }, + "5": { + "color": "gray", + "index": 5, + "text": "Exited" + }, + "6": { + "color": "red", + "index": 6, + "text": "Slashed" + }, + "7": { + "color": "red", + "index": 7, + "text": "Not Found" } }, "type": "value" @@ -150,6 +170,7 @@ "mode": "thresholds" }, "mappings": [], + "noValue": "No Peers", "thresholds": { "mode": "absolute", "steps": [ @@ -1214,12 +1235,17 @@ "mode": "thresholds" }, "mappings": [], + "noValue": "N/A", "thresholds": { "mode": "absolute", "steps": [ { - "color": "green", + "color": "grey", "value": null + }, + { + "color": "green", + "value": 1 } ] } @@ -1277,6 +1303,7 @@ "text": "IDLE" }, "1": { + "color": "green", "index": 0, "text": "Running" } @@ -1285,11 +1312,12 @@ } ], "max": 1, + "noValue": "N/A", "thresholds": { "mode": "absolute", "steps": [ { - "color": "green", + "color": "grey", "value": null } ] @@ -1382,11 +1410,12 @@ "type": "value" } ], + "noValue": "N/A", "thresholds": { "mode": "absolute", "steps": [ { - "color": "green", + "color": "grey", "value": null } ] @@ -1437,12 +1466,17 @@ "mode": "thresholds" }, "mappings": [], + "noValue": "N/A", "thresholds": { "mode": "absolute", "steps": [ { - "color": "green", + "color": "grey", "value": null + }, + { + "color": "green", + "value": 1 } ] } @@ -1492,12 +1526,17 @@ "mode": "thresholds" }, "mappings": [], + "noValue": "N/A", "thresholds": { "mode": "absolute", "steps": [ { - "color": "green", + "color": "grey", "value": null + }, + { + "color": "green", + "value": 1 } ] } @@ -1547,13 +1586,18 @@ "mode": "thresholds" }, "mappings": [], + "noValue": "N/A", "thresholds": { "mode": "absolute", "steps": [ { - "color": "green", + "color": "grey", "value": null }, + { + "color": "green", + "value": 1 + }, { "color": "#EAB839", "value": 4 diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index 30b25e9f27..0a9a9654f6 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -59,6 +59,8 @@ type p2pNetwork struct { peers *peers.Status host p2pHost.Host pubsub *pubsub.PubSub + + psTopicsLock *sync.RWMutex } // New is the constructor of p2pNetworker @@ -72,6 +74,7 @@ func New(ctx context.Context, logger *zap.Logger, cfg *Config) (network.Network, ctx: ctx, cfg: cfg, listenersLock: &sync.Mutex{}, + psTopicsLock: &sync.RWMutex{}, logger: logger, } @@ -205,6 +208,9 @@ func (n *p2pNetwork) setupGossipPubsub(cfg *Config) (*pubsub.PubSub, error) { func (n *p2pNetwork) watchTopicPeers() { runutil.RunEvery(n.ctx, 1*time.Minute, func() { + n.psTopicsLock.RLock() + defer n.psTopicsLock.RUnlock() + for name, topic := range n.cfg.Topics { peers := n.allPeersOfTopic(topic) n.logger.Debug("topic peers status", zap.String("topic", name), zap.Any("peers", 
peers)) @@ -214,6 +220,9 @@ } func (n *p2pNetwork) SubscribeToValidatorNetwork(validatorPk *bls.PublicKey) error { + n.psTopicsLock.Lock() + defer n.psTopicsLock.Unlock() + topic, err := n.pubsub.Join(getTopicName(validatorPk.SerializeToHexStr())) if err != nil { return errors.Wrap(err, "failed to join to Topics") @@ -232,10 +241,26 @@ func (n *p2pNetwork) SubscribeToValidatorNetwork(validatorPk *bls.PublicKey) err // IsSubscribeToValidatorNetwork checks if there is a subscription to the validator topic func (n *p2pNetwork) IsSubscribeToValidatorNetwork(validatorPk *bls.PublicKey) bool { + n.psTopicsLock.RLock() + defer n.psTopicsLock.RUnlock() + _, ok := n.cfg.Topics[validatorPk.SerializeToHexStr()] return ok } +// closeTopic closes the given topic and removes it from the topics map (write lock, since it mutates the map) +func (n *p2pNetwork) closeTopic(topicName string) error { + n.psTopicsLock.Lock() + defer n.psTopicsLock.Unlock() + + pk := unwrapTopicName(topicName) + if t, ok := n.cfg.Topics[pk]; ok { + delete(n.cfg.Topics, pk) + return t.Close() + } + return nil +} + // listen listens to some validator's topic func (n *p2pNetwork) listen(sub *pubsub.Subscription) { t := sub.Topic() @@ -243,8 +268,8 @@ func (n *p2pNetwork) listen(sub *pubsub.Subscription) { for { select { case <-n.ctx.Done(): - if err := n.cfg.Topics[t].Close(); err != nil { - n.logger.Error("failed to close Topics", zap.Error(err)) + if err := n.closeTopic(t); err != nil { + n.logger.Error("failed to close topic", zap.String("topic", t), zap.Error(err)) } sub.Cancel() default: @@ -299,6 +324,9 @@ func (n *p2pNetwork) propagateSignedMsg(cm *network.Message) { // getTopic return topic by validator public key func (n *p2pNetwork) getTopic(validatorPK []byte) (*pubsub.Topic, error) { + n.psTopicsLock.RLock() + defer n.psTopicsLock.RUnlock() + if validatorPK == nil { return nil, errors.New("ValidatorPk is nil") } diff --git a/operator/node.go b/operator/node.go index a94dba3725..1f8d2b92c8 100644 --- a/operator/node.go +++ b/operator/node.go @@ -22,7 +22,7 @@ type Node interface { // Options contains options to create the node type Options struct { ETHNetwork *core.Network - Beacon *beacon.Beacon + Beacon beacon.Beacon Context context.Context Logger *zap.Logger Eth1Client eth1.Client @@ -54,14 +54,14 @@ func New(opts Options) Node { logger: opts.Logger.With(zap.String("component", "operatorNode")), validatorController: opts.ValidatorController, ethNetwork: *opts.ETHNetwork, - beacon: *opts.Beacon, + beacon: opts.Beacon, eth1Client: opts.Eth1Client, storage: NewOperatorNodeStorage(opts.DB, opts.Logger), dutyCtrl: duties.NewDutyController(&duties.ControllerOptions{ Logger: opts.Logger, Ctx: opts.Context, - BeaconClient: *opts.Beacon, + BeaconClient: opts.Beacon, EthNetwork: *opts.ETHNetwork, ValidatorController: opts.ValidatorController, GenesisEpoch: opts.GenesisEpoch, @@ -135,4 +135,3 @@ func (n *operatorNode) healthAgents() []metrics.HealthCheckAgent { } return agents } - diff --git a/storage/kv/badger.go b/storage/kv/badger.go index 37726ffbf4..d9c9d13180 100644 --- a/storage/kv/badger.go +++ b/storage/kv/badger.go @@ -69,7 +69,7 @@ func (b *BadgerDb) Get(prefix []byte, key []byte) (basedb.Obj, bool, error) { return err }) found := err == nil || err.Error() != EntryNotFoundError - if !found{ + if !found { return basedb.Obj{}, found, nil } return basedb.Obj{ @@ -81,29 +81,14 @@ func (b *BadgerDb) Get(prefix []byte, key []byte) (basedb.Obj, bool, error) { // GetAllByCollection return all array of Obj for all keys under specified prefix(bucket) func (b 
*BadgerDb) GetAllByCollection(prefix []byte) ([]basedb.Obj, error) { var res []basedb.Obj - var err error - err = b.db.View(func(txn *badger.Txn) error { - opt := badger.DefaultIteratorOptions - opt.PrefetchSize = 1000 // if the number of items is larger than this size, results get mixed up - opt.Prefix = prefix - it := txn.NewIterator(opt) - defer it.Close() - for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { - item := it.Item() - resKey := item.Key() - trimmedResKey := bytes.TrimPrefix(resKey, prefix) - val, err := item.ValueCopy(nil) - if err != nil { - b.logger.Error("failed to copy value", zap.Error(err)) - continue - } - obj := basedb.Obj{ - Key: trimmedResKey, - Value: val, - } - res = append(res, obj) - } - return err + + // we got issues when reading more than 100 items with iterator (items get mixed up) + // instead, the keys are first fetched using an iterator, and afterwards the values are fetched one by one + // to avoid issues + err := b.db.View(func(txn *badger.Txn) error { + rawKeys := b.listRawKeys(prefix, txn) + res = b.getAll(rawKeys, prefix, txn) + return nil }) return res, err } @@ -135,3 +120,44 @@ func (b *BadgerDb) Close() { b.logger.Fatal("failed to close db", zap.Error(err)) } } + +func (b *BadgerDb) getAll(rawKeys [][]byte, prefix []byte, txn *badger.Txn) []basedb.Obj { + var res []basedb.Obj + + for _, k := range rawKeys { + trimmedResKey := bytes.TrimPrefix(k, prefix) + item, err := txn.Get(k) + if err != nil { + b.logger.Error("failed to get value", zap.Error(err), + zap.String("trimmedResKey", string(trimmedResKey))) + continue + } + val, err := item.ValueCopy(nil) + if err != nil { + b.logger.Error("failed to copy value", zap.Error(err)) + continue + } + obj := basedb.Obj{ + Key: trimmedResKey, + Value: val, + } + res = append(res, obj) + } + + return res +} + +func (b *BadgerDb) listRawKeys(prefix []byte, txn *badger.Txn) [][]byte { + var keys [][]byte + + opt := badger.DefaultIteratorOptions + opt.Prefix = prefix + it := txn.NewIterator(opt) + defer it.Close() + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + item := it.Item() + keys = append(keys, item.KeyCopy(nil)) + } + + return keys +} diff --git a/storage/kv/badger_test.go b/storage/kv/badger_test.go index 25f77708d8..c5f837dc56 100644 --- a/storage/kv/badger_test.go +++ b/storage/kv/badger_test.go @@ -1,10 +1,12 @@ package kv import ( + "fmt" "github.com/bloxapp/ssv/storage/basedb" "github.com/stretchr/testify/require" "go.uber.org/zap" "testing" + "time" ) func TestBadgerEndToEnd(t *testing.T) { @@ -59,3 +61,55 @@ func TestBadgerEndToEnd(t *testing.T) { require.EqualValues(t, toSave[2].key, obj.Key) require.EqualValues(t, toSave[2].value, obj.Value) } + +func TestBadgerDb_GetAllByCollection(t *testing.T) { + options := basedb.Options{ + Type: "badger-memory", + Logger: zap.L(), + Path: "", + } + + t.Run("100_items", func(t *testing.T) { + db, err := New(options) + require.NoError(t, err) + defer db.Close() + + getAllByCollectionTest(t, 100, db) + }) + + t.Run("10K_items", func(t *testing.T) { + db, err := New(options) + require.NoError(t, err) + defer db.Close() + + getAllByCollectionTest(t, 10000, db) + }) + + t.Run("100K_items", func(t *testing.T) { + db, err := New(options) + require.NoError(t, err) + defer db.Close() + + getAllByCollectionTest(t, 100000, db) + }) +} + +func getAllByCollectionTest(t *testing.T, n int, db basedb.IDb) { + // populating DB + prefix := []byte("test") + for i := 0; i < n; i++ { + id := fmt.Sprintf("test-%d", i) + db.Set(prefix, []byte(id), 
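// Illustrative sketch (editor's addition, not part of this diff): the keys-then-values
// read pattern that the new GetAllByCollection above relies on. Keys are collected with
// a prefix iterator first, and values are then fetched one by one with txn.Get, which
// avoids the value mix-ups observed with large prefetching iterators. The package name,
// the helper name readAllByPrefix and the badger major-version import path are
// assumptions; only API calls already used in this diff appear here.
package kvsketch

import (
	"github.com/dgraph-io/badger"
)

type entry struct {
	Key   []byte
	Value []byte
}

// readAllByPrefix returns all entries under the given prefix within a single read transaction
func readAllByPrefix(db *badger.DB, prefix []byte) ([]entry, error) {
	var res []entry
	err := db.View(func(txn *badger.Txn) error {
		// phase 1: collect the raw keys under the prefix
		opt := badger.DefaultIteratorOptions
		opt.Prefix = prefix
		it := txn.NewIterator(opt)
		var keys [][]byte
		for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() {
			keys = append(keys, it.Item().KeyCopy(nil))
		}
		it.Close()

		// phase 2: fetch the values one by one
		for _, k := range keys {
			item, err := txn.Get(k)
			if err != nil {
				return err
			}
			val, err := item.ValueCopy(nil)
			if err != nil {
				return err
			}
			res = append(res, entry{Key: k, Value: val})
		}
		return nil
	})
	return res, err
}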
[]byte(id+"-data")) + } + time.Sleep(1 * time.Millisecond) + + all, err := db.GetAllByCollection(prefix) + require.Equal(t, n, len(all)) + require.NoError(t, err) + visited := map[string][]byte{} + for _, item := range all { + visited[string(item.Key[:])] = item.Value[:] + } + require.Equal(t, n, len(visited)) + require.NoError(t, db.RemoveAllByCollection(prefix)) +} diff --git a/utils/tasks/exec_queue.go b/utils/tasks/exec_queue.go index 0ecaea4673..5576cf3b1e 100644 --- a/utils/tasks/exec_queue.go +++ b/utils/tasks/exec_queue.go @@ -84,7 +84,11 @@ func (eq *executionQueue) Start() { // QueueDistinct adds unique events to the queue func (eq *executionQueue) QueueDistinct(fn Fn, id string) { if _, exist := eq.visited.Load(id); !exist { - eq.Queue(fn) + eq.Queue(func() error { + err := fn() + eq.visited.Delete(id) + return err + }) eq.visited.Store(id, true) } } diff --git a/utils/tasks/exec_queue_test.go b/utils/tasks/exec_queue_test.go index 50859d8f89..6e2d774ea5 100644 --- a/utils/tasks/exec_queue_test.go +++ b/utils/tasks/exec_queue_test.go @@ -21,13 +21,6 @@ func TestExecQueue(t *testing.T) { atomic.AddInt64(&i, 1) return nil }) - } - }() - - go func() { - count := 100 - for count > 0 { - count-- q.Queue(func() error { atomic.AddInt64(&i, -1) return nil @@ -42,7 +35,7 @@ func TestExecQueue(t *testing.T) { q.Wait() require.Equal(t, int64(1), atomic.LoadInt64(&i)) require.Equal(t, 0, len(q.(*executionQueue).getWaiting())) - require.Equal(t, 0, len(q.(*executionQueue).errs)) + require.Equal(t, 0, len(q.Errors())) } func TestExecQueue_Stop(t *testing.T) { @@ -72,6 +65,30 @@ func TestExecQueue_Stop(t *testing.T) { require.Equal(t, int64(1), atomic.LoadInt64(&i)) } +func TestExecQueue_QueueDistinct(t *testing.T) { + var i int64 + q := NewExecutionQueue(2 * time.Millisecond) + + inc := func() error { + atomic.AddInt64(&i, 1) + return nil + } + q.QueueDistinct(inc, "1") + q.QueueDistinct(inc, "1") + q.QueueDistinct(inc, "1") + require.Equal(t, 1, len(q.(*executionQueue).getWaiting())) + go q.Start() + defer q.Stop() + // waiting for function to execute + time.Sleep(4 * time.Millisecond) + require.Equal(t, 0, len(q.(*executionQueue).getWaiting())) + q.QueueDistinct(inc, "1") + q.QueueDistinct(inc, "1") + q.QueueDistinct(inc, "1") + require.Equal(t, 1, len(q.(*executionQueue).getWaiting())) + +} + func TestExecQueue_Empty(t *testing.T) { q := NewExecutionQueue(1 * time.Millisecond) diff --git a/validator/controller.go b/validator/controller.go index 589740f76f..b2b5598050 100644 --- a/validator/controller.go +++ b/validator/controller.go @@ -3,14 +3,12 @@ package validator import ( "context" "encoding/hex" - "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/bloxapp/eth2-key-manager/core" "github.com/bloxapp/ssv/beacon" "github.com/bloxapp/ssv/eth1" "github.com/bloxapp/ssv/network" "github.com/bloxapp/ssv/pubsub" "github.com/bloxapp/ssv/storage/basedb" - "github.com/bloxapp/ssv/utils/rsaencryption" validatorstorage "github.com/bloxapp/ssv/validator/storage" "github.com/pkg/errors" "go.uber.org/zap" @@ -19,11 +17,7 @@ import ( spec "github.com/attestantio/go-eth2-client/spec/phase0" ) -var ( - errIndicesNotFound = errors.New("indices not found") -) - -// ControllerOptions for controller struct creation +// ControllerOptions for creating a validator controller type ControllerOptions struct { Context context.Context DB basedb.IDb @@ -37,7 +31,8 @@ type ControllerOptions struct { CleanRegistryData bool } -// IController interface +// IController represent the validators controller, +// 
it takes care of bootstrapping, updating and managing existing validators and their shares type IController interface { ListenToEth1Events(cn pubsub.SubjectChannel) ProcessEth1Event(e eth1.Event) error @@ -46,7 +41,7 @@ type IController interface { GetValidator(pubKey string) (*Validator, bool) } -// Controller struct that manages all validator shares +// controller implements IController type controller struct { context context.Context collection validatorstorage.ICollection @@ -58,7 +53,7 @@ type controller struct { validatorsMap *validatorsMap } -// NewController creates new validator controller +// NewController creates a new validator controller instance func NewController(options ControllerOptions) IController { collection := validatorstorage.NewCollection(validatorstorage.CollectionOptions{ DB: options.DB, @@ -101,7 +96,7 @@ func (c *controller) ListenToEth1Events(cn pubsub.SubjectChannel) { } } -// ProcessEth1Event handles a single event +// ProcessEth1Event handles a single event, will be called for both synced and streamed events from the registry contract func (c *controller) ProcessEth1Event(e eth1.Event) error { if validatorAddedEvent, ok := e.Data.(eth1.ValidatorAddedEvent); ok { pubKey := hex.EncodeToString(validatorAddedEvent.PublicKey) @@ -114,7 +109,7 @@ func (c *controller) ProcessEth1Event(e eth1.Event) error { return nil } -// initShares initializes shares +// initShares initializes shares, should be called upon creation of controller func (c *controller) initShares(options ControllerOptions) error { if options.CleanRegistryData { if err := c.collection.CleanAllShares(); err != nil { @@ -129,7 +124,7 @@ func (c *controller) initShares(options ControllerOptions) error { return nil } -// StartValidators functions (queue streaming, msgQueue listen, etc) +// StartValidators loads all persisted shares and sets up the corresponding validators func (c *controller) StartValidators() { shares, err := c.collection.GetAllValidatorsShare() if err != nil { @@ -142,56 +137,89 @@ func (c *controller) StartValidators() { c.setupValidators(shares) } -// setupValidators starts all validators +// setupValidators sets up and starts validators from the given shares +// shares w/o validator's metadata won't start, but the metadata will be fetched and the validator will start afterwards func (c *controller) setupValidators(shares []*validatorstorage.Share) { c.logger.Info("starting validators setup...", zap.Int("shares count", len(shares))) + var started int var errs []error + var fetchMetadata [][]byte for _, validatorShare := range shares { v := c.validatorsMap.GetOrCreateValidator(validatorShare) pk := v.Share.PublicKey.SerializeToHexStr() logger := c.logger.With(zap.String("pubkey", pk)) - if v.Share.Index == nil { - if err := c.addValidatorIndex(v.Share); err != nil { - if err == errIndicesNotFound { - logger.Warn("could not start validator: missing index") - } else { - logger.Error("could not start validator: could not add index", zap.Error(err)) - } - metricsValidatorStatus.WithLabelValues(pk).Set(float64(validatorStatusNoIndex)) - errs = append(errs, err) - continue - } - logger.Debug("updated index for validator", zap.Uint64("index", *v.Share.Index)) + if !v.Share.HasMetadata() { // fetching index and status in case they don't exist + fetchMetadata = append(fetchMetadata, v.Share.PublicKey.Serialize()) + logger.Warn("could not start validator as metadata not found") + continue } - if err := v.Start(); err != nil { - logger.Error("could not start validator", zap.Error(err)) - 
metricsValidatorStatus.WithLabelValues(pk).Set(float64(validatorStatusError)) + if err := c.startValidator(v); err != nil { + logger.Warn("could not start validator", zap.Error(err)) errs = append(errs, err) - continue + } else { + started++ } } c.logger.Info("setup validators done", zap.Int("map size", c.validatorsMap.Size()), - zap.Int("failures", len(errs)), zap.Int("shares count", len(shares))) + zap.Int("failures", len(errs)), zap.Int("missing metadata", len(fetchMetadata)), + zap.Int("shares count", len(shares)), zap.Int("started", started)) + + go c.updateValidatorsMetadata(fetchMetadata) } -// GetValidator returns a validator +// updateValidatorsMetadata updates metadata of the given public keys. +// as part of the flow in beacon.UpdateValidatorsMetadata, +// UpdateValidatorMetadata is called to persist metadata and start a specific validator +func (c *controller) updateValidatorsMetadata(pubKeys [][]byte) { + if len(pubKeys) > 0 { + c.logger.Debug("updating validators", zap.Int("count", len(pubKeys))) + if err := beacon.UpdateValidatorsMetadata(pubKeys, c, c.beacon, c.onMetadataUpdated); err != nil { + c.logger.Error("could not update all validators", zap.Error(err)) + } + // once update is done -> update statuses + for _, pk := range pubKeys { + pkHex := hex.EncodeToString(pk) + if v, exist := c.GetValidator(pkHex); exist { + ReportValidatorStatus(pkHex, v.Share.Metadata, c.logger) + } + } + } +} + +// UpdateValidatorMetadata updates a given validator with metadata (implements ValidatorMetadataStorage) +func (c *controller) UpdateValidatorMetadata(pk string, metadata *beacon.ValidatorMetadata) error { + if metadata == nil { + return errors.New("could not update empty metadata") + } + if v, found := c.validatorsMap.GetValidator(pk); found { + v.Share.Metadata = metadata + if err := c.collection.SaveValidatorShare(v.Share); err != nil { + return err + } + if err := c.startValidator(v); err != nil { + c.logger.Error("could not start validator", zap.Error(err)) + } + } + return nil +} + +// GetValidator returns a validator instance from validatorsMap func (c *controller) GetValidator(pubKey string) (*Validator, bool) { return c.validatorsMap.GetValidator(pubKey) } -// GetValidatorsIndices returns a list of all the active validators indices and fetch indices for missing once (could be first time attesting or non active once) +// GetValidatorsIndices returns a list of all the active validators indices +// and fetches indices for missing ones (could be first time attesting or non-active ones) func (c *controller) GetValidatorsIndices() []spec.ValidatorIndex { var toFetch [][]byte var indices []spec.ValidatorIndex err := c.validatorsMap.ForEach(func(v *Validator) error { - if v.Share.Index == nil { - c.logger.Warn("validator share doesn't have an index", - zap.String("pubKey", v.Share.PublicKey.SerializeToHexStr())) + logger := c.logger.With(zap.String("pubKey", v.Share.PublicKey.SerializeToHexStr())) + if !v.Share.HasMetadata() { + logger.Warn("validator share doesn't have an index") toFetch = append(toFetch, v.Share.PublicKey.Serialize()) - metricsValidatorStatus.WithLabelValues(v.Share.PublicKey.SerializeToHexStr()).Set(float64(validatorStatusNoIndex)) } else { - index := spec.ValidatorIndex(*v.Share.Index) - indices = append(indices, index) + indices = append(indices, v.Share.Metadata.Index) } return nil }) @@ -199,143 +227,87 @@ func (c *controller) GetValidatorsIndices() []spec.ValidatorIndex { c.logger.Error("failed to get all validators public keys", zap.Error(err)) } - if 
len(toFetch) > 0 { - go c.addValidatorsIndices(toFetch) - } + go c.updateValidatorsMetadata(toFetch) return indices } +// handleValidatorAddedEvent handles registry contract event for validator added func (c *controller) handleValidatorAddedEvent(validatorAddedEvent eth1.ValidatorAddedEvent) error { - pubKey := hex.EncodeToString(validatorAddedEvent.PublicKey) + pubKey := hex.EncodeToString(validatorAddedEvent.PublicKey[:]) logger := c.logger.With(zap.String("pubKey", pubKey)) - logger.Debug("handles validator added event") - // if exist and resync was not forced -> do nothing + // if exist -> do nothing if _, ok := c.validatorsMap.GetValidator(pubKey); ok { logger.Debug("validator was loaded already") // TODO: handle updateValidator in the future return nil } + logger.Debug("new validator, starting setup") metricsValidatorStatus.WithLabelValues(pubKey).Set(float64(validatorStatusInactive)) - validatorShare, err := c.createShare(validatorAddedEvent) - if err != nil { - return errors.Wrap(err, "failed to create share") - } - foundShare, found, err := c.collection.GetValidatorShare(validatorShare.PublicKey.Serialize()) + validatorShare, found, err := c.collection.GetValidatorShare(validatorAddedEvent.PublicKey[:]) if err != nil { return errors.Wrap(err, "could not check if validator share exits") } if !found { - if err := c.addValidatorIndex(validatorShare); err != nil { - metricsValidatorStatus.WithLabelValues(pubKey).Set(float64(validatorStatusNoIndex)) - logger.Warn("could not add validator index", zap.Error(err)) + validatorShare, err = createShareWithOperatorKey(validatorAddedEvent, c.shareEncryptionKeyProvider) + if err != nil { + return errors.Wrap(err, "failed to create share") } - if err := c.collection.SaveValidatorShare(validatorShare); err != nil { - return errors.Wrap(err, "failed to save new share") + if err := c.onNewShare(validatorShare); err != nil { + return err } - logger.Debug("new validator share was saved") - } else { - // TODO: handle updateValidator in the future - validatorShare = foundShare + logger.Debug("new validator share was created and saved") } v := c.validatorsMap.GetOrCreateValidator(validatorShare) - - if v.Share.Index == nil { - logger.Warn("could not start validator without index") - return nil + if err := c.startValidator(v); err != nil { + logger.Warn("could not start validator", zap.Error(err)) } - if err := v.Start(); err != nil { - metricsValidatorStatus.WithLabelValues(pubKey).Set(float64(validatorStatusError)) - return errors.Wrap(err, "could not start validator") - } - - return nil -} - -// addValidatorIndex adding validator's index to the share -func (c *controller) addValidatorIndex(validatorShare *validatorstorage.Share) error { - pubKey := validatorShare.PublicKey.SerializeToHexStr() - indices, err := c.fetchIndices([][]byte{validatorShare.PublicKey.Serialize()}) - if err != nil { - return errors.Wrap(err, "failed to fetch indices") - } - i, ok := indices[pubKey] - if !ok { - return errIndicesNotFound - } - index := i - validatorShare.Index = &index return nil } -// addValidatorsIndices adds indices for all given validators. 
shares are taken from validators map -func (c *controller) addValidatorsIndices(toFetch [][]byte) { - indices , err := c.fetchIndices(toFetch) - if err != nil { - c.logger.Error("failed to fetch indices", zap.Error(err)) - return - } - for pk, i := range indices { - if v, exist := c.validatorsMap.GetValidator(pk); exist { - c.logger.Debug("updating indices for validator", zap.String("pubkey", pk), - zap.Uint64("index", i)) - index := i - v.Share.Index = &index - if err := c.collection.SaveValidatorShare(v.Share); err != nil { - c.logger.Debug("could not save share", zap.String("pubkey", pk), zap.Error(err)) - } - if err := v.Start(); err != nil { - c.logger.Error("could not start validator", zap.String("pubkey", pk), zap.Error(err)) - metricsValidatorStatus.WithLabelValues(pk).Set(float64(validatorStatusError)) - } +// onMetadataUpdated is called when validator's metadata was updated +func (c *controller) onMetadataUpdated(pk string, meta *beacon.ValidatorMetadata) { + if v, exist := c.GetValidator(pk); exist { + if err := c.startValidator(v); err != nil { + c.logger.Error("could not start validator after metadata update", + zap.String("pk", pk), zap.Error(err), zap.Any("metadata", *meta)) } } - c.logger.Debug("updated indices", zap.Any("indices", indices)) } -func (c *controller) fetchIndices(sharesPubKeys [][]byte) (map[string]uint64, error) { - if len(sharesPubKeys) == 0 { - return nil, errors.New("invalid pubkeys. at least one validator is required") - } - var pubkeys []phase0.BLSPubKey - for _, pk := range sharesPubKeys { - blsPubKey := phase0.BLSPubKey{} - copy(blsPubKey[:], pk) - pubkeys = append(pubkeys, blsPubKey) - } - c.logger.Debug("fetching indices for public keys", zap.Int("total", len(pubkeys)), - zap.Any("pubkeys", pubkeys)) - validatorsIndexMap, err := c.beacon.GetIndices(pubkeys) - if err != nil { - return nil, errors.Wrap(err, "failed to get indices from beacon") +// onNewShare is called when a new validator was added or during registry sync +// if the validator was persisted already, this function won't be called +func (c *controller) onNewShare(share *validatorstorage.Share) error { + logger := c.logger.With(zap.String("pubKey", share.PublicKey.SerializeToHexStr())) + if updated, err := updateShareMetadata(share, c.beacon); err != nil { + logger.Warn("could not add validator metadata", zap.Error(err)) + } else if !updated { + logger.Warn("could not find validator metadata") + } else { + logger.Debug("validator metadata was updated", + zap.Uint64("index", uint64(share.Metadata.Index))) + ReportValidatorStatus(share.PublicKey.SerializeToHexStr(), share.Metadata, c.logger) } - indices := map[string]uint64{} - for index, v := range validatorsIndexMap { - pk := hex.EncodeToString(v.Validator.PublicKey[:]) - indices[pk] = uint64(index) - c.beacon.ExtendIndexMap(index, v.Validator.PublicKey) // updating goClient map + if err := c.collection.SaveValidatorShare(share); err != nil { + return errors.Wrap(err, "failed to save new share") } - c.logger.Debug("fetched indices from beacon", zap.Any("indices", indices)) - return indices, nil + return nil } -func (c *controller) createShare(validatorAddedEvent eth1.ValidatorAddedEvent) (*validatorstorage.Share, error) { - operatorPrivKey, found, err := c.shareEncryptionKeyProvider() - if !found { - return nil, errors.New("could not find operator private key") +// startValidator will start the given validator if applicable +func (c *controller) startValidator(v *Validator) error { + 
ReportValidatorStatus(v.Share.PublicKey.SerializeToHexStr(), v.Share.Metadata, c.logger) + if !v.Share.HasMetadata() { + return errors.New("could not start validator: metadata not found") } - if err != nil { - return nil, errors.Wrap(err, "get operator private key") - } - operatorPubKey, err := rsaencryption.ExtractPublicKey(operatorPrivKey) - if err != nil { - return nil, errors.Wrap(err, "could not extract operator public key") + if v.Share.Metadata.Index == 0 { + return errors.New("could not start validator: index not found") } - validatorShare, err := ShareFromValidatorAddedEvent(validatorAddedEvent, operatorPubKey) - if err != nil { - return nil, errors.Wrap(err, "could not create share from event") + if err := v.Start(); err != nil { + metricsValidatorStatus.WithLabelValues(v.Share.PublicKey.SerializeToHexStr()).Set(float64(validatorStatusError)) + return errors.Wrap(err, "could not start validator") } - return validatorShare, nil + return nil } diff --git a/validator/metrics.go b/validator/metrics.go index 9e407dba1d..4cb54b3562 100644 --- a/validator/metrics.go +++ b/validator/metrics.go @@ -4,6 +4,7 @@ import ( "github.com/bloxapp/ssv/beacon" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "go.uber.org/zap" "log" ) @@ -57,11 +58,45 @@ func (v *Validator) reportDutyExecutionMetrics(duty *beacon.Duty) func() { } } +// ReportValidatorStatusReady reports the ready status of validator +func ReportValidatorStatusReady(pk string) { + metricsValidatorStatus.WithLabelValues(pk).Set(float64(validatorStatusReady)) +} + +// ReportValidatorStatus reports the current status of validator +func ReportValidatorStatus(pk string, meta *beacon.ValidatorMetadata, logger *zap.Logger) { + logger = logger.With(zap.String("pubKey", pk), zap.String("who", "ReportValidatorStatus"), + zap.Any("metadata", meta)) + if meta == nil { + logger.Warn("validator metadata not found") + metricsValidatorStatus.WithLabelValues(pk).Set(float64(validatorStatusNotFound)) + } else if !meta.Activated() { + logger.Warn("validator not activated") + metricsValidatorStatus.WithLabelValues(pk).Set(float64(validatorStatusNotActivated)) + } else if meta.Slashed() { + logger.Warn("validator slashed") + metricsValidatorStatus.WithLabelValues(pk).Set(float64(validatorStatusSlashed)) + } else if meta.Exiting() { + logger.Warn("validator exiting / exited") + metricsValidatorStatus.WithLabelValues(pk).Set(float64(validatorStatusExiting)) + } else if meta.Index == 0 { + logger.Warn("validator index not found") + metricsValidatorStatus.WithLabelValues(pk).Set(float64(validatorStatusNoIndex)) + } else { + logger.Warn("validator is ready") + metricsValidatorStatus.WithLabelValues(pk).Set(float64(validatorStatusReady)) + } +} + type validatorStatus int32 var ( - validatorStatusInactive validatorStatus = 0 - validatorStatusNoIndex validatorStatus = 1 - validatorStatusError validatorStatus = 2 - validatorStatusReady validatorStatus = 3 + validatorStatusInactive validatorStatus = 0 + validatorStatusNoIndex validatorStatus = 1 + validatorStatusError validatorStatus = 2 + validatorStatusReady validatorStatus = 3 + validatorStatusNotActivated validatorStatus = 4 + validatorStatusExiting validatorStatus = 5 + validatorStatusSlashed validatorStatus = 6 + validatorStatusNotFound validatorStatus = 7 ) diff --git a/validator/storage/share.go b/validator/storage/share.go index 0316b251f9..0f61bd5ffa 100644 --- a/validator/storage/share.go +++ b/validator/storage/share.go @@ -3,6 +3,7 @@ package 
storage import ( "bytes" "encoding/gob" + "github.com/bloxapp/ssv/beacon" "github.com/bloxapp/ssv/ibft/proto" "github.com/bloxapp/ssv/storage/basedb" "github.com/herumi/bls-eth-go-binary/bls" @@ -26,17 +27,17 @@ func (keys PubKeys) Aggregate() bls.PublicKey { type Share struct { NodeID uint64 PublicKey *bls.PublicKey - Index *uint64 // pointer in order to support nil ShareKey *bls.SecretKey Committee map[uint64]*proto.Node + Metadata *beacon.ValidatorMetadata // pointer in order to support nil } // serializedShare struct type serializedShare struct { NodeID uint64 - Index *uint64 // pointer in order to support nil ShareKey []byte Committee map[uint64]*proto.Node + Metadata *beacon.ValidatorMetadata // pointer in order to support nil } // CommitteeSize returns the IBFT committee size @@ -96,9 +97,9 @@ func (s *Share) VerifySignedMessage(msg *proto.SignedMessage) error { func (s *Share) Serialize() ([]byte, error) { value := serializedShare{ NodeID: s.NodeID, - Index: s.Index, ShareKey: s.ShareKey.Serialize(), Committee: map[uint64]*proto.Node{}, + Metadata: s.Metadata, } // copy committee by value for k, n := range s.Committee { @@ -137,8 +138,13 @@ func (s *Share) Deserialize(obj basedb.Obj) (*Share, error) { return &Share{ NodeID: value.NodeID, PublicKey: pubKey, - Index: value.Index, ShareKey: shareSecret, Committee: value.Committee, + Metadata: value.Metadata, }, nil } + +// HasMetadata returns true if the validator metadata was fetched +func (s *Share) HasMetadata() bool { + return s.Metadata != nil +} diff --git a/validator/storage/share_opts.go b/validator/storage/share_opts.go index 2efb518491..5797a0a205 100644 --- a/validator/storage/share_opts.go +++ b/validator/storage/share_opts.go @@ -56,7 +56,7 @@ func (options *ShareOptions) ToShare() (*Share, error) { share := Share{ NodeID: options.NodeID, - Index: nil, + Metadata: nil, PublicKey: validatorPk, ShareKey: shareKey, Committee: ibftCommittee, diff --git a/validator/storage/share_test.go b/validator/storage/share_test.go index 807a01005b..85e8a000ca 100644 --- a/validator/storage/share_test.go +++ b/validator/storage/share_test.go @@ -44,7 +44,7 @@ func TestThresholdSize(t *testing.T) { share := &Share{ NodeID: 0, PublicKey: nil, - Index: nil, + Metadata: nil, ShareKey: nil, Committee: map[uint64]*proto.Node{}, } diff --git a/validator/test_utils.go b/validator/test_utils.go index 3482ae36d4..9211c52155 100644 --- a/validator/test_utils.go +++ b/validator/test_utils.go @@ -128,7 +128,7 @@ func (b *testBeacon) GetDuties(epoch spec.Epoch, validatorIndices []spec.Validat return nil, nil } -func (b *testBeacon) GetIndices(validatorPubKeys []spec.BLSPubKey) (map[spec.ValidatorIndex]*api.Validator, error) { +func (b *testBeacon) GetValidatorData(validatorPubKeys []spec.BLSPubKey) (map[spec.ValidatorIndex]*api.Validator, error) { return nil, nil } diff --git a/validator/utils.go b/validator/utils.go index e153673380..1cf34561a3 100644 --- a/validator/utils.go +++ b/validator/utils.go @@ -6,12 +6,49 @@ import ( "github.com/bloxapp/ssv/beacon" "github.com/bloxapp/ssv/eth1" "github.com/bloxapp/ssv/ibft/proto" + "github.com/bloxapp/ssv/utils/rsaencryption" validatorstorage "github.com/bloxapp/ssv/validator/storage" "github.com/herumi/bls-eth-go-binary/bls" "github.com/pkg/errors" "strings" ) +// updateShareMetadata will update the given share object w/o involving storage, +// it will be called only when a new share is created +func updateShareMetadata(share *validatorstorage.Share, bc beacon.Beacon) (bool, error) { + pk := 
share.PublicKey.SerializeToHexStr() + results, err := beacon.FetchValidatorsMetadata(bc, [][]byte{share.PublicKey.Serialize()}) + if err != nil { + return false, errors.Wrap(err, "failed to fetch metadata for share") + } + meta, ok := results[pk] + if !ok { + return false, nil + } + share.Metadata = meta + return true, nil +} + +// createShareWithOperatorKey creates a new share object from event +func createShareWithOperatorKey(validatorAddedEvent eth1.ValidatorAddedEvent, shareEncryptionKeyProvider eth1.ShareEncryptionKeyProvider) (*validatorstorage.Share, error) { + operatorPrivKey, found, err := shareEncryptionKeyProvider() + if !found { + return nil, errors.New("could not find operator private key") + } + if err != nil { + return nil, errors.Wrap(err, "get operator private key") + } + operatorPubKey, err := rsaencryption.ExtractPublicKey(operatorPrivKey) + if err != nil { + return nil, errors.Wrap(err, "could not extract operator public key") + } + validatorShare, err := ShareFromValidatorAddedEvent(validatorAddedEvent, operatorPubKey) + if err != nil { + return nil, errors.Wrap(err, "could not create share from event") + } + return validatorShare, nil +} + // ShareFromValidatorAddedEvent takes the contract event data and creates the corresponding validator share func ShareFromValidatorAddedEvent(validatorAddedEvent eth1.ValidatorAddedEvent, operatorPubKey string) (*validatorstorage.Share, error) { validatorShare := validatorstorage.Share{} diff --git a/validator/validator.go b/validator/validator.go index 1ca2ae7284..b29469d24e 100644 --- a/validator/validator.go +++ b/validator/validator.go @@ -60,10 +60,11 @@ func New(opt Options) *Validator { //ibfts[beacon.RoleAggregator] = setupIbftController(beacon.RoleAggregator, logger, db, opt.Network, msgQueue, opt.Share) TODO not supported for now //ibfts[beacon.RoleProposer] = setupIbftController(beacon.RoleProposer, logger, db, opt.Network, msgQueue, opt.Share) TODO not supported for now - if opt.Share.Index != nil { // in order ot update goclient map to prevent getting all network indices bug + // updating goclient map + if opt.Share.HasMetadata() && opt.Share.Metadata.Index > 0 { blsPubkey := spec.BLSPubKey{} copy(blsPubkey[:], opt.Share.PublicKey.Serialize()) - opt.Beacon.ExtendIndexMap(spec.ValidatorIndex(*opt.Share.Index), blsPubkey) + opt.Beacon.ExtendIndexMap(opt.Share.Metadata.Index, blsPubkey) } return &Validator{ @@ -99,8 +100,8 @@ func (v *Validator) Start() error { v.logger.Debug("validator started") }) - metricsValidatorStatus.WithLabelValues(v.Share.PublicKey.SerializeToHexStr()). - Set(float64(validatorStatusReady)) + //ReportValidatorStatusReady(v.Share.PublicKey.SerializeToHexStr()) + ReportValidatorStatus(v.Share.PublicKey.SerializeToHexStr(), v.Share.Metadata, v.logger) return nil }
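
The start gate introduced in this change appears in three places (setupValidators, startValidator and validator.New): metadata must be present and carry a non-zero index before Start() is attempted. Below is a minimal, self-contained sketch of that condition, using only the Share and ValidatorMetadata fields visible in this diff; it is illustrative and not part of the change.

package main

import (
	"fmt"

	"github.com/bloxapp/ssv/beacon"
	validatorstorage "github.com/bloxapp/ssv/validator/storage"
)

// canStart mirrors the condition used by controller.startValidator and validator.New:
// metadata must exist and carry a non-zero index before the validator is started.
func canStart(share *validatorstorage.Share) bool {
	return share.HasMetadata() && share.Metadata.Index > 0
}

func main() {
	fmt.Println(canStart(&validatorstorage.Share{}))                                               // false: no metadata, queued for an async fetch
	fmt.Println(canStart(&validatorstorage.Share{Metadata: &beacon.ValidatorMetadata{}}))          // false: index not found yet
	fmt.Println(canStart(&validatorstorage.Share{Metadata: &beacon.ValidatorMetadata{Index: 42}})) // true: validator can be started
}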
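
updateValidatorsMetadata delegates to beacon.UpdateValidatorsMetadata with the controller acting as beacon.ValidatorMetadataStorage. That helper's body is not part of this diff, so the outline below only captures the contract implied by the call sites here: FetchValidatorsMetadata returns results keyed by hex pubkey, each result is persisted via UpdateValidatorMetadata, and the onUpdated callback is then invoked. Treat it as an assumption-laden sketch, not the beacon package's actual implementation.

package main

import (
	"github.com/bloxapp/ssv/beacon"
)

// updateFlowSketch outlines what the controller expects from beacon.UpdateValidatorsMetadata:
// fetch metadata for the given public keys, persist each result through the storage interface
// (the controller's UpdateValidatorMetadata), then notify the caller through onUpdated.
func updateFlowSketch(
	pubKeys [][]byte,
	storage beacon.ValidatorMetadataStorage,
	bc beacon.Beacon,
	onUpdated func(pk string, meta *beacon.ValidatorMetadata),
) error {
	// assumed shape: map of hex-encoded pubkey -> *ValidatorMetadata, as used by updateShareMetadata above
	results, err := beacon.FetchValidatorsMetadata(bc, pubKeys)
	if err != nil {
		return err
	}
	for pk, meta := range results {
		if err := storage.UpdateValidatorMetadata(pk, meta); err != nil {
			return err
		}
		if onUpdated != nil {
			onUpdated(pk, meta)
		}
	}
	return nil
}

func main() {}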
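
ReportValidatorStatus maps a validator's metadata to one of the (unexported) status metric values. The sketch below restates the branch precedence with plain labels so the ordering is easy to sanity-check: missing metadata wins over everything, then activation, slashing, exit, and finally a zero index. It assumes ValidatorMetadata carries a go-eth2-client v1.ValidatorState in its Status field, which the Activated/Slashed/Exiting helpers inspect; that assumption is not spelled out in this diff.

package main

import (
	"fmt"

	v1 "github.com/attestantio/go-eth2-client/api/v1"
	"github.com/bloxapp/ssv/beacon"
)

// statusLabel mirrors the branch order of ReportValidatorStatus
// (metric values 7, 4, 6, 5, 1 and 3 respectively); illustrative only.
func statusLabel(meta *beacon.ValidatorMetadata) string {
	switch {
	case meta == nil:
		return "not_found"
	case !meta.Activated():
		return "not_activated"
	case meta.Slashed():
		return "slashed"
	case meta.Exiting():
		return "exiting"
	case meta.Index == 0:
		return "no_index"
	default:
		return "ready"
	}
}

func main() {
	fmt.Println(statusLabel(nil))                                                                          // not_found
	fmt.Println(statusLabel(&beacon.ValidatorMetadata{Status: v1.ValidatorStateActiveOngoing, Index: 42})) // ready
}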
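
Several call sites now consume the richer per-validator data behind the renamed Beacon.GetValidatorData. A small consumer sketch of the returned shape follows; the v1.Validator field names (Status, Balance) come from go-eth2-client's api/v1 package rather than from this diff.

package main

import (
	"fmt"
	"log"

	spec "github.com/attestantio/go-eth2-client/spec/phase0"
	"github.com/bloxapp/ssv/beacon"
)

// printValidatorData shows the shape returned by Beacon.GetValidatorData:
// a map keyed by validator index, carrying status and balance for each requested pubkey.
func printValidatorData(bc beacon.Beacon, pubKeys []spec.BLSPubKey) {
	data, err := bc.GetValidatorData(pubKeys)
	if err != nil {
		log.Fatal(err)
	}
	for index, v := range data {
		fmt.Printf("index=%d status=%s balance=%d\n", index, v.Status, v.Balance)
	}
}

func main() {}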