From 46387a903a50571f005569c7c826753cc5e6b5bb Mon Sep 17 00:00:00 2001 From: Manu NALEPA Date: Tue, 16 Jan 2024 12:29:36 +0100 Subject: [PATCH 1/7] `getLegacyDatabaseLocation`: Change message. (#13471) * `getLegacyDatabaseLocation`: Change message. * Update validator/node/node.go Co-authored-by: Nishant Das --------- Co-authored-by: Nishant Das --- validator/node/node.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/validator/node/node.go b/validator/node/node.go index 3da7b4984992..111e68dcdddc 100644 --- a/validator/node/node.go +++ b/validator/node/node.go @@ -232,8 +232,15 @@ func (c *ValidatorClient) getLegacyDatabaseLocation( legacyDataFile := filepath.Join(legacyDataDir, kv.ProtectionDbFileName) if file.Exists(legacyDataFile) { - log.Warningf("Database not found in `--datadir` (%s) but found in `--wallet-dir` (%s).", dataFile, legacyDataFile) - log.Warningf("Please move the database from (%s) to (%s).", legacyDataFile, dataFile) + log.Infof(`Database not found in the --datadir directory (%s) + but found in the --wallet-dir directory (%s), + which was the legacy default. + The next time you run the validator client without a database, + it will be created into the --datadir directory (%s). + To silence this message, you can move the database from (%s) + to (%s).`, + dataDir, legacyDataDir, dataDir, legacyDataFile, dataFile) + dataDir = legacyDataDir dataFile = legacyDataFile } From 790a09f9b1d3431c8142ee9c2bf23a1bb2317503 Mon Sep 17 00:00:00 2001 From: james-prysm <90280386+james-prysm@users.noreply.github.com> Date: Tue, 16 Jan 2024 11:04:54 -0600 Subject: [PATCH 2/7] Improve wait for activation (#13448) * removing timeout on wait for activation, instead switched to an event driven approach * fixing unit tests * linting * simplifying return * adding sleep for the remaining slot to avoid cpu spikes * removing ifstatement on log * removing ifstatement on log * improving switch statement * removing the loop entirely * fixing unit test * fixing manu's reported issue with deletion of json file * missed change around writefile at path * gofmt * fixing deepsource issue with reading file * trying to clean file to avoid deepsource issue * still getting error trying a different approach * fixing stream loop * fixing unit test * Update validator/keymanager/local/keymanager.go Co-authored-by: Manu NALEPA * fixing linting --------- Co-authored-by: Manu NALEPA --- validator/accounts/iface/wallet.go | 2 +- validator/accounts/testing/mock.go | 6 +- validator/accounts/wallet/wallet.go | 11 +-- validator/accounts/wallet_create.go | 3 +- validator/client/key_reload.go | 7 +- validator/client/validator.go | 19 ++--- validator/client/validator_test.go | 37 --------- validator/client/wait_for_activation.go | 72 ++++++----------- validator/client/wait_for_activation_test.go | 84 ++++++++++++++++---- validator/keymanager/local/keymanager.go | 28 +++++-- validator/keymanager/local/refresh.go | 52 +++++++----- 11 files changed, 161 insertions(+), 160 deletions(-) diff --git a/validator/accounts/iface/wallet.go b/validator/accounts/iface/wallet.go index 9eb8299fa5b6..b5afbc90aa25 100644 --- a/validator/accounts/iface/wallet.go +++ b/validator/accounts/iface/wallet.go @@ -23,7 +23,7 @@ type Wallet interface { // Read methods for important wallet and accounts-related files. ReadFileAtPath(ctx context.Context, filePath string, fileName string) ([]byte, error) // Write methods to persist important wallet and accounts-related files to disk. 
- WriteFileAtPath(ctx context.Context, pathName string, fileName string, data []byte) error + WriteFileAtPath(ctx context.Context, pathName string, fileName string, data []byte) (bool, error) // Method for initializing a new keymanager. InitializeKeymanager(ctx context.Context, cfg InitKeymanagerConfig) (keymanager.IKeymanager, error) } diff --git a/validator/accounts/testing/mock.go b/validator/accounts/testing/mock.go index cb903c65ff46..f43ed5130e89 100644 --- a/validator/accounts/testing/mock.go +++ b/validator/accounts/testing/mock.go @@ -55,19 +55,19 @@ func (w *Wallet) Password() string { } // WriteFileAtPath -- -func (w *Wallet) WriteFileAtPath(_ context.Context, pathName, fileName string, data []byte) error { +func (w *Wallet) WriteFileAtPath(_ context.Context, pathName, fileName string, data []byte) (bool, error) { w.lock.Lock() defer w.lock.Unlock() if w.HasWriteFileError { // reset the flag to not contaminate other tests w.HasWriteFileError = false - return errors.New("could not write keystore file for accounts") + return false, errors.New("could not write keystore file for accounts") } if w.Files[pathName] == nil { w.Files[pathName] = make(map[string][]byte) } w.Files[pathName][fileName] = data - return nil + return true, nil } // ReadFileAtPath -- diff --git a/validator/accounts/wallet/wallet.go b/validator/accounts/wallet/wallet.go index acbc07a1e998..fc97d577f29d 100644 --- a/validator/accounts/wallet/wallet.go +++ b/validator/accounts/wallet/wallet.go @@ -366,26 +366,27 @@ func (w *Wallet) InitializeKeymanager(ctx context.Context, cfg iface.InitKeymana } // WriteFileAtPath within the wallet directory given the desired path, filename, and raw data. -func (w *Wallet) WriteFileAtPath(_ context.Context, filePath, fileName string, data []byte) error { +func (w *Wallet) WriteFileAtPath(_ context.Context, filePath, fileName string, data []byte) (bool /* exited previously */, error) { accountPath := filepath.Join(w.accountsPath, filePath) hasDir, err := file.HasDir(accountPath) if err != nil { - return err + return false, err } if !hasDir { if err := file.MkdirAll(accountPath); err != nil { - return errors.Wrapf(err, "could not create path: %s", accountPath) + return false, errors.Wrapf(err, "could not create path: %s", accountPath) } } fullPath := filepath.Join(accountPath, fileName) + existedPreviously := file.Exists(fullPath) if err := file.WriteFile(fullPath, data); err != nil { - return errors.Wrapf(err, "could not write %s", filePath) + return false, errors.Wrapf(err, "could not write %s", filePath) } log.WithFields(logrus.Fields{ "path": fullPath, "fileName": fileName, }).Debug("Wrote new file at path") - return nil + return existedPreviously, nil } // ReadFileAtPath within the wallet directory given the desired path and filename. 
diff --git a/validator/accounts/wallet_create.go b/validator/accounts/wallet_create.go index b1b081bf4f2b..1e2852863228 100644 --- a/validator/accounts/wallet_create.go +++ b/validator/accounts/wallet_create.go @@ -32,7 +32,8 @@ func (acm *CLIManager) WalletCreate(ctx context.Context) (*wallet.Wallet, error) if err != nil { return nil, err } - if err = w.WriteFileAtPath(ctx, local.AccountsPath, local.AccountsKeystoreFileName, encodedAccounts); err != nil { + _, err = w.WriteFileAtPath(ctx, local.AccountsPath, local.AccountsKeystoreFileName, encodedAccounts) + if err != nil { return nil, err } log.WithField("--wallet-dir", acm.walletDir).Info( diff --git a/validator/client/key_reload.go b/validator/client/key_reload.go index cbfc67a74eed..bb8add0fde3f 100644 --- a/validator/client/key_reload.go +++ b/validator/client/key_reload.go @@ -48,10 +48,5 @@ func (v *validator) HandleKeyReload(ctx context.Context, currentKeys [][fieldpar valCount = int64(valCounts[0].Count) } - anyActive = v.checkAndLogValidatorStatus(statuses, valCount) - if anyActive { - logActiveValidatorStatus(statuses) - } - - return anyActive, nil + return v.checkAndLogValidatorStatus(statuses, valCount), nil } diff --git a/validator/client/validator.go b/validator/client/validator.go index d263ed3b23a4..bcf28905a12a 100644 --- a/validator/client/validator.go +++ b/validator/client/validator.go @@ -54,14 +54,13 @@ import ( // keyFetchPeriod is the frequency that we try to refetch validating keys // in case no keys were fetched previously. var ( - keyRefetchPeriod = 30 * time.Second ErrBuilderValidatorRegistration = errors.New("Builder API validator registration unsuccessful") ErrValidatorsAllExited = errors.New("All validators are exited, no more work to perform...") ) var ( msgCouldNotFetchKeys = "could not fetch validating keys" - msgNoKeysFetched = "No validating keys fetched. Trying again" + msgNoKeysFetched = "No validating keys fetched. Waiting for keys..." ) type validator struct { @@ -403,6 +402,10 @@ func (v *validator) checkAndLogValidatorStatus(statuses []*validatorStatus, acti } case ethpb.ValidatorStatus_ACTIVE, ethpb.ValidatorStatus_EXITING: validatorActivated = true + log.WithFields(logrus.Fields{ + "publicKey": fmt.Sprintf("%#x", bytesutil.Trunc(status.publicKey)), + "index": status.index, + }).Info("Validator activated") case ethpb.ValidatorStatus_EXITED: log.Info("Validator exited") case ethpb.ValidatorStatus_INVALID: @@ -416,18 +419,6 @@ func (v *validator) checkAndLogValidatorStatus(statuses []*validatorStatus, acti return validatorActivated } -func logActiveValidatorStatus(statuses []*validatorStatus) { - for _, s := range statuses { - if s.status.Status != ethpb.ValidatorStatus_ACTIVE { - continue - } - log.WithFields(logrus.Fields{ - "publicKey": fmt.Sprintf("%#x", bytesutil.Trunc(s.publicKey)), - "index": s.index, - }).Info("Validator activated") - } -} - // CanonicalHeadSlot returns the slot of canonical block currently found in the // beacon chain via RPC. 
func (v *validator) CanonicalHeadSlot(ctx context.Context) (primitives.Slot, error) { diff --git a/validator/client/validator_test.go b/validator/client/validator_test.go index 8ea977848006..55aea89d7c3c 100644 --- a/validator/client/validator_test.go +++ b/validator/client/validator_test.go @@ -388,43 +388,6 @@ func TestWaitMultipleActivation_LogsActivationEpochOK(t *testing.T) { require.LogsContain(t, hook, "Validator activated") } -func TestWaitActivation_NotAllValidatorsActivatedOK(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - validatorClient := validatormock.NewMockValidatorClient(ctrl) - beaconClient := validatormock.NewMockBeaconChainClient(ctrl) - prysmBeaconClient := validatormock.NewMockPrysmBeaconChainClient(ctrl) - - kp := randKeypair(t) - v := validator{ - validatorClient: validatorClient, - keyManager: newMockKeymanager(t, kp), - beaconClient: beaconClient, - prysmBeaconClient: prysmBeaconClient, - } - resp := generateMockStatusResponse([][]byte{kp.pub[:]}) - resp.Statuses[0].Status.Status = ethpb.ValidatorStatus_ACTIVE - clientStream := mock2.NewMockBeaconNodeValidator_WaitForActivationClient(ctrl) - validatorClient.EXPECT().WaitForActivation( - gomock.Any(), - gomock.Any(), - ).Return(clientStream, nil) - prysmBeaconClient.EXPECT().GetValidatorCount( - gomock.Any(), - "head", - []validatorType.Status{validatorType.Active}, - ).Return([]iface.ValidatorCount{}, nil).Times(2) - clientStream.EXPECT().Recv().Return( - ðpb.ValidatorActivationResponse{}, - nil, - ) - clientStream.EXPECT().Recv().Return( - resp, - nil, - ) - assert.NoError(t, v.WaitForActivation(context.Background(), nil), "Could not wait for activation") -} - func TestWaitSync_ContextCanceled(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/validator/client/wait_for_activation.go b/validator/client/wait_for_activation.go index c2c3d10fab34..61bbd6810c1b 100644 --- a/validator/client/wait_for_activation.go +++ b/validator/client/wait_for_activation.go @@ -5,17 +5,14 @@ import ( "io" "time" - validator2 "github.com/prysmaticlabs/prysm/v4/consensus-types/validator" - "github.com/prysmaticlabs/prysm/v4/validator/client/iface" - "github.com/pkg/errors" fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams" - "github.com/prysmaticlabs/prysm/v4/config/params" + validator2 "github.com/prysmaticlabs/prysm/v4/consensus-types/validator" "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v4/math" "github.com/prysmaticlabs/prysm/v4/monitoring/tracing" ethpb "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" - "github.com/prysmaticlabs/prysm/v4/time/slots" + "github.com/prysmaticlabs/prysm/v4/validator/client/iface" "go.opencensus.io/trace" ) @@ -33,18 +30,18 @@ func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan c if err != nil { return err } + // subscribe to the channel if it's the first time sub := km.SubscribeAccountChanges(accountsChangedChan) defer func() { sub.Unsubscribe() close(accountsChangedChan) }() } - return v.internalWaitForActivation(ctx, accountsChangedChan) } // internalWaitForActivation performs the following: -// 1) While the key manager is empty, poll the key manager until some validator keys exist. +// 1) While the key manager is empty, subscribe to keymanager changes until some validator keys exist. // 2) Open a server side stream for activation events against the given keys. 
// 3) In another go routine, the key manager is monitored for updates and emits an update event on // the accountsChangedChan. When an event signal is received, restart the internalWaitForActivation routine. @@ -53,39 +50,26 @@ func (v *validator) WaitForActivation(ctx context.Context, accountsChangedChan c func (v *validator) internalWaitForActivation(ctx context.Context, accountsChangedChan <-chan [][fieldparams.BLSPubkeyLength]byte) error { ctx, span := trace.StartSpan(ctx, "validator.WaitForActivation") defer span.End() - validatingKeys, err := v.keyManager.FetchValidatingPublicKeys(ctx) if err != nil { - return errors.Wrap(err, "could not fetch validating keys") + return errors.Wrap(err, msgCouldNotFetchKeys) } + // if there are no validating keys, wait for some if len(validatingKeys) == 0 { log.Warn(msgNoKeysFetched) - - ticker := time.NewTicker(keyRefetchPeriod) - defer ticker.Stop() - for { - select { - case <-ticker.C: - validatingKeys, err = v.keyManager.FetchValidatingPublicKeys(ctx) - if err != nil { - return errors.Wrap(err, msgCouldNotFetchKeys) - } - if len(validatingKeys) == 0 { - log.Warn(msgNoKeysFetched) - continue - } - case <-ctx.Done(): - log.Debug("Context closed, exiting fetching validating keys") - return ctx.Err() - } - break + select { + case <-ctx.Done(): + log.Debug("Context closed, exiting fetching validating keys") + return ctx.Err() + case <-accountsChangedChan: + // if the accounts changed try it again + return v.internalWaitForActivation(ctx, accountsChangedChan) } } - req := ðpb.ValidatorActivationRequest{ + stream, err := v.validatorClient.WaitForActivation(ctx, ðpb.ValidatorActivationRequest{ PublicKeys: bytesutil.FromBytes48Array(validatingKeys), - } - stream, err := v.validatorClient.WaitForActivation(ctx, req) + }) if err != nil { tracing.AnnotateError(span, err) attempts := streamAttempts(ctx) @@ -96,22 +80,17 @@ func (v *validator) internalWaitForActivation(ctx context.Context, accountsChang return v.internalWaitForActivation(incrementRetries(ctx), accountsChangedChan) } - if err = v.handleAccountsChanged(ctx, accountsChangedChan, &stream, span); err != nil { - return err - } - - v.ticker = slots.NewSlotTicker(time.Unix(int64(v.genesisTime), 0), params.BeaconConfig().SecondsPerSlot) - return nil -} - -func (v *validator) handleAccountsChanged(ctx context.Context, accountsChangedChan <-chan [][fieldparams.BLSPubkeyLength]byte, stream *ethpb.BeaconNodeValidator_WaitForActivationClient, span *trace.Span) error { - for { + someAreActive := false + for !someAreActive { select { + case <-ctx.Done(): + log.Debug("Context closed, exiting fetching validating keys") + return ctx.Err() case <-accountsChangedChan: // Accounts (keys) changed, restart the process. return v.internalWaitForActivation(ctx, accountsChangedChan) default: - res, err := (*stream).Recv() + res, err := (stream).Recv() // retrieve from stream one loop at a time // If the stream is closed, we stop the loop. 
if errors.Is(err, io.EOF) { break @@ -150,15 +129,10 @@ func (v *validator) handleAccountsChanged(ctx context.Context, accountsChangedCh valCount = int64(valCounts[0].Count) } - valActivated := v.checkAndLogValidatorStatus(statuses, valCount) - if valActivated { - logActiveValidatorStatus(statuses) - } else { - continue - } + someAreActive = v.checkAndLogValidatorStatus(statuses, valCount) } - break } + return nil } diff --git a/validator/client/wait_for_activation_test.go b/validator/client/wait_for_activation_test.go index e28aa2451481..25c8ca39096f 100644 --- a/validator/client/wait_for_activation_test.go +++ b/validator/client/wait_for_activation_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/prysmaticlabs/prysm/v4/config/params" validatorType "github.com/prysmaticlabs/prysm/v4/consensus-types/validator" "github.com/prysmaticlabs/prysm/v4/validator/client/iface" @@ -39,7 +40,7 @@ func TestWaitActivation_ContextCanceled(t *testing.T) { beaconClient: beaconClient, } clientStream := mock.NewMockBeaconNodeValidator_WaitForActivationClient(ctrl) - + ctx, cancel := context.WithCancel(context.Background()) validatorClient.EXPECT().WaitForActivation( gomock.Any(), ðpb.ValidatorActivationRequest{ @@ -49,9 +50,7 @@ func TestWaitActivation_ContextCanceled(t *testing.T) { clientStream.EXPECT().Recv().Return( ðpb.ValidatorActivationResponse{}, nil, - ) - ctx, cancel := context.WithCancel(context.Background()) - cancel() + ).Do(func() { cancel() }) assert.ErrorContains(t, cancelledCtx, v.WaitForActivation(ctx, nil)) } @@ -193,12 +192,11 @@ func TestWaitForActivation_Exiting(t *testing.T) { } func TestWaitForActivation_RefetchKeys(t *testing.T) { - originalPeriod := keyRefetchPeriod - defer func() { - keyRefetchPeriod = originalPeriod - }() - keyRefetchPeriod = 1 * time.Second - + params.SetupTestConfigCleanup(t) + cfg := params.MainnetConfig().Copy() + cfg.ConfigName = "test" + cfg.SecondsPerSlot = 1 + params.OverrideBeaconConfig(cfg) hook := logTest.NewGlobal() ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -207,8 +205,7 @@ func TestWaitForActivation_RefetchKeys(t *testing.T) { prysmBeaconClient := validatormock.NewMockPrysmBeaconChainClient(ctrl) kp := randKeypair(t) - km := newMockKeymanager(t, kp) - km.fetchNoKeys = true + km := newMockKeymanager(t) v := validator{ validatorClient: validatorClient, @@ -233,7 +230,19 @@ func TestWaitForActivation_RefetchKeys(t *testing.T) { clientStream.EXPECT().Recv().Return( resp, nil) - assert.NoError(t, v.internalWaitForActivation(context.Background(), make(chan [][fieldparams.BLSPubkeyLength]byte)), "Could not wait for activation") + accountChan := make(chan [][fieldparams.BLSPubkeyLength]byte) + sub := km.SubscribeAccountChanges(accountChan) + defer func() { + sub.Unsubscribe() + close(accountChan) + }() + // update the accounts after a delay + go func() { + time.Sleep(2 * time.Second) + require.NoError(t, km.add(kp)) + km.SimulateAccountChanges([][48]byte{kp.pub}) + }() + assert.NoError(t, v.internalWaitForActivation(context.Background(), accountChan), "Could not wait for activation") assert.LogsContain(t, hook, msgNoKeysFetched) assert.LogsContain(t, hook, "Validator activated") } @@ -265,7 +274,11 @@ func TestWaitForActivation_AccountsChanged(t *testing.T) { ðpb.ValidatorActivationRequest{ PublicKeys: [][]byte{inactive.pub[:]}, }, - ).Return(inactiveClientStream, nil) + ).DoAndReturn(func(ctx context.Context, in *ethpb.ValidatorActivationRequest) (*mock.MockBeaconNodeValidator_WaitForActivationClient, error) { + //delay a bit so 
that other key can be added + time.Sleep(time.Second * 2) + return inactiveClientStream, nil + }) prysmBeaconClient.EXPECT().GetValidatorCount( gomock.Any(), "head", @@ -353,7 +366,11 @@ func TestWaitForActivation_AccountsChanged(t *testing.T) { ðpb.ValidatorActivationRequest{ PublicKeys: [][]byte{inactivePubKey[:]}, }, - ).Return(inactiveClientStream, nil) + ).DoAndReturn(func(ctx context.Context, in *ethpb.ValidatorActivationRequest) (*mock.MockBeaconNodeValidator_WaitForActivationClient, error) { + //delay a bit so that other key can be added + time.Sleep(time.Second * 2) + return inactiveClientStream, nil + }) prysmBeaconClient.EXPECT().GetValidatorCount( gomock.Any(), "head", @@ -393,3 +410,40 @@ func TestWaitForActivation_AccountsChanged(t *testing.T) { assert.LogsContain(t, hook, "Validator activated") }) } + +func TestWaitActivation_NotAllValidatorsActivatedOK(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + validatorClient := validatormock.NewMockValidatorClient(ctrl) + beaconClient := validatormock.NewMockBeaconChainClient(ctrl) + prysmBeaconClient := validatormock.NewMockPrysmBeaconChainClient(ctrl) + + kp := randKeypair(t) + v := validator{ + validatorClient: validatorClient, + keyManager: newMockKeymanager(t, kp), + beaconClient: beaconClient, + prysmBeaconClient: prysmBeaconClient, + } + resp := generateMockStatusResponse([][]byte{kp.pub[:]}) + resp.Statuses[0].Status.Status = ethpb.ValidatorStatus_ACTIVE + clientStream := mock.NewMockBeaconNodeValidator_WaitForActivationClient(ctrl) + validatorClient.EXPECT().WaitForActivation( + gomock.Any(), + gomock.Any(), + ).Return(clientStream, nil) + prysmBeaconClient.EXPECT().GetValidatorCount( + gomock.Any(), + "head", + []validatorType.Status{validatorType.Active}, + ).Return([]iface.ValidatorCount{}, nil).Times(2) + clientStream.EXPECT().Recv().Return( + ðpb.ValidatorActivationResponse{}, + nil, + ) + clientStream.EXPECT().Recv().Return( + resp, + nil, + ) + assert.NoError(t, v.WaitForActivation(context.Background(), nil), "Could not wait for activation") +} diff --git a/validator/keymanager/local/keymanager.go b/validator/keymanager/local/keymanager.go index 3b4a2c933f98..4cae2ed8f299 100644 --- a/validator/keymanager/local/keymanager.go +++ b/validator/keymanager/local/keymanager.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "path/filepath" "strings" "sync" @@ -282,18 +283,29 @@ func (km *Keymanager) SaveStoreAndReInitialize(ctx context.Context, store *accou if err != nil { return err } - if err := km.wallet.WriteFileAtPath(ctx, AccountsPath, AccountsKeystoreFileName, encodedAccounts); err != nil { + + existedPreviously, err := km.wallet.WriteFileAtPath(ctx, AccountsPath, AccountsKeystoreFileName, encodedAccounts) + if err != nil { return err } - // Reinitialize account store and cache - // This will update the in-memory information instead of reading from the file itself for safety concerns - km.accountsStore = store - err = km.initializeKeysCachesFromKeystore() - if err != nil { - return errors.Wrap(err, "failed to initialize keys caches") + if existedPreviously { + // Reinitialize account store and cache + // This will update the in-memory information instead of reading from the file itself for safety concerns + km.accountsStore = store + err = km.initializeKeysCachesFromKeystore() + if err != nil { + return errors.Wrap(err, "failed to initialize keys caches") + } + + return nil } - return err + + // manually reload the account from the keystore the first time + 
km.reloadAccountsFromKeystoreFile(filepath.Join(km.wallet.AccountsDir(), AccountsPath, AccountsKeystoreFileName)) + // listen to account changes of the new file + go km.listenForAccountChanges(ctx) + return nil } // CreateAccountsKeystoreRepresentation is a pure function that takes an accountStore and wallet password and returns the encrypted formatted json version for local writing. diff --git a/validator/keymanager/local/refresh.go b/validator/keymanager/local/refresh.go index 472d42831dd8..eeaa9e766e4a 100644 --- a/validator/keymanager/local/refresh.go +++ b/validator/keymanager/local/refresh.go @@ -26,6 +26,7 @@ func (km *Keymanager) listenForAccountChanges(ctx context.Context) { debounceFileChangesInterval := features.Get().KeystoreImportDebounceInterval accountsFilePath := filepath.Join(km.wallet.AccountsDir(), AccountsPath, AccountsKeystoreFileName) if !file.Exists(accountsFilePath) { + log.Warnf("Starting without accounts located in wallet at %s", accountsFilePath) return } watcher, err := fsnotify.NewWatcher() @@ -56,27 +57,7 @@ func (km *Keymanager) listenForAccountChanges(ctx context.Context) { log.Errorf("Type %T is not a valid file system event", event) return } - fileBytes, err := os.ReadFile(ev.Name) - if err != nil { - log.WithError(err).Errorf("Could not read file at path: %s", ev.Name) - return - } - if fileBytes == nil { - log.WithError(err).Errorf("Loaded in an empty file: %s", ev.Name) - return - } - accountsKeystore := &AccountsKeystoreRepresentation{} - if err := json.Unmarshal(fileBytes, accountsKeystore); err != nil { - log.WithError( - err, - ).Errorf("Could not read valid, EIP-2335 keystore json file at path: %s", ev.Name) - return - } - if err := km.reloadAccountsFromKeystore(accountsKeystore); err != nil { - log.WithError( - err, - ).Error("Could not replace the accounts store from keystore file") - } + km.reloadAccountsFromKeystoreFile(ev.Name) }) for { select { @@ -92,6 +73,34 @@ func (km *Keymanager) listenForAccountChanges(ctx context.Context) { } } +func (km *Keymanager) reloadAccountsFromKeystoreFile(accountsFilePath string) { + if km.wallet == nil { + log.Error("Could not reload accounts because wallet was undefined") + return + } + fileBytes, err := os.ReadFile(filepath.Clean(accountsFilePath)) + if err != nil { + log.WithError(err).Errorf("Could not read file at path: %s", accountsFilePath) + return + } + if fileBytes == nil { + log.WithError(err).Errorf("Loaded in an empty file: %s", accountsFilePath) + return + } + accountsKeystore := &AccountsKeystoreRepresentation{} + if err := json.Unmarshal(fileBytes, accountsKeystore); err != nil { + log.WithError( + err, + ).Errorf("Could not read valid, EIP-2335 keystore json file at path: %s", accountsFilePath) + return + } + if err := km.reloadAccountsFromKeystore(accountsKeystore); err != nil { + log.WithError( + err, + ).Error("Could not replace the accounts store from keystore file") + } +} + // Replaces the accounts store struct in the local keymanager with // the contents of a keystore file by decrypting it with the accounts password. 
func (km *Keymanager) reloadAccountsFromKeystore(keystore *AccountsKeystoreRepresentation) error { @@ -107,6 +116,7 @@ func (km *Keymanager) reloadAccountsFromKeystore(keystore *AccountsKeystoreRepre if len(newAccountsStore.PublicKeys) != len(newAccountsStore.PrivateKeys) { return errors.New("number of public and private keys in keystore do not match") } + pubKeys := make([][fieldparams.BLSPubkeyLength]byte, len(newAccountsStore.PublicKeys)) for i := 0; i < len(newAccountsStore.PrivateKeys); i++ { privKey, err := bls.SecretKeyFromBytes(newAccountsStore.PrivateKeys[i]) From fe431b9201272220e83565634e5527d33b568c96 Mon Sep 17 00:00:00 2001 From: terence Date: Wed, 17 Jan 2024 00:14:32 -0800 Subject: [PATCH 3/7] Use correct HistoricalRoots (#13477) --- beacon-chain/core/deneb/upgrade.go | 6 +++++- beacon-chain/core/deneb/upgrade_test.go | 7 +++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/beacon-chain/core/deneb/upgrade.go b/beacon-chain/core/deneb/upgrade.go index ea7b06b58e1c..4c8c0d6f5d71 100644 --- a/beacon-chain/core/deneb/upgrade.go +++ b/beacon-chain/core/deneb/upgrade.go @@ -57,6 +57,10 @@ func UpgradeToDeneb(state state.BeaconState) (state.BeaconState, error) { if err != nil { return nil, err } + historicalRoots, err := state.HistoricalRoots() + if err != nil { + return nil, err + } s := ðpb.BeaconStateDeneb{ GenesisTime: state.GenesisTime(), @@ -70,7 +74,7 @@ func UpgradeToDeneb(state state.BeaconState) (state.BeaconState, error) { LatestBlockHeader: state.LatestBlockHeader(), BlockRoots: state.BlockRoots(), StateRoots: state.StateRoots(), - HistoricalRoots: [][]byte{}, + HistoricalRoots: historicalRoots, Eth1Data: state.Eth1Data(), Eth1DataVotes: state.Eth1DataVotes(), Eth1DepositIndex: state.Eth1DepositIndex(), diff --git a/beacon-chain/core/deneb/upgrade_test.go b/beacon-chain/core/deneb/upgrade_test.go index d3518b53fd47..169f1d58289f 100644 --- a/beacon-chain/core/deneb/upgrade_test.go +++ b/beacon-chain/core/deneb/upgrade_test.go @@ -14,6 +14,7 @@ import ( func TestUpgradeToDeneb(t *testing.T) { st, _ := util.DeterministicGenesisStateCapella(t, params.BeaconConfig().MaxValidatorsPerCommittee) + require.NoError(t, st.SetHistoricalRoots([][]byte{{1}})) preForkState := st.Copy() mSt, err := deneb.UpgradeToDeneb(st) require.NoError(t, err) @@ -46,6 +47,12 @@ func TestUpgradeToDeneb(t *testing.T) { require.NoError(t, err) require.DeepSSZEqual(t, make([]uint64, numValidators), s) + hr1, err := preForkState.HistoricalRoots() + require.NoError(t, err) + hr2, err := mSt.HistoricalRoots() + require.NoError(t, err) + require.DeepEqual(t, hr1, hr2) + f := mSt.Fork() require.DeepSSZEqual(t, ðpb.Fork{ PreviousVersion: st.Fork().CurrentVersion, From 87b53db3b46767921801c83428010ab44b9718b5 Mon Sep 17 00:00:00 2001 From: terence Date: Wed, 17 Jan 2024 05:30:31 -0800 Subject: [PATCH 4/7] Capitalize Aggregated Unaggregated Attestations Log (#13473) --- beacon-chain/operations/attestations/prepare_forkchoice.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/beacon-chain/operations/attestations/prepare_forkchoice.go b/beacon-chain/operations/attestations/prepare_forkchoice.go index 1fc610cd1888..8f6242efe2dc 100644 --- a/beacon-chain/operations/attestations/prepare_forkchoice.go +++ b/beacon-chain/operations/attestations/prepare_forkchoice.go @@ -42,7 +42,7 @@ func (s *Service) prepareForkChoiceAtts() { switch slotInterval.Interval { case 0: duration := time.Since(t) - log.WithField("Duration", duration).Debug("aggregated unaggregated attestations") + 
log.WithField("Duration", duration).Debug("Aggregated unaggregated attestations") batchForkChoiceAttsT1.Observe(float64(duration.Milliseconds())) case 1: batchForkChoiceAttsT2.Observe(float64(time.Since(t).Milliseconds())) From 79bb7efbf882f911fd5f208cf417e0cdf8f86b3c Mon Sep 17 00:00:00 2001 From: Potuz Date: Wed, 17 Jan 2024 12:39:28 -0300 Subject: [PATCH 5/7] Check init sync before getting payload attributes (#13479) * Check init sync before getting payload attributes This PR adds a helper to forkchoice to return the delay of the latest imported block. It also adds a helper with an heuristic to check if the node is during init sync. If the highest imported node was imported with a delay of less than an epoch then the node is considered in regular sync. If on the other hand, in addition the highest imported node is more than two epochs old, then the node is considered in init Sync. The helper to check this only uses forkchoice and therefore requires a read lock. There are four paths to call this 1) During regular block processing, we defer a function to send the second FCU call with attributes. This function may not be called at all if we are not regularly syncing 2) During regular block processing, we check in the path `postBlockProces->getFCUArgs->computePayloadAttributes` the payload attributes if we are syncing a late block. In this case forkchoice is already locked and we add a call in `getFCUArgs` to return early if not regularly syncing 3) During handling of late blocks on `lateBlockTasks` we simply return early if not in regular sync (This is the biggest change as it takes a longer FC lock for lateBlockTasks) 4) On Attestation processing, in UpdateHead, we are already locked so we just add a check to not update head on this path if not regularly syncing. * fix build * Fix mocks --- beacon-chain/blockchain/chain_info.go | 15 ++++++++++++ beacon-chain/blockchain/chain_info_test.go | 23 +++++++++++++++++++ beacon-chain/blockchain/process_block.go | 16 +++++++------ .../blockchain/process_block_helpers.go | 3 +++ .../blockchain/receive_attestation.go | 4 +++- .../forkchoice/doubly-linked-tree/store.go | 14 +++++++++++ .../doubly-linked-tree/store_test.go | 7 ++++-- beacon-chain/forkchoice/interfaces.go | 1 + beacon-chain/forkchoice/ro.go | 7 ++++++ beacon-chain/forkchoice/ro_test.go | 11 +++++++++ 10 files changed, 91 insertions(+), 10 deletions(-) diff --git a/beacon-chain/blockchain/chain_info.go b/beacon-chain/blockchain/chain_info.go index be4682b82082..c7dbf96c2490 100644 --- a/beacon-chain/blockchain/chain_info.go +++ b/beacon-chain/blockchain/chain_info.go @@ -556,3 +556,18 @@ func (s *Service) RecentBlockSlot(root [32]byte) (primitives.Slot, error) { defer s.cfg.ForkChoiceStore.RUnlock() return s.cfg.ForkChoiceStore.Slot(root) } + +// inRegularSync applies the following heuristics to decide if the node is in +// regular sync mode vs init sync mode using only forkchoice. +// It checks that the highest received block is behind the current time by at least 2 epochs +// and that it was imported at least one epoch late if both of these +// tests pass then the node is in init sync. 
The caller of this function MUST +// have a lock on forkchoice +func (s *Service) inRegularSync() bool { + currentSlot := s.CurrentSlot() + fc := s.cfg.ForkChoiceStore + if currentSlot-fc.HighestReceivedBlockSlot() < 2*params.BeaconConfig().SlotsPerEpoch { + return true + } + return fc.HighestReceivedBlockDelay() < params.BeaconConfig().SlotsPerEpoch +} diff --git a/beacon-chain/blockchain/chain_info_test.go b/beacon-chain/blockchain/chain_info_test.go index 044eb89c8f36..5d1187e73d2d 100644 --- a/beacon-chain/blockchain/chain_info_test.go +++ b/beacon-chain/blockchain/chain_info_test.go @@ -593,3 +593,26 @@ func TestService_IsFinalized(t *testing.T) { require.Equal(t, true, c.IsFinalized(ctx, br)) require.Equal(t, false, c.IsFinalized(ctx, [32]byte{'c'})) } + +func TestService_inRegularSync(t *testing.T) { + ctx := context.Background() + c := &Service{cfg: &config{ForkChoiceStore: doublylinkedtree.New()}, head: &head{root: [32]byte{'b'}}} + ojc := ðpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]} + ofc := ðpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]} + st, blkRoot, err := prepareForkchoiceState(ctx, 100, [32]byte{'a'}, [32]byte{}, params.BeaconConfig().ZeroHash, ojc, ofc) + require.NoError(t, err) + require.NoError(t, c.cfg.ForkChoiceStore.InsertNode(ctx, st, blkRoot)) + require.Equal(t, false, c.inRegularSync()) + c.SetGenesisTime(time.Now().Add(time.Second * time.Duration(-1*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)))) + st, blkRoot, err = prepareForkchoiceState(ctx, 128, [32]byte{'b'}, [32]byte{'a'}, params.BeaconConfig().ZeroHash, ojc, ofc) + require.NoError(t, err) + require.NoError(t, c.cfg.ForkChoiceStore.InsertNode(ctx, st, blkRoot)) + require.Equal(t, false, c.inRegularSync()) + + c.SetGenesisTime(time.Now().Add(time.Second * time.Duration(-5*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)))) + require.Equal(t, true, c.inRegularSync()) + + c.SetGenesisTime(time.Now().Add(time.Second * time.Duration(-1*int64(params.BeaconConfig().SlotsPerEpoch)*int64(params.BeaconConfig().SecondsPerSlot)))) + c.cfg.ForkChoiceStore.SetGenesisTime(uint64(time.Now().Unix())) + require.Equal(t, true, c.inRegularSync()) +} diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index 824f213f77e2..ad68205d665e 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -67,7 +67,9 @@ func (s *Service) postBlockProcess(cfg *postBlockProcessConfig) error { startTime := time.Now() fcuArgs := &fcuConfig{} - defer s.handleSecondFCUCall(cfg, fcuArgs) + if s.inRegularSync() { + defer s.handleSecondFCUCall(cfg, fcuArgs) + } defer s.sendLightClientFeeds(cfg) defer s.sendStateFeedOnBlock(cfg) defer reportProcessingTime(startTime) @@ -580,10 +582,15 @@ func (s *Service) lateBlockTasks(ctx context.Context) { if s.CurrentSlot() == s.HeadSlot() { return } + s.cfg.ForkChoiceStore.RLock() + defer s.cfg.ForkChoiceStore.RUnlock() + // return early if we are in init sync + if !s.inRegularSync() { + return + } s.cfg.StateNotifier.StateFeed().Send(&feed.Event{ Type: statefeed.MissedSlot, }) - s.headLock.RLock() headRoot := s.headRoot() headState := s.headState(ctx) @@ -598,12 +605,9 @@ func (s *Service) lateBlockTasks(ctx context.Context) { if err := transition.UpdateNextSlotCache(ctx, lastRoot, lastState); err != nil { log.WithError(err).Debug("could not update next slot state cache") } - // handleEpochBoundary requires a forkchoice lock to 
obtain the target root. - s.cfg.ForkChoiceStore.RLock() if err := s.handleEpochBoundary(ctx, currentSlot, headState, headRoot[:]); err != nil { log.WithError(err).Error("lateBlockTasks: could not update epoch boundary caches") } - s.cfg.ForkChoiceStore.RUnlock() // return early if we already started building a block for the current // head root _, has := s.cfg.PayloadIDCache.PayloadID(s.CurrentSlot()+1, headRoot) @@ -629,9 +633,7 @@ func (s *Service) lateBlockTasks(ctx context.Context) { if fcuArgs.attributes.IsEmpty() { return } - s.cfg.ForkChoiceStore.RLock() _, err = s.notifyForkchoiceUpdate(ctx, fcuArgs) - s.cfg.ForkChoiceStore.RUnlock() if err != nil { log.WithError(err).Debug("could not perform late block tasks: failed to update forkchoice with engine") } diff --git a/beacon-chain/blockchain/process_block_helpers.go b/beacon-chain/blockchain/process_block_helpers.go index bfbc407da86f..db84896b65f7 100644 --- a/beacon-chain/blockchain/process_block_helpers.go +++ b/beacon-chain/blockchain/process_block_helpers.go @@ -37,6 +37,9 @@ func (s *Service) getFCUArgs(cfg *postBlockProcessConfig, fcuArgs *fcuConfig) er if err := s.getFCUArgsEarlyBlock(cfg, fcuArgs); err != nil { return err } + if !s.inRegularSync() { + return nil + } slot := cfg.signed.Block().Slot() if slots.WithinVotingWindow(uint64(s.genesisTime.Unix()), slot) { return nil diff --git a/beacon-chain/blockchain/receive_attestation.go b/beacon-chain/blockchain/receive_attestation.go index bdfd666ca0ef..db5c5de9c476 100644 --- a/beacon-chain/blockchain/receive_attestation.go +++ b/beacon-chain/blockchain/receive_attestation.go @@ -150,7 +150,9 @@ func (s *Service) UpdateHead(ctx context.Context, proposingSlot primitives.Slot) headBlock: headBlock, proposingSlot: proposingSlot, } - fcuArgs.attributes = s.getPayloadAttribute(ctx, headState, proposingSlot, newHeadRoot[:]) + if s.inRegularSync() { + fcuArgs.attributes = s.getPayloadAttribute(ctx, headState, proposingSlot, newHeadRoot[:]) + } if fcuArgs.attributes != nil && s.shouldOverrideFCU(newHeadRoot, proposingSlot) { return } diff --git a/beacon-chain/forkchoice/doubly-linked-tree/store.go b/beacon-chain/forkchoice/doubly-linked-tree/store.go index 79eb00f9ae0d..da98f1b8cbaf 100644 --- a/beacon-chain/forkchoice/doubly-linked-tree/store.go +++ b/beacon-chain/forkchoice/doubly-linked-tree/store.go @@ -240,6 +240,20 @@ func (f *ForkChoice) HighestReceivedBlockSlot() primitives.Slot { return f.store.highestReceivedNode.slot } +// HighestReceivedBlockSlotDelay returns the number of slots that the highest +// received block was late when receiving it +func (f *ForkChoice) HighestReceivedBlockDelay() primitives.Slot { + n := f.store.highestReceivedNode + if n == nil { + return 0 + } + secs, err := slots.SecondsSinceSlotStart(n.slot, f.store.genesisTime, n.timestamp) + if err != nil { + return 0 + } + return primitives.Slot(secs / params.BeaconConfig().SecondsPerSlot) +} + // ReceivedBlocksLastEpoch returns the number of blocks received in the last epoch func (f *ForkChoice) ReceivedBlocksLastEpoch() (uint64, error) { count := uint64(0) diff --git a/beacon-chain/forkchoice/doubly-linked-tree/store_test.go b/beacon-chain/forkchoice/doubly-linked-tree/store_test.go index b38d3686c4a3..580154a318a7 100644 --- a/beacon-chain/forkchoice/doubly-linked-tree/store_test.go +++ b/beacon-chain/forkchoice/doubly-linked-tree/store_test.go @@ -333,26 +333,29 @@ func TestForkChoice_ReceivedBlocksLastEpoch(t *testing.T) { require.NoError(t, err) require.Equal(t, uint64(1), count) require.Equal(t, 
primitives.Slot(1), f.HighestReceivedBlockSlot()) + require.Equal(t, primitives.Slot(0), f.HighestReceivedBlockDelay()) // 64 // Received block last epoch is 1 _, err = s.insert(context.Background(), 64, [32]byte{'A'}, b, b, 1, 1) require.NoError(t, err) - s.genesisTime = uint64(time.Now().Add(time.Duration(-64*int64(params.BeaconConfig().SecondsPerSlot)) * time.Second).Unix()) + s.genesisTime = uint64(time.Now().Add(time.Duration((-64*int64(params.BeaconConfig().SecondsPerSlot))-1) * time.Second).Unix()) count, err = f.ReceivedBlocksLastEpoch() require.NoError(t, err) require.Equal(t, uint64(1), count) require.Equal(t, primitives.Slot(64), f.HighestReceivedBlockSlot()) + require.Equal(t, primitives.Slot(0), f.HighestReceivedBlockDelay()) // 64 65 // Received block last epoch is 2 _, err = s.insert(context.Background(), 65, [32]byte{'B'}, b, b, 1, 1) require.NoError(t, err) - s.genesisTime = uint64(time.Now().Add(time.Duration(-65*int64(params.BeaconConfig().SecondsPerSlot)) * time.Second).Unix()) + s.genesisTime = uint64(time.Now().Add(time.Duration(-66*int64(params.BeaconConfig().SecondsPerSlot)) * time.Second).Unix()) count, err = f.ReceivedBlocksLastEpoch() require.NoError(t, err) require.Equal(t, uint64(2), count) require.Equal(t, primitives.Slot(65), f.HighestReceivedBlockSlot()) + require.Equal(t, primitives.Slot(1), f.HighestReceivedBlockDelay()) // 64 65 66 // Received block last epoch is 3 diff --git a/beacon-chain/forkchoice/interfaces.go b/beacon-chain/forkchoice/interfaces.go index 8ce0a80504cf..dd4bb65c5701 100644 --- a/beacon-chain/forkchoice/interfaces.go +++ b/beacon-chain/forkchoice/interfaces.go @@ -64,6 +64,7 @@ type FastGetter interface { FinalizedPayloadBlockHash() [32]byte HasNode([32]byte) bool HighestReceivedBlockSlot() primitives.Slot + HighestReceivedBlockDelay() primitives.Slot IsCanonical(root [32]byte) bool IsOptimistic(root [32]byte) (bool, error) IsViableForCheckpoint(*forkchoicetypes.Checkpoint) (bool, error) diff --git a/beacon-chain/forkchoice/ro.go b/beacon-chain/forkchoice/ro.go index 995bcd33e946..6d9caddd2fc0 100644 --- a/beacon-chain/forkchoice/ro.go +++ b/beacon-chain/forkchoice/ro.go @@ -114,6 +114,13 @@ func (ro *ROForkChoice) HighestReceivedBlockSlot() primitives.Slot { return ro.getter.HighestReceivedBlockSlot() } +// HighestReceivedBlockDelay delegates to the underlying forkchoice call, under a lock. +func (ro *ROForkChoice) HighestReceivedBlockDelay() primitives.Slot { + ro.l.RLock() + defer ro.l.RUnlock() + return ro.getter.HighestReceivedBlockDelay() +} + // ReceivedBlocksLastEpoch delegates to the underlying forkchoice call, under a lock. 
func (ro *ROForkChoice) ReceivedBlocksLastEpoch() (uint64, error) { ro.l.RLock() diff --git a/beacon-chain/forkchoice/ro_test.go b/beacon-chain/forkchoice/ro_test.go index 5cbbb34ef4ca..8bf94071e222 100644 --- a/beacon-chain/forkchoice/ro_test.go +++ b/beacon-chain/forkchoice/ro_test.go @@ -29,6 +29,7 @@ const ( unrealizedJustifiedPayloadBlockHashCalled nodeCountCalled highestReceivedBlockSlotCalled + highestReceivedBlockDelayCalled receivedBlocksLastEpochCalled weightCalled isOptimisticCalled @@ -113,6 +114,11 @@ func TestROLocking(t *testing.T) { call: highestReceivedBlockSlotCalled, cb: func(g FastGetter) { g.HighestReceivedBlockSlot() }, }, + { + name: "highestReceivedBlockDelayCalled", + call: highestReceivedBlockDelayCalled, + cb: func(g FastGetter) { g.HighestReceivedBlockDelay() }, + }, { name: "receivedBlocksLastEpochCalled", call: receivedBlocksLastEpochCalled, @@ -245,6 +251,11 @@ func (ro *mockROForkchoice) HighestReceivedBlockSlot() primitives.Slot { return 0 } +func (ro *mockROForkchoice) HighestReceivedBlockDelay() primitives.Slot { + ro.calls = append(ro.calls, highestReceivedBlockDelayCalled) + return 0 +} + func (ro *mockROForkchoice) ReceivedBlocksLastEpoch() (uint64, error) { ro.calls = append(ro.calls, receivedBlocksLastEpochCalled) return 0, nil From 93aba997f4d2a767811b136db52ec065293294e3 Mon Sep 17 00:00:00 2001 From: terence Date: Wed, 17 Jan 2024 10:42:56 -0800 Subject: [PATCH 6/7] Move checking of attribute empty earlier (#13465) --- beacon-chain/blockchain/process_block.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index ad68205d665e..152cd19adf80 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -614,6 +614,13 @@ func (s *Service) lateBlockTasks(ctx context.Context) { if has { return } + + attribute := s.getPayloadAttribute(ctx, headState, s.CurrentSlot()+1, headRoot[:]) + // return early if we are not proposing next slot + if attribute.IsEmpty() { + return + } + s.headLock.RLock() headBlock, err := s.headBlock() if err != nil { @@ -624,14 +631,10 @@ func (s *Service) lateBlockTasks(ctx context.Context) { s.headLock.RUnlock() fcuArgs := &fcuConfig{ - headState: headState, - headRoot: headRoot, - headBlock: headBlock, - } - fcuArgs.attributes = s.getPayloadAttribute(ctx, headState, s.CurrentSlot()+1, headRoot[:]) - // return early if we are not proposing next slot - if fcuArgs.attributes.IsEmpty() { - return + headState: headState, + headRoot: headRoot, + headBlock: headBlock, + attributes: attribute, } _, err = s.notifyForkchoiceUpdate(ctx, fcuArgs) if err != nil { From c3dbfa66d090ac40818f5ddc5a229599c78db8ab Mon Sep 17 00:00:00 2001 From: terence Date: Wed, 17 Jan 2024 15:28:42 -0800 Subject: [PATCH 7/7] Change blob latency metrics to ms (#13481) --- beacon-chain/db/filesystem/blob.go | 4 ++-- beacon-chain/db/filesystem/metrics.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/beacon-chain/db/filesystem/blob.go b/beacon-chain/db/filesystem/blob.go index 4482bf0ab916..adc07f329dbc 100644 --- a/beacon-chain/db/filesystem/blob.go +++ b/beacon-chain/db/filesystem/blob.go @@ -156,7 +156,7 @@ func (bs *BlobStorage) Save(sidecar blocks.VerifiedROBlob) error { } partialMoved = true blobsWrittenCounter.Inc() - blobSaveLatency.Observe(time.Since(startTime).Seconds()) + blobSaveLatency.Observe(float64(time.Since(startTime).Milliseconds())) return nil } @@ -180,7 
+180,7 @@ func (bs *BlobStorage) Get(root [32]byte, idx uint64) (blocks.VerifiedROBlob, er return blocks.VerifiedROBlob{}, err } defer func() { - blobFetchLatency.Observe(time.Since(startTime).Seconds()) + blobFetchLatency.Observe(float64(time.Since(startTime).Milliseconds())) }() return verification.BlobSidecarNoop(ro) } diff --git a/beacon-chain/db/filesystem/metrics.go b/beacon-chain/db/filesystem/metrics.go index d3be677064a9..1e8791c879bd 100644 --- a/beacon-chain/db/filesystem/metrics.go +++ b/beacon-chain/db/filesystem/metrics.go @@ -6,15 +6,15 @@ import ( ) var ( - blobBuckets = []float64{0.00003, 0.00005, 0.00007, 0.00009, 0.00011, 0.00013, 0.00015} + blobBuckets = []float64{3, 5, 7, 9, 11, 13} blobSaveLatency = promauto.NewHistogram(prometheus.HistogramOpts{ Name: "blob_storage_save_latency", - Help: "Latency of BlobSidecar storage save operations in seconds", + Help: "Latency of BlobSidecar storage save operations in milliseconds", Buckets: blobBuckets, }) blobFetchLatency = promauto.NewHistogram(prometheus.HistogramOpts{ Name: "blob_storage_get_latency", - Help: "Latency of BlobSidecar storage get operations in seconds", + Help: "Latency of BlobSidecar storage get operations in milliseconds", Buckets: blobBuckets, }) blobsPrunedCounter = promauto.NewCounter(prometheus.CounterOpts{