From e06968c4fdb92263bf90ecab6164ee3dec649618 Mon Sep 17 00:00:00 2001 From: Kewei Date: Fri, 6 Sep 2024 00:32:20 +0800 Subject: [PATCH] caplin memory usage improvement (#11854) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - memory usage improvement ![截圖 2024-09-04 凌晨3 59 42](https://github.com/user-attachments/assets/56e972dc-d6c0-4abd-b891-c4fd2b682b3e) ![截圖 2024-09-04 凌晨1 14 16](https://github.com/user-attachments/assets/eac1ca76-5522-48a3-b600-62deb78a63db) - fix `invalid target block` error --- cl/beacon/handler/block_production.go | 15 ++++---- cl/beacon/synced_data/synced_data.go | 26 +++++++++++--- cl/phase1/forkchoice/on_block.go | 1 + .../network/services/attestation_service.go | 15 ++++++-- .../attestation_producer.go | 34 +++++++++++-------- .../committee_subscription.go | 6 +++- 6 files changed, 66 insertions(+), 31 deletions(-) diff --git a/cl/beacon/handler/block_production.go b/cl/beacon/handler/block_production.go index ab005f814e9..2b40eed19a1 100644 --- a/cl/beacon/handler/block_production.go +++ b/cl/beacon/handler/block_production.go @@ -55,6 +55,7 @@ import ( "github.com/erigontech/erigon/cl/transition/impl/eth2" "github.com/erigontech/erigon/cl/transition/machine" "github.com/erigontech/erigon/cl/utils" + "github.com/erigontech/erigon/cl/validator/attestation_producer" "github.com/erigontech/erigon/core/types" "github.com/erigontech/erigon/turbo/engineapi/engine_types" ) @@ -104,16 +105,14 @@ func (a *ApiHandler) GetEthV1ValidatorAttestationData( *slot, *committeeIndex, ) - if err != nil { + if err == attestation_producer.ErrHeadStateBehind { + return nil, beaconhttp.NewEndpointError( + http.StatusServiceUnavailable, + errors.New("beacon node is still syncing"), + ) + } else if err != nil { return nil, beaconhttp.NewEndpointError(http.StatusInternalServerError, err) } - headBlockRoot, _, err := a.forkchoiceStore.GetHead() - if err != nil { - return nil, err - } - if headBlockRoot != (libcommon.Hash{}) { - attestationData.SetBeaconBlockRoot(headBlockRoot) - } return newBeaconResponse(attestationData), nil } diff --git a/cl/beacon/synced_data/synced_data.go b/cl/beacon/synced_data/synced_data.go index 8b4d5cba2fe..e549c471dcc 100644 --- a/cl/beacon/synced_data/synced_data.go +++ b/cl/beacon/synced_data/synced_data.go @@ -17,6 +17,7 @@ package synced_data import ( + "sync" "sync/atomic" "github.com/erigontech/erigon/cl/abstract" @@ -28,12 +29,16 @@ type SyncedDataManager struct { enabled bool cfg *clparams.BeaconChainConfig headState atomic.Value + + copyBuffer *state.CachingBeaconState + copyBufferMutex sync.Mutex } func NewSyncedDataManager(enabled bool, cfg *clparams.BeaconChainConfig) *SyncedDataManager { return &SyncedDataManager{ - enabled: enabled, - cfg: cfg, + enabled: enabled, + cfg: cfg, + copyBuffer: state.New(cfg), } } @@ -41,12 +46,23 @@ func (s *SyncedDataManager) OnHeadState(newState *state.CachingBeaconState) (err if !s.enabled { return } - st, err := newState.Copy() - if err != nil { + s.copyBufferMutex.Lock() + defer s.copyBufferMutex.Unlock() + newPtr := s.copyBuffer + if err := newState.CopyInto(newPtr); err != nil { return err } - s.headState.Store(st) + curPtr, ok := s.headState.Load().(*state.CachingBeaconState) + if !ok { + // No head state yet + s.headState.Store(newPtr) + s.copyBuffer = state.New(s.cfg) // acquire new buffer + return + } + // swap buffers + s.headState.Store(newPtr) + s.copyBuffer = curPtr return } diff --git a/cl/phase1/forkchoice/on_block.go b/cl/phase1/forkchoice/on_block.go index 758d5a75142..e9f502f89d2 100644 --- a/cl/phase1/forkchoice/on_block.go +++ b/cl/phase1/forkchoice/on_block.go @@ -79,6 +79,7 @@ func (f *ForkChoiceStore) OnBlock(ctx context.Context, block *cltypes.SignedBeac if f.Slot() < block.Block.Slot { return errors.New("block is too early compared to current_slot") } + // Check that block is later than the finalized epoch slot (optimization to reduce calls to get_ancestor) finalizedSlot := f.computeStartSlotAtEpoch(f.finalizedCheckpoint.Load().(solid.Checkpoint).Epoch()) if block.Block.Slot <= finalizedSlot { diff --git a/cl/phase1/network/services/attestation_service.go b/cl/phase1/network/services/attestation_service.go index 485c6c26826..4b7a848c25b 100644 --- a/cl/phase1/network/services/attestation_service.go +++ b/cl/phase1/network/services/attestation_service.go @@ -57,6 +57,7 @@ type attestationService struct { emitters *beaconevents.EventEmitter // validatorAttestationSeen maps from epoch to validator index. This is used to ignore duplicate validator attestations in the same epoch. validatorAttestationSeen *lru.CacheWithTTL[uint64, uint64] // validator index -> epoch + attestationProcessed *lru.CacheWithTTL[[32]byte, struct{}] attestationsToBeLaterProcessed sync.Map } @@ -80,6 +81,7 @@ func NewAttestationService( netCfg: netCfg, emitters: emitters, validatorAttestationSeen: lru.NewWithTTL[uint64, uint64]("validator_attestation_seen", validatorAttestationCacheSize, epochDuration), + attestationProcessed: lru.NewWithTTL[[32]byte, struct{}]("attestation_processed", validatorAttestationCacheSize, epochDuration), } go a.loop(ctx) return a @@ -97,6 +99,15 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64, return ErrIgnore } + key, err := att.HashSSZ() + if err != nil { + return err + } + if _, ok := s.attestationProcessed.Get(key); ok { + return ErrIgnore + } + s.attestationProcessed.Add(key, struct{}{}) + // [REJECT] The committee index is within the expected range committeeCount := computeCommitteeCountPerSlot(headState, slot, s.beaconCfg.SlotsPerEpoch) if committeeIndex >= committeeCount { @@ -196,8 +207,8 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64, // [REJECT] The attestation's target block is an ancestor of the block named in the LMD vote -- i.e. // get_checkpoint_block(store, attestation.data.beacon_block_root, attestation.data.target.epoch) == attestation.data.target.root startSlotAtEpoch := targetEpoch * s.beaconCfg.SlotsPerEpoch - if s.forkchoiceStore.Ancestor(root, startSlotAtEpoch) != att.AttestantionData().Target().BlockRoot() { - return fmt.Errorf("invalid target block. root %v targetEpoch %v targetBlockRoot %v", root.Hex(), targetEpoch, att.AttestantionData().Target().BlockRoot().Hex()) + if targetBlock := s.forkchoiceStore.Ancestor(root, startSlotAtEpoch); targetBlock != att.AttestantionData().Target().BlockRoot() { + return fmt.Errorf("invalid target block. root %v targetEpoch %v attTargetBlockRoot %v targetBlock %v", root.Hex(), targetEpoch, att.AttestantionData().Target().BlockRoot().Hex(), targetBlock.Hex()) } // [IGNORE] The current finalized_checkpoint is an ancestor of the block defined by attestation.data.beacon_block_root -- // i.e. get_checkpoint_block(store, attestation.data.beacon_block_root, store.finalized_checkpoint.epoch) == store.finalized_checkpoint.root diff --git a/cl/validator/attestation_producer/attestation_producer.go b/cl/validator/attestation_producer/attestation_producer.go index 23e75bb30d6..ec88ed39ca1 100644 --- a/cl/validator/attestation_producer/attestation_producer.go +++ b/cl/validator/attestation_producer/attestation_producer.go @@ -34,6 +34,7 @@ import ( var ( ErrHeadStateNotAvailable = errors.New("head state not available") + ErrHeadStateBehind = errors.New("head state is behind") ) const attestationsCacheSize = 21 @@ -72,6 +73,7 @@ func (ap *attestationProducer) ProduceAndCacheAttestationData(baseState *state.C return solid.AttestationData{}, err } } + return solid.NewAttestionDataFromParameters( slot, committeeIndex, @@ -104,6 +106,23 @@ func (ap *attestationProducer) ProduceAndCacheAttestationData(baseState *state.C ), nil } + targetEpoch := slot / ap.beaconCfg.SlotsPerEpoch + epochStartTargetSlot := targetEpoch * ap.beaconCfg.SlotsPerEpoch + var targetRoot libcommon.Hash + + if epochStartTargetSlot == baseState.Slot() { + targetRoot = baseStateBlockRoot + } else { + targetRoot, err = baseState.GetBlockRootAtSlot(epochStartTargetSlot) + if err != nil { + return solid.AttestationData{}, err + } + if targetRoot == (libcommon.Hash{}) { + // if the target root is not found, we can't generate the attestation + return solid.AttestationData{}, ErrHeadStateBehind + } + } + stateEpoch := state.Epoch(baseState) if baseState.Slot() > slot { return solid.AttestationData{}, errors.New("head state slot is bigger than requested slot, the attestation should have been cached, try again later") @@ -123,21 +142,6 @@ func (ap *attestationProducer) ProduceAndCacheAttestationData(baseState *state.C } } - targetEpoch := state.Epoch(baseState) - epochStartTargetSlot := targetEpoch * ap.beaconCfg.SlotsPerEpoch - var targetRoot libcommon.Hash - if epochStartTargetSlot == baseState.Slot() { - targetRoot = baseStateBlockRoot - } else { - targetRoot, err = baseState.GetBlockRootAtSlot(epochStartTargetSlot) - if err != nil { - return solid.AttestationData{}, err - } - if targetRoot == (libcommon.Hash{}) { - targetRoot = baseStateBlockRoot - } - } - baseAttestationData := solid.NewAttestionDataFromParameters( 0, // slot will be filled in later 0, // committee index will be filled in later diff --git a/cl/validator/committee_subscription/committee_subscription.go b/cl/validator/committee_subscription/committee_subscription.go index 31fe654d58d..5b4a9134443 100644 --- a/cl/validator/committee_subscription/committee_subscription.go +++ b/cl/validator/committee_subscription/committee_subscription.go @@ -159,7 +159,7 @@ func (c *CommitteeSubscribeMgmt) sweepByStaleSlots(ctx context.Context) { return curSlot-targetSlot > c.netConfig.AttestationPropagationSlotRange } // sweep every minute - ticker := time.NewTicker(time.Minute) + ticker := time.NewTicker(time.Duration(c.beaconConfig.SecondsPerSlot) * time.Second) defer ticker.Stop() for { select { @@ -173,6 +173,10 @@ func (c *CommitteeSubscribeMgmt) sweepByStaleSlots(ctx context.Context) { if slotIsStale(curSlot, sub.latestTargetSlot) { toRemoves = append(toRemoves, committeeIdx) } + // try remove aggregator flag to avoid unnecessary aggregation + if curSlot > sub.latestTargetSlot { + sub.aggregate = false + } } for _, idx := range toRemoves { delete(c.validatorSubs, idx)