diff --git a/beacon-chain/db/iface/BUILD.bazel b/beacon-chain/db/iface/BUILD.bazel index b7f105c76e4c..45d7fb03a81a 100644 --- a/beacon-chain/db/iface/BUILD.bazel +++ b/beacon-chain/db/iface/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "//beacon-chain/db/filters:go_default_library", "//beacon-chain/slasher/types:go_default_library", "//beacon-chain/state:go_default_library", + "//consensus-types/blocks:go_default_library", "//consensus-types/interfaces:go_default_library", "//consensus-types/primitives:go_default_library", "//monitoring/backup:go_default_library", diff --git a/beacon-chain/db/iface/interface.go b/beacon-chain/db/iface/interface.go index 8a8184d0ff04..6ad4809d6b36 100644 --- a/beacon-chain/db/iface/interface.go +++ b/beacon-chain/db/iface/interface.go @@ -11,6 +11,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filters" slashertypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/slasher/types" "github.com/prysmaticlabs/prysm/v4/beacon-chain/state" + "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v4/monitoring/backup" @@ -115,9 +116,10 @@ type HeadAccessDatabase interface { SaveGenesisData(ctx context.Context, state state.BeaconState) error EnsureEmbeddedGenesis(ctx context.Context) error - // initialization method needed for origin checkpoint sync + // Support for checkpoint sync and backfill. SaveOrigin(ctx context.Context, serState, serBlock []byte) error SaveBackfillStatus(context.Context, *dbval.BackfillStatus) error + BackfillFinalizedIndex(ctx context.Context, blocks []blocks.ROBlock, finalizedChildRoot [32]byte) error } // SlasherDatabase interface for persisting data related to detecting slashable offenses on Ethereum. diff --git a/beacon-chain/db/kv/error.go b/beacon-chain/db/kv/error.go index 29dd336ffeb4..d5ff4e5858dd 100644 --- a/beacon-chain/db/kv/error.go +++ b/beacon-chain/db/kv/error.go @@ -21,3 +21,8 @@ var ErrNotFoundBackfillBlockRoot = errors.Wrap(ErrNotFound, "BackfillBlockRoot") // ErrNotFoundFeeRecipient is a not found error specifically for the fee recipient getter var ErrNotFoundFeeRecipient = errors.Wrap(ErrNotFound, "fee recipient") + +var errEmptyBlockSlice = errors.New("[]blocks.ROBlock is empty") +var errIncorrectBlockParent = errors.New("Unexpected missing or forked blocks in a []ROBlock") +var errFinalizedChildNotFound = errors.New("Unable to find finalized root descending from backfill batch") +var errNotConnectedToFinalized = errors.New("Unable to finalize backfill blocks, finalized parent_root does not match") diff --git a/beacon-chain/db/kv/finalized_block_roots.go b/beacon-chain/db/kv/finalized_block_roots.go index 20aae076882b..f1a7a81c1cb1 100644 --- a/beacon-chain/db/kv/finalized_block_roots.go +++ b/beacon-chain/db/kv/finalized_block_roots.go @@ -4,6 +4,7 @@ import ( "bytes" "context" + "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filters" "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces" @@ -163,6 +164,83 @@ func (s *Store) updateFinalizedBlockRoots(ctx context.Context, tx *bolt.Tx, chec return bkt.Put(previousFinalizedCheckpointKey, enc) } +// BackfillFinalizedIndex updates the finalized index for a contiguous chain of blocks that are the ancestors of the +// given finalized child root. 
This is needed to update the finalized index during backfill, because the usual +// updateFinalizedBlockRoots has assumptions that are incompatible with backfill processing. +func (s *Store) BackfillFinalizedIndex(ctx context.Context, blocks []blocks.ROBlock, finalizedChildRoot [32]byte) error { + ctx, span := trace.StartSpan(ctx, "BeaconDB.BackfillFinalizedIndex") + defer span.End() + if len(blocks) == 0 { + return errEmptyBlockSlice + } + + fbrs := make([]*ethpb.FinalizedBlockRootContainer, len(blocks)) + encs := make([][]byte, len(blocks)) + for i := range blocks { + pr := blocks[i].Block().ParentRoot() + fbrs[i] = ðpb.FinalizedBlockRootContainer{ + ParentRoot: pr[:], + // ChildRoot: will be filled in on the next iteration when we look at the descendent block. + } + if i == 0 { + continue + } + if blocks[i-1].Root() != blocks[i].Block().ParentRoot() { + return errors.Wrapf(errIncorrectBlockParent, "previous root=%#x, slot=%d; child parent_root=%#x, root=%#x, slot=%d", + blocks[i-1].Root(), blocks[i-1].Block().Slot(), blocks[i].Block().ParentRoot(), blocks[i].Root(), blocks[i].Block().Slot()) + } + + // We know the previous index is the parent of this one thanks to the assertion above, + // so we can set the ChildRoot of the previous value to the root of the current value. + fbrs[i-1].ChildRoot = blocks[i].RootSlice() + // Now that the value for fbrs[i-1] is complete, perform encoding here to minimize time in Update, + // which holds the global db lock. + penc, err := encode(ctx, fbrs[i-1]) + if err != nil { + tracing.AnnotateError(span, err) + return err + } + encs[i-1] = penc + + // The final element is the parent of finalizedChildRoot. This is checked inside the db transaction using + // the parent_root value stored in the index data for finalizedChildRoot. + if i == len(blocks)-1 { + fbrs[i].ChildRoot = finalizedChildRoot[:] + // Final element is complete, so it is pre-encoded like the others. + enc, err := encode(ctx, fbrs[i]) + if err != nil { + tracing.AnnotateError(span, err) + return err + } + encs[i] = enc + } + } + + return s.db.Update(func(tx *bolt.Tx) error { + bkt := tx.Bucket(finalizedBlockRootsIndexBucket) + child := bkt.Get(finalizedChildRoot[:]) + if len(child) == 0 { + return errFinalizedChildNotFound + } + fcc := ðpb.FinalizedBlockRootContainer{} + if err := decode(ctx, child, fcc); err != nil { + return errors.Wrapf(err, "unable to decode finalized block root container for root=%#x", finalizedChildRoot) + } + // Ensure that the existing finalized chain descends from the new segment. + if !bytes.Equal(fcc.ParentRoot, blocks[len(blocks)-1].RootSlice()) { + return errors.Wrapf(errNotConnectedToFinalized, "finalized block root container for root=%#x has parent_root=%#x, not %#x", + finalizedChildRoot, fcc.ParentRoot, blocks[len(blocks)-1].RootSlice()) + } + // Update the finalized index with entries for each block in the new segment. + for i := range fbrs { + if err := bkt.Put(blocks[i].RootSlice(), encs[i]); err != nil { + return err + } + } + return nil + }) +} + // IsFinalizedBlock returns true if the block root is present in the finalized block root index. // A beacon block root contained exists in this index if it is considered finalized and canonical. 
// Note: beacon blocks from the latest finalized epoch return true, whether or not they are diff --git a/beacon-chain/db/kv/finalized_block_roots_test.go b/beacon-chain/db/kv/finalized_block_roots_test.go index 5115e826e5f7..a597a4bcb3ac 100644 --- a/beacon-chain/db/kv/finalized_block_roots_test.go +++ b/beacon-chain/db/kv/finalized_block_roots_test.go @@ -1,6 +1,7 @@ package kv import ( + "bytes" "context" "testing" @@ -14,6 +15,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/testing/assert" "github.com/prysmaticlabs/prysm/v4/testing/require" "github.com/prysmaticlabs/prysm/v4/testing/util" + bolt "go.etcd.io/bbolt" ) var genesisBlockRoot = bytesutil.ToBytes32([]byte{'G', 'E', 'N', 'E', 'S', 'I', 'S'}) @@ -234,3 +236,64 @@ func makeBlocksAltair(t *testing.T, startIdx, num uint64, previousRoot [32]byte) } return ifaceBlocks } + +func TestStore_BackfillFinalizedIndex(t *testing.T) { + db := setupDB(t) + ctx := context.Background() + require.ErrorIs(t, db.BackfillFinalizedIndex(ctx, []consensusblocks.ROBlock{}, [32]byte{}), errEmptyBlockSlice) + blks, err := consensusblocks.NewROBlockSlice(makeBlocks(t, 0, 66, [32]byte{})) + require.NoError(t, err) + + // set up existing finalized block + ebpr := blks[64].Block().ParentRoot() + ebr := blks[64].Root() + chldr := blks[65].Root() + ebf := ðpb.FinalizedBlockRootContainer{ + ParentRoot: ebpr[:], + ChildRoot: chldr[:], + } + disjoint := []consensusblocks.ROBlock{ + blks[0], + blks[2], + } + enc, err := encode(ctx, ebf) + require.NoError(t, err) + err = db.db.Update(func(tx *bolt.Tx) error { + bkt := tx.Bucket(finalizedBlockRootsIndexBucket) + return bkt.Put(ebr[:], enc) + }) + + // reslice to remove the existing blocks + blks = blks[0:64] + // check the other error conditions with a descendent root that really doesn't exist + require.NoError(t, err) + require.ErrorIs(t, db.BackfillFinalizedIndex(ctx, disjoint, [32]byte{}), errIncorrectBlockParent) + require.NoError(t, err) + require.ErrorIs(t, errFinalizedChildNotFound, db.BackfillFinalizedIndex(ctx, blks, [32]byte{})) + + // use the real root so that it succeeds + require.NoError(t, db.BackfillFinalizedIndex(ctx, blks, ebr)) + for i := range blks { + require.NoError(t, db.db.View(func(tx *bolt.Tx) error { + bkt := tx.Bucket(finalizedBlockRootsIndexBucket) + encfr := bkt.Get(blks[i].RootSlice()) + require.Equal(t, true, len(encfr) > 0) + fr := ðpb.FinalizedBlockRootContainer{} + require.NoError(t, decode(ctx, encfr, fr)) + require.Equal(t, 32, len(fr.ParentRoot)) + require.Equal(t, 32, len(fr.ChildRoot)) + pr := blks[i].Block().ParentRoot() + require.Equal(t, true, bytes.Equal(fr.ParentRoot, pr[:])) + if i > 0 { + require.Equal(t, true, bytes.Equal(fr.ParentRoot, blks[i-1].RootSlice())) + } + if i < len(blks)-1 { + require.DeepEqual(t, fr.ChildRoot, blks[i+1].RootSlice()) + } + if i == len(blks)-1 { + require.DeepEqual(t, fr.ChildRoot, ebr[:]) + } + return nil + })) + } +} diff --git a/beacon-chain/node/BUILD.bazel b/beacon-chain/node/BUILD.bazel index 1b5b12887939..e6662aaad297 100644 --- a/beacon-chain/node/BUILD.bazel +++ b/beacon-chain/node/BUILD.bazel @@ -46,6 +46,7 @@ go_library( "//beacon-chain/state/stategen:go_default_library", "//beacon-chain/sync:go_default_library", "//beacon-chain/sync/backfill:go_default_library", + "//beacon-chain/sync/backfill/coverage:go_default_library", "//beacon-chain/sync/checkpoint:go_default_library", "//beacon-chain/sync/genesis:go_default_library", "//beacon-chain/sync/initial-sync:go_default_library", diff --git a/beacon-chain/node/node.go 
b/beacon-chain/node/node.go index 9f44aa971681..d8bd13eae002 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -40,6 +40,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee" "github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/voluntaryexits" "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/peers" "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc" "github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/apimiddleware" "github.com/prysmaticlabs/prysm/v4/beacon-chain/slasher" @@ -48,6 +49,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen" regularsync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync" "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/backfill" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/backfill/coverage" "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/checkpoint" "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/genesis" initialsync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/initial-sync" @@ -110,6 +112,7 @@ type BeaconNode struct { CheckpointInitializer checkpoint.Initializer forkChoicer forkchoice.ForkChoicer clockWaiter startup.ClockWaiter + BackfillOpts []backfill.ServiceOption initialSyncComplete chan struct{} } @@ -218,7 +221,8 @@ func New(cliCtx *cli.Context, opts ...Option) (*BeaconNode, error) { if err != nil { return nil, errors.Wrap(err, "backfill status initialization error") } - bf, err := backfill.NewService(ctx, bfs, beacon.clockWaiter, beacon.fetchP2P()) + pa := peers.NewAssigner(beacon.fetchP2P().Peers(), beacon.forkChoicer) + bf, err := backfill.NewService(ctx, bfs, beacon.clockWaiter, beacon.fetchP2P(), pa, beacon.BackfillOpts...) if err != nil { return nil, errors.Wrap(err, "error initializing backfill service") } @@ -518,7 +522,7 @@ func (b *BeaconNode) startSlasherDB(cliCtx *cli.Context) error { return nil } -func (b *BeaconNode) startStateGen(ctx context.Context, bfs *backfill.StatusUpdater, fc forkchoice.ForkChoicer) error { +func (b *BeaconNode) startStateGen(ctx context.Context, bfs coverage.AvailableBlocker, fc forkchoice.ForkChoicer) error { opts := []stategen.StateGenOption{stategen.WithAvailableBlocker(bfs)} sg := stategen.New(b.db, fc, opts...) 
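Aside, for readers following the new BackfillFinalizedIndex flow introduced in finalized_block_roots.go above: the core of the index update is a parent/child chain. Each backfilled block gets an entry recording its parent root, the next block's root becomes its child root, and the final entry points at the already-finalized descendant whose stored parent_root must match the batch's highest root. The following is a minimal, self-contained sketch of that linking rule only; the types blockRef and indexEntry are illustrative stand-ins, not Prysm's ethpb.FinalizedBlockRootContainer, and it omits the bolt transaction and encoding that the real method performs.

// finalized_index_sketch.go — illustrative only, not part of the diff above.
package finalizedindex

import (
	"errors"
	"fmt"
)

// blockRef is a hypothetical, minimal view of a block: its root and its parent root.
type blockRef struct {
	root, parent [32]byte
}

// indexEntry mirrors the ParentRoot/ChildRoot pair stored per block root in the finalized index.
type indexEntry struct {
	parent, child [32]byte
}

// linkBackfilled chains a contiguous, slot-ordered batch: entry i records its own parent root,
// takes block i+1's root as its child root, and the last entry points at the already-finalized
// descendant (finalizedChild), whose stored parent_root must equal the batch's highest root.
func linkBackfilled(batch []blockRef, finalizedChild [32]byte) (map[[32]byte]indexEntry, error) {
	if len(batch) == 0 {
		return nil, errors.New("empty batch")
	}
	idx := make(map[[32]byte]indexEntry, len(batch))
	for i, b := range batch {
		if i > 0 && batch[i-1].root != b.parent {
			return nil, fmt.Errorf("batch broken between positions %d and %d", i-1, i)
		}
		child := finalizedChild
		if i < len(batch)-1 {
			child = batch[i+1].root
		}
		idx[b.root] = indexEntry{parent: b.parent, child: child}
	}
	return idx, nil
}

In the actual method, the encoded containers are written inside a single bolt Update transaction, with the pre-encoding done beforehand to minimize time spent holding the global db lock, and the transaction itself verifies that the existing finalized entry's parent_root matches the batch before writing.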
diff --git a/beacon-chain/p2p/BUILD.bazel b/beacon-chain/p2p/BUILD.bazel index 6f78368cb5f6..6ed282db9ea9 100644 --- a/beacon-chain/p2p/BUILD.bazel +++ b/beacon-chain/p2p/BUILD.bazel @@ -4,7 +4,6 @@ go_library( name = "go_default_library", srcs = [ "addr_factory.go", - "assigner.go", "broadcaster.go", "config.go", "connection_gater.go", diff --git a/beacon-chain/p2p/assigner.go b/beacon-chain/p2p/assigner.go deleted file mode 100644 index df4ddd305b46..000000000000 --- a/beacon-chain/p2p/assigner.go +++ /dev/null @@ -1 +0,0 @@ -package p2p diff --git a/beacon-chain/p2p/peers/assigner.go b/beacon-chain/p2p/peers/assigner.go index 6e2eea198adf..1c5a91ae38dd 100644 --- a/beacon-chain/p2p/peers/assigner.go +++ b/beacon-chain/p2p/peers/assigner.go @@ -1,38 +1,39 @@ package peers import ( - "context" - "sync" - "time" - "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" + forkchoicetypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/types" "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags" "github.com/prysmaticlabs/prysm/v4/config/params" - "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" "github.com/sirupsen/logrus" ) -// handshakePollingInterval is a polling interval for checking the number of received handshakes. -var handshakePollingInterval = 5 * time.Second +// FinalizedCheckpointer describes the minimum capability that Assigner needs from forkchoice. +// That is, the ability to retrieve the latest finalized checkpoint to help with peer evaluation. +type FinalizedCheckpointer interface { + FinalizedCheckpoint() *forkchoicetypes.Checkpoint +} -func NewAssigner(ctx context.Context, s *Status, max int, finalized primitives.Epoch) *Assigner { +// NewAssigner assists in the correct construction of an Assigner by code in other packages, +// assuring all the important private member fields are given values. +// The finalized parameter is used to specify a minimum finalized epoch that the peers must agree on. +func NewAssigner(s *Status, fc FinalizedCheckpointer) *Assigner { return &Assigner{ - ctx: ctx, - ps: s, - max: max, - finalized: finalized, + ps: s, + fc: fc, } } +// Assigner uses the "BestFinalized" peer scoring method to pick the next-best peer to receive rpc requests. type Assigner struct { - sync.Mutex - ctx context.Context - ps *Status - max int - finalized primitives.Epoch + ps *Status + fc FinalizedCheckpointer } +// ErrInsufficientSuitable is a sentinel error, signaling that a peer couldn't be assigned because there are currently +// not enough peers that match our selection criteria to serve rpc requests. It is the responsibility of the caller to +// look for this error and continue to try calling Assign with appropriate backoff logic. 
var ErrInsufficientSuitable = errors.New("no suitable peers") func (a *Assigner) freshPeers() ([]peer.ID, error) { @@ -40,7 +41,7 @@ func (a *Assigner) freshPeers() ([]peer.ID, error) { if flags.Get().MinimumSyncPeers < required { required = flags.Get().MinimumSyncPeers } - _, peers := a.ps.BestFinalized(params.BeaconConfig().MaxPeersToSync, a.finalized) + _, peers := a.ps.BestFinalized(params.BeaconConfig().MaxPeersToSync, a.fc.FinalizedCheckpoint().Epoch) if len(peers) < required { log.WithFields(logrus.Fields{ "suitable": len(peers), @@ -50,7 +51,7 @@ func (a *Assigner) freshPeers() ([]peer.ID, error) { return peers, nil } -// Assign uses the BestFinalized method to select the best peers that agree on a canonical block +// Assign uses the "BestFinalized" method to select the best peers that agree on a canonical block // for the configured finalized epoch. At most `n` peers will be returned. The `busy` param can be used // to filter out peers that we know we don't want to connect to, for instance if we are trying to limit // the number of outbound requests to each peer from a given component. diff --git a/beacon-chain/sync/backfill/batch.go b/beacon-chain/sync/backfill/batch.go index 03fdaa48da82..6028335b04ca 100644 --- a/beacon-chain/sync/backfill/batch.go +++ b/beacon-chain/sync/backfill/batch.go @@ -6,13 +6,14 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" - "github.com/prysmaticlabs/prysm/v4/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" eth "github.com/prysmaticlabs/prysm/v4/proto/prysm/v1alpha1" log "github.com/sirupsen/logrus" ) -var ErrChainBroken = errors.New("batch is not the ancestor of backfilled batch") +// ErrChainBroken is used when a backfill batch can't be imported to the db because it is not known to be the ancestor +// of the canonical chain.
+var ErrChainBroken = errors.New("batch is not the ancestor of a known finalized root") type batchState int @@ -75,11 +76,10 @@ func (b batch) logFields() log.Fields { } } -func (b *batch) inc() { - b.seq += 1 -} - func (b batch) replaces(r batch) bool { + if r.state == batchImportComplete { + return false + } if b.begin != r.begin { return false } @@ -93,10 +93,6 @@ func (b batch) id() batchId { return batchId(fmt.Sprintf("%d:%d", b.begin, b.end)) } -func (b batch) size() primitives.Slot { - return b.end - b.begin -} - func (b batch) ensureParent(expected [32]byte) error { tail := b.results[len(b.results)-1] if tail.Root() != expected { @@ -105,10 +101,6 @@ func (b batch) ensureParent(expected [32]byte) error { return nil } -func (b batch) lowest() blocks.ROBlock { - return b.results[0] -} - func (b batch) request() *eth.BeaconBlocksByRangeRequest { return ð.BeaconBlocksByRangeRequest{ StartSlot: b.begin, @@ -123,12 +115,13 @@ func (b batch) withState(s batchState) batch { switch b.state { case batchErrRetryable: b.retries += 1 + log.WithFields(b.logFields()).Info("sequencing batch for retry") case batchInit, batchNil: b.firstScheduled = b.scheduled } } if s == batchImportComplete { - backfillBatchTimeRoundtrip.Observe(float64(time.Now().Sub(b.firstScheduled).Milliseconds())) + backfillBatchTimeRoundtrip.Observe(float64(time.Since(b.firstScheduled).Milliseconds())) log.WithFields(b.logFields()).Debug("Backfill batch imported.") } b.state = s @@ -138,7 +131,7 @@ func (b batch) withState(s batchState) batch { func (b batch) withPeer(p peer.ID) batch { b.pid = p - backfillBatchTimeWaiting.Observe(float64(time.Now().Sub(b.scheduled).Milliseconds())) + backfillBatchTimeWaiting.Observe(float64(time.Since(b.scheduled).Milliseconds())) return b } diff --git a/beacon-chain/sync/backfill/batcher.go b/beacon-chain/sync/backfill/batcher.go index 95894400c436..1a0e03b4a822 100644 --- a/beacon-chain/sync/backfill/batcher.go +++ b/beacon-chain/sync/backfill/batcher.go @@ -5,17 +5,15 @@ import ( "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" ) -var errSequencerMisconfigured = errors.New("backfill sequencer initialization error") var errMaxBatches = errors.New("backfill batch requested in excess of max outstanding batches") var errEndSequence = errors.New("sequence has terminated, no more backfill batches will be produced") +var errCannotDecreaseMinimum = errors.New("The minimum backfill slot can only be increased, not decreased") type batchSequencer struct { batcher batcher seq []batch } -var errCannotDecreaseMinimum = errors.New("The minimum backfill slot can only be increased, not decreased") - // moveMinimum enables the backfill service to change the slot where the batcher will start replying with // batch state batchEndSequence (signaling that no new batches will be produced). 
This is done in response to // epochs advancing, which shrinks the gap between and -MIN_EPOCHS_FOR_BLOCK_REQUESTS, @@ -28,10 +26,6 @@ func (c *batchSequencer) moveMinimum(min primitives.Slot) error { return nil } -func (c *batchSequencer) minimum() primitives.Slot { - return c.batcher.min -} - func (c *batchSequencer) countWithState(s batchState) int { n := 0 for i := 0; i < len(c.seq); i++ { @@ -87,7 +81,6 @@ func (c *batchSequencer) sequence() ([]batch, error) { if len(s) == 0 { s = append(s, c.seq[i]) } - break default: continue } diff --git a/beacon-chain/sync/backfill/metrics.go b/beacon-chain/sync/backfill/metrics.go index 01a8cadd70d9..e8a1ae48350d 100644 --- a/beacon-chain/sync/backfill/metrics.go +++ b/beacon-chain/sync/backfill/metrics.go @@ -30,6 +30,12 @@ var ( Help: "Number of backfill batches downloaded and imported.", }, ) + backfillBatchApproximateBytes = promauto.NewCounter( + prometheus.CounterOpts{ + Name: "backfill_batch_bytes_downloaded", + Help: "Count of bytes downloaded from peers", + }, + ) backfillBatchTimeRoundtrip = promauto.NewHistogram( prometheus.HistogramOpts{ Name: "backfill_batch_time_roundtrip", diff --git a/beacon-chain/sync/backfill/pool.go b/beacon-chain/sync/backfill/pool.go index e48d0d0f94ac..e617b5775467 100644 --- a/beacon-chain/sync/backfill/pool.go +++ b/beacon-chain/sync/backfill/pool.go @@ -15,15 +15,11 @@ import ( ) type BatchWorkerPool interface { - Spawn(ctx context.Context, n int, clock *startup.Clock, a peerAssigner, v *verifier) + Spawn(ctx context.Context, n int, clock *startup.Clock, a PeerAssigner, v *verifier) Todo(b batch) Complete() (batch, error) } -type peerAssigner interface { - Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error) -} - type worker interface { run(context.Context) } @@ -39,7 +35,6 @@ func DefaultNewWorker(p p2p.P2P) newWorker { type p2pBatchWorkerPool struct { maxBatches int newWorker newWorker - assigner peerAssigner toWorkers chan batch fromWorkers chan batch toRouter chan batch @@ -65,7 +60,7 @@ func newP2PBatchWorkerPool(p p2p.P2P, maxBatches int) *p2pBatchWorkerPool { } } -func (p *p2pBatchWorkerPool) Spawn(ctx context.Context, n int, c *startup.Clock, a peerAssigner, v *verifier) { +func (p *p2pBatchWorkerPool) Spawn(ctx context.Context, n int, c *startup.Clock, a PeerAssigner, v *verifier) { p.ctx, p.cancel = context.WithCancel(ctx) go p.batchRouter(a) for i := 0; i < n; i++ { @@ -101,7 +96,7 @@ func (p *p2pBatchWorkerPool) Complete() (batch, error) { } } -func (p *p2pBatchWorkerPool) batchRouter(pa peerAssigner) { +func (p *p2pBatchWorkerPool) batchRouter(pa PeerAssigner) { busy := make(map[peer.ID]bool) todo := make([]batch, 0) rt := time.NewTicker(time.Second) diff --git a/beacon-chain/sync/backfill/pool_test.go b/beacon-chain/sync/backfill/pool_test.go index 6bedfa442bb9..e952e5c28b2b 100644 --- a/beacon-chain/sync/backfill/pool_test.go +++ b/beacon-chain/sync/backfill/pool_test.go @@ -24,7 +24,7 @@ func (m MockAssigner) Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error) { return m.assign, nil } -var _ peerAssigner = &MockAssigner{} +var _ PeerAssigner = &MockAssigner{} func TestPoolDetectAllEnded(t *testing.T) { nw := 5 @@ -55,7 +55,7 @@ type mockPool struct { todoChan chan batch } -func (m *mockPool) Spawn(_ context.Context, _ int, _ *startup.Clock, _ peerAssigner, _ *verifier) { +func (m *mockPool) Spawn(_ context.Context, _ int, _ *startup.Clock, _ PeerAssigner, _ *verifier) { } func (m *mockPool) Todo(b batch) { diff --git a/beacon-chain/sync/backfill/service.go 
b/beacon-chain/sync/backfill/service.go index e296b4524986..35f7882963e0 100644 --- a/beacon-chain/sync/backfill/service.go +++ b/beacon-chain/sync/backfill/service.go @@ -3,12 +3,11 @@ package backfill import ( "context" + "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v4/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p" - "github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/peers" "github.com/prysmaticlabs/prysm/v4/beacon-chain/startup" - "github.com/prysmaticlabs/prysm/v4/config/params" "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives" "github.com/prysmaticlabs/prysm/v4/encoding/bytesutil" "github.com/prysmaticlabs/prysm/v4/proto/dbval" @@ -17,23 +16,19 @@ import ( log "github.com/sirupsen/logrus" ) -const defaultWorkerCount = 5 - -// TODO use the correct beacon param for blocks by range size instead -const defaultBatchSize = 64 - type Service struct { ctx context.Context - su *StatusUpdater + su *Store ms minimumSlotter cw startup.ClockWaiter + enabled bool // service is disabled by default while feature is experimental nWorkers int - errChan chan error batchSeq *batchSequencer batchSize uint64 pool BatchWorkerPool verifier *verifier p2p p2p.P2P + pa PeerAssigner batchImporter batchImporter } @@ -41,6 +36,13 @@ var _ runtime.Service = (*Service)(nil) type ServiceOption func(*Service) error +func WithEnableBackfill(enabled bool) ServiceOption { + return func(s *Service) error { + s.enabled = enabled + return nil + } +} + func WithWorkerCount(n int) ServiceOption { return func(s *Service) error { s.nWorkers = n @@ -83,9 +85,9 @@ func (d defaultMinimumSlotter) setClock(c *startup.Clock) { var _ minimumSlotter = &defaultMinimumSlotter{} -type batchImporter func(ctx context.Context, b batch, su *StatusUpdater) (*dbval.BackfillStatus, error) +type batchImporter func(ctx context.Context, b batch, su *Store) (*dbval.BackfillStatus, error) -func defaultBatchImporter(ctx context.Context, b batch, su *StatusUpdater) (*dbval.BackfillStatus, error) { +func defaultBatchImporter(ctx context.Context, b batch, su *Store) (*dbval.BackfillStatus, error) { status := su.status() if err := b.ensureParent(bytesutil.ToBytes32(status.LowParentRoot)); err != nil { return status, err @@ -100,7 +102,14 @@ func defaultBatchImporter(ctx context.Context, b batch, su *StatusUpdater) (*dbv return status, nil } -func NewService(ctx context.Context, su *StatusUpdater, cw startup.ClockWaiter, p p2p.P2P, opts ...ServiceOption) (*Service, error) { +// PeerAssigner describes a type that provides an Assign method, which can assign the best peer +// to service an RPC request. The Assign method takes a map of peers that should be excluded, +// allowing the caller to avoid making multiple concurrent requests to the same peer. 
+type PeerAssigner interface { + Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error) +} + +func NewService(ctx context.Context, su *Store, cw startup.ClockWaiter, p p2p.P2P, pa PeerAssigner, opts ...ServiceOption) (*Service, error) { s := &Service{ ctx: ctx, su: su, @@ -114,12 +123,6 @@ func NewService(ctx context.Context, su *StatusUpdater, cw startup.ClockWaiter, return nil, err } } - if s.nWorkers == 0 { - s.nWorkers = defaultWorkerCount - } - if s.batchSize == 0 { - s.batchSize = defaultBatchSize - } s.pool = newP2PBatchWorkerPool(p, s.nWorkers) return s, nil @@ -158,9 +161,8 @@ func (s *Service) importBatches(ctx context.Context) { }() for i := range importable { ib := importable[i] - // TODO: can we have an entire batch of skipped slots? if len(ib.results) == 0 { - log.Error("wtf") + log.WithFields(ib.logFields()).Error("batch with no results, skipping importer") } _, err := s.batchImporter(ctx, ib, s.su) if err != nil { @@ -199,6 +201,10 @@ func (s *Service) scheduleTodos() { } func (s *Service) Start() { + if !s.enabled { + log.Info("exiting backfill service; not enabled") + return + } ctx, cancel := context.WithCancel(s.ctx) defer func() { cancel() @@ -213,15 +219,16 @@ func (s *Service) Start() { s.batchSeq = newBatchSequencer(s.nWorkers, s.ms.minimumSlot(), primitives.Slot(status.LowSlot), primitives.Slot(s.batchSize)) // Exit early if there aren't going to be any batches to backfill. if primitives.Slot(status.LowSlot) < s.ms.minimumSlot() { + log.WithField("minimum_required_slot", s.ms.minimumSlot()). + WithField("backfill_lowest_slot", status.LowSlot). + Info("Exiting backfill service; minimum block retention slot > lowest backfilled block") return } - originE := slots.ToEpoch(primitives.Slot(status.OriginSlot)) - assigner := peers.NewAssigner(ctx, s.p2p.Peers(), params.BeaconConfig().MaxPeersToSync, originE) s.verifier, err = s.initVerifier(ctx) if err != nil { log.WithError(err).Fatal("Unable to initialize backfill verifier, quitting.") } - s.pool.Spawn(ctx, s.nWorkers, clock, assigner, s.verifier) + s.pool.Spawn(ctx, s.nWorkers, clock, s.pa, s.verifier) if err = s.initBatches(); err != nil { log.WithError(err).Fatal("Non-recoverable error in backfill service, quitting.") diff --git a/beacon-chain/sync/backfill/service_test.go b/beacon-chain/sync/backfill/service_test.go index d05b5c7e6843..a6ea61d39aa5 100644 --- a/beacon-chain/sync/backfill/service_test.go +++ b/beacon-chain/sync/backfill/service_test.go @@ -50,11 +50,11 @@ func TestServiceInit(t *testing.T) { require.NoError(t, cw.SetClock(startup.NewClock(time.Now(), [32]byte{}))) pool := &mockPool{todoChan: make(chan batch, nWorkers), finishedChan: make(chan batch, nWorkers)} p2pt := p2ptest.NewTestP2P(t) - srv, err := NewService(ctx, su, cw, p2pt, WithBatchSize(batchSize), WithWorkerCount(nWorkers)) + srv, err := NewService(ctx, su, cw, p2pt, &MockAssigner{}, WithBatchSize(batchSize), WithWorkerCount(nWorkers)) require.NoError(t, err) srv.ms = mockMinimumSlotter{min: primitives.Slot(high - batchSize*uint64(nBatches))} srv.pool = pool - srv.batchImporter = func(context.Context, batch, *StatusUpdater) (*dbval.BackfillStatus, error) { + srv.batchImporter = func(context.Context, batch, *Store) (*dbval.BackfillStatus, error) { return &dbval.BackfillStatus{}, nil } go srv.Start() diff --git a/beacon-chain/sync/backfill/status.go b/beacon-chain/sync/backfill/status.go index eeb42ed57300..b9426b984163 100644 --- a/beacon-chain/sync/backfill/status.go +++ b/beacon-chain/sync/backfill/status.go @@ -14,9 +14,11 @@ import ( 
"github.com/prysmaticlabs/prysm/v4/proto/dbval" ) +var errBatchDisconnected = errors.New("Highest block root in backfill batch doesn't match next parent_root") + // NewUpdater correctly initializes a StatusUpdater value with the required database value. -func NewUpdater(ctx context.Context, store BackfillDB) (*StatusUpdater, error) { - s := &StatusUpdater{ +func NewUpdater(ctx context.Context, store BackfillDB) (*Store, error) { + s := &Store{ store: store, } status, err := s.store.BackfillStatus(ctx) @@ -24,17 +26,18 @@ func NewUpdater(ctx context.Context, store BackfillDB) (*StatusUpdater, error) { if errors.Is(err, db.ErrNotFound) { return s, s.recoverLegacy(ctx) } + return nil, errors.Wrap(err, "db error while reading status of previous backfill") } s.swapStatus(status) return s, nil } -// StatusUpdater provides a way to update and query the status of a backfill process that may be necessary to track when +// Store provides a way to update and query the status of a backfill process that may be necessary to track when // a node was initialized via checkpoint sync. With checkpoint sync, there will be a gap in node history from genesis -// until the checkpoint sync origin block. StatusUpdater provides the means to update the value keeping track of the lower +// until the checkpoint sync origin block. Store provides the means to update the value keeping track of the lower // end of the missing block range via the FillFwd() method, to check whether a Slot is missing from the database // via the AvailableBlock() method, and to see the current StartGap() and EndGap(). -type StatusUpdater struct { +type Store struct { sync.RWMutex store BackfillDB genesisSync bool @@ -44,7 +47,7 @@ type StatusUpdater struct { // AvailableBlock determines if the given slot is covered by the current chain history. // If the slot is <= backfill low slot, or >= backfill high slot, the result is true. // If the slot is between the backfill low and high slots, the result is false. -func (s *StatusUpdater) AvailableBlock(sl primitives.Slot) bool { +func (s *Store) AvailableBlock(sl primitives.Slot) bool { s.RLock() defer s.RUnlock() // short circuit if the node was synced from genesis @@ -55,7 +58,7 @@ func (s *StatusUpdater) AvailableBlock(sl primitives.Slot) bool { } // Status is a threadsafe method to access a copy of the BackfillStatus value. -func (s *StatusUpdater) status() *dbval.BackfillStatus { +func (s *Store) status() *dbval.BackfillStatus { s.RLock() defer s.RUnlock() return &dbval.BackfillStatus{ @@ -70,31 +73,44 @@ func (s *StatusUpdater) status() *dbval.BackfillStatus { // fillBack saves the slice of blocks and updates the BackfillStatus LowSlot/Root/ParentRoot tracker to the values // from the first block in the slice. This method assumes that the block slice has been fully validated and // sorted in slot order by the calling function. -func (s *StatusUpdater) fillBack(ctx context.Context, blocks []blocks.ROBlock) (*dbval.BackfillStatus, error) { +func (s *Store) fillBack(ctx context.Context, blocks []blocks.ROBlock) (*dbval.BackfillStatus, error) { status := s.status() if len(blocks) == 0 { return status, nil } + highest := blocks[len(blocks)-1] + // The root of the highest block needs to match the parent root of the previous status. The backfill service will do + // the same check, but this is an extra defensive layer in front of the db index. 
+ if highest.Root() != bytesutil.ToBytes32(status.LowParentRoot) { + return nil, errors.Wrapf(errBatchDisconnected, "prev parent_root=%#x, root=%#x, prev slot=%d, slot=%d", + status.LowParentRoot, highest.Root(), status.LowSlot, highest.Block().Slot()) + } + for _, b := range blocks { if err := s.store.SaveBlock(ctx, b); err != nil { return nil, errors.Wrapf(err, "error saving backfill block with root=%#x, slot=%d", b.Root(), b.Block().Slot()) } } + // Update finalized block index. + if err := s.store.BackfillFinalizedIndex(ctx, blocks, bytesutil.ToBytes32(status.LowRoot)); err != nil { + return nil, errors.Wrapf(err, "failed to update finalized index for batch, connecting root %#x to previously finalized block %#x", + highest.Root(), status.LowRoot) + } + // Update backfill status based on the block with the lowest slot in the batch. lowest := blocks[0] - r := lowest.Root() pr := lowest.Block().ParentRoot() status.LowSlot = uint64(lowest.Block().Slot()) - status.LowRoot = r[:] + status.LowRoot = lowest.RootSlice() status.LowParentRoot = pr[:] return status, s.saveStatus(ctx, status) } // recoverLegacy will check to see if the db is from a legacy checkpoint sync, and either build a new BackfillStatus // or label the node as synced from genesis. -func (s *StatusUpdater) recoverLegacy(ctx context.Context) error { +func (s *Store) recoverLegacy(ctx context.Context) error { cpr, err := s.store.OriginCheckpointBlockRoot(ctx) if errors.Is(err, db.ErrNotFoundOriginBlockRoot) { s.genesisSync = true @@ -120,7 +136,7 @@ func (s *StatusUpdater) recoverLegacy(ctx context.Context) error { return s.saveStatus(ctx, bs) } -func (s *StatusUpdater) saveStatus(ctx context.Context, bs *dbval.BackfillStatus) error { +func (s *Store) saveStatus(ctx context.Context, bs *dbval.BackfillStatus) error { if err := s.store.SaveBackfillStatus(ctx, bs); err != nil { return err } @@ -129,7 +145,7 @@ func (s *StatusUpdater) saveStatus(ctx context.Context, bs *dbval.BackfillStatus return nil } -func (s *StatusUpdater) swapStatus(bs *dbval.BackfillStatus) { +func (s *Store) swapStatus(bs *dbval.BackfillStatus) { s.Lock() defer s.Unlock() s.bs = bs @@ -138,7 +154,7 @@ func (s *StatusUpdater) swapStatus(bs *dbval.BackfillStatus) { // originState looks up the state for the checkpoint sync origin. This is a hack, because StatusUpdater is the only // thing that needs db access and it has the origin root handy, so it's convenient to look it up here. The state is // needed by the verifier. 
-func (s *StatusUpdater) originState(ctx context.Context) (state.BeaconState, error) { +func (s *Store) originState(ctx context.Context) (state.BeaconState, error) { return s.store.StateOrError(ctx, bytesutil.ToBytes32(s.status().OriginRoot)) } @@ -146,6 +162,7 @@ func (s *StatusUpdater) originState(ctx context.Context) (state.BeaconState, err type BackfillDB interface { SaveBackfillStatus(context.Context, *dbval.BackfillStatus) error BackfillStatus(context.Context) (*dbval.BackfillStatus, error) + BackfillFinalizedIndex(ctx context.Context, blocks []blocks.ROBlock, finalizedChildRoot [32]byte) error OriginCheckpointBlockRoot(context.Context) ([32]byte, error) Block(context.Context, [32]byte) (interfaces.ReadOnlySignedBeaconBlock, error) SaveBlock(ctx context.Context, signed interfaces.ReadOnlySignedBeaconBlock) error diff --git a/beacon-chain/sync/backfill/status_test.go b/beacon-chain/sync/backfill/status_test.go index 9775ea14272b..66b94a1656ed 100644 --- a/beacon-chain/sync/backfill/status_test.go +++ b/beacon-chain/sync/backfill/status_test.go @@ -95,34 +95,38 @@ func (d *mockBackfillDB) SaveBlock(ctx context.Context, signed interfaces.ReadOn return nil } +func (d *mockBackfillDB) BackfillFinalizedIndex(ctx context.Context, blocks []blocks.ROBlock, finalizedChildRoot [32]byte) error { + return nil +} + func TestSlotCovered(t *testing.T) { cases := []struct { name string slot primitives.Slot - status *StatusUpdater + status *Store result bool }{ { name: "genesis true", - status: &StatusUpdater{bs: &dbval.BackfillStatus{LowSlot: 10}}, + status: &Store{bs: &dbval.BackfillStatus{LowSlot: 10}}, slot: 0, result: true, }, { name: "above end true", - status: &StatusUpdater{bs: &dbval.BackfillStatus{LowSlot: 1}}, + status: &Store{bs: &dbval.BackfillStatus{LowSlot: 1}}, slot: 2, result: true, }, { name: "equal end true", - status: &StatusUpdater{bs: &dbval.BackfillStatus{LowSlot: 1}}, + status: &Store{bs: &dbval.BackfillStatus{LowSlot: 1}}, slot: 1, result: true, }, { name: "genesisSync always true", - status: &StatusUpdater{genesisSync: true}, + status: &Store{genesisSync: true}, slot: 100, result: true, }, @@ -138,7 +142,7 @@ func TestSlotCovered(t *testing.T) { func TestStatusUpdater_FillBack(t *testing.T) { ctx := context.Background() mdb := &mockBackfillDB{} - s := &StatusUpdater{bs: &dbval.BackfillStatus{LowSlot: 100}, store: mdb} + s := &Store{bs: &dbval.BackfillStatus{LowSlot: 100}, store: mdb} b, err := setupTestBlock(90) require.NoError(t, err) rob, err := blocks.NewROBlock(b) @@ -183,7 +187,7 @@ func TestReload(t *testing.T) { name string db BackfillDB err error - expected *StatusUpdater + expected *Store }{ /*{ name: "origin not found, implying genesis sync ", @@ -346,7 +350,7 @@ func TestReload(t *testing.T) { backfillStatus: func(context.Context) (*dbval.BackfillStatus, error) { return nil, db.ErrNotFound }, }, err: derp, - expected: &StatusUpdater{genesisSync: false, bs: &dbval.BackfillStatus{LowSlot: uint64(originSlot)}}, + expected: &Store{genesisSync: false, bs: &dbval.BackfillStatus{LowSlot: uint64(originSlot)}}, }, } diff --git a/beacon-chain/sync/backfill/worker.go b/beacon-chain/sync/backfill/worker.go index 1ea23181db93..bc6e0b31a11b 100644 --- a/beacon-chain/sync/backfill/worker.go +++ b/beacon-chain/sync/backfill/worker.go @@ -43,10 +43,19 @@ func (w *p2pWorker) handle(ctx context.Context, b batch) batch { return b.withRetryableError(err) } vb, err := w.v.verify(results) - backfillBatchTimeVerifying.Observe(float64(time.Now().Sub(dlt).Milliseconds())) + 
backfillBatchTimeVerifying.Observe(float64(time.Since(dlt).Milliseconds())) if err != nil { return b.withRetryableError(err) } + // This is a hack to get the rough size of the batch. This helps us approximate the amount of memory needed + // to hold batches and relative sizes between batches, but will be inaccurate when it comes to measuring actual + // bytes downloaded from peers, mainly because the p2p messages are snappy compressed. + bdl := 0 + for i := range vb { + bdl += vb[i].SizeSSZ() + } + backfillBatchApproximateBytes.Add(float64(bdl)) + log.WithField("dlbytes", bdl).Debug("backfill batch bytes downloaded") b.results = vb return b.withState(batchImportable) } diff --git a/cmd/beacon-chain/BUILD.bazel b/cmd/beacon-chain/BUILD.bazel index 63fdc2c675ba..11493aaa6666 100644 --- a/cmd/beacon-chain/BUILD.bazel +++ b/cmd/beacon-chain/BUILD.bazel @@ -24,6 +24,7 @@ go_library( "//cmd/beacon-chain/execution:go_default_library", "//cmd/beacon-chain/flags:go_default_library", "//cmd/beacon-chain/jwt:go_default_library", + "//cmd/beacon-chain/sync/backfill:go_default_library", "//cmd/beacon-chain/sync/checkpoint:go_default_library", "//cmd/beacon-chain/sync/genesis:go_default_library", "//config/features:go_default_library", diff --git a/cmd/beacon-chain/main.go b/cmd/beacon-chain/main.go index df7d8db20a9a..09730e0f38e4 100644 --- a/cmd/beacon-chain/main.go +++ b/cmd/beacon-chain/main.go @@ -18,6 +18,7 @@ import ( "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/execution" "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags" jwtcommands "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/jwt" + "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/backfill" "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/checkpoint" "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/genesis" "github.com/prysmaticlabs/prysm/v4/config/features" @@ -278,6 +279,7 @@ func startNode(ctx *cli.Context) error { optFuncs := []func(*cli.Context) (node.Option, error){ genesis.BeaconNodeOptions, checkpoint.BeaconNodeOptions, + backfill.BeaconNodeOptions, } for _, of := range optFuncs { ofo, err := of(ctx) diff --git a/cmd/beacon-chain/sync/backfill/BUILD.bazel b/cmd/beacon-chain/sync/backfill/BUILD.bazel new file mode 100644 index 000000000000..8ca14bbf81f9 --- /dev/null +++ b/cmd/beacon-chain/sync/backfill/BUILD.bazel @@ -0,0 +1,13 @@ +load("@prysm//tools/go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["options.go"], + importpath = "github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/sync/backfill", + visibility = ["//visibility:public"], + deps = [ + "//beacon-chain/node:go_default_library", + "//beacon-chain/sync/backfill:go_default_library", + "@com_github_urfave_cli_v2//:go_default_library", + ], +) diff --git a/cmd/beacon-chain/sync/backfill/options.go b/cmd/beacon-chain/sync/backfill/options.go new file mode 100644 index 000000000000..4e6b28054aa7 --- /dev/null +++ b/cmd/beacon-chain/sync/backfill/options.go @@ -0,0 +1,52 @@ +package backfill + +import ( + "github.com/prysmaticlabs/prysm/v4/beacon-chain/node" + "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/backfill" + "github.com/urfave/cli/v2" +) + +var ( + backfillBatchSizeName = "backfill-batch-size" + backfillWorkerCountName = "backfill-worker-count" + // EnableExperimentalBackfill enables backfill for checkpoint synced nodes. + // This flag will be removed once backfill is enabled by default.
+ EnableExperimentalBackfill = &cli.BoolFlag{ + Name: "enable-experimental-backfill", + Usage: "Backfill is still experimental at this time. " + + "It will only be enabled if this flag is specified and the node was started using checkpoint sync.", + } + // BackfillBatchSize allows users to tune block backfill request sizes to maximize network utilization + // at the cost of higher memory. + BackfillBatchSize = &cli.Uint64Flag{ + Name: backfillBatchSizeName, + Usage: "Number of blocks per backfill batch. " + + "A larger number will request more blocks at once from peers, but also consume more system memory to " + + "hold batches in memory during processing. This has a multiplicative effect with " + backfillWorkerCountName, + Value: 64, + } + // BackfillWorkerCount allows users to tune the number of concurrent backfill batches to download, to maximize + // network utilization at the cost of higher memory. + BackfillWorkerCount = &cli.IntFlag{ + Name: backfillWorkerCountName, + Usage: "Number of concurrent backfill batch requests. " + + "A larger number will better utilize network resources, up to a system-dependent limit, but will also " + + "consume more system memory to hold batches in memory during processing. Multiply by backfill-batch-size and " + + "average block size (~2MB before deneb) to find the right number for your system. " + + "This has a multiplicative effect with " + backfillBatchSizeName, + Value: 5, + } +) + +// BeaconNodeOptions sets the appropriate functional opts on the *node.BeaconNode value, to decouple options +// from flag parsing. +func BeaconNodeOptions(c *cli.Context) (node.Option, error) { + return func(node *node.BeaconNode) (err error) { + node.BackfillOpts = []backfill.ServiceOption{ + backfill.WithBatchSize(c.Uint64(BackfillBatchSize.Name)), + backfill.WithWorkerCount(c.Int(BackfillWorkerCount.Name)), + backfill.WithEnableBackfill(c.Bool(EnableExperimentalBackfill.Name)), + } + return nil + }, nil +} diff --git a/consensus-types/blocks/roblock.go b/consensus-types/blocks/roblock.go index 11f5292761e4..4ea6c7c45e24 100644 --- a/consensus-types/blocks/roblock.go +++ b/consensus-types/blocks/roblock.go @@ -21,6 +21,12 @@ func (b ROBlock) Root() [32]byte { return b.root } +// RootSlice returns a slice of the value returned by Root(). This is convenient because slicing the result of a func +// is not allowed, so only offering a fixed-length array version results in boilerplate code to convert the array to a slice. +func (b ROBlock) RootSlice() []byte { + return append(make([]byte, 0, 32), b.root[:]...) +} + // NewROBlockWithRoot creates an ROBlock embedding the given block with its root. It accepts the root as parameter rather than // computing it internally, because in some cases a block is retrieved by its root and recomputing it is a waste. func NewROBlockWithRoot(b interfaces.ReadOnlySignedBeaconBlock, root [32]byte) (ROBlock, error) { @@ -43,6 +49,20 @@ func NewROBlock(b interfaces.ReadOnlySignedBeaconBlock) (ROBlock, error) { return ROBlock{ReadOnlySignedBeaconBlock: b, root: root}, nil } +// NewROBlockSlice is a helper method for converting a slice of the ReadOnlySignedBeaconBlock interface +// to a slice of ROBlock. +func NewROBlockSlice(blks []interfaces.ReadOnlySignedBeaconBlock) ([]ROBlock, error) { + robs := make([]ROBlock, len(blks)) + var err error + for i := range blks { + robs[i], err = NewROBlock(blks[i]) + if err != nil { + return nil, err + } + } + return robs, nil +} + // ROBlockSlice implements sort.Interface so that slices of ROBlocks can be easily sorted.
// A slice of ROBlock is sorted first by slot, with ties broken by cached block roots. type ROBlockSlice []ROBlock
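Closing note on the new CLI flags: the usage text for --backfill-batch-size and --backfill-worker-count describes a multiplicative memory effect, and a rough worked example makes the trade-off concrete. The sketch below is only back-of-the-envelope arithmetic using the flag defaults and the ~2MB pre-Deneb average block size quoted in the flag help, not measured values; actual usage depends on real block sizes, and the bytes reported by the new backfill_batch_bytes_downloaded metric are approximate for the reasons noted in worker.go.

// sizing_sketch.go — illustrative only; constants are the defaults from options.go above.
package main

import "fmt"

func main() {
	const (
		batchSize   = 64  // --backfill-batch-size default
		workerCount = 5   // --backfill-worker-count default
		avgBlockMiB = 2.0 // approximate pre-Deneb block size from the usage string
	)
	// Worst case, every worker holds a full batch of blocks in memory at once.
	fmt.Printf("approximate peak block memory: %.0f MiB\n", batchSize*workerCount*avgBlockMiB) // ~640 MiB
}

Per the flag usage and Service.Start above, none of this applies unless the node was started via checkpoint sync and --enable-experimental-backfill is set; otherwise the backfill service logs that it is not enabled and exits.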