Skip to content

Commit

Permalink
Merge pull request #449 from bloxapp/stage
Browse files Browse the repository at this point in the history
Stage to Main (v0.1.6 preparation)
  • Loading branch information
nivBlox authored Nov 24, 2021
2 parents e4ae575 + 8829e67 commit 95a305e
Show file tree
Hide file tree
Showing 12 changed files with 256 additions and 34 deletions.
2 changes: 1 addition & 1 deletion docs/resources/cov-badge.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
48 changes: 48 additions & 0 deletions exporter/ibft/decided_genesis_migrator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package ibft

import (
"github.com/pkg/errors"
"go.uber.org/zap"
)

// decidedGenesisMigrator
type decidedGenesisMigrator struct {
logger *zap.Logger
}

// Migrate take care of decided messages migration
func (m *decidedGenesisMigrator) Migrate(r Reader) error {
dr, ok := r.(*decidedReader)
if !ok {
return nil
}
n, err := m.migrate(dr)
if err != nil {
return errors.Wrap(err, "could not migrate decided 0")
}
m.logger.Debug("managed to migrate decided 0",
zap.String("identifier", string(dr.identifier)), zap.Int("items", n))
return nil
}

// migrate performing migration for decided messages
func (m *decidedGenesisMigrator) migrate(dr *decidedReader) (int, error) {
if migrateDecided0, err := m.check(dr); err != nil {
return 0, err
} else if migrateDecided0 {
return dr.newHistorySync().StartRange(uint64(0), uint64(1))
}
return 0, nil
}

// check determines if the given reader should migrate
func (m *decidedGenesisMigrator) check(dr *decidedReader) (bool, error) {
_, found, err := dr.storage.GetDecided(dr.identifier, uint64(0))
if err != nil {
return false, err
}
if found {
return false, nil
}
return true, nil
}
35 changes: 20 additions & 15 deletions exporter/ibft/decided_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,17 +65,10 @@ func newDecidedReader(opts DecidedReaderOptions) Reader {
return &r
}

