Skip to content

Commit

Permalink
Merge pull request #312 from bloxapp/stage
Browse files Browse the repository at this point in the history
- Add metrics (#307)

- fix validator status (#308)

- Add pubsub tracing (#311)

- update grafana dashboards
  • Loading branch information
amirylm authored Sep 19, 2021
2 parents ad5cf76 + 2c660f0 commit 9841e50
Show file tree
Hide file tree
Showing 34 changed files with 2,573 additions and 647 deletions.
2 changes: 2 additions & 0 deletions .k8/yamls/ssv-exporter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ spec:
value: "true"
- name: IBFT_SYNC_ENABLED
value: "true"
- name: PUBSUB_TRACE_OUT
value: "./data/pubsub_trace.out"
volumeMounts:
- mountPath: /data
name: ssv-exporter
Expand Down
2 changes: 2 additions & 0 deletions .k8/yamls/ssv-node-v2-1-deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ spec:
value: "15000"
- name: ENABLE_PROFILE
value: "true"
- name: PUBSUB_TRACE_OUT
value: "./data/pubsub_trace.out"
volumeMounts:
- mountPath: /data
name: ssv-node-v2-1
Expand Down
2 changes: 2 additions & 0 deletions .k8/yamls/ssv-node-v2-2-deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ spec:
value: "15000"
- name: ENABLE_PROFILE
value: "true"
- name: PUBSUB_TRACE_OUT
value: "./data/pubsub_trace.out"
volumeMounts:
- mountPath: /data
name: ssv-node-v2-2
Expand Down
2 changes: 2 additions & 0 deletions .k8/yamls/ssv-node-v2-3-deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ spec:
value: "15000"
- name: ENABLE_PROFILE
value: "true"
- name: PUBSUB_TRACE_OUT
value: "./data/pubsub_trace.out"
volumeMounts:
- mountPath: /data
name: ssv-node-v2-3
Expand Down
2 changes: 2 additions & 0 deletions .k8/yamls/ssv-node-v2-4-deployment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ spec:
value: "15000"
- name: ENABLE_PROFILE
value: "true"
- name: PUBSUB_TRACE_OUT
value: "./data/pubsub_trace.out"
volumeMounts:
- mountPath: /data
name: ssv-node-v2-4
Expand Down
26 changes: 16 additions & 10 deletions beacon/goclient/goclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/rs/zerolog"
"go.uber.org/zap"
"log"
"sync"
"time"
)

Expand Down Expand Up @@ -46,11 +47,12 @@ func init() {

// goClient implementing Beacon struct
type goClient struct {
ctx context.Context
logger *zap.Logger
network core.Network
client client.Service
graffiti []byte
ctx context.Context
logger *zap.Logger
network core.Network
client client.Service
indicesMapLock sync.Mutex
graffiti []byte
}

// verifies that the client implements HealthCheckAgent
Expand All @@ -75,11 +77,12 @@ func New(opt beacon.Options) (beacon.Beacon, error) {
logger.Info("successfully connected to beacon client")

_client := &goClient{
ctx: opt.Context,
logger: logger,
network: core.NetworkFromString(opt.Network),
client: autoClient,
graffiti: []byte("BloxStaking"),
ctx: opt.Context,
logger: logger,
network: core.NetworkFromString(opt.Network),
client: autoClient,
indicesMapLock: sync.Mutex{},
graffiti: []byte("BloxStaking"),
}

return _client, nil
Expand Down Expand Up @@ -109,6 +112,9 @@ func (gc *goClient) HealthCheck() []string {
}

func (gc *goClient) ExtendIndexMap(index spec.ValidatorIndex, pubKey spec.BLSPubKey) {
gc.indicesMapLock.Lock()
defer gc.indicesMapLock.Unlock()

gc.client.ExtendIndexMap(map[spec.ValidatorIndex]spec.BLSPubKey{index: pubKey})
}

Expand Down
2 changes: 1 addition & 1 deletion docs/resources/cov-badge.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
2 changes: 1 addition & 1 deletion eth1/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func SyncEth1Events(logger *zap.Logger, client Client, storage SyncOffsetStorage
}
defer client.EventsSubject().Deregister("SyncEth1")

q := tasks.NewSimpleQueue(5 * time.Millisecond)
q := tasks.NewExecutionQueue(5 * time.Millisecond)
go q.Start()
defer q.Stop()
// Stop once SyncEndedEvent arrives
Expand Down
142 changes: 127 additions & 15 deletions exporter/ibft/decided_reader.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package ibft

import (
"context"
"github.com/bloxapp/ssv/beacon"
"github.com/bloxapp/ssv/ibft/pipeline"
"github.com/bloxapp/ssv/ibft/pipeline/auth"
"github.com/bloxapp/ssv/ibft/proto"
"github.com/bloxapp/ssv/ibft/sync/history"
"github.com/bloxapp/ssv/network"
"github.com/bloxapp/ssv/network/commons"
"github.com/bloxapp/ssv/storage/collections"
"github.com/bloxapp/ssv/validator"
"github.com/bloxapp/ssv/validator/storage"
"github.com/herumi/bls-eth-go-binary/bls"
"github.com/pkg/errors"
"go.uber.org/zap"
"strings"
"sync"
"time"
)
Expand All @@ -33,10 +36,12 @@ type decidedReader struct {

config *proto.InstanceConfig
validatorShare *storage.Share

identifier []byte
}

// NewIbftDecidedReadOnly creates new instance of DecidedReader
func NewIbftDecidedReadOnly(opts DecidedReaderOptions) Reader {
// newDecidedReader creates new instance of DecidedReader
func newDecidedReader(opts DecidedReaderOptions) SyncRead {
r := decidedReader{
logger: opts.Logger.With(
zap.String("pubKey", opts.ValidatorShare.PublicKey.SerializeToHexStr()),
Expand All @@ -45,19 +50,17 @@ func NewIbftDecidedReadOnly(opts DecidedReaderOptions) Reader {
network: opts.Network,
config: opts.Config,
validatorShare: opts.ValidatorShare,
identifier: []byte(validator.IdentifierFormat(opts.ValidatorShare.PublicKey.Serialize(), beacon.RoleTypeAttester)),
}
return &r
}

// Start starts to fetch best known decided message (highest sequence) from the network and sync to it.
func (r *decidedReader) Start() error {
// subscribe to topic so we could find relevant nodes
if err := r.network.SubscribeToValidatorNetwork(r.validatorShare.PublicKey); err != nil {
if !strings.Contains(err.Error(), "topic already exists") {
r.logger.Warn("could not subscribe to validator channel", zap.Error(err))
return err
// Sync starts to fetch best known decided message (highest sequence) from the network and sync to it.
func (r *decidedReader) Sync() error {
if !r.network.IsSubscribeToValidatorNetwork(r.validatorShare.PublicKey) {
if err := r.network.SubscribeToValidatorNetwork(r.validatorShare.PublicKey); err != nil {
return errors.Wrap(err, "failed to subscribe topic")
}
r.logger.Debug("no need to subscribe, topic already exist")
}
// wait for network setup (subscribe to topic)
var netWaitGroup sync.WaitGroup
Expand All @@ -70,8 +73,7 @@ func (r *decidedReader) Start() error {

r.logger.Debug("syncing ibft data")
// creating HistorySync and starts it
identifier := []byte(validator.IdentifierFormat(r.validatorShare.PublicKey.Serialize(), beacon.RoleTypeAttester))
hs := history.New(r.logger, r.validatorShare.PublicKey.Serialize(), identifier, r.network,
hs := history.New(r.logger, r.validatorShare.PublicKey.Serialize(), r.identifier, r.network,
r.storage, r.validateDecidedMsg)
err := hs.Start()
if err != nil {
Expand All @@ -80,13 +82,123 @@ func (r *decidedReader) Start() error {
return err
}

// Start starts to listen to decided messages
func (r *decidedReader) Start() error {
if !r.network.IsSubscribeToValidatorNetwork(r.validatorShare.PublicKey) {
if err := r.network.SubscribeToValidatorNetwork(r.validatorShare.PublicKey); err != nil {
return errors.Wrap(err, "failed to subscribe topic")
}
}

r.logger.Debug("starting to read decided messages")
if err := r.waitForMinPeers(r.validatorShare.PublicKey, 2); err != nil {
return errors.Wrap(err, "could not wait for min peers")
}
r.listenToNetwork(r.network.ReceivedDecidedChan())
return nil
}

func (r *decidedReader) listenToNetwork(cn <-chan *proto.SignedMessage) {
r.logger.Debug("listening to decided messages")
for msg := range cn {
if err := validateMsg(msg, string(r.identifier)); err != nil {
continue
}
logger := r.logger.With(messageFields(msg)...)
if err := validateDecidedMsg(msg, r.validatorShare); err != nil {
logger.Debug("received invalid decided message")
continue
}
if msg.Message.SeqNumber == 0 {
logger.Debug("received invalid sequence")
continue
}
if err := r.handleNewDecidedMessage(msg); err != nil {
logger.Error("could not handle decided message")
}
}
}

// handleNewDecidedMessage saves an incoming (valid) decided message
func (r *decidedReader) handleNewDecidedMessage(msg *proto.SignedMessage) error {
logger := r.logger.With(messageFields(msg)...)
if decided, found, _ := r.storage.GetDecided(r.identifier, msg.Message.SeqNumber); found && decided != nil {
logger.Debug("received known sequence")
return nil
}
if err := r.storage.SaveDecided(msg); err != nil {
return errors.Wrap(err, "could not save decided")
}
logger.Debug("decided saved")
reportDecided(msg, r.validatorShare)
return r.checkHighestDecided(msg)
}

// checkHighestDecided check if highest decided should be updated
func (r *decidedReader) checkHighestDecided(msg *proto.SignedMessage) error {
logger := r.logger.With(messageFields(msg)...)
seq := msg.Message.SeqNumber
highestKnown, found, err := r.storage.GetHighestDecidedInstance(r.identifier)
if err != nil {
return errors.Wrap(err, "could not get highest decided")
}
if found {
highestSeqKnown := uint64(0)
if highestKnown != nil {
highestSeqKnown = highestKnown.Message.SeqNumber
}
if seq < highestSeqKnown {
logger.Debug("received old sequence",
zap.Uint64("highestSeqKnown", highestSeqKnown))
return nil
}
if seq > highestSeqKnown+1 {
if err := r.Sync(); err != nil {
logger.Debug("could not sync", zap.Uint64("seq", seq),
zap.Uint64("highestSeqKnown", highestSeqKnown))
return err
}
return nil
}
}
if err := r.storage.SaveHighestDecidedInstance(msg); err != nil {
return errors.Wrap(err, "could not save highest decided")
}
logger.Info("highest decided saved")
return nil
}

// validateDecidedMsg validates the message
func (r *decidedReader) validateDecidedMsg(msg *proto.SignedMessage) error {
r.logger.Debug("validating a new decided message", zap.String("msg", msg.String()))
return validateDecidedMsg(msg, r.validatorShare)
}

// waitForMinPeers will wait until enough peers joined the topic
func (r *decidedReader) waitForMinPeers(pk *bls.PublicKey, minPeerCount int) error {
ctx := commons.WaitMinPeersCtx{
Ctx: context.Background(),
Logger: r.logger,
Net: r.network,
}
return commons.WaitForMinPeers(ctx, pk.Serialize(), minPeerCount,
1*time.Second, 64*time.Second, false)
}

func validateDecidedMsg(msg *proto.SignedMessage, share *storage.Share) error {
p := pipeline.Combine(
auth.BasicMsgValidation(),
auth.MsgTypeCheck(proto.RoundState_Commit),
auth.AuthorizeMsg(r.validatorShare),
auth.ValidateQuorum(r.validatorShare.ThresholdSize()),
auth.AuthorizeMsg(share),
auth.ValidateQuorum(share.ThresholdSize()),
)
return p.Run(msg)
}

func validateMsg(msg *proto.SignedMessage, identifier string) error {
p := pipeline.Combine(
auth.BasicMsgValidation(),
auth.ValidateLambdas([]byte(identifier)),
)
return p.Run(msg)
}
51 changes: 51 additions & 0 deletions exporter/ibft/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package ibft

import "sync"

// TODO: add to ibft package as most of parts here are code duplicates
// tests should be added as well, currently it would cause redundant maintenance

var (
decidedReaders sync.Map
networkReaders sync.Map
)

// Reader is an interface for ibft in the context of an exporter
type Reader interface {
Start() error
}

// Syncer is an interface for syncing data
type Syncer interface {
Sync() error
}

// SyncRead reads and sync data
type SyncRead interface {
Reader
Syncer
}

// NewNetworkReader factory to create network readers
func NewNetworkReader(o IncomingMsgsReaderOptions) Reader {
pk := o.PK.SerializeToHexStr()
r, exist := networkReaders.Load(pk)
if !exist {
reader := newIncomingMsgsReader(o)
networkReaders.Store(pk, reader)
return reader
}
return r.(*incomingMsgsReader)
}

// NewDecidedReader factory to create decided readers
func NewDecidedReader(o DecidedReaderOptions) SyncRead {
pk := o.ValidatorShare.PublicKey.SerializeToHexStr()
r, exist := decidedReaders.Load(pk)
if !exist {
reader := newDecidedReader(o)
decidedReaders.Store(pk, reader)
return reader
}
return r.(*decidedReader)
}
Loading

0 comments on commit 9841e50

Please sign in to comment.