Skip to content

Commit

Permalink
Merge pull request #490 from bloxapp/stage
Browse files Browse the repository at this point in the history
Stage to Main (v0.1.8 preparation)
  • Loading branch information
nivBlox authored Jan 2, 2022
2 parents da7e365 + 750fbb5 commit 3391eaf
Show file tree
Hide file tree
Showing 39 changed files with 671 additions and 244 deletions.
16 changes: 6 additions & 10 deletions beacon/goclient/ekm/signer_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,22 +96,18 @@ func (s *signerStorage) ListAccounts() ([]core.ValidatorAccount, error) {
s.lock.RLock()
defer s.lock.RUnlock()

objs, err := s.db.GetAllByCollection(s.objPrefix(accountsPrefix))
if err != nil {
return nil, errors.Wrap(err, "Failed to get val share")
}

ret := make([]core.ValidatorAccount, 0)
for _, obj := range objs {

err := s.db.GetAll(s.objPrefix(accountsPrefix), func(i int, obj basedb.Obj) error {
acc, err := s.decodeAccount(obj.Value)
if err != nil {
return nil, errors.Wrap(err, "failed to list accounts")
return errors.Wrap(err, "failed to list accounts")
}

ret = append(ret, acc)
}
return nil
})

return ret, nil
return ret, err
}

// SaveAccount saves the given account
Expand Down
2 changes: 1 addition & 1 deletion config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ p2p:
# TcpPort:
# UdpPort:
# mdns for local network setup
# DiscoveryType: mdns
DiscoveryType: mdns

