diff --git a/docs/resources/cov-badge.svg b/docs/resources/cov-badge.svg
index 8571a2fddc..cc63e6cc6d 100644
--- a/docs/resources/cov-badge.svg
+++ b/docs/resources/cov-badge.svg
@@ -1 +1 @@
-
\ No newline at end of file
+
\ No newline at end of file
diff --git a/storage/kv/badger.go b/storage/kv/badger.go
index 88ede2610e..37726ffbf4 100644
--- a/storage/kv/badger.go
+++ b/storage/kv/badger.go
@@ -84,6 +84,7 @@ func (b *BadgerDb) GetAllByCollection(prefix []byte) ([]basedb.Obj, error) {
var err error
err = b.db.View(func(txn *badger.Txn) error {
opt := badger.DefaultIteratorOptions
+ opt.PrefetchSize = 1000 // if the number of items is larger than this size, results get mixed up
opt.Prefix = prefix
it := txn.NewIterator(opt)
defer it.Close()
diff --git a/validator/controller.go b/validator/controller.go
index 9e043442ed..589740f76f 100644
--- a/validator/controller.go
+++ b/validator/controller.go
@@ -139,19 +139,36 @@ func (c *controller) StartValidators() {
c.logger.Info("could not find validators")
return
}
+ c.setupValidators(shares)
+}
+
+// setupValidators starts all validators
+func (c *controller) setupValidators(shares []*validatorstorage.Share) {
c.logger.Info("starting validators setup...", zap.Int("shares count", len(shares)))
var errs []error
for _, validatorShare := range shares {
v := c.validatorsMap.GetOrCreateValidator(validatorShare)
+ pk := v.Share.PublicKey.SerializeToHexStr()
+ logger := c.logger.With(zap.String("pubkey", pk))
+ if v.Share.Index == nil {
+ if err := c.addValidatorIndex(v.Share); err != nil {
+ if err == errIndicesNotFound {
+ logger.Warn("could not start validator: missing index")
+ } else {
+ logger.Error("could not start validator: could not add index", zap.Error(err))
+ }
+ metricsValidatorStatus.WithLabelValues(pk).Set(float64(validatorStatusNoIndex))
+ errs = append(errs, err)
+ continue
+ }
+ logger.Debug("updated index for validator", zap.Uint64("index", *v.Share.Index))
+ }
if err := v.Start(); err != nil {
- pk := v.Share.PublicKey.SerializeToHexStr()
- c.logger.Error("could not start validator", zap.Error(err),
- zap.String("pubkey", pk))
+ logger.Error("could not start validator", zap.Error(err))
metricsValidatorStatus.WithLabelValues(pk).Set(float64(validatorStatusError))
errs = append(errs, err)
continue
}
- c.logger.Debug("validator started", zap.String("pubkey", v.Share.PublicKey.SerializeToHexStr()))
}
c.logger.Info("setup validators done", zap.Int("map size", c.validatorsMap.Size()),
zap.Int("failures", len(errs)), zap.Int("shares count", len(shares)))
@@ -224,13 +241,16 @@ func (c *controller) handleValidatorAddedEvent(validatorAddedEvent eth1.Validato
v := c.validatorsMap.GetOrCreateValidator(validatorShare)
+ if v.Share.Index == nil {
+ logger.Warn("could not start validator without index")
+ return nil
+ }
+
if err := v.Start(); err != nil {
metricsValidatorStatus.WithLabelValues(pubKey).Set(float64(validatorStatusError))
return errors.Wrap(err, "could not start validator")
}
- logger.Debug("validator started")
-
return nil
}
@@ -266,6 +286,10 @@ func (c *controller) addValidatorsIndices(toFetch [][]byte) {
if err := c.collection.SaveValidatorShare(v.Share); err != nil {
c.logger.Debug("could not save share", zap.String("pubkey", pk), zap.Error(err))
}
+ if err := v.Start(); err != nil {
+ c.logger.Error("could not start validator", zap.String("pubkey", pk), zap.Error(err))
+ metricsValidatorStatus.WithLabelValues(pk).Set(float64(validatorStatusError))
+ }
}
}
c.logger.Debug("updated indices", zap.Any("indices", indices))
diff --git a/validator/validator.go b/validator/validator.go
index 64177ec787..1ca2ae7284 100644
--- a/validator/validator.go
+++ b/validator/validator.go
@@ -96,10 +96,12 @@ func (v *Validator) Start() error {
go ib.Init()
}
- metricsValidatorStatus.WithLabelValues(v.Share.PublicKey.SerializeToHexStr()).
- Set(float64(validatorStatusReady))
+ v.logger.Debug("validator started")
})
+ metricsValidatorStatus.WithLabelValues(v.Share.PublicKey.SerializeToHexStr()).
+ Set(float64(validatorStatusReady))
+
return nil
}
diff --git a/validator/validators_map.go b/validator/validators_map.go
index a3b68ea679..9b9917fe6a 100644
--- a/validator/validators_map.go
+++ b/validator/validators_map.go
@@ -80,6 +80,9 @@ func (vm *validatorsMap) GetOrCreateValidator(share *storage.Share) *Validator {
// Size returns the number of validators in the map
func (vm *validatorsMap) Size() int {
+ vm.lock.RLock()
+ defer vm.lock.RUnlock()
+
return len(vm.validatorsMap)
}