// sync starts to fetch best known decided message (highest sequence) from the network and sync to it.
func (r *decidedReader) sync() error {
r.logger.Debug("syncing ibft data")
// creating HistorySync and starts it
hs := history.New(r.logger, r.validatorShare.PublicKey.Serialize(), r.identifier, r.network,
r.storage, r.validateDecidedMsg)
err := hs.Start()
if err != nil {
r.logger.Error("could not sync validator's data", zap.Error(err))
}
return err
// newHistorySync creates a new instance of history sync
func (r *decidedReader) newHistorySync() history.Syncer {
return history.New(r.logger, r.validatorShare.PublicKey.Serialize(), r.validatorShare.CommitteeSize(), r.identifier,
r.network, r.storage, r.validateDecidedMsg)
}

// Share returns the reader's share
Expand All @@ -88,6 +81,10 @@ func (r *decidedReader) Start() error {
if err := r.network.SubscribeToValidatorNetwork(r.validatorShare.PublicKey); err != nil {
return errors.Wrap(err, "failed to subscribe topic")
}
// call migration before starting the other stuff
if err := GetMainMigrator().Migrate(r); err != nil {
r.logger.Error("could not run migration", zap.Error(err))
}
if err := tasks.Retry(func() error {
if err := r.sync(); err != nil {
r.logger.Error("could not sync validator", zap.Error(err))
Expand All @@ -113,6 +110,18 @@ func (r *decidedReader) Start() error {
return nil
}

// sync starts to fetch best known decided message (highest sequence) from the network and sync to it.
func (r *decidedReader) sync() error {
r.logger.Debug("syncing ibft data")
// creating HistorySync and starts it
hs := r.newHistorySync()
err := hs.Start()
if err != nil {
r.logger.Error("could not sync validator's data", zap.Error(err))
}
return err
}

func (r *decidedReader) listenToNetwork(cn <-chan *proto.SignedMessage) {
r.logger.Debug("listening to decided messages")
for msg := range cn {
Expand All @@ -124,10 +133,6 @@ func (r *decidedReader) listenToNetwork(cn <-chan *proto.SignedMessage) {
logger.Debug("received invalid decided message")
continue
}
if msg.Message.SeqNumber == 0 {
logger.Debug("received invalid sequence")
continue
}
go func(msg *proto.SignedMessage) {
defer logger.Debug("done with decided msg")
if saved, err := r.handleNewDecidedMessage(msg); err != nil {
Expand Down
42 changes: 42 additions & 0 deletions exporter/ibft/migrator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package ibft

import (
"github.com/bloxapp/ssv/utils/logex"
"go.uber.org/zap"
"sync"
)

// Migrator is an interface for migrating messages
type Migrator interface {
Migrate(r Reader) error
}

type mainMigrator struct {
migrators []Migrator
}

var migrator mainMigrator

var migratorOnce sync.Once

// GetMainMigrator returns the instance of migrator
func GetMainMigrator() Migrator {
migratorOnce.Do(func() {
logger := logex.GetLogger(zap.String("who", "migrateManager"))
migrators := []Migrator{&decidedGenesisMigrator{logger}}
migrator = mainMigrator{
migrators: migrators,
}
})
return &migrator
}

// Migrate applies the existing migrators on the given reader
func (mm *mainMigrator) Migrate(r Reader) error {
for _, migrator := range mm.migrators {
if err := migrator.Migrate(r); err != nil {
return err
}
}
return nil
}
6 changes: 3 additions & 3 deletions ibft/controller/controller_running_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,15 +77,15 @@ instanceLoop:
i.currentInstance = nil
i.logger.Debug("iBFT instance result loop stopped")

i.afterInstance(seq, retRes)
i.afterInstance(seq, retRes, err)

return retRes, err
}

// afterInstance is triggered after the instance was finished
func (i *Controller) afterInstance(seq uint64, res *ibft.InstanceResult) {
func (i *Controller) afterInstance(seq uint64, res *ibft.InstanceResult, err error) {
// if instance was decided -> wait for late commit messages
if res.Decided {
if err != nil && res != nil && res.Decided {
go i.listenToLateCommitMsgs(i.Identifier[:], seq)
} else {
i.msgQueue.PurgeIndexedMessages(msgqueue.IBFTMessageIndexKey(i.Identifier[:], seq))
Expand Down
2 changes: 1 addition & 1 deletion ibft/controller/controller_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (i *Controller) SyncIBFT() error {
func (i *Controller) syncIBFT() error {
// TODO: use controller context once added
return tasks.RetryWithContext(context.Background(), func() error {
s := history.New(i.logger, i.ValidatorShare.PublicKey.Serialize(), i.GetIdentifier(), i.network, i.ibftStorage, i.ValidateDecidedMsg)
s := history.New(i.logger, i.ValidatorShare.PublicKey.Serialize(), i.ValidatorShare.CommitteeSize(), i.GetIdentifier(), i.network, i.ibftStorage, i.ValidateDecidedMsg)
err := s.Start()
if err != nil {
return errors.Wrap(err, "history sync failed")
Expand Down
9 changes: 5 additions & 4 deletions ibft/sync/history/fetch_decided.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@ import (

// FetchValidateAndSaveInstances fetches, validates and saves decided messages from the P2P network.
// Range is start to end seq including
func (s *Sync) fetchValidateAndSaveInstances(fromPeer string, startSeq uint64, endSeq uint64) (highestSaved *proto.SignedMessage, err error) {
func (s *Sync) fetchValidateAndSaveInstances(fromPeer string, startSeq uint64, endSeq uint64) (highestSaved *proto.SignedMessage, n int, err error) {
failCount := 0
start := startSeq
done := false
var latestError error
for {
if failCount == 5 {
return highestSaved, latestError
return highestSaved, n, latestError
}
if done {
return highestSaved, nil
return highestSaved, n, nil
}

// conform to max batch
Expand Down Expand Up @@ -70,8 +70,9 @@ func (s *Sync) fetchValidateAndSaveInstances(fromPeer string, startSeq uint64, e

// save
if err := s.ibftStorage.SaveDecided(msg); err != nil {
return highestSaved, err
return highestSaved, n, err
}
n++

// set highest
if highestSaved == nil || highestSaved.Message.SeqNumber < msg.Message.SeqNumber {
Expand Down
4 changes: 2 additions & 2 deletions ibft/sync/history/fetch_decided_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,10 @@ func TestFetchDecided(t *testing.T) {
require.NoError(t, err)
storage := collections.NewIbft(db, logger, "attestation")
network := sync.NewTestNetwork(t, test.peers, int(test.rangeParams[2]), nil, nil, test.decidedArr, nil, nil)
s := New(logger, test.validatorPk, test.identifier, network, &storage, func(msg *proto.SignedMessage) error {
s := New(logger, test.validatorPk, 4, test.identifier, network, &storage, func(msg *proto.SignedMessage) error {
return nil
})
res, err := s.fetchValidateAndSaveInstances(test.fromPeer, test.rangeParams[0], test.rangeParams[1])
res, _, err := s.fetchValidateAndSaveInstances(test.fromPeer, test.rangeParams[0], test.rangeParams[1])

if len(test.expectedError) > 0 {
require.EqualError(t, err, test.expectedError)
Expand Down
7 changes: 3 additions & 4 deletions ibft/sync/history/fetch_highest.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package history
import (
"encoding/hex"
"github.com/bloxapp/ssv/ibft/proto"
sync2 "github.com/bloxapp/ssv/ibft/sync"
ibftsync "github.com/bloxapp/ssv/ibft/sync"
"github.com/bloxapp/ssv/network"
"github.com/bloxapp/ssv/storage/kv"
"github.com/pkg/errors"
Expand All @@ -13,9 +13,8 @@ import (

// findHighestInstance returns the highest found decided signed message and the peer it was received from
func (s *Sync) findHighestInstance() (*proto.SignedMessage, string, error) {
// pick up to 4 peers
// TODO - why 4? should be set as param?
usedPeers, err := sync2.GetPeers(s.network, s.publicKey, 4)
// pick up to committee peers
usedPeers, err := ibftsync.GetPeers(s.network, s.publicKey, s.committeeSize)
if err != nil {
return nil, "", err
}
Expand Down
2 changes: 1 addition & 1 deletion ibft/sync/history/fetch_highest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func TestFindHighest(t *testing.T) {
return nil
}
}
s := New(zap.L(), test.valdiatorPK, test.identifier, sync.NewTestNetwork(t, test.peers, 100,
s := New(zap.L(), test.valdiatorPK, 4, test.identifier, sync.NewTestNetwork(t, test.peers, 100,
test.highestMap, test.errorMap, nil, nil, nil), nil, test.validateMsg)
res, _, err := s.findHighestInstance()

Expand Down
40 changes: 38 additions & 2 deletions ibft/sync/history/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ import (
"time"
)

// Syncer is the interface for history sync
type Syncer interface {
Start() error
StartRange(from, to uint64) (int, error)
}

// Sync is responsible for syncing and iBFT instance when needed by
// fetching decided messages from the network
type Sync struct {
Expand All @@ -20,10 +26,11 @@ type Sync struct {
identifier []byte
// paginationMaxSize is the max number of returned elements in a single response
paginationMaxSize uint64
committeeSize int
}

// New returns a new instance of Sync
func New(logger *zap.Logger, publicKey []byte, identifier []byte, network network.Network, ibftStorage collections.Iibft, validateDecidedMsgF func(msg *proto.SignedMessage) error) *Sync {
func New(logger *zap.Logger, publicKey []byte, committeeSize int, identifier []byte, network network.Network, ibftStorage collections.Iibft, validateDecidedMsgF func(msg *proto.SignedMessage) error) *Sync {
return &Sync{
logger: logger.With(zap.String("sync", "history")),
publicKey: publicKey,
Expand All @@ -32,6 +39,7 @@ func New(logger *zap.Logger, publicKey []byte, identifier []byte, network networ
validateDecidedMsgF: validateDecidedMsgF,
ibftStorage: ibftStorage,
paginationMaxSize: network.MaxBatch(),
committeeSize: committeeSize,
}
}

Expand Down Expand Up @@ -69,7 +77,7 @@ func (s *Sync) Start() error {
}

// fetch, validate and save missing data
highestSaved, err := s.fetchValidateAndSaveInstances(fromPeer, syncStartSeqNumber, remoteHighest.Message.SeqNumber)
highestSaved, _, err := s.fetchValidateAndSaveInstances(fromPeer, syncStartSeqNumber, remoteHighest.Message.SeqNumber)
if err != nil {
return errors.Wrap(err, "could not fetch decided by range during sync")
}
Expand All @@ -84,3 +92,31 @@ func (s *Sync) Start() error {
s.logger.Info("finished syncing", zap.Uint64("highest seq", highestSaved.Message.SeqNumber), zap.String("duration", time.Since(start).String()))
return nil
}

// StartRange starts to sync old messages in a specific range
// first it tries to find a synced peer and then ask a specific range of decided messages
func (s *Sync) StartRange(from, to uint64) (int, error) {
var n int
start := time.Now()
// fetch remote highest
remoteHighest, fromPeer, err := s.findHighestInstance()
if err != nil {
return n, errors.Wrap(err, "could not fetch highest instance during sync")
}
if remoteHighest == nil { // could not find highest, there isn't one
s.logger.Info("node is synced: could not find any peer with highest decided, assuming sequence number is 0",
zap.String("duration", time.Since(start).String()))
return n, nil
}
if remoteHighest.Message.SeqNumber < from {
return n, errors.New("range is out of decided sequence boundaries")
}
// fetch, validate and save missing data
_, n, err = s.fetchValidateAndSaveInstances(fromPeer, from, to)
if err != nil {
return n, errors.Wrap(err, "could not fetch decided by range during sync")
}
s.logger.Info("finished syncing in range", zap.Uint64("from", from), zap.Uint64("to", to),
zap.String("duration", time.Since(start).String()), zap.Int("items", n))
return n, nil
}
Loading

0 comments on commit 95a305e

Please sign in to comment.