From 86c815c068fbb846459f52830b5eb89d58bdc8e9 Mon Sep 17 00:00:00 2001
From: Kasey Kirkham
Date: Tue, 10 Oct 2023 12:53:25 -0500
Subject: [PATCH] exported symbol cleanup - comment or make private

---
 beacon-chain/db/kv/backfill.go             |  5 ++++
 .../sync/backfill/coverage/coverage.go     |  2 ++
 beacon-chain/sync/backfill/pool.go         | 20 ++++++++--------
 beacon-chain/sync/backfill/pool_test.go    | 24 ++++++++++---------
 beacon-chain/sync/backfill/service.go      | 19 ++++++++-------
 beacon-chain/sync/backfill/service_test.go |  2 +-
 6 files changed, 42 insertions(+), 30 deletions(-)

diff --git a/beacon-chain/db/kv/backfill.go b/beacon-chain/db/kv/backfill.go
index 277f8215bd27..c653d6a4d09a 100644
--- a/beacon-chain/db/kv/backfill.go
+++ b/beacon-chain/db/kv/backfill.go
@@ -10,6 +10,9 @@ import (
 	"google.golang.org/protobuf/proto"
 )
 
+// SaveBackfillStatus encodes the given BackfillStatus protobuf struct and writes it to a single key in the db.
+// This value is used by the backfill service to keep track of the range of blocks that need to be synced. It is also used by the
+// code that serves blocks or regenerates states to keep track of what range of blocks are available.
 func (s *Store) SaveBackfillStatus(ctx context.Context, bf *dbval.BackfillStatus) error {
 	ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveBackfillStatus")
 	defer span.End()
@@ -23,6 +26,8 @@ func (s *Store) SaveBackfillStatus(ctx context.Context, bf *dbval.BackfillStatus
 	})
 }
 
+// BackfillStatus retrieves the most recently saved version of the BackfillStatus protobuf struct.
+// This is used to persist information about backfill status across restarts.
 func (s *Store) BackfillStatus(ctx context.Context) (*dbval.BackfillStatus, error) {
 	ctx, span := trace.StartSpan(ctx, "BeaconDB.SaveBackfillStatus")
 	defer span.End()
diff --git a/beacon-chain/sync/backfill/coverage/coverage.go b/beacon-chain/sync/backfill/coverage/coverage.go
index 633603e88690..8d2e4afb4fc2 100644
--- a/beacon-chain/sync/backfill/coverage/coverage.go
+++ b/beacon-chain/sync/backfill/coverage/coverage.go
@@ -2,6 +2,8 @@ package coverage
 
 import "github.com/prysmaticlabs/prysm/v4/consensus-types/primitives"
 
+// AvailableBlocker can be used to check whether there is a finalized block in the db for the given slot.
+// This interface is typically fulfilled by backfill.Store.
 type AvailableBlocker interface {
 	AvailableBlock(primitives.Slot) bool
 }
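A minimal sketch of the round-trip the two comments above describe, assuming a kv.Store handle named db and a previously built dbval.BackfillStatus value named status (neither name appears in this diff):

	// Persist the current backfill window so it survives a restart.
	if err := db.SaveBackfillStatus(ctx, status); err != nil {
		return err
	}
	// On startup, read the window back to resume syncing and to answer
	// AvailableBlock queries from block-serving and state-regen code.
	restored, err := db.BackfillStatus(ctx)
	if err != nil {
		return err
	}
	_ = restored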
diff --git a/beacon-chain/sync/backfill/pool.go b/beacon-chain/sync/backfill/pool.go
index e617b5775467..72adb1ff5541 100644
--- a/beacon-chain/sync/backfill/pool.go
+++ b/beacon-chain/sync/backfill/pool.go
@@ -14,10 +14,10 @@ import (
 	log "github.com/sirupsen/logrus"
 )
 
-type BatchWorkerPool interface {
-	Spawn(ctx context.Context, n int, clock *startup.Clock, a PeerAssigner, v *verifier)
-	Todo(b batch)
-	Complete() (batch, error)
+type batchWorkerPool interface {
+	spawn(ctx context.Context, n int, clock *startup.Clock, a PeerAssigner, v *verifier)
+	todo(b batch)
+	complete() (batch, error)
 }
 
 type worker interface {
@@ -26,7 +26,7 @@ type worker interface {
 
 type newWorker func(id workerId, in, out chan batch, c *startup.Clock, v *verifier) worker
 
-func DefaultNewWorker(p p2p.P2P) newWorker {
+func defaultNewWorker(p p2p.P2P) newWorker {
 	return func(id workerId, in, out chan batch, c *startup.Clock, v *verifier) worker {
 		return newP2pWorker(id, p, in, out, c, v)
 	}
@@ -45,10 +45,10 @@ type p2pBatchWorkerPool struct {
 	cancel func()
 }
 
-var _ BatchWorkerPool = &p2pBatchWorkerPool{}
+var _ batchWorkerPool = &p2pBatchWorkerPool{}
 
 func newP2PBatchWorkerPool(p p2p.P2P, maxBatches int) *p2pBatchWorkerPool {
-	nw := DefaultNewWorker(p)
+	nw := defaultNewWorker(p)
 	return &p2pBatchWorkerPool{
 		newWorker: nw,
 		toRouter:  make(chan batch, maxBatches),
@@ -60,7 +60,7 @@ func newP2PBatchWorkerPool(p p2p.P2P, maxBatches int) *p2pBatchWorkerPool {
 	}
 }
 
-func (p *p2pBatchWorkerPool) Spawn(ctx context.Context, n int, c *startup.Clock, a PeerAssigner, v *verifier) {
+func (p *p2pBatchWorkerPool) spawn(ctx context.Context, n int, c *startup.Clock, a PeerAssigner, v *verifier) {
 	p.ctx, p.cancel = context.WithCancel(ctx)
 	go p.batchRouter(a)
 	for i := 0; i < n; i++ {
@@ -68,7 +68,7 @@ func (p *p2pBatchWorkerPool) Spawn(ctx context.Context, n int, c *startup.Clock,
 	}
 }
 
-func (p *p2pBatchWorkerPool) Todo(b batch) {
+func (p *p2pBatchWorkerPool) todo(b batch) {
 	// Intercept batchEndSequence batches so workers can remain unaware of this state.
 	// Workers don't know what to do with batchEndSequence batches. They are a signal to the pool that the batcher
 	// has stopped producing things for the workers to do and the pool is close to winding down. See Complete()
@@ -81,7 +81,7 @@ func (p *p2pBatchWorkerPool) Todo(b batch) {
 	p.toRouter <- b
 }
 
-func (p *p2pBatchWorkerPool) Complete() (batch, error) {
+func (p *p2pBatchWorkerPool) complete() (batch, error) {
 	if len(p.endSeq) == p.maxBatches {
 		return p.endSeq[0], errEndSequence
 	}
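A sketch of the lifecycle the now-unexported pool interface implies; host, nWorkers, clock, assigner, v, and b are illustrative stand-ins for values the backfill service owns:

	pool := newP2PBatchWorkerPool(host, maxBatches) // host is a p2p.P2P
	pool.spawn(ctx, nWorkers, clock, assigner, v)   // start the router and workers
	pool.todo(b)                                    // hand a batch to any free worker
	done, err := pool.complete()                    // block until some batch finishes
	if errors.Is(err, errEndSequence) {
		return // the batcher is out of work and the pool is winding down
	}
	_ = done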
diff --git a/beacon-chain/sync/backfill/pool_test.go b/beacon-chain/sync/backfill/pool_test.go
index e952e5c28b2b..beb1901298ad 100644
--- a/beacon-chain/sync/backfill/pool_test.go
+++ b/beacon-chain/sync/backfill/pool_test.go
@@ -12,38 +12,40 @@ import (
 	"github.com/prysmaticlabs/prysm/v4/testing/util"
 )
 
-type MockAssigner struct {
+type mockAssigner struct {
 	err    error
 	assign []peer.ID
 }
 
-func (m MockAssigner) Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error) {
+// Assign satisfies the PeerAssigner interface so that mockAssigner can be used in tests
+// in place of the concrete p2p implementation of PeerAssigner.
+func (m mockAssigner) Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error) {
 	if m.err != nil {
 		return nil, m.err
 	}
 	return m.assign, nil
 }
 
-var _ PeerAssigner = &MockAssigner{}
+var _ PeerAssigner = &mockAssigner{}
 
 func TestPoolDetectAllEnded(t *testing.T) {
 	nw := 5
 	p2p := p2ptest.NewTestP2P(t)
 	ctx := context.Background()
-	ma := &MockAssigner{}
+	ma := &mockAssigner{}
 	pool := newP2PBatchWorkerPool(p2p, nw)
 	st, err := util.NewBeaconState()
 	require.NoError(t, err)
 	v, err := newBackfillVerifier(st)
 	require.NoError(t, err)
-	pool.Spawn(ctx, nw, startup.NewClock(time.Now(), [32]byte{}), ma, v)
+	pool.spawn(ctx, nw, startup.NewClock(time.Now(), [32]byte{}), ma, v)
 	br := batcher{min: 10, size: 10}
 	endSeq := br.before(0)
 	require.Equal(t, batchEndSequence, endSeq.state)
 	for i := 0; i < nw; i++ {
-		pool.Todo(endSeq)
+		pool.todo(endSeq)
 	}
-	b, err := pool.Complete()
+	b, err := pool.complete()
 	require.ErrorIs(t, err, errEndSequence)
 	require.Equal(t, b.end, endSeq.end)
 }
@@ -55,14 +57,14 @@ type mockPool struct {
 	todoChan     chan batch
 }
 
-func (m *mockPool) Spawn(_ context.Context, _ int, _ *startup.Clock, _ PeerAssigner, _ *verifier) {
+func (m *mockPool) spawn(_ context.Context, _ int, _ *startup.Clock, _ PeerAssigner, _ *verifier) {
 }
 
-func (m *mockPool) Todo(b batch) {
+func (m *mockPool) todo(b batch) {
 	m.todoChan <- b
 }
 
-func (m *mockPool) Complete() (batch, error) {
+func (m *mockPool) complete() (batch, error) {
 	select {
 	case b := <-m.finishedChan:
 		return b, nil
@@ -71,4 +73,4 @@ func (m *mockPool) Complete() (batch, error) {
 	}
 }
 
-var _ BatchWorkerPool = &mockPool{}
+var _ batchWorkerPool = &mockPool{}
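Since the test doubles above are now unexported, they are only usable from tests inside the backfill package; the payoff, visible in service_test.go below, is that a test can swap them in behind the private interface fields:

	pool := &mockPool{todoChan: make(chan batch, n), finishedChan: make(chan batch, n)}
	srv.pool = pool // compiles because Service.pool is the unexported batchWorkerPool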
diff --git a/beacon-chain/sync/backfill/service.go b/beacon-chain/sync/backfill/service.go
index 72c22306207f..ca344d8f4f39 100644
--- a/beacon-chain/sync/backfill/service.go
+++ b/beacon-chain/sync/backfill/service.go
@@ -25,7 +25,7 @@ type Service struct {
 	nWorkers  int
 	batchSeq  *batchSequencer
 	batchSize uint64
-	pool      BatchWorkerPool
+	pool      batchWorkerPool
 	verifier  *verifier
 	p2p       p2p.P2P
 	pa        PeerAssigner
@@ -76,7 +76,7 @@ func (d defaultMinimumSlotter) minimumSlot() primitives.Slot {
 			log.WithError(err).Fatal("failed to obtain system/genesis clock, unable to start backfill service")
 		}
 	}
-	return MinimumBackfillSlot(d.clock.CurrentSlot())
+	return minimumBackfillSlot(d.clock.CurrentSlot())
 }
 
 func (d defaultMinimumSlotter) setClock(c *startup.Clock) {
@@ -109,6 +109,8 @@ type PeerAssigner interface {
 	Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error)
 }
 
+// NewService initializes the backfill Service. Like all implementations of the Service interface,
+// the service won't begin its runloop until Start() is called.
 func NewService(ctx context.Context, su *Store, cw startup.ClockWaiter, p p2p.P2P, pa PeerAssigner, opts ...ServiceOption) (*Service, error) {
 	s := &Service{
 		ctx:       ctx,
@@ -138,7 +140,7 @@ func (s *Service) initVerifier(ctx context.Context) (*verifier, error) {
 }
 
 func (s *Service) updateComplete() bool {
-	b, err := s.pool.Complete()
+	b, err := s.pool.complete()
 	if err != nil {
 		if errors.Is(err, errEndSequence) {
 			log.WithField("backfill_slot", b.begin).Info("Backfill is complete")
@@ -197,10 +199,11 @@ func (s *Service) scheduleTodos() {
 		}
 	}
 	for _, b := range batches {
-		s.pool.Todo(b)
+		s.pool.todo(b)
 	}
 }
 
+// Start begins the runloop of backfill.Service in the current goroutine.
 func (s *Service) Start() {
 	if !s.enabled {
 		log.Info("exiting backfill service; not enabled")
@@ -229,7 +232,7 @@ func (s *Service) Start() {
 	if err != nil {
 		log.WithError(err).Fatal("Unable to initialize backfill verifier, quitting.")
 	}
-	s.pool.Spawn(ctx, s.nWorkers, clock, s.pa, s.verifier)
+	s.pool.spawn(ctx, s.nWorkers, clock, s.pa, s.verifier)
 
 	if err = s.initBatches(); err != nil {
 		log.WithError(err).Fatal("Non-recoverable error in backfill service, quitting.")
@@ -254,7 +257,7 @@ func (s *Service) initBatches() error {
 		return err
 	}
 	for _, b := range batches {
-		s.pool.Todo(b)
+		s.pool.todo(b)
 	}
 	return nil
 }
@@ -271,9 +274,9 @@ func (s *Service) Status() error {
 	return nil
 }
 
-// MinimumBackfillSlot determines the lowest slot that backfill needs to download based on looking back
+// minimumBackfillSlot determines the lowest slot that backfill needs to download based on looking back
 // MIN_EPOCHS_FOR_BLOCK_REQUESTS from the current slot.
-func MinimumBackfillSlot(current primitives.Slot) primitives.Slot {
+func minimumBackfillSlot(current primitives.Slot) primitives.Slot {
 	oe := helpers.MinEpochsForBlockRequests()
 	if oe > slots.MaxSafeEpoch() {
 		oe = slots.MaxSafeEpoch()
diff --git a/beacon-chain/sync/backfill/service_test.go b/beacon-chain/sync/backfill/service_test.go
index a6ea61d39aa5..1c91e3ce070e 100644
--- a/beacon-chain/sync/backfill/service_test.go
+++ b/beacon-chain/sync/backfill/service_test.go
@@ -50,7 +50,7 @@ func TestServiceInit(t *testing.T) {
 	require.NoError(t, cw.SetClock(startup.NewClock(time.Now(), [32]byte{})))
 	pool := &mockPool{todoChan: make(chan batch, nWorkers), finishedChan: make(chan batch, nWorkers)}
 	p2pt := p2ptest.NewTestP2P(t)
-	srv, err := NewService(ctx, su, cw, p2pt, &MockAssigner{}, WithBatchSize(batchSize), WithWorkerCount(nWorkers))
+	srv, err := NewService(ctx, su, cw, p2pt, &mockAssigner{}, WithBatchSize(batchSize), WithWorkerCount(nWorkers))
 	require.NoError(t, err)
 	srv.ms = mockMinimumSlotter{min: primitives.Slot(high - batchSize*uint64(nBatches))}
 	srv.pool = pool
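The diff truncates the body of minimumBackfillSlot, so the following is only a sketch of its net effect under mainnet presets, where MIN_EPOCHS_FOR_BLOCK_REQUESTS is 33024 epochs: clamp the lookback, then subtract it from the current slot without underflowing.

	// 33024 epochs * 32 slots/epoch = 1,056,768 slots of retained history.
	lookback := primitives.Slot(33024 * 32)
	if lookback >= current {
		return primitives.Slot(0) // near genesis, backfill reaches all the way down
	}
	return current - lookback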