Skip to content

Commit

Permalink
caplin memory usage improvement (#11854)
Browse files Browse the repository at this point in the history
- memory usage improvement
![截圖 2024-09-04 凌晨3 59
42](https://github.com/user-attachments/assets/56e972dc-d6c0-4abd-b891-c4fd2b682b3e)

![截圖 2024-09-04 凌晨1 14
16](https://github.com/user-attachments/assets/eac1ca76-5522-48a3-b600-62deb78a63db)

- fix `invalid target block` error
  • Loading branch information
domiwei authored Sep 5, 2024
1 parent 4c19715 commit e06968c
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 31 deletions.
15 changes: 7 additions & 8 deletions cl/beacon/handler/block_production.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import (
"github.com/erigontech/erigon/cl/transition/impl/eth2"
"github.com/erigontech/erigon/cl/transition/machine"
"github.com/erigontech/erigon/cl/utils"
"github.com/erigontech/erigon/cl/validator/attestation_producer"
"github.com/erigontech/erigon/core/types"
"github.com/erigontech/erigon/turbo/engineapi/engine_types"
)
Expand Down Expand Up @@ -104,16 +105,14 @@ func (a *ApiHandler) GetEthV1ValidatorAttestationData(
*slot,
*committeeIndex,
)
if err != nil {
if err == attestation_producer.ErrHeadStateBehind {
return nil, beaconhttp.NewEndpointError(
http.StatusServiceUnavailable,
errors.New("beacon node is still syncing"),
)
} else if err != nil {
return nil, beaconhttp.NewEndpointError(http.StatusInternalServerError, err)
}
headBlockRoot, _, err := a.forkchoiceStore.GetHead()
if err != nil {
return nil, err
}
if headBlockRoot != (libcommon.Hash{}) {
attestationData.SetBeaconBlockRoot(headBlockRoot)
}

return newBeaconResponse(attestationData), nil
}
Expand Down
26 changes: 21 additions & 5 deletions cl/beacon/synced_data/synced_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package synced_data

import (
"sync"
"sync/atomic"

"github.com/erigontech/erigon/cl/abstract"
Expand All @@ -28,25 +29,40 @@ type SyncedDataManager struct {
enabled bool
cfg *clparams.BeaconChainConfig
headState atomic.Value

copyBuffer *state.CachingBeaconState
copyBufferMutex sync.Mutex
}

func NewSyncedDataManager(enabled bool, cfg *clparams.BeaconChainConfig) *SyncedDataManager {
return &SyncedDataManager{
enabled: enabled,
cfg: cfg,
enabled: enabled,
cfg: cfg,
copyBuffer: state.New(cfg),
}
}

func (s *SyncedDataManager) OnHeadState(newState *state.CachingBeaconState) (err error) {
if !s.enabled {
return
}
st, err := newState.Copy()
if err != nil {
s.copyBufferMutex.Lock()
defer s.copyBufferMutex.Unlock()
newPtr := s.copyBuffer
if err := newState.CopyInto(newPtr); err != nil {
return err
}
s.headState.Store(st)
curPtr, ok := s.headState.Load().(*state.CachingBeaconState)
if !ok {
// No head state yet
s.headState.Store(newPtr)
s.copyBuffer = state.New(s.cfg) // acquire new buffer
return
}

// swap buffers
s.headState.Store(newPtr)
s.copyBuffer = curPtr
return
}

Expand Down
1 change: 1 addition & 0 deletions cl/phase1/forkchoice/on_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (f *ForkChoiceStore) OnBlock(ctx context.Context, block *cltypes.SignedBeac
if f.Slot() < block.Block.Slot {
return errors.New("block is too early compared to current_slot")
}

// Check that block is later than the finalized epoch slot (optimization to reduce calls to get_ancestor)
finalizedSlot := f.computeStartSlotAtEpoch(f.finalizedCheckpoint.Load().(solid.Checkpoint).Epoch())
if block.Block.Slot <= finalizedSlot {
Expand Down
15 changes: 13 additions & 2 deletions cl/phase1/network/services/attestation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type attestationService struct {
emitters *beaconevents.EventEmitter
// validatorAttestationSeen maps from epoch to validator index. This is used to ignore duplicate validator attestations in the same epoch.
validatorAttestationSeen *lru.CacheWithTTL[uint64, uint64] // validator index -> epoch
attestationProcessed *lru.CacheWithTTL[[32]byte, struct{}]
attestationsToBeLaterProcessed sync.Map
}

Expand All @@ -80,6 +81,7 @@ func NewAttestationService(
netCfg: netCfg,
emitters: emitters,
validatorAttestationSeen: lru.NewWithTTL[uint64, uint64]("validator_attestation_seen", validatorAttestationCacheSize, epochDuration),
attestationProcessed: lru.NewWithTTL[[32]byte, struct{}]("attestation_processed", validatorAttestationCacheSize, epochDuration),
}
go a.loop(ctx)
return a
Expand All @@ -97,6 +99,15 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
return ErrIgnore
}

key, err := att.HashSSZ()
if err != nil {
return err
}
if _, ok := s.attestationProcessed.Get(key); ok {
return ErrIgnore
}
s.attestationProcessed.Add(key, struct{}{})

// [REJECT] The committee index is within the expected range
committeeCount := computeCommitteeCountPerSlot(headState, slot, s.beaconCfg.SlotsPerEpoch)
if committeeIndex >= committeeCount {
Expand Down Expand Up @@ -196,8 +207,8 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
// [REJECT] The attestation's target block is an ancestor of the block named in the LMD vote -- i.e.
// get_checkpoint_block(store, attestation.data.beacon_block_root, attestation.data.target.epoch) == attestation.data.target.root
startSlotAtEpoch := targetEpoch * s.beaconCfg.SlotsPerEpoch
if s.forkchoiceStore.Ancestor(root, startSlotAtEpoch) != att.AttestantionData().Target().BlockRoot() {
return fmt.Errorf("invalid target block. root %v targetEpoch %v targetBlockRoot %v", root.Hex(), targetEpoch, att.AttestantionData().Target().BlockRoot().Hex())
if targetBlock := s.forkchoiceStore.Ancestor(root, startSlotAtEpoch); targetBlock != att.AttestantionData().Target().BlockRoot() {
return fmt.Errorf("invalid target block. root %v targetEpoch %v attTargetBlockRoot %v targetBlock %v", root.Hex(), targetEpoch, att.AttestantionData().Target().BlockRoot().Hex(), targetBlock.Hex())
}
// [IGNORE] The current finalized_checkpoint is an ancestor of the block defined by attestation.data.beacon_block_root --
// i.e. get_checkpoint_block(store, attestation.data.beacon_block_root, store.finalized_checkpoint.epoch) == store.finalized_checkpoint.root
Expand Down
34 changes: 19 additions & 15 deletions cl/validator/attestation_producer/attestation_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

var (
ErrHeadStateNotAvailable = errors.New("head state not available")
ErrHeadStateBehind = errors.New("head state is behind")
)

const attestationsCacheSize = 21
Expand Down Expand Up @@ -72,6 +73,7 @@ func (ap *attestationProducer) ProduceAndCacheAttestationData(baseState *state.C
return solid.AttestationData{}, err
}
}

return solid.NewAttestionDataFromParameters(
slot,
committeeIndex,
Expand Down Expand Up @@ -104,6 +106,23 @@ func (ap *attestationProducer) ProduceAndCacheAttestationData(baseState *state.C
), nil
}

targetEpoch := slot / ap.beaconCfg.SlotsPerEpoch
epochStartTargetSlot := targetEpoch * ap.beaconCfg.SlotsPerEpoch
var targetRoot libcommon.Hash

if epochStartTargetSlot == baseState.Slot() {
targetRoot = baseStateBlockRoot
} else {
targetRoot, err = baseState.GetBlockRootAtSlot(epochStartTargetSlot)
if err != nil {
return solid.AttestationData{}, err
}
if targetRoot == (libcommon.Hash{}) {
// if the target root is not found, we can't generate the attestation
return solid.AttestationData{}, ErrHeadStateBehind
}
}

stateEpoch := state.Epoch(baseState)
if baseState.Slot() > slot {
return solid.AttestationData{}, errors.New("head state slot is bigger than requested slot, the attestation should have been cached, try again later")
Expand All @@ -123,21 +142,6 @@ func (ap *attestationProducer) ProduceAndCacheAttestationData(baseState *state.C
}
}

targetEpoch := state.Epoch(baseState)
epochStartTargetSlot := targetEpoch * ap.beaconCfg.SlotsPerEpoch
var targetRoot libcommon.Hash
if epochStartTargetSlot == baseState.Slot() {
targetRoot = baseStateBlockRoot
} else {
targetRoot, err = baseState.GetBlockRootAtSlot(epochStartTargetSlot)
if err != nil {
return solid.AttestationData{}, err
}
if targetRoot == (libcommon.Hash{}) {
targetRoot = baseStateBlockRoot
}
}

baseAttestationData := solid.NewAttestionDataFromParameters(
0, // slot will be filled in later
0, // committee index will be filled in later
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (c *CommitteeSubscribeMgmt) sweepByStaleSlots(ctx context.Context) {
return curSlot-targetSlot > c.netConfig.AttestationPropagationSlotRange
}
// sweep every minute
ticker := time.NewTicker(time.Minute)
ticker := time.NewTicker(time.Duration(c.beaconConfig.SecondsPerSlot) * time.Second)
defer ticker.Stop()
for {
select {
Expand All @@ -173,6 +173,10 @@ func (c *CommitteeSubscribeMgmt) sweepByStaleSlots(ctx context.Context) {
if slotIsStale(curSlot, sub.latestTargetSlot) {
toRemoves = append(toRemoves, committeeIdx)
}
// try remove aggregator flag to avoid unnecessary aggregation
if curSlot > sub.latestTargetSlot {
sub.aggregate = false
}
}
for _, idx := range toRemoves {
delete(c.validatorSubs, idx)
Expand Down

0 comments on commit e06968c

Please sign in to comment.