From 1158ad1b043c29adf7341ecb52fceaec544dc4d4 Mon Sep 17 00:00:00 2001 From: Shoham Chakraborty Date: Mon, 9 Sep 2024 19:51:09 +0800 Subject: [PATCH] polygon/heimdall: Split read functions into reader (#11924) - Create `heimdall.Reader` for future use in `bor_*` API - Make `AssembleReader` and `NewReader` leaner by not requiring full `BorConfig` --- eth/backend.go | 5 +- eth/stagedsync/stage_polygon_sync.go | 3 +- polygon/heimdall/reader.go | 57 +++++++++++++++++++ polygon/heimdall/service.go | 26 ++++----- polygon/heimdall/service_test.go | 3 +- .../heimdall/span_block_producers_tracker.go | 35 ++++++------ 6 files changed, 95 insertions(+), 34 deletions(-) create mode 100644 polygon/heimdall/reader.go diff --git a/eth/backend.go b/eth/backend.go index 37d968df51c..5b7b6c91d73 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -550,8 +550,9 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger } if config.PolygonSync { - polygonBridge = bridge.Assemble(config.Dirs.DataDir, logger, consensusConfig.(*borcfg.BorConfig), heimdallClient) - heimdallService = heimdall.AssembleService(consensusConfig.(*borcfg.BorConfig), config.HeimdallURL, dirs.DataDir, tmpdir, logger) + borConfig := consensusConfig.(*borcfg.BorConfig) + polygonBridge = bridge.Assemble(config.Dirs.DataDir, logger, borConfig, heimdallClient) + heimdallService = heimdall.AssembleService(borConfig.CalculateSprintNumber, config.HeimdallURL, dirs.DataDir, tmpdir, logger) backend.polygonBridge = polygonBridge } diff --git a/eth/stagedsync/stage_polygon_sync.go b/eth/stagedsync/stage_polygon_sync.go index a5818b7994a..6ddd4eba995 100644 --- a/eth/stagedsync/stage_polygon_sync.go +++ b/eth/stagedsync/stage_polygon_sync.go @@ -97,7 +97,8 @@ func NewPolygonSyncStageCfg( txActionStream: txActionStream, } borConfig := chainConfig.Bor.(*borcfg.BorConfig) - heimdallService := heimdall.NewService(borConfig, heimdallClient, heimdallStore, logger) + heimdallReader := heimdall.NewReader(borConfig.CalculateSprintNumber, heimdallStore, logger) + heimdallService := heimdall.NewService(borConfig.CalculateSprintNumber, heimdallClient, heimdallStore, logger, heimdallReader) bridgeService := bridge.NewBridge(bridgeStore, logger, borConfig, heimdallClient, nil) p2pService := p2p.NewService(maxPeers, logger, sentry, statusDataProvider.GetStatusData) checkpointVerifier := polygonsync.VerifyCheckpointHeaders diff --git a/polygon/heimdall/reader.go b/polygon/heimdall/reader.go new file mode 100644 index 00000000000..7ac14bb9f64 --- /dev/null +++ b/polygon/heimdall/reader.go @@ -0,0 +1,57 @@ +package heimdall + +import ( + "context" + + libcommon "github.com/erigontech/erigon-lib/common" + "github.com/erigontech/erigon-lib/log/v3" + "github.com/erigontech/erigon/polygon/bor/valset" +) + +type Reader struct { + logger log.Logger + store ServiceStore + spanBlockProducersTracker *spanBlockProducersTracker +} + +// AssembleReader creates and opens the MDBX store. For use cases where the store is only being read from. Must call Close. +func AssembleReader(ctx context.Context, calculateSprintNumber CalculateSprintNumberFunc, dataDir string, tmpDir string, logger log.Logger) (*Reader, error) { + store := NewMdbxServiceStore(logger, dataDir, tmpDir) + + err := store.Prepare(ctx) + if err != nil { + return nil, err + } + + return NewReader(calculateSprintNumber, store, logger), nil +} + +func NewReader(calculateSprintNumber CalculateSprintNumberFunc, store ServiceStore, logger log.Logger) *Reader { + return &Reader{ + logger: logger, + store: store, + spanBlockProducersTracker: newSpanBlockProducersTracker(logger, calculateSprintNumber, store.SpanBlockProducerSelections()), + } +} + +func (r *Reader) Span(ctx context.Context, id uint64) (*Span, bool, error) { + return r.store.Spans().Entity(ctx, id) +} + +func (r *Reader) CheckpointsFromBlock(ctx context.Context, startBlock uint64) (Waypoints, error) { + entities, err := r.store.Checkpoints().RangeFromBlockNum(ctx, startBlock) + return libcommon.SliceMap(entities, castEntityToWaypoint[*Checkpoint]), err +} + +func (r *Reader) MilestonesFromBlock(ctx context.Context, startBlock uint64) (Waypoints, error) { + entities, err := r.store.Milestones().RangeFromBlockNum(ctx, startBlock) + return libcommon.SliceMap(entities, castEntityToWaypoint[*Milestone]), err +} + +func (r *Reader) Producers(ctx context.Context, blockNum uint64) (*valset.ValidatorSet, error) { + return r.spanBlockProducersTracker.Producers(ctx, blockNum) +} + +func (r *Reader) Close() { + r.store.Close() +} diff --git a/polygon/heimdall/service.go b/polygon/heimdall/service.go index 3f7715698ab..2786024601a 100644 --- a/polygon/heimdall/service.go +++ b/polygon/heimdall/service.go @@ -26,7 +26,6 @@ import ( libcommon "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/log/v3" - "github.com/erigontech/erigon/polygon/bor/borcfg" "github.com/erigontech/erigon/polygon/bor/valset" "github.com/erigontech/erigon/polygon/polygoncommon" ) @@ -46,23 +45,25 @@ type Service interface { type service struct { logger log.Logger store ServiceStore + reader *Reader checkpointScraper *scraper[*Checkpoint] milestoneScraper *scraper[*Milestone] spanScraper *scraper[*Span] spanBlockProducersTracker *spanBlockProducersTracker } -func AssembleService(borConfig *borcfg.BorConfig, heimdallUrl string, dataDir string, tmpDir string, logger log.Logger) Service { +func AssembleService(calculateSprintNumberFn CalculateSprintNumberFunc, heimdallUrl string, dataDir string, tmpDir string, logger log.Logger) Service { store := NewMdbxServiceStore(logger, dataDir, tmpDir) client := NewHeimdallClient(heimdallUrl, logger) - return NewService(borConfig, client, store, logger) + reader := NewReader(calculateSprintNumberFn, store, logger) + return NewService(calculateSprintNumberFn, client, store, logger, reader) } -func NewService(borConfig *borcfg.BorConfig, client HeimdallClient, store ServiceStore, logger log.Logger) Service { - return newService(borConfig, client, store, logger) +func NewService(calculateSprintNumberFn CalculateSprintNumberFunc, client HeimdallClient, store ServiceStore, logger log.Logger, reader *Reader) Service { + return newService(calculateSprintNumberFn, client, store, logger, reader) } -func newService(borConfig *borcfg.BorConfig, client HeimdallClient, store ServiceStore, logger log.Logger) *service { +func newService(calculateSprintNumberFn CalculateSprintNumberFunc, client HeimdallClient, store ServiceStore, logger log.Logger, reader *Reader) *service { checkpointFetcher := newCheckpointFetcher(client, logger) milestoneFetcher := newMilestoneFetcher(client, logger) spanFetcher := newSpanFetcher(client, logger) @@ -101,10 +102,11 @@ func newService(borConfig *borcfg.BorConfig, client HeimdallClient, store Servic return &service{ logger: logger, store: store, + reader: reader, checkpointScraper: checkpointScraper, milestoneScraper: milestoneScraper, spanScraper: spanScraper, - spanBlockProducersTracker: newSpanBlockProducersTracker(logger, borConfig, store.SpanBlockProducerSelections()), + spanBlockProducersTracker: newSpanBlockProducersTracker(logger, calculateSprintNumberFn, store.SpanBlockProducerSelections()), } } @@ -164,7 +166,7 @@ func newSpanFetcher(client HeimdallClient, logger log.Logger) entityFetcher[*Spa } func (s *service) Span(ctx context.Context, id uint64) (*Span, bool, error) { - return s.store.Spans().Entity(ctx, id) + return s.reader.Span(ctx, id) } func castEntityToWaypoint[TEntity Waypoint](entity TEntity) Waypoint { @@ -220,17 +222,15 @@ func (s *service) synchronizeSpans(ctx context.Context) error { } func (s *service) CheckpointsFromBlock(ctx context.Context, startBlock uint64) (Waypoints, error) { - entities, err := s.store.Checkpoints().RangeFromBlockNum(ctx, startBlock) - return libcommon.SliceMap(entities, castEntityToWaypoint[*Checkpoint]), err + return s.reader.CheckpointsFromBlock(ctx, startBlock) } func (s *service) MilestonesFromBlock(ctx context.Context, startBlock uint64) (Waypoints, error) { - entities, err := s.store.Milestones().RangeFromBlockNum(ctx, startBlock) - return libcommon.SliceMap(entities, castEntityToWaypoint[*Milestone]), err + return s.reader.MilestonesFromBlock(ctx, startBlock) } func (s *service) Producers(ctx context.Context, blockNum uint64) (*valset.ValidatorSet, error) { - return s.spanBlockProducersTracker.Producers(ctx, blockNum) + return s.reader.Producers(ctx, blockNum) } func (s *service) RegisterMilestoneObserver(callback func(*Milestone), opts ...ObserverOption) polygoncommon.UnregisterFunc { diff --git a/polygon/heimdall/service_test.go b/polygon/heimdall/service_test.go index 7768cfbae70..3e176af8eec 100644 --- a/polygon/heimdall/service_test.go +++ b/polygon/heimdall/service_test.go @@ -77,7 +77,8 @@ func (suite *ServiceTestSuite) SetupSuite() { suite.setupSpans() suite.setupCheckpoints() suite.setupMilestones() - suite.service = newService(borConfig, suite.client, store, logger) + reader := NewReader(borConfig.CalculateSprintNumber, store, logger) + suite.service = newService(borConfig.CalculateSprintNumber, suite.client, store, logger, reader) err := suite.service.store.Prepare(suite.ctx) require.NoError(suite.T(), err) diff --git a/polygon/heimdall/span_block_producers_tracker.go b/polygon/heimdall/span_block_producers_tracker.go index a90e106c841..b984ae3c31f 100644 --- a/polygon/heimdall/span_block_producers_tracker.go +++ b/polygon/heimdall/span_block_producers_tracker.go @@ -23,31 +23,32 @@ import ( "sync/atomic" "github.com/erigontech/erigon-lib/log/v3" - "github.com/erigontech/erigon/polygon/bor/borcfg" "github.com/erigontech/erigon/polygon/bor/valset" ) +type CalculateSprintNumberFunc func(uint64) uint64 + func newSpanBlockProducersTracker( logger log.Logger, - borConfig *borcfg.BorConfig, + calculateSprintNumber CalculateSprintNumberFunc, store EntityStore[*SpanBlockProducerSelection], ) *spanBlockProducersTracker { return &spanBlockProducersTracker{ - logger: logger, - borConfig: borConfig, - store: store, - newSpans: make(chan *Span), - idleSignal: make(chan struct{}), + logger: logger, + calculateSprintNumber: calculateSprintNumber, + store: store, + newSpans: make(chan *Span), + idleSignal: make(chan struct{}), } } type spanBlockProducersTracker struct { - logger log.Logger - borConfig *borcfg.BorConfig - store EntityStore[*SpanBlockProducerSelection] - newSpans chan *Span - queued atomic.Int32 - idleSignal chan struct{} + logger log.Logger + calculateSprintNumber CalculateSprintNumberFunc + store EntityStore[*SpanBlockProducerSelection] + newSpans chan *Span + queued atomic.Int32 + idleSignal chan struct{} } func (t *spanBlockProducersTracker) Run(ctx context.Context) error { @@ -141,8 +142,8 @@ func (t *spanBlockProducersTracker) ObserveSpan(ctx context.Context, newSpan *Sp return err } - spanStartSprintNum := t.borConfig.CalculateSprintNumber(lastProducerSelection.StartBlock) - spanEndSprintNum := t.borConfig.CalculateSprintNumber(lastProducerSelection.EndBlock) + spanStartSprintNum := t.calculateSprintNumber(lastProducerSelection.StartBlock) + spanEndSprintNum := t.calculateSprintNumber(lastProducerSelection.EndBlock) increments := int(spanEndSprintNum - spanStartSprintNum) if increments > 0 { producers.IncrementProposerPriority(increments) @@ -181,8 +182,8 @@ func (t *spanBlockProducersTracker) Producers(ctx context.Context, blockNum uint return nil, err } - spanStartSprintNum := t.borConfig.CalculateSprintNumber(producerSelection.StartBlock) - currentSprintNum := t.borConfig.CalculateSprintNumber(blockNum) + spanStartSprintNum := t.calculateSprintNumber(producerSelection.StartBlock) + currentSprintNum := t.calculateSprintNumber(blockNum) increments := int(currentSprintNum - spanStartSprintNum) if increments > 0 { producers.IncrementProposerPriority(increments)