Skip to content

Commit

Permalink
remove duplicate code, explore cfg.LocalEventsPath usage
Browse files Browse the repository at this point in the history
  • Loading branch information
iurii-ssv committed Nov 3, 2024
1 parent d328a1a commit 44e4e0c
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 78 deletions.
111 changes: 49 additions & 62 deletions cli/operator/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/ssvlabs/ssv/eth/eventparser"
"github.com/ssvlabs/ssv/eth/eventsyncer"
"github.com/ssvlabs/ssv/eth/executionclient"
"github.com/ssvlabs/ssv/eth/localevents"
exporterapi "github.com/ssvlabs/ssv/exporter/api"
"github.com/ssvlabs/ssv/exporter/api/decided"
"github.com/ssvlabs/ssv/exporter/convert"
Expand Down Expand Up @@ -179,7 +178,8 @@ var StartNodeCmd = &cobra.Command{
operatorDataStore := operatordatastore.New(operatorData)

usingLocalEvents := len(cfg.LocalEventsPath) != 0
if err := validateConfig(nodeStorage, networkConfig.AlanForkNetworkName(), usingLocalEvents); err != nil {

if err := checkCfgCompatibility(nodeStorage, networkConfig.AlanForkNetworkName(), usingLocalEvents); err != nil {
logger.Fatal("failed to validate config", zap.Error(err))
}

Expand Down Expand Up @@ -371,7 +371,7 @@ var StartNodeCmd = &cobra.Command{
nodeprobe.ConsensusClientNode: consensusClient,
},
)
if len(cfg.LocalEventsPath) == 0 {
if !usingLocalEvents {
eventSyncer := setupEventHandling(
cmd.Context(),
logger,
Expand Down Expand Up @@ -433,12 +433,11 @@ var StartNodeCmd = &cobra.Command{
},
}

func validateConfig(nodeStorage operatorstorage.Storage, networkName string, usingLocalEvents bool) error {
func checkCfgCompatibility(nodeStorage operatorstorage.Storage, networkName string, usingLocalEvents bool) error {
storedConfig, foundConfig, err := nodeStorage.GetConfig(nil)
if err != nil {
return fmt.Errorf("failed to get stored config: %w", err)
}

currentConfig := &operatorstorage.ConfigLock{
NetworkName: networkName,
UsingLocalEvents: usingLocalEvents,
Expand All @@ -449,7 +448,6 @@ func validateConfig(nodeStorage operatorstorage.Storage, networkName string, usi
return fmt.Errorf("incompatible config change: %w", err)
}
} else {

if err := nodeStorage.SaveConfig(nil, currentConfig); err != nil {
return fmt.Errorf("failed to store config: %w", err)
}
Expand Down Expand Up @@ -703,68 +701,57 @@ func setupEventHandling(
fromBlock = new(big.Int).SetUint64(fromBlock.Uint64() + 1)
}

// load & parse local events yaml if exists, otherwise sync from contract
if len(cfg.LocalEventsPath) != 0 {
localEvents, err := localevents.Load(cfg.LocalEventsPath)
if err != nil {
logger.Fatal("failed to load local events", zap.Error(err))
}
// Sync historical registry events from Ethereum smart contract.
logger.Debug("syncing historical registry events", zap.Uint64("fromBlock", fromBlock.Uint64()))

if err := eventHandler.HandleLocalEvents(localEvents); err != nil {
logger.Fatal("error occurred while running event data handler", zap.Error(err))
}
} else {
// Sync historical registry events.
logger.Debug("syncing historical registry events", zap.Uint64("fromBlock", fromBlock.Uint64()))
lastProcessedBlock, err := eventSyncer.SyncHistory(ctx, fromBlock.Uint64())
switch {
case errors.Is(err, executionclient.ErrNothingToSync):
// Nothing was synced, keep fromBlock as is.
case err == nil:
// Advance fromBlock to the block after lastProcessedBlock.
fromBlock = new(big.Int).SetUint64(lastProcessedBlock + 1)
default:
logger.Fatal("failed to sync historical registry events", zap.Error(err))
}

// Print registry stats.
shares := nodeStorage.Shares().List(nil)
operators, err := nodeStorage.ListOperators(nil, 0, 0)
if err != nil {
logger.Error("failed to get operators", zap.Error(err))
}
lastProcessedBlock, err := eventSyncer.SyncHistory(ctx, fromBlock.Uint64())
switch {
case errors.Is(err, executionclient.ErrNothingToSync):
// Nothing was synced, keep fromBlock as is.
case err == nil:
// Advance fromBlock to the block after lastProcessedBlock.
fromBlock = new(big.Int).SetUint64(lastProcessedBlock + 1)
default:
logger.Fatal("failed to sync historical registry events", zap.Error(err))
}

operatorValidators := 0
liquidatedValidators := 0
operatorID := operatorDataStore.GetOperatorID()
if operatorDataStore.OperatorIDReady() {
for _, share := range shares {
if share.BelongsToOperator(operatorID) {
operatorValidators++
}
if share.Liquidated {
liquidatedValidators++
}
// Print registry stats.
shares := nodeStorage.Shares().List(nil)
operators, err := nodeStorage.ListOperators(nil, 0, 0)
if err != nil {
logger.Error("failed to get operators", zap.Error(err))
}

operatorValidators := 0
liquidatedValidators := 0
operatorID := operatorDataStore.GetOperatorID()
if operatorDataStore.OperatorIDReady() {
for _, share := range shares {
if share.BelongsToOperator(operatorID) {
operatorValidators++
}
if share.Liquidated {
liquidatedValidators++
}
}
logger.Info("historical registry sync stats",
zap.Uint64("my_operator_id", operatorID),
zap.Int("operators", len(operators)),
zap.Int("validators", len(shares)),
zap.Int("liquidated_validators", liquidatedValidators),
zap.Int("my_validators", operatorValidators),
)
}
logger.Info("historical registry sync stats",
zap.Uint64("my_operator_id", operatorID),
zap.Int("operators", len(operators)),
zap.Int("validators", len(shares)),
zap.Int("liquidated_validators", liquidatedValidators),
zap.Int("my_validators", operatorValidators),
)

// Sync ongoing registry events in the background.
go func() {
err = eventSyncer.SyncOngoing(ctx, fromBlock.Uint64())
// Sync ongoing registry events in the background.
go func() {
err = eventSyncer.SyncOngoing(ctx, fromBlock.Uint64())

// Crash if ongoing sync has stopped, regardless of the reason.
logger.Fatal("failed syncing ongoing registry events",
zap.Uint64("last_processed_block", lastProcessedBlock),
zap.Error(err))
}()
}
// Crash if ongoing sync has stopped, regardless of the reason.
logger.Fatal("failed syncing ongoing registry events",
zap.Uint64("last_processed_block", lastProcessedBlock),
zap.Error(err))
}()

return eventSyncer
}
Expand Down
19 changes: 9 additions & 10 deletions cli/operator/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@ package operator
import (
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"github.com/ssvlabs/ssv/networkconfig"
operatorstorage "github.com/ssvlabs/ssv/operator/storage"
"github.com/ssvlabs/ssv/storage/basedb"
"github.com/ssvlabs/ssv/storage/kv"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

func Test_verifyConfig(t *testing.T) {
Expand All @@ -29,7 +28,7 @@ func Test_verifyConfig(t *testing.T) {
NetworkName: testNetworkName,
UsingLocalEvents: true,
}
require.NoError(t, validateConfig(nodeStorage, c.NetworkName, c.UsingLocalEvents))
require.NoError(t, checkCfgCompatibility(nodeStorage, c.NetworkName, c.UsingLocalEvents))

storedConfig, found, err := nodeStorage.GetConfig(nil)
require.NoError(t, err)
Expand All @@ -45,7 +44,7 @@ func Test_verifyConfig(t *testing.T) {
UsingLocalEvents: true,
}
require.NoError(t, nodeStorage.SaveConfig(nil, c))
require.NoError(t, validateConfig(nodeStorage, c.NetworkName, c.UsingLocalEvents))
require.NoError(t, checkCfgCompatibility(nodeStorage, c.NetworkName, c.UsingLocalEvents))

storedConfig, found, err := nodeStorage.GetConfig(nil)
require.NoError(t, err)
Expand All @@ -62,7 +61,7 @@ func Test_verifyConfig(t *testing.T) {
}
require.NoError(t, nodeStorage.SaveConfig(nil, c))
require.ErrorContains(t,
validateConfig(nodeStorage, testNetworkName, true),
checkCfgCompatibility(nodeStorage, testNetworkName, true),
"incompatible config change: network mismatch. Stored network testnet:alan1 does not match current network testnet:alan. The database must be removed or reinitialized",
)

Expand All @@ -81,7 +80,7 @@ func Test_verifyConfig(t *testing.T) {
}
require.NoError(t, nodeStorage.SaveConfig(nil, c))
require.ErrorContains(t,
validateConfig(nodeStorage, testNetworkName, c.UsingLocalEvents),
checkCfgCompatibility(nodeStorage, testNetworkName, c.UsingLocalEvents),
"incompatible config change: network mismatch. Stored network testnet:alan1 does not match current network testnet:alan. The database must be removed or reinitialized",
)

Expand All @@ -100,7 +99,7 @@ func Test_verifyConfig(t *testing.T) {
}
require.NoError(t, nodeStorage.SaveConfig(nil, c))
require.ErrorContains(t,
validateConfig(nodeStorage, c.NetworkName, true),
checkCfgCompatibility(nodeStorage, c.NetworkName, true),
"incompatible config change: enabling local events is not allowed. The database must be removed or reinitialized",
)

Expand All @@ -119,7 +118,7 @@ func Test_verifyConfig(t *testing.T) {
}
require.NoError(t, nodeStorage.SaveConfig(nil, c))
require.ErrorContains(t,
validateConfig(nodeStorage, c.NetworkName, false),
checkCfgCompatibility(nodeStorage, c.NetworkName, false),
"incompatible config change: disabling local events is not allowed. The database must be removed or reinitialized",
)

Expand Down
10 changes: 4 additions & 6 deletions eth/eventhandler/task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,16 @@ import (
"encoding/binary"
"testing"

"go.uber.org/mock/gomock"

ethcommon "github.com/ethereum/go-ethereum/common"
ethtypes "github.com/ethereum/go-ethereum/core/types"
spectypes "github.com/ssvlabs/ssv-spec/types"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"

"github.com/ssvlabs/ssv/eth/executionclient"
ssvtypes "github.com/ssvlabs/ssv/protocol/v2/types"
"github.com/ssvlabs/ssv/registry/storage"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"go.uber.org/zap"
"go.uber.org/zap/zaptest/observer"
)

// const rawOperatorAdded = `{
Expand Down

0 comments on commit 44e4e0c

Please sign in to comment.