stop using log.Fatal and other little cleanups
kasey committed Feb 14, 2024
1 parent 61539f0 commit c346815
Showing 2 changed files with 49 additions and 75 deletions.
115 changes: 47 additions & 68 deletions beacon-chain/sync/backfill/service.go
@@ -43,22 +43,49 @@ type Service struct {

var _ runtime.Service = (*Service)(nil)

+// PeerAssigner describes a type that provides an Assign method, which can assign the best peer
+// to service an RPC blockRequest. The Assign method takes a map of peers that should be excluded,
+// allowing the caller to avoid making multiple concurrent requests to the same peer.
+type PeerAssigner interface {
+	Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error)
+}
+
+type minimumSlotter func(primitives.Slot) primitives.Slot
+type batchImporter func(ctx context.Context, current primitives.Slot, b batch, su *Store) (*dbval.BackfillStatus, error)
+
+func defaultBatchImporter(ctx context.Context, current primitives.Slot, b batch, su *Store) (*dbval.BackfillStatus, error) {
+	status := su.status()
+	if err := b.ensureParent(bytesutil.ToBytes32(status.LowParentRoot)); err != nil {
+		return status, err
+	}
+	// Import blocks to db and update db state to reflect the newly imported blocks.
+	// Other parts of the beacon node may use the same StatusUpdater instance
+	// via the coverage.AvailableBlocker interface to safely determine if a given slot has been backfilled.
+	return su.fillBack(ctx, current, b.results, b.availabilityStore())
+}
+
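Replacing the old minimumSlotter interface with a bare function type keeps the test seam while dropping the setClock plumbing: anything with the right signature, including a method value, can fill the field. A minimal, self-contained sketch of that seam, with Slot as a stand-in for primitives.Slot:

```go
package main

import "fmt"

// Slot stands in for primitives.Slot.
type Slot uint64

// minimumSlotter mirrors the function type introduced above.
type minimumSlotter func(Slot) Slot

type service struct {
	ms minimumSlotter
}

// fixedMin is a test double; its method value satisfies minimumSlotter.
type fixedMin struct{ min Slot }

func (f fixedMin) minimumSlot(_ Slot) Slot { return f.min }

func main() {
	// Production wiring: any plain function of the right shape.
	prod := service{ms: func(current Slot) Slot { return current - 100 }}
	// Test wiring: a method value, with no interface or setClock needed.
	test := service{ms: fixedMin{min: 42}.minimumSlot}
	fmt.Println(prod.ms(1000), test.ms(1000)) // 900 42
}
```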
+// ServiceOption represents a functional option for the backfill service constructor.
type ServiceOption func(*Service) error

+// WithEnableBackfill toggles the entire backfill service on or off, intended to be used by a feature flag.
func WithEnableBackfill(enabled bool) ServiceOption {
	return func(s *Service) error {
		s.enabled = enabled
		return nil
	}
}

+// WithWorkerCount sets the number of goroutines in the batch processing pool that can concurrently
+// make p2p requests to download data for batches.
func WithWorkerCount(n int) ServiceOption {
	return func(s *Service) error {
		s.nWorkers = n
		return nil
	}
}

+// WithBatchSize configures the size of backfill batches, similar to the initial-sync block-batch-limit flag.
+// It should usually be left at the default value.
func WithBatchSize(n uint64) ServiceOption {
	return func(s *Service) error {
		s.batchSize = n
@@ -82,59 +109,6 @@ func WithVerifierWaiter(viw InitializerWaiter) ServiceOption {
	}
}
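These functional options compose at the constructor call site. Here is a condensed, runnable replica of the pattern (names lowercased, defaults invented for illustration) rather than the real constructor:

```go
package main

import "fmt"

// service and the options below are a condensed replica of the real types.
type service struct {
	enabled   bool
	nWorkers  int
	batchSize uint64
}

type serviceOption func(*service) error

func withEnableBackfill(enabled bool) serviceOption {
	return func(s *service) error { s.enabled = enabled; return nil }
}

func withWorkerCount(n int) serviceOption {
	return func(s *service) error { s.nWorkers = n; return nil }
}

func newService(opts ...serviceOption) (*service, error) {
	s := &service{nWorkers: 2, batchSize: 64} // placeholder defaults
	for _, o := range opts {
		if err := o(s); err != nil {
			return nil, err
		}
	}
	return s, nil
}

func main() {
	s, err := newService(withEnableBackfill(true), withWorkerCount(4))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", *s) // {enabled:true nWorkers:4 batchSize:64}
}
```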

-type minimumSlotter interface {
-	minimumSlot() primitives.Slot
-	setClock(*startup.Clock)
-}
-
-type defaultMinimumSlotter struct {
-	clock *startup.Clock
-	cw    startup.ClockWaiter
-	ctx   context.Context
-}
-
-func (d defaultMinimumSlotter) minimumSlot() primitives.Slot {
-	if d.clock == nil {
-		var err error
-		d.clock, err = d.cw.WaitForClock(d.ctx)
-		if err != nil {
-			log.WithError(err).Fatal("failed to obtain system/genesis clock, unable to start backfill service")
-		}
-	}
-	return minimumBackfillSlot(d.clock.CurrentSlot())
-}
-
-func (d defaultMinimumSlotter) setClock(c *startup.Clock) {
-	//nolint:all
-	d.clock = c
-}
-
-var _ minimumSlotter = &defaultMinimumSlotter{}
-
-type batchImporter func(ctx context.Context, current primitives.Slot, b batch, su *Store) (*dbval.BackfillStatus, error)
-
-func defaultBatchImporter(ctx context.Context, current primitives.Slot, b batch, su *Store) (*dbval.BackfillStatus, error) {
-	status := su.status()
-	if err := b.ensureParent(bytesutil.ToBytes32(status.LowParentRoot)); err != nil {
-		return status, err
-	}
-	// Import blocks to db and update db state to reflect the newly imported blocks.
-	// Other parts of the beacon node may use the same StatusUpdater instance
-	// via the coverage.AvailableBlocker interface to safely determine if a given slot has been backfilled.
-	status, err := su.fillBack(ctx, current, b.results, b.availabilityStore())
-	if err != nil {
-		log.WithError(err).Fatal("Non-recoverable db error in backfill service, quitting.")
-	}
-	return status, nil
-}
-
-// PeerAssigner describes a type that provides an Assign method, which can assign the best peer
-// to service an RPC blockRequest. The Assign method takes a map of peers that should be excluded,
-// allowing the caller to avoid making multiple concurrent requests to the same peer.
-type PeerAssigner interface {
-	Assign(busy map[peer.ID]bool, n int) ([]peer.ID, error)
-}
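The busy map in the Assign contract exists so a caller can exclude peers that already have in-flight requests. A hedged sketch of one way an implementation could satisfy it, with a string alias standing in for libp2p's peer.ID:

```go
package main

import (
	"errors"
	"fmt"
)

// peerID stands in for libp2p's peer.ID.
type peerID string

var errInsufficientPeers = errors.New("not enough non-busy peers")

// roundRobinAssigner hands out up to n peers, skipping any marked busy.
type roundRobinAssigner struct {
	peers []peerID
	next  int
}

func (r *roundRobinAssigner) Assign(busy map[peerID]bool, n int) ([]peerID, error) {
	assigned := make([]peerID, 0, n)
	for tries := 0; tries < len(r.peers) && len(assigned) < n; tries++ {
		p := r.peers[r.next%len(r.peers)]
		r.next++
		if !busy[p] {
			assigned = append(assigned, p)
		}
	}
	if len(assigned) < n {
		return nil, errInsufficientPeers
	}
	return assigned, nil
}

func main() {
	a := &roundRobinAssigner{peers: []peerID{"a", "b", "c"}}
	got, err := a.Assign(map[peerID]bool{"b": true}, 2)
	fmt.Println(got, err) // [a c] <nil>
}
```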

// NewService initializes the backfill Service. Like all implementations of the Service interface,
// the service won't begin its runloop until Start() is called.
func NewService(ctx context.Context, su *Store, bStore *filesystem.BlobStorage, cw startup.ClockWaiter, p p2p.P2P, pa PeerAssigner, opts ...ServiceOption) (*Service, error) {
@@ -143,7 +117,7 @@ func NewService(ctx context.Context, su *Store, bStore *filesystem.BlobStorage,
		store:         su,
		blobStore:     bStore,
		cw:            cw,
-		ms:            &defaultMinimumSlotter{cw: cw, ctx: ctx},
+		ms:            minimumBackfillSlot,
		p2p:           p,
		pa:            pa,
		batchImporter: defaultBatchImporter,
@@ -180,10 +154,11 @@ func (s *Service) updateComplete() bool {
	b, err := s.pool.complete()
	if err != nil {
		if errors.Is(err, errEndSequence) {
-			log.WithField("backfill_slot", b.begin).Info("Backfill is complete")
+			log.WithField("backfill_slot", b.begin).Info("Backfill is complete.")
			return true
		}
-		log.WithError(err).Fatal("Non-recoverable error in backfill service, quitting.")
+		log.WithError(err).Error("Backfill service received unhandled error from worker pool.")
+		return true
	}
	s.batchSeq.update(b)
	return false
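The substance of the change above: log.Fatal ends the whole process via os.Exit(1), skipping deferred cleanup, while logging at Error and returning true lets the runloop exit gracefully and keeps the rest of the beacon node alive. A small standard-library illustration of the difference (logrus, which this package's log is built on, also exits on Fatal):

```go
package main

import (
	"fmt"
	"log"
)

// errorPath logs and returns, so deferred cleanup still runs.
func errorPath() error {
	defer fmt.Println("deferred cleanup runs")
	log.Print("non-recoverable error, shutting the service down")
	return fmt.Errorf("backfill stopped")
}

// fatalPath would call os.Exit(1) via log.Fatal: no deferred
// function runs and the entire process dies, not just the service.
func fatalPath() {
	defer fmt.Println("never printed")
	log.Fatal("non-recoverable error")
}

func main() {
	if err := errorPath(); err != nil {
		fmt.Println("service exited:", err)
	}
	// fatalPath() // uncommenting this line terminates the process here
}
```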
@@ -247,47 +222,51 @@ func (s *Service) scheduleTodos() {
// Start begins the runloop of backfill.Service in the current goroutine.
func (s *Service) Start() {
	if !s.enabled {
-		log.Info("Exiting backfill service; not enabled.")
+		log.Info("Backfill service not enabled.")
		return
	}
	ctx, cancel := context.WithCancel(s.ctx)
	defer func() {
+		log.Info("Backfill service is shutting down.")
		cancel()
	}()
	clock, err := s.cw.WaitForClock(ctx)
	if err != nil {
-		log.WithError(err).Fatal("Backfill service failed to start while waiting for genesis data.")
+		log.WithError(err).Error("Backfill service failed to start while waiting for genesis data.")
+		return
	}
	s.clock = clock
-	s.ms.setClock(clock)
	v, err := s.verifierWaiter.WaitForInitializer(ctx)
	s.newBlobVerifier = newBlobVerifierFromInitializer(v)

	if err != nil {
-		log.WithError(err).Fatal("could not initialize blob verifier in backfill service")
+		log.WithError(err).Error("Could not initialize blob verifier in backfill service.")
+		return
	}

	if s.store.isGenesisSync() {
-		log.Info("Exiting backfill service as the node has been initialized with a genesis state or the backfill status is missing")
+		log.Info("Backfill short-circuit; node synced from genesis.")
		return
	}
	status := s.store.status()
	// Exit early if there aren't going to be any batches to backfill.
-	if primitives.Slot(status.LowSlot) <= s.ms.minimumSlot() {
-		log.WithField("minimum_required_slot", s.ms.minimumSlot()).
+	if primitives.Slot(status.LowSlot) <= s.ms(s.clock.CurrentSlot()) {
+		log.WithField("minimum_required_slot", s.ms(s.clock.CurrentSlot())).
			WithField("backfill_lowest_slot", status.LowSlot).
			Info("Exiting backfill service; minimum block retention slot > lowest backfilled block.")
		return
	}
	s.verifier, s.ctxMap, err = s.initVerifier(ctx)
	if err != nil {
-		log.WithError(err).Fatal("Unable to initialize backfill verifier, quitting.")
+		log.WithError(err).Error("Unable to initialize backfill verifier.")
+		return
	}
	s.pool.spawn(ctx, s.nWorkers, clock, s.pa, s.verifier, s.ctxMap, s.newBlobVerifier, s.blobStore)

-	s.batchSeq = newBatchSequencer(s.nWorkers, s.ms.minimumSlot(), primitives.Slot(status.LowSlot), primitives.Slot(s.batchSize))
+	s.batchSeq = newBatchSequencer(s.nWorkers, s.ms(s.clock.CurrentSlot()), primitives.Slot(status.LowSlot), primitives.Slot(s.batchSize))
	if err = s.initBatches(); err != nil {
-		log.WithError(err).Fatal("Non-recoverable error in backfill service, quitting.")
+		log.WithError(err).Error("Non-recoverable error in backfill service.")
+		return
	}

	for {
@@ -299,8 +278,8 @@ func (s *Service) Start() {
		}
		s.importBatches(ctx)
		batchesWaiting.Set(float64(s.batchSeq.countWithState(batchImportable)))
-		if err := s.batchSeq.moveMinimum(s.ms.minimumSlot()); err != nil {
-			log.WithError(err).Fatal("Non-recoverable error in backfill service, quitting.")
+		if err := s.batchSeq.moveMinimum(s.ms(s.clock.CurrentSlot())); err != nil {
+			log.WithError(err).Error("Non-recoverable error while adjusting backfill minimum slot.")
		}
		s.scheduleTodos()
	}
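Because s.ms is now evaluated against s.clock.CurrentSlot() on every pass, the minimum backfill slot slides forward as the chain head advances, which is what batchSeq.moveMinimum consumes; the cached-clock interface needed setClock wiring to achieve the same thing. A toy version of such a window function, with an assumed retention span in place of the spec-derived value used by the real minimumBackfillSlot:

```go
package main

import "fmt"

type slot uint64

// toyMinimumBackfillSlot returns the lowest slot worth keeping, using an
// assumed retention window of 1000 slots in place of the spec-derived value.
func toyMinimumBackfillSlot(current slot) slot {
	const retentionWindow = 1000
	if current <= retentionWindow {
		return 1
	}
	return current - retentionWindow
}

func main() {
	for _, cur := range []slot{500, 1500, 2500} {
		fmt.Printf("current=%d minimum=%d\n", cur, toyMinimumBackfillSlot(cur))
	}
}
```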
9 changes: 2 additions & 7 deletions beacon-chain/sync/backfill/service_test.go
@@ -22,15 +22,10 @@ type mockMinimumSlotter struct {
	min primitives.Slot
}

-var _ minimumSlotter = &mockMinimumSlotter{}
-
-func (m mockMinimumSlotter) minimumSlot() primitives.Slot {
+func (m mockMinimumSlotter) minimumSlot(_ primitives.Slot) primitives.Slot {
	return m.min
}

-func (m mockMinimumSlotter) setClock(*startup.Clock) {
-}

type mockInitalizerWaiter struct {
}

@@ -65,7 +60,7 @@ func TestServiceInit(t *testing.T) {
	srv, err := NewService(ctx, su, bfs, cw, p2pt, &mockAssigner{},
		WithBatchSize(batchSize), WithWorkerCount(nWorkers), WithEnableBackfill(true), WithVerifierWaiter(&mockInitalizerWaiter{}))
	require.NoError(t, err)
-	srv.ms = mockMinimumSlotter{min: primitives.Slot(high - batchSize*uint64(nBatches))}
+	srv.ms = mockMinimumSlotter{min: primitives.Slot(high - batchSize*uint64(nBatches))}.minimumSlot
	srv.pool = pool
	srv.batchImporter = func(context.Context, primitives.Slot, batch, *Store) (*dbval.BackfillStatus, error) {
		return &dbval.BackfillStatus{}, nil
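The one-line srv.ms change works because Go method values bind the receiver at evaluation time: mockMinimumSlotter{...}.minimumSlot yields a plain func(primitives.Slot) primitives.Slot that is assignable to the new function-typed field. A standalone demonstration of that binding:

```go
package main

import "fmt"

type counter struct{ n int }

func (c counter) value() int { return c.n }

func main() {
	c := counter{n: 1}
	v := c.value // method value: the (value) receiver is copied at this point
	c.n = 2
	fmt.Println(v())       // 1: bound to the copy taken above
	fmt.Println(c.value()) // 2
}
```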
