From 9900a0bc9a925ec2abfce7cd5ff376f369fd61e4 Mon Sep 17 00:00:00 2001 From: iurii Date: Sun, 3 Nov 2024 21:10:21 +0200 Subject: [PATCH] rework evenySyncer initialization/startup --- cli/operator/node.go | 247 ++++++++++++------------- cli/operator/node_test.go | 19 +- eth/eventhandler/task_executor_test.go | 10 +- 3 files changed, 129 insertions(+), 147 deletions(-) diff --git a/cli/operator/node.go b/cli/operator/node.go index e6aeb9ed4b..ca67a7711a 100644 --- a/cli/operator/node.go +++ b/cli/operator/node.go @@ -103,6 +103,8 @@ var StartNodeCmd = &cobra.Command{ Use: "start-node", Short: "Starts an instance of SSV node", Run: func(cmd *cobra.Command, args []string) { + ctx := cmd.Context() + commons.SetBuildData(cmd.Parent().Short, cmd.Parent().Version) err := globalcfg.Prepare(&cfg, &globalArgs) @@ -136,7 +138,7 @@ var StartNodeCmd = &cobra.Command{ if err != nil { logger.Fatal("could not setup network", zap.Error(err)) } - cfg.DBOptions.Ctx = cmd.Context() + cfg.DBOptions.Ctx = ctx db, err := setupDB(logger, networkConfig.Beacon.GetNetwork()) if err != nil { logger.Fatal("could not setup db", zap.Error(err)) @@ -180,7 +182,8 @@ var StartNodeCmd = &cobra.Command{ operatorDataStore := operatordatastore.New(operatorData) usingLocalEvents := len(cfg.LocalEventsPath) != 0 - if err := validateConfig(nodeStorage, networkConfig.AlanForkNetworkName(), usingLocalEvents); err != nil { + + if err := checkCfgCompatibility(nodeStorage, networkConfig.AlanForkNetworkName(), usingLocalEvents); err != nil { logger.Fatal("failed to validate config", zap.Error(err)) } @@ -194,7 +197,7 @@ var StartNodeCmd = &cobra.Command{ logger.Fatal("could not create new eth-key-manager signer", zap.Error(err)) } - cfg.P2pNetworkConfig.Ctx = cmd.Context() + cfg.P2pNetworkConfig.Ctx = ctx slotTickerProvider := func() slotticker.SlotTicker { return slotticker.New(logger, slotticker.Config{ @@ -203,14 +206,14 @@ var StartNodeCmd = &cobra.Command{ }) } - cfg.ConsensusClient.Context = cmd.Context() + cfg.ConsensusClient.Context = ctx cfg.ConsensusClient.GasLimit = spectypes.DefaultGasLimit cfg.ConsensusClient.Network = networkConfig.Beacon.GetNetwork() consensusClient := setupConsensusClient(logger, operatorDataStore, slotTickerProvider) executionClient, err := executionclient.New( - cmd.Context(), + ctx, cfg.ExecutionClient.Addr, ethcommon.HexToAddress(networkConfig.RegistryContractAddr), executionclient.WithLogger(logger), @@ -230,7 +233,7 @@ var StartNodeCmd = &cobra.Command{ cfg.P2pNetworkConfig.FullNode = cfg.SSVOptions.ValidatorOptions.FullNode cfg.P2pNetworkConfig.Network = networkConfig - validatorsMap := validators.New(cmd.Context()) + validatorsMap := validators.New(ctx) dutyStore := dutystore.New() cfg.SSVOptions.DutyStore = dutyStore @@ -272,7 +275,7 @@ var StartNodeCmd = &cobra.Command{ p2pNetwork, genesisP2pNetwork := setupP2P(logger, db, metricsReporter) - cfg.SSVOptions.Context = cmd.Context() + cfg.SSVOptions.Context = ctx cfg.SSVOptions.DB = db cfg.SSVOptions.BeaconNode = consensusClient cfg.SSVOptions.ExecutionClient = executionClient @@ -280,7 +283,7 @@ var StartNodeCmd = &cobra.Command{ cfg.SSVOptions.P2PNetwork = p2pNetwork cfg.SSVOptions.ValidatorOptions.NetworkConfig = networkConfig cfg.SSVOptions.ValidatorOptions.BeaconNetwork = networkConfig.Beacon.GetNetwork() - cfg.SSVOptions.ValidatorOptions.Context = cmd.Context() + cfg.SSVOptions.ValidatorOptions.Context = ctx cfg.SSVOptions.ValidatorOptions.DB = db cfg.SSVOptions.ValidatorOptions.Network = p2pNetwork cfg.SSVOptions.ValidatorOptions.Beacon = consensusClient @@ -297,7 +300,7 @@ var StartNodeCmd = &cobra.Command{ cfg.SSVOptions.ValidatorOptions.GenesisControllerOptions.KeyManager = &ekm.GenesisKeyManagerAdapter{KeyManager: keyManager} if cfg.WsAPIPort != 0 { - ws := exporterapi.NewWsServer(cmd.Context(), nil, http.NewServeMux(), cfg.WithPing) + ws := exporterapi.NewWsServer(ctx, nil, http.NewServeMux(), cfg.WithPing) cfg.SSVOptions.WS = ws cfg.SSVOptions.WsAPIPort = cfg.WsAPIPort cfg.SSVOptions.ValidatorOptions.NewDecidedHandler = decided.NewStreamPublisher(logger, ws) @@ -355,7 +358,7 @@ var StartNodeCmd = &cobra.Command{ operatorNode = operator.New(logger, cfg.SSVOptions, slotTickerProvider, storageMap) if cfg.MetricsAPIPort > 0 { - go startMetricsHandler(cmd.Context(), logger, db, metricsReporter, cfg.MetricsAPIPort, cfg.EnableProfile) + go startMetricsHandler(ctx, logger, db, metricsReporter, cfg.MetricsAPIPort, cfg.EnableProfile) } nodeProber := nodeprobe.NewProber( @@ -372,23 +375,57 @@ var StartNodeCmd = &cobra.Command{ nodeprobe.ConsensusClientNode: consensusClient, }, ) - if len(cfg.LocalEventsPath) == 0 { - eventSyncer := setupEventHandling( + + eventFilterer, err := executionClient.Filterer() + if err != nil { + logger.Fatal("failed to set up event filterer", zap.Error(err)) + } + eventHandler, err := eventhandler.New( + nodeStorage, + eventparser.New(eventFilterer), + validatorCtrl, + networkConfig, + operatorDataStore, + operatorPrivKey, + keyManager, + cfg.SSVOptions.ValidatorOptions.Beacon, + eventhandler.WithFullNode(), + eventhandler.WithLogger(logger), + eventhandler.WithMetrics(metricsReporter), + ) + if err != nil { + logger.Fatal("failed to setup event data handler", zap.Error(err)) + } + + // load & parse local events yaml if exists, otherwise sync from contract + if usingLocalEvents { + localEvents, err := localevents.Load(cfg.LocalEventsPath) + if err != nil { + logger.Fatal("failed to load local events", zap.Error(err)) + } + if err := eventHandler.HandleLocalEvents(localEvents); err != nil { + logger.Fatal("error occurred while running event data handler", zap.Error(err)) + } + } else { + eventSyncer := eventsyncer.New( + nodeStorage, + executionClient, + eventHandler, + eventsyncer.WithLogger(logger), + eventsyncer.WithMetrics(metricsReporter), + ) + startEventSyncer( cmd.Context(), logger, - executionClient, - validatorCtrl, - metricsReporter, + eventSyncer, networkConfig, nodeStorage, operatorDataStore, - operatorPrivKey, - keyManager, ) nodeProber.AddNode(nodeprobe.EventSyncerNode, eventSyncer) } - nodeProber.Start(cmd.Context()) + nodeProber.Start(ctx) nodeProber.Wait() logger.Info("Ethereum nodes are healthy") @@ -438,12 +475,11 @@ var StartNodeCmd = &cobra.Command{ }, } -func validateConfig(nodeStorage operatorstorage.Storage, networkName string, usingLocalEvents bool) error { +func checkCfgCompatibility(nodeStorage operatorstorage.Storage, networkName string, usingLocalEvents bool) error { storedConfig, foundConfig, err := nodeStorage.GetConfig(nil) if err != nil { return fmt.Errorf("failed to get stored config: %w", err) } - currentConfig := &operatorstorage.ConfigLock{ NetworkName: networkName, UsingLocalEvents: usingLocalEvents, @@ -454,7 +490,6 @@ func validateConfig(nodeStorage operatorstorage.Storage, networkName string, usi return fmt.Errorf("incompatible config change: %w", err) } } else { - if err := nodeStorage.SaveConfig(nil, currentConfig); err != nil { return fmt.Errorf("failed to store config: %w", err) } @@ -661,135 +696,85 @@ func setupConsensusClient( return cl } -func setupEventHandling( +func startMetricsHandler(ctx context.Context, logger *zap.Logger, db basedb.Database, metricsReporter metricsreporter.MetricsReporter, port int, enableProf bool) { + logger = logger.Named(logging.NameMetricsHandler) + // init and start HTTP handler + metricsHandler := metrics.NewMetricsHandler(ctx, db, metricsReporter, enableProf, operatorNode.(metrics.HealthChecker)) + addr := fmt.Sprintf(":%d", port) + if err := metricsHandler.Start(logger, http.NewServeMux(), addr); err != nil { + logger.Panic("failed to serve metrics", zap.Error(err)) + } +} + +func startEventSyncer( ctx context.Context, logger *zap.Logger, - executionClient *executionclient.ExecutionClient, - validatorCtrl validator.Controller, - metricsReporter metricsreporter.MetricsReporter, + eventSyncer *eventsyncer.EventSyncer, networkConfig networkconfig.NetworkConfig, nodeStorage operatorstorage.Storage, operatorDataStore operatordatastore.OperatorDataStore, - operatorDecrypter keys.OperatorDecrypter, - keyManager ekm.KeyManager, -) *eventsyncer.EventSyncer { - eventFilterer, err := executionClient.Filterer() - if err != nil { - logger.Fatal("failed to set up event filterer", zap.Error(err)) - } - - eventParser := eventparser.New(eventFilterer) - - eventHandler, err := eventhandler.New( - nodeStorage, - eventParser, - validatorCtrl, - networkConfig, - operatorDataStore, - operatorDecrypter, - keyManager, - cfg.SSVOptions.ValidatorOptions.Beacon, - eventhandler.WithFullNode(), - eventhandler.WithLogger(logger), - eventhandler.WithMetrics(metricsReporter), - ) - if err != nil { - logger.Fatal("failed to setup event data handler", zap.Error(err)) - } - - eventSyncer := eventsyncer.New( - nodeStorage, - executionClient, - eventHandler, - eventsyncer.WithLogger(logger), - eventsyncer.WithMetrics(metricsReporter), - ) - +) { fromBlock, found, err := nodeStorage.GetLastProcessedBlock(nil) if err != nil { logger.Fatal("syncing registry contract events failed, could not get last processed block", zap.Error(err)) } if !found { fromBlock = networkConfig.RegistrySyncOffset - } else if fromBlock == nil { + } + if fromBlock == nil { logger.Fatal("syncing registry contract events failed, last processed block is nil") - } else { - // Start syncing from the next block. - fromBlock = new(big.Int).SetUint64(fromBlock.Uint64() + 1) } + // Start syncing from the next block. + fromBlock = new(big.Int).SetUint64(fromBlock.Uint64() + 1) - // load & parse local events yaml if exists, otherwise sync from contract - if len(cfg.LocalEventsPath) != 0 { - localEvents, err := localevents.Load(cfg.LocalEventsPath) - if err != nil { - logger.Fatal("failed to load local events", zap.Error(err)) - } + logger.Debug("syncing historical registry events", zap.Uint64("fromBlock", fromBlock.Uint64())) - if err := eventHandler.HandleLocalEvents(localEvents); err != nil { - logger.Fatal("error occurred while running event data handler", zap.Error(err)) - } - } else { - // Sync historical registry events. - logger.Debug("syncing historical registry events", zap.Uint64("fromBlock", fromBlock.Uint64())) - lastProcessedBlock, err := eventSyncer.SyncHistory(ctx, fromBlock.Uint64()) - switch { - case errors.Is(err, executionclient.ErrNothingToSync): - // Nothing was synced, keep fromBlock as is. - case err == nil: - // Advance fromBlock to the block after lastProcessedBlock. - fromBlock = new(big.Int).SetUint64(lastProcessedBlock + 1) - default: - logger.Fatal("failed to sync historical registry events", zap.Error(err)) - } - - // Print registry stats. - shares := nodeStorage.Shares().List(nil) - operators, err := nodeStorage.ListOperators(nil, 0, 0) - if err != nil { - logger.Error("failed to get operators", zap.Error(err)) - } + lastProcessedBlock, err := eventSyncer.SyncHistory(ctx, fromBlock.Uint64()) + switch { + case errors.Is(err, executionclient.ErrNothingToSync): + // Nothing was synced, keep fromBlock as is. + case err == nil: + // Advance fromBlock to the block after lastProcessedBlock. + fromBlock = new(big.Int).SetUint64(lastProcessedBlock + 1) + default: + logger.Fatal("failed to sync historical registry events", zap.Error(err)) + } - operatorValidators := 0 - liquidatedValidators := 0 - operatorID := operatorDataStore.GetOperatorID() - if operatorDataStore.OperatorIDReady() { - for _, share := range shares { - if share.BelongsToOperator(operatorID) { - operatorValidators++ - } - if share.Liquidated { - liquidatedValidators++ - } + // Print registry stats. + shares := nodeStorage.Shares().List(nil) + operators, err := nodeStorage.ListOperators(nil, 0, 0) + if err != nil { + logger.Error("failed to get operators", zap.Error(err)) + } + + operatorValidators := 0 + liquidatedValidators := 0 + operatorID := operatorDataStore.GetOperatorID() + if operatorDataStore.OperatorIDReady() { + for _, share := range shares { + if share.BelongsToOperator(operatorID) { + operatorValidators++ + } + if share.Liquidated { + liquidatedValidators++ } } - logger.Info("historical registry sync stats", - zap.Uint64("my_operator_id", operatorID), - zap.Int("operators", len(operators)), - zap.Int("validators", len(shares)), - zap.Int("liquidated_validators", liquidatedValidators), - zap.Int("my_validators", operatorValidators), - ) - - // Sync ongoing registry events in the background. - go func() { - err = eventSyncer.SyncOngoing(ctx, fromBlock.Uint64()) - - // Crash if ongoing sync has stopped, regardless of the reason. - logger.Fatal("failed syncing ongoing registry events", - zap.Uint64("last_processed_block", lastProcessedBlock), - zap.Error(err)) - }() } + logger.Info("historical registry sync stats", + zap.Uint64("my_operator_id", operatorID), + zap.Int("operators", len(operators)), + zap.Int("validators", len(shares)), + zap.Int("liquidated_validators", liquidatedValidators), + zap.Int("my_validators", operatorValidators), + ) - return eventSyncer -} + // Sync ongoing registry events in the background. + go func() { + err = eventSyncer.SyncOngoing(ctx, fromBlock.Uint64()) -func startMetricsHandler(ctx context.Context, logger *zap.Logger, db basedb.Database, metricsReporter metricsreporter.MetricsReporter, port int, enableProf bool) { - logger = logger.Named(logging.NameMetricsHandler) - // init and start HTTP handler - metricsHandler := metrics.NewMetricsHandler(ctx, db, metricsReporter, enableProf, operatorNode.(metrics.HealthChecker)) - addr := fmt.Sprintf(":%d", port) - if err := metricsHandler.Start(logger, http.NewServeMux(), addr); err != nil { - logger.Panic("failed to serve metrics", zap.Error(err)) - } + // Crash if ongoing sync has stopped, regardless of the reason. + logger.Fatal("failed syncing ongoing registry events", + zap.Uint64("last_processed_block", lastProcessedBlock), + zap.Error(err)) + }() } diff --git a/cli/operator/node_test.go b/cli/operator/node_test.go index 297a6f61fd..67d34a68ed 100644 --- a/cli/operator/node_test.go +++ b/cli/operator/node_test.go @@ -3,14 +3,13 @@ package operator import ( "testing" - "github.com/stretchr/testify/require" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "github.com/ssvlabs/ssv/networkconfig" operatorstorage "github.com/ssvlabs/ssv/operator/storage" "github.com/ssvlabs/ssv/storage/basedb" "github.com/ssvlabs/ssv/storage/kv" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" ) func Test_verifyConfig(t *testing.T) { @@ -29,7 +28,7 @@ func Test_verifyConfig(t *testing.T) { NetworkName: testNetworkName, UsingLocalEvents: true, } - require.NoError(t, validateConfig(nodeStorage, c.NetworkName, c.UsingLocalEvents)) + require.NoError(t, checkCfgCompatibility(nodeStorage, c.NetworkName, c.UsingLocalEvents)) storedConfig, found, err := nodeStorage.GetConfig(nil) require.NoError(t, err) @@ -45,7 +44,7 @@ func Test_verifyConfig(t *testing.T) { UsingLocalEvents: true, } require.NoError(t, nodeStorage.SaveConfig(nil, c)) - require.NoError(t, validateConfig(nodeStorage, c.NetworkName, c.UsingLocalEvents)) + require.NoError(t, checkCfgCompatibility(nodeStorage, c.NetworkName, c.UsingLocalEvents)) storedConfig, found, err := nodeStorage.GetConfig(nil) require.NoError(t, err) @@ -62,7 +61,7 @@ func Test_verifyConfig(t *testing.T) { } require.NoError(t, nodeStorage.SaveConfig(nil, c)) require.ErrorContains(t, - validateConfig(nodeStorage, testNetworkName, true), + checkCfgCompatibility(nodeStorage, testNetworkName, true), "incompatible config change: network mismatch. Stored network testnet:alan1 does not match current network testnet:alan. The database must be removed or reinitialized", ) @@ -81,7 +80,7 @@ func Test_verifyConfig(t *testing.T) { } require.NoError(t, nodeStorage.SaveConfig(nil, c)) require.ErrorContains(t, - validateConfig(nodeStorage, testNetworkName, c.UsingLocalEvents), + checkCfgCompatibility(nodeStorage, testNetworkName, c.UsingLocalEvents), "incompatible config change: network mismatch. Stored network testnet:alan1 does not match current network testnet:alan. The database must be removed or reinitialized", ) @@ -100,7 +99,7 @@ func Test_verifyConfig(t *testing.T) { } require.NoError(t, nodeStorage.SaveConfig(nil, c)) require.ErrorContains(t, - validateConfig(nodeStorage, c.NetworkName, true), + checkCfgCompatibility(nodeStorage, c.NetworkName, true), "incompatible config change: enabling local events is not allowed. The database must be removed or reinitialized", ) @@ -119,7 +118,7 @@ func Test_verifyConfig(t *testing.T) { } require.NoError(t, nodeStorage.SaveConfig(nil, c)) require.ErrorContains(t, - validateConfig(nodeStorage, c.NetworkName, false), + checkCfgCompatibility(nodeStorage, c.NetworkName, false), "incompatible config change: disabling local events is not allowed. The database must be removed or reinitialized", ) diff --git a/eth/eventhandler/task_executor_test.go b/eth/eventhandler/task_executor_test.go index a944b75361..aae8db67ca 100644 --- a/eth/eventhandler/task_executor_test.go +++ b/eth/eventhandler/task_executor_test.go @@ -5,18 +5,16 @@ import ( "encoding/binary" "testing" - "go.uber.org/mock/gomock" - ethcommon "github.com/ethereum/go-ethereum/common" ethtypes "github.com/ethereum/go-ethereum/core/types" spectypes "github.com/ssvlabs/ssv-spec/types" - "github.com/stretchr/testify/require" - "go.uber.org/zap" - "go.uber.org/zap/zaptest/observer" - "github.com/ssvlabs/ssv/eth/executionclient" ssvtypes "github.com/ssvlabs/ssv/protocol/v2/types" "github.com/ssvlabs/ssv/registry/storage" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" ) // const rawOperatorAdded = `{