diff --git a/cl/phase1/network/gossip_manager.go b/cl/phase1/network/gossip_manager.go index 9eaeca70547..8a1ef9f97c0 100644 --- a/cl/phase1/network/gossip_manager.go +++ b/cl/phase1/network/gossip_manager.go @@ -236,7 +236,12 @@ func (g *GossipManager) routeAndProcess(ctx context.Context, data *sentinel.Goss if err := obj.Attestation.DecodeSSZ(data.Data, int(version)); err != nil { return err } - return g.attestationService.ProcessMessage(ctx, data.SubnetId, obj) + + if g.committeeSub.NeedToAggregate(obj.Attestation.AttestantionData().CommitteeIndex()) { + return g.attestationService.ProcessMessage(ctx, data.SubnetId, obj) + } + + return nil default: return fmt.Errorf("unknown topic %s", data.Name) } diff --git a/cl/phase1/network/services/attestation_service.go b/cl/phase1/network/services/attestation_service.go index 1e2fdc9bb70..12d70dd678b 100644 --- a/cl/phase1/network/services/attestation_service.go +++ b/cl/phase1/network/services/attestation_service.go @@ -45,6 +45,8 @@ var ( computeCommitteeCountPerSlot = subnets.ComputeCommitteeCountPerSlot computeSigningRoot = fork.ComputeSigningRoot blsVerify = bls.Verify + blsVerifyMultipleSignatures = bls.VerifyMultipleSignatures + batchCheckInterval = 3 * time.Second ) type attestationService struct { @@ -99,7 +101,6 @@ func NewAttestationService( // When receiving AttestationWithGossipData, we simply collect all the signature verification data // and verify them together - running all the final functions afterwards func (a *attestationService) startAttestationBatchSignatureVerification() { - batchCheckInterval := 3 * time.Second ticker := time.NewTicker(batchCheckInterval) aggregateVerificationData := make([]*AggregateVerificationData, 0, 128) for { @@ -148,7 +149,7 @@ func (a *attestationService) processSignatureVerification(aggregateVerificationD // Everything went well, run corresponding Fs and send all the gossip data to the network for _, v := range aggregateVerificationData { v.F() - if v.GossipData != nil { + if a.sentinel != nil && v.GossipData != nil { if _, err := a.sentinel.PublishGossip(a.ctx, v.GossipData); err != nil { log.Warn("failed publish gossip", "err", err) } @@ -159,10 +160,10 @@ func (a *attestationService) processSignatureVerification(aggregateVerificationD // we could locate failing signature with binary search but for now let's choose simplicity over optimisation. func (a *attestationService) handleIncorrectSignatures(aggregateVerificationData []*AggregateVerificationData) { for _, v := range aggregateVerificationData { - valid, err := bls.VerifyMultipleSignatures(v.Signatures, v.SignRoots, v.Pks) + valid, err := blsVerifyMultipleSignatures(v.Signatures, v.SignRoots, v.Pks) if err != nil { log.Warn("attestation_service signature verification failed with the error: " + err.Error()) - if v.GossipData != nil && v.GossipData.Peer != nil { + if a.sentinel != nil && v.GossipData != nil && v.GossipData.Peer != nil { a.sentinel.BanPeer(a.ctx, v.GossipData.Peer) } continue @@ -170,7 +171,7 @@ func (a *attestationService) handleIncorrectSignatures(aggregateVerificationData if !valid { log.Warn("attestation_service signature verification failed") - if v.GossipData != nil && v.GossipData.Peer != nil { + if a.sentinel != nil && v.GossipData != nil && v.GossipData.Peer != nil { a.sentinel.BanPeer(a.ctx, v.GossipData.Peer) } continue @@ -179,7 +180,7 @@ func (a *attestationService) handleIncorrectSignatures(aggregateVerificationData // run corresponding function and publish the gossip into the network v.F() - if v.GossipData != nil { + if a.sentinel != nil && v.GossipData != nil { if _, err := a.sentinel.PublishGossip(a.ctx, v.GossipData); err != nil { log.Warn("failed publish gossip", "err", err) } @@ -188,9 +189,9 @@ func (a *attestationService) handleIncorrectSignatures(aggregateVerificationData } func (a *attestationService) runBatchVerification(signatures [][]byte, signRoots [][]byte, pks [][]byte, fns []func()) error { - valid, err := bls.VerifyMultipleSignatures(signatures, signRoots, pks) + valid, err := blsVerifyMultipleSignatures(signatures, signRoots, pks) if err != nil { - return errors.New("attestation_service signature verification failed with the error: " + err.Error()) + return errors.New("attestation_service batch signature verification failed with the error: " + err.Error()) } if !valid { @@ -330,7 +331,7 @@ func (s *attestationService) ProcessMessage(ctx context.Context, subnet *uint64, // get_checkpoint_block(store, attestation.data.beacon_block_root, attestation.data.target.epoch) == attestation.data.target.root startSlotAtEpoch := targetEpoch * s.beaconCfg.SlotsPerEpoch if targetBlock := s.forkchoiceStore.Ancestor(root, startSlotAtEpoch); targetBlock != att.Attestation.AttestantionData().Target().BlockRoot() { - log.Warn(fmt.Sprint("invalid target block. root %v targetEpoch %v attTargetBlockRoot %v targetBlock %v", root.Hex(), targetEpoch, att.Attestation.AttestantionData().Target().BlockRoot().Hex(), targetBlock.Hex())) + log.Warn("invalid target block. root ", root.Hex(), " targetEpoch ", targetEpoch, " attTargetBlockRoot ", att.Attestation.AttestantionData().Target().BlockRoot().Hex(), " targetBlock ", targetBlock.Hex()) return } // [IGNORE] The current finalized_checkpoint is an ancestor of the block defined by attestation.data.beacon_block_root -- diff --git a/cl/phase1/network/services/attestation_service_test.go b/cl/phase1/network/services/attestation_service_test.go index 5b5fa3e2eec..723a54be0d8 100644 --- a/cl/phase1/network/services/attestation_service_test.go +++ b/cl/phase1/network/services/attestation_service_test.go @@ -20,6 +20,7 @@ import ( "context" "log" "testing" + "time" "github.com/stretchr/testify/suite" "go.uber.org/mock/gomock" @@ -77,6 +78,7 @@ func (t *attestationTestSuite) SetupTest() { emitters := beaconevents.NewEventEmitter() computeSigningRoot = func(obj ssz.HashableSSZ, domain []byte) ([32]byte, error) { return [32]byte{}, nil } blsVerify = func(sig []byte, msg []byte, pubKeys []byte) (bool, error) { return true, nil } + batchCheckInterval = 1 * time.Millisecond ctx, cn := context.WithCancel(context.Background()) cn() t.attService = NewAttestationService(ctx, t.mockForkChoice, t.committeeSubscibe, t.ethClock, t.syncedData, t.beaconConfig, netConfig, emitters, nil) @@ -93,10 +95,9 @@ func (t *attestationTestSuite) TestAttestationProcessMessage() { msg *solid.Attestation } tests := []struct { - name string - mock func() - args args - wantErr bool + name string + mock func() + args args }{ { name: "Test attestation with committee index out of range", @@ -111,7 +112,6 @@ func (t *attestationTestSuite) TestAttestationProcessMessage() { subnet: nil, msg: att, }, - wantErr: true, }, { name: "Test attestation with wrong subnet", @@ -129,7 +129,6 @@ func (t *attestationTestSuite) TestAttestationProcessMessage() { subnet: uint64Ptr(1), msg: att, }, - wantErr: true, }, { name: "Test attestation with wrong slot (current_slot < slot)", @@ -148,7 +147,6 @@ func (t *attestationTestSuite) TestAttestationProcessMessage() { subnet: uint64Ptr(1), msg: att, }, - wantErr: true, }, { name: "Attestation is aggregated", @@ -171,7 +169,6 @@ func (t *attestationTestSuite) TestAttestationProcessMessage() { [96]byte{0, 1, 2, 3, 4, 5}, ), }, - wantErr: true, }, { name: "Attestation is empty", @@ -194,7 +191,6 @@ func (t *attestationTestSuite) TestAttestationProcessMessage() { [96]byte{0, 1, 2, 3, 4, 5}, ), }, - wantErr: true, }, { name: "invalid signature", @@ -221,7 +217,6 @@ func (t *attestationTestSuite) TestAttestationProcessMessage() { subnet: uint64Ptr(1), msg: att, }, - wantErr: true, }, { name: "block header not found", @@ -248,7 +243,6 @@ func (t *attestationTestSuite) TestAttestationProcessMessage() { subnet: uint64Ptr(1), msg: att, }, - wantErr: true, }, { name: "invalid target block", @@ -278,7 +272,6 @@ func (t *attestationTestSuite) TestAttestationProcessMessage() { subnet: uint64Ptr(1), msg: att, }, - wantErr: true, }, { name: "invalid finality checkpoint", @@ -316,7 +309,6 @@ func (t *attestationTestSuite) TestAttestationProcessMessage() { subnet: uint64Ptr(1), msg: att, }, - wantErr: true, }, { name: "success", @@ -337,6 +329,9 @@ func (t *attestationTestSuite) TestAttestationProcessMessage() { blsVerify = func(sig []byte, msg []byte, pubKeys []byte) (bool, error) { return true, nil } + blsVerifyMultipleSignatures = func(signatures [][]byte, signRoots [][]byte, pks [][]byte) (bool, error) { + return true, nil + } t.mockForkChoice.Headers = map[common.Hash]*cltypes.BeaconBlockHeader{ att.AttestantionData().BeaconBlockRoot(): {}, } @@ -363,13 +358,9 @@ func (t *attestationTestSuite) TestAttestationProcessMessage() { log.Printf("test case: %s", tt.name) t.SetupTest() tt.mock() - err := t.attService.ProcessMessage(tt.args.ctx, tt.args.subnet, tt.args.msg) - if tt.wantErr { - log.Printf("err msg: %v", err) - t.Require().Error(err, err.Error()) - } else { - t.Require().NoError(err) - } + err := t.attService.ProcessMessage(tt.args.ctx, tt.args.subnet, &solid.AttestationWithGossipData{tt.args.msg, nil}) + time.Sleep(time.Millisecond * 10) + t.Require().Error(err, ErrIgnore) t.True(t.gomockCtrl.Satisfied()) } } diff --git a/cl/sentinel/service/start.go b/cl/sentinel/service/start.go index e2e4c37c13b..b61a097c8f6 100644 --- a/cl/sentinel/service/start.go +++ b/cl/sentinel/service/start.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net" + "strings" "time" "github.com/erigontech/erigon/cl/gossip" @@ -59,6 +60,11 @@ func generateSubnetsTopics(template string, maxIds int) []sentinel.GossipTopic { } func getExpirationForTopic(topic string) time.Time { + if strings.Contains(topic, "beacon_attestation") || + (strings.Contains(topic, "sync_committee_") && !strings.Contains(topic, gossip.TopicNameSyncCommitteeContributionAndProof)) { + return time.Unix(0, 0) + } + return time.Unix(0, math.MaxInt64) }