Skip to content

Commit

Permalink
Merge pull request #316 from bloxapp/stage
Browse files Browse the repository at this point in the history
Refactor start validator flow #313
Resolve Not all shares all loaded without resync #315
  • Loading branch information
amirylm authored Sep 20, 2021
2 parents 9841e50 + fbeeab6 commit 10c9198
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 9 deletions.
2 changes: 1 addition & 1 deletion docs/resources/cov-badge.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions storage/kv/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func (b *BadgerDb) GetAllByCollection(prefix []byte) ([]basedb.Obj, error) {
var err error
err = b.db.View(func(txn *badger.Txn) error {
opt := badger.DefaultIteratorOptions
opt.PrefetchSize = 1000 // if the number of items is larger than this size, results get mixed up
opt.Prefix = prefix
it := txn.NewIterator(opt)
defer it.Close()
Expand Down
36 changes: 30 additions & 6 deletions validator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,19 +139,36 @@ func (c *controller) StartValidators() {
c.logger.Info("could not find validators")
return
}
c.setupValidators(shares)
}

// setupValidators starts all validators
func (c *controller) setupValidators(shares []*validatorstorage.Share) {
c.logger.Info("starting validators setup...", zap.Int("shares count", len(shares)))
var errs []error
for _, validatorShare := range shares {
v := c.validatorsMap.GetOrCreateValidator(validatorShare)
pk := v.Share.PublicKey.SerializeToHexStr()
logger := c.logger.With(zap.String("pubkey", pk))
if v.Share.Index == nil {
if err := c.addValidatorIndex(v.Share); err != nil {
if err == errIndicesNotFound {
logger.Warn("could not start validator: missing index")
} else {
logger.Error("could not start validator: could not add index", zap.Error(err))
}
metricsValidatorStatus.WithLabelValues(pk).Set(float64(validatorStatusNoIndex))
errs = append(errs, err)
continue
}
logger.Debug("updated index for validator", zap.Uint64("index", *v.Share.Index))
}
if err := v.Start(); err != nil {
pk := v.Share.PublicKey.SerializeToHexStr()
c.logger.Error("could not start validator", zap.Error(err),
zap.String("pubkey", pk))
logger.Error("could not start validator", zap.Error(err))
metricsValidatorStatus.WithLabelValues(pk).Set(float64(validatorStatusError))
errs = append(errs, err)
continue
}
c.logger.Debug("validator started", zap.String("pubkey", v.Share.PublicKey.SerializeToHexStr()))
}
c.logger.Info("setup validators done", zap.Int("map size", c.validatorsMap.Size()),
zap.Int("failures", len(errs)), zap.Int("shares count", len(shares)))
Expand Down Expand Up @@ -224,13 +241,16 @@ func (c *controller) handleValidatorAddedEvent(validatorAddedEvent eth1.Validato

v := c.validatorsMap.GetOrCreateValidator(validatorShare)

if v.Share.Index == nil {
logger.Warn("could not start validator without index")
return nil
}

if err := v.Start(); err != nil {
metricsValidatorStatus.WithLabelValues(pubKey).Set(float64(validatorStatusError))
return errors.Wrap(err, "could not start validator")
}

logger.Debug("validator started")

return nil
}

Expand Down Expand Up @@ -266,6 +286,10 @@ func (c *controller) addValidatorsIndices(toFetch [][]byte) {
if err := c.collection.SaveValidatorShare(v.Share); err != nil {
c.logger.Debug("could not save share", zap.String("pubkey", pk), zap.Error(err))
}
if err := v.Start(); err != nil {
c.logger.Error("could not start validator", zap.String("pubkey", pk), zap.Error(err))
metricsValidatorStatus.WithLabelValues(pk).Set(float64(validatorStatusError))
}
}
}
c.logger.Debug("updated indices", zap.Any("indices", indices))
Expand Down
6 changes: 4 additions & 2 deletions validator/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,12 @@ func (v *Validator) Start() error {
go ib.Init()
}

metricsValidatorStatus.WithLabelValues(v.Share.PublicKey.SerializeToHexStr()).
Set(float64(validatorStatusReady))
v.logger.Debug("validator started")
})

metricsValidatorStatus.WithLabelValues(v.Share.PublicKey.SerializeToHexStr()).
Set(float64(validatorStatusReady))

return nil
}

Expand Down
3 changes: 3 additions & 0 deletions validator/validators_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ func (vm *validatorsMap) GetOrCreateValidator(share *storage.Share) *Validator {

// Size returns the number of validators in the map
func (vm *validatorsMap) Size() int {
vm.lock.RLock()
defer vm.lock.RUnlock()

return len(vm.validatorsMap)
}

Expand Down

0 comments on commit 10c9198

Please sign in to comment.