Skip to content

Commit

Permalink
rework eventSyncer initialization/startup
Browse files Browse the repository at this point in the history
  • Loading branch information
iurii-ssv committed Nov 3, 2024
1 parent 2fd5cb3 commit 9900a0b
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 147 deletions.
247 changes: 116 additions & 131 deletions cli/operator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ var StartNodeCmd = &cobra.Command{
Use: "start-node",
Short: "Starts an instance of SSV node",
Run: func(cmd *cobra.Command, args []string) {
ctx := cmd.Context()

commons.SetBuildData(cmd.Parent().Short, cmd.Parent().Version)

err := globalcfg.Prepare(&cfg, &globalArgs)
Expand Down Expand Up @@ -136,7 +138,7 @@ var StartNodeCmd = &cobra.Command{
if err != nil {
logger.Fatal("could not setup network", zap.Error(err))
}
cfg.DBOptions.Ctx = cmd.Context()
cfg.DBOptions.Ctx = ctx
db, err := setupDB(logger, networkConfig.Beacon.GetNetwork())
if err != nil {
logger.Fatal("could not setup db", zap.Error(err))
Expand Down Expand Up @@ -180,7 +182,8 @@ var StartNodeCmd = &cobra.Command{
operatorDataStore := operatordatastore.New(operatorData)

usingLocalEvents := len(cfg.LocalEventsPath) != 0
if err := validateConfig(nodeStorage, networkConfig.AlanForkNetworkName(), usingLocalEvents); err != nil {

if err := checkCfgCompatibility(nodeStorage, networkConfig.AlanForkNetworkName(), usingLocalEvents); err != nil {
logger.Fatal("failed to validate config", zap.Error(err))
}

Expand All @@ -194,7 +197,7 @@ var StartNodeCmd = &cobra.Command{
logger.Fatal("could not create new eth-key-manager signer", zap.Error(err))
}

cfg.P2pNetworkConfig.Ctx = cmd.Context()
cfg.P2pNetworkConfig.Ctx = ctx

slotTickerProvider := func() slotticker.SlotTicker {
return slotticker.New(logger, slotticker.Config{
Expand All @@ -203,14 +206,14 @@ var StartNodeCmd = &cobra.Command{
})
}

cfg.ConsensusClient.Context = cmd.Context()
cfg.ConsensusClient.Context = ctx
cfg.ConsensusClient.GasLimit = spectypes.DefaultGasLimit
cfg.ConsensusClient.Network = networkConfig.Beacon.GetNetwork()

consensusClient := setupConsensusClient(logger, operatorDataStore, slotTickerProvider)

executionClient, err := executionclient.New(
cmd.Context(),
ctx,
cfg.ExecutionClient.Addr,
ethcommon.HexToAddress(networkConfig.RegistryContractAddr),
executionclient.WithLogger(logger),
Expand All @@ -230,7 +233,7 @@ var StartNodeCmd = &cobra.Command{
cfg.P2pNetworkConfig.FullNode = cfg.SSVOptions.ValidatorOptions.FullNode
cfg.P2pNetworkConfig.Network = networkConfig

validatorsMap := validators.New(cmd.Context())
validatorsMap := validators.New(ctx)

dutyStore := dutystore.New()
cfg.SSVOptions.DutyStore = dutyStore
Expand Down Expand Up @@ -272,15 +275,15 @@ var StartNodeCmd = &cobra.Command{

p2pNetwork, genesisP2pNetwork := setupP2P(logger, db, metricsReporter)

cfg.SSVOptions.Context = cmd.Context()
cfg.SSVOptions.Context = ctx
cfg.SSVOptions.DB = db
cfg.SSVOptions.BeaconNode = consensusClient
cfg.SSVOptions.ExecutionClient = executionClient
cfg.SSVOptions.Network = networkConfig
cfg.SSVOptions.P2PNetwork = p2pNetwork
cfg.SSVOptions.ValidatorOptions.NetworkConfig = networkConfig
cfg.SSVOptions.ValidatorOptions.BeaconNetwork = networkConfig.Beacon.GetNetwork()
cfg.SSVOptions.ValidatorOptions.Context = cmd.Context()
cfg.SSVOptions.ValidatorOptions.Context = ctx
cfg.SSVOptions.ValidatorOptions.DB = db
cfg.SSVOptions.ValidatorOptions.Network = p2pNetwork
cfg.SSVOptions.ValidatorOptions.Beacon = consensusClient
Expand All @@ -297,7 +300,7 @@ var StartNodeCmd = &cobra.Command{
cfg.SSVOptions.ValidatorOptions.GenesisControllerOptions.KeyManager = &ekm.GenesisKeyManagerAdapter{KeyManager: keyManager}

if cfg.WsAPIPort != 0 {
ws := exporterapi.NewWsServer(cmd.Context(), nil, http.NewServeMux(), cfg.WithPing)
ws := exporterapi.NewWsServer(ctx, nil, http.NewServeMux(), cfg.WithPing)
cfg.SSVOptions.WS = ws
cfg.SSVOptions.WsAPIPort = cfg.WsAPIPort
cfg.SSVOptions.ValidatorOptions.NewDecidedHandler = decided.NewStreamPublisher(logger, ws)
Expand Down Expand Up @@ -355,7 +358,7 @@ var StartNodeCmd = &cobra.Command{
operatorNode = operator.New(logger, cfg.SSVOptions, slotTickerProvider, storageMap)

if cfg.MetricsAPIPort > 0 {
go startMetricsHandler(cmd.Context(), logger, db, metricsReporter, cfg.MetricsAPIPort, cfg.EnableProfile)
go startMetricsHandler(ctx, logger, db, metricsReporter, cfg.MetricsAPIPort, cfg.EnableProfile)
}

nodeProber := nodeprobe.NewProber(
Expand All @@ -372,23 +375,57 @@ var StartNodeCmd = &cobra.Command{
nodeprobe.ConsensusClientNode: consensusClient,
},
)
if len(cfg.LocalEventsPath) == 0 {
eventSyncer := setupEventHandling(

eventFilterer, err := executionClient.Filterer()
if err != nil {
logger.Fatal("failed to set up event filterer", zap.Error(err))
}
eventHandler, err := eventhandler.New(
nodeStorage,
eventparser.New(eventFilterer),
validatorCtrl,
networkConfig,
operatorDataStore,
operatorPrivKey,
keyManager,
cfg.SSVOptions.ValidatorOptions.Beacon,
eventhandler.WithFullNode(),
eventhandler.WithLogger(logger),
eventhandler.WithMetrics(metricsReporter),
)
if err != nil {
logger.Fatal("failed to setup event data handler", zap.Error(err))
}

// load & parse local events yaml if exists, otherwise sync from contract
if usingLocalEvents {
localEvents, err := localevents.Load(cfg.LocalEventsPath)
if err != nil {
logger.Fatal("failed to load local events", zap.Error(err))
}
if err := eventHandler.HandleLocalEvents(localEvents); err != nil {
logger.Fatal("error occurred while running event data handler", zap.Error(err))
}
} else {
eventSyncer := eventsyncer.New(
nodeStorage,
executionClient,
eventHandler,
eventsyncer.WithLogger(logger),
eventsyncer.WithMetrics(metricsReporter),
)
startEventSyncer(
cmd.Context(),
logger,
executionClient,
validatorCtrl,
metricsReporter,
eventSyncer,
networkConfig,
nodeStorage,
operatorDataStore,
operatorPrivKey,
keyManager,
)
nodeProber.AddNode(nodeprobe.EventSyncerNode, eventSyncer)
}

nodeProber.Start(cmd.Context())
nodeProber.Start(ctx)
nodeProber.Wait()

logger.Info("Ethereum nodes are healthy")
Expand Down Expand Up @@ -438,12 +475,11 @@ var StartNodeCmd = &cobra.Command{
},
}

func validateConfig(nodeStorage operatorstorage.Storage, networkName string, usingLocalEvents bool) error {
func checkCfgCompatibility(nodeStorage operatorstorage.Storage, networkName string, usingLocalEvents bool) error {
storedConfig, foundConfig, err := nodeStorage.GetConfig(nil)
if err != nil {
return fmt.Errorf("failed to get stored config: %w", err)
}

currentConfig := &operatorstorage.ConfigLock{
NetworkName: networkName,
UsingLocalEvents: usingLocalEvents,
Expand All @@ -454,7 +490,6 @@ func validateConfig(nodeStorage operatorstorage.Storage, networkName string, usi
return fmt.Errorf("incompatible config change: %w", err)
}
} else {

if err := nodeStorage.SaveConfig(nil, currentConfig); err != nil {
return fmt.Errorf("failed to store config: %w", err)
}
Expand Down Expand Up @@ -661,135 +696,85 @@ func setupConsensusClient(
return cl
}

func setupEventHandling(
// startMetricsHandler builds the metrics HTTP handler (optionally with
// profiling enabled) and starts serving it on the given port. If the
// handler fails to start, it panics through the logger.
func startMetricsHandler(ctx context.Context, logger *zap.Logger, db basedb.Database, metricsReporter metricsreporter.MetricsReporter, port int, enableProf bool) {
	logger = logger.Named(logging.NameMetricsHandler)

	// The package-level operatorNode is asserted to the health-checker
	// interface so the handler can report node health.
	handler := metrics.NewMetricsHandler(ctx, db, metricsReporter, enableProf, operatorNode.(metrics.HealthChecker))

	listenAddr := fmt.Sprintf(":%d", port)
	if err := handler.Start(logger, http.NewServeMux(), listenAddr); err != nil {
		logger.Panic("failed to serve metrics", zap.Error(err))
	}
}

func startEventSyncer(
ctx context.Context,
logger *zap.Logger,
executionClient *executionclient.ExecutionClient,
validatorCtrl validator.Controller,
metricsReporter metricsreporter.MetricsReporter,
eventSyncer *eventsyncer.EventSyncer,
networkConfig networkconfig.NetworkConfig,
nodeStorage operatorstorage.Storage,
operatorDataStore operatordatastore.OperatorDataStore,
operatorDecrypter keys.OperatorDecrypter,
keyManager ekm.KeyManager,
) *eventsyncer.EventSyncer {
eventFilterer, err := executionClient.Filterer()
if err != nil {
logger.Fatal("failed to set up event filterer", zap.Error(err))
}

eventParser := eventparser.New(eventFilterer)

eventHandler, err := eventhandler.New(
nodeStorage,
eventParser,
validatorCtrl,
networkConfig,
operatorDataStore,
operatorDecrypter,
keyManager,
cfg.SSVOptions.ValidatorOptions.Beacon,
eventhandler.WithFullNode(),
eventhandler.WithLogger(logger),
eventhandler.WithMetrics(metricsReporter),
)
if err != nil {
logger.Fatal("failed to setup event data handler", zap.Error(err))
}

eventSyncer := eventsyncer.New(
nodeStorage,
executionClient,
eventHandler,
eventsyncer.WithLogger(logger),
eventsyncer.WithMetrics(metricsReporter),
)

) {
fromBlock, found, err := nodeStorage.GetLastProcessedBlock(nil)
if err != nil {
logger.Fatal("syncing registry contract events failed, could not get last processed block", zap.Error(err))
}
if !found {
fromBlock = networkConfig.RegistrySyncOffset
} else if fromBlock == nil {
}
if fromBlock == nil {
logger.Fatal("syncing registry contract events failed, last processed block is nil")
} else {
// Start syncing from the next block.
fromBlock = new(big.Int).SetUint64(fromBlock.Uint64() + 1)
}
// Start syncing from the next block.
fromBlock = new(big.Int).SetUint64(fromBlock.Uint64() + 1)

// load & parse local events yaml if exists, otherwise sync from contract
if len(cfg.LocalEventsPath) != 0 {
localEvents, err := localevents.Load(cfg.LocalEventsPath)
if err != nil {
logger.Fatal("failed to load local events", zap.Error(err))
}
logger.Debug("syncing historical registry events", zap.Uint64("fromBlock", fromBlock.Uint64()))

if err := eventHandler.HandleLocalEvents(localEvents); err != nil {
logger.Fatal("error occurred while running event data handler", zap.Error(err))
}
} else {
// Sync historical registry events.
logger.Debug("syncing historical registry events", zap.Uint64("fromBlock", fromBlock.Uint64()))
lastProcessedBlock, err := eventSyncer.SyncHistory(ctx, fromBlock.Uint64())
switch {
case errors.Is(err, executionclient.ErrNothingToSync):
// Nothing was synced, keep fromBlock as is.
case err == nil:
// Advance fromBlock to the block after lastProcessedBlock.
fromBlock = new(big.Int).SetUint64(lastProcessedBlock + 1)
default:
logger.Fatal("failed to sync historical registry events", zap.Error(err))
}

// Print registry stats.
shares := nodeStorage.Shares().List(nil)
operators, err := nodeStorage.ListOperators(nil, 0, 0)
if err != nil {
logger.Error("failed to get operators", zap.Error(err))
}
lastProcessedBlock, err := eventSyncer.SyncHistory(ctx, fromBlock.Uint64())
switch {
case errors.Is(err, executionclient.ErrNothingToSync):
// Nothing was synced, keep fromBlock as is.
case err == nil:
// Advance fromBlock to the block after lastProcessedBlock.
fromBlock = new(big.Int).SetUint64(lastProcessedBlock + 1)
default:
logger.Fatal("failed to sync historical registry events", zap.Error(err))
}

operatorValidators := 0
liquidatedValidators := 0
operatorID := operatorDataStore.GetOperatorID()
if operatorDataStore.OperatorIDReady() {
for _, share := range shares {
if share.BelongsToOperator(operatorID) {
operatorValidators++
}
if share.Liquidated {
liquidatedValidators++
}
// Print registry stats.
shares := nodeStorage.Shares().List(nil)
operators, err := nodeStorage.ListOperators(nil, 0, 0)
if err != nil {
logger.Error("failed to get operators", zap.Error(err))
}

operatorValidators := 0
liquidatedValidators := 0
operatorID := operatorDataStore.GetOperatorID()
if operatorDataStore.OperatorIDReady() {
for _, share := range shares {
if share.BelongsToOperator(operatorID) {
operatorValidators++
}
if share.Liquidated {
liquidatedValidators++
}
}
logger.Info("historical registry sync stats",
zap.Uint64("my_operator_id", operatorID),
zap.Int("operators", len(operators)),
zap.Int("validators", len(shares)),
zap.Int("liquidated_validators", liquidatedValidators),
zap.Int("my_validators", operatorValidators),
)

// Sync ongoing registry events in the background.
go func() {
err = eventSyncer.SyncOngoing(ctx, fromBlock.Uint64())

// Crash if ongoing sync has stopped, regardless of the reason.
logger.Fatal("failed syncing ongoing registry events",
zap.Uint64("last_processed_block", lastProcessedBlock),
zap.Error(err))
}()
}
logger.Info("historical registry sync stats",
zap.Uint64("my_operator_id", operatorID),
zap.Int("operators", len(operators)),
zap.Int("validators", len(shares)),
zap.Int("liquidated_validators", liquidatedValidators),
zap.Int("my_validators", operatorValidators),
)

return eventSyncer
}
// Sync ongoing registry events in the background.
go func() {
err = eventSyncer.SyncOngoing(ctx, fromBlock.Uint64())

func startMetricsHandler(ctx context.Context, logger *zap.Logger, db basedb.Database, metricsReporter metricsreporter.MetricsReporter, port int, enableProf bool) {
logger = logger.Named(logging.NameMetricsHandler)
// init and start HTTP handler
metricsHandler := metrics.NewMetricsHandler(ctx, db, metricsReporter, enableProf, operatorNode.(metrics.HealthChecker))
addr := fmt.Sprintf(":%d", port)
if err := metricsHandler.Start(logger, http.NewServeMux(), addr); err != nil {
logger.Panic("failed to serve metrics", zap.Error(err))
}
// Crash if ongoing sync has stopped, regardless of the reason.
logger.Fatal("failed syncing ongoing registry events",
zap.Uint64("last_processed_block", lastProcessedBlock),
zap.Error(err))
}()
}
Loading

0 comments on commit 9900a0b

Please sign in to comment.