Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Caplin: Automatic retirement of state tables to their own snapshot files #12508

Merged
merged 140 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
140 commits
Select commit Hold shift + click to select a range
5567da9
save
Giulio2002 Oct 25, 2024
11dac56
save
Giulio2002 Oct 25, 2024
af37440
save
Giulio2002 Oct 25, 2024
ae5f2ba
save
Giulio2002 Oct 25, 2024
4f8eca4
save
Giulio2002 Oct 26, 2024
352332c
save
Giulio2002 Oct 26, 2024
9d9bd2f
save
Giulio2002 Oct 26, 2024
c62a248
save
Giulio2002 Oct 26, 2024
bda7346
save
Giulio2002 Oct 26, 2024
9a01dff
save
Giulio2002 Oct 26, 2024
93fc749
save
Giulio2002 Oct 26, 2024
a9ed413
save
Giulio2002 Oct 26, 2024
aa960d9
save
Giulio2002 Oct 26, 2024
b4801e2
save
Giulio2002 Oct 26, 2024
8b606a6
save
Giulio2002 Oct 26, 2024
0588cae
save
Giulio2002 Oct 26, 2024
d8d972b
save
Giulio2002 Oct 26, 2024
a98bd84
save
Giulio2002 Oct 26, 2024
d51b414
save
Giulio2002 Oct 26, 2024
3749e48
save
Giulio2002 Oct 26, 2024
29be491
save
Giulio2002 Oct 26, 2024
1bc75ac
save
Giulio2002 Oct 26, 2024
2071381
save
Giulio2002 Oct 26, 2024
b390a57
save
Giulio2002 Oct 26, 2024
193967a
save
Giulio2002 Oct 26, 2024
1f359a6
save
Giulio2002 Oct 26, 2024
52d3d1f
save
Giulio2002 Oct 26, 2024
8ffef87
save
Giulio2002 Oct 26, 2024
ea95bc3
save
Giulio2002 Oct 26, 2024
2e7f829
save
Giulio2002 Oct 26, 2024
5423862
save
Giulio2002 Oct 26, 2024
4a72820
save
Giulio2002 Oct 26, 2024
60317d9
save
Giulio2002 Oct 26, 2024
6c12a91
save
Giulio2002 Oct 26, 2024
e772766
save
Giulio2002 Oct 26, 2024
904248b
save
Giulio2002 Oct 26, 2024
e123bf7
save
Giulio2002 Oct 26, 2024
df8e2ef
save
Giulio2002 Oct 26, 2024
22e2565
save
Giulio2002 Oct 26, 2024
0fd5197
save
Giulio2002 Oct 26, 2024
f5c52bd
save
Giulio2002 Oct 26, 2024
35f6c3d
save
Giulio2002 Oct 26, 2024
8d71404
save
Giulio2002 Oct 26, 2024
296e2e5
save
Giulio2002 Oct 26, 2024
9efb112
save
Giulio2002 Oct 26, 2024
1421683
save
Giulio2002 Oct 26, 2024
0a79e0e
save
Giulio2002 Oct 26, 2024
d26fd6c
save
Giulio2002 Oct 26, 2024
9ba3457
save
Giulio2002 Oct 26, 2024
9a69e31
save
Giulio2002 Oct 26, 2024
7462a5c
save
Giulio2002 Oct 26, 2024
a23be29
save
Giulio2002 Oct 26, 2024
ede1c48
save
Giulio2002 Oct 26, 2024
44fc37f
save
Giulio2002 Oct 26, 2024
9ec82fd
save
Giulio2002 Oct 26, 2024
17db8c9
save
Giulio2002 Oct 26, 2024
a43d9d5
save
Giulio2002 Oct 26, 2024
f94332d
save
Giulio2002 Oct 26, 2024
6370a83
save
Giulio2002 Oct 26, 2024
6c4da9f
save
Giulio2002 Oct 26, 2024
5ed3f72
save
Giulio2002 Oct 26, 2024
c9c69d1
save
Giulio2002 Oct 26, 2024
155b5f2
save
Giulio2002 Oct 26, 2024
9a4a46e
save
Giulio2002 Oct 26, 2024
39b4336
save
Giulio2002 Oct 26, 2024
5574844
save
Giulio2002 Oct 26, 2024
33dab63
save
Giulio2002 Oct 26, 2024
1c208be
save
Giulio2002 Oct 26, 2024
66b0d8d
save
Giulio2002 Oct 26, 2024
ced1247
save
Giulio2002 Oct 26, 2024
f1d8710
save
Giulio2002 Oct 26, 2024
8063667
save
Giulio2002 Oct 26, 2024
a1b7331
save
Giulio2002 Oct 26, 2024
ce25fd7
save
Giulio2002 Oct 26, 2024
a270509
save
Giulio2002 Oct 26, 2024
8be282f
save
Giulio2002 Oct 26, 2024
5a376f9
save
Giulio2002 Oct 26, 2024
661d7bf
save
Giulio2002 Oct 26, 2024
c6810c5
save
Giulio2002 Oct 26, 2024
feac154
save
Giulio2002 Oct 26, 2024
a96de6b
save
Giulio2002 Oct 26, 2024
35913bc
save
Giulio2002 Oct 27, 2024
810e0a7
save
Giulio2002 Oct 27, 2024
5470be1
save
Giulio2002 Oct 27, 2024
1df5851
save
Giulio2002 Oct 27, 2024
2485742
save
Giulio2002 Oct 27, 2024
0fe1e64
save
Giulio2002 Oct 27, 2024
4866159
save
Giulio2002 Oct 27, 2024
ed7d368
save
Giulio2002 Oct 27, 2024
07c3075
save
Giulio2002 Oct 27, 2024
1052ee6
save
Giulio2002 Oct 27, 2024
ac4e98c
save
Giulio2002 Oct 27, 2024
a4de21e
save
Giulio2002 Oct 27, 2024
bfd791a
save
Giulio2002 Oct 27, 2024
0766013
save
Giulio2002 Oct 27, 2024
b07c55c
save
Giulio2002 Oct 27, 2024
26c7d9d
save
Giulio2002 Oct 27, 2024
b881496
save
Giulio2002 Oct 27, 2024
70c0538
save
Giulio2002 Oct 27, 2024
2953bbf
save
Giulio2002 Oct 27, 2024
37838b0
save
Giulio2002 Oct 27, 2024
a1259e1
save
Giulio2002 Oct 27, 2024
106f4e5
save
Giulio2002 Oct 27, 2024
a13ef79
save
Giulio2002 Oct 27, 2024
e8cc733
save
Giulio2002 Oct 27, 2024
4147064
save
Giulio2002 Oct 27, 2024
032aed1
save
Giulio2002 Oct 27, 2024
4df47c7
save
Giulio2002 Oct 27, 2024
20f055e
save
Giulio2002 Oct 27, 2024
8a8726e
save
Giulio2002 Oct 27, 2024
d8ed11b
save
Giulio2002 Oct 27, 2024
a99e1c4
save
Giulio2002 Oct 27, 2024
69b1837
save
Giulio2002 Oct 27, 2024
c654049
Merge remote-tracking branch 'origin/main' into antiquated_state_archive
Giulio2002 Oct 27, 2024
4397e10
save
Giulio2002 Oct 27, 2024
63c4812
save
Giulio2002 Oct 27, 2024
536ed7b
save
Giulio2002 Oct 27, 2024
7ba71b2
save
Giulio2002 Oct 27, 2024
2c562b1
save
Giulio2002 Oct 27, 2024
f1cd894
save
Giulio2002 Oct 27, 2024
1066312
save
Giulio2002 Oct 27, 2024
b903cf2
save
Giulio2002 Oct 27, 2024
84c6bc3
save
Giulio2002 Oct 27, 2024
97b50ea
save
Giulio2002 Oct 27, 2024
b0609b1
save
Giulio2002 Oct 27, 2024
0827072
save
Giulio2002 Oct 27, 2024
18f5a38
save
Giulio2002 Oct 27, 2024
00dffea
save
Giulio2002 Oct 27, 2024
006e5b7
save
Giulio2002 Oct 27, 2024
7405711
save
Giulio2002 Oct 27, 2024
9930833
save
Giulio2002 Oct 27, 2024
1cbf2a2
save
Giulio2002 Oct 29, 2024
9bcb2ff
save
Giulio2002 Oct 29, 2024
fad80fe
save
Giulio2002 Oct 29, 2024
443a56b
Merge branch 'main' into antiquated_state_archive
Giulio2002 Nov 2, 2024
af4b39f
save
Giulio2002 Nov 6, 2024
55107dc
Merge branch 'antiquated_state_archive' of https://github.com/erigont…
Giulio2002 Nov 6, 2024
7af45dc
save
Giulio2002 Nov 7, 2024
af0e95d
save
Giulio2002 Nov 7, 2024
96e1c4f
Merge remote-tracking branch 'origin/main' into antiquated_state_archive
Giulio2002 Nov 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cl/antiquary/antiquary.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/erigontech/erigon/cl/persistence/blob_storage"
state_accessors "github.com/erigontech/erigon/cl/persistence/state"
"github.com/erigontech/erigon/cl/phase1/core/state"
"github.com/erigontech/erigon/turbo/snapshotsync"
"github.com/erigontech/erigon/turbo/snapshotsync/freezeblocks"
)

