Skip to content

Commit

Permalink
optimise attestation signature verification (#11940)
Browse files Browse the repository at this point in the history
Using signature aggregate validation now for attestation service.

pprof BEFORE the changes:
<img width="1728" alt="Screenshot 2024-09-11 at 00 18 28"
src="https://github.com/user-attachments/assets/bfd2b446-9394-482f-9fbd-da590c824ab1">

pprof AFTER the changes:
<img width="1728" alt="Screenshot 2024-09-10 at 23 27 51"
src="https://github.com/user-attachments/assets/d7f65d80-d90c-450a-9a24-3302ddae8b24">

It shows to speed up the process more than 10 times. Also note that in
the tests I was accumulating ALL the attestation regardless having a
validator/aggregator for that specific slot/committee. But we can safely
say that it would have the same speeding up effect in other cases as
well.

---------

Co-authored-by: shota.silagadze <[email protected]>
  • Loading branch information
shotasilagadze and shotasilagadzetaal authored Sep 12, 2024
1 parent 6ac7005 commit be2b431
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 74 deletions.
32 changes: 16 additions & 16 deletions cl/beacon/handler/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,29 +96,29 @@ func (a *ApiHandler) PostEthV1BeaconPoolAttestations(w http.ResponseWriter, r *h
subnet = subnets.ComputeSubnetForAttestation(committeeCountPerSlot, slot, cIndex, a.beaconChainCfg.SlotsPerEpoch, a.netConfig.AttestationSubnetCount)
)
_ = i
if err := a.attestationService.ProcessMessage(r.Context(), &subnet, attestation); err != nil {

encodedSSZ, err := attestation.EncodeSSZ(nil)
if err != nil {
beaconhttp.NewEndpointError(http.StatusInternalServerError, err).WriteTo(w)
return
}
attestationWithGossipData := &services.AttestationWithGossipData{
Attestation: attestation,
GossipData: &sentinel.GossipData{
Data: encodedSSZ,
Name: gossip.TopicNamePrefixBeaconAttestation,
SubnetId: &subnet,
},
}

if err := a.attestationService.ProcessMessage(r.Context(), &subnet, attestationWithGossipData); err != nil {
log.Warn("[Beacon REST] failed to process attestation in attestation service", "err", err)
failures = append(failures, poolingFailure{
Index: i,
Message: err.Error(),
})
continue
}
if a.sentinel != nil {
encodedSSZ, err := attestation.EncodeSSZ(nil)
if err != nil {
beaconhttp.NewEndpointError(http.StatusInternalServerError, err).WriteTo(w)
return
}
if _, err := a.sentinel.PublishGossip(r.Context(), &sentinel.GossipData{
Data: encodedSSZ,
Name: gossip.TopicNamePrefixBeaconAttestation,
SubnetId: &subnet,
}); err != nil {
beaconhttp.NewEndpointError(http.StatusInternalServerError, err).WriteTo(w)
return
}
}
}
if len(failures) > 0 {
errResp := poolingError{
Expand Down
14 changes: 10 additions & 4 deletions cl/phase1/network/gossip_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,13 +228,19 @@ func (g *GossipManager) routeAndProcess(ctx context.Context, data *sentinel.Goss
}
return g.syncCommitteeMessagesService.ProcessMessage(ctx, data.SubnetId, msg)
case gossip.IsTopicBeaconAttestation(data.Name):
att := &solid.Attestation{}
if err := att.DecodeSSZ(data.Data, int(version)); err != nil {
obj := &services.AttestationWithGossipData{
GossipData: data,
Attestation: &solid.Attestation{},
}

if err := obj.Attestation.DecodeSSZ(data.Data, int(version)); err != nil {
return err
}
if g.committeeSub.NeedToAggregate(att.AttestantionData().CommitteeIndex()) {
return g.attestationService.ProcessMessage(ctx, data.SubnetId, att)

if g.committeeSub.NeedToAggregate(obj.Attestation.AttestantionData().CommitteeIndex()) {
return g.attestationService.ProcessMessage(ctx, data.SubnetId, obj)
}

return nil
default:
return fmt.Errorf("unknown topic %s", data.Name)
Expand Down
190 changes: 160 additions & 30 deletions cl/phase1/network/services/attestation_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/Giulio2002/bls"
sentinel "github.com/erigontech/erigon-lib/gointerfaces/sentinelproto"
"github.com/erigontech/erigon-lib/log/v3"
"github.com/erigontech/erigon/cl/aggregation"
"github.com/erigontech/erigon/cl/beacon/beaconevents"
Expand All @@ -37,30 +38,40 @@ import (
"github.com/erigontech/erigon/cl/utils"
"github.com/erigontech/erigon/cl/utils/eth_clock"
"github.com/erigontech/erigon/cl/validator/committee_subscription"
"github.com/erigontech/erigon/common"
)

var (
computeSubnetForAttestation = subnets.ComputeSubnetForAttestation
computeCommitteeCountPerSlot = subnets.ComputeCommitteeCountPerSlot
computeSigningRoot = fork.ComputeSigningRoot
blsVerify = bls.Verify
blsVerifyMultipleSignatures = bls.VerifyMultipleSignatures
batchCheckInterval = 50 * time.Millisecond
)

type attestationService struct {
ctx context.Context
forkchoiceStore forkchoice.ForkChoiceStorage
committeeSubscribe committee_subscription.CommitteeSubscribe
ethClock eth_clock.EthereumClock
syncedDataManager synced_data.SyncedData
beaconCfg *clparams.BeaconChainConfig
netCfg *clparams.NetworkConfig
emitters *beaconevents.EventEmitter
sentinel sentinel.SentinelClient
verifyAndExecute chan *AggregateVerificationData
// validatorAttestationSeen maps from epoch to validator index. This is used to ignore duplicate validator attestations in the same epoch.
validatorAttestationSeen *lru.CacheWithTTL[uint64, uint64] // validator index -> epoch
attestationProcessed *lru.CacheWithTTL[[32]byte, struct{}]
attestationsToBeLaterProcessed sync.Map
}

// AttestationWithGossipData type represents attestation with the gossip data where it's coming from.
type AttestationWithGossipData struct {
Attestation *solid.Attestation
GossipData *sentinel.GossipData
}

func NewAttestationService(
ctx context.Context,
forkchoiceStore forkchoice.ForkChoiceStorage,
Expand All @@ -70,36 +81,144 @@ func NewAttestationService(
beaconCfg *clparams.BeaconChainConfig,
netCfg *clparams.NetworkConfig,
emitters *beaconevents.EventEmitter,
sentinel sentinel.SentinelClient,
) AttestationService {
epochDuration := time.Duration(beaconCfg.SlotsPerEpoch*beaconCfg.SecondsPerSlot) * time.Second
a := &attestationService{
ctx: ctx,
forkchoiceStore: forkchoiceStore,
committeeSubscribe: committeeSubscribe,
ethClock: ethClock,
syncedDataManager: syncedDataManager,
beaconCfg: beaconCfg,
netCfg: netCfg,
emitters: emitters,
sentinel: sentinel,
verifyAndExecute: make(chan *AggregateVerificationData, 128),
validatorAttestationSeen: lru.NewWithTTL[uint64, uint64]("validator_attestation_seen", validatorAttestationCacheSize, epochDuration),
attestationProcessed: lru.NewWithTTL[[32]byte, struct{}]("attestation_processed", validatorAttestationCacheSize, epochDuration),
}

go a.startAttestationBatchSignatureVerification()
go a.loop(ctx)
return a
}

func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64, att *solid.Attestation) error {
// When receiving AttestationWithGossipData, we simply collect all the signature verification data
// and verify them together - running all the final functions afterwards
func (a *attestationService) startAttestationBatchSignatureVerification() {
ticker := time.NewTicker(batchCheckInterval)
aggregateVerificationData := make([]*AggregateVerificationData, 0, 128)
for {
select {
case verification := <-a.verifyAndExecute:
aggregateVerificationData = append(aggregateVerificationData, verification)
if len(aggregateVerificationData)*3 > BatchSignatureVerificationThreshold {
a.processSignatureVerification(aggregateVerificationData)
aggregateVerificationData = make([]*AggregateVerificationData, 0, 128)
ticker.Reset(batchCheckInterval)
}
case <-ticker.C:
if len(aggregateVerificationData) != 0 {
a.processSignatureVerification(aggregateVerificationData)
aggregateVerificationData = make([]*AggregateVerificationData, 0, 128)
ticker.Reset(batchCheckInterval)
}
}
}
}

// processSignatureVerification Runs signature verification for all the signatures altogether, if it
// succeeds we publish all accumulated gossip data. If verification fails, start verifying each AggregateVerificationData one by
// one, publish corresponding gossip data if verification succeeds, if not ban the corresponding peer that sent it.
func (a *attestationService) processSignatureVerification(aggregateVerificationData []*AggregateVerificationData) {
signatures, signRoots, pks, fns :=
make([][]byte, 0, 128),
make([][]byte, 0, 128),
make([][]byte, 0, 128),
make([]func(), 0, 64)

for _, v := range aggregateVerificationData {
signatures, signRoots, pks, fns =
append(signatures, v.Signatures...),
append(signRoots, v.SignRoots...),
append(pks, v.Pks...),
append(fns, v.F)
}
if err := a.runBatchVerification(signatures, signRoots, pks, fns); err != nil {
a.handleIncorrectSignatures(aggregateVerificationData)
log.Warn(err.Error())
return
}

// Everything went well, run corresponding Fs and send all the gossip data to the network
for _, v := range aggregateVerificationData {
v.F()
if a.sentinel != nil && v.GossipData != nil {
if _, err := a.sentinel.PublishGossip(a.ctx, v.GossipData); err != nil {
log.Warn("failed publish gossip", "err", err)
}
}
}
}

// we could locate failing signature with binary search but for now let's choose simplicity over optimisation.
func (a *attestationService) handleIncorrectSignatures(aggregateVerificationData []*AggregateVerificationData) {
for _, v := range aggregateVerificationData {
valid, err := blsVerifyMultipleSignatures(v.Signatures, v.SignRoots, v.Pks)
if err != nil {
log.Warn("attestation_service signature verification failed with the error: " + err.Error())
if a.sentinel != nil && v.GossipData != nil && v.GossipData.Peer != nil {
a.sentinel.BanPeer(a.ctx, v.GossipData.Peer)
}
continue
}

if !valid {
log.Warn("attestation_service signature verification failed")
if a.sentinel != nil && v.GossipData != nil && v.GossipData.Peer != nil {
a.sentinel.BanPeer(a.ctx, v.GossipData.Peer)
}
continue
}

// run corresponding function and publish the gossip into the network
v.F()

if a.sentinel != nil && v.GossipData != nil {
if _, err := a.sentinel.PublishGossip(a.ctx, v.GossipData); err != nil {
log.Warn("failed publish gossip", "err", err)
}
}
}
}

func (a *attestationService) runBatchVerification(signatures [][]byte, signRoots [][]byte, pks [][]byte, fns []func()) error {
valid, err := blsVerifyMultipleSignatures(signatures, signRoots, pks)
if err != nil {
return errors.New("attestation_service batch signature verification failed with the error: " + err.Error())
}

if !valid {
return errors.New("attestation_service signature verification failed")
}

return nil
}

func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64, att *AttestationWithGossipData) error {
var (
root = att.AttestantionData().BeaconBlockRoot()
slot = att.AttestantionData().Slot()
committeeIndex = att.AttestantionData().CommitteeIndex()
targetEpoch = att.AttestantionData().Target().Epoch()
root = att.Attestation.AttestantionData().BeaconBlockRoot()
slot = att.Attestation.AttestantionData().Slot()
committeeIndex = att.Attestation.AttestantionData().CommitteeIndex()
targetEpoch = att.Attestation.AttestantionData().Target().Epoch()
)
headState := s.syncedDataManager.HeadStateReader()
if headState == nil {
return ErrIgnore
}

key, err := att.HashSSZ()
key, err := att.Attestation.HashSSZ()
if err != nil {
return err
}
Expand Down Expand Up @@ -133,7 +252,7 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
if err != nil {
return err
}
bits := att.AggregationBits()
bits := att.Attestation.AggregationBits()
expectedAggregationBitsLength := len(beaconCommittee)
actualAggregationBitsLength := utils.GetBitlistLength(bits)
if actualAggregationBitsLength != expectedAggregationBitsLength {
Expand Down Expand Up @@ -177,7 +296,7 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
s.validatorAttestationSeen.Add(vIndex, targetEpoch)

// [REJECT] The signature of attestation is valid.
signature := att.Signature()
signature := att.Attestation.Signature()
pubKey, err := headState.ValidatorPublicKey(int(beaconCommittee[onBitIndex]))
if err != nil {
return fmt.Errorf("unable to get public key: %v", err)
Expand All @@ -186,16 +305,10 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
if err != nil {
return fmt.Errorf("unable to get the domain: %v", err)
}
signingRoot, err := computeSigningRoot(att.AttestantionData(), domain)
signingRoot, err := computeSigningRoot(att.Attestation.AttestantionData(), domain)
if err != nil {
return fmt.Errorf("unable to get signing root: %v", err)
}
if valid, err := blsVerify(signature[:], signingRoot[:], pubKey[:]); err != nil {
return err
} else if !valid {
log.Warn("invalid signature", "signature", common.Bytes2Hex(signature[:]), "signningRoot", common.Bytes2Hex(signingRoot[:]), "pubKey", common.Bytes2Hex(pubKey[:]))
return errors.New("invalid signature")
}

// [IGNORE] The block being voted for (attestation.data.beacon_block_root) has been seen (via both gossip and non-gossip sources)
// (a client MAY queue attestations for processing once block is retrieved).
Expand All @@ -207,8 +320,8 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
// [REJECT] The attestation's target block is an ancestor of the block named in the LMD vote -- i.e.
// get_checkpoint_block(store, attestation.data.beacon_block_root, attestation.data.target.epoch) == attestation.data.target.root
startSlotAtEpoch := targetEpoch * s.beaconCfg.SlotsPerEpoch
if targetBlock := s.forkchoiceStore.Ancestor(root, startSlotAtEpoch); targetBlock != att.AttestantionData().Target().BlockRoot() {
return fmt.Errorf("invalid target block. root %v targetEpoch %v attTargetBlockRoot %v targetBlock %v", root.Hex(), targetEpoch, att.AttestantionData().Target().BlockRoot().Hex(), targetBlock.Hex())
if targetBlock := s.forkchoiceStore.Ancestor(root, startSlotAtEpoch); targetBlock != att.Attestation.AttestantionData().Target().BlockRoot() {
return fmt.Errorf("invalid target block. root %v targetEpoch %v attTargetBlockRoot %v targetBlock %v", root.Hex(), targetEpoch, att.Attestation.AttestantionData().Target().BlockRoot().Hex(), targetBlock.Hex())
}
// [IGNORE] The current finalized_checkpoint is an ancestor of the block defined by attestation.data.beacon_block_root --
// i.e. get_checkpoint_block(store, attestation.data.beacon_block_root, store.finalized_checkpoint.epoch) == store.finalized_checkpoint.root
Expand All @@ -217,25 +330,42 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
return fmt.Errorf("invalid finalized checkpoint %w", ErrIgnore)
}

err = s.committeeSubscribe.CheckAggregateAttestation(att)
if errors.Is(err, aggregation.ErrIsSuperset) {
return ErrIgnore
}
if err != nil {
return err
aggregateVerificationData := &AggregateVerificationData{
Signatures: [][]byte{signature[:]},
SignRoots: [][]byte{signingRoot[:]},
Pks: [][]byte{pubKey[:]},
GossipData: att.GossipData,
F: func() {
err = s.committeeSubscribe.CheckAggregateAttestation(att.Attestation)
if errors.Is(err, aggregation.ErrIsSuperset) {
return
}
if err != nil {
log.Warn("could not check aggregate attestation", "err", err)
return
}
s.emitters.Operation().SendAttestation(att.Attestation)
},
}
s.emitters.Operation().SendAttestation(att)
return nil

// push the signatures to verify asynchronously and run final functions after that.
s.verifyAndExecute <- aggregateVerificationData

// As the logic goes, if we return ErrIgnore there will be no peer banning and further publishing
// gossip data into the network by the gossip manager. That's what we want because we will be doing that ourselves
// in startBatchSignatureVerification function. After validating signatures, if they are valid we will publish the
// gossip ourselves or ban the peer which sent that particular invalid signature.
return ErrIgnore
}

type attestationJob struct {
att *solid.Attestation
att *AttestationWithGossipData
creationTime time.Time
subnet uint64
}

func (a *attestationService) scheduleAttestationForLaterProcessing(att *solid.Attestation) {
key, err := att.HashSSZ()
func (a *attestationService) scheduleAttestationForLaterProcessing(att *AttestationWithGossipData) {
key, err := att.Attestation.HashSSZ()
if err != nil {
return
}
Expand Down Expand Up @@ -263,7 +393,7 @@ func (a *attestationService) loop(ctx context.Context) {
return true
}

root := v.att.AttestantionData().BeaconBlockRoot()
root := v.att.Attestation.AttestantionData().BeaconBlockRoot()
if _, ok := a.forkchoiceStore.GetHeader(root); !ok {
return true
}
Expand Down
Loading

0 comments on commit be2b431

Please sign in to comment.