Skip to content

Commit

Permalink
cleanup based on draft review
Browse files Browse the repository at this point in the history
  • Loading branch information
kasey committed Oct 6, 2023
1 parent d9b1f72 commit 04b242b
Show file tree
Hide file tree
Showing 25 changed files with 371 additions and 106 deletions.
1 change: 1 addition & 0 deletions beacon-chain/db/iface/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"//beacon-chain/db/filters:go_default_library",
"//beacon-chain/slasher/types:go_default_library",
"//beacon-chain/state:go_default_library",
"//consensus-types/blocks:go_default_library",
"//consensus-types/interfaces:go_default_library",
"//consensus-types/primitives:go_default_library",
"//monitoring/backup:go_default_library",
Expand Down
4 changes: 3 additions & 1 deletion beacon-chain/db/iface/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filters"
slashertypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/slasher/types"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v4/monitoring/backup"
Expand Down Expand Up @@ -115,9 +116,10 @@ type HeadAccessDatabase interface {
SaveGenesisData(ctx context.Context, state state.BeaconState) error
EnsureEmbeddedGenesis(ctx context.Context) error

// initialization method needed for origin checkpoint sync
// Support for checkpoint sync and backfill.
SaveOrigin(ctx context.Context, serState, serBlock []byte) error
SaveBackfillStatus(context.Context, *dbval.BackfillStatus) error
BackfillFinalizedIndex(ctx context.Context, blocks []blocks.ROBlock, finalizedChildRoot [32]byte) error
}

// SlasherDatabase interface for persisting data related to detecting slashable offenses on Ethereum.
Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/db/kv/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,8 @@ var ErrNotFoundBackfillBlockRoot = errors.Wrap(ErrNotFound, "BackfillBlockRoot")

// ErrNotFoundFeeRecipient is a not found error specifically for the fee recipient getter
var ErrNotFoundFeeRecipient = errors.Wrap(ErrNotFound, "fee recipient")

var errEmptyBlockSlice = errors.New("[]blocks.ROBlock is empty")
var errIncorrectBlockParent = errors.New("Unexpected missing or forked blocks in a []ROBlock")
var errFinalizedChildNotFound = errors.New("Unable to find finalized root descending from backfill batch")
var errNotConnectedToFinalized = errors.New("Unable to finalize backfill blocks, finalized parent_root does not match")
78 changes: 78 additions & 0 deletions beacon-chain/db/kv/finalized_block_roots.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/db/filters"
"github.com/prysmaticlabs/prysm/v4/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v4/consensus-types/interfaces"
Expand Down Expand Up @@ -163,6 +164,83 @@ func (s *Store) updateFinalizedBlockRoots(ctx context.Context, tx *bolt.Tx, chec
return bkt.Put(previousFinalizedCheckpointKey, enc)
}