Expand All @@ -50,6 +51,7 @@ type Antiquary struct {
downloader proto_downloader.DownloaderClient
logger log.Logger
sn *freezeblocks.CaplinSnapshots
stateSn *snapshotsync.CaplinStateSnapshots
snReader freezeblocks.BeaconSnapshotReader
snBuildSema *semaphore.Weighted // semaphore for building only one type (blocks, caplin, v3) at a time
ctx context.Context
Expand All @@ -65,7 +67,7 @@ type Antiquary struct {
balances32 []byte
}

func NewAntiquary(ctx context.Context, blobStorage blob_storage.BlobStorage, genesisState *state.CachingBeaconState, validatorsTable *state_accessors.StaticValidatorTable, cfg *clparams.BeaconChainConfig, dirs datadir.Dirs, downloader proto_downloader.DownloaderClient, mainDB kv.RwDB, sn *freezeblocks.CaplinSnapshots, reader freezeblocks.BeaconSnapshotReader, logger log.Logger, states, blocks, blobs, snapgen bool, snBuildSema *semaphore.Weighted) *Antiquary {
func NewAntiquary(ctx context.Context, blobStorage blob_storage.BlobStorage, genesisState *state.CachingBeaconState, validatorsTable *state_accessors.StaticValidatorTable, cfg *clparams.BeaconChainConfig, dirs datadir.Dirs, downloader proto_downloader.DownloaderClient, mainDB kv.RwDB, stateSn *snapshotsync.CaplinStateSnapshots, sn *freezeblocks.CaplinSnapshots, reader freezeblocks.BeaconSnapshotReader, logger log.Logger, states, blocks, blobs, snapgen bool, snBuildSema *semaphore.Weighted) *Antiquary {
backfilled := &atomic.Bool{}
blobBackfilled := &atomic.Bool{}
backfilled.Store(false)
Expand All @@ -89,6 +91,7 @@ func NewAntiquary(ctx context.Context, blobStorage blob_storage.BlobStorage, gen
blocks: blocks,
blobs: blobs,
snapgen: snapgen,
stateSn: stateSn,
}
}

Expand Down
134 changes: 132 additions & 2 deletions cl/antiquary/state_antiquary.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (

"github.com/erigontech/erigon-lib/common"
libcommon "github.com/erigontech/erigon-lib/common"
"github.com/erigontech/erigon-lib/downloader/snaptype"
"github.com/erigontech/erigon-lib/etl"
proto_downloader "github.com/erigontech/erigon-lib/gointerfaces/downloaderproto"
"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon/cl/clparams"
Expand All @@ -42,6 +44,7 @@ import (
"github.com/erigontech/erigon/cl/phase1/core/state/raw"
"github.com/erigontech/erigon/cl/transition"
"github.com/erigontech/erigon/cl/transition/impl/eth2"
"github.com/erigontech/erigon/turbo/snapshotsync"
)

// pool for buffers
Expand Down Expand Up @@ -111,6 +114,9 @@ func (s *Antiquary) readHistoricalProcessingProgress(ctx context.Context) (progr
if err != nil {
return
}
if s.stateSn != nil {
progress = max(progress, s.stateSn.BlocksAvailable())
}

finalized, err = beacon_indicies.ReadHighestFinalized(tx)
if err != nil {
Expand All @@ -119,8 +125,68 @@ func (s *Antiquary) readHistoricalProcessingProgress(ctx context.Context) (progr
return
}

func FillStaticValidatorsTableIfNeeded(ctx context.Context, logger log.Logger, stateSn *snapshotsync.CaplinStateSnapshots, validatorsTable *state_accessors.StaticValidatorTable) (bool, error) {
if stateSn == nil || validatorsTable.Slot() != 0 {
return false, nil
}
if err := stateSn.OpenFolder(); err != nil {
return false, err
}
blocksAvaiable := stateSn.BlocksAvailable()
stateSnRoTx := stateSn.View()
defer stateSnRoTx.Close()

start := time.Now()
for slot := uint64(0); slot <= stateSn.BlocksAvailable(); slot++ {
seg, ok := stateSnRoTx.VisibleSegment(slot, kv.StateEvents)
if !ok {
return false, fmt.Errorf("segment not found for slot %d", slot)
}
buf, err := seg.Get(slot)
if err != nil {
return false, err
}
if len(buf) == 0 {
continue
}
event := state_accessors.NewStateEventsFromBytes(buf)
state_accessors.ReplayEvents(
func(validatorIndex uint64, validator solid.Validator) error {
return validatorsTable.AddValidator(validator, validatorIndex, slot)
},
func(validatorIndex uint64, exitEpoch uint64) error {
return validatorsTable.AddExitEpoch(validatorIndex, slot, exitEpoch)
},
func(validatorIndex uint64, withdrawableEpoch uint64) error {
return validatorsTable.AddWithdrawableEpoch(validatorIndex, slot, withdrawableEpoch)
},
func(validatorIndex uint64, withdrawalCredentials libcommon.Hash) error {
return validatorsTable.AddWithdrawalCredentials(validatorIndex, slot, withdrawalCredentials)
},
func(validatorIndex uint64, activationEpoch uint64) error {
return validatorsTable.AddActivationEpoch(validatorIndex, slot, activationEpoch)
},
func(validatorIndex uint64, activationEligibilityEpoch uint64) error {
return validatorsTable.AddActivationEligibility(validatorIndex, slot, activationEligibilityEpoch)
},
func(validatorIndex uint64, slashed bool) error {
return validatorsTable.AddSlashed(validatorIndex, slot, slashed)
},
event,
)
validatorsTable.SetSlot(slot)
}
logger.Info("[Antiquary] Filled static validators table", "slots", blocksAvaiable, "elapsed", time.Since(start))
return true, nil
}

func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
var tx kv.Tx

// Check if you need to fill the static validators table
refilledStaticValidators, err := FillStaticValidatorsTableIfNeeded(ctx, s.logger, s.stateSn, s.validatorsTable)
if err != nil {
return err
}

tx, err := s.mainDB.BeginRo(ctx)
if err != nil {
Expand All @@ -131,6 +197,13 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
// maps which validators changes
var changedValidators sync.Map

if refilledStaticValidators {
s.validatorsTable.ForEach(func(validatorIndex uint64, validator *state_accessors.StaticValidator) bool {
changedValidators.Store(validatorIndex, struct{}{})
return true
})
}

stateAntiquaryCollector := newBeaconStatesCollector(s.cfg, s.dirs.Tmp, s.logger)
defer stateAntiquaryCollector.close()

Expand Down Expand Up @@ -413,6 +486,59 @@ func (s *Antiquary) IncrementBeaconState(ctx context.Context, to uint64) error {
return err
}
log.Info("Historical states antiquated", "slot", s.currentState.Slot(), "root", libcommon.Hash(stateRoot), "latency", endTime)
if s.snapgen {
if err := s.stateSn.OpenFolder(); err != nil {
return err
}

// Keep gnosis out for a bit
if s.currentState.BeaconConfig().ConfigName == "gnosis" {
return nil
}
blocksPerStatefulFile := uint64(snaptype.CaplinMergeLimit * 5)
from := s.stateSn.BlocksAvailable() + 1
if from+blocksPerStatefulFile+safetyMargin > s.currentState.Slot() {
return nil
}
to := s.currentState.Slot()
if to < (safetyMargin + blocksPerStatefulFile) {
return nil
}
to = to - (safetyMargin + blocksPerStatefulFile)
if from >= to {
return nil
}
if err := s.stateSn.DumpCaplinState(
ctx,
s.stateSn.BlocksAvailable()+1,
to,
blocksPerStatefulFile,
s.sn.Salt,
s.dirs,
1,
log.LvlInfo,
s.logger,
); err != nil {
return err
}
paths := s.stateSn.SegFileNames(from, to)
downloadItems := make([]*proto_downloader.AddItem, len(paths))
for i, path := range paths {
downloadItems[i] = &proto_downloader.AddItem{
Path: path,
}
}
if s.downloader != nil {
// Notify bittorent to seed the new snapshots
if _, err := s.downloader.Add(s.ctx, &proto_downloader.AddRequest{Items: downloadItems}); err != nil {
s.logger.Warn("[Antiquary] Failed to add items to bittorent", "err", err)
}
}
if err := s.stateSn.OpenFolder(); err != nil {
return err
}
}

return nil
}

Expand All @@ -439,12 +565,15 @@ func (s *Antiquary) initializeStateAntiquaryIfNeeded(ctx context.Context, tx kv.
if err != nil {
return err
}
if s.stateSn != nil {
targetSlot = max(targetSlot, s.stateSn.BlocksAvailable())
}
// We want to backoff by some slots until we get a correct state from DB.
// we start from 10 * clparams.SlotsPerDump.
backoffStrides := uint64(10)
backoffStep := backoffStrides

historicalReader := historical_states_reader.NewHistoricalStatesReader(s.cfg, s.snReader, s.validatorsTable, s.genesisState)
historicalReader := historical_states_reader.NewHistoricalStatesReader(s.cfg, s.snReader, s.validatorsTable, s.genesisState, s.stateSn)

for {
attempt, err := computeSlotToBeRequested(tx, s.cfg, s.genesisState.Slot(), targetSlot, backoffStep)
Expand All @@ -465,6 +594,7 @@ func (s *Antiquary) initializeStateAntiquaryIfNeeded(ctx context.Context, tx kv.
if err != nil {
return fmt.Errorf("failed to read historical state at slot %d: %w", attempt, err)
}

if s.currentState == nil {
log.Warn("historical state not found, backoff more and try again", "slot", attempt)
backoffStep += backoffStrides
Expand Down
2 changes: 1 addition & 1 deletion cl/antiquary/state_antiquary_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func runTest(t *testing.T, blocks []*cltypes.SignedBeaconBlock, preState, postSt

ctx := context.Background()
vt := state_accessors.NewStaticValidatorTable()
a := NewAntiquary(ctx, nil, preState, vt, &clparams.MainnetBeaconConfig, datadir.New("/tmp"), nil, db, nil, reader, log.New(), true, true, true, false, nil)
a := NewAntiquary(ctx, nil, preState, vt, &clparams.MainnetBeaconConfig, datadir.New("/tmp"), nil, db, nil, nil, reader, log.New(), true, true, true, false, nil)
require.NoError(t, a.IncrementBeaconState(ctx, blocks[len(blocks)-1].Block.Slot+33))
}

Expand Down
14 changes: 9 additions & 5 deletions cl/beacon/handler/attestation_rewards.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,26 +178,30 @@ func (a *ApiHandler) PostEthV1BeaconRewardsAttestations(w http.ResponseWriter, r
if lastSlot > stateProgress {
return nil, beaconhttp.NewEndpointError(http.StatusNotFound, errors.New("requested range is not yet processed or the node is not archivial"))
}
snRoTx := a.caplinStateSnapshots.View()
defer snRoTx.Close()

epochData, err := state_accessors.ReadEpochData(tx, a.beaconChainCfg.RoundSlotToEpoch(lastSlot))
stateGetter := state_accessors.GetValFnTxAndSnapshot(tx, snRoTx)

epochData, err := state_accessors.ReadEpochData(stateGetter, a.beaconChainCfg.RoundSlotToEpoch(lastSlot))
if err != nil {
return nil, err
}

validatorSet, err := a.stateReader.ReadValidatorsForHistoricalState(tx, lastSlot)
validatorSet, err := a.stateReader.ReadValidatorsForHistoricalState(tx, stateGetter, lastSlot)
if err != nil {
return nil, err
}
if validatorSet == nil {
return nil, beaconhttp.NewEndpointError(http.StatusNotFound, errors.New("no validator set found for this epoch"))
}

_, previousIdx, err := a.stateReader.ReadParticipations(tx, lastSlot)
_, previousIdx, err := a.stateReader.ReadParticipations(tx, stateGetter, lastSlot)
if err != nil {
return nil, err
}

_, _, finalizedCheckpoint, ok, err := state_accessors.ReadCheckpoints(tx, epoch*a.beaconChainCfg.SlotsPerEpoch)
_, _, finalizedCheckpoint, ok, err := state_accessors.ReadCheckpoints(stateGetter, epoch*a.beaconChainCfg.SlotsPerEpoch)
if err != nil {
return nil, err
}
Expand All @@ -212,7 +216,7 @@ func (a *ApiHandler) PostEthV1BeaconRewardsAttestations(w http.ResponseWriter, r
return resp.WithFinalized(true).WithOptimistic(a.forkchoiceStore.IsRootOptimistic(root)), nil
}
inactivityScores := solid.NewUint64ListSSZ(int(a.beaconChainCfg.ValidatorRegistryLimit))
if err := a.stateReader.ReconstructUint64ListDump(tx, lastSlot, kv.InactivityScores, validatorSet.Length(), inactivityScores); err != nil {
if err := a.stateReader.ReconstructUint64ListDump(stateGetter, lastSlot, kv.InactivityScores, validatorSet.Length(), inactivityScores); err != nil {
return nil, err
}
resp, err := a.computeAttestationsRewardsForAltair(
Expand Down
9 changes: 7 additions & 2 deletions cl/beacon/handler/committees.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,13 @@ func (a *ApiHandler) getCommittees(w http.ResponseWriter, r *http.Request) (*bea

return newBeaconResponse(resp).WithFinalized(isFinalized).WithOptimistic(isOptimistic), nil
}
snRoTx := a.caplinStateSnapshots.View()
defer snRoTx.Close()
stateGetter := state_accessors.GetValFnTxAndSnapshot(tx, snRoTx)
// finality case
activeIdxs, err := state_accessors.ReadActiveIndicies(tx, epoch*a.beaconChainCfg.SlotsPerEpoch)
activeIdxs, err := state_accessors.ReadActiveIndicies(
stateGetter,
epoch*a.beaconChainCfg.SlotsPerEpoch)
if err != nil {
return nil, err
}
Expand All @@ -138,7 +143,7 @@ func (a *ApiHandler) getCommittees(w http.ResponseWriter, r *http.Request) (*bea
}

mixPosition := (epoch + a.beaconChainCfg.EpochsPerHistoricalVector - a.beaconChainCfg.MinSeedLookahead - 1) % a.beaconChainCfg.EpochsPerHistoricalVector
mix, err := a.stateReader.ReadRandaoMixBySlotAndIndex(tx, epoch*a.beaconChainCfg.SlotsPerEpoch, mixPosition)
mix, err := a.stateReader.ReadRandaoMixBySlotAndIndex(tx, stateGetter, epoch*a.beaconChainCfg.SlotsPerEpoch, mixPosition)
if err != nil {
return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Errorf("could not read randao mix: %v", err))
}
Expand Down
11 changes: 9 additions & 2 deletions cl/beacon/handler/duties_attester.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,15 @@ func (a *ApiHandler) getAttesterDuties(w http.ResponseWriter, r *http.Request) (
if (epoch)*a.beaconChainCfg.SlotsPerEpoch >= stageStateProgress {
return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, fmt.Errorf("epoch %d is too far in the future", epoch))
}

snRoTx := a.caplinStateSnapshots.View()
defer snRoTx.Close()

stateGetter := state_accessors.GetValFnTxAndSnapshot(tx, snRoTx)
// finality case
activeIdxs, err := state_accessors.ReadActiveIndicies(tx, epoch*a.beaconChainCfg.SlotsPerEpoch)
activeIdxs, err := state_accessors.ReadActiveIndicies(
stateGetter,
epoch*a.beaconChainCfg.SlotsPerEpoch)
if err != nil {
return nil, err
}
Expand All @@ -170,7 +177,7 @@ func (a *ApiHandler) getAttesterDuties(w http.ResponseWriter, r *http.Request) (
}

mixPosition := (epoch + a.beaconChainCfg.EpochsPerHistoricalVector - a.beaconChainCfg.MinSeedLookahead - 1) % a.beaconChainCfg.EpochsPerHistoricalVector
mix, err := a.stateReader.ReadRandaoMixBySlotAndIndex(tx, epoch*a.beaconChainCfg.SlotsPerEpoch, mixPosition)
mix, err := a.stateReader.ReadRandaoMixBySlotAndIndex(tx, stateGetter, epoch*a.beaconChainCfg.SlotsPerEpoch, mixPosition)
if err != nil {
return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Errorf("could not read randao mix: %v", err))
}
Expand Down
6 changes: 5 additions & 1 deletion cl/beacon/handler/duties_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,13 @@ func (a *ApiHandler) getSyncDuties(w http.ResponseWriter, r *http.Request) (*bea
if !ok {
_, syncCommittee, ok = a.forkchoiceStore.GetSyncCommittees(period - 1)
}
snRoTx := a.caplinStateSnapshots.View()
defer snRoTx.Close()
// Read them from the archive node if we do not have them in the fast-access storage
if !ok {
syncCommittee, err = state_accessors.ReadCurrentSyncCommittee(tx, a.beaconChainCfg.RoundSlotToSyncCommitteePeriod(startSlotAtEpoch))
syncCommittee, err = state_accessors.ReadCurrentSyncCommittee(
state_accessors.GetValFnTxAndSnapshot(tx, snRoTx),
a.beaconChainCfg.RoundSlotToSyncCommitteePeriod(startSlotAtEpoch))
if syncCommittee == nil {
log.Warn("could not find sync committee for epoch", "epoch", epoch, "period", period)
return nil, beaconhttp.NewEndpointError(http.StatusNotFound, fmt.Errorf("could not find sync committee for epoch %d", epoch))
Expand Down
Loading
Loading