diff --git a/beacon-chain/sync/backfill/service.go b/beacon-chain/sync/backfill/service.go
index 476a1171a360..e1de7f20078d 100644
--- a/beacon-chain/sync/backfill/service.go
+++ b/beacon-chain/sync/backfill/service.go
@@ -43,8 +43,31 @@ type Service struct {
 
 var _ runtime.Service = (*Service)(nil)
 
+// PeerAssigner describes a type that provides an Assign method, which can assign the best peer
+// to service an RPC blockRequest. The Assign method takes a map of peers that should be excluded,
+// allowing the caller to avoid making multiple concurrent requests to the same peer.
+type PeerAssigner interface {
+	Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error)
+}
+
+type minimumSlotter func(primitives.Slot) primitives.Slot
+type batchImporter func(ctx context.Context, current primitives.Slot, b batch, su *Store) (*dbval.BackfillStatus, error)
+
+func defaultBatchImporter(ctx context.Context, current primitives.Slot, b batch, su *Store) (*dbval.BackfillStatus, error) {
+	status := su.status()
+	if err := b.ensureParent(bytesutil.ToBytes32(status.LowParentRoot)); err != nil {
+		return status, err
+	}
+	// Import blocks to db and update db state to reflect the newly imported blocks.
+	// Other parts of the beacon node may use the same StatusUpdater instance
+	// via the coverage.AvailableBlocker interface to safely determine if a given slot has been backfilled.
+	return su.fillBack(ctx, current, b.results, b.availabilityStore())
+}
+
+// ServiceOption represents a functional option for the backfill service constructor.
 type ServiceOption func(*Service) error
 
+// WithEnableBackfill toggles the entire backfill service on or off, intended to be controlled by a feature flag.
 func WithEnableBackfill(enabled bool) ServiceOption {
 	return func(s *Service) error {
 		s.enabled = enabled
@@ -52,6 +75,8 @@ func WithEnableBackfill(enabled bool) ServiceOption {
 	}
 }
 
+// WithWorkerCount sets the number of goroutines in the batch processing pool that can concurrently
+// make p2p requests to download data for batches.
 func WithWorkerCount(n int) ServiceOption {
 	return func(s *Service) error {
 		s.nWorkers = n
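
The With* functions above, together with WithBatchSize below, are functional options: NewService applies each one to the Service it builds. A minimal sketch of caller-side wiring, assuming the package is imported as backfill and using NewService's signature from this file; the option values are illustrative only, not defaults from this patch:

	// su, bStore, cw, p, and pa are the *Store, *filesystem.BlobStorage,
	// startup.ClockWaiter, p2p.P2P, and PeerAssigner values the node already holds.
	svc, err := backfill.NewService(ctx, su, bStore, cw, p, pa,
		backfill.WithEnableBackfill(true), // typically wired to a feature flag
		backfill.WithWorkerCount(2),       // concurrent batch download workers
		backfill.WithBatchSize(64),        // blocks per batch; the default usually suffices
	)
	if err != nil {
		return err
	}
	svc.Start() // per its doc comment, Start runs the runloop in the current goroutine
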
@@ -59,6 +84,8 @@ func WithWorkerCount(n int) ServiceOption {
 	}
 }
 
+// WithBatchSize configures the size of backfill batches, similar to the initial-sync block-batch-limit flag.
+// It should usually be left at the default value.
 func WithBatchSize(n uint64) ServiceOption {
 	return func(s *Service) error {
 		s.batchSize = n
@@ -82,59 +109,6 @@ func WithVerifierWaiter(viw InitializerWaiter) ServiceOption {
 	}
 }
 
-type minimumSlotter interface {
-	minimumSlot() primitives.Slot
-	setClock(*startup.Clock)
-}
-
-type defaultMinimumSlotter struct {
-	clock *startup.Clock
-	cw    startup.ClockWaiter
-	ctx   context.Context
-}
-
-func (d defaultMinimumSlotter) minimumSlot() primitives.Slot {
-	if d.clock == nil {
-		var err error
-		d.clock, err = d.cw.WaitForClock(d.ctx)
-		if err != nil {
-			log.WithError(err).Fatal("failed to obtain system/genesis clock, unable to start backfill service")
-		}
-	}
-	return minimumBackfillSlot(d.clock.CurrentSlot())
-}
-
-func (d defaultMinimumSlotter) setClock(c *startup.Clock) {
-	//nolint:all
-	d.clock = c
-}
-
-var _ minimumSlotter = &defaultMinimumSlotter{}
-
-type batchImporter func(ctx context.Context, current primitives.Slot, b batch, su *Store) (*dbval.BackfillStatus, error)
-
-func defaultBatchImporter(ctx context.Context, current primitives.Slot, b batch, su *Store) (*dbval.BackfillStatus, error) {
-	status := su.status()
-	if err := b.ensureParent(bytesutil.ToBytes32(status.LowParentRoot)); err != nil {
-		return status, err
-	}
-	// Import blocks to db and update db state to reflect the newly imported blocks.
-	// Other parts of the beacon node may use the same StatusUpdater instance
-	// via the coverage.AvailableBlocker interface to safely determine if a given slot has been backfilled.
-	status, err := su.fillBack(ctx, current, b.results, b.availabilityStore())
-	if err != nil {
-		log.WithError(err).Fatal("Non-recoverable db error in backfill service, quitting.")
-	}
-	return status, nil
-}
-
-// PeerAssigner describes a type that provides an Assign method, which can assign the best peer
-// to service an RPC blockRequest. The Assign method takes a map of peers that should be excluded,
-// allowing the caller to avoid making multiple concurrent requests to the same peer.
-type PeerAssigner interface {
-	Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error)
-}
-
 // NewService initializes the backfill Service. Like all implementations of the Service interface,
 // the service won't begin its runloop until Start() is called.
 func NewService(ctx context.Context, su *Store, bStore *filesystem.BlobStorage, cw startup.ClockWaiter, p p2p.P2P, pa PeerAssigner, opts ...ServiceOption) (*Service, error) {
@@ -143,7 +117,7 @@ func NewService(ctx context.Context, su *Store, bStore *filesystem.BlobStorage,
 		store:         su,
 		blobStore:     bStore,
 		cw:            cw,
-		ms:            &defaultMinimumSlotter{cw: cw, ctx: ctx},
+		ms:            minimumBackfillSlot,
 		p2p:           p,
 		pa:            pa,
 		batchImporter: defaultBatchImporter,
@@ -180,10 +154,11 @@ func (s *Service) updateComplete() bool {
 	b, err := s.pool.complete()
 	if err != nil {
 		if errors.Is(err, errEndSequence) {
-			log.WithField("backfill_slot", b.begin).Info("Backfill is complete")
+			log.WithField("backfill_slot", b.begin).Info("Backfill is complete.")
 			return true
 		}
-		log.WithError(err).Fatal("Non-recoverable error in backfill service, quitting.")
+		log.WithError(err).Error("Backfill service received unhandled error from worker pool.")
+		return true
 	}
 	s.batchSeq.update(b)
 	return false
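
A point worth noting in the updateComplete hunk above: its boolean now means "stop the runloop" on two paths, the normal completion signaled by errEndSequence and any unhandled worker-pool error that previously crashed the entire process via log.Fatal. The call site sits outside the hunks shown here, so the following is an inferred sketch of how Start presumably consumes the result, not a quote from the patch:

	// A true result ends backfill either way; only the log level above
	// distinguishes clean completion from an unrecoverable pool error.
	if s.updateComplete() {
		return
	}
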
@@ -247,47 +222,51 @@ func (s *Service) scheduleTodos() {
 
 // Start begins the runloop of backfill.Service in the current goroutine.
 func (s *Service) Start() {
 	if !s.enabled {
-		log.Info("Exiting backfill service; not enabled.")
+		log.Info("Backfill service not enabled.")
 		return
 	}
 	ctx, cancel := context.WithCancel(s.ctx)
 	defer func() {
+		log.Info("Backfill service is shutting down.")
 		cancel()
 	}()
 	clock, err := s.cw.WaitForClock(ctx)
 	if err != nil {
-		log.WithError(err).Fatal("Backfill service failed to start while waiting for genesis data.")
+		log.WithError(err).Error("Backfill service failed to start while waiting for genesis data.")
+		return
 	}
 	s.clock = clock
-	s.ms.setClock(clock)
 	v, err := s.verifierWaiter.WaitForInitializer(ctx)
 	s.newBlobVerifier = newBlobVerifierFromInitializer(v)
 	if err != nil {
-		log.WithError(err).Fatal("could not initialize blob verifier in backfill service")
+		log.WithError(err).Error("Could not initialize blob verifier in backfill service.")
+		return
 	}
 
 	if s.store.isGenesisSync() {
-		log.Info("Exiting backfill service as the node has been initialized with a genesis state or the backfill status is missing")
+		log.Info("Backfill short-circuit; node synced from genesis.")
 		return
 	}
 	status := s.store.status()
 	// Exit early if there aren't going to be any batches to backfill.
-	if primitives.Slot(status.LowSlot) <= s.ms.minimumSlot() {
-		log.WithField("minimum_required_slot", s.ms.minimumSlot()).
+	if primitives.Slot(status.LowSlot) <= s.ms(s.clock.CurrentSlot()) {
+		log.WithField("minimum_required_slot", s.ms(s.clock.CurrentSlot())).
 			WithField("backfill_lowest_slot", status.LowSlot).
 			Info("Exiting backfill service; minimum block retention slot > lowest backfilled block.")
 		return
 	}
 	s.verifier, s.ctxMap, err = s.initVerifier(ctx)
 	if err != nil {
-		log.WithError(err).Fatal("Unable to initialize backfill verifier, quitting.")
+		log.WithError(err).Error("Unable to initialize backfill verifier.")
+		return
 	}
 
 	s.pool.spawn(ctx, s.nWorkers, clock, s.pa, s.verifier, s.ctxMap, s.newBlobVerifier, s.blobStore)
-	s.batchSeq = newBatchSequencer(s.nWorkers, s.ms.minimumSlot(), primitives.Slot(status.LowSlot), primitives.Slot(s.batchSize))
+	s.batchSeq = newBatchSequencer(s.nWorkers, s.ms(s.clock.CurrentSlot()), primitives.Slot(status.LowSlot), primitives.Slot(s.batchSize))
 	if err = s.initBatches(); err != nil {
-		log.WithError(err).Fatal("Non-recoverable error in backfill service, quitting.")
+		log.WithError(err).Error("Non-recoverable error in backfill service.")
+		return
 	}
 
 	for {
@@ -299,8 +278,8 @@ func (s *Service) Start() {
 		}
 		s.importBatches(ctx)
 		batchesWaiting.Set(float64(s.batchSeq.countWithState(batchImportable)))
-		if err := s.batchSeq.moveMinimum(s.ms.minimumSlot()); err != nil {
-			log.WithError(err).Fatal("Non-recoverable error in backfill service, quitting.")
+		if err := s.batchSeq.moveMinimum(s.ms(s.clock.CurrentSlot())); err != nil {
+			log.WithError(err).Error("Non-recoverable error while adjusting backfill minimum slot.")
 		}
 		s.scheduleTodos()
 	}
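
The recurring edit throughout Start above has one shape: log.Fatal (which in logrus logs and then calls os.Exit(1), taking down the whole beacon node) becomes log.Error plus an early return, so a backfill failure now disables only backfill. Distilled into a sketch, with doThing as a hypothetical stand-in for the clock wait, verifier initialization, and batch initialization steps:

	if err := doThing(); err != nil {
		// Before: log.WithError(err).Fatal(...) exited the process.
		// After: record the error and stop only this service's runloop.
		log.WithError(err).Error("Backfill step failed.")
		return
	}
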
diff --git a/beacon-chain/sync/backfill/service_test.go b/beacon-chain/sync/backfill/service_test.go
index f530d2f09dff..5e9c76db7b33 100644
--- a/beacon-chain/sync/backfill/service_test.go
+++ b/beacon-chain/sync/backfill/service_test.go
@@ -22,15 +22,10 @@ type mockMinimumSlotter struct {
 	min primitives.Slot
 }
 
-var _ minimumSlotter = &mockMinimumSlotter{}
-
-func (m mockMinimumSlotter) minimumSlot() primitives.Slot {
+func (m mockMinimumSlotter) minimumSlot(_ primitives.Slot) primitives.Slot {
 	return m.min
 }
 
-func (m mockMinimumSlotter) setClock(*startup.Clock) {
-}
-
 type mockInitalizerWaiter struct {
 }
 
@@ -65,7 +60,7 @@ func TestServiceInit(t *testing.T) {
 	srv, err := NewService(ctx, su, bfs, cw, p2pt, &mockAssigner{}, WithBatchSize(batchSize), WithWorkerCount(nWorkers), WithEnableBackfill(true), WithVerifierWaiter(&mockInitalizerWaiter{}))
 	require.NoError(t, err)
-	srv.ms = mockMinimumSlotter{min: primitives.Slot(high - batchSize*uint64(nBatches))}
+	srv.ms = mockMinimumSlotter{min: primitives.Slot(high - batchSize*uint64(nBatches))}.minimumSlot
 	srv.pool = pool
 	srv.batchImporter = func(context.Context, primitives.Slot, batch, *Store) (*dbval.BackfillStatus, error) {
 		return &dbval.BackfillStatus{}, nil
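
On the test hunks above: with minimumSlotter reduced to the function type func(primitives.Slot) primitives.Slot, the mock loses its setClock method and is wired in as the method value mockMinimumSlotter{...}.minimumSlot. An equivalent closure-based stub, sketched with a hypothetical fixedMin slot value:

	// The runloop passes the current slot on every call; a test stub can
	// ignore it and pin the backfill minimum wherever the scenario needs.
	srv.ms = func(_ primitives.Slot) primitives.Slot {
		return fixedMin
	}
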