diff --git a/cl/beacon/handler/pool.go b/cl/beacon/handler/pool.go
index 1269c888827..b2c769dbe82 100644
--- a/cl/beacon/handler/pool.go
+++ b/cl/beacon/handler/pool.go
@@ -96,7 +96,22 @@ func (a *ApiHandler) PostEthV1BeaconPoolAttestations(w http.ResponseWriter, r *h
 			subnet = subnets.ComputeSubnetForAttestation(committeeCountPerSlot, slot, cIndex, a.beaconChainCfg.SlotsPerEpoch, a.netConfig.AttestationSubnetCount)
 		)
 		_ = i
-		if err := a.attestationService.ProcessMessage(r.Context(), &subnet, attestation); err != nil {
+
+		encodedSSZ, err := attestation.EncodeSSZ(nil)
+		if err != nil {
+			beaconhttp.NewEndpointError(http.StatusInternalServerError, err).WriteTo(w)
+			return
+		}
+		attestationWithGossipData := &services.AttestationWithGossipData{
+			Attestation: attestation,
+			GossipData: &sentinel.GossipData{
+				Data:     encodedSSZ,
+				Name:     gossip.TopicNamePrefixBeaconAttestation,
+				SubnetId: &subnet,
+			},
+		}
+
+		if err := a.attestationService.ProcessMessage(r.Context(), &subnet, attestationWithGossipData); err != nil {
 			log.Warn("[Beacon REST] failed to process attestation in attestation service", "err", err)
 			failures = append(failures, poolingFailure{
 				Index: i,
@@ -104,21 +119,6 @@ func (a *ApiHandler) PostEthV1BeaconPoolAttestations(w http.ResponseWriter, r *h
 			})
 			continue
 		}
-		if a.sentinel != nil {
-			encodedSSZ, err := attestation.EncodeSSZ(nil)
-			if err != nil {
-				beaconhttp.NewEndpointError(http.StatusInternalServerError, err).WriteTo(w)
-				return
-			}
-			if _, err := a.sentinel.PublishGossip(r.Context(), &sentinel.GossipData{
-				Data:     encodedSSZ,
-				Name:     gossip.TopicNamePrefixBeaconAttestation,
-				SubnetId: &subnet,
-			}); err != nil {
-				beaconhttp.NewEndpointError(http.StatusInternalServerError, err).WriteTo(w)
-				return
-			}
-		}
 	}
 	if len(failures) > 0 {
 		errResp := poolingError{
diff --git a/cl/phase1/network/gossip_manager.go b/cl/phase1/network/gossip_manager.go
index 113a9ac9f7b..d4a6f006553 100644
--- a/cl/phase1/network/gossip_manager.go
+++ b/cl/phase1/network/gossip_manager.go
@@ -228,13 +228,19 @@ func (g *GossipManager) routeAndProcess(ctx context.Context, data *sentinel.Goss
 		}
 		return g.syncCommitteeMessagesService.ProcessMessage(ctx, data.SubnetId, msg)
 	case gossip.IsTopicBeaconAttestation(data.Name):
-		att := &solid.Attestation{}
-		if err := att.DecodeSSZ(data.Data, int(version)); err != nil {
+		obj := &services.AttestationWithGossipData{
+			GossipData:  data,
+			Attestation: &solid.Attestation{},
+		}
+
+		if err := obj.Attestation.DecodeSSZ(data.Data, int(version)); err != nil {
 			return err
 		}
-		if g.committeeSub.NeedToAggregate(att.AttestantionData().CommitteeIndex()) {
-			return g.attestationService.ProcessMessage(ctx, data.SubnetId, att)
+
+		if g.committeeSub.NeedToAggregate(obj.Attestation.AttestantionData().CommitteeIndex()) {
+			return g.attestationService.ProcessMessage(ctx, data.SubnetId, obj)
 		}
+
 		return nil
 	default:
 		return fmt.Errorf("unknown topic %s", data.Name)
 	}
diff --git a/cl/phase1/network/services/attestation_service.go b/cl/phase1/network/services/attestation_service.go
index 4b7a848c25b..dcd7218e35a 100644
--- a/cl/phase1/network/services/attestation_service.go
+++ b/cl/phase1/network/services/attestation_service.go
@@ -24,6 +24,7 @@ import (
 	"time"
 
 	"github.com/Giulio2002/bls"
+	sentinel "github.com/erigontech/erigon-lib/gointerfaces/sentinelproto"
 	"github.com/erigontech/erigon-lib/log/v3"
 	"github.com/erigontech/erigon/cl/aggregation"
 	"github.com/erigontech/erigon/cl/beacon/beaconevents"
@@ -37,7 +38,6 @@ import (
 	"github.com/erigontech/erigon/cl/utils"
 	"github.com/erigontech/erigon/cl/utils/eth_clock"
 	"github.com/erigontech/erigon/cl/validator/committee_subscription"
-	"github.com/erigontech/erigon/common"
 )
 
 var (
@@ -45,9 +45,12 @@ var (
 	computeCommitteeCountPerSlot = subnets.ComputeCommitteeCountPerSlot
 	computeSigningRoot           = fork.ComputeSigningRoot
 	blsVerify                    = bls.Verify
+	blsVerifyMultipleSignatures  = bls.VerifyMultipleSignatures
+	batchCheckInterval           = 50 * time.Millisecond
 )
 
 type attestationService struct {
+	ctx                context.Context
 	forkchoiceStore    forkchoice.ForkChoiceStorage
 	committeeSubscribe committee_subscription.CommitteeSubscribe
 	ethClock           eth_clock.EthereumClock
@@ -55,12 +58,20 @@ type attestationService struct {
 	beaconCfg          *clparams.BeaconChainConfig
 	netCfg             *clparams.NetworkConfig
 	emitters           *beaconevents.EventEmitter
+	sentinel           sentinel.SentinelClient
+	verifyAndExecute   chan *AggregateVerificationData
 	// validatorAttestationSeen maps from epoch to validator index. This is used to ignore duplicate validator attestations in the same epoch.
 	validatorAttestationSeen       *lru.CacheWithTTL[uint64, uint64] // validator index -> epoch
 	attestationProcessed           *lru.CacheWithTTL[[32]byte, struct{}]
 	attestationsToBeLaterProcessed sync.Map
 }
 
+// AttestationWithGossipData pairs an attestation with the gossip data it arrived with.
+type AttestationWithGossipData struct {
+	Attestation *solid.Attestation
+	GossipData  *sentinel.GossipData
+}
+
 func NewAttestationService(
 	ctx context.Context,
 	forkchoiceStore forkchoice.ForkChoiceStorage,
@@ -70,9 +81,11 @@ func NewAttestationService(
 	beaconCfg *clparams.BeaconChainConfig,
 	netCfg *clparams.NetworkConfig,
 	emitters *beaconevents.EventEmitter,
+	sentinel sentinel.SentinelClient,
 ) AttestationService {
 	epochDuration := time.Duration(beaconCfg.SlotsPerEpoch*beaconCfg.SecondsPerSlot) * time.Second
 	a := &attestationService{
+		ctx:                ctx,
 		forkchoiceStore:    forkchoiceStore,
 		committeeSubscribe: committeeSubscribe,
 		ethClock:           ethClock,
@@ -80,26 +93,132 @@ func NewAttestationService(
 		beaconCfg:          beaconCfg,
 		netCfg:             netCfg,
 		emitters:           emitters,
+		sentinel:           sentinel,
+		verifyAndExecute:   make(chan *AggregateVerificationData, 128),
 		validatorAttestationSeen: lru.NewWithTTL[uint64, uint64]("validator_attestation_seen", validatorAttestationCacheSize, epochDuration),
 		attestationProcessed:     lru.NewWithTTL[[32]byte, struct{}]("attestation_processed", validatorAttestationCacheSize, epochDuration),
 	}
+
+	go a.startAttestationBatchSignatureVerification()
 	go a.loop(ctx)
 	return a
 }
 
-func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64, att *solid.Attestation) error {
+// When receiving AttestationWithGossipData, we collect all the signature verification data
+// and verify it in batches, running all the final functions afterwards.
+func (a *attestationService) startAttestationBatchSignatureVerification() {
+	ticker := time.NewTicker(batchCheckInterval)
+	aggregateVerificationData := make([]*AggregateVerificationData, 0, 128)
+	for {
+		select {
+		case verification := <-a.verifyAndExecute:
+			aggregateVerificationData = append(aggregateVerificationData, verification)
+			if len(aggregateVerificationData)*3 > BatchSignatureVerificationThreshold {
+				a.processSignatureVerification(aggregateVerificationData)
+				aggregateVerificationData = make([]*AggregateVerificationData, 0, 128)
+				ticker.Reset(batchCheckInterval)
+			}
+		case <-ticker.C:
+			if len(aggregateVerificationData) != 0 {
+				a.processSignatureVerification(aggregateVerificationData)
+				aggregateVerificationData = make([]*AggregateVerificationData, 0, 128)
+				ticker.Reset(batchCheckInterval)
+			}
+		}
+	}
+}
+
+// processSignatureVerification runs signature verification for all the signatures at once. If it
+// succeeds, we publish all the accumulated gossip data. If verification fails, we verify each AggregateVerificationData one by
+// one, publish its gossip data if verification succeeds, and ban the corresponding peer if it does not.
+func (a *attestationService) processSignatureVerification(aggregateVerificationData []*AggregateVerificationData) {
+	signatures, signRoots, pks, fns :=
+		make([][]byte, 0, 128),
+		make([][]byte, 0, 128),
+		make([][]byte, 0, 128),
+		make([]func(), 0, 64)
+
+	for _, v := range aggregateVerificationData {
+		signatures, signRoots, pks, fns =
+			append(signatures, v.Signatures...),
+			append(signRoots, v.SignRoots...),
+			append(pks, v.Pks...),
+			append(fns, v.F)
+	}
+	if err := a.runBatchVerification(signatures, signRoots, pks, fns); err != nil {
+		a.handleIncorrectSignatures(aggregateVerificationData)
+		log.Warn(err.Error())
+		return
+	}
+
+	// Everything went well: run the corresponding Fs and send all the gossip data to the network.
+	for _, v := range aggregateVerificationData {
+		v.F()
+		if a.sentinel != nil && v.GossipData != nil {
+			if _, err := a.sentinel.PublishGossip(a.ctx, v.GossipData); err != nil {
+				log.Warn("failed to publish gossip", "err", err)
+			}
+		}
+	}
+}
+
+// We could locate the failing signature with binary search, but for now we choose simplicity over optimisation.
+func (a *attestationService) handleIncorrectSignatures(aggregateVerificationData []*AggregateVerificationData) {
+	for _, v := range aggregateVerificationData {
+		valid, err := blsVerifyMultipleSignatures(v.Signatures, v.SignRoots, v.Pks)
+		if err != nil {
+			log.Warn("attestation_service signature verification failed with the error: " + err.Error())
+			if a.sentinel != nil && v.GossipData != nil && v.GossipData.Peer != nil {
+				a.sentinel.BanPeer(a.ctx, v.GossipData.Peer)
+			}
+			continue
+		}
+
+		if !valid {
+			log.Warn("attestation_service signature verification failed")
+			if a.sentinel != nil && v.GossipData != nil && v.GossipData.Peer != nil {
+				a.sentinel.BanPeer(a.ctx, v.GossipData.Peer)
+			}
+			continue
+		}
+
+		// Run the corresponding function and publish the gossip to the network.
+		v.F()
+
+		if a.sentinel != nil && v.GossipData != nil {
+			if _, err := a.sentinel.PublishGossip(a.ctx, v.GossipData); err != nil {
+				log.Warn("failed to publish gossip", "err", err)
+			}
+		}
+	}
+}
+
+func (a *attestationService) runBatchVerification(signatures [][]byte, signRoots [][]byte, pks [][]byte, fns []func()) error {
+	valid, err := blsVerifyMultipleSignatures(signatures, signRoots, pks)
+	if err != nil {
+		return errors.New("attestation_service batch signature verification failed with the error: " + err.Error())
+	}
+
+	if !valid {
+		return errors.New("attestation_service signature verification failed")
+	}
+
+	return nil
+}
+
+func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64, att *AttestationWithGossipData) error {
 	var (
-		root           = att.AttestantionData().BeaconBlockRoot()
-		slot           = att.AttestantionData().Slot()
-		committeeIndex = att.AttestantionData().CommitteeIndex()
-		targetEpoch    = att.AttestantionData().Target().Epoch()
+		root           = att.Attestation.AttestantionData().BeaconBlockRoot()
+		slot           = att.Attestation.AttestantionData().Slot()
+		committeeIndex = att.Attestation.AttestantionData().CommitteeIndex()
+		targetEpoch    = att.Attestation.AttestantionData().Target().Epoch()
 	)
 	headState := s.syncedDataManager.HeadStateReader()
 	if headState == nil {
 		return ErrIgnore
 	}
 
-	key, err := att.HashSSZ()
+	key, err := att.Attestation.HashSSZ()
 	if err != nil {
 		return err
 	}
@@ -133,7 +252,7 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
 	if err != nil {
 		return err
 	}
-	bits := att.AggregationBits()
+	bits := att.Attestation.AggregationBits()
 	expectedAggregationBitsLength := len(beaconCommittee)
 	actualAggregationBitsLength := utils.GetBitlistLength(bits)
 	if actualAggregationBitsLength != expectedAggregationBitsLength {
@@ -177,7 +296,7 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
 	s.validatorAttestationSeen.Add(vIndex, targetEpoch)
 
 	// [REJECT] The signature of attestation is valid.
-	signature := att.Signature()
+	signature := att.Attestation.Signature()
 	pubKey, err := headState.ValidatorPublicKey(int(beaconCommittee[onBitIndex]))
 	if err != nil {
 		return fmt.Errorf("unable to get public key: %v", err)
 	}
@@ -186,16 +305,10 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
 	if err != nil {
 		return fmt.Errorf("unable to get the domain: %v", err)
 	}
-	signingRoot, err := computeSigningRoot(att.AttestantionData(), domain)
+	signingRoot, err := computeSigningRoot(att.Attestation.AttestantionData(), domain)
 	if err != nil {
 		return fmt.Errorf("unable to get signing root: %v", err)
 	}
-	if valid, err := blsVerify(signature[:], signingRoot[:], pubKey[:]); err != nil {
-		return err
-	} else if !valid {
-		log.Warn("invalid signature", "signature", common.Bytes2Hex(signature[:]), "signningRoot", common.Bytes2Hex(signingRoot[:]), "pubKey", common.Bytes2Hex(pubKey[:]))
-		return errors.New("invalid signature")
-	}
 
 	// [IGNORE] The block being voted for (attestation.data.beacon_block_root) has been seen (via both gossip and non-gossip sources)
 	// (a client MAY queue attestations for processing once block is retrieved).
@@ -207,8 +320,8 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
 	// [REJECT] The attestation's target block is an ancestor of the block named in the LMD vote -- i.e.
 	// get_checkpoint_block(store, attestation.data.beacon_block_root, attestation.data.target.epoch) == attestation.data.target.root
 	startSlotAtEpoch := targetEpoch * s.beaconCfg.SlotsPerEpoch
-	if targetBlock := s.forkchoiceStore.Ancestor(root, startSlotAtEpoch); targetBlock != att.AttestantionData().Target().BlockRoot() {
-		return fmt.Errorf("invalid target block. root %v targetEpoch %v attTargetBlockRoot %v targetBlock %v", root.Hex(), targetEpoch, att.AttestantionData().Target().BlockRoot().Hex(), targetBlock.Hex())
+	if targetBlock := s.forkchoiceStore.Ancestor(root, startSlotAtEpoch); targetBlock != att.Attestation.AttestantionData().Target().BlockRoot() {
+		return fmt.Errorf("invalid target block. root %v targetEpoch %v attTargetBlockRoot %v targetBlock %v", root.Hex(), targetEpoch, att.Attestation.AttestantionData().Target().BlockRoot().Hex(), targetBlock.Hex())
 	}
 	// [IGNORE] The current finalized_checkpoint is an ancestor of the block defined by attestation.data.beacon_block_root --
 	// i.e. get_checkpoint_block(store, attestation.data.beacon_block_root, store.finalized_checkpoint.epoch) == store.finalized_checkpoint.root
@@ -217,25 +330,42 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64,
 		return fmt.Errorf("invalid finalized checkpoint %w", ErrIgnore)
 	}
 
-	err = s.committeeSubscribe.CheckAggregateAttestation(att)
-	if errors.Is(err, aggregation.ErrIsSuperset) {
-		return ErrIgnore
-	}
-	if err != nil {
-		return err
+	aggregateVerificationData := &AggregateVerificationData{
+		Signatures: [][]byte{signature[:]},
+		SignRoots:  [][]byte{signingRoot[:]},
+		Pks:        [][]byte{pubKey[:]},
+		GossipData: att.GossipData,
+		F: func() {
+			err = s.committeeSubscribe.CheckAggregateAttestation(att.Attestation)
+			if errors.Is(err, aggregation.ErrIsSuperset) {
+				return
+			}
+			if err != nil {
+				log.Warn("could not check aggregate attestation", "err", err)
+				return
+			}
+			s.emitters.Operation().SendAttestation(att.Attestation)
+		},
 	}
-	s.emitters.Operation().SendAttestation(att)
-	return nil
+
+	// Push the signatures to be verified asynchronously; the final functions run after verification.
+	s.verifyAndExecute <- aggregateVerificationData
+
+	// As the logic goes, returning ErrIgnore means the gossip manager will neither ban the peer nor publish the
+	// gossip data to the network. That is what we want, because we do both ourselves in the
+	// startAttestationBatchSignatureVerification function: after validating the signatures, we either publish the
+	// gossip ourselves or ban the peer that sent an invalid signature.
+	return ErrIgnore
 }
 
 type attestationJob struct {
-	att          *solid.Attestation
+	att          *AttestationWithGossipData
 	creationTime time.Time
 	subnet       uint64
 }
 
-func (a *attestationService) scheduleAttestationForLaterProcessing(att *solid.Attestation) {
-	key, err := att.HashSSZ()
+func (a *attestationService) scheduleAttestationForLaterProcessing(att *AttestationWithGossipData) {
+	key, err := att.Attestation.HashSSZ()
 	if err != nil {
 		return
 	}
@@ -263,7 +393,7 @@ func (a *attestationService) loop(ctx context.Context) {
 			return true
 		}
 
-		root := v.att.AttestantionData().BeaconBlockRoot()
+		root := v.att.Attestation.AttestantionData().BeaconBlockRoot()
 		if _, ok := a.forkchoiceStore.GetHeader(root); !ok {
 			return true
 		}
diff --git a/cl/phase1/network/services/attestation_service_test.go b/cl/phase1/network/services/attestation_service_test.go
index fc169a15c1e..912994afb76 100644
--- a/cl/phase1/network/services/attestation_service_test.go
+++ b/cl/phase1/network/services/attestation_service_test.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"log"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/suite"
 	"go.uber.org/mock/gomock"
@@ -77,9 +78,10 @@ func (t *attestationTestSuite) SetupTest() {
 	emitters := beaconevents.NewEventEmitter()
 	computeSigningRoot = func(obj ssz.HashableSSZ, domain []byte) ([32]byte, error) { return [32]byte{}, nil }
 	blsVerify = func(sig []byte, msg []byte, pubKeys []byte) (bool, error) { return true, nil }
+	batchCheckInterval = 1 * time.Millisecond
 	ctx, cn := context.WithCancel(context.Background())
 	cn()
-	t.attService = NewAttestationService(ctx, t.mockForkChoice, t.committeeSubscibe, t.ethClock, t.syncedData, t.beaconConfig, netConfig, emitters)
+	t.attService = NewAttestationService(ctx, t.mockForkChoice, t.committeeSubscibe, t.ethClock, t.syncedData, t.beaconConfig, netConfig, emitters, nil)
 }
 
 func (t *attestationTestSuite) TearDownTest() {
@@ -93,10 +95,9 @@ func (t *attestationTestSuite) TestAttestationProcessMessage() {
 		msg    *solid.Attestation
 	}
 	tests := []struct {
-		name    string
-		mock    func()
-		args    args
-		wantErr bool
+		name string
+		mock func()
+		args args
 	}{
 		{
 			name: "Test attestation with committee index out of range",
@@ -111,7 +112,6 @@ func (t *attestationTestSuite) TestAttestationProcessMessage() {
 				subnet: nil,
 				msg:    att,
 			},
-			wantErr: true,
 		},
 		{
 			name: "Test attestation with wrong subnet",
@@ -129,7 +129,6 @@ func (t *attestationTestSuite) TestAttestationProcessMessage() {
 				subnet: uint64Ptr(1),
 				msg:    att,
 			},
-			wantErr: true,
 		},
 		{
 			name: "Test attestation with wrong slot (current_slot < slot)",
@@ -148,7 +147,6 @@ func (t *attestationTestSuite) TestAttestationProcessMessage() {
 				subnet: uint64Ptr(1),
 				msg:    att,
 			},
-			wantErr: true,
 		},
 		{
 			name: "Attestation is aggregated",
@@ -171,7 +169,6 @@ func (t *attestationTestSuite) TestAttestationProcessMessage() {
 					[96]byte{0, 1, 2, 3, 4, 5},
 				),
 			},
-			wantErr: true,
 		},
 		{
 			name: "Attestation is empty",
@@ -194,7 +191,6 @@ func (t *attestationTestSuite) TestAttestationProcessMessage() {
 					[96]byte{0, 1, 2, 3, 4, 5},
 				),
 			},
-			wantErr: true,
 		},
 		{
 			name: "invalid signature",
@@ -221,7 +217,6 @@ func (t *attestationTestSuite) TestAttestationProcessMessage() {
 				subnet: uint64Ptr(1),
 				msg:    att,
 			},
-			wantErr: true,
 		},
 		{
 			name: "block header not found",
@@ -248,7 +243,6 @@ func (t *attestationTestSuite) TestAttestationProcessMessage() {
 				subnet: uint64Ptr(1),
 				msg:    att,
 			},
-			wantErr: true,
 		},
 		{
 			name: "invalid target block",
@@ -278,7 +272,6 @@ func (t *attestationTestSuite) TestAttestationProcessMessage() {
 				subnet: uint64Ptr(1),
 				msg:    att,
 			},
-			wantErr: true,
 		},
 		{
 			name: "invalid finality checkpoint",
@@ -316,7 +309,6 @@ func (t *attestationTestSuite) TestAttestationProcessMessage() {
 				subnet: uint64Ptr(1),
 				msg:    att,
 			},
-			wantErr: true,
 		},
 		{
 			name: "success",
@@ -337,6 +329,9 @@ func (t *attestationTestSuite) TestAttestationProcessMessage() {
 				blsVerify = func(sig []byte, msg []byte, pubKeys []byte) (bool, error) {
 					return true, nil
 				}
+				blsVerifyMultipleSignatures = func(signatures [][]byte, signRoots [][]byte, pks [][]byte) (bool, error) {
+					return true, nil
+				}
 				t.mockForkChoice.Headers = map[common.Hash]*cltypes.BeaconBlockHeader{
 					att.AttestantionData().BeaconBlockRoot(): {},
 				}
@@ -363,13 +358,9 @@ func (t *attestationTestSuite) TestAttestationProcessMessage() {
 		log.Printf("test case: %s", tt.name)
 		t.SetupTest()
 		tt.mock()
-		err := t.attService.ProcessMessage(tt.args.ctx, tt.args.subnet, tt.args.msg)
-		if tt.wantErr {
-			log.Printf("err msg: %v", err)
-			t.Require().Error(err, err.Error())
-		} else {
-			t.Require().NoError(err)
-		}
+		err := t.attService.ProcessMessage(tt.args.ctx, tt.args.subnet, &AttestationWithGossipData{Attestation: tt.args.msg, GossipData: nil})
+		time.Sleep(time.Millisecond * 60)
+		t.Require().Error(err, ErrIgnore)
 		t.True(t.gomockCtrl.Satisfied())
 	}
 }
diff --git a/cl/phase1/network/services/interface.go b/cl/phase1/network/services/interface.go
index 04741c8386b..175babb414f 100644
--- a/cl/phase1/network/services/interface.go
+++ b/cl/phase1/network/services/interface.go
@@ -20,7 +20,6 @@ import (
 	"context"
 
 	"github.com/erigontech/erigon/cl/cltypes"
-	"github.com/erigontech/erigon/cl/cltypes/solid"
 )
 
 // Note: BlobSidecarService and BlockService are tested in spectests
@@ -45,7 +44,7 @@ type SyncContributionService Service[*cltypes.SignedContributionAndProof]
 type AggregateAndProofService Service[*cltypes.SignedAggregateAndProofData]
 
 //go:generate mockgen -typed=true -destination=./mock_services/attestation_service_mock.go -package=mock_services . AttestationService
-type AttestationService Service[*solid.Attestation]
+type AttestationService Service[*AttestationWithGossipData]
 
 //go:generate mockgen -typed=true -destination=./mock_services/voluntary_exit_service_mock.go -package=mock_services . VoluntaryExitService
 type VoluntaryExitService Service[*cltypes.SignedVoluntaryExit]
diff --git a/cmd/caplin/caplin1/run.go b/cmd/caplin/caplin1/run.go
index 3d5e340b922..a0464c09acf 100644
--- a/cmd/caplin/caplin1/run.go
+++ b/cmd/caplin/caplin1/run.go
@@ -312,7 +312,7 @@ func RunCaplinService(ctx context.Context, engine execution_client.ExecutionEngi
 	blockService := services.NewBlockService(ctx, indexDB, forkChoice, syncedDataManager, ethClock, beaconConfig, emitters)
 	blobService := services.NewBlobSidecarService(ctx, beaconConfig, forkChoice, syncedDataManager, ethClock, emitters, false)
 	syncCommitteeMessagesService := services.NewSyncCommitteeMessagesService(beaconConfig, ethClock, syncedDataManager, syncContributionPool, false)
-	attestationService := services.NewAttestationService(ctx, forkChoice, committeeSub, ethClock, syncedDataManager, beaconConfig, networkConfig, emitters)
+	attestationService := services.NewAttestationService(ctx, forkChoice, committeeSub, ethClock, syncedDataManager, beaconConfig, networkConfig, emitters, sentinel)
	syncContributionService := services.NewSyncContributionService(syncedDataManager, beaconConfig, syncContributionPool, ethClock, emitters, false)
 	aggregateAndProofService := services.NewAggregateAndProofService(ctx, syncedDataManager, forkChoice, beaconConfig, pool, sentinel, false)
 	voluntaryExitService := services.NewVoluntaryExitService(pool, emitters, syncedDataManager, beaconConfig, ethClock)
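Note (not part of the diff): the heart of this change is the batch-and-flush loop in startAttestationBatchSignatureVerification, which drains a channel of verification requests and triggers one aggregated check either when the accumulated batch crosses a size threshold or when a periodic ticker fires. Below is a minimal, standalone Go sketch of that same pattern with hypothetical names and a print statement standing in for the aggregated BLS verification; it is illustrative only, not the service implementation.

package main

import (
	"fmt"
	"time"
)

// verification is a stand-in for AggregateVerificationData: one unit of work
// that is cheaper to check in bulk than one at a time.
type verification struct{ id int }

// batchAndFlush collects items from a channel and flushes them either when the
// batch grows past a threshold or when the ticker fires, whichever happens
// first. The ticker is reset after every flush so a quiet period does not
// trigger an immediate second flush.
func batchAndFlush(in <-chan verification, threshold int, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	batch := make([]verification, 0, threshold)

	flush := func() {
		if len(batch) == 0 {
			return
		}
		// In the real service this would be a single aggregated BLS check;
		// here we only report how many items were verified together.
		fmt.Printf("verifying %d items in one batch\n", len(batch))
		batch = batch[:0]
		ticker.Reset(interval)
	}

	for {
		select {
		case v, ok := <-in:
			if !ok {
				flush()
				return
			}
			batch = append(batch, v)
			if len(batch) >= threshold {
				flush()
			}
		case <-ticker.C:
			flush()
		}
	}
}

func main() {
	in := make(chan verification)
	go func() {
		for i := 0; i < 10; i++ {
			in <- verification{id: i}
			time.Sleep(10 * time.Millisecond)
		}
		close(in)
	}()
	batchAndFlush(in, 4, 50*time.Millisecond)
}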