Skip to content

Commit

Permalink
Caplin: Automatic retirement of state tables to their own snapshot fi…
Browse files Browse the repository at this point in the history
…les (#12508)
  • Loading branch information
Giulio2002 authored Nov 7, 2024
1 parent b996e91 commit 76f4107
Show file tree
Hide file tree
Showing 32 changed files with 1,407 additions and 269 deletions.
5 changes: 4 additions & 1 deletion cl/antiquary/antiquary.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/erigontech/erigon/cl/persistence/blob_storage"
state_accessors "github.com/erigontech/erigon/cl/persistence/state"
"github.com/erigontech/erigon/cl/phase1/core/state"
"github.com/erigontech/erigon/turbo/snapshotsync"
"github.com/erigontech/erigon/turbo/snapshotsync/freezeblocks"
)

Expand All @@ -50,6 +51,7 @@ type Antiquary struct {
downloader proto_downloader.DownloaderClient
logger log.Logger
sn *freezeblocks.CaplinSnapshots
stateSn *snapshotsync.CaplinStateSnapshots
snReader freezeblocks.BeaconSnapshotReader
snBuildSema *semaphore.Weighted // semaphore for building only one type (blocks, caplin, v3) at a time
ctx context.Context
Expand All @@ -65,7 +67,7 @@ type Antiquary struct {
balances32 []byte
}

func NewAntiquary(ctx context.Context, blobStorage blob_storage.BlobStorage, genesisState *state.CachingBeaconState, validatorsTable *state_accessors.StaticValidatorTable, cfg *clparams.BeaconChainConfig, dirs datadir.Dirs, downloader proto_downloader.DownloaderClient, mainDB kv.RwDB, sn *freezeblocks.CaplinSnapshots, reader freezeblocks.BeaconSnapshotReader, logger log.Logger, states, blocks, blobs, snapgen bool, snBuildSema *semaphore.Weighted) *Antiquary {
func NewAntiquary(ctx context.Context, blobStorage blob_storage.BlobStorage, genesisState *state.CachingBeaconState, validatorsTable *state_accessors.StaticValidatorTable, cfg *clparams.BeaconChainConfig, dirs datadir.Dirs, downloader proto_downloader.DownloaderClient, mainDB kv.RwDB, stateSn *snapshotsync.CaplinStateSnapshots, sn *freezeblocks.CaplinSnapshots, reader freezeblocks.BeaconSnapshotReader, logger log.Logger, states, blocks, blobs, snapgen bool, snBuildSema *semaphore.Weighted) *Antiquary {
backfilled := &atomic.Bool{}
blobBackfilled := &atomic.Bool{}
backfilled.Store(false)
Expand All @@ -89,6 +91,7 @@ func NewAntiquary(ctx context.Context, blobStorage blob_storage.BlobStorage, gen
blocks: blocks,
blobs: blobs,
snapgen: snapgen,
stateSn: stateSn,
}
}

Expand Down
134 changes: 132 additions & 2 deletions cl/antiquary/state_antiquary.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (

"github.com/erigontech/erigon-lib/common"
libcommon "github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/downloader/snaptype"
"github.com/erigontech/erigon-lib/etl"
proto_downloader "github.com/erigontech/erigon-lib/gointerfaces/downloaderproto"
"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon/cl/clparams"
Expand All @@ -42,6 +44,7 @@ import (
"github.com/erigontech/erigon/cl/phase1/core/state/raw"
"github.com/erigontech/erigon/cl/transition"
"github.com/erigontech/erigon/cl/transition/impl/eth2"
"github.com/erigontech/erigon/turbo/snapshotsync"
)

// pool for buffers
Expand Down Expand Up @@ -111,6 +114,9 @@ func (s *Antiquary) readHistoricalProcessingProgress(ctx context.Context) (progr
if err != nil {
return
}
if s.stateSn != nil {
progress = max(progress, s.stateSn.BlocksAvailable())
}

finalized, err = beacon_indicies.ReadHighestFinalized(tx)
if err != nil {
Expand All @@ -119,8 +125,68 @@ func (s *Antiquary) readHistoricalProcessingProgress(ctx context.Context) (progr
return
}

// FillStaticValidatorsTableIfNeeded rebuilds validatorsTable by replaying the
// state events stored in the Caplin state snapshots.
//
// It does nothing and returns (false, nil) when stateSn is nil or when
// validatorsTable already reports a non-zero slot. Otherwise it opens the
// snapshot folder, walks every slot from 0 through stateSn.BlocksAvailable()
// (inclusive), and applies each non-empty kv.StateEvents record to the table,
// recording the slot on the table after each applied record.
//
// It returns (true, nil) once every slot has been replayed. It returns an
// error when the folder cannot be opened, when no visible segment covers a
// slot, when a record cannot be read, when replaying a record fails, or when
// ctx is cancelled. On error the table may be left partially filled.
func FillStaticValidatorsTableIfNeeded(ctx context.Context, logger log.Logger, stateSn *snapshotsync.CaplinStateSnapshots, validatorsTable *state_accessors.StaticValidatorTable) (bool, error) {
	if stateSn == nil || validatorsTable.Slot() != 0 {
		return false, nil
	}
	if err := stateSn.OpenFolder(); err != nil {
		return false, err
	}
	// Read the upper bound once: the loop below reads through a single view,
	// so the bound should not move while we iterate.
	blocksAvailable := stateSn.BlocksAvailable()
	stateSnRoTx := stateSn.View()
	defer stateSnRoTx.Close()

	start := time.Now()
	for slot := uint64(0); slot <= blocksAvailable; slot++ {
		// The replay can span a very large number of slots; stop promptly on cancellation.
		if err := ctx.Err(); err != nil {
			return false, err
		}
		seg, ok := stateSnRoTx.VisibleSegment(slot, kv.StateEvents)
		if !ok {
			return false, fmt.Errorf("segment not found for slot %d", slot)
		}
		buf, err := seg.Get(slot)
		if err != nil {
			return false, fmt.Errorf("reading state events at slot %d: %w", slot, err)
		}
		// Slots without recorded events have nothing to replay.
		if len(buf) == 0 {
			continue
		}
		event := state_accessors.NewStateEventsFromBytes(buf)
		// Propagate replay failures instead of dropping them: a silently
		// skipped event would leave the validators table inconsistent.
		if err := state_accessors.ReplayEvents(
			func(validatorIndex uint64, validator solid.Validator) error {
				return validatorsTable.AddValidator(validator, validatorIndex, slot)
			},
			func(validatorIndex uint64, exitEpoch uint64) error {
				return validatorsTable.AddExitEpoch(validatorIndex, slot, exitEpoch)
			},
			func(validatorIndex uint64, withdrawableEpoch uint64) error {
				return validatorsTable.AddWithdrawableEpoch(validatorIndex, slot, withdrawableEpoch)
			},
			func(validatorIndex uint64, withdrawalCredentials libcommon.Hash) error {
				return validatorsTable.AddWithdrawalCredentials(validatorIndex, slot, withdrawalCredentials)
			},
			func(validatorIndex uint64, activationEpoch uint64) error {
				return validatorsTable.AddActivationEpoch(validatorIndex, slot, activationEpoch)
			},
			func(validatorIndex uint64, activationEligibilityEpoch uint64) error {
				return validatorsTable.AddActivationEligibility(validatorIndex, slot, activationEligibilityEpoch)
			},
			func(validatorIndex uint64, slashed bool) error {
				return validatorsTable.AddSlashed(validatorIndex, slot, slashed)
			},
			event,
		); err != nil {
			return false, fmt.Errorf("replaying state events at slot %d: %w", slot, err)
		}
		validatorsTable.SetSlot(slot)
	}
	logger.Info("[Antiquary] Filled static validators table", "slots", blocksAvailable, "elapsed", time.Since(start))
	return true, nil
}

func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
var tx kv.Tx

// Check if you need to fill the static validators table
refilledStaticValidators, err := FillStaticValidatorsTableIfNeeded(ctx, s.logger, s.stateSn, s.validatorsTable)
if err != nil {
return err
}

tx, err := s.mainDB.BeginRo(ctx)
if err != nil {
Expand All @@ -131,6 +197,13 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
// maps which validators changes
var changedValidators sync.Map

if refilledStaticValidators {
s.validatorsTable.ForEach(func(validatorIndex uint64, validator *state_accessors.StaticValidator) bool {
changedValidators.Store(validatorIndex, struct{}{})
return true
})
}

stateAntiquaryCollector := newBeaconStatesCollector(s.cfg, s.dirs.Tmp, s.logger)
defer stateAntiquaryCollector.close()

Expand Down Expand Up @@ -413,6 +486,59 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
return err
}
log.Info("Historical states antiquated", "slot", s.currentState.Slot(), "root", libcommon.Hash(stateRoot), "latency", endTime)
if s.snapgen {
if err := s.stateSn.OpenFolder(); err != nil {
return err
}

// Keep gnosis out for a bit
if s.currentState.BeaconConfig().ConfigName == "gnosis" {
return nil
}
blocksPerStatefulFile := uint64(snaptype.CaplinMergeLimit * 5)
from := s.stateSn.BlocksAvailable() + 1
if from+blocksPerStatefulFile+safetyMargin > s.currentState.Slot() {
return nil
}
to := s.currentState.Slot()
if to < (safetyMargin + blocksPerStatefulFile) {
return nil
}
to = to - (safetyMargin + blocksPerStatefulFile)
if from >= to {
return nil
}
if err := s.stateSn.DumpCaplinState(
ctx,
s.stateSn.BlocksAvailable()+1,
to,
blocksPerStatefulFile,
s.sn.Salt,
s.dirs,
1,
log.LvlInfo,
s.logger,
); err != nil {
return err
}
paths := s.stateSn.SegFileNames(from, to)
downloadItems := make([]*proto_downloader.AddItem, len(paths))
for i, path := range paths {
downloadItems[i] = &proto_downloader.AddItem{
Path: path,
}
}
if s.downloader != nil {
// Notify bittorrent to seed the new snapshots
if _, err := s.downloader.Add(s.ctx, &proto_downloader.AddRequest{Items: downloadItems}); err != nil {
s.logger.Warn("[Antiquary] Failed to add items to bittorent", "err", err)
}
}
if err := s.stateSn.OpenFolder(); err != nil {
return err
}
}

return nil
}

Expand All @@ -439,12 +565,15 @@ func (s *Antiquary) initializeStateAntiquaryIfNeeded(ctx context.Context, tx kv.
if err != nil {
return err
}
if s.stateSn != nil {
targetSlot = max(targetSlot, s.stateSn.BlocksAvailable())
}
// We want to backoff by some slots until we get a correct state from DB.
// we start from 10 * clparams.SlotsPerDump.
backoffStrides := uint64(10)
backoffStep := backoffStrides

historicalReader := historical_states_reader.NewHistoricalStatesReader(s.cfg, s.snReader, s.validatorsTable, s.genesisState)
historicalReader := historical_states_reader.NewHistoricalStatesReader(s.cfg, s.snReader, s.validatorsTable, s.genesisState, s.stateSn)

for {
attempt, err := computeSlotToBeRequested(tx, s.cfg, s.genesisState.Slot(), targetSlot, backoffStep)
Expand All @@ -465,6 +594,7 @@ func (s *Antiquary) initializeStateAntiquaryIfNeeded(ctx context.Context, tx kv.
if err != nil {
return fmt.Errorf("failed to read historical state at slot %d: %w", attempt, err)
}

if s.currentState == nil {
log.Warn("historical state not found, backoff more and try again", "slot", attempt)
backoffStep += backoffStrides
Expand Down
2 changes: 1 addition & 1 deletion cl/antiquary/state_antiquary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func runTest(t *testing.T, blocks []*cltypes.SignedBeaconBlock, preState, postSt

ctx := context.Background()
vt := state_accessors.NewStaticValidatorTable()
a := NewAntiquary(ctx, nil, preState, vt, &clparams.MainnetBeaconConfig, datadir.New("/tmp"), nil, db, nil, reader, log.New(), true, true, true, false, nil)
a := NewAntiquary(ctx, nil, preState, vt, &clparams.MainnetBeaconConfig, datadir.New("/tmp"), nil, db, nil, nil, reader, log.New(), true, true, true, false, nil)
require.NoError(t, a.IncrementBeaconState(ctx, blocks[len(blocks)-1].Block.Slot+33))
}

Expand Down
14 changes: 9 additions & 5 deletions cl/beacon/handler/attestation_rewards.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,26 +178,30 @@ func (a *ApiHandler) PostEthV1BeaconRewardsAttestations(w http.ResponseWriter, r
if lastSlot > stateProgress {
return nil, beaconhttp.NewEndpointError(http.StatusNotFound, errors.New("requested range is not yet processed or the node is not archivial"))
}
snRoTx := a.caplinStateSnapshots.View()
defer snRoTx.Close()

epochData, err := state_accessors.ReadEpochData(tx, a.beaconChainCfg.RoundSlotToEpoch(lastSlot))
stateGetter := state_accessors.GetValFnTxAndSnapshot(tx, snRoTx)

epochData, err := state_accessors.ReadEpochData(stateGetter, a.beaconChainCfg.RoundSlotToEpoch(lastSlot))
if err != nil {
return nil, err
}

validatorSet, err := a.stateReader.ReadValidatorsForHistoricalState(tx, lastSlot)
validatorSet, err := a.stateReader.ReadValidatorsForHistoricalState(tx, stateGetter, lastSlot)
if err != nil {
return nil, err
}
if validatorSet == nil {
return nil, beaconhttp.NewEndpointError(http.StatusNotFound, errors.New("no validator set found for this epoch"))
}

_, previousIdx, err := a.stateReader.ReadParticipations(tx, lastSlot)
_, previousIdx, err := a.stateReader.ReadParticipations(tx, stateGetter, lastSlot)
if err != nil {
return nil, err
}

_, _, finalizedCheckpoint, ok, err := state_accessors.ReadCheckpoints(tx, epoch*a.beaconChainCfg.SlotsPerEpoch)
_, _, finalizedCheckpoint, ok, err := state_accessors.ReadCheckpoints(stateGetter, epoch*a.beaconChainCfg.SlotsPerEpoch)
if err != nil {
return nil, err
}
Expand All @@ -212,7 +216,7 @@ func (a *ApiHandler) PostEthV1BeaconRewardsAttestations(w http.ResponseWriter, r
return resp.WithFinalized(true).WithOptimistic(a.forkchoiceStore.IsRootOptimistic(root)), nil
}
inactivityScores := solid.NewUint64ListSSZ(int(a.beaconChainCfg.ValidatorRegistryLimit))
if err := a.stateReader.ReconstructUint64ListDump(tx, lastSlot, kv.InactivityScores, validatorSet.Length(), inactivityScores); err != nil {
if err := a.stateReader.ReconstructUint64ListDump(stateGetter, lastSlot, kv.InactivityScores, validatorSet.Length(), inactivityScores); err != nil {
return nil, err
}
resp, err := a.computeAttestationsRewardsForAltair(
Expand Down
9 changes: 7 additions & 2 deletions cl/beacon/handler/committees.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,13 @@ func (a *ApiHandler) getCommittees(w http.ResponseWriter, r *http.Request) (*bea

return newBeaconResponse(resp).WithFinalized(isFinalized).WithOptimistic(isOptimistic), nil
}
snRoTx := a.caplinStateSnapshots.View()
defer snRoTx.Close()
stateGetter := state_accessors.GetValFnTxAndSnapshot(tx, snRoTx)
// finality case
activeIdxs, err := state_accessors.ReadActiveIndicies(tx, epoch*a.beaconChainCfg.SlotsPerEpoch)
activeIdxs, err := state_accessors.ReadActiveIndicies(
stateGetter,
epoch*a.beaconChainCfg.SlotsPerEpoch)
if err != nil {
return nil, err
}
Expand All @@ -138,7 +143,7 @@ func (a *ApiHandler) getCommittees(w http.ResponseWriter, r *http.Request) (*bea
}

mixPosition := (epoch + a.beaconChainCfg.EpochsPerHistoricalVector - a.beaconChainCfg.MinSeedLookahead - 1) % a.beaconChainCfg.EpochsPerHistoricalVector
mix, err := a.stateReader.ReadRandaoMixBySlotAndIndex(tx, epoch*a.beaconChainCfg.SlotsPerEpoch, mixPosition)
mix, err := a.stateReader.ReadRandaoMixBySlotAndIndex(tx, stateGetter, epoch*a.beaconChainCfg.SlotsPerEpoch, mixPosition)
if err != nil {
return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Errorf("could not read randao mix: %v", err))
}
Expand Down
11 changes: 9 additions & 2 deletions cl/beacon/handler/duties_attester.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,15 @@ func (a *ApiHandler) getAttesterDuties(w http.ResponseWriter, r *http.Request) (
if (epoch)*a.beaconChainCfg.SlotsPerEpoch >= stageStateProgress {
return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, fmt.Errorf("epoch %d is too far in the future", epoch))
}

snRoTx := a.caplinStateSnapshots.View()
defer snRoTx.Close()

stateGetter := state_accessors.GetValFnTxAndSnapshot(tx, snRoTx)
// finality case
activeIdxs, err := state_accessors.ReadActiveIndicies(tx, epoch*a.beaconChainCfg.SlotsPerEpoch)
activeIdxs, err := state_accessors.ReadActiveIndicies(
stateGetter,
epoch*a.beaconChainCfg.SlotsPerEpoch)
if err != nil {
return nil, err
}
Expand All @@ -170,7 +177,7 @@ func (a *ApiHandler) getAttesterDuties(w http.ResponseWriter, r *http.Request) (
}

mixPosition := (epoch + a.beaconChainCfg.EpochsPerHistoricalVector - a.beaconChainCfg.MinSeedLookahead - 1) % a.beaconChainCfg.EpochsPerHistoricalVector
mix, err := a.stateReader.ReadRandaoMixBySlotAndIndex(tx, epoch*a.beaconChainCfg.SlotsPerEpoch, mixPosition)
mix, err := a.stateReader.ReadRandaoMixBySlotAndIndex(tx, stateGetter, epoch*a.beaconChainCfg.SlotsPerEpoch, mixPosition)
if err != nil {
return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Errorf("could not read randao mix: %v", err))
}
Expand Down
6 changes: 5 additions & 1 deletion cl/beacon/handler/duties_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,13 @@ func (a *ApiHandler) getSyncDuties(w http.ResponseWriter, r *http.Request) (*bea
if !ok {
_, syncCommittee, ok = a.forkchoiceStore.GetSyncCommittees(period - 1)
}
snRoTx := a.caplinStateSnapshots.View()
defer snRoTx.Close()
// Read them from the archive node if we do not have them in the fast-access storage
if !ok {
syncCommittee, err = state_accessors.ReadCurrentSyncCommittee(tx, a.beaconChainCfg.RoundSlotToSyncCommitteePeriod(startSlotAtEpoch))
syncCommittee, err = state_accessors.ReadCurrentSyncCommittee(
state_accessors.GetValFnTxAndSnapshot(tx, snRoTx),
a.beaconChainCfg.RoundSlotToSyncCommitteePeriod(startSlotAtEpoch))
if syncCommittee == nil {
log.Warn("could not find sync committee for epoch", "epoch", epoch, "period", period)
return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Errorf("could not find sync committee for epoch %d", epoch))
Expand Down
Loading

0 comments on commit 76f4107

Please sign in to comment.