From d1e01e233be4d7f259a97809742fdf921bc292e1 Mon Sep 17 00:00:00 2001 From: Amir Yahalom Date: Thu, 9 Sep 2021 18:06:10 +0300 Subject: [PATCH 01/16] clenup message reader --- exporter/ibft/incoming_msgs.go | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/exporter/ibft/incoming_msgs.go b/exporter/ibft/incoming_msgs.go index e0c623a459..9742751af0 100644 --- a/exporter/ibft/incoming_msgs.go +++ b/exporter/ibft/incoming_msgs.go @@ -10,7 +10,6 @@ import ( "github.com/herumi/bls-eth-go-binary/bls" "github.com/pkg/errors" "go.uber.org/zap" - "strings" "time" ) @@ -42,12 +41,12 @@ func NewIncomingMsgsReader(opts IncomingMsgsReaderOptions) Reader { } func (i *incomingMsgsReader) Start() error { - if err := i.network.SubscribeToValidatorNetwork(i.publicKey); err != nil { - if !strings.Contains(err.Error(), "topic already exists") { - return errors.Wrap(err, "could not subscribe to subnet") + if !i.network.IsSubscribeToValidatorNetwork(i.publicKey) { + if err := i.network.SubscribeToValidatorNetwork(i.publicKey); err != nil { + return errors.Wrap(err, "failed to subscribe topic") } - i.logger.Debug("no need to subscribe, topic already exist") } + if err := i.waitForMinPeers(i.publicKey, 2); err != nil { return errors.Wrap(err, "could not wait for min peers") } @@ -67,16 +66,10 @@ func (i *incomingMsgsReader) listenToNetwork() { // filtering irrelevant messages // TODO: handle other types of roles if identifier != string(msg.Message.Lambda) { - i.logger.Debug("ignoring message with wrong identifier") continue } - fields := []zap.Field{ - zap.Uint64("seq_num", msg.Message.SeqNumber), - zap.Uint64("round", msg.Message.Round), - zap.String("signers", msg.SignersIDString()), - zap.String("identifier", string(msg.Message.Lambda)), - } + fields := messageFields(msg) switch msg.Message.Type { case proto.RoundState_PrePrepare: @@ -85,8 +78,6 @@ func (i *incomingMsgsReader) listenToNetwork() { i.logger.Info("prepare msg", fields...) case proto.RoundState_Commit: i.logger.Info("commit msg", fields...) - case proto.RoundState_Decided: - i.logger.Info("decided msg", fields...) case proto.RoundState_ChangeRound: i.logger.Info("change round msg", fields...) default: @@ -96,7 +87,6 @@ func (i *incomingMsgsReader) listenToNetwork() { } // waitForMinPeers will wait until enough peers joined the topic -// it runs in an exponent interval: 1s > 2s > 4s > ... 64s > 1s > 2s > ... func (i *incomingMsgsReader) waitForMinPeers(pk *bls.PublicKey, minPeerCount int) error { ctx := commons.WaitMinPeersCtx{ Ctx: context.Background(), @@ -106,3 +96,12 @@ func (i *incomingMsgsReader) waitForMinPeers(pk *bls.PublicKey, minPeerCount int return commons.WaitForMinPeers(ctx, pk.Serialize(), minPeerCount, 1*time.Second, 64*time.Second, false) } + +func messageFields(msg *proto.SignedMessage) []zap.Field { + return []zap.Field{ + zap.Uint64("seq_num", msg.Message.SeqNumber), + zap.Uint64("round", msg.Message.Round), + zap.String("signers", msg.SignersIDString()), + zap.String("identifier", string(msg.Message.Lambda)), + } +} \ No newline at end of file From 2ef854d00aa22835f623aa86bb2e26c5a74074f5 Mon Sep 17 00:00:00 2001 From: Amir Yahalom Date: Thu, 9 Sep 2021 18:38:02 +0300 Subject: [PATCH 02/16] refctor decided reader and the dispatch flow --- exporter/ibft/decided_reader.go | 142 ++++++++++++++++++++++++++++---- exporter/ibft/reader.go | 10 +++ exporter/node.go | 32 +++---- 3 files changed, 155 insertions(+), 29 deletions(-) diff --git a/exporter/ibft/decided_reader.go b/exporter/ibft/decided_reader.go index c286374727..a4186d0bec 100644 --- a/exporter/ibft/decided_reader.go +++ b/exporter/ibft/decided_reader.go @@ -1,17 +1,20 @@ package ibft import ( + "context" "github.com/bloxapp/ssv/beacon" "github.com/bloxapp/ssv/ibft/pipeline" "github.com/bloxapp/ssv/ibft/pipeline/auth" "github.com/bloxapp/ssv/ibft/proto" "github.com/bloxapp/ssv/ibft/sync/history" "github.com/bloxapp/ssv/network" + "github.com/bloxapp/ssv/network/commons" "github.com/bloxapp/ssv/storage/collections" "github.com/bloxapp/ssv/validator" "github.com/bloxapp/ssv/validator/storage" + "github.com/herumi/bls-eth-go-binary/bls" + "github.com/pkg/errors" "go.uber.org/zap" - "strings" "sync" "time" ) @@ -33,10 +36,12 @@ type decidedReader struct { config *proto.InstanceConfig validatorShare *storage.Share + + identifier []byte } -// NewIbftDecidedReadOnly creates new instance of DecidedReader -func NewIbftDecidedReadOnly(opts DecidedReaderOptions) Reader { +// NewDecidedReader creates new instance of DecidedReader +func NewDecidedReader(opts DecidedReaderOptions) SyncRead { r := decidedReader{ logger: opts.Logger.With( zap.String("pubKey", opts.ValidatorShare.PublicKey.SerializeToHexStr()), @@ -45,19 +50,17 @@ func NewIbftDecidedReadOnly(opts DecidedReaderOptions) Reader { network: opts.Network, config: opts.Config, validatorShare: opts.ValidatorShare, + identifier: []byte(validator.IdentifierFormat(opts.ValidatorShare.PublicKey.Serialize(), beacon.RoleTypeAttester)), } return &r } -// Start starts to fetch best known decided message (highest sequence) from the network and sync to it. -func (r *decidedReader) Start() error { - // subscribe to topic so we could find relevant nodes - if err := r.network.SubscribeToValidatorNetwork(r.validatorShare.PublicKey); err != nil { - if !strings.Contains(err.Error(), "topic already exists") { - r.logger.Warn("could not subscribe to validator channel", zap.Error(err)) - return err +// Sync starts to fetch best known decided message (highest sequence) from the network and sync to it. +func (r *decidedReader) Sync() error { + if !r.network.IsSubscribeToValidatorNetwork(r.validatorShare.PublicKey) { + if err := r.network.SubscribeToValidatorNetwork(r.validatorShare.PublicKey); err != nil { + return errors.Wrap(err, "failed to subscribe topic") } - r.logger.Debug("no need to subscribe, topic already exist") } // wait for network setup (subscribe to topic) var netWaitGroup sync.WaitGroup @@ -70,8 +73,7 @@ func (r *decidedReader) Start() error { r.logger.Debug("syncing ibft data") // creating HistorySync and starts it - identifier := []byte(validator.IdentifierFormat(r.validatorShare.PublicKey.Serialize(), beacon.RoleTypeAttester)) - hs := history.New(r.logger, r.validatorShare.PublicKey.Serialize(), identifier, r.network, + hs := history.New(r.logger, r.validatorShare.PublicKey.Serialize(), r.identifier, r.network, r.storage, r.validateDecidedMsg) err := hs.Start() if err != nil { @@ -80,13 +82,123 @@ func (r *decidedReader) Start() error { return err } +// Start starts to listen to decided messages +func (r *decidedReader) Start() error { + if !r.network.IsSubscribeToValidatorNetwork(r.validatorShare.PublicKey) { + if err := r.network.SubscribeToValidatorNetwork(r.validatorShare.PublicKey); err != nil { + return errors.Wrap(err, "failed to subscribe topic") + } + } + + r.logger.Debug("starting to read decided messages") + if err := r.waitForMinPeers(r.validatorShare.PublicKey, 2); err != nil { + return errors.Wrap(err, "could not wait for min peers") + } + r.listenToNetwork() + return nil +} + +func (r *decidedReader) listenToNetwork() { + cn := r.network.ReceivedDecidedChan() + r.logger.Debug("listening to decided messages") + for msg := range cn { + if err := validateMsg(msg, string(r.identifier)); err != nil { + continue + } + logger := r.logger.With(messageFields(msg)...) + if err := validateDecidedMsg(msg, r.validatorShare); err != nil { + logger.Debug("received invalid decided") + continue + } + //if msg.Message.SeqNumber == 0 { + // logger.Debug("received invalid sequence") + // continue + //} + if err := r.handleNewDecidedMessage(msg); err != nil { + logger.Error("could not save highest decided") + } + } +} + +// handleNewDecidedMessage saves an incoming (valid) decided message +func (r *decidedReader) handleNewDecidedMessage(msg *proto.SignedMessage) error { + logger := r.logger.With(messageFields(msg)...) + if decided, found, _ := r.storage.GetDecided(r.identifier, msg.Message.SeqNumber); found && decided != nil { + logger.Debug("received known sequence") + return nil + } + if err := r.storage.SaveDecided(msg); err != nil { + return errors.Wrap(err, "could not save decided") + } + logger.Debug("decided saved") + return r.checkHighestDecided(msg) +} + +// checkHighestDecided check if highest decided should be updated +func (r *decidedReader) checkHighestDecided(msg *proto.SignedMessage) error { + logger := r.logger.With(messageFields(msg)...) + seq := msg.Message.SeqNumber + highestKnown, found, err := r.storage.GetHighestDecidedInstance(r.identifier) + if err != nil { + return errors.Wrap(err, "could not get highest decided") + } + if found { + highestSeqKnown := uint64(0) + if highestKnown != nil { + highestSeqKnown = highestKnown.Message.SeqNumber + } + if seq < highestSeqKnown { + logger.Debug("received old sequence", + zap.Uint64("highestSeqKnown", highestSeqKnown)) + return nil + } + if seq > highestSeqKnown + 1 { + if err := r.Sync(); err != nil { + logger.Debug("could not sync", zap.Uint64("seq", seq), + zap.Uint64("highestSeqKnown", highestSeqKnown)) + return err + } + return nil + } + } + if err := r.storage.SaveHighestDecidedInstance(msg); err != nil { + return errors.Wrap(err, "could not save highest decided") + } + logger.Info("highest decided saved") + return nil +} + // validateDecidedMsg validates the message func (r *decidedReader) validateDecidedMsg(msg *proto.SignedMessage) error { r.logger.Debug("validating a new decided message", zap.String("msg", msg.String())) + return validateDecidedMsg(msg, r.validatorShare) +} + +// waitForMinPeers will wait until enough peers joined the topic +func (r *decidedReader) waitForMinPeers(pk *bls.PublicKey, minPeerCount int) error { + ctx := commons.WaitMinPeersCtx{ + Ctx: context.Background(), + Logger: r.logger, + Net: r.network, + } + return commons.WaitForMinPeers(ctx, pk.Serialize(), minPeerCount, + 1*time.Second, 64*time.Second, false) +} + +func validateDecidedMsg(msg *proto.SignedMessage, share *storage.Share) error { p := pipeline.Combine( + auth.BasicMsgValidation(), auth.MsgTypeCheck(proto.RoundState_Commit), - auth.AuthorizeMsg(r.validatorShare), - auth.ValidateQuorum(r.validatorShare.ThresholdSize()), + auth.AuthorizeMsg(share), + auth.ValidateQuorum(share.ThresholdSize()), ) return p.Run(msg) } + +func validateMsg(msg *proto.SignedMessage, identifier string) error { + p := pipeline.Combine( + auth.BasicMsgValidation(), + auth.ValidateLambdas([]byte(identifier)), + ) + return p.Run(msg) +} \ No newline at end of file diff --git a/exporter/ibft/reader.go b/exporter/ibft/reader.go index 38368808b2..4951879a9c 100644 --- a/exporter/ibft/reader.go +++ b/exporter/ibft/reader.go @@ -4,3 +4,13 @@ package ibft type Reader interface { Start() error } + +// Syncer is a minimal interface for syncing data +type Syncer interface { + Sync() error +} + +type SyncRead interface { + Reader + Syncer +} \ No newline at end of file diff --git a/exporter/node.go b/exporter/node.go index b2ca7d9d1c..2d473faa13 100644 --- a/exporter/node.go +++ b/exporter/node.go @@ -24,6 +24,7 @@ import ( const ( ibftSyncDispatcherTick = 1 * time.Second + networkReadDispatcherTick = 100 * time.Millisecond ) var ( @@ -66,6 +67,7 @@ type exporter struct { eth1Client eth1.Client ibftSyncDispatcher tasks.Dispatcher networkReadDispatcher tasks.Dispatcher + decidedReadDispatcher tasks.Dispatcher ws api.WebSocketServer wsAPIPort int ibftSyncEnabled bool @@ -95,8 +97,14 @@ func New(opts Options) Exporter { networkReadDispatcher: tasks.NewDispatcher(tasks.DispatcherOptions{ Ctx: opts.Ctx, Logger: opts.Logger.With(zap.String("component", "networkReadDispatcher")), - Interval: ibftSyncDispatcherTick, - Concurrent: 10000, // listening to network messages remains open, therefore using a large limit for concurrency + Interval: networkReadDispatcherTick, + Concurrent: 1000, // using a large limit of concurrency as listening to network messages remains open in the background + }), + decidedReadDispatcher: tasks.NewDispatcher(tasks.DispatcherOptions{ + Ctx: opts.Ctx, + Logger: opts.Logger.With(zap.String("component", "decidedReadDispatcher")), + Interval: networkReadDispatcherTick, + Concurrent: 1000, // using a large limit of concurrency as listening to network messages remains open in the background }), ws: opts.WS, wsAPIPort: opts.WsAPIPort, @@ -264,7 +272,7 @@ func (exp *exporter) triggerValidator(validatorPubKey *bls.PublicKey) error { logger := exp.logger.With(zap.String("pubKey", pubkey)) logger.Debug("ibft sync was triggered") - syncDecidedReader := ibft.NewIbftDecidedReadOnly(ibft.DecidedReaderOptions{ + decidedReader := ibft.NewDecidedReader(ibft.DecidedReaderOptions{ Logger: exp.logger, Storage: exp.ibftStorage, Network: exp.network, @@ -272,15 +280,15 @@ func (exp *exporter) triggerValidator(validatorPubKey *bls.PublicKey) error { ValidatorShare: validatorShare, }) - syncTask := tasks.NewTask(syncDecidedReader.Start, fmt.Sprintf("ibft:sync/%s", pubkey), func() { + syncTask := tasks.NewTask(decidedReader.Sync, fmt.Sprintf("ibft:sync/%s", pubkey), func() { logger.Debug("sync is done, starting to read network messages") exp.readNetworkMessages(validatorPubKey) - }) - if err := exp.ibftSyncDispatcher.Queue(*syncTask); err != nil { - if err == tasks.ErrTaskExist { - logger.Debug("sync task was already queued") - return nil + readDecidedTask := tasks.NewTask(decidedReader.Start, fmt.Sprintf("ibft:readDecided/%s", pubkey), nil) + if err := exp.decidedReadDispatcher.Queue(*readDecidedTask); err != nil && err != tasks.ErrTaskExist { + exp.logger.Error("could not queue decided reader", zap.String("tid", readDecidedTask.ID), zap.Error(err)) } + }) + if err := exp.ibftSyncDispatcher.Queue(*syncTask); err != nil && err != tasks.ErrTaskExist { return err } @@ -296,11 +304,7 @@ func (exp *exporter) readNetworkMessages(validatorPubKey *bls.PublicKey) { }) readerTask := tasks.NewTask(ibftMsgReader.Start, fmt.Sprintf("ibft:msgReader/%s", validatorPubKey.SerializeToHexStr()), nil) - if err := exp.networkReadDispatcher.Queue(*readerTask); err != nil { - if err == tasks.ErrTaskExist { - exp.logger.Debug("network reader was already queued", zap.String("tid", readerTask.ID)) - return - } + if err := exp.networkReadDispatcher.Queue(*readerTask); err != nil && err != tasks.ErrTaskExist { exp.logger.Error("could not queue network reader", zap.String("tid", readerTask.ID), zap.Error(err)) } } From b4f6d8c019a53d9b0c0546846a0a339a43258861 Mon Sep 17 00:00:00 2001 From: Amir Yahalom Date: Mon, 13 Sep 2021 20:10:32 +0300 Subject: [PATCH 03/16] go fmt and lint --- exporter/ibft/decided_reader.go | 6 +++--- exporter/ibft/incoming_msgs.go | 2 +- exporter/ibft/reader.go | 7 ++++--- exporter/node.go | 6 +++--- 4 files changed, 11 insertions(+), 10 deletions(-) diff --git a/exporter/ibft/decided_reader.go b/exporter/ibft/decided_reader.go index a4186d0bec..7dd19fa5db 100644 --- a/exporter/ibft/decided_reader.go +++ b/exporter/ibft/decided_reader.go @@ -50,7 +50,7 @@ func NewDecidedReader(opts DecidedReaderOptions) SyncRead { network: opts.Network, config: opts.Config, validatorShare: opts.ValidatorShare, - identifier: []byte(validator.IdentifierFormat(opts.ValidatorShare.PublicKey.Serialize(), beacon.RoleTypeAttester)), + identifier: []byte(validator.IdentifierFormat(opts.ValidatorShare.PublicKey.Serialize(), beacon.RoleTypeAttester)), } return &r } @@ -152,7 +152,7 @@ func (r *decidedReader) checkHighestDecided(msg *proto.SignedMessage) error { zap.Uint64("highestSeqKnown", highestSeqKnown)) return nil } - if seq > highestSeqKnown + 1 { + if seq > highestSeqKnown+1 { if err := r.Sync(); err != nil { logger.Debug("could not sync", zap.Uint64("seq", seq), zap.Uint64("highestSeqKnown", highestSeqKnown)) @@ -201,4 +201,4 @@ func validateMsg(msg *proto.SignedMessage, identifier string) error { auth.ValidateLambdas([]byte(identifier)), ) return p.Run(msg) -} \ No newline at end of file +} diff --git a/exporter/ibft/incoming_msgs.go b/exporter/ibft/incoming_msgs.go index 9742751af0..a454368db7 100644 --- a/exporter/ibft/incoming_msgs.go +++ b/exporter/ibft/incoming_msgs.go @@ -104,4 +104,4 @@ func messageFields(msg *proto.SignedMessage) []zap.Field { zap.String("signers", msg.SignersIDString()), zap.String("identifier", string(msg.Message.Lambda)), } -} \ No newline at end of file +} diff --git a/exporter/ibft/reader.go b/exporter/ibft/reader.go index 4951879a9c..38561b312a 100644 --- a/exporter/ibft/reader.go +++ b/exporter/ibft/reader.go @@ -1,16 +1,17 @@ package ibft -// Reader is a minimal interface for ibft in the context of an exporter +// Reader is an interface for ibft in the context of an exporter type Reader interface { Start() error } -// Syncer is a minimal interface for syncing data +// Syncer is an interface for syncing data type Syncer interface { Sync() error } +// SyncRead reads and sync data type SyncRead interface { Reader Syncer -} \ No newline at end of file +} diff --git a/exporter/node.go b/exporter/node.go index 2d473faa13..3c002ef164 100644 --- a/exporter/node.go +++ b/exporter/node.go @@ -23,7 +23,7 @@ import ( ) const ( - ibftSyncDispatcherTick = 1 * time.Second + ibftSyncDispatcherTick = 1 * time.Second networkReadDispatcherTick = 100 * time.Millisecond ) @@ -288,7 +288,7 @@ func (exp *exporter) triggerValidator(validatorPubKey *bls.PublicKey) error { exp.logger.Error("could not queue decided reader", zap.String("tid", readDecidedTask.ID), zap.Error(err)) } }) - if err := exp.ibftSyncDispatcher.Queue(*syncTask); err != nil && err != tasks.ErrTaskExist { + if err := exp.ibftSyncDispatcher.Queue(*syncTask); err != nil && err != tasks.ErrTaskExist { return err } @@ -304,7 +304,7 @@ func (exp *exporter) readNetworkMessages(validatorPubKey *bls.PublicKey) { }) readerTask := tasks.NewTask(ibftMsgReader.Start, fmt.Sprintf("ibft:msgReader/%s", validatorPubKey.SerializeToHexStr()), nil) - if err := exp.networkReadDispatcher.Queue(*readerTask); err != nil && err != tasks.ErrTaskExist { + if err := exp.networkReadDispatcher.Queue(*readerTask); err != nil && err != tasks.ErrTaskExist { exp.logger.Error("could not queue network reader", zap.String("tid", readerTask.ID), zap.Error(err)) } } From e8f477d7a7a711bfbfaefb6e599ce6e1f2903ef1 Mon Sep 17 00:00:00 2001 From: Amir Yahalom Date: Tue, 14 Sep 2021 09:56:17 +0300 Subject: [PATCH 04/16] removed dispatcher and added exec queue --- eth1/sync.go | 2 +- exporter/ibft/decided_reader.go | 12 +- exporter/ibft/factory.go | 48 +++++ exporter/ibft/incoming_msgs.go | 4 +- exporter/ibft/reader.go | 17 -- exporter/node.go | 117 +++++------ utils/tasks/dispatcher.go | 197 ------------------ utils/tasks/dispatcher_test.go | 90 -------- utils/tasks/exec_queue.go | 138 ++++++++++++ ...imple_queue_test.go => exec_queue_test.go} | 29 ++- utils/tasks/simple_queue.go | 123 ----------- 11 files changed, 263 insertions(+), 514 deletions(-) create mode 100644 exporter/ibft/factory.go delete mode 100644 exporter/ibft/reader.go delete mode 100644 utils/tasks/dispatcher.go delete mode 100644 utils/tasks/dispatcher_test.go create mode 100644 utils/tasks/exec_queue.go rename utils/tasks/{simple_queue_test.go => exec_queue_test.go} (57%) delete mode 100644 utils/tasks/simple_queue.go diff --git a/eth1/sync.go b/eth1/sync.go index df8965e82b..ecdf151e1a 100644 --- a/eth1/sync.go +++ b/eth1/sync.go @@ -54,7 +54,7 @@ func SyncEth1Events(logger *zap.Logger, client Client, storage SyncOffsetStorage } defer client.EventsSubject().Deregister("SyncEth1") - q := tasks.NewSimpleQueue(5 * time.Millisecond) + q := tasks.NewExecutionQueue(5 * time.Millisecond) go q.Start() defer q.Stop() // Stop once SyncEndedEvent arrives diff --git a/exporter/ibft/decided_reader.go b/exporter/ibft/decided_reader.go index 7dd19fa5db..2217dbc0ca 100644 --- a/exporter/ibft/decided_reader.go +++ b/exporter/ibft/decided_reader.go @@ -40,8 +40,8 @@ type decidedReader struct { identifier []byte } -// NewDecidedReader creates new instance of DecidedReader -func NewDecidedReader(opts DecidedReaderOptions) SyncRead { +// newDecidedReader creates new instance of DecidedReader +func newDecidedReader(opts DecidedReaderOptions) SyncRead { r := decidedReader{ logger: opts.Logger.With( zap.String("pubKey", opts.ValidatorShare.PublicKey.SerializeToHexStr()), @@ -110,10 +110,10 @@ func (r *decidedReader) listenToNetwork() { logger.Debug("received invalid decided") continue } - //if msg.Message.SeqNumber == 0 { - // logger.Debug("received invalid sequence") - // continue - //} + if msg.Message.SeqNumber == 0 { + logger.Debug("received invalid sequence") + continue + } if err := r.handleNewDecidedMessage(msg); err != nil { logger.Error("could not save highest decided") } diff --git a/exporter/ibft/factory.go b/exporter/ibft/factory.go new file mode 100644 index 0000000000..a354bdf04e --- /dev/null +++ b/exporter/ibft/factory.go @@ -0,0 +1,48 @@ +package ibft + +import "sync" + +var ( + decidedReaders sync.Map + networkReaders sync.Map +) + +// Reader is an interface for ibft in the context of an exporter +type Reader interface { + Start() error +} + +// Syncer is an interface for syncing data +type Syncer interface { + Sync() error +} + +// SyncRead reads and sync data +type SyncRead interface { + Reader + Syncer +} + +// NewNetworkReader factory to create network readers +func NewNetworkReader(o IncomingMsgsReaderOptions) Reader { + pk := o.PK.SerializeToHexStr() + r, exist := networkReaders.Load(pk) + if !exist { + reader := newIncomingMsgsReader(o) + networkReaders.Store(pk, reader) + return reader + } + return r.(*incomingMsgsReader) +} + +// NewDecidedReader factory to create decided readers +func NewDecidedReader(o DecidedReaderOptions) SyncRead { + pk := o.ValidatorShare.PublicKey.SerializeToHexStr() + r, exist := decidedReaders.Load(pk) + if !exist { + reader := newDecidedReader(o) + decidedReaders.Store(pk, reader) + return reader + } + return r.(*decidedReader) +} diff --git a/exporter/ibft/incoming_msgs.go b/exporter/ibft/incoming_msgs.go index a454368db7..29a2876990 100644 --- a/exporter/ibft/incoming_msgs.go +++ b/exporter/ibft/incoming_msgs.go @@ -28,8 +28,8 @@ type incomingMsgsReader struct { publicKey *bls.PublicKey } -// NewIncomingMsgsReader creates new instance -func NewIncomingMsgsReader(opts IncomingMsgsReaderOptions) Reader { +// newIncomingMsgsReader creates new instance +func newIncomingMsgsReader(opts IncomingMsgsReaderOptions) Reader { r := &incomingMsgsReader{ logger: opts.Logger.With(zap.String("ibft", "msg_reader"), zap.String("pubKey", opts.PK.SerializeToHexStr())), diff --git a/exporter/ibft/reader.go b/exporter/ibft/reader.go deleted file mode 100644 index 38561b312a..0000000000 --- a/exporter/ibft/reader.go +++ /dev/null @@ -1,17 +0,0 @@ -package ibft - -// Reader is an interface for ibft in the context of an exporter -type Reader interface { - Start() error -} - -// Syncer is an interface for syncing data -type Syncer interface { - Sync() error -} - -// SyncRead reads and sync data -type SyncRead interface { - Reader - Syncer -} diff --git a/exporter/node.go b/exporter/node.go index 3c002ef164..97497c63b4 100644 --- a/exporter/node.go +++ b/exporter/node.go @@ -23,8 +23,8 @@ import ( ) const ( - ibftSyncDispatcherTick = 1 * time.Second - networkReadDispatcherTick = 100 * time.Millisecond + mainQueueInterval = 100 * time.Millisecond + readerQueuesInterval = 10 * time.Millisecond ) var ( @@ -58,19 +58,19 @@ type Options struct { // exporter is the internal implementation of Exporter interface type exporter struct { - ctx context.Context - storage storage.Storage - validatorStorage validatorstorage.ICollection - ibftStorage collections.Iibft - logger *zap.Logger - network network.Network - eth1Client eth1.Client - ibftSyncDispatcher tasks.Dispatcher - networkReadDispatcher tasks.Dispatcher - decidedReadDispatcher tasks.Dispatcher - ws api.WebSocketServer - wsAPIPort int - ibftSyncEnabled bool + ctx context.Context + storage storage.Storage + validatorStorage validatorstorage.ICollection + ibftStorage collections.Iibft + logger *zap.Logger + network network.Network + eth1Client eth1.Client + mainQueue tasks.Queue + decidedReadersQueue tasks.Queue + networkReadersQueue tasks.Queue + ws api.WebSocketServer + wsAPIPort int + ibftSyncEnabled bool } // New creates a new Exporter instance @@ -86,29 +86,15 @@ func New(opts Options) Exporter { Logger: opts.Logger, }, ), - logger: opts.Logger.With(zap.String("component", "exporter/node")), - network: opts.Network, - eth1Client: opts.Eth1Client, - ibftSyncDispatcher: tasks.NewDispatcher(tasks.DispatcherOptions{ - Ctx: opts.Ctx, - Logger: opts.Logger.With(zap.String("component", "ibftSyncDispatcher")), - Interval: ibftSyncDispatcherTick, - }), - networkReadDispatcher: tasks.NewDispatcher(tasks.DispatcherOptions{ - Ctx: opts.Ctx, - Logger: opts.Logger.With(zap.String("component", "networkReadDispatcher")), - Interval: networkReadDispatcherTick, - Concurrent: 1000, // using a large limit of concurrency as listening to network messages remains open in the background - }), - decidedReadDispatcher: tasks.NewDispatcher(tasks.DispatcherOptions{ - Ctx: opts.Ctx, - Logger: opts.Logger.With(zap.String("component", "decidedReadDispatcher")), - Interval: networkReadDispatcherTick, - Concurrent: 1000, // using a large limit of concurrency as listening to network messages remains open in the background - }), - ws: opts.WS, - wsAPIPort: opts.WsAPIPort, - ibftSyncEnabled: opts.IbftSyncEnabled, + logger: opts.Logger.With(zap.String("component", "exporter/node")), + network: opts.Network, + eth1Client: opts.Eth1Client, + mainQueue: tasks.NewExecutionQueue(mainQueueInterval), + decidedReadersQueue: tasks.NewExecutionQueue(readerQueuesInterval), + networkReadersQueue: tasks.NewExecutionQueue(readerQueuesInterval), + ws: opts.WS, + wsAPIPort: opts.WsAPIPort, + ibftSyncEnabled: opts.IbftSyncEnabled, } if err := e.init(opts); err != nil { @@ -135,8 +121,9 @@ func (exp *exporter) init(opts Options) error { func (exp *exporter) Start() error { exp.logger.Info("starting node") - go exp.ibftSyncDispatcher.Start() - go exp.networkReadDispatcher.Start() + go exp.mainQueue.Start() + go exp.decidedReadersQueue.Start() + go exp.networkReadersQueue.Start() if exp.ws == nil { return nil @@ -270,41 +257,45 @@ func (exp *exporter) triggerValidator(validatorPubKey *bls.PublicKey) error { return errors.Wrap(err, "could not get validator share") } logger := exp.logger.With(zap.String("pubKey", pubkey)) - logger.Debug("ibft sync was triggered") + logger.Debug("validator was triggered") - decidedReader := ibft.NewDecidedReader(ibft.DecidedReaderOptions{ + exp.mainQueue.QueueDistinct(func() error { + return exp.setup(validatorShare) + }, fmt.Sprintf("ibft:setup/%s", pubkey)) + + return nil +} + +func (exp *exporter) setup(validatorShare *validatorstorage.Share) error { + pubKey := validatorShare.PublicKey.SerializeToHexStr() + logger := exp.logger.With(zap.String("pubKey", pubKey)) + decidedReader := exp.getDecidedReader(validatorShare) + if err := decidedReader.Sync(); err != nil { + logger.Error("could not setup validator, sync failed", zap.Error(err)) + return err + } + logger.Debug("sync is done, starting to read network messages") + exp.decidedReadersQueue.QueueDistinct(decidedReader.Start, pubKey) + networkReader := exp.getNetworkReader(validatorShare.PublicKey) + exp.networkReadersQueue.QueueDistinct(networkReader.Start, pubKey) + return nil +} + +func (exp *exporter) getDecidedReader(validatorShare *validatorstorage.Share) ibft.SyncRead { + return ibft.NewDecidedReader(ibft.DecidedReaderOptions{ Logger: exp.logger, Storage: exp.ibftStorage, Network: exp.network, Config: proto.DefaultConsensusParams(), ValidatorShare: validatorShare, }) - - syncTask := tasks.NewTask(decidedReader.Sync, fmt.Sprintf("ibft:sync/%s", pubkey), func() { - logger.Debug("sync is done, starting to read network messages") - exp.readNetworkMessages(validatorPubKey) - readDecidedTask := tasks.NewTask(decidedReader.Start, fmt.Sprintf("ibft:readDecided/%s", pubkey), nil) - if err := exp.decidedReadDispatcher.Queue(*readDecidedTask); err != nil && err != tasks.ErrTaskExist { - exp.logger.Error("could not queue decided reader", zap.String("tid", readDecidedTask.ID), zap.Error(err)) - } - }) - if err := exp.ibftSyncDispatcher.Queue(*syncTask); err != nil && err != tasks.ErrTaskExist { - return err - } - - return nil } -func (exp *exporter) readNetworkMessages(validatorPubKey *bls.PublicKey) { - ibftMsgReader := ibft.NewIncomingMsgsReader(ibft.IncomingMsgsReaderOptions{ +func (exp *exporter) getNetworkReader(validatorPubKey *bls.PublicKey) ibft.Reader { + return ibft.NewNetworkReader(ibft.IncomingMsgsReaderOptions{ Logger: exp.logger, Network: exp.network, Config: proto.DefaultConsensusParams(), PK: validatorPubKey, }) - readerTask := tasks.NewTask(ibftMsgReader.Start, - fmt.Sprintf("ibft:msgReader/%s", validatorPubKey.SerializeToHexStr()), nil) - if err := exp.networkReadDispatcher.Queue(*readerTask); err != nil && err != tasks.ErrTaskExist { - exp.logger.Error("could not queue network reader", zap.String("tid", readerTask.ID), zap.Error(err)) - } } diff --git a/utils/tasks/dispatcher.go b/utils/tasks/dispatcher.go deleted file mode 100644 index df5dc3a9e5..0000000000 --- a/utils/tasks/dispatcher.go +++ /dev/null @@ -1,197 +0,0 @@ -package tasks - -import ( - "context" - "github.com/pkg/errors" - "go.uber.org/zap" - "sync" - "time" -) - -const ( - defaultConcurrentLimit = 100 -) - -var ( - // ErrTaskExist thrown when the task to dispatch already exist - ErrTaskExist = errors.New("task exist") -) - -// Fn represents a function -type Fn func() error - -// Task represents a some function to execute -type Task struct { - Fn Fn - ID string - end func() -} - -// NewTask creates a new task -func NewTask(fn Fn, id string, end func()) *Task { - t := Task{fn, id, end} - return &t -} - -// Dispatcher maintains a queue of tasks to dispatch -type Dispatcher interface { - // Queue adds a new task - Queue(Task) error - // Dispatch will dispatch the next task - Dispatch() - // Start starts ticks - Start() - // Stats returns the number of waiting tasks and the number of running tasks - Stats() *DispatcherStats -} - -// DispatcherOptions describes the needed arguments for dispatcher instance -type DispatcherOptions struct { - // Ctx is a context for stopping the dispatcher - Ctx context.Context - // Logger used for logs - Logger *zap.Logger - // Interval is the time interval ticker used by dispatcher - // if the value was not provided (zero) -> no interval will run. - // *the calls to Dispatch() should be in a higher level - Interval time.Duration - // Concurrent is the limit of concurrent tasks running - // if zero or negative (<= 0) then defaultConcurrentLimit will be used - Concurrent int -} - -// DispatcherStats represents runtime stats of the dispatcher -type DispatcherStats struct { - // Waiting is the number of tasks that waits in queue - Waiting int - // Running is the number of running tasks - Running int - // Time is the time when the stats snapshot was taken - Time time.Time -} - -// dispatcher is the internal implementation of Dispatcher -type dispatcher struct { - ctx context.Context - logger *zap.Logger - - tasks map[string]Task - running map[string]bool - waiting []string - mut sync.RWMutex - - interval time.Duration - concurrentLimit int -} - -// NewDispatcher creates a new instance -func NewDispatcher(opts DispatcherOptions) Dispatcher { - if opts.Concurrent == 0 { - opts.Concurrent = defaultConcurrentLimit - } - d := dispatcher{ - ctx: opts.Ctx, - logger: opts.Logger, - interval: opts.Interval, - concurrentLimit: opts.Concurrent, - tasks: map[string]Task{}, - waiting: []string{}, - mut: sync.RWMutex{}, - running: map[string]bool{}, - } - return &d -} - -func (d *dispatcher) Queue(task Task) error { - d.mut.Lock() - defer d.mut.Unlock() - - if _, exist := d.tasks[task.ID]; exist { - return ErrTaskExist - } - d.tasks[task.ID] = task - d.waiting = append(d.waiting, task.ID) - d.logger.Debug("task was queued", - zap.String("task-id", task.ID), - zap.Int("waiting", len(d.waiting))) - return nil -} - -func (d *dispatcher) nextTaskToRun() *Task { - d.mut.Lock() - defer d.mut.Unlock() - - if len(d.waiting) == 0 { - return nil - } - // pop first task in the waiting queue - tid := d.waiting[0] - d.waiting = d.waiting[1:] - if task, ok := d.tasks[tid]; ok { - d.running[tid] = true - return &task - } - return nil -} - -func (d *dispatcher) Dispatch() { - task := d.nextTaskToRun() - if task == nil { - return - } - go func() { - tid := task.ID - logger := d.logger.With(zap.String("task-id", tid)) - defer func() { - d.mut.Lock() - delete(d.running, tid) - delete(d.tasks, tid) - d.mut.Unlock() - }() - stats := d.Stats() - logger.Debug("task was dispatched", zap.Time("time", stats.Time), - zap.Int("running", stats.Running), - zap.Int("waiting", stats.Waiting)) - err := task.Fn() - if err != nil { - logger.Error("task failed", zap.Error(err)) - } - if task.end != nil { - go task.end() - } - }() -} - -func (d *dispatcher) Start() { - if d.interval.Milliseconds() == 0 { - d.logger.Warn("dispatcher interval was set to zero, ticker won't start") - return - } - ticker := time.NewTicker(d.interval) - defer ticker.Stop() - for { - select { - case <-ticker.C: - d.mut.RLock() - running := len(d.running) - d.mut.RUnlock() - if running < d.concurrentLimit { - d.Dispatch() - } - case <-d.ctx.Done(): - d.logger.Debug("Context closed, exiting dispatcher interval routine") - return - } - } -} - -func (d *dispatcher) Stats() *DispatcherStats { - d.mut.RLock() - defer d.mut.RUnlock() - ds := DispatcherStats{ - Waiting: len(d.waiting), - Running: len(d.running), - Time: time.Now(), - } - return &ds -} diff --git a/utils/tasks/dispatcher_test.go b/utils/tasks/dispatcher_test.go deleted file mode 100644 index 969f1f166e..0000000000 --- a/utils/tasks/dispatcher_test.go +++ /dev/null @@ -1,90 +0,0 @@ -package tasks - -import ( - "context" - "fmt" - "github.com/stretchr/testify/require" - "go.uber.org/zap" - "sync" - "sync/atomic" - "testing" - "time" -) - -func TestNewDispatcher(t *testing.T) { - d := NewDispatcher(DispatcherOptions{ - Ctx: context.TODO(), - Logger: zap.L(), - Interval: 1 * time.Millisecond, - Concurrent: 10, - }) - tmap := sync.Map{} - n := 90 - tasks := []Task{} - for len(tasks) < n { - tid := fmt.Sprintf("id-%d", len(tasks)) - tasks = append(tasks, *NewTask(func() error { - tmap.Store(tid, true) - return nil - }, tid, nil)) - } - go d.Start() - for _, ts := range tasks { - require.NoError(t, d.Queue(ts)) - } - time.Sleep((100 + 20) * time.Millisecond) // 100 (expected) + 20 (buffer) - count := 0 - tmap.Range(func(key, value interface{}) bool { - count++ - return true - }) - require.Equal(t, count, n) -} - -func TestTask_End(t *testing.T) { - var i int64 - inc := func() error { - atomic.AddInt64(&i, 1) - return nil - } - t1 := NewTask(inc, "1", func() { - atomic.AddInt64(&i, -1) - }) - t2 := NewTask(inc, "2", func() { - require.Equal(t, i, 1) - }) - - d := NewDispatcher(DispatcherOptions{ - Ctx: context.TODO(), - Logger: zap.L(), - Interval: 2 * time.Millisecond, - Concurrent: 10, - }) - - require.NoError(t, d.Queue(*t1)) - require.NoError(t, d.Queue(*t2)) -} - - -func TestDispatcher_DistinctTasks(t *testing.T) { - d := NewDispatcher(DispatcherOptions{ - Ctx: context.TODO(), - Logger: zap.L(), - Interval: 1 * time.Millisecond, - Concurrent: 10, - }) - var i int64 - inc := func() error { - atomic.AddInt64(&i, 1) - return nil - } - t1 := NewTask(inc, "1", func() { - atomic.AddInt64(&i, 1) - }) - t2 := NewTask(inc, "1", func() { - atomic.AddInt64(&i, 1) - }) - - require.NoError(t, d.Queue(*t1)) - require.Error(t, d.Queue(*t2)) -} \ No newline at end of file diff --git a/utils/tasks/exec_queue.go b/utils/tasks/exec_queue.go new file mode 100644 index 0000000000..0ecaea4673 --- /dev/null +++ b/utils/tasks/exec_queue.go @@ -0,0 +1,138 @@ +package tasks + +import ( + "sync" + "time" +) + +// Fn represents a function to execute +type Fn func() error + +// Queue is an interface for event queue +type Queue interface { + Start() + Stop() + Queue(fn Fn) + QueueDistinct(Fn, string) + Wait() + Errors() []error +} + +// executionQueue implements Queue interface +type executionQueue struct { + waiting []Fn + stopped bool + + wg sync.WaitGroup + lock sync.RWMutex + + visited sync.Map + + errs []error + + interval time.Duration +} + +// NewExecutionQueue creates a new instance +func NewExecutionQueue(interval time.Duration) Queue { + if interval.Milliseconds() == 0 { + interval = 10 * time.Millisecond // default interval + } + q := executionQueue{ + waiting: []Fn{}, + wg: sync.WaitGroup{}, + lock: sync.RWMutex{}, + visited: sync.Map{}, + errs: []error{}, + interval: interval, + } + return &q +} + +// Stop stops the queue +func (eq *executionQueue) Stop() { + eq.lock.Lock() + defer eq.lock.Unlock() + + eq.stopped = true +} + +// Start starts to execute events +func (eq *executionQueue) Start() { + eq.lock.Lock() + eq.stopped = false + eq.lock.Unlock() + + for { + eq.lock.Lock() + if eq.stopped { + eq.lock.Unlock() + return + } + if len(eq.waiting) > 0 { + next := eq.waiting[0] + eq.waiting = eq.waiting[1:] + eq.lock.Unlock() + go eq.exec(next) + continue + } + eq.lock.Unlock() + time.Sleep(eq.interval) + } +} + +// QueueDistinct adds unique events to the queue +func (eq *executionQueue) QueueDistinct(fn Fn, id string) { + if _, exist := eq.visited.Load(id); !exist { + eq.Queue(fn) + eq.visited.Store(id, true) + } +} + +// Queue adds an event to the queue +func (eq *executionQueue) Queue(fn Fn) { + eq.lock.Lock() + defer eq.lock.Unlock() + + eq.wg.Add(1) + eq.waiting = append(eq.waiting, fn) +} + +// Wait waits until all events were executed +func (eq *executionQueue) Wait() { + eq.wg.Wait() +} + +// Errors returns the errors of events +func (eq *executionQueue) Errors() []error { + eq.lock.RLock() + defer eq.lock.RUnlock() + + return eq.errs +} + +func (eq *executionQueue) exec(fn Fn) { + defer eq.wg.Done() + + if err := fn(); err != nil { + eq.lock.Lock() + eq.errs = append(eq.errs, err) + eq.lock.Unlock() + } +} + +// getWaiting returns waiting events +func (eq *executionQueue) getWaiting() []Fn { + eq.lock.RLock() + defer eq.lock.RUnlock() + + return eq.waiting +} + +// isStopped returns the queue state +func (eq *executionQueue) isStopped() bool { + eq.lock.RLock() + defer eq.lock.RUnlock() + + return eq.stopped +} diff --git a/utils/tasks/simple_queue_test.go b/utils/tasks/exec_queue_test.go similarity index 57% rename from utils/tasks/simple_queue_test.go rename to utils/tasks/exec_queue_test.go index 04759c18f8..50859d8f89 100644 --- a/utils/tasks/simple_queue_test.go +++ b/utils/tasks/exec_queue_test.go @@ -7,9 +7,9 @@ import ( "time" ) -func TestSimpleQueue(t *testing.T) { +func TestExecQueue(t *testing.T) { var i int64 - q := NewSimpleQueue(1 * time.Millisecond) + q := NewExecutionQueue(1 * time.Millisecond) go q.Start() @@ -41,13 +41,13 @@ func TestSimpleQueue(t *testing.T) { }) q.Wait() require.Equal(t, int64(1), atomic.LoadInt64(&i)) - require.Equal(t, 0, len(q.(*simpleQueue).getWaiting())) - require.Equal(t, 0, len(q.(*simpleQueue).errs)) + require.Equal(t, 0, len(q.(*executionQueue).getWaiting())) + require.Equal(t, 0, len(q.(*executionQueue).errs)) } -func TestSimpleQueue_Stop(t *testing.T) { +func TestExecQueue_Stop(t *testing.T) { var i int64 - q := NewSimpleQueue(1 * time.Millisecond) + q := NewExecutionQueue(1 * time.Millisecond) go q.Start() @@ -55,30 +55,29 @@ func TestSimpleQueue_Stop(t *testing.T) { atomic.AddInt64(&i, 1) return nil }) - require.Equal(t, 1, len(q.(*simpleQueue).getWaiting())) + require.Equal(t, 1, len(q.(*executionQueue).getWaiting())) time.Sleep(2 * time.Millisecond) - require.Equal(t, 0, len(q.(*simpleQueue).getWaiting())) + require.Equal(t, 0, len(q.(*executionQueue).getWaiting())) - require.False(t, q.(*simpleQueue).isStopped()) + require.False(t, q.(*executionQueue).isStopped()) q.Stop() - require.True(t, q.(*simpleQueue).isStopped()) + require.True(t, q.(*executionQueue).isStopped()) q.Queue(func() error { atomic.AddInt64(&i, 1) return nil }) time.Sleep(2 * time.Millisecond) // q was stopped, therefore the function should be kept in waiting - require.Equal(t, 1, len(q.(*simpleQueue).getWaiting())) + require.Equal(t, 1, len(q.(*executionQueue).getWaiting())) require.Equal(t, int64(1), atomic.LoadInt64(&i)) } - -func TestSimpleQueue_Empty(t *testing.T) { - q := NewSimpleQueue(1 * time.Millisecond) +func TestExecQueue_Empty(t *testing.T) { + q := NewExecutionQueue(1 * time.Millisecond) go q.Start() q.Wait() q.Stop() - require.True(t, q.(*simpleQueue).isStopped()) + require.True(t, q.(*executionQueue).isStopped()) } diff --git a/utils/tasks/simple_queue.go b/utils/tasks/simple_queue.go deleted file mode 100644 index 8d4a874dca..0000000000 --- a/utils/tasks/simple_queue.go +++ /dev/null @@ -1,123 +0,0 @@ -package tasks - -import ( - "sync" - "time" -) - -// Queue is an interface for event queue -type Queue interface { - Start() - Stop() - Queue(fn Fn) - Wait() - Errors() []error -} - -// simpleQueue implements Queue interface -type simpleQueue struct { - waiting []Fn - stopped bool - - wg sync.WaitGroup - lock sync.RWMutex - - errs []error - - interval time.Duration -} - -// NewSimpleQueue creates a new instance -func NewSimpleQueue(interval time.Duration) Queue { - if interval.Milliseconds() == 0 { - interval = 10 * time.Millisecond // default interval - } - q := simpleQueue{ - waiting: []Fn{}, - wg: sync.WaitGroup{}, - lock: sync.RWMutex{}, - errs: []error{}, - interval: interval, - } - return &q -} - -// Stop stops the queue -func (q *simpleQueue) Stop() { - q.lock.Lock() - defer q.lock.Unlock() - - q.stopped = true -} - -// isStopped returns the queue state -func (q *simpleQueue) isStopped() bool { - q.lock.RLock() - defer q.lock.RUnlock() - - return q.stopped -} - -// Start starts executing events -func (q *simpleQueue) Start() { - q.lock.Lock() - q.stopped = false - q.lock.Unlock() - - for { - q.lock.Lock() - if q.stopped { - q.lock.Unlock() - return - } - if len(q.waiting) > 0 { - next := q.waiting[0] - q.waiting = q.waiting[1:] - q.lock.Unlock() - go q.exec(next) - continue - } - q.lock.Unlock() - time.Sleep(q.interval) - } -} - -// Queue adds an event to the queue -func (q *simpleQueue) Queue(fn Fn) { - q.lock.Lock() - defer q.lock.Unlock() - - q.wg.Add(1) - q.waiting = append(q.waiting, fn) -} - -// Wait waits until all events were executed -func (q *simpleQueue) Wait() { - q.wg.Wait() -} - -// Errors returns the errors of events -func (q *simpleQueue) Errors() []error { - q.lock.RLock() - defer q.lock.RUnlock() - - return q.errs -} - -func (q *simpleQueue) exec(fn Fn) { - defer q.wg.Done() - - if err := fn(); err != nil { - q.lock.Lock() - q.errs = append(q.errs, err) - q.lock.Unlock() - } -} - -// getWaiting returns waiting events -func (q *simpleQueue) getWaiting() []Fn { - q.lock.RLock() - defer q.lock.RUnlock() - - return q.waiting -} \ No newline at end of file From 98d0cde87b09da5976474693f3756965d7aced78 Mon Sep 17 00:00:00 2001 From: Amir Yahalom Date: Tue, 14 Sep 2021 10:58:47 +0300 Subject: [PATCH 05/16] fix logs --- exporter/ibft/decided_reader.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/exporter/ibft/decided_reader.go b/exporter/ibft/decided_reader.go index 2217dbc0ca..472969ab52 100644 --- a/exporter/ibft/decided_reader.go +++ b/exporter/ibft/decided_reader.go @@ -107,7 +107,7 @@ func (r *decidedReader) listenToNetwork() { } logger := r.logger.With(messageFields(msg)...) if err := validateDecidedMsg(msg, r.validatorShare); err != nil { - logger.Debug("received invalid decided") + logger.Debug("received invalid decided message") continue } if msg.Message.SeqNumber == 0 { @@ -115,7 +115,7 @@ func (r *decidedReader) listenToNetwork() { continue } if err := r.handleNewDecidedMessage(msg); err != nil { - logger.Error("could not save highest decided") + logger.Error("could not handle decided message") } } } From dfcf32cec9816583e58692c8ed7540903435f902 Mon Sep 17 00:00:00 2001 From: Amir Yahalom Date: Tue, 14 Sep 2021 12:02:23 +0300 Subject: [PATCH 06/16] avoid failure when all indices were fetched --- validator/controller.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/validator/controller.go b/validator/controller.go index 4fc76c9e48..97da2c0290 100644 --- a/validator/controller.go +++ b/validator/controller.go @@ -178,7 +178,9 @@ func (c *controller) GetValidatorsIndices() []spec.ValidatorIndex { c.logger.Error("failed to get all validators public keys", zap.Error(err)) } - go c.addValidatorsIndices(toFetch) + if len(toFetch) > 0 { + go c.addValidatorsIndices(toFetch) + } return indices } From 7b39b23c6e3b0e577b5c70414e8b095452cf0884 Mon Sep 17 00:00:00 2001 From: Amir Yahalom Date: Tue, 14 Sep 2021 12:05:01 +0300 Subject: [PATCH 07/16] deploy exporter --- .gitlab-ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 7e7531de7d..d7a7813752 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -33,7 +33,7 @@ Build stage Docker image: - docker tag $IMAGE_NAME:$CI_BUILD_REF $DOCKER_REPO_INFRA_STAGE:$CI_BUILD_REF - $DOCKER_LOGIN_TO_INFRA_STAGE_REPO && docker push $DOCKER_REPO_INFRA_STAGE:$CI_BUILD_REF only: - - stage + - exporter Deploy ssv exporter to blox-infra-stage cluster: stage: deploy @@ -47,7 +47,7 @@ Deploy ssv exporter to blox-infra-stage cluster: - mv kubectl /usr/bin/ - .k8/scripts/deploy-yamls-on-k8s.sh $DOCKER_REPO_INFRA_STAGE $CI_BUILD_REF ssv $APP_REPLICAS_INFRA_STAGE blox-infra-stage kubernetes-admin@blox-infra stage.ssv.network $K8S_API_VERSION only: - - stage + - exporter Deploy ssv nodes to blox-infra-stage cluster: stage: deploy From 8c8840d8bf41964fe6fa5686e425cc67dca67def Mon Sep 17 00:00:00 2001 From: Amir Yahalom Date: Tue, 14 Sep 2021 18:13:10 +0300 Subject: [PATCH 08/16] retry sync if fails --- exporter/node.go | 11 ++++++++--- utils/tasks/retry.go | 13 +++++++++++++ utils/tasks/retry_test.go | 24 ++++++++++++++++++++++++ 3 files changed, 45 insertions(+), 3 deletions(-) create mode 100644 utils/tasks/retry.go create mode 100644 utils/tasks/retry_test.go diff --git a/exporter/node.go b/exporter/node.go index 97497c63b4..292cc5c145 100644 --- a/exporter/node.go +++ b/exporter/node.go @@ -256,8 +256,7 @@ func (exp *exporter) triggerValidator(validatorPubKey *bls.PublicKey) error { if err != nil { return errors.Wrap(err, "could not get validator share") } - logger := exp.logger.With(zap.String("pubKey", pubkey)) - logger.Debug("validator was triggered") + exp.logger.Debug("validator was triggered", zap.String("pubKey", pubkey)) exp.mainQueue.QueueDistinct(func() error { return exp.setup(validatorShare) @@ -270,7 +269,13 @@ func (exp *exporter) setup(validatorShare *validatorstorage.Share) error { pubKey := validatorShare.PublicKey.SerializeToHexStr() logger := exp.logger.With(zap.String("pubKey", pubKey)) decidedReader := exp.getDecidedReader(validatorShare) - if err := decidedReader.Sync(); err != nil { + if err := tasks.Retry(func() error { + if err := decidedReader.Sync(); err != nil { + logger.Error("could not sync validator", zap.Error(err)) + return err + } + return nil + }, 5); err != nil { logger.Error("could not setup validator, sync failed", zap.Error(err)) return err } diff --git a/utils/tasks/retry.go b/utils/tasks/retry.go new file mode 100644 index 0000000000..eb63297f5c --- /dev/null +++ b/utils/tasks/retry.go @@ -0,0 +1,13 @@ +package tasks + +// Retry executes a function x times or until successful +func Retry(fn Fn, retries int) error { + var err error + for retries > 0 { + if err = fn(); err == nil { + return nil + } + retries-- + } + return err +} diff --git a/utils/tasks/retry_test.go b/utils/tasks/retry_test.go new file mode 100644 index 0000000000..9d76ff5b62 --- /dev/null +++ b/utils/tasks/retry_test.go @@ -0,0 +1,24 @@ +package tasks + +import ( + "github.com/pkg/errors" + "github.com/stretchr/testify/require" + "sync/atomic" + "testing" +) + +func TestRetry(t *testing.T) { + var i int64 + + inc := func() error { + atomic.AddInt64(&i, 1) + if i < 3 { + return errors.New("test-error") + } + return nil + } + + require.Nil(t, Retry(inc, 4)) + atomic.StoreInt64(&i, 0) + require.EqualError(t, Retry(inc, 2), "test-error") +} From 6b91ed6fe0e7e8eee3829eab7094f178ca513292 Mon Sep 17 00:00:00 2001 From: Amir Yahalom Date: Tue, 14 Sep 2021 18:13:42 +0300 Subject: [PATCH 09/16] revert deploy --- .gitlab-ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index d7a7813752..7e7531de7d 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -33,7 +33,7 @@ Build stage Docker image: - docker tag $IMAGE_NAME:$CI_BUILD_REF $DOCKER_REPO_INFRA_STAGE:$CI_BUILD_REF - $DOCKER_LOGIN_TO_INFRA_STAGE_REPO && docker push $DOCKER_REPO_INFRA_STAGE:$CI_BUILD_REF only: - - exporter + - stage Deploy ssv exporter to blox-infra-stage cluster: stage: deploy @@ -47,7 +47,7 @@ Deploy ssv exporter to blox-infra-stage cluster: - mv kubectl /usr/bin/ - .k8/scripts/deploy-yamls-on-k8s.sh $DOCKER_REPO_INFRA_STAGE $CI_BUILD_REF ssv $APP_REPLICAS_INFRA_STAGE blox-infra-stage kubernetes-admin@blox-infra stage.ssv.network $K8S_API_VERSION only: - - exporter + - stage Deploy ssv nodes to blox-infra-stage cluster: stage: deploy From 9d3c3c6384dedcc163c11ac317d60ad60de2d53f Mon Sep 17 00:00:00 2001 From: github-actions Date: Tue, 14 Sep 2021 17:17:49 +0000 Subject: [PATCH 10/16] update code coverage badge --- docs/resources/cov-badge.svg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/resources/cov-badge.svg b/docs/resources/cov-badge.svg index 91eda4f7fc..8571a2fddc 100644 --- a/docs/resources/cov-badge.svg +++ b/docs/resources/cov-badge.svg @@ -1 +1 @@ -coverage: 56.8%coverage56.8% \ No newline at end of file +coverage: 55.7%coverage55.7% \ No newline at end of file From a6826d5bd8d6198ceab3db9c8d455be3ae7e0932 Mon Sep 17 00:00:00 2001 From: amir-blox <83904651+amir-blox@users.noreply.github.com> Date: Wed, 15 Sep 2021 14:34:31 +0300 Subject: [PATCH 11/16] Add metrics (#307) * add comment * added validator status metric * added metrics for decided signers * updated network metrics * add lock before accessing go-client map. will help avoid 'fatal error: bad map state' * add ibft inbound metric --- beacon/goclient/goclient.go | 26 ++++++++++++++++---------- exporter/ibft/decided_reader.go | 6 +++--- exporter/ibft/factory.go | 3 +++ exporter/ibft/metrics.go | 31 +++++++++++++++++++++++++++++++ ibft/ibft_decided.go | 2 ++ ibft/metrics.go | 20 ++++++++++++++++++++ network/p2p/metrics.go | 28 +++++++++++++++++++++++++--- network/p2p/p2p.go | 14 ++++++++++---- network/p2p/p2p_decided.go | 4 +++- network/p2p/p2p_ibft.go | 8 ++++++-- validator/controller.go | 11 +++++++++-- validator/metrics.go | 21 ++++++++++++++++++++- validator/validator.go | 2 ++ 13 files changed, 150 insertions(+), 26 deletions(-) create mode 100644 exporter/ibft/metrics.go diff --git a/beacon/goclient/goclient.go b/beacon/goclient/goclient.go index 0eee688124..dfe13defc3 100644 --- a/beacon/goclient/goclient.go +++ b/beacon/goclient/goclient.go @@ -19,6 +19,7 @@ import ( "github.com/rs/zerolog" "go.uber.org/zap" "log" + "sync" "time" ) @@ -46,11 +47,12 @@ func init() { // goClient implementing Beacon struct type goClient struct { - ctx context.Context - logger *zap.Logger - network core.Network - client client.Service - graffiti []byte + ctx context.Context + logger *zap.Logger + network core.Network + client client.Service + indicesMapLock sync.Mutex + graffiti []byte } // verifies that the client implements HealthCheckAgent @@ -75,11 +77,12 @@ func New(opt beacon.Options) (beacon.Beacon, error) { logger.Info("successfully connected to beacon client") _client := &goClient{ - ctx: opt.Context, - logger: logger, - network: core.NetworkFromString(opt.Network), - client: autoClient, - graffiti: []byte("BloxStaking"), + ctx: opt.Context, + logger: logger, + network: core.NetworkFromString(opt.Network), + client: autoClient, + indicesMapLock: sync.Mutex{}, + graffiti: []byte("BloxStaking"), } return _client, nil @@ -109,6 +112,9 @@ func (gc *goClient) HealthCheck() []string { } func (gc *goClient) ExtendIndexMap(index spec.ValidatorIndex, pubKey spec.BLSPubKey) { + gc.indicesMapLock.Lock() + defer gc.indicesMapLock.Unlock() + gc.client.ExtendIndexMap(map[spec.ValidatorIndex]spec.BLSPubKey{index: pubKey}) } diff --git a/exporter/ibft/decided_reader.go b/exporter/ibft/decided_reader.go index 472969ab52..a909b4f2f9 100644 --- a/exporter/ibft/decided_reader.go +++ b/exporter/ibft/decided_reader.go @@ -94,12 +94,11 @@ func (r *decidedReader) Start() error { if err := r.waitForMinPeers(r.validatorShare.PublicKey, 2); err != nil { return errors.Wrap(err, "could not wait for min peers") } - r.listenToNetwork() + r.listenToNetwork(r.network.ReceivedDecidedChan()) return nil } -func (r *decidedReader) listenToNetwork() { - cn := r.network.ReceivedDecidedChan() +func (r *decidedReader) listenToNetwork(cn <-chan *proto.SignedMessage) { r.logger.Debug("listening to decided messages") for msg := range cn { if err := validateMsg(msg, string(r.identifier)); err != nil { @@ -131,6 +130,7 @@ func (r *decidedReader) handleNewDecidedMessage(msg *proto.SignedMessage) error return errors.Wrap(err, "could not save decided") } logger.Debug("decided saved") + reportDecided(msg, r.validatorShare) return r.checkHighestDecided(msg) } diff --git a/exporter/ibft/factory.go b/exporter/ibft/factory.go index a354bdf04e..98af629b9f 100644 --- a/exporter/ibft/factory.go +++ b/exporter/ibft/factory.go @@ -2,6 +2,9 @@ package ibft import "sync" +// TODO: add to ibft package as most of parts here are code duplicates +// tests should be added as well, currently it would cause redundant maintenance + var ( decidedReaders sync.Map networkReaders sync.Map diff --git a/exporter/ibft/metrics.go b/exporter/ibft/metrics.go new file mode 100644 index 0000000000..d08f7eec02 --- /dev/null +++ b/exporter/ibft/metrics.go @@ -0,0 +1,31 @@ +package ibft + +import ( + "github.com/bloxapp/ssv/ibft/proto" + "github.com/bloxapp/ssv/validator/storage" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "log" + "strconv" +) + +var ( + metricsDecidedSignersExp = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "ssv:validator:ibft_decided_signers_exp", + Help: "Signers of the highest decided sequence number", + }, []string{"lambda", "pubKey", "seq", "nodeId"}) +) + +func init() { + if err := prometheus.Register(metricsDecidedSignersExp); err != nil { + log.Println("could not register prometheus collector") + } +} + +func reportDecided(msg *proto.SignedMessage, share *storage.Share) { + for _, nodeID := range msg.SignerIds { + metricsDecidedSignersExp.WithLabelValues(string(msg.Message.GetLambda()), + share.PublicKey.SerializeToHexStr(), strconv.FormatUint(msg.Message.SeqNumber, 10), + strconv.FormatUint(nodeID, 10)) + } +} \ No newline at end of file diff --git a/ibft/ibft_decided.go b/ibft/ibft_decided.go index 6d34c7d6df..8b3bc6afa7 100644 --- a/ibft/ibft_decided.go +++ b/ibft/ibft_decided.go @@ -61,6 +61,8 @@ func (i *ibftImpl) ProcessDecidedMessage(msg *proto.SignedMessage) { return } + reportDecided(msg, i.ValidatorShare) + // decided for current instance if i.forceDecideCurrentInstance(msg) { return diff --git a/ibft/metrics.go b/ibft/metrics.go index ea4a0f3c71..ce8687c084 100644 --- a/ibft/metrics.go +++ b/ibft/metrics.go @@ -1,9 +1,12 @@ package ibft import ( + "github.com/bloxapp/ssv/ibft/proto" + "github.com/bloxapp/ssv/validator/storage" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "log" + "strconv" ) var ( @@ -19,6 +22,10 @@ var ( Name: "ssv:validator:ibft_round", Help: "IBFTs round", }, []string{"lambda", "pubKey"}) + metricsDecidedSigners = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "ssv:validator:ibft_decided_signers", + Help: "The highest decided sequence number", + }, []string{"lambda", "pubKey", "seq", "nodeId"}) ) func init() { @@ -31,4 +38,17 @@ func init() { if err := prometheus.Register(metricsIBFTRound); err != nil { log.Println("could not register prometheus collector") } + if err := prometheus.Register(metricsDecidedSigners); err != nil { + log.Println("could not register prometheus collector") + } +} + +func reportDecided(msg *proto.SignedMessage, share *storage.Share) { + for _, nodeID := range msg.SignerIds { + metricsDecidedSigners.WithLabelValues( + string(msg.Message.GetLambda()), + share.PublicKey.SerializeToHexStr(), + strconv.FormatUint(msg.Message.SeqNumber, 10), + strconv.FormatUint(nodeID, 10)) + } } diff --git a/network/p2p/metrics.go b/network/p2p/metrics.go index d9e1db9fc7..b92ef82ddc 100644 --- a/network/p2p/metrics.go +++ b/network/p2p/metrics.go @@ -1,9 +1,11 @@ package p2p import ( + "github.com/bloxapp/ssv/network" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "log" + "strconv" ) var ( @@ -14,15 +16,19 @@ var ( metricsNetMsgsInbound = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "ssv:network:net_messages_inbound", Help: "Count incoming network messages", - }, []string{"topic"}) + }, []string{"pubKey", "type", "signer"}) + metricsIBFTMsgsInbound = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "ssv:network:ibft_messages_inbound", + Help: "Count incoming network messages", + }, []string{"pubKey", "signer", "seq", "round", "type"}) metricsIBFTMsgsOutbound = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "ssv:network:ibft_messages_outbound", Help: "Count IBFT messages outbound", - }, []string{"topic"}) + }, []string{"pubKey", "type", "seq", "round"}) metricsIBFTDecidedMsgsOutbound = promauto.NewCounterVec(prometheus.CounterOpts{ Name: "ssv:network:ibft_decided_messages_outbound", Help: "Count IBFT decided messages outbound", - }, []string{"topic"}) + }, []string{"pubKey", "seq"}) ) func init() { @@ -39,3 +45,19 @@ func init() { log.Println("could not register prometheus collector") } } + +func reportIncomingSignedMessage(cm *network.Message, topic string) { + if cm.SignedMessage != nil && len(cm.SignedMessage.SignerIds) > 0 { + for _, nodeID := range cm.SignedMessage.SignerIds { + metricsNetMsgsInbound.WithLabelValues(unwrapTopicName(topic), cm.Type.String(), + strconv.FormatUint(nodeID, 10)).Inc() + if cm.Type == network.NetworkMsg_IBFTType && cm.SignedMessage.Message != nil { + seq := strconv.FormatUint(cm.SignedMessage.Message.SeqNumber, 10) + round := strconv.FormatUint(cm.SignedMessage.Message.Round, 10) + metricsIBFTMsgsInbound.WithLabelValues(unwrapTopicName(topic), + strconv.FormatUint(nodeID, 10), seq, round, + cm.SignedMessage.Message.Type.String()).Inc() + } + } + } +} diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index 81f1477637..0051eca6ac 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "net" + "strings" "sync" "time" @@ -34,7 +35,7 @@ const ( // MsgChanSize is the buffer size of the message channel MsgChanSize = 128 - topicFmt = "bloxstaking.ssv.%s" + topicPrefix = "bloxstaking.ssv" syncStreamProtocol = "/sync/0.0.1" ) @@ -244,7 +245,7 @@ func (n *p2pNetwork) listen(sub *pubsub.Subscription) { n.logger.Error("failed to unmarshal message", zap.Error(err)) continue } - metricsNetMsgsInbound.WithLabelValues(t).Inc() + reportIncomingSignedMessage(&cm, t) n.propagateSignedMsg(&cm) } } @@ -328,8 +329,13 @@ func (n *p2pNetwork) allPeersOfTopic(topic *pubsub.Topic) []string { } // getTopicName return formatted topic name -func getTopicName(topicName string) string { - return fmt.Sprintf(topicFmt, topicName) +func getTopicName(pk string) string { + return fmt.Sprintf("%s.%s", topicPrefix, pk) +} + +// getTopicName return formatted topic name +func unwrapTopicName(topicName string) string { + return strings.Replace(topicName, fmt.Sprintf("%s.", topicPrefix), "", 1) } func (n *p2pNetwork) MaxBatch() uint64 { diff --git a/network/p2p/p2p_decided.go b/network/p2p/p2p_decided.go index f8b2f1ec0a..1349d8c88d 100644 --- a/network/p2p/p2p_decided.go +++ b/network/p2p/p2p_decided.go @@ -6,6 +6,7 @@ import ( "github.com/bloxapp/ssv/network" "github.com/pkg/errors" "go.uber.org/zap" + "strconv" ) // BroadcastDecided broadcasts a decided instance with collected signatures @@ -25,7 +26,8 @@ func (n *p2pNetwork) BroadcastDecided(topicName []byte, msg *proto.SignedMessage n.logger.Debug("Broadcasting decided message", zap.String("lambda", string(msg.Message.Lambda)), zap.Any("topic", topic), zap.Any("peers", topic.ListPeers())) - metricsIBFTDecidedMsgsOutbound.WithLabelValues(topic.String()).Inc() + metricsIBFTDecidedMsgsOutbound.WithLabelValues(unwrapTopicName(topic.String()), + strconv.FormatUint(msg.Message.SeqNumber, 10)).Inc() return topic.Publish(n.ctx, msgBytes) } diff --git a/network/p2p/p2p_ibft.go b/network/p2p/p2p_ibft.go index d9a06d597c..114c1af338 100644 --- a/network/p2p/p2p_ibft.go +++ b/network/p2p/p2p_ibft.go @@ -6,6 +6,7 @@ import ( "github.com/bloxapp/ssv/network" "github.com/pkg/errors" "go.uber.org/zap" + "strconv" ) // Broadcast propagates a signed message to all peers @@ -23,8 +24,11 @@ func (n *p2pNetwork) Broadcast(topicName []byte, msg *proto.SignedMessage) error return errors.Wrap(err, "failed to get topic") } - n.logger.Debug("broadcasting ibft msg", zap.String("lambda", string(msg.Message.Lambda)), zap.Any("topic", topic), zap.Any("peers", topic.ListPeers())) - metricsIBFTMsgsOutbound.WithLabelValues(topic.String()).Inc() + n.logger.Debug("broadcasting ibft msg", zap.String("lambda", string(msg.Message.Lambda)), + zap.Any("topic", topic), zap.Any("peers", topic.ListPeers())) + + metricsIBFTMsgsOutbound.WithLabelValues(unwrapTopicName(topic.String()), msg.Message.Type.String(), + strconv.FormatUint(msg.Message.SeqNumber, 10), strconv.FormatUint(msg.Message.Round, 10)).Inc() return topic.Publish(n.ctx, msgBytes) } diff --git a/validator/controller.go b/validator/controller.go index 97da2c0290..9e043442ed 100644 --- a/validator/controller.go +++ b/validator/controller.go @@ -104,9 +104,10 @@ func (c *controller) ListenToEth1Events(cn pubsub.SubjectChannel) { // ProcessEth1Event handles a single event func (c *controller) ProcessEth1Event(e eth1.Event) error { if validatorAddedEvent, ok := e.Data.(eth1.ValidatorAddedEvent); ok { + pubKey := hex.EncodeToString(validatorAddedEvent.PublicKey) if err := c.handleValidatorAddedEvent(validatorAddedEvent); err != nil { c.logger.Error("could not process validator", - zap.String("pubkey", hex.EncodeToString(validatorAddedEvent.PublicKey)), zap.Error(err)) + zap.String("pubkey", pubKey), zap.Error(err)) return err } } @@ -143,8 +144,10 @@ func (c *controller) StartValidators() { for _, validatorShare := range shares { v := c.validatorsMap.GetOrCreateValidator(validatorShare) if err := v.Start(); err != nil { + pk := v.Share.PublicKey.SerializeToHexStr() c.logger.Error("could not start validator", zap.Error(err), - zap.String("pubkey", v.Share.PublicKey.SerializeToHexStr())) + zap.String("pubkey", pk)) + metricsValidatorStatus.WithLabelValues(pk).Set(float64(validatorStatusError)) errs = append(errs, err) continue } @@ -168,6 +171,7 @@ func (c *controller) GetValidatorsIndices() []spec.ValidatorIndex { c.logger.Warn("validator share doesn't have an index", zap.String("pubKey", v.Share.PublicKey.SerializeToHexStr())) toFetch = append(toFetch, v.Share.PublicKey.Serialize()) + metricsValidatorStatus.WithLabelValues(v.Share.PublicKey.SerializeToHexStr()).Set(float64(validatorStatusNoIndex)) } else { index := spec.ValidatorIndex(*v.Share.Index) indices = append(indices, index) @@ -195,6 +199,7 @@ func (c *controller) handleValidatorAddedEvent(validatorAddedEvent eth1.Validato // TODO: handle updateValidator in the future return nil } + metricsValidatorStatus.WithLabelValues(pubKey).Set(float64(validatorStatusInactive)) validatorShare, err := c.createShare(validatorAddedEvent) if err != nil { return errors.Wrap(err, "failed to create share") @@ -205,6 +210,7 @@ func (c *controller) handleValidatorAddedEvent(validatorAddedEvent eth1.Validato } if !found { if err := c.addValidatorIndex(validatorShare); err != nil { + metricsValidatorStatus.WithLabelValues(pubKey).Set(float64(validatorStatusNoIndex)) logger.Warn("could not add validator index", zap.Error(err)) } if err := c.collection.SaveValidatorShare(validatorShare); err != nil { @@ -219,6 +225,7 @@ func (c *controller) handleValidatorAddedEvent(validatorAddedEvent eth1.Validato v := c.validatorsMap.GetOrCreateValidator(validatorShare) if err := v.Start(); err != nil { + metricsValidatorStatus.WithLabelValues(pubKey).Set(float64(validatorStatusError)) return errors.Wrap(err, "could not start validator") } diff --git a/validator/metrics.go b/validator/metrics.go index 4750305c74..d4cfd89ae4 100644 --- a/validator/metrics.go +++ b/validator/metrics.go @@ -20,6 +20,10 @@ var ( Name: "ssv:validator:ibft_current_slot", Help: "Current running slot", }, []string{"pubKey"}) + metricsValidatorStatus = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "ssv:validator:status", + Help: "Validator status", + }, []string{"pubKey"}) ) func init() { @@ -32,6 +36,9 @@ func init() { if err := prometheus.Register(metricsCurrentSlot); err != nil { log.Println("could not register prometheus collector") } + if err := prometheus.Register(metricsValidatorStatus); err != nil { + log.Println("could not register prometheus collector") + } } func (v *Validator) reportDutyExecutionMetrics(duty *beacon.Duty) func() { @@ -43,8 +50,20 @@ func (v *Validator) reportDutyExecutionMetrics(duty *beacon.Duty) func() { metricsCurrentSlot.WithLabelValues(pubKey).Set(float64(duty.Slot)) + metricsValidatorStatus.WithLabelValues(pubKey).Set(float64(validatorStatusRunning)) + return func() { metricsRunningIBFTsCount.Dec() metricsRunningIBFTs.WithLabelValues(pubKey).Dec() } -} \ No newline at end of file +} + +type validatorStatus int32 + +var ( + validatorStatusInactive validatorStatus = 0 + validatorStatusNoIndex validatorStatus = 1 + validatorStatusError validatorStatus = 2 + validatorStatusOnline validatorStatus = 3 + validatorStatusRunning validatorStatus = 4 +) diff --git a/validator/validator.go b/validator/validator.go index 1c6c6471ff..cdcd1f5f6f 100644 --- a/validator/validator.go +++ b/validator/validator.go @@ -97,6 +97,8 @@ func (v *Validator) Start() error { } }) + metricsValidatorStatus.WithLabelValues(v.Share.PublicKey.SerializeToHexStr()).Set(float64(validatorStatusOnline)) + return nil } From 0837c555e8e0df9ca5b9c6f35f265a89cbdc183d Mon Sep 17 00:00:00 2001 From: amir-blox <83904651+amir-blox@users.noreply.github.com> Date: Fri, 17 Sep 2021 09:33:14 +0300 Subject: [PATCH 12/16] fix validator status (#308) --- validator/metrics.go | 4 +++- validator/validator.go | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/validator/metrics.go b/validator/metrics.go index d4cfd89ae4..27362ce6cc 100644 --- a/validator/metrics.go +++ b/validator/metrics.go @@ -41,6 +41,7 @@ func init() { } } +// reportDutyExecutionMetrics reports duty execution metrics, returns done function to be called once duty is done func (v *Validator) reportDutyExecutionMetrics(duty *beacon.Duty) func() { // reporting metrics metricsRunningIBFTsCount.Inc() @@ -55,6 +56,7 @@ func (v *Validator) reportDutyExecutionMetrics(duty *beacon.Duty) func() { return func() { metricsRunningIBFTsCount.Dec() metricsRunningIBFTs.WithLabelValues(pubKey).Dec() + metricsValidatorStatus.WithLabelValues(pubKey).Set(float64(validatorStatusReady)) } } @@ -64,6 +66,6 @@ var ( validatorStatusInactive validatorStatus = 0 validatorStatusNoIndex validatorStatus = 1 validatorStatusError validatorStatus = 2 - validatorStatusOnline validatorStatus = 3 + validatorStatusReady validatorStatus = 3 validatorStatusRunning validatorStatus = 4 ) diff --git a/validator/validator.go b/validator/validator.go index cdcd1f5f6f..e005474c0f 100644 --- a/validator/validator.go +++ b/validator/validator.go @@ -97,7 +97,7 @@ func (v *Validator) Start() error { } }) - metricsValidatorStatus.WithLabelValues(v.Share.PublicKey.SerializeToHexStr()).Set(float64(validatorStatusOnline)) + metricsValidatorStatus.WithLabelValues(v.Share.PublicKey.SerializeToHexStr()).Set(float64(validatorStatusReady)) return nil } From 180c0d4360650fb3b1931409c3847ca5f101bb0e Mon Sep 17 00:00:00 2001 From: Amir Yahalom Date: Fri, 17 Sep 2021 17:22:27 +0300 Subject: [PATCH 13/16] validator + operator dashboards --- monitoring/README.md | 6 +- .../grafana/dashboard_ssv_operator.json | 318 +++-- .../grafana/dashboard_ssv_validator.json | 1066 +++++++++++++++++ 3 files changed, 1312 insertions(+), 78 deletions(-) create mode 100644 monitoring/grafana/dashboard_ssv_validator.json diff --git a/monitoring/README.md b/monitoring/README.md index 49909744df..6077750da7 100644 --- a/monitoring/README.md +++ b/monitoring/README.md @@ -52,8 +52,10 @@ In order to setup a grafana dashboard do the following: 1. Enable metrics (`MetricsAPIPort`) 2. Setup Prometheus as mentioned in the beginning of this document and add as data source * Job name assumed to be '`ssv`' -3. Import [SSV Operator dashboard](./grafana/dashboard_ssv_operator.json) to Grafana -4. Align dashboard variables: +3. Import dashboards to Grafana: + * [SSV Operator Node dashboard](./grafana/dashboard_ssv_operator.json) + * [SSV Validator dashboard](./grafana/dashboard_ssv_validator.json) +5. Align dashboard variables: * `instance` - container name, used in 'instance' field for metrics coming from prometheus. \ In the given dashboard, instances names are: `ssv-node-v2-`, make sure to change according to your setup diff --git a/monitoring/grafana/dashboard_ssv_operator.json b/monitoring/grafana/dashboard_ssv_operator.json index 069cc4bd60..d88b4e8d9e 100644 --- a/monitoring/grafana/dashboard_ssv_operator.json +++ b/monitoring/grafana/dashboard_ssv_operator.json @@ -2,11 +2,18 @@ "__inputs": [ { "name": "DS_PROMETHEUS", - "label": "prometheus-infra-stage", + "label": "prometheus-infra-prod", "description": "", "type": "datasource", "pluginId": "prometheus", "pluginName": "Prometheus" + }, + { + "name": "VAR_VALIDATOR_DASHBOARD_ID", + "type": "constant", + "label": "validator_dashboard_id", + "value": "XIbEQ37aa", + "description": "" } ], "__requires": [ @@ -64,7 +71,7 @@ "gnetId": null, "graphTooltip": 0, "id": null, - "iteration": 1630047547674, + "iteration": 1631686313507, "links": [], "panels": [ { @@ -122,7 +129,7 @@ { "options": { "0": { - "color": "text", + "color": "rgba(92, 92, 92, 1)", "index": 0, "text": "Not Started" }, @@ -163,7 +170,7 @@ }, { "id": "custom.width", - "value": 150 + "value": 123 } ] }, @@ -215,7 +222,7 @@ }, { "id": "custom.width", - "value": 125 + "value": 146 } ] }, @@ -271,27 +278,31 @@ }, { "id": "custom.width", - "value": 125 + "value": 100 } ] }, { "matcher": { "id": "byName", - "options": "Running Instance" + "options": "IBFT Instance" }, "properties": [ { "id": "custom.displayMode", "value": "color-text" }, + { + "id": "custom.width", + "value": 148 + }, { "id": "mappings", "value": [ { "options": { "0": { - "color": "transparent", + "color": "gray", "index": 0, "text": "IDLE" } @@ -306,15 +317,11 @@ "index": 1, "text": "Running" }, - "to": 2 + "to": 3 }, "type": "range" } ] - }, - { - "id": "custom.width", - "value": 150 } ] }, @@ -363,6 +370,16 @@ } }, "type": "value" + }, + { + "options": { + "match": "null+nan", + "result": { + "index": 1, + "text": "https://explorer.stage.ssv.network/images/beaconcha-white.svg" + } + }, + "type": "special" } ] }, @@ -414,6 +431,114 @@ "value": 125 } ] + }, + { + "matcher": { + "id": "byName", + "options": "Dashboard" + }, + "properties": [ + { + "id": "mappings", + "value": [ + { + "options": { + "ssv": { + "index": 0, + "text": "https://explorer.stage.ssv.network/images/beaconcha-white.svg" + } + }, + "type": "value" + }, + { + "options": { + "match": "null+nan", + "result": { + "index": 1, + "text": "https://explorer.stage.ssv.network/images/beaconcha-white.svg" + } + }, + "type": "special" + } + ] + }, + { + "id": "links", + "value": [ + { + "targetBlank": true, + "title": "", + "url": "https://grafana.k8.blox.io/d/${validator_dashboard_id}/ssv-validator?orgId=1&refresh=10s&var-instance=${instance}&var-pubkey=${__data.fields[\"Public Key\"]}" + } + ] + }, + { + "id": "custom.width", + "value": 84 + }, + { + "id": "custom.filterable" + }, + { + "id": "custom.displayMode", + "value": "image" + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "Validator Status" + }, + "properties": [ + { + "id": "mappings", + "value": [ + { + "options": { + "0": { + "color": "gray", + "index": 0, + "text": "Inactive" + }, + "1": { + "color": "orange", + "index": 1, + "text": "No Index" + }, + "2": { + "color": "red", + "index": 2, + "text": "Error" + }, + "3": { + "color": "green", + "index": 3, + "text": "Ready" + }, + "4": { + "color": "green", + "index": 4, + "text": "Running" + } + }, + "type": "value" + } + ] + }, + { + "id": "custom.align", + "value": "left" + }, + { + "id": "custom.displayMode", + "value": "color-text" + }, + { + "id": "custom.width", + "value": 162 + } + ] } ] }, @@ -497,6 +622,16 @@ "interval": "", "legendFormat": "", "refId": "G" + }, + { + "exemplar": true, + "expr": "ssv:validator:status{instance=~\"$instance.*\"}", + "format": "table", + "hide": false, + "instant": true, + "interval": "", + "legendFormat": "", + "refId": "H" } ], "title": "Validators (Attestation)", @@ -518,6 +653,7 @@ "Time 5": true, "Time 6": true, "Time 7": true, + "Time 8": true, "__name__ 1": true, "__name__ 2": true, "__name__ 3": true, @@ -525,6 +661,7 @@ "__name__ 5": true, "__name__ 6": true, "__name__ 7": true, + "__name__ 8": true, "instance 1": true, "instance 2": true, "instance 3": true, @@ -532,58 +669,65 @@ "instance 5": true, "instance 6": true, "instance 7": true, + "instance 8": true, "job 1": false, - "job 2": true, + "job 2": false, "job 3": true, "job 4": true, "job 5": true, "job 6": true, "job 7": true, + "job 8": true, "lambda 1": true, "lambda 2": true, "lambda 3": true, "lambda 4": true }, "indexByName": { - "Time 1": 11, - "Time 2": 16, - "Time 3": 10, - "Time 4": 22, - "Time 5": 27, - "Time 6": 32, - "Time 7": 36, - "Value #A": 9, + "Time 1": 12, + "Time 2": 17, + "Time 3": 11, + "Time 4": 23, + "Time 5": 28, + "Time 6": 33, + "Time 7": 37, + "Time 8": 41, + "Value #A": 10, "Value #B": 3, - "Value #C": 8, - "Value #D": 6, - "Value #E": 5, - "Value #F": 4, - "Value #G": 7, - "__name__ 1": 12, - "__name__ 2": 15, - "__name__ 3": 18, - "__name__ 4": 23, - "__name__ 5": 28, - "__name__ 6": 33, - "__name__ 7": 37, - "instance 1": 13, - "instance 2": 17, - "instance 3": 19, - "instance 4": 24, - "instance 5": 29, - "instance 6": 34, - "instance 7": 38, + "Value #C": 9, + "Value #D": 7, + "Value #E": 6, + "Value #F": 5, + "Value #G": 8, + "Value #H": 4, + "__name__ 1": 13, + "__name__ 2": 16, + "__name__ 3": 19, + "__name__ 4": 24, + "__name__ 5": 29, + "__name__ 6": 34, + "__name__ 7": 38, + "__name__ 8": 42, + "instance 1": 14, + "instance 2": 18, + "instance 3": 20, + "instance 4": 25, + "instance 5": 30, + "instance 6": 35, + "instance 7": 39, + "instance 8": 43, "job 1": 1, "job 2": 0, - "job 3": 20, - "job 4": 25, - "job 5": 30, - "job 6": 35, - "job 7": 39, - "lambda 1": 14, - "lambda 2": 21, - "lambda 3": 26, - "lambda 4": 31, + "job 3": 21, + "job 4": 26, + "job 5": 31, + "job 6": 36, + "job 7": 40, + "job 8": 44, + "lambda 1": 15, + "lambda 2": 22, + "lambda 3": 27, + "lambda 4": 32, "pubKey": 2 }, "renameByName": { @@ -593,8 +737,9 @@ "Value #C": "Current Sequence", "Value #D": "Round", "Value #E": "Stage", - "Value #F": "Running Instance", + "Value #F": "IBFT Instance", "Value #G": "Current slot", + "Value #H": "Validator Status", "job 1": "Beaconcha", "job 2": "Dashboard", "pubKey": "Public Key" @@ -755,6 +900,10 @@ "interval": "", "legendFormat": "SSV Operator", "refId": "C" + }, + { + "hide": false, + "refId": "D" } ], "title": "Current Health Status", @@ -795,27 +944,17 @@ "mode": "off" } }, - "mappings": [ - { - "options": { - "0": { - "index": 0, - "text": "Unhealthy" - }, - "1": { - "index": 1, - "text": "Healthy" - } - }, - "type": "value" - } - ], + "mappings": [], "thresholds": { "mode": "absolute", "steps": [ { "color": "green", "value": null + }, + { + "color": "red", + "value": 80 } ] }, @@ -1518,7 +1657,7 @@ } } ], - "refresh": "10s", + "refresh": "30s", "schemaVersion": 30, "style": "dark", "tags": [], @@ -1558,9 +1697,14 @@ "selected": false, "text": "ssv-node-v2-4", "value": "ssv-node-v2-4" + }, + { + "selected": false, + "text": "ssv-exporter", + "value": "ssv-exporter" } ], - "query": "ssv-node-v2-1,ssv-node-v2-2,ssv-node-v2-3,ssv-node-v2-4", + "query": "ssv-node-v2-1,ssv-node-v2-2,ssv-node-v2-3,ssv-node-v2-4,ssv-exporter", "queryValue": "", "skipUrlSync": false, "type": "custom" @@ -1569,8 +1713,8 @@ "allValue": null, "current": { "selected": false, - "text": "https://explorer.ssv.network", - "value": "https://explorer.ssv.network" + "text": "https://explorer.stage.ssv.network", + "value": "https://explorer.stage.ssv.network" }, "description": "SSV explorer link", "error": null, @@ -1582,19 +1726,41 @@ "options": [ { "selected": true, - "text": "https://explorer.ssv.network", - "value": "https://explorer.ssv.network" + "text": "https://explorer.stage.ssv.network", + "value": "https://explorer.stage.ssv.network" }, { "selected": false, - "text": "https://explorer.stage.ssv.network", - "value": "https://explorer.stage.ssv.network" + "text": "https://explorer.ssv.network", + "value": "https://explorer.ssv.network" } ], - "query": "https://explorer.ssv.network,https://explorer.stage.ssv.network", + "query": "https://explorer.stage.ssv.network,https://explorer.ssv.network", "queryValue": "", "skipUrlSync": false, "type": "custom" + }, + { + "description": null, + "error": null, + "hide": 2, + "label": null, + "name": "validator_dashboard_id", + "query": "${VAR_VALIDATOR_DASHBOARD_ID}", + "skipUrlSync": false, + "type": "constant", + "current": { + "value": "${VAR_VALIDATOR_DASHBOARD_ID}", + "text": "${VAR_VALIDATOR_DASHBOARD_ID}", + "selected": false + }, + "options": [ + { + "value": "${VAR_VALIDATOR_DASHBOARD_ID}", + "text": "${VAR_VALIDATOR_DASHBOARD_ID}", + "selected": false + } + ] } ] }, @@ -1606,5 +1772,5 @@ "timezone": "", "title": "SSV Operator Node", "uid": "FIbEQ37nk", - "version": 36 + "version": 45 } \ No newline at end of file diff --git a/monitoring/grafana/dashboard_ssv_validator.json b/monitoring/grafana/dashboard_ssv_validator.json new file mode 100644 index 0000000000..b482d0a69f --- /dev/null +++ b/monitoring/grafana/dashboard_ssv_validator.json @@ -0,0 +1,1066 @@ +{ + "__inputs": [ + { + "name": "DS_PROMETHEUS", + "label": "prometheus-infra-prod", + "description": "", + "type": "datasource", + "pluginId": "prometheus", + "pluginName": "Prometheus" + } + ], + "__requires": [ + { + "type": "grafana", + "id": "grafana", + "name": "Grafana", + "version": "8.0.4" + }, + { + "type": "datasource", + "id": "prometheus", + "name": "Prometheus", + "version": "1.0.0" + }, + { + "type": "panel", + "id": "stat", + "name": "Stat", + "version": "" + }, + { + "type": "panel", + "id": "table", + "name": "Table", + "version": "" + }, + { + "type": "panel", + "id": "timeseries", + "name": "Time series", + "version": "" + } + ], + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": "-- Grafana --", + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "gnetId": null, + "graphTooltip": 0, + "id": null, + "iteration": 1631688084016, + "links": [], + "panels": [ + { + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "0": { + "color": "red", + "index": 0, + "text": "Inactive" + }, + "1": { + "color": "orange", + "index": 1, + "text": "No Index" + }, + "2": { + "color": "dark-red", + "index": 2, + "text": "Error" + }, + "3": { + "color": "green", + "index": 3, + "text": "Ready" + }, + "4": { + "color": "green", + "index": 4, + "text": "Running" + } + }, + "type": "value" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 5, + "x": 0, + "y": 0 + }, + "id": 68, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "text": {}, + "textMode": "auto" + }, + "pluginVersion": "8.0.4", + "targets": [ + { + "exemplar": true, + "expr": "ssv:validator:status{instance=~\"$instance.*\",pubKey=\"$pubkey\"}", + "interval": "", + "legendFormat": "", + "refId": "A" + } + ], + "title": "Status", + "type": "stat" + }, + { + "datasource": null, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 7 + }, + "id": 64, + "title": "Network", + "type": "row" + }, + { + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "#EAB839", + "value": 2 + }, + { + "color": "green", + "value": 3 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 4, + "x": 0, + "y": 8 + }, + "id": 44, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "text": {}, + "textMode": "auto" + }, + "pluginVersion": "8.0.4", + "targets": [ + { + "exemplar": true, + "expr": "ssv:network:connected_peers{instance=~\"$instance.*\",pubKey=\"$pubkey\"}", + "interval": "", + "legendFormat": "{{pubKey}}", + "refId": "A" + } + ], + "title": "Connected Peers", + "type": "stat" + }, + { + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": { + "align": "center", + "displayMode": "auto", + "width": 120 + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 7, + "x": 4, + "y": 8 + }, + "id": 66, + "options": { + "showHeader": true + }, + "pluginVersion": "8.0.4", + "targets": [ + { + "exemplar": true, + "expr": "ssv:network:ibft_messages_outbound{instance=~\"$instance.*\",pubKey=\"$pubkey\"}", + "format": "table", + "instant": true, + "interval": "", + "legendFormat": "{{seq}}", + "refId": "A" + } + ], + "title": "IBFT Messages Outbound", + "transformations": [ + { + "id": "seriesToColumns", + "options": { + "byField": "seq" + } + }, + { + "id": "organize", + "options": { + "excludeByName": { + "Time": true, + "__name__": true, + "instance": true, + "job": true, + "pubKey": true, + "type": false + }, + "indexByName": { + "Time": 4, + "Value": 2, + "__name__": 5, + "instance": 6, + "job": 7, + "pubKey": 8, + "round": 3, + "seq": 0, + "type": 1 + }, + "renameByName": { + "Value": "Sent Messages", + "round": "Round", + "seq": "Sequence", + "type": "Type" + } + } + } + ], + "type": "table" + }, + { + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": { + "align": "auto", + "displayMode": "auto" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 16 + }, + "id": 70, + "options": { + "showHeader": true + }, + "pluginVersion": "8.0.4", + "targets": [ + { + "exemplar": true, + "expr": "ssv:network:ibft_messages_inbound{instance=~\"$instance.*\",pubKey=\"$pubkey\"}", + "format": "table", + "instant": true, + "interval": "", + "legendFormat": "", + "refId": "A" + } + ], + "title": "IBFT Messages Inbound", + "transformations": [ + { + "id": "seriesToColumns", + "options": { + "byField": "signer" + } + }, + { + "id": "organize", + "options": { + "excludeByName": { + "Time": true, + "Value": true, + "Value #A": true, + "__name__": true, + "instance": true, + "job": true, + "pubKey": true + }, + "indexByName": { + "Time": 4, + "Value": 9, + "__name__": 5, + "instance": 6, + "job": 7, + "pubKey": 8, + "round": 3, + "seq": 0, + "signer": 1, + "type": 2 + }, + "renameByName": { + "round": "Round", + "seq": "Sequence", + "signer": "Node", + "type": "Type" + } + } + } + ], + "type": "table" + }, + { + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "left", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 24 + }, + "id": 62, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "right" + }, + "tooltip": { + "mode": "single" + } + }, + "targets": [ + { + "exemplar": true, + "expr": "ssv:network:net_messages_inbound{instance=~\"$instance.*\",pubKey=\"$pubkey\",type=\"IBFTType\"}", + "interval": "", + "legendFormat": "Node {{signer}}", + "refId": "A" + } + ], + "title": "IBFT Messages Inbound", + "type": "timeseries" + }, + { + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 32 + }, + "id": 60, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "right" + }, + "tooltip": { + "mode": "single" + } + }, + "targets": [ + { + "exemplar": true, + "expr": "ssv:network:net_messages_inbound{instance=~\"$instance.*\",pubKey=\"$pubkey\",type=\"DecidedType\"}", + "interval": "", + "legendFormat": "Node {{signer}}", + "refId": "A" + } + ], + "title": "Decided Messages Inbound", + "type": "timeseries" + }, + { + "datasource": null, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 40 + }, + "id": 56, + "title": "Attestations", + "type": "row" + }, + { + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 8, + "x": 0, + "y": 41 + }, + "id": 42, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "text": {}, + "textMode": "auto" + }, + "pluginVersion": "8.0.4", + "targets": [ + { + "exemplar": true, + "expr": "ssv:validator:ibft_highest_decided{instance=~\"$instance.*\",pubKey=\"$pubkey\"}", + "interval": "", + "legendFormat": "{{pubKey}}", + "refId": "A" + } + ], + "title": "Highest Decided", + "type": "stat" + }, + { + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "0": { + "color": "green", + "index": 1, + "text": "IDLE" + }, + "1": { + "index": 0, + "color": "green", + "text": "Running" + } + }, + "type": "value" + } + ], + "max": 1, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "bool_on_off" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 6, + "x": 8, + "y": 41 + }, + "id": 50, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "text": {}, + "textMode": "auto" + }, + "pluginVersion": "8.0.4", + "targets": [ + { + "exemplar": true, + "expr": "ssv:validator:running_ibfts_count{instance=~\"$instance.*\",pubKey=\"$pubkey\"}", + "instant": false, + "interval": "", + "legendFormat": "", + "refId": "A" + } + ], + "title": "IBFT", + "type": "stat" + }, + { + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [ + { + "options": { + "0": { + "index": 0, + "text": "Not Started" + }, + "1": { + "color": "green", + "index": 1, + "text": "Pre Prepare" + }, + "2": { + "color": "green", + "index": 2, + "text": "Prepare" + }, + "3": { + "color": "green", + "index": 3, + "text": "Commit" + }, + "4": { + "color": "yellow", + "index": 4, + "text": "Change Round" + }, + "5": { + "color": "blue", + "index": 5, + "text": "Decided" + }, + "6": { + "color": "rgba(59, 59, 59, 1)", + "index": 6, + "text": "Stopped" + } + }, + "type": "value" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 6, + "x": 14, + "y": 41 + }, + "id": 46, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "text": {}, + "textMode": "auto" + }, + "pluginVersion": "8.0.4", + "targets": [ + { + "exemplar": true, + "expr": "ssv:validator:ibft_stage{instance=~\"$instance.*\",pubKey=\"$pubkey\"}", + "interval": "", + "legendFormat": "", + "refId": "A" + } + ], + "title": "Stage", + "type": "stat" + }, + { + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 8, + "x": 0, + "y": 47 + }, + "id": 54, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "text": {}, + "textMode": "auto" + }, + "pluginVersion": "8.0.4", + "targets": [ + { + "exemplar": true, + "expr": "ssv:validator:ibft_current_sequence{instance=~\"$instance.*\",pubKey=\"$pubkey\"}", + "interval": "", + "legendFormat": "", + "refId": "A" + } + ], + "title": "Current Sequence", + "type": "stat" + }, + { + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 8, + "x": 8, + "y": 47 + }, + "id": 52, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "text": {}, + "textMode": "auto" + }, + "pluginVersion": "8.0.4", + "targets": [ + { + "exemplar": true, + "expr": "ssv:validator:ibft_current_slot{instance=~\"$instance.*\",pubKey=\"$pubkey\"}", + "interval": "", + "legendFormat": "", + "refId": "A" + } + ], + "title": "Current Slot", + "type": "stat" + }, + { + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "#EAB839", + "value": 4 + }, + { + "color": "orange", + "value": 6 + }, + { + "color": "red", + "value": 8 + }, + { + "color": "dark-red", + "value": 12 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 4, + "x": 16, + "y": 47 + }, + "id": 48, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "text": {}, + "textMode": "auto" + }, + "pluginVersion": "8.0.4", + "targets": [ + { + "exemplar": true, + "expr": "ssv:validator:ibft_round{instance=~\"$instance.*\",pubKey=\"$pubkey\"}", + "interval": "", + "legendFormat": "", + "refId": "A" + } + ], + "title": "Round", + "type": "stat" + } + ], + "refresh": "10s", + "schemaVersion": 30, + "style": "dark", + "tags": [], + "templating": { + "list": [ + { + "allValue": null, + "current": { + "selected": false, + "text": "ssv-node-v2-1", + "value": "ssv-node-v2-1" + }, + "description": null, + "error": null, + "hide": 1, + "includeAll": false, + "label": null, + "multi": false, + "name": "instance", + "options": [ + { + "selected": true, + "text": "ssv-node-v2-1", + "value": "ssv-node-v2-1" + }, + { + "selected": false, + "text": "ssv-node-v2-2", + "value": "ssv-node-v2-2" + }, + { + "selected": false, + "text": "ssv-node-v2-3", + "value": "ssv-node-v2-3" + }, + { + "selected": false, + "text": "ssv-node-v2-4", + "value": "ssv-node-v2-4" + } + ], + "query": "ssv-node-v2-1,ssv-node-v2-2,ssv-node-v2-3,ssv-node-v2-4", + "queryValue": "", + "skipUrlSync": false, + "type": "custom" + }, + { + "allValue": null, + "current": {}, + "datasource": "${DS_PROMETHEUS}", + "definition": "ssv:network:connected_peers{instance=~\"$instance.*\"}", + "description": "", + "error": null, + "hide": 0, + "includeAll": false, + "label": "Public Key", + "multi": false, + "name": "pubkey", + "options": [], + "query": { + "query": "ssv:network:connected_peers{instance=~\"$instance.*\"}", + "refId": "StandardVariableQuery" + }, + "refresh": 1, + "regex": "/.*pubKey=\"([a-z0-9]+)\".*/", + "skipUrlSync": false, + "sort": 0, + "type": "query" + } + ] + }, + "time": { + "from": "now-3h", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "SSV Validator", + "uid": "XIbEQ37aa", + "version": 17 +} \ No newline at end of file From 4701fca0527d9cf7c766dc14d4e91c16594b7c45 Mon Sep 17 00:00:00 2001 From: amir-blox <83904651+amir-blox@users.noreply.github.com> Date: Sun, 19 Sep 2021 15:07:41 +0300 Subject: [PATCH 14/16] Add pubsub tracing (#311) * update README * added PB tracer for pubsub * update deployments * fix validator status metric (reduce running) --- .k8/yamls/ssv-exporter.yml | 2 + .k8/yamls/ssv-node-v2-1-deployment.yml | 2 + .k8/yamls/ssv-node-v2-2-deployment.yml | 2 + .k8/yamls/ssv-node-v2-3-deployment.yml | 2 + .k8/yamls/ssv-node-v2-4-deployment.yml | 2 + monitoring/README.md | 1 + network/p2p/config.go | 2 + network/p2p/p2p.go | 53 ++++++++++++++++---------- validator/metrics.go | 4 -- validator/validator.go | 5 ++- 10 files changed, 49 insertions(+), 26 deletions(-) diff --git a/.k8/yamls/ssv-exporter.yml b/.k8/yamls/ssv-exporter.yml index b3af72e489..e0a746ebb3 100644 --- a/.k8/yamls/ssv-exporter.yml +++ b/.k8/yamls/ssv-exporter.yml @@ -178,6 +178,8 @@ spec: value: "true" - name: IBFT_SYNC_ENABLED value: "true" + - name: PUBSUB_TRACE_OUT + value: "./data/pubsub_trace.out" volumeMounts: - mountPath: /data name: ssv-exporter diff --git a/.k8/yamls/ssv-node-v2-1-deployment.yml b/.k8/yamls/ssv-node-v2-1-deployment.yml index 1922b113ef..178cb3ea96 100644 --- a/.k8/yamls/ssv-node-v2-1-deployment.yml +++ b/.k8/yamls/ssv-node-v2-1-deployment.yml @@ -137,6 +137,8 @@ spec: value: "15000" - name: ENABLE_PROFILE value: "true" + - name: PUBSUB_TRACE_OUT + value: "./data/pubsub_trace.out" volumeMounts: - mountPath: /data name: ssv-node-v2-1 diff --git a/.k8/yamls/ssv-node-v2-2-deployment.yml b/.k8/yamls/ssv-node-v2-2-deployment.yml index 419443854f..2ccb5910d8 100644 --- a/.k8/yamls/ssv-node-v2-2-deployment.yml +++ b/.k8/yamls/ssv-node-v2-2-deployment.yml @@ -137,6 +137,8 @@ spec: value: "15000" - name: ENABLE_PROFILE value: "true" + - name: PUBSUB_TRACE_OUT + value: "./data/pubsub_trace.out" volumeMounts: - mountPath: /data name: ssv-node-v2-2 diff --git a/.k8/yamls/ssv-node-v2-3-deployment.yml b/.k8/yamls/ssv-node-v2-3-deployment.yml index 78afc6571f..b39a0a0286 100644 --- a/.k8/yamls/ssv-node-v2-3-deployment.yml +++ b/.k8/yamls/ssv-node-v2-3-deployment.yml @@ -137,6 +137,8 @@ spec: value: "15000" - name: ENABLE_PROFILE value: "true" + - name: PUBSUB_TRACE_OUT + value: "./data/pubsub_trace.out" volumeMounts: - mountPath: /data name: ssv-node-v2-3 diff --git a/.k8/yamls/ssv-node-v2-4-deployment.yml b/.k8/yamls/ssv-node-v2-4-deployment.yml index e66d6af005..71180519d8 100644 --- a/.k8/yamls/ssv-node-v2-4-deployment.yml +++ b/.k8/yamls/ssv-node-v2-4-deployment.yml @@ -137,6 +137,8 @@ spec: value: "15000" - name: ENABLE_PROFILE value: "true" + - name: PUBSUB_TRACE_OUT + value: "./data/pubsub_trace.out" volumeMounts: - mountPath: /data name: ssv-node-v2-4 diff --git a/monitoring/README.md b/monitoring/README.md index 6077750da7..11b5bb4c1f 100644 --- a/monitoring/README.md +++ b/monitoring/README.md @@ -58,6 +58,7 @@ In order to setup a grafana dashboard do the following: 5. Align dashboard variables: * `instance` - container name, used in 'instance' field for metrics coming from prometheus. \ In the given dashboard, instances names are: `ssv-node-v2-`, make sure to change according to your setup + * `validator_dashboard_id` - exist only in operator dashboard, points to validator dashboard **Note:** In order to show `Process Health` panels, the following K8S metrics should be exposed: * `kubelet_volume_stats_used_bytes` diff --git a/network/p2p/config.go b/network/p2p/config.go index 0d4ee3290f..2b4a560574 100644 --- a/network/p2p/config.go +++ b/network/p2p/config.go @@ -18,6 +18,8 @@ type Config struct { HostDNS string `yaml:"HostDNS" env:"HOST_DNS" env-description:"External DNS node is exposed for discovery"` RequestTimeout time.Duration `yaml:"RequestTimeout" env:"P2P_REQUEST_TIMEOUT" env-default:"5s"` MaxBatchResponse uint64 `yaml:"MaxBatchResponse" env:"P2P_MAX_BATCH_RESPONSE" env-default:"50" env-description:"maximum number of returned objects in a batch"` + PubSubTraceOut string `yaml:"PubSubTraceOut" env:"PUBSUB_TRACE_OUT" env-description:"File path to hold collected pubsub traces"` + //PubSubTracer string `yaml:"PubSubTracer" env:"PUBSUB_TRACER" env-description:"A remote tracer that collects pubsub traces"` // objects / instances HostID peer.ID diff --git a/network/p2p/p2p.go b/network/p2p/p2p.go index 0051eca6ac..30b25e9f27 100644 --- a/network/p2p/p2p.go +++ b/network/p2p/p2p.go @@ -117,28 +117,12 @@ func New(ctx context.Context, logger *zap.Logger, cfg *Config) (network.Network, n.logger = logger.With(zap.String("id", n.host.ID().String())) n.logger.Info("listening on port", zap.String("port", n.host.Addrs()[0].String())) - // Gossipsub registration is done before we add in any new peers - // due to libp2p's gossipsub implementation not taking into - // account previously added peers when creating the gossipsub - // object. - psOpts := []pubsub.Option{ - //pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign), - //pubsub.WithNoAuthor(), - //pubsub.WithMessageIdFn(msgIDFunction), - //pubsub.WithSubscriptionFilter(s), - pubsub.WithPeerOutboundQueueSize(256), - pubsub.WithValidateQueueSize(256), - } - - setPubSubParameters() - - // Create a new PubSub service using the GossipSub router - gs, err := pubsub.NewGossipSub(ctx, n.host, psOpts...) + ps, err := n.setupGossipPubsub(cfg) if err != nil { - n.logger.Error("Failed to start pubsub") - return nil, err + n.logger.Error("failed to start pubsub", zap.Error(err)) + return nil, errors.Wrap(err, "failed to start pubsub") } - n.pubsub = gs + n.pubsub = ps if cfg.DiscoveryType == "mdns" { // use mdns discovery { // Setup Local mDNS discovery @@ -190,6 +174,35 @@ func New(ctx context.Context, logger *zap.Logger, cfg *Config) (network.Network, return n, nil } +func (n *p2pNetwork) setupGossipPubsub(cfg *Config) (*pubsub.PubSub, error) { + // Gossipsub registration is done before we add in any new peers + // due to libp2p's gossipsub implementation not taking into + // account previously added peers when creating the gossipsub + // object. + psOpts := []pubsub.Option{ + //pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign), + //pubsub.WithNoAuthor(), + //pubsub.WithMessageIdFn(msgIDFunction), + //pubsub.WithSubscriptionFilter(s), + pubsub.WithPeerOutboundQueueSize(256), + pubsub.WithValidateQueueSize(256), + } + + if len(cfg.PubSubTraceOut) > 0 { + tracer, err := pubsub.NewPBTracer(cfg.PubSubTraceOut) + if err != nil { + return nil, errors.Wrap(err, "could not create pubsub tracer") + } + n.logger.Debug("pubusb trace file was created", zap.String("path", cfg.PubSubTraceOut)) + psOpts = append(psOpts, pubsub.WithEventTracer(tracer)) + } + + setPubSubParameters() + + // Create a new PubSub service using the GossipSub router + return pubsub.NewGossipSub(n.ctx, n.host, psOpts...) +} + func (n *p2pNetwork) watchTopicPeers() { runutil.RunEvery(n.ctx, 1*time.Minute, func() { for name, topic := range n.cfg.Topics { diff --git a/validator/metrics.go b/validator/metrics.go index 27362ce6cc..9e407dba1d 100644 --- a/validator/metrics.go +++ b/validator/metrics.go @@ -51,12 +51,9 @@ func (v *Validator) reportDutyExecutionMetrics(duty *beacon.Duty) func() { metricsCurrentSlot.WithLabelValues(pubKey).Set(float64(duty.Slot)) - metricsValidatorStatus.WithLabelValues(pubKey).Set(float64(validatorStatusRunning)) - return func() { metricsRunningIBFTsCount.Dec() metricsRunningIBFTs.WithLabelValues(pubKey).Dec() - metricsValidatorStatus.WithLabelValues(pubKey).Set(float64(validatorStatusReady)) } } @@ -67,5 +64,4 @@ var ( validatorStatusNoIndex validatorStatus = 1 validatorStatusError validatorStatus = 2 validatorStatusReady validatorStatus = 3 - validatorStatusRunning validatorStatus = 4 ) diff --git a/validator/validator.go b/validator/validator.go index e005474c0f..64177ec787 100644 --- a/validator/validator.go +++ b/validator/validator.go @@ -95,9 +95,10 @@ func (v *Validator) Start() error { for _, ib := range v.ibfts { // init all ibfts go ib.Init() } - }) - metricsValidatorStatus.WithLabelValues(v.Share.PublicKey.SerializeToHexStr()).Set(float64(validatorStatusReady)) + metricsValidatorStatus.WithLabelValues(v.Share.PublicKey.SerializeToHexStr()). + Set(float64(validatorStatusReady)) + }) return nil } From 04a3fcb607a1b278919894c4051e8fe80333b010 Mon Sep 17 00:00:00 2001 From: Amir Yahalom Date: Sun, 19 Sep 2021 15:27:27 +0300 Subject: [PATCH 15/16] decided signers metric report --- ibft/metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ibft/metrics.go b/ibft/metrics.go index ce8687c084..cd2fa27b6f 100644 --- a/ibft/metrics.go +++ b/ibft/metrics.go @@ -49,6 +49,6 @@ func reportDecided(msg *proto.SignedMessage, share *storage.Share) { string(msg.Message.GetLambda()), share.PublicKey.SerializeToHexStr(), strconv.FormatUint(msg.Message.SeqNumber, 10), - strconv.FormatUint(nodeID, 10)) + strconv.FormatUint(nodeID, 10)).Set(1) } } From 2c660f0f7869e64cea4f88541a41f0cea6bcba7a Mon Sep 17 00:00:00 2001 From: Amir Yahalom Date: Sun, 19 Sep 2021 16:18:04 +0300 Subject: [PATCH 16/16] update grafana dashboards --- .../grafana/dashboard_ssv_operator.json | 5 - .../grafana/dashboard_ssv_validator.json | 927 +++++++++++++++--- 2 files changed, 778 insertions(+), 154 deletions(-) diff --git a/monitoring/grafana/dashboard_ssv_operator.json b/monitoring/grafana/dashboard_ssv_operator.json index d88b4e8d9e..b66b693cfb 100644 --- a/monitoring/grafana/dashboard_ssv_operator.json +++ b/monitoring/grafana/dashboard_ssv_operator.json @@ -515,11 +515,6 @@ "color": "green", "index": 3, "text": "Ready" - }, - "4": { - "color": "green", - "index": 4, - "text": "Running" } }, "type": "value" diff --git a/monitoring/grafana/dashboard_ssv_validator.json b/monitoring/grafana/dashboard_ssv_validator.json index b482d0a69f..5eeb167bf4 100644 --- a/monitoring/grafana/dashboard_ssv_validator.json +++ b/monitoring/grafana/dashboard_ssv_validator.json @@ -90,11 +90,6 @@ "color": "green", "index": 3, "text": "Ready" - }, - "4": { - "color": "green", - "index": 4, - "text": "Running" } }, "type": "value" @@ -138,49 +133,730 @@ "targets": [ { "exemplar": true, - "expr": "ssv:validator:status{instance=~\"$instance.*\",pubKey=\"$pubkey\"}", + "expr": "ssv:validator:status{instance=~\"$instance.*\",pubKey=\"$pubkey\"}", + "interval": "", + "legendFormat": "", + "refId": "A" + } + ], + "title": "Status", + "type": "stat" + }, + { + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": null + }, + { + "color": "#EAB839", + "value": 2 + }, + { + "color": "green", + "value": 3 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 4, + "x": 5, + "y": 0 + }, + "id": 44, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "text": {}, + "textMode": "auto" + }, + "pluginVersion": "8.0.4", + "targets": [ + { + "exemplar": true, + "expr": "ssv:network:connected_peers{instance=~\"$instance.*\",pubKey=\"$pubkey\"}", + "interval": "", + "legendFormat": "{{pubKey}}", + "refId": "A" + } + ], + "title": "Connected Peers", + "type": "stat" + }, + { + "collapsed": false, + "datasource": null, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 7 + }, + "id": 72, + "panels": [], + "title": "Decided Messages", + "type": "row" + }, + { + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": { + "align": "center", + "displayMode": "auto", + "width": 130 + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": null + } + ] + } + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "Status" + }, + "properties": [ + { + "id": "mappings", + "value": [ + { + "options": { + "0": { + "color": "red", + "index": 0, + "text": "failed" + }, + "1": { + "color": "green", + "index": 1, + "text": "signed" + } + }, + "type": "value" + } + ] + }, + { + "id": "custom.displayMode", + "value": "color-text" + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 4, + "x": 0, + "y": 8 + }, + "id": 74, + "options": { + "showHeader": true + }, + "pluginVersion": "8.0.4", + "targets": [ + { + "exemplar": true, + "expr": "ssv:validator:ibft_decided_signers{instance=~\"$instance.*\",pubKey=\"$pubkey\",nodeId=\"1\"}", + "format": "table", + "interval": "", + "legendFormat": "{{seq}}", + "refId": "A" + } + ], + "title": "Decided Messages (Node 1)", + "transformations": [ + { + "id": "seriesToColumns", + "options": { + "byField": "seq" + } + }, + { + "id": "groupBy", + "options": { + "fields": { + "Value #A": { + "aggregations": [ + "lastNotNull" + ], + "operation": "aggregate" + }, + "seq": { + "aggregations": [], + "operation": "groupby" + } + } + } + }, + { + "id": "sortBy", + "options": { + "fields": {}, + "sort": [ + { + "desc": true, + "field": "seq" + } + ] + } + }, + { + "id": "organize", + "options": { + "excludeByName": { + "Time": true, + "__name__": true, + "instance": true, + "job": true, + "lambda": true, + "nodeId": true, + "pubKey": true + }, + "indexByName": { + "Time": 2, + "Value #A": 1, + "__name__": 3, + "instance": 4, + "job": 5, + "lambda": 6, + "nodeId": 7, + "pubKey": 8, + "seq": 0 + }, + "renameByName": { + "Value #A (lastNotNull)": "Status", + "seq": "Sequence" + } + } + } + ], + "type": "table" + }, + { + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": { + "align": "center", + "displayMode": "auto", + "width": 130 + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": null + } + ] + } + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "Status" + }, + "properties": [ + { + "id": "mappings", + "value": [ + { + "options": { + "0": { + "color": "red", + "index": 0, + "text": "failed" + }, + "1": { + "color": "green", + "index": 1, + "text": "signed" + } + }, + "type": "value" + } + ] + }, + { + "id": "custom.displayMode", + "value": "color-text" + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 4, + "x": 4, + "y": 8 + }, + "id": 75, + "options": { + "showHeader": true + }, + "pluginVersion": "8.0.4", + "targets": [ + { + "exemplar": true, + "expr": "ssv:validator:ibft_decided_signers{instance=~\"$instance.*\",pubKey=\"$pubkey\",nodeId=\"2\"}", + "format": "table", + "interval": "", + "legendFormat": "{{seq}}", + "refId": "A" + } + ], + "title": "Decided Messages (Node 2)", + "transformations": [ + { + "id": "seriesToColumns", + "options": { + "byField": "seq" + } + }, + { + "id": "groupBy", + "options": { + "fields": { + "Value #A": { + "aggregations": [ + "lastNotNull" + ], + "operation": "aggregate" + }, + "seq": { + "aggregations": [], + "operation": "groupby" + } + } + } + }, + { + "id": "sortBy", + "options": { + "fields": {}, + "sort": [ + { + "desc": true, + "field": "seq" + } + ] + } + }, + { + "id": "organize", + "options": { + "excludeByName": { + "Time": true, + "__name__": true, + "instance": true, + "job": true, + "lambda": true, + "nodeId": true, + "pubKey": true + }, + "indexByName": { + "Time": 2, + "Value #A": 1, + "__name__": 3, + "instance": 4, + "job": 5, + "lambda": 6, + "nodeId": 7, + "pubKey": 8, + "seq": 0 + }, + "renameByName": { + "Value #A (lastNotNull)": "Status", + "seq": "Sequence" + } + } + } + ], + "type": "table" + }, + { + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": { + "align": "center", + "displayMode": "auto", + "width": 130 + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": null + } + ] + } + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "Status" + }, + "properties": [ + { + "id": "mappings", + "value": [ + { + "options": { + "0": { + "color": "red", + "index": 0, + "text": "failed" + }, + "1": { + "color": "green", + "index": 1, + "text": "signed" + } + }, + "type": "value" + } + ] + }, + { + "id": "custom.displayMode", + "value": "color-text" + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 4, + "x": 8, + "y": 8 + }, + "id": 76, + "options": { + "showHeader": true + }, + "pluginVersion": "8.0.4", + "targets": [ + { + "exemplar": true, + "expr": "ssv:validator:ibft_decided_signers{instance=~\"$instance.*\",pubKey=\"$pubkey\",nodeId=\"3\"}", + "format": "table", + "interval": "", + "legendFormat": "{{seq}}", + "refId": "A" + } + ], + "title": "Decided Messages (Node 3)", + "transformations": [ + { + "id": "seriesToColumns", + "options": { + "byField": "seq" + } + }, + { + "id": "groupBy", + "options": { + "fields": { + "Value #A": { + "aggregations": [ + "lastNotNull" + ], + "operation": "aggregate" + }, + "seq": { + "aggregations": [], + "operation": "groupby" + } + } + } + }, + { + "id": "sortBy", + "options": { + "fields": {}, + "sort": [ + { + "desc": true, + "field": "seq" + } + ] + } + }, + { + "id": "organize", + "options": { + "excludeByName": { + "Time": true, + "__name__": true, + "instance": true, + "job": true, + "lambda": true, + "nodeId": true, + "pubKey": true + }, + "indexByName": { + "Time": 2, + "Value #A": 1, + "__name__": 3, + "instance": 4, + "job": 5, + "lambda": 6, + "nodeId": 7, + "pubKey": 8, + "seq": 0 + }, + "renameByName": { + "Value #A (lastNotNull)": "Status", + "seq": "Sequence" + } + } + } + ], + "type": "table" + }, + { + "datasource": "${DS_PROMETHEUS}", + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "custom": { + "align": "center", + "displayMode": "auto", + "width": 130 + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": null + } + ] + } + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "Status" + }, + "properties": [ + { + "id": "mappings", + "value": [ + { + "options": { + "0": { + "color": "red", + "index": 0, + "text": "failed" + }, + "1": { + "color": "green", + "index": 1, + "text": "signed" + } + }, + "type": "value" + } + ] + }, + { + "id": "custom.displayMode", + "value": "color-text" + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 4, + "x": 12, + "y": 8 + }, + "id": 77, + "options": { + "showHeader": true + }, + "pluginVersion": "8.0.4", + "targets": [ + { + "exemplar": true, + "expr": "ssv:validator:ibft_decided_signers{instance=~\"$instance.*\",pubKey=\"$pubkey\",nodeId=\"4\"}", + "format": "table", "interval": "", - "legendFormat": "", + "legendFormat": "{{seq}}", "refId": "A" } ], - "title": "Status", - "type": "stat" - }, - { - "datasource": null, - "gridPos": { - "h": 1, - "w": 24, - "x": 0, - "y": 7 - }, - "id": 64, - "title": "Network", - "type": "row" + "title": "Decided Messages (Node 4)", + "transformations": [ + { + "id": "seriesToColumns", + "options": { + "byField": "seq" + } + }, + { + "id": "groupBy", + "options": { + "fields": { + "Value #A": { + "aggregations": [ + "lastNotNull" + ], + "operation": "aggregate" + }, + "seq": { + "aggregations": [], + "operation": "groupby" + } + } + } + }, + { + "id": "sortBy", + "options": { + "fields": {}, + "sort": [ + { + "desc": true, + "field": "seq" + } + ] + } + }, + { + "id": "organize", + "options": { + "excludeByName": { + "Time": true, + "__name__": true, + "instance": true, + "job": true, + "lambda": true, + "nodeId": true, + "pubKey": true + }, + "indexByName": { + "Time": 2, + "Value #A": 1, + "__name__": 3, + "instance": 4, + "job": 5, + "lambda": 6, + "nodeId": 7, + "pubKey": 8, + "seq": 0 + }, + "renameByName": { + "Value #A (lastNotNull)": "Status", + "seq": "Sequence" + } + } + } + ], + "type": "table" }, { "datasource": "${DS_PROMETHEUS}", "fieldConfig": { "defaults": { "color": { - "mode": "thresholds" + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } }, "mappings": [], "thresholds": { "mode": "absolute", "steps": [ { - "color": "red", + "color": "green", "value": null }, { - "color": "#EAB839", - "value": 2 - }, - { - "color": "green", - "value": 3 + "color": "red", + "value": 80 } ] } @@ -189,38 +865,46 @@ }, "gridPos": { "h": 8, - "w": 4, + "w": 12, "x": 0, - "y": 8 + "y": 16 }, - "id": 44, + "id": 60, "options": { - "colorMode": "value", - "graphMode": "area", - "justifyMode": "auto", - "orientation": "auto", - "reduceOptions": { - "calcs": [ - "lastNotNull" - ], - "fields": "", - "values": false + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "right" }, - "text": {}, - "textMode": "auto" + "tooltip": { + "mode": "single" + } }, - "pluginVersion": "8.0.4", "targets": [ { "exemplar": true, - "expr": "ssv:network:connected_peers{instance=~\"$instance.*\",pubKey=\"$pubkey\"}", + "expr": "ssv:network:net_messages_inbound{instance=~\"$instance.*\",pubKey=\"$pubkey\",type=\"DecidedType\"}", "interval": "", - "legendFormat": "{{pubKey}}", + "legendFormat": "Node {{signer}}", "refId": "A" } ], - "title": "Connected Peers", - "type": "stat" + "title": "Incoming Decided Messages Count", + "type": "timeseries" + }, + { + "collapsed": false, + "datasource": null, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 24 + }, + "id": 64, + "panels": [], + "title": "IBFT Messages", + "type": "row" }, { "datasource": "${DS_PROMETHEUS}", @@ -248,10 +932,10 @@ "overrides": [] }, "gridPos": { - "h": 8, + "h": 10, "w": 7, - "x": 4, - "y": 8 + "x": 0, + "y": 25 }, "id": 66, "options": { @@ -269,7 +953,7 @@ "refId": "A" } ], - "title": "IBFT Messages Outbound", + "title": "Outgoing IBFT Messages", "transformations": [ { "id": "seriesToColumns", @@ -306,6 +990,18 @@ "type": "Type" } } + }, + { + "id": "sortBy", + "options": { + "fields": {}, + "sort": [ + { + "desc": true, + "field": "Sequence" + } + ] + } } ], "type": "table" @@ -339,10 +1035,10 @@ "overrides": [] }, "gridPos": { - "h": 8, - "w": 12, - "x": 0, - "y": 16 + "h": 10, + "w": 10, + "x": 7, + "y": 25 }, "id": 70, "options": { @@ -360,7 +1056,7 @@ "refId": "A" } ], - "title": "IBFT Messages Inbound", + "title": "Incoming IBFT Messages", "transformations": [ { "id": "seriesToColumns", @@ -399,6 +1095,18 @@ "type": "Type" } } + }, + { + "id": "sortBy", + "options": { + "fields": {}, + "sort": [ + { + "desc": true, + "field": "Sequence" + } + ] + } } ], "type": "table" @@ -457,9 +1165,9 @@ }, "gridPos": { "h": 8, - "w": 12, + "w": 8, "x": 0, - "y": 24 + "y": 35 }, "id": 62, "options": { @@ -481,98 +1189,20 @@ "refId": "A" } ], - "title": "IBFT Messages Inbound", - "type": "timeseries" - }, - { - "datasource": "${DS_PROMETHEUS}", - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisPlacement": "auto", - "barAlignment": 0, - "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "red", - "value": 80 - } - ] - } - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 12, - "x": 0, - "y": 32 - }, - "id": 60, - "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "right" - }, - "tooltip": { - "mode": "single" - } - }, - "targets": [ - { - "exemplar": true, - "expr": "ssv:network:net_messages_inbound{instance=~\"$instance.*\",pubKey=\"$pubkey\",type=\"DecidedType\"}", - "interval": "", - "legendFormat": "Node {{signer}}", - "refId": "A" - } - ], - "title": "Decided Messages Inbound", + "title": "Incoming IBFT Messages Count", "type": "timeseries" }, { + "collapsed": false, "datasource": null, "gridPos": { "h": 1, "w": 24, "x": 0, - "y": 40 + "y": 43 }, "id": 56, + "panels": [], "title": "Attestations", "type": "row" }, @@ -600,7 +1230,7 @@ "h": 6, "w": 8, "x": 0, - "y": 41 + "y": 44 }, "id": 42, "options": { @@ -648,7 +1278,6 @@ }, "1": { "index": 0, - "color": "green", "text": "Running" } }, @@ -673,7 +1302,7 @@ "h": 6, "w": 6, "x": 8, - "y": 41 + "y": 44 }, "id": 50, "options": { @@ -769,7 +1398,7 @@ "h": 6, "w": 6, "x": 14, - "y": 41 + "y": 44 }, "id": 46, "options": { @@ -824,7 +1453,7 @@ "h": 6, "w": 8, "x": 0, - "y": 47 + "y": 50 }, "id": 54, "options": { @@ -879,7 +1508,7 @@ "h": 6, "w": 8, "x": 8, - "y": 47 + "y": 50 }, "id": 52, "options": { @@ -950,7 +1579,7 @@ "h": 6, "w": 4, "x": 16, - "y": 47 + "y": 50 }, "id": 48, "options": {