Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backfill Blobs #13595

Merged
merged 9 commits into from
Feb 14, 2024
Merged
18 changes: 10 additions & 8 deletions beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,14 +229,6 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
if err != nil {
return nil, errors.Wrap(err, "backfill status initialization error")
}
pa := peers.NewAssigner(beacon.fetchP2P().Peers(), beacon.forkChoicer)
bf, err := backfill.NewService(ctx, bfs, beacon.clockWaiter, beacon.fetchP2P(), pa, beacon.BackfillOpts...)
if err != nil {
return nil, errors.Wrap(err, "error initializing backfill service")
}
if err := beacon.services.RegisterService(bf); err != nil {
return nil, errors.Wrap(err, "error registering backfill service")
}

log.Debugln("Starting State Gen")
if err := beacon.startStateGen(ctx, bfs, beacon.forkChoicer); err != nil {
Expand All @@ -251,6 +243,16 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
beacon.verifyInitWaiter = verification.NewInitializerWaiter(
beacon.clockWaiter, forkchoice.NewROForkChoice(beacon.forkChoicer), beacon.stateGen)

pa := peers.NewAssigner(beacon.fetchP2P().Peers(), beacon.forkChoicer)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What was the reason for changing the ordering here?

Copy link
Contributor Author

@kasey kasey Feb 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the backfill service now depends on the init waiter (i.e. blob validation), which depends on stategen, which in turn depends on the backfill db. Stategen depends only on the backfill database (to check whether a given slot is available), not on the rest of backfill. So first we init the backfill db, then stategen and blob verification, then the backfill service. This is why the backfill db coverage package and the db init are separate: to avoid circularity.

beacon.BackfillOpts = append(beacon.BackfillOpts, backfill.WithVerifierWaiter(beacon.verifyInitWaiter))
bf, err := backfill.NewService(ctx, bfs, beacon.BlobStorage, beacon.clockWaiter, beacon.fetchP2P(), pa, beacon.BackfillOpts...)
if err != nil {
return nil, errors.Wrap(err, "error initializing backfill service")
}
if err := beacon.services.RegisterService(bf); err != nil {
return nil, errors.Wrap(err, "error registering backfill service")
}

log.Debugln("Registering POW Chain Service")
if err := beacon.registerPOWChainService(); err != nil {
return nil, err
Expand Down
11 changes: 11 additions & 0 deletions beacon-chain/sync/backfill/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go_library(
srcs = [
"batch.go",
"batcher.go",
"blobs.go",
"metrics.go",
"pool.go",
"service.go",
Expand All @@ -17,12 +18,15 @@ go_library(
deps = [
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/das:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/filesystem:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/startup:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/sync:go_default_library",
"//beacon-chain/verification:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
Expand All @@ -34,6 +38,7 @@ go_library(
"//proto/dbval:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime:go_default_library",
"//runtime/version:go_default_library",
"//time/slots:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_pkg_errors//:go_default_library",
Expand All @@ -46,7 +51,9 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"batch_test.go",
"batcher_test.go",
"blobs_test.go",
"pool_test.go",
"service_test.go",
"status_test.go",
Expand All @@ -56,10 +63,14 @@ go_test(
deps = [
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/das:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/filesystem:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//beacon-chain/startup:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/sync:go_default_library",
"//beacon-chain/verification:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
Expand Down
66 changes: 61 additions & 5 deletions beacon-chain/sync/backfill/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package backfill

import (
"fmt"
"sort"
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -33,6 +37,8 @@ func (s batchState) String() string {
return "import_complete"
case batchEndSequence:
return "end_sequence"
case batchBlobSync:
return "blob_sync"
default:
return "unknown"
}
Expand All @@ -43,6 +49,7 @@ const (
batchInit
batchSequenced
batchErrRetryable
batchBlobSync
batchImportable
batchImportComplete
batchEndSequence
Expand All @@ -57,10 +64,13 @@ type batch struct {
retries int
begin primitives.Slot
end primitives.Slot // half-open interval, [begin, end), ie >= start, < end.
results VerifiedROBlocks
results verifiedROBlocks
err error
state batchState
pid peer.ID
busy peer.ID
blockPid peer.ID
blobPid peer.ID
bs *blobSync
}

func (b batch) logFields() log.Fields {
Expand All @@ -72,7 +82,9 @@ func (b batch) logFields() log.Fields {
"retries": b.retries,
"begin": b.begin,
"end": b.end,
"pid": b.pid,
"busyPid": b.busy,
"blockPid": b.blockPid,
"blobPid": b.blobPid,
}
}

Expand Down Expand Up @@ -101,14 +113,40 @@ func (b batch) ensureParent(expected [32]byte) error {
return nil
}

func (b batch) request() *eth.BeaconBlocksByRangeRequest {
// blockRequest builds the BeaconBlocksByRange request for this batch: it
// starts at b.begin, asks for end-begin slots (the batch interval is
// half-open, [begin, end)) and sets Step to 1.
func (b batch) blockRequest() *eth.BeaconBlocksByRangeRequest {
	req := new(eth.BeaconBlocksByRangeRequest)
	req.StartSlot = b.begin
	req.Count = uint64(b.end - b.begin)
	req.Step = 1
	return req
}

func (b batch) blobRequest() *eth.BlobSidecarsByRangeRequest {
return &eth.BlobSidecarsByRangeRequest{
StartSlot: b.begin,
Count: uint64(b.end - b.begin),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is b.end > b.begin requirement verified somewhere?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be implicitly done in the before method of the batcher

}
}

// withResults stores the downloaded blocks and their blob sync tracker on a
// copy of the batch and selects the follow-up state: batchBlobSync while the
// tracker still reports outstanding blobs, batchImportable otherwise.
func (b batch) withResults(results verifiedROBlocks, bs *blobSync) batch {
	b.results, b.bs = results, bs
	next := batchImportable
	if bs.blobsNeeded() > 0 {
		next = batchBlobSync
	}
	return b.withState(next)
}

// postBlobSync is the transition taken after a blob download attempt. If no
// blobs remain outstanding the batch becomes importable; otherwise the gap is
// logged, the blob tracker and block results are discarded, and the batch is
// moved to the retryable-error state.
func (b batch) postBlobSync() batch {
	missing := b.blobsNeeded()
	if missing <= 0 {
		return b.withState(batchImportable)
	}
	log.WithFields(b.logFields()).WithField("blobs_missing", missing).Error("batch still missing blobs after downloading from peer")
	// Drop everything downloaded so far so the retry starts from a clean batch.
	b.bs = nil
	b.results = []blocks.ROBlock{}
	return b.withState(batchErrRetryable)
}

func (b batch) withState(s batchState) batch {
if s == batchSequenced {
b.scheduled = time.Now()
Expand All @@ -130,7 +168,7 @@ func (b batch) withState(s batchState) batch {
}

func (b batch) withPeer(p peer.ID) batch {
b.pid = p
b.blockPid = p
backfillBatchTimeWaiting.Observe(float64(time.Since(b.scheduled).Milliseconds()))
return b
}
Expand All @@ -139,3 +177,21 @@ func (b batch) withRetryableError(err error) batch {
b.err = err
return b.withState(batchErrRetryable)
}

// blobsNeeded reports how many blobs the batch's blob sync tracker is still
// waiting for. It delegates to b.bs, which must therefore be set.
func (b batch) blobsNeeded() int {
	tracker := b.bs
	return tracker.blobsNeeded()
}

// blobResponseValidator exposes the blob sync tracker's validateNext method as
// a sync.BlobResponseValidation callback.
func (b batch) blobResponseValidator() sync.BlobResponseValidation {
	var validate sync.BlobResponseValidation = b.bs.validateNext
	return validate
}

// availabilityStore returns the availability store held by the batch's blob
// sync tracker.
func (b batch) availabilityStore() das.AvailabilityStore {
	store := b.bs.store
	return store
}

// sortBatchDesc reorders bb in place so that batches with a higher end slot
// come first.
func sortBatchDesc(bb []batch) {
	byEndDesc := func(i, j int) bool {
		return bb[i].end > bb[j].end
	}
	sort.Slice(bb, byEndDesc)
}
21 changes: 21 additions & 0 deletions beacon-chain/sync/backfill/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package backfill

import (
"testing"

"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/testing/require"
)

// TestSortBatchDesc checks that sortBatchDesc arranges batches by descending
// end slot.
func TestSortBatchDesc(t *testing.T) {
	orderIn := []primitives.Slot{100, 10000, 1}
	orderOut := []primitives.Slot{10000, 100, 1}
	batches := make([]batch, 0, len(orderIn))
	for _, end := range orderIn {
		batches = append(batches, batch{end: end})
	}
	sortBatchDesc(batches)
	for i, want := range orderOut {
		require.Equal(t, want, batches[i].end)
	}
}
141 changes: 141 additions & 0 deletions beacon-chain/sync/backfill/blobs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package backfill

import (
"bytes"
"context"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/verification"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
)

// Sentinel errors produced while validating and collecting backfilled blob sidecars.
var (
// errUnexpectedResponseSize is returned by validateNext when a blob arrives after every expected blob has been consumed.
errUnexpectedResponseSize = errors.New("received more blobs than expected for the requested range")
// errUnexpectedCommitment is wrapped by VerifiedROBlobs when the block's KZG commitments cannot be read.
errUnexpectedCommitment = errors.New("BlobSidecar commitment does not match block")
// errUnexpectedResponseContent is wrapped by validateNext when a blob's root, index or commitment differs from the next expected summary.
errUnexpectedResponseContent = errors.New("BlobSidecar response does not include expected values in expected order")
// errBatchVerifierMismatch is wrapped by VerifiedROBlobs when a verifier is missing for a commitment or the verified commitment differs from the block's.
errBatchVerifierMismatch = errors.New("the list of blocks passed to the availability check does not match what was verified")
)

// blobSummary holds the values a single expected blob sidecar is compared
// against in validateNext.
type blobSummary struct {
// blockRoot is compared with the sidecar's BlockRoot().
blockRoot [32]byte
// index is compared with the sidecar's Index.
index uint64
// commitment is compared with the sidecar's KzgCommitment.
commitment [48]byte
// signature is compared with the sidecar's SignedBlockHeader.Signature.
signature [fieldparams.BLSSignatureLength]byte
}

// blobSyncConfig bundles the dependencies newBlobSync needs to build a blobSync.
type blobSyncConfig struct {
// retentionStart is passed to verifiedROBlocks.blobIdents when computing the expected blobs.
retentionStart primitives.Slot
// nbv is the constructor handed to the blobBatchVerifier for creating per-blob verifiers.
nbv verification.NewBlobVerifier
// store is the blob storage that backs the lazily persistent availability store.
store *filesystem.BlobStorage
}

// newBlobSync creates the blob tracker for a batch of verified blocks. The
// list of expected blobs is derived from vbs using cfg.retentionStart; any
// error from that derivation is returned unchanged. The tracker shares one
// batch verifier between itself and the availability store it creates.
func newBlobSync(current primitives.Slot, vbs verifiedROBlocks, cfg *blobSyncConfig) (*blobSync, error) {
	expected, err := vbs.blobIdents(cfg.retentionStart)
	if err != nil {
		return nil, err
	}
	verifier := newBlobBatchVerifier(cfg.nbv)
	bs := &blobSync{
		current:  current,
		expected: expected,
		bbv:      verifier,
		store:    das.NewLazilyPersistentStore(cfg.store, verifier),
	}
	return bs, nil
}

// blobVerifierMap maps a block root to a fixed-size array of blob verifiers,
// indexed by blob index (see blobBatchVerifier.newVerifier).
type blobVerifierMap map[[32]byte][fieldparams.MaxBlobsPerBlock]verification.BlobVerifier

// blobSync tracks which blob sidecars are expected for a batch and validates
// them one at a time, in order, as they arrive.
type blobSync struct {
// store is where validateNext persists each blob that passes validation.
store das.AvailabilityStore
// expected is the ordered list of blob summaries the response must match.
expected []blobSummary
// next is the index into expected of the next blob to validate.
next int
// bbv creates and remembers the verifier used for each blob.
bbv *blobBatchVerifier
// current is the slot passed to store.Persist alongside each blob.
current primitives.Slot
}

// blobsNeeded returns the number of expected blobs that have not yet been
// consumed by validateNext.
func (bs *blobSync) blobsNeeded() int {
	remaining := len(bs.expected) - bs.next
	return remaining
}

func (bs *blobSync) validateNext(rb blocks.ROBlob) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this function need any unit test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will mostly piggy back on the tests of the verification package but I could add some additional coverage.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where do we verify the blob's slot?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if bs.next >= len(bs.expected) {
return errUnexpectedResponseSize
}
next := bs.expected[bs.next]
bs.next += 1
// Get the super cheap verifications out of the way before we init a verifier.
if next.blockRoot != rb.BlockRoot() {
return errors.Wrapf(errUnexpectedResponseContent, "next expected root=%#x, saw=%#x", next.blockRoot, rb.BlockRoot())
}
if next.index != rb.Index {
return errors.Wrapf(errUnexpectedResponseContent, "next expected root=%#x, saw=%#x for root=%#x", next.index, rb.Index, next.blockRoot)
}
if next.commitment != bytesutil.ToBytes48(rb.KzgCommitment) {
return errors.Wrapf(errUnexpectedResponseContent, "next expected commitment=%#x, saw=%#x for root=%#x", next.commitment, rb.KzgCommitment, rb.BlockRoot())
}

if bytesutil.ToBytes96(rb.SignedBlockHeader.Signature) != next.signature {
return verification.ErrInvalidProposerSignature
}
v := bs.bbv.newVerifier(rb)
if err := v.BlobIndexInBounds(); err != nil {
return err
}
v.SatisfyRequirement(verification.RequireValidProposerSignature)
if err := v.SidecarInclusionProven(); err != nil {
return err
}
if err := v.SidecarKzgProofVerified(); err != nil {
return err
}
if err := bs.store.Persist(bs.current, rb); err != nil {
return err
}

return nil
}

// newBlobBatchVerifier returns a batch verifier that creates per-blob
// verifiers with nbv and starts with an empty verifier map.
func newBlobBatchVerifier(nbv verification.NewBlobVerifier) *blobBatchVerifier {
	bbv := new(blobBatchVerifier)
	bbv.newBlobVerifier = nbv
	bbv.verifiers = make(blobVerifierMap)
	return bbv
}

// blobBatchVerifier creates blob verifiers and remembers them by block root
// and blob index, so VerifiedROBlobs can later return the verified blobs.
type blobBatchVerifier struct {
// newBlobVerifier is the constructor used by newVerifier for each blob.
newBlobVerifier verification.NewBlobVerifier
// verifiers holds the verifiers created so far, keyed by block root.
verifiers blobVerifierMap
}

// newVerifier creates a verifier for rb using the backfill sidecar
// requirements, records it under rb's block root at position rb.Index, and
// returns it.
func (bbv *blobBatchVerifier) newVerifier(rb blocks.ROBlob) verification.BlobVerifier {
	root := rb.BlockRoot()
	// Map values are arrays, so the entry is copied out, updated and written back.
	perBlock := bbv.verifiers[root]
	v := bbv.newBlobVerifier(rb, verification.BackfillSidecarRequirements)
	perBlock[rb.Index] = v
	bbv.verifiers[root] = perBlock
	return v
}

// VerifiedROBlobs returns the verified blobs for blk, one per KZG commitment
// in the block body and in commitment order, taken from the verifiers that
// were recorded for blk's root. The context and blob slice arguments are
// ignored.
//
// Errors:
//   - verification.ErrMissingVerification (wrapped) when no verifiers were
//     recorded for the block root.
//   - errUnexpectedCommitment (wrapped) when the block's commitments cannot be
//     read; the underlying error is included in the message.
//   - errBatchVerifierMismatch (wrapped) when the block has more commitments
//     than there are verifier slots, when a verifier is missing for a
//     commitment index, or when the verified blob's commitment differs from
//     the block's.
//   - any error returned by a verifier's VerifiedROBlob.
func (bbv blobBatchVerifier) VerifiedROBlobs(_ context.Context, blk blocks.ROBlock, _ []blocks.ROBlob) ([]blocks.VerifiedROBlob, error) {
	m, ok := bbv.verifiers[blk.Root()]
	if !ok {
		return nil, errors.Wrapf(verification.ErrMissingVerification, "no record of verifiers for root %#x", blk.Root())
	}
	c, err := blk.Block().Body().BlobKzgCommitments()
	if err != nil {
		// Keep the sentinel for callers, but do not lose the underlying cause.
		return nil, errors.Wrapf(errUnexpectedCommitment, "error reading commitments from block root %#x: %v", blk.Root(), err)
	}
	// m is a fixed-size array; guard the loop below against indexing past it.
	if len(c) > len(m) {
		return nil, errors.Wrapf(errBatchVerifierMismatch, "block root %#x has %d commitments, more than the %d verifier slots per block", blk.Root(), len(c), len(m))
	}
	vbs := make([]blocks.VerifiedROBlob, len(c))
	for i := range c {
		if m[i] == nil {
			return nil, errors.Wrapf(errBatchVerifierMismatch, "do not have verifier for block root %#x idx %d", blk.Root(), i)
		}
		vb, err := m[i].VerifiedROBlob()
		if err != nil {
			return nil, err
		}
		if !bytes.Equal(vb.KzgCommitment, c[i]) {
			return nil, errors.Wrapf(errBatchVerifierMismatch, "commitments do not match, verified=%#x da check=%#x for root %#x", vb.KzgCommitment, c[i], vb.BlockRoot())
		}
		vbs[i] = vb
	}
	return vbs, nil
}

// Compile-time check that *blobBatchVerifier implements das.BlobBatchVerifier.
var _ das.BlobBatchVerifier = (*blobBatchVerifier)(nil)
Loading
Loading