// BackfillFinalizedIndex updates the finalized index for a contiguous chain of blocks that are the ancestors of the
// given finalized child root. This is needed to update the finalized index during backfill, because the usual
// updateFinalizedBlockRoots has assumptions that are incompatible with backfill processing.
func (s *Store) BackfillFinalizedIndex(ctx context.Context, blocks []blocks.ROBlock, finalizedChildRoot [32]byte) error {
ctx, span := trace.StartSpan(ctx, "BeaconDB.BackfillFinalizedIndex")
defer span.End()
if len(blocks) == 0 {
return errEmptyBlockSlice
}

fbrs := make([]*ethpb.FinalizedBlockRootContainer, len(blocks))
encs := make([][]byte, len(blocks))
for i := range blocks {
pr := blocks[i].Block().ParentRoot()
fbrs[i] = &ethpb.FinalizedBlockRootContainer{
ParentRoot: pr[:],
// ChildRoot: will be filled in on the next iteration when we look at the descendent block.
}
if i == 0 {
continue
}
if blocks[i-1].Root() != blocks[i].Block().ParentRoot() {
return errors.Wrapf(errIncorrectBlockParent, "previous root=%#x, slot=%d; child parent_root=%#x, root=%#x, slot=%d",
blocks[i-1].Root(), blocks[i-1].Block().Slot(), blocks[i].Block().ParentRoot(), blocks[i].Root(), blocks[i].Block().Slot())
}

// We know the previous index is the parent of this one thanks to the assertion above,
// so we can set the ChildRoot of the previous value to the root of the current value.
fbrs[i-1].ChildRoot = blocks[i].RootSlice()
// Now that the value for fbrs[i-1] is complete, perform encoding here to minimize time in Update,
// which holds the global db lock.
penc, err := encode(ctx, fbrs[i-1])
if err != nil {
tracing.AnnotateError(span, err)
return err
}
encs[i-1] = penc

// The final element is the parent of finalizedChildRoot. This is checked inside the db transaction using
// the parent_root value stored in the index data for finalizedChildRoot.
if i == len(blocks)-1 {
fbrs[i].ChildRoot = finalizedChildRoot[:]
// Final element is complete, so it is pre-encoded like the others.
enc, err := encode(ctx, fbrs[i])
if err != nil {
tracing.AnnotateError(span, err)
return err
}
encs[i] = enc
}
}

return s.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(finalizedBlockRootsIndexBucket)
child := bkt.Get(finalizedChildRoot[:])
if len(child) == 0 {
return errFinalizedChildNotFound
}
fcc := &ethpb.FinalizedBlockRootContainer{}
if err := decode(ctx, child, fcc); err != nil {
return errors.Wrapf(err, "unable to decode finalized block root container for root=%#x", finalizedChildRoot)
}
// Ensure that the existing finalized chain descends from the new segment.
if !bytes.Equal(fcc.ParentRoot, blocks[len(blocks)-1].RootSlice()) {
return errors.Wrapf(errNotConnectedToFinalized, "finalized block root container for root=%#x has parent_root=%#x, not %#x",
finalizedChildRoot, fcc.ParentRoot, blocks[len(blocks)-1].RootSlice())
}
// Update the finalized index with entries for each block in the new segment.
for i := range fbrs {
if err := bkt.Put(blocks[i].RootSlice(), encs[i]); err != nil {
return err
}
}
return nil
})
}

// IsFinalizedBlock returns true if the block root is present in the finalized block root index.
// A beacon block root contained exists in this index if it is considered finalized and canonical.
// Note: beacon blocks from the latest finalized epoch return true, whether or not they are
Expand Down
63 changes: 63 additions & 0 deletions beacon-chain/db/kv/finalized_block_roots_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kv

import (
"bytes"
"context"
"testing"

Expand All @@ -14,6 +15,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/testing/assert"
"github.com/prysmaticlabs/prysm/v4/testing/require"
"github.com/prysmaticlabs/prysm/v4/testing/util"
bolt "go.etcd.io/bbolt"
)

var genesisBlockRoot = bytesutil.ToBytes32([]byte{'G', 'E', 'N', 'E', 'S', 'I', 'S'})
Expand Down Expand Up @@ -234,3 +236,64 @@ func makeBlocksAltair(t *testing.T, startIdx, num uint64, previousRoot [32]byte)
}
return ifaceBlocks
}