ssv:
GenesisEpoch:
Expand Down
3 changes: 1 addition & 2 deletions eth1/goeth/goETH.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,8 +256,7 @@ func (ec *eth1Client) listenToSubscription(logs chan types.Log, sub ethereum.Sub

// syncSmartContractsEvents sync events history of the given contract
func (ec *eth1Client) syncSmartContractsEvents(fromBlock *big.Int) error {
ec.logger.Debug("syncing smart contract events",
zap.Uint64("fromBlock", fromBlock.Uint64()))
ec.logger.Debug("syncing smart contract events", zap.Uint64("fromBlock", fromBlock.Uint64()))

contractAbi, err := abi.JSON(strings.NewReader(ec.contractABI))
if err != nil {
Expand Down
10 changes: 10 additions & 0 deletions exporter/api/broadcaster_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"context"
"fmt"
"github.com/prysmaticlabs/prysm/async/event"
"github.com/stretchr/testify/require"
Expand All @@ -10,6 +11,15 @@ import (
"time"
)

func TestConn_Send_FullQueue(t *testing.T) {
logger := zaptest.NewLogger(t)
c := newConn(context.Background(), logger, nil, "test", 0)

for i := 0; i < chanSize+2; i++ {
c.Send([]byte(fmt.Sprintf("test-%d", i)))
}
}

func TestBroadcaster(t *testing.T) {
logger := zaptest.NewLogger(t)
b := newBroadcaster(logger)
Expand Down
20 changes: 18 additions & 2 deletions exporter/api/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ func (c *conn) ReadNext() []byte {

// Send sends the given message
func (c *conn) Send(msg []byte) {
if len(c.send) >= chanSize {
// don't send on full channel
return
}
c.send <- msg
}

Expand Down Expand Up @@ -149,11 +153,23 @@ func (c *conn) ReadLoop() {
_ = c.ws.Close()
}()
c.ws.SetReadLimit(maxMessageSize)
_ = c.ws.SetReadDeadline(time.Now().Add(pongWait))
err := c.ws.SetReadDeadline(time.Now().Add(pongWait))
if err != nil {
c.logger.Error("read loop stopped by set read deadline", zap.Error(err))
return
}
c.ws.SetPongHandler(func(string) error {
// extend read limit on every pong message
// this will keep the connection alive from our POV
_ = c.ws.SetReadDeadline(time.Now().Add(pongWait))
c.logger.Debug("pong received")
err := c.ws.SetReadDeadline(time.Now().Add(pongWait))
if err != nil {
c.logger.Error("pong handler - readDeadline", zap.Error(err))
}
return err
})
c.ws.SetPingHandler(func(string) error {
c.logger.Debug("ping received")
return nil
})
for {
Expand Down
56 changes: 31 additions & 25 deletions exporter/ibft/decided_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ibft

import (
"context"
"fmt"
"github.com/bloxapp/ssv/beacon"
"github.com/bloxapp/ssv/exporter/api"
"github.com/bloxapp/ssv/ibft"
Expand Down Expand Up @@ -104,9 +105,6 @@ func (r *decidedReader) Start() error {
if err := r.waitForMinPeers(ctx, r.validatorShare.PublicKey, 1); err != nil {
return errors.Wrap(err, "could not wait for min peers")
}
cn, done := r.network.ReceivedDecidedChan()
defer done()
r.listenToNetwork(cn)
return nil
}

Expand All @@ -122,27 +120,35 @@ func (r *decidedReader) sync() error {
return err
}

func (r *decidedReader) listenToNetwork(cn <-chan *proto.SignedMessage) {
r.logger.Debug("listening to decided messages")
for msg := range cn {
if err := validateMsg(msg, string(r.identifier)); err != nil {
continue
}
logger := r.logger.With(messageFields(msg)...)
if err := validateDecidedMsg(msg, r.validatorShare); err != nil {
logger.Debug("received invalid decided message")
continue
}
go func(msg *proto.SignedMessage) {
defer logger.Debug("done with decided msg")
if saved, err := r.handleNewDecidedMessage(msg); err != nil {
if !saved {
logger.Error("could not handle decided message", zap.Error(err))
}
logger.Error("could not check highest decided", zap.Error(err))
}
}(msg)
// GetMsgResolver returns proper handler for msg based on msg type
func (r *decidedReader) GetMsgResolver(networkMsg network.NetworkMsg) func(msg *proto.SignedMessage) {
switch networkMsg {
case network.NetworkMsg_DecidedType:
return r.onMessage
}
return func(msg *proto.SignedMessage) {
r.logger.Warn(fmt.Sprintf("handler type (%s) is not supported", networkMsg))
}
}

func (r *decidedReader) onMessage(msg *proto.SignedMessage) {
if err := validateMsg(msg, r.identifier); err != nil {
return
}
logger := r.logger.With(messageFields(msg)...)
if err := validateDecidedMsg(msg, r.validatorShare); err != nil {
logger.Debug("received invalid decided message")
return
}
go func(msg *proto.SignedMessage) {
defer logger.Debug("done with decided msg")
if saved, err := r.handleNewDecidedMessage(msg); err != nil {
if !saved {
logger.Error("could not handle decided message", zap.Error(err))
}
logger.Error("could not check highest decided", zap.Error(err))
}
}(msg)
}

// handleNewDecidedMessage saves an incoming (valid) decided message
Expand Down Expand Up @@ -236,10 +242,10 @@ func validateDecidedMsg(msg *proto.SignedMessage, share *storage.Share) error {
return p.Run(msg)
}

func validateMsg(msg *proto.SignedMessage, identifier string) error {
func validateMsg(msg *proto.SignedMessage, identifier []byte) error {
p := pipeline.Combine(
auth.BasicMsgValidation(),
auth.ValidateLambdas([]byte(identifier)),
auth.ValidateLambdas(identifier),
)
return p.Run(msg)
}
Expand Down
61 changes: 33 additions & 28 deletions exporter/ibft/incoming_msgs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ibft

import (
"context"
"fmt"
"github.com/bloxapp/ssv/beacon"
"github.com/bloxapp/ssv/ibft/proto"
"github.com/bloxapp/ssv/network"
Expand Down Expand Up @@ -50,40 +51,44 @@ func (i *incomingMsgsReader) Start() error {
if err := i.waitForMinPeers(ctx, i.publicKey, 1); err != nil {
return errors.Wrap(err, "could not wait for min peers")
}
cn, done := i.network.ReceivedMsgChan()
defer done()
i.listenToNetwork(cn)
return nil
}

func (i *incomingMsgsReader) listenToNetwork(cn <-chan *proto.SignedMessage) {
func (i *incomingMsgsReader) GetMsgResolver(networkMsg network.NetworkMsg) func(msg *proto.SignedMessage) {
switch networkMsg {
case network.NetworkMsg_IBFTType:
return i.onMessage
}
return func(msg *proto.SignedMessage) {
i.logger.Warn(fmt.Sprintf("handler type (%s) is not supported", networkMsg))
}
}

func (i *incomingMsgsReader) onMessage(msg *proto.SignedMessage) {
identifier := format.IdentifierFormat(i.publicKey.Serialize(), beacon.RoleTypeAttester.String())
i.logger.Debug("listening to network messages")
for msg := range cn {
if msg == nil || msg.Message == nil {
i.logger.Info("received invalid msg")
continue
}
// filtering irrelevant messages
// TODO: handle other types of roles
if identifier != string(msg.Message.Lambda) {
continue
}
if msg == nil || msg.Message == nil {
i.logger.Info("received invalid msg")
return
}
// filtering irrelevant messages
// TODO: handle other types of roles
if identifier != string(msg.Message.Lambda) {
return
}

fields := messageFields(msg)
fields := messageFields(msg)

switch msg.Message.Type {
case proto.RoundState_PrePrepare:
i.logger.Info("pre-prepare msg", fields...)
case proto.RoundState_Prepare:
i.logger.Info("prepare msg", fields...)
case proto.RoundState_Commit:
i.logger.Info("commit msg", fields...)
case proto.RoundState_ChangeRound:
i.logger.Info("change round msg", fields...)
default:
i.logger.Warn("undefined message type", zap.Any("msg", msg))
}
switch msg.Message.Type {
case proto.RoundState_PrePrepare:
i.logger.Info("pre-prepare msg", fields...)
case proto.RoundState_Prepare:
i.logger.Info("prepare msg", fields...)
case proto.RoundState_Commit:
i.logger.Info("commit msg", fields...)
case proto.RoundState_ChangeRound:
i.logger.Info("change round msg", fields...)
default:
i.logger.Warn("undefined message type", zap.Any("msg", msg))
}
}

Expand Down
38 changes: 34 additions & 4 deletions exporter/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/bloxapp/ssv/exporter/api"
"github.com/bloxapp/ssv/exporter/ibft"
"github.com/bloxapp/ssv/exporter/storage"
ibftController "github.com/bloxapp/ssv/ibft/controller"
"github.com/bloxapp/ssv/ibft/proto"
"github.com/bloxapp/ssv/monitoring/metrics"
"github.com/bloxapp/ssv/network"
Expand Down Expand Up @@ -86,6 +87,8 @@ type exporter struct {
decidedReadersQueue tasks.Queue
networkReadersQueue tasks.Queue
metaDataReadersQueue tasks.Queue

networkMsgMediator ibftController.Mediator
}

// New creates a new Exporter instance
Expand All @@ -109,10 +112,13 @@ func New(opts Options) Exporter {
decidedReadersQueue: tasks.NewExecutionQueue(readerQueuesInterval),
networkReadersQueue: tasks.NewExecutionQueue(readerQueuesInterval),
metaDataReadersQueue: tasks.NewExecutionQueue(metaDataReaderQueuesInterval),
ws: opts.WS,
readersMut: sync.RWMutex{},
decidedReaders: map[string]ibft.Reader{},
netReaders: map[string]ibft.Reader{},

networkMsgMediator: ibftController.NewMediator(opts.Logger),

ws: opts.WS,
readersMut: sync.RWMutex{},
decidedReaders: map[string]ibft.Reader{},
netReaders: map[string]ibft.Reader{},
commitReader: ibft.NewCommitReader(ibft.CommitReaderOptions{
Logger: opts.Logger,
Network: opts.Network,
Expand Down Expand Up @@ -174,6 +180,8 @@ func (exp *exporter) Start() error {

go exp.startMainTopic()

exp.startNetworkMediators()

go exp.reportOperators()

return exp.ws.Start(fmt.Sprintf(":%d", exp.wsAPIPort))
Expand Down Expand Up @@ -367,3 +375,25 @@ func (exp *exporter) reportOperators() {
reportOperatorIndex(exp.logger, &operators[i])
}
}

func (exp *exporter) startNetworkMediators() {
msgChan, msgDone := exp.network.ReceivedMsgChan()
decidedChan, decidedDone := exp.network.ReceivedDecidedChan()

exp.networkMsgMediator.AddListener(network.NetworkMsg_IBFTType, msgChan, msgDone, func(publicKey string) (ibftController.MediatorReader, bool) {
exp.readersMut.Lock()
defer exp.readersMut.Unlock()
if reader, ok := exp.netReaders[publicKey]; ok {
return reader.(ibftController.MediatorReader), ok
}
return nil, false
})
exp.networkMsgMediator.AddListener(network.NetworkMsg_DecidedType, decidedChan, decidedDone, func(publicKey string) (ibftController.MediatorReader, bool) {
exp.readersMut.Lock()
defer exp.readersMut.Unlock()
if reader, ok := exp.decidedReaders[publicKey]; ok {
return reader.(ibftController.MediatorReader), ok
}
return nil, false
})
}
17 changes: 9 additions & 8 deletions exporter/storage/operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package storage
import (
"bytes"
"encoding/json"
"github.com/bloxapp/ssv/storage/basedb"
"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"go.uber.org/zap"
Expand Down Expand Up @@ -33,19 +34,19 @@ func (es *exporterStorage) ListOperators(from int64, to int64) ([]OperatorInform
es.operatorsLock.RLock()
defer es.operatorsLock.RUnlock()

objs, err := es.db.GetAllByCollection(append(storagePrefix(), operatorsPrefix...))
if err != nil {
return nil, err
}
to = normalTo(to)
var operators []OperatorInformation
for _, obj := range objs {
to = normalTo(to)
err := es.db.GetAll(append(storagePrefix(), operatorsPrefix...), func(i int, obj basedb.Obj) error {
var oi OperatorInformation
err = json.Unmarshal(obj.Value, &oi)
if err := json.Unmarshal(obj.Value, &oi); err != nil {
return err
}
if oi.Index >= from && oi.Index <= to {
operators = append(operators, oi)
}
}
return nil
})

return operators, err
}

Expand Down
Loading

0 comments on commit 3391eaf

Please sign in to comment.