Commit: retrieve and save blobs during backfill
kasey committed Feb 10, 2024
1 parent 256a05b commit 6155179
Showing 17 changed files with 494 additions and 80 deletions.
18 changes: 10 additions & 8 deletions beacon-chain/node/node.go
@@ -229,14 +229,6 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
if err != nil {
return nil, errors.Wrap(err, "backfill status initialization error")
}
pa := peers.NewAssigner(beacon.fetchP2P().Peers(), beacon.forkChoicer)
bf, err := backfill.NewService(ctx, bfs, beacon.clockWaiter, beacon.fetchP2P(), pa, beacon.BackfillOpts...)
if err != nil {
return nil, errors.Wrap(err, "error initializing backfill service")
}
if err := beacon.services.RegisterService(bf); err != nil {
return nil, errors.Wrap(err, "error registering backfill service")
}

log.Debugln("Starting State Gen")
if err := beacon.startStateGen(ctx, bfs, beacon.forkChoicer); err != nil {
@@ -251,6 +243,16 @@ func New(cliCtx *cli.Context, cancel context.CancelFunc, opts ...Option) (*Beaco
beacon.verifyInitWaiter = verification.NewInitializerWaiter(
beacon.clockWaiter, forkchoice.NewROForkChoice(beacon.forkChoicer), beacon.stateGen)

pa := peers.NewAssigner(beacon.fetchP2P().Peers(), beacon.forkChoicer)
beacon.BackfillOpts = append(beacon.BackfillOpts, backfill.WithVerifierWaiter(beacon.verifyInitWaiter))
bf, err := backfill.NewService(ctx, bfs, beacon.BlobStorage, beacon.clockWaiter, beacon.fetchP2P(), pa, beacon.BackfillOpts...)
if err != nil {
return nil, errors.Wrap(err, "error initializing backfill service")
}
if err := beacon.services.RegisterService(bf); err != nil {
return nil, errors.Wrap(err, "error registering backfill service")
}

log.Debugln("Registering POW Chain Service")
if err := beacon.registerPOWChainService(); err != nil {
return nil, err
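
Note (not part of this commit): the hunks above move backfill service construction below the creation of beacon.verifyInitWaiter so the waiter can be injected via backfill.WithVerifierWaiter, and pass beacon.BlobStorage into backfill.NewService. The definition of WithVerifierWaiter is not shown in this diff; a minimal sketch of how such a functional option could look, assuming NewInitializerWaiter returns a *verification.InitializerWaiter and the backfill Service stores it in a field (the option type and field names below are assumptions), is:

// Hypothetical sketch; ServiceOption and the verifierWaiter field are assumed names.
type ServiceOption func(*Service) error

func WithVerifierWaiter(viw *verification.InitializerWaiter) ServiceOption {
	return func(s *Service) error {
		s.verifierWaiter = viw // held until the service starts and builds its blob verifiers
		return nil
	}
}
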
10 changes: 10 additions & 0 deletions beacon-chain/sync/backfill/BUILD.bazel
@@ -5,6 +5,7 @@ go_library(
srcs = [
"batch.go",
"batcher.go",
"blobs.go",
"metrics.go",
"pool.go",
"service.go",
@@ -17,12 +18,15 @@ go_library(
deps = [
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/das:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/filesystem:go_default_library",
"//beacon-chain/p2p:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/startup:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/sync:go_default_library",
"//beacon-chain/verification:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
@@ -34,6 +38,7 @@ go_library(
"//proto/dbval:go_default_library",
"//proto/prysm/v1alpha1:go_default_library",
"//runtime:go_default_library",
"//runtime/version:go_default_library",
"//time/slots:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_pkg_errors//:go_default_library",
@@ -46,6 +51,7 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"batch_test.go",
"batcher_test.go",
"pool_test.go",
"service_test.go",
@@ -56,10 +62,14 @@ go_test(
deps = [
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/core/signing:go_default_library",
"//beacon-chain/das:go_default_library",
"//beacon-chain/db:go_default_library",
"//beacon-chain/db/filesystem:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
"//beacon-chain/startup:go_default_library",
"//beacon-chain/state:go_default_library",
"//beacon-chain/sync:go_default_library",
"//beacon-chain/verification:go_default_library",
"//config/fieldparams:go_default_library",
"//config/params:go_default_library",
"//consensus-types/blocks:go_default_library",
46 changes: 41 additions & 5 deletions beacon-chain/sync/backfill/batch.go
@@ -2,10 +2,13 @@ package backfill

import (
"fmt"
"sort"
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1"
log "github.com/sirupsen/logrus"
@@ -33,6 +36,8 @@ func (s batchState) String() string {
return "import_complete"
case batchEndSequence:
return "end_sequence"
case batchBlobSync:
return "blob_sync"
default:
return "unknown"
}
@@ -43,6 +48,7 @@ const (
batchInit
batchSequenced
batchErrRetryable
batchBlobSync
batchImportable
batchImportComplete
batchEndSequence
@@ -57,10 +63,13 @@ type batch struct {
retries int
begin primitives.Slot
end primitives.Slot // half-open interval, [begin, end), ie >= start, < end.
results VerifiedROBlocks
results verifiedROBlocks
err error
state batchState
pid peer.ID
busy peer.ID
blockPid peer.ID
blobPid peer.ID
bs *blobSync
}

func (b batch) logFields() log.Fields {
@@ -72,7 +81,9 @@ func (b batch) logFields() log.Fields {
"retries": b.retries,
"begin": b.begin,
"end": b.end,
"pid": b.pid,
"busyPid": b.busy,
"blockPid": b.blockPid,
"blobPid": b.blobPid,
}
}

@@ -101,14 +112,21 @@ func (b batch) ensureParent(expected [32]byte) error {
return nil
}

func (b batch) request() *eth.BeaconBlocksByRangeRequest {
func (b batch) blockRequest() *eth.BeaconBlocksByRangeRequest {
return &eth.BeaconBlocksByRangeRequest{
StartSlot: b.begin,
Count: uint64(b.end - b.begin),
Step: 1,
}
}

func (b batch) blobRequest() *eth.BlobSidecarsByRangeRequest {
return &eth.BlobSidecarsByRangeRequest{
StartSlot: b.begin,
Count: uint64(b.end - b.begin),
}
}

func (b batch) withState(s batchState) batch {
if s == batchSequenced {
b.scheduled = time.Now()
@@ -130,7 +148,7 @@ func (b batch) withState(s batchState) batch {
}

func (b batch) withPeer(p peer.ID) batch {
b.pid = p
b.blockPid = p
backfillBatchTimeWaiting.Observe(float64(time.Since(b.scheduled).Milliseconds()))
return b
}
@@ -139,3 +157,21 @@ func (b batch) withRetryableError(err error) batch {
b.err = err
return b.withState(batchErrRetryable)
}

func (b batch) blobsNeeded() int {
return b.bs.blobsNeeded()
}

func (b batch) blobResponseValidator() sync.BlobResponseValidation {
return b.bs.validateNext
}

func (b batch) availabilityStore() das.AvailabilityStore {
return b.bs.store
}

func sortBatchDesc(todo []batch) {
sort.Slice(todo, func(i, j int) bool {
return todo[j].end < todo[i].end
})
}
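
Illustration (not part of this commit): because a batch is a half-open slot interval [begin, end), blockRequest and blobRequest built from the same batch always cover the same slots, with Count = end - begin. A small sketch, assuming it lives inside the backfill package so the unexported batch type is reachable:

// Hypothetical example; a batch over slots [100, 164) yields Count=64 for both requests.
func exampleBatchRequests() {
	b := batch{begin: primitives.Slot(100), end: primitives.Slot(164)}
	blockReq := b.blockRequest() // StartSlot=100, Count=64, Step=1
	blobReq := b.blobRequest()   // StartSlot=100, Count=64
	_, _ = blockReq, blobReq
}
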
21 changes: 21 additions & 0 deletions beacon-chain/sync/backfill/batch_test.go
@@ -0,0 +1,21 @@
package backfill

import (
"testing"

"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/testing/require"
)

func TestSortBatchDesc(t *testing.T) {
orderIn := []primitives.Slot{100, 10000, 1}
orderOut := []primitives.Slot{10000, 100, 1}
batches := make([]batch, len(orderIn))
for i := range orderIn {
batches[i] = batch{end: orderIn[i]}
}
sortBatchDesc(batches)
for i := range orderOut {
require.Equal(t, orderOut[i], batches[i].end)
}
}
125 changes: 125 additions & 0 deletions beacon-chain/sync/backfill/blobs.go
@@ -0,0 +1,125 @@
package backfill

import (
"context"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/das"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/verification"
fieldparams "github.com/prysmaticlabs/prysm/v4/config/fieldparams"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/encoding/bytesutil"
)

var (
errUnexpectedResponseSize = errors.New("received more blobs than expected for the requested range")
errUnexpectedBlobIndex = errors.New("BlobSidecar responses out of order or non-contiguous")
errUnexpectedCommitment = errors.New("BlobSidecar commitment does not match block")
)

type blobSummary struct {
blockRoot [32]byte
index uint64
commitment [48]byte
signature [fieldparams.BLSSignatureLength]byte
}

func newBlobSync(cs, retentionStart primitives.Slot, vbs verifiedROBlocks, nbv verification.NewBlobVerifier, st *filesystem.BlobStorage) (*blobSync, error) {
todo, err := vbs.blobIdents(retentionStart)
if err != nil {
return nil, err
}
bbv := newBlobBatchVerifier(nbv)
as := das.NewLazilyPersistentStore(st, bbv)
return &blobSync{cs: cs, expected: todo, bbv: bbv, store: as}, nil
}

type blobVerifierMap map[[32]byte][fieldparams.MaxBlobsPerBlock]verification.BlobVerifier

type blobSync struct {
store das.AvailabilityStore
expected []blobSummary
next int
bbv *blobBatchVerifier
cs primitives.Slot
}

func (bs *blobSync) blobsNeeded() int {
return len(bs.expected) - bs.next
}

func (bs *blobSync) validateNext(rb blocks.ROBlob) error {
if bs.next >= len(bs.expected) {
return errUnexpectedResponseSize
}
next := bs.expected[bs.next]
bs.next += 1
v := bs.bbv.newVerifier(rb)
if err := v.BlobIndexInBounds(); err != nil {
return err
}
if next.blockRoot != rb.BlockRoot() || bytesutil.ToBytes96(rb.SignedBlockHeader.Signature) != next.signature {
return verification.ErrInvalidProposerSignature
}
v.SatisfyRequirement(verification.RequireValidProposerSignature)
if next.index != rb.Index {
return errUnexpectedBlobIndex
}
if next.commitment != bytesutil.ToBytes48(rb.KzgCommitment) {
return errUnexpectedCommitment
}
if err := v.SidecarInclusionProven(); err != nil {
return err
}
if err := v.SidecarKzgProofVerified(); err != nil {
return err
}
if err := bs.store.Persist(bs.cs, rb); err != nil {
return err
}

return nil
}

func newBlobBatchVerifier(nbv verification.NewBlobVerifier) *blobBatchVerifier {
return &blobBatchVerifier{newBlobVerifier: nbv, verifiers: make(blobVerifierMap)}
}

type blobBatchVerifier struct {
newBlobVerifier verification.NewBlobVerifier
verifiers blobVerifierMap
}

func (bbv *blobBatchVerifier) newVerifier(rb blocks.ROBlob) verification.BlobVerifier {
m := bbv.verifiers[rb.BlockRoot()]
m[rb.Index] = bbv.newBlobVerifier(rb, verification.BackfillSidecarRequirements)
bbv.verifiers[rb.BlockRoot()] = m
return m[rb.Index]
}

func (bbv blobBatchVerifier) VerifiedROBlobs(_ context.Context, blk blocks.ROBlock, _ []blocks.ROBlob) ([]blocks.VerifiedROBlob, error) {
m, ok := bbv.verifiers[blk.Root()]
if !ok {
return nil, errors.Wrapf(verification.ErrMissingVerification, "no record of verifiers for root %#x", blk.Root())
}
c, err := blk.Block().Body().BlobKzgCommitments()
if err != nil {
return nil, errors.Wrapf(errUnexpectedCommitment, "error reading commitments from block root %#x", blk.Root())
}
vbs := make([]blocks.VerifiedROBlob, len(c))
for i := range c {
if m[i] == nil {
return nil, errors.Wrapf(errUnexpectedCommitment, "do not have verifier for block root %#x idx %d", blk.Root(), i)
}
vb, err := m[i].VerifiedROBlob()
if err != nil {
return nil, err
}
vbs[i] = vb
}
return vbs, nil
}

var _ das.BlobBatchVerifier = &blobBatchVerifier{}
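
Illustration (not part of this commit): newBlobSync derives the expected blob identities from the already-verified blocks and wraps the filesystem store in a das.LazilyPersistentStore that consults the batch verifier. A hedged sketch of how a caller might drive it, assuming the sidecars were already fetched in request order (the function name and variables below are hypothetical):

// Hypothetical wiring; only newBlobSync, validateNext, blobsNeeded and the store field come from this commit.
func syncBatchBlobs(current, retentionStart primitives.Slot, blks verifiedROBlocks,
	nbv verification.NewBlobVerifier, store *filesystem.BlobStorage, sidecars []blocks.ROBlob) (das.AvailabilityStore, error) {
	bs, err := newBlobSync(current, retentionStart, blks, nbv, store)
	if err != nil {
		return nil, err
	}
	// Sidecars must arrive in the order of the expected identifiers;
	// validateNext verifies each one and persists it before the next is accepted.
	for i := range sidecars {
		if err := bs.validateNext(sidecars[i]); err != nil {
			return nil, err
		}
	}
	return bs.store, nil
}
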
(The remaining 12 changed files in this commit are not shown here.)
