exported symbol cleanup - comment or make private
kasey committed Oct 10, 2023
1 parent 5f2c904 commit 86c815c
Showing 6 changed files with 42 additions and 30 deletions.
5 changes: 5 additions & 0 deletions beacon-chain/db/kv/backfill.go
@@ -10,6 +10,9 @@ import (
    "google.golang.org/protobuf/proto"
)

+// SaveBackfillStatus encodes the given BackfillStatus protobuf struct and writes it to a single key in the db.
+// This value is used by the backfill service to keep track of the range of blocks that need to be synced. It is also used by the
+// code that serves blocks or regenerates states to keep track of what range of blocks are available.
func (s *Store) SaveBackfillStatus(ctx context.Context, bf *dbval.BackfillStatus) error {
    ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveBackfillStatus")
    defer span.End()
@@ -23,6 +26,8 @@ func (s *Store) SaveBackfillStatus(ctx context.Context, bf *dbval.BackfillStatus
    })
}

+// BackfillStatus retrieves the most recently saved version of the BackfillStatus protobuf struct.
+// This is used to persist information about backfill status across restarts.
func (s *Store) BackfillStatus(ctx context.Context) (*dbval.BackfillStatus, error) {
    ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveBackfillStatus")
    defer span.End()
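The two methods above form a simple persistence round trip. Below is a minimal usage sketch, not part of this commit, assuming an already-opened kv Store named store; the BackfillStatus fields are left unpopulated because they are not shown in this diff.

ctx := context.Background()
status := &dbval.BackfillStatus{} // populate with the current backfill bounds
if err := store.SaveBackfillStatus(ctx, status); err != nil {
    // handle the write error
}
restored, err := store.BackfillStatus(ctx)
if err != nil {
    // handle the read error
}
_ = restored // reflects the most recently saved status, surviving restarts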
2 changes: 2 additions & 0 deletions beacon-chain/sync/backfill/coverage/coverage.go
@@ -2,6 +2,8 @@ package coverage

import "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"

+// AvailableBlocker can be used to check whether there is a finalized block in the db for the given slot.
+// This interface is typically fulfilled by backfill.Store.
type AvailableBlocker interface {
    AvailableBlock(primitives.Slot) bool
}
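To make the contract concrete, here is a hypothetical in-package implementation, not part of this commit; the type name staticAvailability is invented for the example and simply treats every slot at or above a fixed floor as available.

// staticAvailability reports a block as available for any slot at or above floor.
type staticAvailability struct {
    floor primitives.Slot
}

func (s staticAvailability) AvailableBlock(slot primitives.Slot) bool {
    return slot >= s.floor
}

// Compile-time check that the example satisfies AvailableBlocker.
var _ AvailableBlocker = staticAvailability{}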
20 changes: 10 additions & 10 deletions beacon-chain/sync/backfill/pool.go
@@ -14,10 +14,10 @@ import (
    log "github.com/sirupsen/logrus"
)

-type BatchWorkerPool interface {
-    Spawn(ctx context.Context, n int, clock *startup.Clock, a PeerAssigner, v *verifier)
-    Todo(b batch)
-    Complete() (batch, error)
+type batchWorkerPool interface {
+    spawn(ctx context.Context, n int, clock *startup.Clock, a PeerAssigner, v *verifier)
+    todo(b batch)
+    complete() (batch, error)
}

type worker interface {
@@ -26,7 +26,7 @@ type worker interface {

type newWorker func(id workerId, in, out chan batch, c *startup.Clock, v *verifier) worker

-func DefaultNewWorker(p p2p.P2P) newWorker {
+func defaultNewWorker(p p2p.P2P) newWorker {
    return func(id workerId, in, out chan batch, c *startup.Clock, v *verifier) worker {
        return newP2pWorker(id, p, in, out, c, v)
    }
@@ -45,10 +45,10 @@ type p2pBatchWorkerPool struct {
    cancel func()
}

-var _ BatchWorkerPool = &p2pBatchWorkerPool{}
+var _ batchWorkerPool = &p2pBatchWorkerPool{}

func newP2PBatchWorkerPool(p p2p.P2P, maxBatches int) *p2pBatchWorkerPool {
-    nw := DefaultNewWorker(p)
+    nw := defaultNewWorker(p)
    return &p2pBatchWorkerPool{
        newWorker: nw,
        toRouter: make(chan batch, maxBatches),
@@ -60,15 +60,15 @@ func newP2PBatchWorkerPool(p p2p.P2P, maxBatches int) *p2pBatchWorkerPool {
    }
}

-func (p *p2pBatchWorkerPool) Spawn(ctx context.Context, n int, c *startup.Clock, a PeerAssigner, v *verifier) {
+func (p *p2pBatchWorkerPool) spawn(ctx context.Context, n int, c *startup.Clock, a PeerAssigner, v *verifier) {
    p.ctx, p.cancel = context.WithCancel(ctx)
    go p.batchRouter(a)
    for i := 0; i < n; i++ {
        go p.newWorker(workerId(i), p.toWorkers, p.fromWorkers, c, v).run(p.ctx)
    }
}

-func (p *p2pBatchWorkerPool) Todo(b batch) {
+func (p *p2pBatchWorkerPool) todo(b batch) {
    // Intercept batchEndSequence batches so workers can remain unaware of this state.
    // Workers don't know what to do with batchEndSequence batches. They are a signal to the pool that the batcher
    // has stopped producing things for the workers to do and the pool is close to winding down. See Complete()
@@ -81,7 +81,7 @@ func (p *p2pBatchWorkerPool) Todo(b batch) {
    p.toRouter <- b
}

-func (p *p2pBatchWorkerPool) Complete() (batch, error) {
+func (p *p2pBatchWorkerPool) complete() (batch, error) {
    if len(p.endSeq) == p.maxBatches {
        return p.endSeq[0], errEndSequence
    }
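For orientation, here is a rough sketch of how a caller inside the backfill package drives the now-unexported pool: spawn the workers once, queue work with todo, and drain results with complete until errEndSequence. The surrounding values (ctx, nWorkers, maxBatches, p2pProvider, clock, assigner, v, batches) are assumed to be in scope; this is not the actual service code.

pool := newP2PBatchWorkerPool(p2pProvider, maxBatches)
pool.spawn(ctx, nWorkers, clock, assigner, v)
for _, b := range batches {
    pool.todo(b) // hand each batch to the router, which fans work out to idle workers
}
for {
    done, err := pool.complete()
    if errors.Is(err, errEndSequence) {
        break // the batcher has stopped producing work; wind down
    }
    if err != nil {
        continue // log and reschedule the failed batch as appropriate
    }
    _ = done // import the completed batch, then schedule follow-up batches
}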
24 changes: 13 additions & 11 deletions beacon-chain/sync/backfill/pool_test.go
@@ -12,38 +12,40 @@ import (
    "github.com/prysmaticlabs/prysm/v4/testing/util"
)

-type MockAssigner struct {
+type mockAssigner struct {
    err error
    assign []peer.ID
}

-func (m MockAssigner) Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error) {
+// Assign satisfies the PeerAssigner interface so that mockAssigner can be used in tests
+// in place of the concrete p2p implementation of PeerAssigner.
+func (m mockAssigner) Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error) {
    if m.err != nil {
        return nil, m.err
    }
    return m.assign, nil
}

-var _ PeerAssigner = &MockAssigner{}
+var _ PeerAssigner = &mockAssigner{}

func TestPoolDetectAllEnded(t *testing.T) {
    nw := 5
    p2p := p2ptest.NewTestP2P(t)
    ctx := context.Background()
-    ma := &MockAssigner{}
+    ma := &mockAssigner{}
    pool := newP2PBatchWorkerPool(p2p, nw)
    st, err := util.NewBeaconState()
    require.NoError(t, err)
    v, err := newBackfillVerifier(st)
    require.NoError(t, err)
-    pool.Spawn(ctx, nw, startup.NewClock(time.Now(), [32]byte{}), ma, v)
+    pool.spawn(ctx, nw, startup.NewClock(time.Now(), [32]byte{}), ma, v)
    br := batcher{min: 10, size: 10}
    endSeq := br.before(0)
    require.Equal(t, batchEndSequence, endSeq.state)
    for i := 0; i < nw; i++ {
-        pool.Todo(endSeq)
+        pool.todo(endSeq)
    }
-    b, err := pool.Complete()
+    b, err := pool.complete()
    require.ErrorIs(t, err, errEndSequence)
    require.Equal(t, b.end, endSeq.end)
}
@@ -55,14 +57,14 @@ type mockPool struct {
    todoChan chan batch
}

-func (m *mockPool) Spawn(_ context.Context, _ int, _ *startup.Clock, _ PeerAssigner, _ *verifier) {
+func (m *mockPool) spawn(_ context.Context, _ int, _ *startup.Clock, _ PeerAssigner, _ *verifier) {
}

-func (m *mockPool) Todo(b batch) {
+func (m *mockPool) todo(b batch) {
    m.todoChan <- b
}

-func (m *mockPool) Complete() (batch, error) {
+func (m *mockPool) complete() (batch, error) {
    select {
    case b := <-m.finishedChan:
        return b, nil
@@ -71,4 +73,4 @@ func (m *mockPool) Complete() (batch, error) {
    }
}

-var _ BatchWorkerPool = &mockPool{}
+var _ batchWorkerPool = &mockPool{}
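Because the mock's fields stay configurable within the package, error paths remain easy to exercise; a purely illustrative example (not part of this commit) follows.

ma := &mockAssigner{err: errors.New("no suitable peers")}
_, err := ma.Assign(map[peer.ID]bool{}, 1) // returns the configured error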
19 changes: 11 additions & 8 deletions beacon-chain/sync/backfill/service.go
@@ -25,7 +25,7 @@ type Service struct {
    nWorkers int
    batchSeq *batchSequencer
    batchSize uint64
-    pool BatchWorkerPool
+    pool batchWorkerPool
    verifier *verifier
    p2p p2p.P2P
    pa PeerAssigner
@@ -76,7 +76,7 @@ func (d defaultMinimumSlotter) minimumSlot() primitives.Slot {
            log.WithError(err).Fatal("failed to obtain system/genesis clock, unable to start backfill service")
        }
    }
-    return MinimumBackfillSlot(d.clock.CurrentSlot())
+    return minimumBackfillSlot(d.clock.CurrentSlot())
}

func (d defaultMinimumSlotter) setClock(c *startup.Clock) {
@@ -109,6 +109,8 @@ type PeerAssigner interface {
    Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error)
}

+// NewService initializes the backfill Service. Like all implementations of the Service interface,
+// the service won't begin its runloop until Start() is called.
func NewService(ctx context.Context, su *Store, cw startup.ClockWaiter, p p2p.P2P, pa PeerAssigner, opts ...ServiceOption) (*Service, error) {
    s := &Service{
        ctx: ctx,
@@ -138,7 +140,7 @@ func (s *Service) initVerifier(ctx context.Context) (*verifier, error) {
}

func (s *Service) updateComplete() bool {
-    b, err := s.pool.Complete()
+    b, err := s.pool.complete()
    if err != nil {
        if errors.Is(err, errEndSequence) {
            log.WithField("backfill_slot", b.begin).Info("Backfill is complete")
@@ -197,10 +199,11 @@ func (s *Service) scheduleTodos() {
        }
    }
    for _, b := range batches {
-        s.pool.Todo(b)
+        s.pool.todo(b)
    }
}

+// Start begins the runloop of backfill.Service in the current goroutine.
func (s *Service) Start() {
    if !s.enabled {
        log.Info("exiting backfill service; not enabled")
@@ -229,7 +232,7 @@ func (s *Service) Start() {
    if err != nil {
        log.WithError(err).Fatal("Unable to initialize backfill verifier, quitting.")
    }
-    s.pool.Spawn(ctx, s.nWorkers, clock, s.pa, s.verifier)
+    s.pool.spawn(ctx, s.nWorkers, clock, s.pa, s.verifier)

    if err = s.initBatches(); err != nil {
        log.WithError(err).Fatal("Non-recoverable error in backfill service, quitting.")
@@ -254,7 +257,7 @@ func (s *Service) initBatches() error {
        return err
    }
    for _, b := range batches {
-        s.pool.Todo(b)
+        s.pool.todo(b)
    }
    return nil
}
@@ -271,9 +274,9 @@ func (s *Service) Status() error {
    return nil
}

-// MinimumBackfillSlot determines the lowest slot that backfill needs to download based on looking back
+// minimumBackfillSlot determines the lowest slot that backfill needs to download based on looking back
// MIN_EPOCHS_FOR_BLOCK_REQUESTS from the current slot.
-func MinimumBackfillSlot(current primitives.Slot) primitives.Slot {
+func minimumBackfillSlot(current primitives.Slot) primitives.Slot {
    oe := helpers.MinEpochsForBlockRequests()
    if oe > slots.MaxSafeEpoch() {
        oe = slots.MaxSafeEpoch()
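A hedged sketch of wiring the service together, assuming su (*Store), cw (startup.ClockWaiter), p2pSvc, and assigner are constructed elsewhere; the option values are arbitrary, and the options themselves match those used in the test below. Per the new comment on Start, the runloop executes in the calling goroutine, so it is typically launched on its own goroutine.

srv, err := NewService(ctx, su, cw, p2pSvc, assigner, WithBatchSize(64), WithWorkerCount(2))
if err != nil {
    // handle construction error
}
go srv.Start() // Start blocks the goroutine it runs in until the backfill runloop exits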
2 changes: 1 addition & 1 deletion beacon-chain/sync/backfill/service_test.go
@@ -50,7 +50,7 @@ func TestServiceInit(t *testing.T) {
    require.NoError(t, cw.SetClock(startup.NewClock(time.Now(), [32]byte{})))
    pool := &mockPool{todoChan: make(chan batch, nWorkers), finishedChan: make(chan batch, nWorkers)}
    p2pt := p2ptest.NewTestP2P(t)
-    srv, err := NewService(ctx, su, cw, p2pt, &MockAssigner{}, WithBatchSize(batchSize), WithWorkerCount(nWorkers))
+    srv, err := NewService(ctx, su, cw, p2pt, &mockAssigner{}, WithBatchSize(batchSize), WithWorkerCount(nWorkers))
    require.NoError(t, err)
    srv.ms = mockMinimumSlotter{min: primitives.Slot(high - batchSize*uint64(nBatches))}
    srv.pool = pool