func TestStore_BackfillFinalizedIndex(t *testing.T) {
db := setupDB(t)
ctx := context.Background()
require.ErrorIs(t, db.BackfillFinalizedIndex(ctx, []consensusblocks.ROBlock{}, [32]byte{}), errEmptyBlockSlice)
blks, err := consensusblocks.NewROBlockSlice(makeBlocks(t, 0, 66, [32]byte{}))
require.NoError(t, err)

// set up existing finalized block
ebpr := blks[64].Block().ParentRoot()
ebr := blks[64].Root()
chldr := blks[65].Root()
ebf := &ethpb.FinalizedBlockRootContainer{
ParentRoot: ebpr[:],
ChildRoot: chldr[:],
}
disjoint := []consensusblocks.ROBlock{
blks[0],
blks[2],
}
enc, err := encode(ctx, ebf)
require.NoError(t, err)
err = db.db.Update(func(tx *bolt.Tx) error {
bkt := tx.Bucket(finalizedBlockRootsIndexBucket)
return bkt.Put(ebr[:], enc)
})

// reslice to remove the existing blocks
blks = blks[0:64]
// check the other error conditions with a descendent root that really doesn't exist
require.NoError(t, err)
require.ErrorIs(t, db.BackfillFinalizedIndex(ctx, disjoint, [32]byte{}), errIncorrectBlockParent)
require.NoError(t, err)
require.ErrorIs(t, errFinalizedChildNotFound, db.BackfillFinalizedIndex(ctx, blks, [32]byte{}))

// use the real root so that it succeeds
require.NoError(t, db.BackfillFinalizedIndex(ctx, blks, ebr))
for i := range blks {
require.NoError(t, db.db.View(func(tx *bolt.Tx) error {
bkt := tx.Bucket(finalizedBlockRootsIndexBucket)
encfr := bkt.Get(blks[i].RootSlice())
require.Equal(t, true, len(encfr) > 0)
fr := &ethpb.FinalizedBlockRootContainer{}
require.NoError(t, decode(ctx, encfr, fr))
require.Equal(t, 32, len(fr.ParentRoot))
require.Equal(t, 32, len(fr.ChildRoot))
pr := blks[i].Block().ParentRoot()
require.Equal(t, true, bytes.Equal(fr.ParentRoot, pr[:]))
if i > 0 {
require.Equal(t, true, bytes.Equal(fr.ParentRoot, blks[i-1].RootSlice()))
}
if i < len(blks)-1 {
require.DeepEqual(t, fr.ChildRoot, blks[i+1].RootSlice())
}
if i == len(blks)-1 {
require.DeepEqual(t, fr.ChildRoot, ebr[:])
}
return nil
}))
}
}
1 change: 1 addition & 0 deletions beacon-chain/node/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ go_library(
"//beacon-chain/state/stategen:go_default_library",
"//beacon-chain/sync:go_default_library",
"//beacon-chain/sync/backfill:go_default_library",
"//beacon-chain/sync/backfill/coverage:go_default_library",
"//beacon-chain/sync/checkpoint:go_default_library",
"//beacon-chain/sync/genesis:go_default_library",
"//beacon-chain/sync/initial-sync:go_default_library",
Expand Down
8 changes: 6 additions & 2 deletions beacon-chain/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/synccommittee"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/operations/voluntaryexits"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/p2p/peers"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/rpc/apimiddleware"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/slasher"
Expand All @@ -48,6 +49,7 @@ import (
"github.com/prysmaticlabs/prysm/v4/beacon-chain/state/stategen"
regularsync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/backfill"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/backfill/coverage"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/checkpoint"
"github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/genesis"
initialsync "github.com/prysmaticlabs/prysm/v4/beacon-chain/sync/initial-sync"
Expand Down Expand Up @@ -110,6 +112,7 @@ type BeaconNode struct {
CheckpointInitializer checkpoint.Initializer
forkChoicer forkchoice.ForkChoicer
clockWaiter startup.ClockWaiter
BackfillOpts []backfill.ServiceOption
initialSyncComplete chan struct{}
}

Expand Down Expand Up @@ -218,7 +221,8 @@ func New(cliCtx *cli.Context, opts ...Option) (*BeaconNode, error) {
if err != nil {
return nil, errors.Wrap(err, "backfill status initialization error")
}
bf, err := backfill.NewService(ctx, bfs, beacon.clockWaiter, beacon.fetchP2P())
pa := peers.NewAssigner(beacon.fetchP2P().Peers(), beacon.forkChoicer)
bf, err := backfill.NewService(ctx, bfs, beacon.clockWaiter, beacon.fetchP2P(), pa, beacon.BackfillOpts...)
if err != nil {
return nil, errors.Wrap(err, "error initializing backfill service")
}
Expand Down Expand Up @@ -518,7 +522,7 @@ func (b *BeaconNode) startSlasherDB(cliCtx *cli.Context) error {
return nil
}

func (b *BeaconNode) startStateGen(ctx context.Context, bfs *backfill.StatusUpdater, fc forkchoice.ForkChoicer) error {
func (b *BeaconNode) startStateGen(ctx context.Context, bfs coverage.AvailableBlocker, fc forkchoice.ForkChoicer) error {
opts := []stategen.StateGenOption{stategen.WithAvailableBlocker(bfs)}
sg := stategen.New(b.db, fc, opts...)

Expand Down
1 change: 0 additions & 1 deletion beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go_library(
name = "go_default_library",
srcs = [
"addr_factory.go",
"assigner.go",
"broadcaster.go",
"config.go",
"connection_gater.go",
Expand Down
1 change: 0 additions & 1 deletion beacon-chain/p2p/assigner.go

This file was deleted.

39 changes: 20 additions & 19 deletions beacon-chain/p2p/peers/assigner.go
Original file line number Diff line number Diff line change
@@ -1,46 +1,47 @@
package peers

import (
"context"
"sync"
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
forkchoicetypes "github.com/prysmaticlabs/prysm/v4/beacon-chain/forkchoice/types"
"github.com/prysmaticlabs/prysm/v4/cmd/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/v4/config/params"
"github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
"github.com/sirupsen/logrus"
)

// handshakePollingInterval is a polling interval for checking the number of received handshakes.
var handshakePollingInterval = 5 * time.Second
// FinalizedCheckpointer describes the minimum capability that Assigner needs from forkchoice.
// That is, the ability to retrieve the latest finalized checkpoint to help with peer evaluation.
type FinalizedCheckpointer interface {
FinalizedCheckpoint() *forkchoicetypes.Checkpoint
}

func NewAssigner(ctx context.Context, s *Status, max int, finalized primitives.Epoch) *Assigner {
// NewAssigner assists in the correct construction of an Assigner by code in other packages,
// assuring all the important private member fields are given values.
// The finalized parameter is used to specify a minimum finalized epoch that the peers must agree on.
func NewAssigner(s *Status, fc FinalizedCheckpointer) *Assigner {
return &Assigner{
ctx: ctx,
ps: s,
max: max,
finalized: finalized,
ps: s,
fc: fc,
}
}

// Assigner uses the "BestFinalized" peer scoring method to pick the next-best peer to receive rpc requests.
type Assigner struct {
sync.Mutex
ctx context.Context
ps *Status
max int
finalized primitives.Epoch
ps *Status
fc FinalizedCheckpointer
}

// ErrInsufficientSuitable is a sentinel error, signaling that a peer couldn't be assigned because there are currently
// not enough peers that match our selection criteria to serve rpc requests. It is the responsibility of the caller to
// look for this error and continue to try calling Assign with appropriate backoff logic.
var ErrInsufficientSuitable = errors.New("no suitable peers")

func (a *Assigner) freshPeers() ([]peer.ID, error) {
required := params.BeaconConfig().MaxPeersToSync
if flags.Get().MinimumSyncPeers < required {
required = flags.Get().MinimumSyncPeers
}
_, peers := a.ps.BestFinalized(params.BeaconConfig().MaxPeersToSync, a.finalized)
_, peers := a.ps.BestFinalized(params.BeaconConfig().MaxPeersToSync, a.fc.FinalizedCheckpoint().Epoch)
if len(peers) < required {
log.WithFields(logrus.Fields{
"suitable": len(peers),
Expand All @@ -50,7 +51,7 @@ func (a *Assigner) freshPeers() ([]peer.ID, error) {
return peers, nil
}

// Assign uses the BestFinalized method to select the best peers that agree on a canonical block
// Assign uses the "BestFinalized" method to select the best peers that agree on a canonical block
// for the configured finalized epoch. At most `n` peers will be returned. The `busy` param can be used
// to filter out peers that we know we don't want to connect to, for instance if we are trying to limit
// the number of outbound requests to each peer from a given component.
Expand Down
Loading

0 comments on commit 04b242b

Please sign in to comment.