Block backfilling #12968

Merged: 14 commits, Jan 23, 2024
Conversation

@kasey (Contributor) commented Sep 28, 2023

What type of PR is this?

Feature

What does this PR do? Why is it needed?

This PR implements backfilling for blocks. When a node starts from checkpoint sync, a database record is created to track the slot and root of the origin block. If MIN_EPOCHS_FOR_BLOCK_REQUESTS reaches back to an earlier epoch than the lowest slot recorded in this db entry, the backfill service will initialize and start backfilling blocks until we have coverage over the MIN_EPOCHS_FOR_BLOCK_REQUESTS span.
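To make the trigger condition concrete, here is a minimal sketch of the check described above; the helper names and plain integer slot types are illustrative, not the code in this PR:

const slotsPerEpoch = 32 // mainnet SLOTS_PER_EPOCH

// minimumBackfillSlot returns the oldest slot the node must retain, i.e. the
// current slot minus MIN_EPOCHS_FOR_BLOCK_REQUESTS worth of slots.
func minimumBackfillSlot(currentSlot, minEpochsForBlockRequests uint64) uint64 {
	span := minEpochsForBlockRequests * slotsPerEpoch
	if currentSlot < span {
		return 0
	}
	return currentSlot - span
}

// backfillNeeded is true when the lowest slot recorded for the checkpoint sync
// origin is still newer than the slot MIN_EPOCHS_FOR_BLOCK_REQUESTS demands.
func backfillNeeded(originLowSlot, currentSlot, minEpochsForBlockRequests uint64) bool {
	return originLowSlot > minimumBackfillSlot(currentSlot, minEpochsForBlockRequests)
}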

Note that this PR does not include deneb blob syncing, which will be implemented in a follow-up PR.

Other notes for review

This is a draft PR. I will update this section with notes on issues that need to be addressed before this can be merged.

Some commentary on the design to help with review:

  • backfill.Store exposes db methods needed to manage the backfill. A new db method has been added to update the finalized index based on imported blocks. I added the proto/dbval package for values like BackfillStatus that represent some structured data we want to store in the db. I'm ambivalent about adding another sort of protobuf dependency, maybe json would be better?
  • verifier exploits the fact that all blocks to be backfilled were signed by validators that are present in the checkpoint sync origin state. At startup the public keys for the validator registry are copied from the origin state.
  • batch is the unit of work for backfill. It encapsulates everything a worker needs to process it, and state information etc that other components can use to act on the results (retry, import blocks etc).
  • peers.Assigner is an attempt to extract the initial-sync peer selection logic so it can be used by other components. It is used by the backfill pool to pick the next best peer when routing a batch to be worked on.
  • pool manages peer selection for batches and plumbing batches between the Service and the workers. It handles issues like persistently trying to find peers when one can't immediately be found, ensuring that the same peer doesn't receive multiple requests concurrently, and handling the special case where all batches have completed, shutting down the worker pool and telling the Service that everything is done.
  • worker is the simplest component. It represents a worker loop that runs in a separate goroutine, reading from a shared fan-out channel until it receives a batch, then does the work described by that batch.
  • backfill.Service: reading the Start method of Service is probably the best way to get an overview of how the pieces come together. The run loop there orchestrates the overall backfill process. It uses batcherSequencer.sequence to populate the worker pool's todo queue, receives completed batches from the pool, updates the batcher to notify it of state changes, requests importable batches from the batcher, imports them, calls update with the results, rinse and repeat (a rough sketch of this loop follows the list).
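To make the orchestration above concrete, a rough sketch of such a run loop follows; every type and method name here is an illustrative stand-in for the real components, not the PR's code:

import "context"

// Illustrative stand-ins for the real backfill components.
type batch struct{ state int }

const batchImportComplete = 2

type sequencer interface {
	sequence() []batch   // next batches that are ready to hand to workers
	update(batch)        // record a state change for a batch
	importable() []batch // verified batches now connected to the canonical chain
	numTodo() int        // batches not yet imported
}

type workerPool interface {
	todo(batch)               // enqueue a batch for a worker
	complete() (batch, error) // block until a worker finishes a batch
}

type importer interface {
	importBatch(context.Context, batch) error
}

// runLoop mirrors the orchestration described above: sequence -> workers ->
// importable -> import -> update, until nothing is left to do.
func runLoop(ctx context.Context, seq sequencer, pool workerPool, imp importer) error {
	for seq.numTodo() > 0 {
		for _, b := range seq.sequence() {
			pool.todo(b)
		}
		done, err := pool.complete()
		if err != nil {
			return err
		}
		seq.update(done)
		for _, ib := range seq.importable() {
			if err := imp.importBatch(ctx, ib); err != nil {
				return err
			}
			ib.state = batchImportComplete
			seq.update(ib)
		}
	}
	return nil
}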

@kasey kasey force-pushed the backfill-blocks branch 2 times, most recently from fafbc6b to 94ff07f Compare September 28, 2023 18:05
if err != nil {
	if errors.Is(err, db.ErrNotFound) {
		return s, s.recoverLegacy(ctx)
	}
Member:

todo: handle errors other than db.ErrNotFound

s.batchSeq = newBatchSequencer(s.nWorkers, s.ms.minimumSlot(), primitives.Slot(status.LowSlot), primitives.Slot(s.batchSize))
// Exit early if there aren't going to be any batches to backfill.
if primitives.Slot(status.LowSlot) < s.ms.minimumSlot() {
return
Member:

It might be nice to log something here to indicate nothing needs to be done. Pretty relevant condition imo

}

func (s *Service) initVerifier(ctx context.Context) (*verifier, error) {
cps, err := s.su.originState(ctx)
Member:

Could there be a scenario where the to-be-validated blocks are newer than the origin state?

Contributor Author:

No, I don't think so. The origin state is the state used to initiate the checkpoint sync, so it is by definition the state we are backfilling from; it must be finalized, and it is a descendant of all the blocks that we backfill.

A future case to consider, which we don't currently support, is triggering a new checkpoint sync on top of an existing db - see #13003

Once we do enable that feature, we should always write the updated origin to the db and use that state for the validator registry. Note that we may need to track additional data in the status db record to support this, because currently we only track the low/high end of one range of blocks to backfill, and stategen uses the low end in this db record to figure out if blocks are available. If we allow multiple checkpoint syncs against the same db, we need to track the target of the backfill separately from the lowest available slot, and we need to account for "holes" in the range.

beacon-chain/p2p/assigner.go (comment outdated, resolved)
@@ -145,6 +145,22 @@ func (b *BeaconState) PubkeyAtIndex(idx primitives.ValidatorIndex) [fieldparams.
return bytesutil.ToBytes48(b.validators[idx].PublicKey)
}

// PublicKeys builds a list of all validator public keys, with each key's index aligned to its validator index.
func (b *BeaconState) PublicKeys() [][fieldparams.BLSPubkeyLength]byte {
Contributor:

Don't we have this cached? This is a large slice; it's probably better to grab the cache first, update it here on calls to this function with the new pubkeys from the state, and return the cache.

Member:

For the backfill service, this is only called once at start-up; validator public keys are cached in verify.go's verifier keys. With that said, I think your suggestion is still valid.

Contributor Author:

We only run this once at startup, for the checkpoint state we're backfilling from. I don't know if it will be in cache at that point.
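For reference, a sketch of the straightforward, uncached approach being discussed, using an illustrative validator type rather than the real proto type; a cache would simply sit in front of this:

// validator is an illustrative stand-in for the registry's validator record.
type validator struct{ PublicKey []byte }

// publicKeys copies every key out of the registry, index-aligned, which is
// all a single startup call needs.
func publicKeys(validators []validator) [][48]byte {
	keys := make([][48]byte, len(validators))
	for i, v := range validators {
		copy(keys[i][:], v.PublicKey) // keys[i] corresponds to validator index i
	}
	return keys
}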

b.scheduled = time.Now()
switch b.state {
case batchErrRetryable:
	b.retries += 1
Contributor:

Do we want to log here?

seq []batch
}

var errCannotDecreaseMinimum = errors.New("The minimum backfill slot can only be increased, not decreased")
Contributor:

This probably belongs up with the other errors.

if b.end != r.end {
	return false
}
return b.seq >= r.seq
Contributor:

Should we worry about misuse where we replace a batch that has batchImportComplete with something that doesn't?

Specifically, sending this on update below could cause trouble.

Contributor Author:

I can add a check that always returns false if r has state batchImportComplete.
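A sketch of what that guard could look like, using an illustrative batch type and field names rather than the PR's actual definitions:

type batchState int

const batchImportComplete batchState = 4 // illustrative value

type batch struct {
	begin, end uint64 // slot range covered by the batch
	seq        int    // monotonically increasing generation counter
	state      batchState
}

// replaces reports whether b should overwrite r; per the discussion above it
// refuses when r has already been imported.
func (b batch) replaces(r batch) bool {
	if r.state == batchImportComplete {
		return false // never clobber an imported batch
	}
	if b.begin != r.begin || b.end != r.end {
		return false
	}
	return b.seq >= r.seq
}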


func (b batch) withRetryableError(err error) batch {
	b.err = err
	return b.withState(batchErrRetryable)
Member:

Side note: we might want to handle the error differently down the line. For example, a validation error is different than a p2p error. But I guess it's all the same now; if there's an error we just retry it.

Comment on lines 90 to 92
if err := s.store.SaveBlock(ctx, b); err != nil {
return nil, errors.Wrapf(err, "error saving backfill block with root=%#x, slot=%d", b.Root(), b.Block().Slot())
Member:

Nit. I found that StatusUpdater not only updates statuses but also handles DB block saving and finalized root updates. This seems to diverge from the single-responsibility principle implied by the name. Maybe renaming a few things would fix this?

Contributor Author:

I renamed the type to Store to reflect the fact that it exposes a broader swath of the db interface. What do you think?

@CLAassistant commented Oct 4, 2023

CLA assistant check: all committers have signed the CLA.

if len(peers) < required {
	log.WithFields(logrus.Fields{
		"suitable": len(peers),
		"required": required}).Info("Unable to assign peer while suitable peers < required ")
Member:

This to me smells more like a WARN than INFO

// the number of outbound requests to each peer from a given component.
func (a *Assigner) Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error) {
	best, err := a.freshPeers()
	ps := make([]peer.ID, 0, n)
Member:

shouldn't this line be after the if err != nil {?

// the number of outbound requests to each peer from a given component.
func (a *Assigner) Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error) {
	best, err := a.freshPeers()
	ps := make([]peer.ID, 0, n)
Member:

do we want to handle the case if n=0 for the input?

Contributor Author:

hmm should that be an empty list or an error? Maybe an error since that would be a strange bug on the caller side.
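A sketch of the guard being discussed, assuming the Assigner/freshPeers shape from the quoted snippet; the error name and the busy-filtering loop are illustrative, and it also moves the slice allocation after the error check as suggested above:

// errInvalidPeerCount is an illustrative error for a non-positive request.
var errInvalidPeerCount = errors.New("requested number of peers must be positive")

func (a *Assigner) Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error) {
	if n <= 0 {
		return nil, errInvalidPeerCount // caller bug, fail loudly
	}
	best, err := a.freshPeers()
	if err != nil {
		return nil, err
	}
	ps := make([]peer.ID, 0, n)
	for _, pid := range best {
		if busy[pid] {
			continue // skip peers already serving a batch
		}
		ps = append(ps, pid)
		if len(ps) == n {
			break
		}
	}
	return ps, nil
}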

@kasey kasey marked this pull request as ready for review October 10, 2023 18:00
@kasey kasey requested a review from a team as a code owner October 10, 2023 18:00
@kasey kasey mentioned this pull request Oct 10, 2023
// Assumes invariant that batches complete and update is called in order.
// This should be true because the code using the sequencer doesn't know the expected parent
// for a batch until it imports the previous batch.
if c.seq[i].state == batchImportComplete {
Member:

What happens if b.state already has batchImportComplete? Should we even allow that?

Contributor Author:

Did you mean should we allow c.seq[i] = b if c.seq[i].state == batchImportComplete? In that instance replaces will return false, so we won't overwrite it.

			continue
		}
		// Move the unfinished batches to overwrite the finished ones.
		c.seq[i-done] = c.seq[i]
Member:

Something smells here. If i=0 and is batchImportComplete, wouldn't we get c.seq[0-1] and underflow?

Contributor Author:

There's a continue in the batchImportComplete conditional where done is incremented, so i can't ever be less than done.
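For clarity, a sketch of the compaction loop under discussion, reusing an illustrative batch type; it shows why the write index i-done can never underflow, since done only grows inside the branch that continues:

// importComplete compacts the sequence in place, dropping batches that have
// already been imported.
func importComplete(seq []batch) []batch {
	done := 0
	for i := 0; i < len(seq); i++ {
		if seq[i].state == batchImportComplete {
			done++
			continue
		}
		// Move the unfinished batches to overwrite the finished ones.
		seq[i-done] = seq[i]
	}
	return seq[:len(seq)-done]
}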

@@ -141,7 +141,7 @@ func ValidateBLSToExecutionChange(st state.ReadOnlyBeaconState, signed *ethpb.Si
// next_validator_index = ValidatorIndex((expected_withdrawals[-1].validator_index + 1) % len(state.validators))
// state.next_withdrawal_validator_index = next_validator_index
// else:
// # Advance sweep by the max length of the sweep if there was not a full set of withdrawals
// # FillFwd sweep by the max length of the sweep if there was not a full set of withdrawals
Contributor:

bad refactor

@@ -202,3 +202,14 @@ func ParseWeakSubjectivityInputString(wsCheckpointString string) (*v1alpha1.Chec
Root: bRoot,
}, nil
}

// MinEpochsForBlockRequests computes the number of epochs of block history that we need to maintain,
// relative to the current epoch, per the p2p specs. This is used to compute the slot where backfill is complete.
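For context on the quoted comment, the p2p spec defines MIN_EPOCHS_FOR_BLOCK_REQUESTS as MIN_VALIDATOR_WITHDRAWABILITY_DELAY + CHURN_LIMIT_QUOTIENT / 2; a sketch of that formula follows (the parameter plumbing is illustrative, not the PR's signature):

// minEpochsForBlockRequests mirrors the spec formula; with mainnet values
// (256 + 65536/2) it comes out to 33024 epochs, roughly five months.
func minEpochsForBlockRequests(minValidatorWithdrawabilityDelay, churnLimitQuotient uint64) uint64 {
	return minValidatorWithdrawabilityDelay + churnLimitQuotient/2
}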
Contributor:

backfill is complete

Does this mean I cannot backfill any further? I was imagining one could backfill all the way back to genesis, thus having the best of both worlds: an archival node that follows the head as soon as possible.

Contributor Author (@kasey, Oct 11, 2023):

As mentioned in the PR description, we'll do a follow-up feature to add a flag like --backfill-to-epoch that will allow the user to specify an earlier backfill target: --backfill-to-epoch=0 to go all the way to genesis. Actually, this reminds me that I need to file a separate issue for this; currently we only have #13003, which is related but doesn't cover it. Just added this one #13031

Member:

While this is the minimum required by the spec, shouldn't we have this value by default?

backfill-to-epoch=0

The reasoning is that, by default, a node would then always maintain all of the beacon history. If a user does not care about it, they can just give a recent epoch.

Contributor Author (@kasey, Nov 2, 2023):

My stance is that the main purpose of beacon nodes is to participate in the consensus protocol. Following from the principles of finalization and weak subjectivity, most nodes are really only interested in blocks forward from the beginning of the weak subjectivity period. Older history is not useful for participating in consensus; it is only interesting for archival purposes. So our default behavior should not be to download older history that is not useful to most of the network.

I am open to having my mind changed on this issue :)

Member:

Beyond participating in consensus, having the ability to download and serve historical blocks should be an expected duty of a normal node. There is no alternate protocol to serve historical blocks from the consensus layer network. If all nodes simply stop saving historical blocks, you would end up with very few peers who have full chain histories. It is the same reason execution layer nodes also save all historical blocks even when snap syncing.

Contributor:

I agree with @kasey here and would leave this default. Blocks are kept in the EL and that allows state recovery. We do not need the CL blocks more than for accounting purposes on archive nodes, and it's their business to have these data, instead of the default user providing them.

Member:

Strongly disagree; it's the same reason execution clients have not relinquished support for serving/persisting historical blocks. Even though those blocks are purely used to access historical state, which is of no use now, users are still able to do so. We can continue the conversation offline.

// BackfillStatus retrieves the most recently saved version of the BackfillStatus protobuf struct.
// This is used to persist information about backfill status across restarts.
func (s *Store) BackfillStatus(ctx context.Context) (*dbval.BackfillStatus, error) {
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveBackfillStatus")
Contributor:

Suggested change
ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveBackfillStatus")
ctx, span := trace.StartSpan(ctx, "BeaconDB.BackfillStatus")

Comment on lines 26 to 28
var errIncorrectBlockParent = errors.New("Unexpected missing or forked blocks in a []ROBlock")
var errFinalizedChildNotFound = errors.New("Unable to find finalized root descending from backfill batch")
var errNotConnectedToFinalized = errors.New("Unable to finalize backfill blocks, finalized parent_root does not match")
Contributor:

error should be lowercase

}
}

return s.db.Update(func(tx *bolt.Tx) error {
Contributor (@rkapka, Oct 11, 2023):

We can split this code into two parts and perform everything up to and including if !bytes.Equal(fcc.ParentRoot, blocks[len(blocks)-1].RootSlice()) at the beginning of the function. This will allow an early exit without having to encode input blocks.

Contributor Author:

The way things are currently written, we are fairly confident the tail root will be present, because the backfill service code checks that batches are all linked to the finalized origin. This is a defensive check to make sure that we don't break the correctness of the finalized index.

I'm not sure if you meant splitting this by using an additional db tx to check the tail, or if you were thinking that we would encode the checkpoints inside the func? In general my thinking is that the database is a more contended resource than ssz encoding and compression: these are very small values, and encoding them basically amounts to copying two 32-byte slices. So IMO it's better to minimize the number of db transactions (hence check and set in one), and to minimize the amount of time spent inside the db tx (hence doing any work possible outside the db tx).

}

if err = s.SaveBackfillStatus(ctx, bf); err != nil {
return errors.Wrap(err, "unable to save backfill status data to db for checkpoint sync.")
Contributor:

Suggested change
return errors.Wrap(err, "unable to save backfill status data to db for checkpoint sync.")
return errors.Wrap(err, "unable to save backfill status data to db for checkpoint sync")


option go_package = "github.com/prysmaticlabs/prysm/v4/proto/dbval;dbval";

message BackfillStatus {
Contributor:

Can you add comments explaining the meaning of these fields? I would expect both origin and low to hold low-end values, but this can't be true.


// NewAssigner assists in the correct construction of an Assigner by code in other packages,
// assuring all the important private member fields are given values.
// The finalized parameter is used to specify a minimum finalized epoch that the peers must agree on.
Contributor:

Suggested change
// The finalized parameter is used to specify a minimum finalized epoch that the peers must agree on.
// The finalized checkpointer parameter is used to specify a minimum finalized epoch that the peers must agree on.

log "github.com/sirupsen/logrus"
)

// ErrChainBroken is usedbatch with no results, skipping importer when a backfill batch can't be imported to the db because it is not known to be the ancestor
Contributor:

Suggested change
// ErrChainBroken is usedbatch with no results, skipping importer when a backfill batch can't be imported to the db because it is not known to be the ancestor
// ErrChainBroken is used for batch with no results, skipping importer when a backfill batch can't be imported to the db because it is not known to be the ancestor


var errMaxBatches = errors.New("backfill batch requested in excess of max outstanding batches")
var errEndSequence = errors.New("sequence has terminated, no more backfill batches will be produced")
var errCannotDecreaseMinimum = errors.New("The minimum backfill slot can only be increased, not decreased")
Contributor:

Suggested change
var errCannotDecreaseMinimum = errors.New("The minimum backfill slot can only be increased, not decreased")
var errCannotDecreaseMinimum = errors.New("the minimum backfill slot can only be increased, not decreased")

batchesWaiting = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "backfill_importable_batches_waiting",
Help: "Number of batches that are ready to be imported once the they can be connected to the existing chain.",
Contributor:

Extra the

Suggested change
Help: "Number of batches that are ready to be imported once the they can be connected to the existing chain.",
Help: "Number of batches that are ready to be imported once they can be connected to the existing chain.",

// low_slot is the slot of the last block that backfill will attempt to download and import.
// This is determined by MIN_EPOCHS_FOR_BLOCK_REQUESTS, or by a user-specified override.
uint64 low_slot = 1;
// low_slot is the root of the last block that backfill will attempt to download and import.
Contributor:

Suggested change
// low_slot is the root of the last block that backfill will attempt to download and import.
// low_root is the root of the last block that backfill will attempt to download and import.

// BackfillFinalizedIndex updates the finalized index for a contiguous chain of blocks that are the ancestors of the
// given finalized child root. This is needed to update the finalized index during backfill, because the usual
// updateFinalizedBlockRoots has assumptions that are incompatible with backfill processing.
func (s *Store) BackfillFinalizedIndex(ctx context.Context, blocks []blocks.ROBlock, finalizedChildRoot [32]byte) error {
Contributor:

What is the meaning of the word Index here?

Contributor Author:

Index is being used as a noun here. The "finalized index" is comprised of values in the db bucket finalizedBlockRootsIndexBucket. Each value is keyed by the root of a finalized block and contains the roots of the parent and child of the given root. The finalized index is prysm's view on which block roots are canonical <= the most recently finalized root (which is the only one stored in fork choice).

Contributor Author:

Usually we update this index from the blockchain package when we determine a new finalized checkpoint. But in the case of backfill, we are adding blocks that bypass all those mechanisms, so we want to update the index to be aware of the earlier blocks.
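A sketch of what linking backfilled blocks into such an index could look like; the entry layout and names are illustrative, not the stored format:

// indexEntry is an illustrative view of one finalized-index value: the parent
// and child roots of the block whose root is the key.
type indexEntry struct {
	ParentRoot [32]byte
	ChildRoot  [32]byte
}

type roBlock struct {
	Root       [32]byte
	ParentRoot [32]byte
}

// buildFinalizedIndex produces entries for an ascending, contiguous chain of
// backfilled blocks whose last element is the parent of finalizedChildRoot,
// the already-indexed block immediately above the batch.
func buildFinalizedIndex(chain []roBlock, finalizedChildRoot [32]byte) map[[32]byte]indexEntry {
	entries := make(map[[32]byte]indexEntry, len(chain))
	for i, b := range chain {
		child := finalizedChildRoot
		if i+1 < len(chain) {
			child = chain[i+1].Root
		}
		entries[b.Root] = indexEntry{ParentRoot: b.ParentRoot, ChildRoot: child}
	}
	return entries
}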

type VerifiedROBlocks []blocks.ROBlock

type verifier struct {
// chkptVals is the set of validators from the state used to initialize the node via checkpoint sync.
Contributor:

There is no chkptVals field

forkDomains map[[4]byte][]byte
}

func (bs verifier) verify(blks []interfaces.ReadOnlySignedBeaconBlock) (VerifiedROBlocks, error) {
Contributor:

Receiver name should probably be v

Contributor Author:

good catch! This had some other bad name before where bs made sense. Fixed the bad name, kept the bad receiver.

"github.com/prysmaticlabs/prysm/v4/time/slots"
)

var errInvalidBatchChain = errors.New("parent_root of block does not match root of previous")
Contributor:

Suggested change
var errInvalidBatchChain = errors.New("parent_root of block does not match root of previous")
var errInvalidBatchChain = errors.New("parent_root of block does not match root of previous block")

if i > 0 && result[i-1].Root() != result[i].Block().ParentRoot() {
	p, b := result[i-1], result[i]
	return nil, errors.Wrapf(errInvalidBatchChain,
		"slot %d parent_root=%#x, slot %d root = %#x",
Contributor:

Suggested change
"slot %d parent_root=%#x, slot %d root = %#x",
"slot %d parent_root=%#x, slot %d root=%#x",

// chkptVals is the set of validators from the state used to initialize the node via checkpoint sync.
keys [][fieldparams.BLSPubkeyLength]byte
maxVal primitives.ValidatorIndex
vr []byte
Contributor:

Can you make this variable name a bit more clear? I suggest valRoot

maxVal primitives.ValidatorIndex
vr []byte
fsched forks.OrderedSchedule
dt [bls.DomainByteLength]byte
Contributor:

It was hard for me to figure out what dt means. Can you rename it to domain?

Contributor Author:

Short for DomainType. domain would be a confusing name, because the domain is derived jointly from the DomainType, Fork Version, and Genesis Validator Root. Maybe dtype?

Contributor Author:

In general I think that the meaning of a value can be made clear by context. This value is initialized like dt: params.BeaconConfig().DomainBeaconProposer (where the value assigned is a DomainType) and passed to ComputeDomain (where it's known to be the DomainType).
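For readers unfamiliar with the derivation, a sketch of how a signing domain is computed from the three pieces named above, following the spec's compute_domain; prysm has its own helpers for this, so this is illustrative only:

import "crypto/sha256"

// computeDomain derives the 32-byte signing domain: 4 bytes of DomainType
// (e.g. DOMAIN_BEACON_PROPOSER) followed by the first 28 bytes of the fork
// data root, i.e. hash_tree_root(ForkData{fork_version, genesis_validators_root}).
func computeDomain(domainType, forkVersion [4]byte, genesisValidatorsRoot [32]byte) [32]byte {
	// For the two-field ForkData container, hash_tree_root reduces to
	// sha256(pad32(fork_version) || genesis_validators_root).
	var versionLeaf [32]byte
	copy(versionLeaf[:4], forkVersion[:])
	forkDataRoot := sha256.Sum256(append(versionLeaf[:], genesisValidatorsRoot[:]...))

	var domain [32]byte
	copy(domain[:4], domainType[:])
	copy(domain[4:], forkDataRoot[:28])
	return domain
}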

// VerifiedROBlocks represents a slice of blocks that have passed signature verification.
type VerifiedROBlocks []blocks.ROBlock

type verifier struct {
Contributor:

The verifier code has 0 tests

Contributor (@rkapka) left a comment:

I'm ambivalent about adding another sort of protobuf dependency, maybe json would be better?

My opinion is that we should avoid protos unless we need SSZ code for the new type.

// - They are in state batchImportable, which means their data has been downloaded and proposer signatures have been verified.
// - There are no batches that are not in state batchImportable between them and the start of the slice. This ensures that they
// can be connected to the canonical chain, either because the root of the last block in the batch matches the parent_root of
// the oldest block in the canonical chain, or because the root of the last block in the batch mathces the parent_root of the
Contributor:

Suggested change
// the oldest block in the canonical chain, or because the root of the last block in the batch mathces the parent_root of the
// the oldest block in the canonical chain, or because the root of the last block in the batch matches the parent_root of the

func (p *p2pBatchWorkerPool) todo(b batch) {
// Intercept batchEndSequence batches so workers can remain unaware of this state.
// Workers don't know what to do with batchEndSequence batches. They are a signal to the pool that the batcher
// has stopped producing things for the workers to do and the pool is close to winding down. See Complete()
Contributor:

Suggested change
// has stopped producing things for the workers to do and the pool is close to winding down. See Complete()
// has stopped producing things for the workers to do and the pool is close to winding down. See complete()
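A sketch of the interception described in the quoted comment; the pool fields used here (endSeq, toWorkers) are illustrative, not the PR's actual struct:

func (p *p2pBatchWorkerPool) todo(b batch) {
	if b.state == batchEndSequence {
		// End-of-sequence markers never reach workers; they only tell the pool
		// that the batcher is done producing work, so complete() can wind down.
		p.endSeq = append(p.endSeq, b)
		return
	}
	p.toWorkers <- b // fan-out channel read by the worker goroutines
}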

case err := <-p.shutdownErr:
return batch{}, errors.Wrap(err, "fatal error from backfill worker pool")
case <-p.ctx.Done():
return batch{}, p.ctx.Err()
Contributor:

Adding the same log as in batchRouter might be useful

// Start begins the runloop of backfill.Service in the current goroutine.
func (s *Service) Start() {
if !s.enabled {
log.Info("exiting backfill service; not enabled")
Contributor:

Suggested change
log.Info("exiting backfill service; not enabled")
log.Info("Exiting backfill service; not enabled")

}()
clock, err := s.cw.WaitForClock(ctx)
if err != nil {
log.WithError(err).Fatal("backfill service failed to start while waiting for genesis data")
Contributor:

Suggested change
log.WithError(err).Fatal("backfill service failed to start while waiting for genesis data")
log.WithError(err).Fatal("Backfill service failed to start while waiting for genesis data")


type Service struct {
ctx context.Context
su *Store
Contributor:

Why is this called su? Not sure where the u comes from. Can you change it to store?

Contributor Author:

It was called StatusUpdater before and then renamed. I will fix!

status := s.su.status()
s.batchSeq = newBatchSequencer(s.nWorkers, s.ms.minimumSlot(), primitives.Slot(status.LowSlot), primitives.Slot(s.batchSize))
// Exit early if there aren't going to be any batches to backfill.
if primitives.Slot(status.LowSlot) < s.ms.minimumSlot() {
Contributor:

  • Can this conditional be moved up to avoid initializing things that are not needed anyway, e.g. s.batchSeq?
  • Shouldn't this be a <=?

Comment on lines 149 to 150
log.WithError(err).Fatal("Non-recoverable error in backfill service, quitting.")
return false
Contributor:

Fatal will kill the app, so return false will not get triggered

for i := range importable {
	ib := importable[i]
	if len(ib.results) == 0 {
		log.WithFields(ib.logFields()).Error("batch with no results, skipping importer")
Contributor:

Suggested change
log.WithFields(ib.logFields()).Error("batch with no results, skipping importer")
log.WithFields(ib.logFields()).Error("Batch with no results, skipping importer")

// network utilization at the cost of higher memory.
BackfillWorkerCount = &cli.IntFlag{
Name: backfillWorkerCountName,
Usage: "Number of concurrent backfill batch requests." +
Contributor:

Suggested change
Usage: "Number of concurrent backfill batch requests." +
Usage: "Number of concurrent backfill batch requests. " +


// AvailableBlocker can be used to check whether there is a finalized block in the db for the given slot.
// This interface is typically fulfilled by backfill.Store.
type AvailableBlocker interface {
Contributor:

Why is this interface defined in a file called coverage.go?

Contributor Author:

My rationale was that coverage describes which slice of the overall block history is covered. "Is this block available?" is analogous to "is this range in block history covered?".

// for the configured finalized epoch. At most `n` peers will be returned. The `busy` param can be used
// to filter out peers that we know we don't want to connect to, for instance if we are trying to limit
// the number of outbound requests to each peer from a given component.
func (a *Assigner) Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error) {
Contributor:

Some unit tests would be nice. The fact that this function calls freshPeers might make it hard though.

Contributor Author:

refactored to be easier to test and added unit tests here: e055269

s.genesisSync = true
return nil
for _, b := range blocks {
if err := s.store.SaveBlock(ctx, b); err != nil {
Contributor Author:

@terencechain pointed out that the db/kv block methods use an LRU under the hood. Note to self to look into SaveBlock and determine whether backfilling will invalidate this cache.

Contributor Author:

This has been addressed @terencechain please take a look.

@kasey kasey force-pushed the backfill-blocks branch 3 times, most recently from 8b8d486 to 9eb0f87 Compare January 17, 2024 20:27
@@ -138,6 +139,9 @@ var appFlags = []cli.Flag{
flags.JwtId,
storage.BlobStoragePathFlag,
storage.BlobRetentionEpochFlag,
backfill.EnableExperimentalBackfill,
Member:

Can you add this as a feature flag instead? We will eventually deprecate it once the feature is the default.

potuz previously approved these changes Jan 18, 2024

@kasey kasey force-pushed the backfill-blocks branch 2 times, most recently from c18ffa4 to eb3cdf3 Compare January 19, 2024 17:56
@nisdas nisdas added this pull request to the merge queue Jan 23, 2024
Merged via the queue into develop with commit 1df173e Jan 23, 2024
17 checks passed
@nisdas nisdas deleted the backfill-blocks branch January 23, 2024 08:25