diff --git a/pkg/eigenState/eigenState.go b/pkg/eigenState/eigenState.go index b95f4ed..ecdac8c 100644 --- a/pkg/eigenState/eigenState.go +++ b/pkg/eigenState/eigenState.go @@ -4,6 +4,9 @@ import ( "github.com/Layr-Labs/sidecar/internal/config" "github.com/Layr-Labs/sidecar/pkg/eigenState/avsOperators" "github.com/Layr-Labs/sidecar/pkg/eigenState/disabledDistributionRoots" + "github.com/Layr-Labs/sidecar/pkg/eigenState/operatorAVSSplits" + "github.com/Layr-Labs/sidecar/pkg/eigenState/operatorDirectedRewardSubmissions" + "github.com/Layr-Labs/sidecar/pkg/eigenState/operatorPISplits" "github.com/Layr-Labs/sidecar/pkg/eigenState/operatorShares" "github.com/Layr-Labs/sidecar/pkg/eigenState/rewardSubmissions" "github.com/Layr-Labs/sidecar/pkg/eigenState/stakerDelegations" @@ -48,5 +51,17 @@ func LoadEigenStateModels( l.Sugar().Errorw("Failed to create DisabledDistributionRootsModel", zap.Error(err)) return err } + if _, err := operatorDirectedRewardSubmissions.NewOperatorDirectedRewardSubmissionsModel(sm, grm, l, cfg); err != nil { + l.Sugar().Errorw("Failed to create OperatorDirectedRewardSubmissionsModel", zap.Error(err)) + return err + } + if _, err := operatorAVSSplits.NewOperatorAVSSplitModel(sm, grm, l, cfg); err != nil { + l.Sugar().Errorw("Failed to create OperatorAVSSplitModel", zap.Error(err)) + return err + } + if _, err := operatorPISplits.NewOperatorPISplitModel(sm, grm, l, cfg); err != nil { + l.Sugar().Errorw("Failed to create OperatorPISplitModel", zap.Error(err)) + return err + } return nil } diff --git a/pkg/eigenState/operatorAVSSplits/operatorAVSSplits.go b/pkg/eigenState/operatorAVSSplits/operatorAVSSplits.go new file mode 100644 index 0000000..ddaa802 --- /dev/null +++ b/pkg/eigenState/operatorAVSSplits/operatorAVSSplits.go @@ -0,0 +1,278 @@ +package operatorAVSSplits + +import ( + "encoding/json" + "fmt" + "slices" + "sort" + "strings" + "time" + + "github.com/Layr-Labs/sidecar/pkg/storage" + "github.com/Layr-Labs/sidecar/pkg/utils" + + "github.com/Layr-Labs/sidecar/internal/config" + "github.com/Layr-Labs/sidecar/pkg/eigenState/base" + "github.com/Layr-Labs/sidecar/pkg/eigenState/stateManager" + "github.com/Layr-Labs/sidecar/pkg/eigenState/types" + "go.uber.org/zap" + "golang.org/x/xerrors" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +type OperatorAVSSplit struct { + Operator string + Avs string + ActivatedAt *time.Time + OldOperatorAVSSplitBips uint64 + NewOperatorAVSSplitBips uint64 + BlockNumber uint64 + TransactionHash string + LogIndex uint64 +} + +type OperatorAVSSplitModel struct { + base.BaseEigenState + StateTransitions types.StateTransitions[[]*OperatorAVSSplit] + DB *gorm.DB + Network config.Network + Environment config.Environment + logger *zap.Logger + globalConfig *config.Config + + // Accumulates state changes for SlotIds, grouped by block number + stateAccumulator map[uint64]map[types.SlotID]*OperatorAVSSplit +} + +func NewOperatorAVSSplitModel( + esm *stateManager.EigenStateManager, + grm *gorm.DB, + logger *zap.Logger, + globalConfig *config.Config, +) (*OperatorAVSSplitModel, error) { + model := &OperatorAVSSplitModel{ + BaseEigenState: base.BaseEigenState{ + Logger: logger, + }, + DB: grm, + logger: logger, + globalConfig: globalConfig, + stateAccumulator: make(map[uint64]map[types.SlotID]*OperatorAVSSplit), + } + + esm.RegisterState(model, 8) + return model, nil +} + +func (oas *OperatorAVSSplitModel) GetModelName() string { + return "OperatorAVSSplitModel" +} + +type operatorAVSSplitOutputData struct { + ActivatedAt uint64 `json:"activatedAt"` + OldOperatorAVSSplitBips uint64 `json:"oldOperatorAVSSplitBips"` + NewOperatorAVSSplitBips uint64 `json:"newOperatorAVSSplitBips"` +} + +func parseOperatorAVSSplitOutputData(outputDataStr string) (*operatorAVSSplitOutputData, error) { + outputData := &operatorAVSSplitOutputData{} + decoder := json.NewDecoder(strings.NewReader(outputDataStr)) + decoder.UseNumber() + + err := decoder.Decode(&outputData) + if err != nil { + return nil, err + } + + return outputData, err +} + +func (oas *OperatorAVSSplitModel) handleOperatorAVSSplitBipsSetEvent(log *storage.TransactionLog) (*OperatorAVSSplit, error) { + arguments, err := oas.ParseLogArguments(log) + if err != nil { + return nil, err + } + + outputData, err := parseOperatorAVSSplitOutputData(log.OutputData) + if err != nil { + return nil, err + } + + activatedAt := time.Unix(int64(outputData.ActivatedAt), 0) + + split := &OperatorAVSSplit{ + Operator: strings.ToLower(arguments[1].Value.(string)), + Avs: strings.ToLower(arguments[2].Value.(string)), + ActivatedAt: &activatedAt, + OldOperatorAVSSplitBips: outputData.OldOperatorAVSSplitBips, + NewOperatorAVSSplitBips: outputData.NewOperatorAVSSplitBips, + BlockNumber: log.BlockNumber, + TransactionHash: log.TransactionHash, + LogIndex: log.LogIndex, + } + + return split, nil +} + +func (oas *OperatorAVSSplitModel) GetStateTransitions() (types.StateTransitions[*OperatorAVSSplit], []uint64) { + stateChanges := make(types.StateTransitions[*OperatorAVSSplit]) + + stateChanges[0] = func(log *storage.TransactionLog) (*OperatorAVSSplit, error) { + operatorAVSSplit, err := oas.handleOperatorAVSSplitBipsSetEvent(log) + if err != nil { + return nil, err + } + + slotId := base.NewSlotID(operatorAVSSplit.TransactionHash, operatorAVSSplit.LogIndex) + + _, ok := oas.stateAccumulator[log.BlockNumber][slotId] + if ok { + err := xerrors.Errorf("Duplicate operator AVS split submitted for slot %s at block %d", slotId, log.BlockNumber) + oas.logger.Sugar().Errorw("Duplicate operator AVS split submitted", zap.Error(err)) + return nil, err + } + + oas.stateAccumulator[log.BlockNumber][slotId] = operatorAVSSplit + + return operatorAVSSplit, nil + } + + // Create an ordered list of block numbers + blockNumbers := make([]uint64, 0) + for blockNumber := range stateChanges { + blockNumbers = append(blockNumbers, blockNumber) + } + sort.Slice(blockNumbers, func(i, j int) bool { + return blockNumbers[i] < blockNumbers[j] + }) + slices.Reverse(blockNumbers) + + return stateChanges, blockNumbers +} + +func (oas *OperatorAVSSplitModel) getContractAddressesForEnvironment() map[string][]string { + contracts := oas.globalConfig.GetContractsMapForChain() + return map[string][]string{ + contracts.RewardsCoordinator: { + "OperatorAVSSplitBipsSet", + }, + } +} + +func (oas *OperatorAVSSplitModel) IsInterestingLog(log *storage.TransactionLog) bool { + addresses := oas.getContractAddressesForEnvironment() + return oas.BaseEigenState.IsInterestingLog(addresses, log) +} + +func (oas *OperatorAVSSplitModel) SetupStateForBlock(blockNumber uint64) error { + oas.stateAccumulator[blockNumber] = make(map[types.SlotID]*OperatorAVSSplit) + return nil +} + +func (oas *OperatorAVSSplitModel) CleanupProcessedStateForBlock(blockNumber uint64) error { + delete(oas.stateAccumulator, blockNumber) + return nil +} + +func (oas *OperatorAVSSplitModel) HandleStateChange(log *storage.TransactionLog) (interface{}, error) { + stateChanges, sortedBlockNumbers := oas.GetStateTransitions() + + for _, blockNumber := range sortedBlockNumbers { + if log.BlockNumber >= blockNumber { + oas.logger.Sugar().Debugw("Handling state change", zap.Uint64("blockNumber", log.BlockNumber)) + + change, err := stateChanges[blockNumber](log) + if err != nil { + return nil, err + } + if change == nil { + return nil, nil + } + return change, nil + } + } + return nil, nil +} + +// prepareState prepares the state for commit by adding the new state to the existing state. +func (oas *OperatorAVSSplitModel) prepareState(blockNumber uint64) ([]*OperatorAVSSplit, error) { + accumulatedState, ok := oas.stateAccumulator[blockNumber] + if !ok { + err := xerrors.Errorf("No accumulated state found for block %d", blockNumber) + oas.logger.Sugar().Errorw(err.Error(), zap.Error(err), zap.Uint64("blockNumber", blockNumber)) + return nil, err + } + + recordsToInsert := make([]*OperatorAVSSplit, 0) + for _, split := range accumulatedState { + recordsToInsert = append(recordsToInsert, split) + } + return recordsToInsert, nil +} + +// CommitFinalState commits the final state for the given block number. +func (oas *OperatorAVSSplitModel) CommitFinalState(blockNumber uint64) error { + recordsToInsert, err := oas.prepareState(blockNumber) + if err != nil { + return err + } + + if len(recordsToInsert) > 0 { + for _, record := range recordsToInsert { + res := oas.DB.Model(&OperatorAVSSplit{}).Clauses(clause.Returning{}).Create(&record) + if res.Error != nil { + oas.logger.Sugar().Errorw("Failed to insert records", zap.Error(res.Error)) + return res.Error + } + } + } + return nil +} + +// GenerateStateRoot generates the state root for the given block number using the results of the state changes. +func (oas *OperatorAVSSplitModel) GenerateStateRoot(blockNumber uint64) (types.StateRoot, error) { + inserts, err := oas.prepareState(blockNumber) + if err != nil { + return "", err + } + + inputs := oas.sortValuesForMerkleTree(inserts) + + if len(inputs) == 0 { + return "", nil + } + + fullTree, err := oas.MerkleizeState(blockNumber, inputs) + if err != nil { + oas.logger.Sugar().Errorw("Failed to create merkle tree", + zap.Error(err), + zap.Uint64("blockNumber", blockNumber), + zap.Any("inputs", inputs), + ) + return "", err + } + return types.StateRoot(utils.ConvertBytesToString(fullTree.Root())), nil +} + +func (oas *OperatorAVSSplitModel) sortValuesForMerkleTree(splits []*OperatorAVSSplit) []*base.MerkleTreeInput { + inputs := make([]*base.MerkleTreeInput, 0) + for _, split := range splits { + slotID := base.NewSlotID(split.TransactionHash, split.LogIndex) + value := fmt.Sprintf("%s_%s_%d_%d_%d", split.Operator, split.Avs, split.ActivatedAt.Unix(), split.OldOperatorAVSSplitBips, split.NewOperatorAVSSplitBips) + inputs = append(inputs, &base.MerkleTreeInput{ + SlotID: slotID, + Value: []byte(value), + }) + } + + slices.SortFunc(inputs, func(i, j *base.MerkleTreeInput) int { + return strings.Compare(string(i.SlotID), string(j.SlotID)) + }) + + return inputs +} + +func (oas *OperatorAVSSplitModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error { + return oas.BaseEigenState.DeleteState("operator_avs_splits", startBlockNumber, endBlockNumber, oas.DB) +} diff --git a/pkg/eigenState/operatorAVSSplits/operatorAVSSplits_test.go b/pkg/eigenState/operatorAVSSplits/operatorAVSSplits_test.go new file mode 100644 index 0000000..f7dd2fe --- /dev/null +++ b/pkg/eigenState/operatorAVSSplits/operatorAVSSplits_test.go @@ -0,0 +1,143 @@ +package operatorAVSSplits + +import ( + "fmt" + "math/big" + "strings" + "testing" + "time" + + "github.com/Layr-Labs/sidecar/pkg/postgres" + "github.com/Layr-Labs/sidecar/pkg/storage" + + "github.com/Layr-Labs/sidecar/internal/config" + "github.com/Layr-Labs/sidecar/internal/logger" + "github.com/Layr-Labs/sidecar/internal/tests" + "github.com/Layr-Labs/sidecar/pkg/eigenState/stateManager" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + "gorm.io/gorm" +) + +func setup() ( + string, + *gorm.DB, + *zap.Logger, + *config.Config, + error, +) { + cfg := config.NewConfig() + cfg.DatabaseConfig = *tests.GetDbConfigFromEnv() + + l, _ := logger.NewLogger(&logger.LoggerConfig{Debug: true}) + + dbname, _, grm, err := postgres.GetTestPostgresDatabase(cfg.DatabaseConfig, l) + if err != nil { + return dbname, nil, nil, nil, err + } + + return dbname, grm, l, cfg, nil +} + +func teardown(model *OperatorAVSSplitModel) { + queries := []string{ + `truncate table operator_avs_splits`, + `truncate table blocks cascade`, + } + for _, query := range queries { + res := model.DB.Exec(query) + if res.Error != nil { + fmt.Printf("Failed to run query: %v\n", res.Error) + } + } +} + +func createBlock(model *OperatorAVSSplitModel, blockNumber uint64) error { + block := &storage.Block{ + Number: blockNumber, + Hash: "some hash", + BlockTime: time.Now().Add(time.Hour * time.Duration(blockNumber)), + } + res := model.DB.Model(&storage.Block{}).Create(block) + if res.Error != nil { + return res.Error + } + return nil +} + +func Test_OperatorAVSSplit(t *testing.T) { + dbName, grm, l, cfg, err := setup() + + if err != nil { + t.Fatal(err) + } + + t.Run("Test each event type", func(t *testing.T) { + esm := stateManager.NewEigenStateManager(l, grm) + + model, err := NewOperatorAVSSplitModel(esm, grm, l, cfg) + + t.Run("Handle an operator avs split", func(t *testing.T) { + blockNumber := uint64(102) + + if err := createBlock(model, blockNumber); err != nil { + t.Fatal(err) + } + + log := &storage.TransactionLog{ + TransactionHash: "some hash", + TransactionIndex: big.NewInt(100).Uint64(), + BlockNumber: blockNumber, + Address: cfg.GetContractsMapForChain().RewardsCoordinator, + Arguments: `[{"Name": "caller", "Type": "address", "Value": "0xd36b6e5eee8311d7bffb2f3bb33301a1ab7de101", "Indexed": true}, {"Name": "operator", "Type": "address", "Value": "0xd36b6e5eee8311d7bffb2f3bb33301a1ab7de101", "Indexed": true}, {"Name": "avs", "Type": "address", "Value": "0x9401E5E6564DB35C0f86573a9828DF69Fc778aF1", "Indexed": true}, {"Name": "activatedAt", "Type": "uint32", "Value": 1725494400, "Indexed": false}, {"Name": "oldOperatorAVSSplitBips", "Type": "uint16", "Value": 1000, "Indexed": false}, {"Name": "newOperatorAVSSplitBips", "Type": "uint16", "Value": 2000, "Indexed": false}]`, + EventName: "OperatorAVSSplitBipsSet", + LogIndex: big.NewInt(12).Uint64(), + OutputData: `{"activatedAt": 1725494400, "oldOperatorAVSSplitBips": 1000, "newOperatorAVSSplitBips": 2000}`, + } + + err = model.SetupStateForBlock(blockNumber) + assert.Nil(t, err) + + isInteresting := model.IsInterestingLog(log) + assert.True(t, isInteresting) + + change, err := model.HandleStateChange(log) + assert.Nil(t, err) + assert.NotNil(t, change) + + split := change.(*OperatorAVSSplit) + + assert.Equal(t, strings.ToLower("0xd36b6e5eee8311d7bffb2f3bb33301a1ab7de101"), strings.ToLower(split.Operator)) + assert.Equal(t, strings.ToLower("0x9401E5E6564DB35C0f86573a9828DF69Fc778aF1"), strings.ToLower(split.Avs)) + assert.Equal(t, int64(1725494400), split.ActivatedAt.Unix()) + assert.Equal(t, uint64(1000), split.OldOperatorAVSSplitBips) + assert.Equal(t, uint64(2000), split.NewOperatorAVSSplitBips) + + err = model.CommitFinalState(blockNumber) + assert.Nil(t, err) + + splits := make([]*OperatorAVSSplit, 0) + query := `select * from operator_avs_splits where block_number = ?` + res := model.DB.Raw(query, blockNumber).Scan(&splits) + assert.Nil(t, res.Error) + assert.Equal(t, 1, len(splits)) + + stateRoot, err := model.GenerateStateRoot(blockNumber) + assert.Nil(t, err) + assert.NotNil(t, stateRoot) + assert.True(t, len(stateRoot) > 0) + + t.Cleanup(func() { + teardown(model) + }) + }) + + t.Cleanup(func() { + teardown(model) + }) + }) + + t.Cleanup(func() { + postgres.TeardownTestDatabase(dbName, cfg, grm, l) + }) +} diff --git a/pkg/eigenState/operatorDirectedRewardSubmissions/operatorDirectedRewardSubmissions.go b/pkg/eigenState/operatorDirectedRewardSubmissions/operatorDirectedRewardSubmissions.go new file mode 100644 index 0000000..9b8757f --- /dev/null +++ b/pkg/eigenState/operatorDirectedRewardSubmissions/operatorDirectedRewardSubmissions.go @@ -0,0 +1,333 @@ +package operatorDirectedRewardSubmissions + +import ( + "encoding/json" + "fmt" + "slices" + "sort" + "strings" + "time" + + "github.com/Layr-Labs/sidecar/pkg/storage" + "github.com/Layr-Labs/sidecar/pkg/types/numbers" + "github.com/Layr-Labs/sidecar/pkg/utils" + + "github.com/Layr-Labs/sidecar/internal/config" + "github.com/Layr-Labs/sidecar/pkg/eigenState/base" + "github.com/Layr-Labs/sidecar/pkg/eigenState/stateManager" + "github.com/Layr-Labs/sidecar/pkg/eigenState/types" + "go.uber.org/zap" + "golang.org/x/xerrors" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +type OperatorDirectedRewardSubmission struct { + Avs string + RewardHash string + Token string + Operator string + OperatorIndex uint64 + Amount string + Strategy string + StrategyIndex uint64 + Multiplier string + StartTimestamp *time.Time + EndTimestamp *time.Time + Duration uint64 + BlockNumber uint64 + TransactionHash string + LogIndex uint64 +} + +func NewSlotID(transactionHash string, logIndex uint64, rewardHash string, strategyIndex uint64, operatorIndex uint64) types.SlotID { + return base.NewSlotIDWithSuffix(transactionHash, logIndex, fmt.Sprintf("%s_%d_%d", rewardHash, strategyIndex, operatorIndex)) +} + +type OperatorDirectedRewardSubmissionsModel struct { + base.BaseEigenState + StateTransitions types.StateTransitions[[]*OperatorDirectedRewardSubmission] + DB *gorm.DB + Network config.Network + Environment config.Environment + logger *zap.Logger + globalConfig *config.Config + + // Accumulates state changes for SlotIds, grouped by block number + stateAccumulator map[uint64]map[types.SlotID]*OperatorDirectedRewardSubmission +} + +func NewOperatorDirectedRewardSubmissionsModel( + esm *stateManager.EigenStateManager, + grm *gorm.DB, + logger *zap.Logger, + globalConfig *config.Config, +) (*OperatorDirectedRewardSubmissionsModel, error) { + model := &OperatorDirectedRewardSubmissionsModel{ + BaseEigenState: base.BaseEigenState{ + Logger: logger, + }, + DB: grm, + logger: logger, + globalConfig: globalConfig, + stateAccumulator: make(map[uint64]map[types.SlotID]*OperatorDirectedRewardSubmission), + } + + esm.RegisterState(model, 7) + return model, nil +} + +func (odrs *OperatorDirectedRewardSubmissionsModel) GetModelName() string { + return "OperatorDirectedRewardSubmissionsModel" +} + +type operatorDirectedRewardData struct { + StrategiesAndMultipliers []struct { + Strategy string `json:"strategy"` + Multiplier json.Number `json:"multiplier"` + } `json:"strategiesAndMultipliers"` + Token string `json:"token"` + OperatorRewards []struct { + Operator string `json:"operator"` + Amount json.Number `json:"amount"` + } `json:"operatorRewards"` + StartTimestamp uint64 `json:"startTimestamp"` + Duration uint64 `json:"duration"` + Description string `json:"description"` +} + +type operatorDirectedRewardSubmissionOutputData struct { + SubmissionNonce json.Number `json:"submissionNonce"` + OperatorDirectedRewardsSubmission *operatorDirectedRewardData `json:"operatorDirectedRewardsSubmission"` +} + +func parseRewardSubmissionOutputData(outputDataStr string) (*operatorDirectedRewardSubmissionOutputData, error) { + outputData := &operatorDirectedRewardSubmissionOutputData{} + decoder := json.NewDecoder(strings.NewReader(outputDataStr)) + decoder.UseNumber() + + err := decoder.Decode(&outputData) + if err != nil { + return nil, err + } + + return outputData, err +} + +func (odrs *OperatorDirectedRewardSubmissionsModel) handleOperatorDirectedRewardSubmissionCreatedEvent(log *storage.TransactionLog) ([]*OperatorDirectedRewardSubmission, error) { + arguments, err := odrs.ParseLogArguments(log) + if err != nil { + return nil, err + } + + outputData, err := parseRewardSubmissionOutputData(log.OutputData) + if err != nil { + return nil, err + } + outputRewardData := outputData.OperatorDirectedRewardsSubmission + + rewardSubmissions := make([]*OperatorDirectedRewardSubmission, 0) + + for i, strategyAndMultiplier := range outputRewardData.StrategiesAndMultipliers { + startTimestamp := time.Unix(int64(outputRewardData.StartTimestamp), 0) + endTimestamp := startTimestamp.Add(time.Duration(outputRewardData.Duration) * time.Second) + + multiplierBig, success := numbers.NewBig257().SetString(strategyAndMultiplier.Multiplier.String(), 10) + if !success { + return nil, xerrors.Errorf("Failed to parse multiplier to Big257: %s", strategyAndMultiplier.Multiplier.String()) + } + + for j, operatorReward := range outputRewardData.OperatorRewards { + amountBig, success := numbers.NewBig257().SetString(operatorReward.Amount.String(), 10) + if !success { + return nil, xerrors.Errorf("Failed to parse amount to Big257: %s", operatorReward.Amount.String()) + } + + rewardSubmission := &OperatorDirectedRewardSubmission{ + Avs: strings.ToLower(arguments[1].Value.(string)), + RewardHash: strings.ToLower(arguments[2].Value.(string)), + Token: strings.ToLower(outputRewardData.Token), + Operator: strings.ToLower(operatorReward.Operator), + OperatorIndex: uint64(j), + Amount: amountBig.String(), + Strategy: strings.ToLower(strategyAndMultiplier.Strategy), + StrategyIndex: uint64(i), + Multiplier: multiplierBig.String(), + StartTimestamp: &startTimestamp, + EndTimestamp: &endTimestamp, + Duration: outputRewardData.Duration, + BlockNumber: log.BlockNumber, + TransactionHash: log.TransactionHash, + LogIndex: log.LogIndex, + } + + rewardSubmissions = append(rewardSubmissions, rewardSubmission) + } + } + + return rewardSubmissions, nil +} + +func (odrs *OperatorDirectedRewardSubmissionsModel) GetStateTransitions() (types.StateTransitions[[]*OperatorDirectedRewardSubmission], []uint64) { + stateChanges := make(types.StateTransitions[[]*OperatorDirectedRewardSubmission]) + + stateChanges[0] = func(log *storage.TransactionLog) ([]*OperatorDirectedRewardSubmission, error) { + rewardSubmissions, err := odrs.handleOperatorDirectedRewardSubmissionCreatedEvent(log) + if err != nil { + return nil, err + } + + for _, rewardSubmission := range rewardSubmissions { + slotId := NewSlotID(rewardSubmission.TransactionHash, rewardSubmission.LogIndex, rewardSubmission.RewardHash, rewardSubmission.StrategyIndex, rewardSubmission.OperatorIndex) + + _, ok := odrs.stateAccumulator[log.BlockNumber][slotId] + if ok { + err := xerrors.Errorf("Duplicate operator directed reward submission submitted for slot %s at block %d", slotId, log.BlockNumber) + odrs.logger.Sugar().Errorw("Duplicate operator directed reward submission submitted", zap.Error(err)) + return nil, err + } + + odrs.stateAccumulator[log.BlockNumber][slotId] = rewardSubmission + } + + return rewardSubmissions, nil + } + + // Create an ordered list of block numbers + blockNumbers := make([]uint64, 0) + for blockNumber := range stateChanges { + blockNumbers = append(blockNumbers, blockNumber) + } + sort.Slice(blockNumbers, func(i, j int) bool { + return blockNumbers[i] < blockNumbers[j] + }) + slices.Reverse(blockNumbers) + + return stateChanges, blockNumbers +} + +func (odrs *OperatorDirectedRewardSubmissionsModel) getContractAddressesForEnvironment() map[string][]string { + contracts := odrs.globalConfig.GetContractsMapForChain() + return map[string][]string{ + contracts.RewardsCoordinator: { + "OperatorDirectedAVSRewardsSubmissionCreated", + }, + } +} + +func (odrs *OperatorDirectedRewardSubmissionsModel) IsInterestingLog(log *storage.TransactionLog) bool { + addresses := odrs.getContractAddressesForEnvironment() + return odrs.BaseEigenState.IsInterestingLog(addresses, log) +} + +func (odrs *OperatorDirectedRewardSubmissionsModel) SetupStateForBlock(blockNumber uint64) error { + odrs.stateAccumulator[blockNumber] = make(map[types.SlotID]*OperatorDirectedRewardSubmission) + return nil +} + +func (odrs *OperatorDirectedRewardSubmissionsModel) CleanupProcessedStateForBlock(blockNumber uint64) error { + delete(odrs.stateAccumulator, blockNumber) + return nil +} + +func (odrs *OperatorDirectedRewardSubmissionsModel) HandleStateChange(log *storage.TransactionLog) (interface{}, error) { + stateChanges, sortedBlockNumbers := odrs.GetStateTransitions() + + for _, blockNumber := range sortedBlockNumbers { + if log.BlockNumber >= blockNumber { + odrs.logger.Sugar().Debugw("Handling state change", zap.Uint64("blockNumber", log.BlockNumber)) + + change, err := stateChanges[blockNumber](log) + if err != nil { + return nil, err + } + if change == nil { + return nil, nil + } + return change, nil + } + } + return nil, nil +} + +// prepareState prepares the state for commit by adding the new state to the existing state. +func (odrs *OperatorDirectedRewardSubmissionsModel) prepareState(blockNumber uint64) ([]*OperatorDirectedRewardSubmission, error) { + accumulatedState, ok := odrs.stateAccumulator[blockNumber] + if !ok { + err := xerrors.Errorf("No accumulated state found for block %d", blockNumber) + odrs.logger.Sugar().Errorw(err.Error(), zap.Error(err), zap.Uint64("blockNumber", blockNumber)) + return nil, err + } + + recordsToInsert := make([]*OperatorDirectedRewardSubmission, 0) + for _, submission := range accumulatedState { + recordsToInsert = append(recordsToInsert, submission) + } + return recordsToInsert, nil +} + +// CommitFinalState commits the final state for the given block number. +func (odrs *OperatorDirectedRewardSubmissionsModel) CommitFinalState(blockNumber uint64) error { + recordsToInsert, err := odrs.prepareState(blockNumber) + if err != nil { + return err + } + + if len(recordsToInsert) > 0 { + for _, record := range recordsToInsert { + res := odrs.DB.Model(&OperatorDirectedRewardSubmission{}).Clauses(clause.Returning{}).Create(&record) + if res.Error != nil { + odrs.logger.Sugar().Errorw("Failed to insert records", zap.Error(res.Error)) + return res.Error + } + } + } + return nil +} + +// GenerateStateRoot generates the state root for the given block number using the results of the state changes. +func (odrs *OperatorDirectedRewardSubmissionsModel) GenerateStateRoot(blockNumber uint64) (types.StateRoot, error) { + inserts, err := odrs.prepareState(blockNumber) + if err != nil { + return "", err + } + + inputs := odrs.sortValuesForMerkleTree(inserts) + + if len(inputs) == 0 { + return "", nil + } + + fullTree, err := odrs.MerkleizeState(blockNumber, inputs) + if err != nil { + odrs.logger.Sugar().Errorw("Failed to create merkle tree", + zap.Error(err), + zap.Uint64("blockNumber", blockNumber), + zap.Any("inputs", inputs), + ) + return "", err + } + return types.StateRoot(utils.ConvertBytesToString(fullTree.Root())), nil +} + +func (odrs *OperatorDirectedRewardSubmissionsModel) sortValuesForMerkleTree(submissions []*OperatorDirectedRewardSubmission) []*base.MerkleTreeInput { + inputs := make([]*base.MerkleTreeInput, 0) + for _, submission := range submissions { + slotID := NewSlotID(submission.TransactionHash, submission.LogIndex, submission.RewardHash, submission.StrategyIndex, submission.OperatorIndex) + value := fmt.Sprintf("%s_%s_%s_%s_%s", submission.RewardHash, submission.Strategy, submission.Multiplier, submission.Operator, submission.Amount) + inputs = append(inputs, &base.MerkleTreeInput{ + SlotID: slotID, + Value: []byte(value), + }) + } + + slices.SortFunc(inputs, func(i, j *base.MerkleTreeInput) int { + return strings.Compare(string(i.SlotID), string(j.SlotID)) + }) + + return inputs +} + +func (odrs *OperatorDirectedRewardSubmissionsModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error { + return odrs.BaseEigenState.DeleteState("operator_directed_reward_submissions", startBlockNumber, endBlockNumber, odrs.DB) +} diff --git a/pkg/eigenState/operatorDirectedRewardSubmissions/operatorDirectedRewardSubmissions_test.go b/pkg/eigenState/operatorDirectedRewardSubmissions/operatorDirectedRewardSubmissions_test.go new file mode 100644 index 0000000..6081e2a --- /dev/null +++ b/pkg/eigenState/operatorDirectedRewardSubmissions/operatorDirectedRewardSubmissions_test.go @@ -0,0 +1,173 @@ +package operatorDirectedRewardSubmissions + +import ( + "fmt" + "math/big" + "strings" + "testing" + "time" + + "github.com/Layr-Labs/sidecar/pkg/postgres" + "github.com/Layr-Labs/sidecar/pkg/storage" + + "github.com/Layr-Labs/sidecar/internal/config" + "github.com/Layr-Labs/sidecar/internal/logger" + "github.com/Layr-Labs/sidecar/internal/tests" + "github.com/Layr-Labs/sidecar/pkg/eigenState/stateManager" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + "gorm.io/gorm" +) + +func setup() ( + string, + *gorm.DB, + *zap.Logger, + *config.Config, + error, +) { + cfg := config.NewConfig() + cfg.DatabaseConfig = *tests.GetDbConfigFromEnv() + + l, _ := logger.NewLogger(&logger.LoggerConfig{Debug: true}) + + dbname, _, grm, err := postgres.GetTestPostgresDatabase(cfg.DatabaseConfig, l) + if err != nil { + return dbname, nil, nil, nil, err + } + + return dbname, grm, l, cfg, nil +} + +func teardown(model *OperatorDirectedRewardSubmissionsModel) { + queries := []string{ + `truncate table operator_directed_reward_submissions`, + `truncate table blocks cascade`, + } + for _, query := range queries { + res := model.DB.Exec(query) + if res.Error != nil { + fmt.Printf("Failed to run query: %v\n", res.Error) + } + } +} + +func createBlock(model *OperatorDirectedRewardSubmissionsModel, blockNumber uint64) error { + block := &storage.Block{ + Number: blockNumber, + Hash: "some hash", + BlockTime: time.Now().Add(time.Hour * time.Duration(blockNumber)), + } + res := model.DB.Model(&storage.Block{}).Create(block) + if res.Error != nil { + return res.Error + } + return nil +} + +func Test_OperatorDirectedRewardSubmissions(t *testing.T) { + dbName, grm, l, cfg, err := setup() + + if err != nil { + t.Fatal(err) + } + + t.Run("Test each event type", func(t *testing.T) { + esm := stateManager.NewEigenStateManager(l, grm) + + model, err := NewOperatorDirectedRewardSubmissionsModel(esm, grm, l, cfg) + + submissionCounter := 0 + + t.Run("Handle an operator directed reward submission", func(t *testing.T) { + blockNumber := uint64(102) + + if err := createBlock(model, blockNumber); err != nil { + t.Fatal(err) + } + + log := &storage.TransactionLog{ + TransactionHash: "some hash", + TransactionIndex: big.NewInt(100).Uint64(), + BlockNumber: blockNumber, + Address: cfg.GetContractsMapForChain().RewardsCoordinator, + Arguments: `[{"Name": "caller", "Type": "address", "Value": "0xd36b6e5eee8311d7bffb2f3bb33301a1ab7de101", "Indexed": true}, {"Name": "avs", "Type": "address", "Value": "0xd36b6e5eee8311d7bffb2f3bb33301a1ab7de101", "Indexed": true}, {"Name": "operatorDirectedRewardsSubmissionHash", "Type": "bytes32", "Value": "0x7402669fb2c8a0cfe8108acb8a0070257c77ec6906ecb07d97c38e8a5ddc66a9", "Indexed": true}, {"Name": "submissionNonce", "Type": "uint256", "Value": 0, "Indexed": false}, {"Name": "rewardsSubmission", "Type": "((address,uint96)[],address,(address,uint256)[],uint32,uint32,string)", "Value": null, "Indexed": false}]`, + EventName: "OperatorDirectedAVSRewardsSubmissionCreated", + LogIndex: big.NewInt(12).Uint64(), + OutputData: `{"submissionNonce": 0, "operatorDirectedRewardsSubmission": {"token": "0x0ddd9dc88e638aef6a8e42d0c98aaa6a48a98d24", "operatorRewards": [{"operator": "0x9401E5E6564DB35C0f86573a9828DF69Fc778aF1", "amount": 30000000000000000000000}, {"operator": "0xF50Cba7a66b5E615587157e43286DaA7aF94009e", "amount": 40000000000000000000000}], "duration": 2419200, "startTimestamp": 1725494400, "strategiesAndMultipliers": [{"strategy": "0x5074dfd18e9498d9e006fb8d4f3fecdc9af90a2c", "multiplier": 1000000000000000000}, {"strategy": "0xD56e4eAb23cb81f43168F9F45211Eb027b9aC7cc", "multiplier": 2000000000000000000}]}}`, + } + + err = model.SetupStateForBlock(blockNumber) + assert.Nil(t, err) + + isInteresting := model.IsInterestingLog(log) + assert.True(t, isInteresting) + + change, err := model.HandleStateChange(log) + assert.Nil(t, err) + assert.NotNil(t, change) + + strategiesAndMultipliers := []struct { + Strategy string + Multiplier string + }{ + {"0x5074dfd18e9498d9e006fb8d4f3fecdc9af90a2c", "1000000000000000000"}, + {"0xD56e4eAb23cb81f43168F9F45211Eb027b9aC7cc", "2000000000000000000"}, + } + + operatorRewards := []struct { + Operator string + Amount string + }{ + {"0x9401E5E6564DB35C0f86573a9828DF69Fc778aF1", "30000000000000000000000"}, + {"0xF50Cba7a66b5E615587157e43286DaA7aF94009e", "40000000000000000000000"}, + } + + typedChange := change.([]*OperatorDirectedRewardSubmission) + assert.Equal(t, len(strategiesAndMultipliers)*len(operatorRewards), len(typedChange)) + + for _, submission := range typedChange { + assert.Equal(t, strings.ToLower("0xd36b6e5eee8311d7bffb2f3bb33301a1ab7de101"), strings.ToLower(submission.Avs)) + assert.Equal(t, strings.ToLower("0x0ddd9dc88e638aef6a8e42d0c98aaa6a48a98d24"), strings.ToLower(submission.Token)) + assert.Equal(t, strings.ToLower("0x7402669fb2c8a0cfe8108acb8a0070257c77ec6906ecb07d97c38e8a5ddc66a9"), strings.ToLower(submission.RewardHash)) + assert.Equal(t, uint64(2419200), submission.Duration) + assert.Equal(t, int64(1725494400), submission.StartTimestamp.Unix()) + assert.Equal(t, int64(2419200+1725494400), submission.EndTimestamp.Unix()) + + assert.Equal(t, strings.ToLower(strategiesAndMultipliers[submission.StrategyIndex].Strategy), strings.ToLower(submission.Strategy)) + assert.Equal(t, strategiesAndMultipliers[submission.StrategyIndex].Multiplier, submission.Multiplier) + + assert.Equal(t, strings.ToLower(operatorRewards[submission.OperatorIndex].Operator), strings.ToLower(submission.Operator)) + assert.Equal(t, operatorRewards[submission.OperatorIndex].Amount, submission.Amount) + } + + err = model.CommitFinalState(blockNumber) + assert.Nil(t, err) + + rewards := make([]*OperatorDirectedRewardSubmission, 0) + query := `select * from operator_directed_reward_submissions where block_number = ?` + res := model.DB.Raw(query, blockNumber).Scan(&rewards) + assert.Nil(t, res.Error) + assert.Equal(t, len(strategiesAndMultipliers)*len(operatorRewards), len(rewards)) + + submissionCounter += len(strategiesAndMultipliers) * len(operatorRewards) + + stateRoot, err := model.GenerateStateRoot(blockNumber) + assert.Nil(t, err) + assert.NotNil(t, stateRoot) + assert.True(t, len(stateRoot) > 0) + + t.Cleanup(func() { + teardown(model) + }) + }) + + t.Cleanup(func() { + teardown(model) + }) + }) + + t.Cleanup(func() { + postgres.TeardownTestDatabase(dbName, cfg, grm, l) + }) +} diff --git a/pkg/eigenState/operatorPISplits/operatorPISplits.go b/pkg/eigenState/operatorPISplits/operatorPISplits.go new file mode 100644 index 0000000..baea18b --- /dev/null +++ b/pkg/eigenState/operatorPISplits/operatorPISplits.go @@ -0,0 +1,276 @@ +package operatorPISplits + +import ( + "encoding/json" + "fmt" + "slices" + "sort" + "strings" + "time" + + "github.com/Layr-Labs/sidecar/pkg/storage" + "github.com/Layr-Labs/sidecar/pkg/utils" + + "github.com/Layr-Labs/sidecar/internal/config" + "github.com/Layr-Labs/sidecar/pkg/eigenState/base" + "github.com/Layr-Labs/sidecar/pkg/eigenState/stateManager" + "github.com/Layr-Labs/sidecar/pkg/eigenState/types" + "go.uber.org/zap" + "golang.org/x/xerrors" + "gorm.io/gorm" + "gorm.io/gorm/clause" +) + +type OperatorPISplit struct { + Operator string + ActivatedAt *time.Time + OldOperatorAVSSplitBips uint64 + NewOperatorAVSSplitBips uint64 + BlockNumber uint64 + TransactionHash string + LogIndex uint64 +} + +type OperatorPISplitModel struct { + base.BaseEigenState + StateTransitions types.StateTransitions[[]*OperatorPISplit] + DB *gorm.DB + Network config.Network + Environment config.Environment + logger *zap.Logger + globalConfig *config.Config + + // Accumulates state changes for SlotIds, grouped by block number + stateAccumulator map[uint64]map[types.SlotID]*OperatorPISplit +} + +func NewOperatorPISplitModel( + esm *stateManager.EigenStateManager, + grm *gorm.DB, + logger *zap.Logger, + globalConfig *config.Config, +) (*OperatorPISplitModel, error) { + model := &OperatorPISplitModel{ + BaseEigenState: base.BaseEigenState{ + Logger: logger, + }, + DB: grm, + logger: logger, + globalConfig: globalConfig, + stateAccumulator: make(map[uint64]map[types.SlotID]*OperatorPISplit), + } + + esm.RegisterState(model, 9) + return model, nil +} + +func (ops *OperatorPISplitModel) GetModelName() string { + return "OperatorPISplitModel" +} + +type operatorPISplitOutputData struct { + ActivatedAt uint64 `json:"activatedAt"` + OldOperatorAVSSplitBips uint64 `json:"oldOperatorAVSSplitBips"` + NewOperatorAVSSplitBips uint64 `json:"newOperatorAVSSplitBips"` +} + +func parseOperatorPISplitOutputData(outputDataStr string) (*operatorPISplitOutputData, error) { + outputData := &operatorPISplitOutputData{} + decoder := json.NewDecoder(strings.NewReader(outputDataStr)) + decoder.UseNumber() + + err := decoder.Decode(&outputData) + if err != nil { + return nil, err + } + + return outputData, err +} + +func (ops *OperatorPISplitModel) handleOperatorPISplitBipsSetEvent(log *storage.TransactionLog) (*OperatorPISplit, error) { + arguments, err := ops.ParseLogArguments(log) + if err != nil { + return nil, err + } + + outputData, err := parseOperatorPISplitOutputData(log.OutputData) + if err != nil { + return nil, err + } + + activatedAt := time.Unix(int64(outputData.ActivatedAt), 0) + + split := &OperatorPISplit{ + Operator: strings.ToLower(arguments[1].Value.(string)), + ActivatedAt: &activatedAt, + OldOperatorAVSSplitBips: outputData.OldOperatorAVSSplitBips, + NewOperatorAVSSplitBips: outputData.NewOperatorAVSSplitBips, + BlockNumber: log.BlockNumber, + TransactionHash: log.TransactionHash, + LogIndex: log.LogIndex, + } + + return split, nil +} + +func (ops *OperatorPISplitModel) GetStateTransitions() (types.StateTransitions[*OperatorPISplit], []uint64) { + stateChanges := make(types.StateTransitions[*OperatorPISplit]) + + stateChanges[0] = func(log *storage.TransactionLog) (*OperatorPISplit, error) { + operatorPISplit, err := ops.handleOperatorPISplitBipsSetEvent(log) + if err != nil { + return nil, err + } + + slotId := base.NewSlotID(operatorPISplit.TransactionHash, operatorPISplit.LogIndex) + + _, ok := ops.stateAccumulator[log.BlockNumber][slotId] + if ok { + err := xerrors.Errorf("Duplicate operator PI split submitted for slot %s at block %d", slotId, log.BlockNumber) + ops.logger.Sugar().Errorw("Duplicate operator PI split submitted", zap.Error(err)) + return nil, err + } + + ops.stateAccumulator[log.BlockNumber][slotId] = operatorPISplit + + return operatorPISplit, nil + } + + // Create an ordered list of block numbers + blockNumbers := make([]uint64, 0) + for blockNumber := range stateChanges { + blockNumbers = append(blockNumbers, blockNumber) + } + sort.Slice(blockNumbers, func(i, j int) bool { + return blockNumbers[i] < blockNumbers[j] + }) + slices.Reverse(blockNumbers) + + return stateChanges, blockNumbers +} + +func (ops *OperatorPISplitModel) getContractAddressesForEnvironment() map[string][]string { + contracts := ops.globalConfig.GetContractsMapForChain() + return map[string][]string{ + contracts.RewardsCoordinator: { + "OperatorPISplitBipsSet", + }, + } +} + +func (ops *OperatorPISplitModel) IsInterestingLog(log *storage.TransactionLog) bool { + addresses := ops.getContractAddressesForEnvironment() + return ops.BaseEigenState.IsInterestingLog(addresses, log) +} + +func (ops *OperatorPISplitModel) SetupStateForBlock(blockNumber uint64) error { + ops.stateAccumulator[blockNumber] = make(map[types.SlotID]*OperatorPISplit) + return nil +} + +func (ops *OperatorPISplitModel) CleanupProcessedStateForBlock(blockNumber uint64) error { + delete(ops.stateAccumulator, blockNumber) + return nil +} + +func (ops *OperatorPISplitModel) HandleStateChange(log *storage.TransactionLog) (interface{}, error) { + stateChanges, sortedBlockNumbers := ops.GetStateTransitions() + + for _, blockNumber := range sortedBlockNumbers { + if log.BlockNumber >= blockNumber { + ops.logger.Sugar().Debugw("Handling state change", zap.Uint64("blockNumber", log.BlockNumber)) + + change, err := stateChanges[blockNumber](log) + if err != nil { + return nil, err + } + if change == nil { + return nil, nil + } + return change, nil + } + } + return nil, nil +} + +// prepareState prepares the state for commit by adding the new state to the existing state. +func (ops *OperatorPISplitModel) prepareState(blockNumber uint64) ([]*OperatorPISplit, error) { + accumulatedState, ok := ops.stateAccumulator[blockNumber] + if !ok { + err := xerrors.Errorf("No accumulated state found for block %d", blockNumber) + ops.logger.Sugar().Errorw(err.Error(), zap.Error(err), zap.Uint64("blockNumber", blockNumber)) + return nil, err + } + + recordsToInsert := make([]*OperatorPISplit, 0) + for _, split := range accumulatedState { + recordsToInsert = append(recordsToInsert, split) + } + return recordsToInsert, nil +} + +// CommitFinalState commits the final state for the given block number. +func (ops *OperatorPISplitModel) CommitFinalState(blockNumber uint64) error { + recordsToInsert, err := ops.prepareState(blockNumber) + if err != nil { + return err + } + + if len(recordsToInsert) > 0 { + for _, record := range recordsToInsert { + res := ops.DB.Model(&OperatorPISplit{}).Clauses(clause.Returning{}).Create(&record) + if res.Error != nil { + ops.logger.Sugar().Errorw("Failed to insert records", zap.Error(res.Error)) + return res.Error + } + } + } + return nil +} + +// GenerateStateRoot generates the state root for the given block number using the results of the state changes. +func (ops *OperatorPISplitModel) GenerateStateRoot(blockNumber uint64) (types.StateRoot, error) { + inserts, err := ops.prepareState(blockNumber) + if err != nil { + return "", err + } + + inputs := ops.sortValuesForMerkleTree(inserts) + + if len(inputs) == 0 { + return "", nil + } + + fullTree, err := ops.MerkleizeState(blockNumber, inputs) + if err != nil { + ops.logger.Sugar().Errorw("Failed to create merkle tree", + zap.Error(err), + zap.Uint64("blockNumber", blockNumber), + zap.Any("inputs", inputs), + ) + return "", err + } + return types.StateRoot(utils.ConvertBytesToString(fullTree.Root())), nil +} + +func (ops *OperatorPISplitModel) sortValuesForMerkleTree(splits []*OperatorPISplit) []*base.MerkleTreeInput { + inputs := make([]*base.MerkleTreeInput, 0) + for _, split := range splits { + slotID := base.NewSlotID(split.TransactionHash, split.LogIndex) + value := fmt.Sprintf("%s_%d_%d_%d", split.Operator, split.ActivatedAt.Unix(), split.OldOperatorAVSSplitBips, split.NewOperatorAVSSplitBips) + inputs = append(inputs, &base.MerkleTreeInput{ + SlotID: slotID, + Value: []byte(value), + }) + } + + slices.SortFunc(inputs, func(i, j *base.MerkleTreeInput) int { + return strings.Compare(string(i.SlotID), string(j.SlotID)) + }) + + return inputs +} + +func (ops *OperatorPISplitModel) DeleteState(startBlockNumber uint64, endBlockNumber uint64) error { + return ops.BaseEigenState.DeleteState("operator_pi_splits", startBlockNumber, endBlockNumber, ops.DB) +} diff --git a/pkg/eigenState/operatorPISplits/operatorPISplits_test.go b/pkg/eigenState/operatorPISplits/operatorPISplits_test.go new file mode 100644 index 0000000..97672bb --- /dev/null +++ b/pkg/eigenState/operatorPISplits/operatorPISplits_test.go @@ -0,0 +1,142 @@ +package operatorPISplits + +import ( + "fmt" + "math/big" + "strings" + "testing" + "time" + + "github.com/Layr-Labs/sidecar/pkg/postgres" + "github.com/Layr-Labs/sidecar/pkg/storage" + + "github.com/Layr-Labs/sidecar/internal/config" + "github.com/Layr-Labs/sidecar/internal/logger" + "github.com/Layr-Labs/sidecar/internal/tests" + "github.com/Layr-Labs/sidecar/pkg/eigenState/stateManager" + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + "gorm.io/gorm" +) + +func setup() ( + string, + *gorm.DB, + *zap.Logger, + *config.Config, + error, +) { + cfg := config.NewConfig() + cfg.DatabaseConfig = *tests.GetDbConfigFromEnv() + + l, _ := logger.NewLogger(&logger.LoggerConfig{Debug: true}) + + dbname, _, grm, err := postgres.GetTestPostgresDatabase(cfg.DatabaseConfig, l) + if err != nil { + return dbname, nil, nil, nil, err + } + + return dbname, grm, l, cfg, nil +} + +func teardown(model *OperatorPISplitModel) { + queries := []string{ + `truncate table operator_pi_splits`, + `truncate table blocks cascade`, + } + for _, query := range queries { + res := model.DB.Exec(query) + if res.Error != nil { + fmt.Printf("Failed to run query: %v\n", res.Error) + } + } +} + +func createBlock(model *OperatorPISplitModel, blockNumber uint64) error { + block := &storage.Block{ + Number: blockNumber, + Hash: "some hash", + BlockTime: time.Now().Add(time.Hour * time.Duration(blockNumber)), + } + res := model.DB.Model(&storage.Block{}).Create(block) + if res.Error != nil { + return res.Error + } + return nil +} + +func Test_OperatorPISplit(t *testing.T) { + dbName, grm, l, cfg, err := setup() + + if err != nil { + t.Fatal(err) + } + + t.Run("Test each event type", func(t *testing.T) { + esm := stateManager.NewEigenStateManager(l, grm) + + model, err := NewOperatorPISplitModel(esm, grm, l, cfg) + + t.Run("Handle an operator pi split", func(t *testing.T) { + blockNumber := uint64(102) + + if err := createBlock(model, blockNumber); err != nil { + t.Fatal(err) + } + + log := &storage.TransactionLog{ + TransactionHash: "some hash", + TransactionIndex: big.NewInt(100).Uint64(), + BlockNumber: blockNumber, + Address: cfg.GetContractsMapForChain().RewardsCoordinator, + Arguments: `[{"Name": "caller", "Type": "address", "Value": "0xd36b6e5eee8311d7bffb2f3bb33301a1ab7de101", "Indexed": true}, {"Name": "operator", "Type": "address", "Value": "0xd36b6e5eee8311d7bffb2f3bb33301a1ab7de101", "Indexed": true}, {"Name": "activatedAt", "Type": "uint32", "Value": 1725494400, "Indexed": false}, {"Name": "oldOperatorAVSSplitBips", "Type": "uint16", "Value": 1000, "Indexed": false}, {"Name": "newOperatorAVSSplitBips", "Type": "uint16", "Value": 2000, "Indexed": false}]`, + EventName: "OperatorPISplitBipsSet", + LogIndex: big.NewInt(12).Uint64(), + OutputData: `{"activatedAt": 1725494400, "oldOperatorAVSSplitBips": 1000, "newOperatorAVSSplitBips": 2000}`, + } + + err = model.SetupStateForBlock(blockNumber) + assert.Nil(t, err) + + isInteresting := model.IsInterestingLog(log) + assert.True(t, isInteresting) + + change, err := model.HandleStateChange(log) + assert.Nil(t, err) + assert.NotNil(t, change) + + split := change.(*OperatorPISplit) + + assert.Equal(t, strings.ToLower("0xd36b6e5eee8311d7bffb2f3bb33301a1ab7de101"), strings.ToLower(split.Operator)) + assert.Equal(t, int64(1725494400), split.ActivatedAt.Unix()) + assert.Equal(t, uint64(1000), split.OldOperatorAVSSplitBips) + assert.Equal(t, uint64(2000), split.NewOperatorAVSSplitBips) + + err = model.CommitFinalState(blockNumber) + assert.Nil(t, err) + + splits := make([]*OperatorPISplit, 0) + query := `select * from operator_pi_splits where block_number = ?` + res := model.DB.Raw(query, blockNumber).Scan(&splits) + assert.Nil(t, res.Error) + assert.Equal(t, 1, len(splits)) + + stateRoot, err := model.GenerateStateRoot(blockNumber) + assert.Nil(t, err) + assert.NotNil(t, stateRoot) + assert.True(t, len(stateRoot) > 0) + + t.Cleanup(func() { + teardown(model) + }) + }) + + t.Cleanup(func() { + teardown(model) + }) + }) + + t.Cleanup(func() { + postgres.TeardownTestDatabase(dbName, cfg, grm, l) + }) +} diff --git a/pkg/postgres/migrations/202411151931_operatorDirectedRewardSubmissions/up.go b/pkg/postgres/migrations/202411151931_operatorDirectedRewardSubmissions/up.go new file mode 100644 index 0000000..48022de --- /dev/null +++ b/pkg/postgres/migrations/202411151931_operatorDirectedRewardSubmissions/up.go @@ -0,0 +1,42 @@ +package _202411151931_operatorDirectedRewardSubmissions + +import ( + "database/sql" + + "gorm.io/gorm" +) + +type Migration struct { +} + +func (m *Migration) Up(db *sql.DB, grm *gorm.DB) error { + query := ` + create table if not exists operator_directed_reward_submissions ( + avs varchar not null, + reward_hash varchar not null, + token varchar not null, + operator varchar not null, + operator_index integer not null, + amount numeric not null, + strategy varchar not null, + strategy_index integer not null, + multiplier numeric(78) not null, + start_timestamp timestamp(6) not null, + end_timestamp timestamp(6) not null, + duration bigint not null, + block_number bigint not null, + transaction_hash varchar not null, + log_index bigint not null, + unique(transaction_hash, log_index, block_number, reward_hash, strategy_index, operator_index), + CONSTRAINT operator_directed_reward_submissions_block_number_fkey FOREIGN KEY (block_number) REFERENCES blocks(number) ON DELETE CASCADE + ); + ` + if err := grm.Exec(query).Error; err != nil { + return err + } + return nil +} + +func (m *Migration) GetName() string { + return "202411151931_operatorDirectedRewardSubmissions" +} diff --git a/pkg/postgres/migrations/202411191550_operatorAVSSplits/up.go b/pkg/postgres/migrations/202411191550_operatorAVSSplits/up.go new file mode 100644 index 0000000..15261d8 --- /dev/null +++ b/pkg/postgres/migrations/202411191550_operatorAVSSplits/up.go @@ -0,0 +1,35 @@ +package _202411191550_operatorAVSSplits + +import ( + "database/sql" + + "gorm.io/gorm" +) + +type Migration struct { +} + +func (m *Migration) Up(db *sql.DB, grm *gorm.DB) error { + query := ` + create table if not exists operator_avs_splits ( + operator varchar not null, + avs varchar not null, + activated_at timestamp(6) not null, + old_operator_avs_split_bips integer not null, + new_operator_avs_split_bips integer not null, + block_number bigint not null, + transaction_hash varchar not null, + log_index bigint not null, + unique(transaction_hash, log_index, block_number), + CONSTRAINT operator_avs_splits_block_number_fkey FOREIGN KEY (block_number) REFERENCES blocks(number) ON DELETE CASCADE + ); + ` + if err := grm.Exec(query).Error; err != nil { + return err + } + return nil +} + +func (m *Migration) GetName() string { + return "202411191550_operatorAVSSplits" +} diff --git a/pkg/postgres/migrations/202411191708_operatorPISplits/up.go b/pkg/postgres/migrations/202411191708_operatorPISplits/up.go new file mode 100644 index 0000000..a03d414 --- /dev/null +++ b/pkg/postgres/migrations/202411191708_operatorPISplits/up.go @@ -0,0 +1,34 @@ +package _202411191708_operatorPISplits + +import ( + "database/sql" + + "gorm.io/gorm" +) + +type Migration struct { +} + +func (m *Migration) Up(db *sql.DB, grm *gorm.DB) error { + query := ` + create table if not exists operator_pi_splits ( + operator varchar not null, + activated_at timestamp(6) not null, + old_operator_avs_split_bips integer not null, + new_operator_avs_split_bips integer not null, + block_number bigint not null, + transaction_hash varchar not null, + log_index bigint not null, + unique(transaction_hash, log_index, block_number), + CONSTRAINT operator_pi_splits_block_number_fkey FOREIGN KEY (block_number) REFERENCES blocks(number) ON DELETE CASCADE + ); + ` + if err := grm.Exec(query).Error; err != nil { + return err + } + return nil +} + +func (m *Migration) GetName() string { + return "202411191708_operatorPISplits" +} diff --git a/pkg/postgres/migrations/migrator.go b/pkg/postgres/migrations/migrator.go index da3bfbc..ab7a197 100644 --- a/pkg/postgres/migrations/migrator.go +++ b/pkg/postgres/migrations/migrator.go @@ -3,6 +3,8 @@ package migrations import ( "database/sql" "fmt" + "time" + _202409061249_bootstrapDb "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202409061249_bootstrapDb" _202409061250_eigenlayerStateTables "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202409061250_eigenlayerStateTables" _202409061720_operatorShareChanges "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202409061720_operatorShareChanges" @@ -32,10 +34,13 @@ import ( _202411120947_disabledDistributionRoots "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202411120947_disabledDistributionRoots" _202411130953_addHashColumns "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202411130953_addHashColumns" _202411131200_eigenStateModelConstraints "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202411131200_eigenStateModelConstraints" + _202411151931_operatorDirectedRewardSubmissions "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202411151931_operatorDirectedRewardSubmissions" + _202411191550_operatorAVSSplits "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202411191550_operatorAVSSplits" + _202411191708_operatorPISplits "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202411191708_operatorPISplits" _202411191947_cleanupUnusedTables "github.com/Layr-Labs/sidecar/pkg/postgres/migrations/202411191947_cleanupUnusedTables" + "go.uber.org/zap" "gorm.io/gorm" - "time" ) type Migration interface { @@ -103,6 +108,9 @@ func (m *Migrator) MigrateAll() error { &_202411120947_disabledDistributionRoots.Migration{}, &_202411130953_addHashColumns.Migration{}, &_202411131200_eigenStateModelConstraints.Migration{}, + &_202411151931_operatorDirectedRewardSubmissions.Migration{}, + &_202411191550_operatorAVSSplits.Migration{}, + &_202411191708_operatorPISplits.Migration{}, &_202411191947_cleanupUnusedTables.Migration{}, }