polygon/bridge: fix snapshot stores, logging, replay blocks if needed on startup #12849

Merged
merged 17 commits into from Nov 23, 2024
14 changes: 7 additions & 7 deletions polygon/bridge/service.go
@@ -168,8 +168,8 @@ func (s *Service) Run(ctx context.Context) error {
}

// start syncing
-s.logger.Debug(
-bridgeLogPrefix("running bridge component"),
+s.logger.Info(
+bridgeLogPrefix("running bridge service component"),
"lastFetchedEventId", lastFetchedEventId,
"lastProcessedEventId", lastProcessedEventId,
"lastProcessedBlockNum", lastProcessedBlockInfo.BlockNum,
@@ -238,7 +238,7 @@ func (s *Service) Run(ctx context.Context) error {

select {
case <-logTicker.C:
-s.logger.Debug(
+s.logger.Info(
bridgeLogPrefix("fetched new events periodic progress"),
"count", len(events),
"lastFetchedEventId", lastFetchedEventId,
@@ -257,20 +257,20 @@ func (s *Service) InitialBlockReplayNeeded(ctx context.Context) (uint64, bool, e
lastFrozen := s.store.LastFrozenEventBlockNum()

if blockInfo := s.lastProcessedBlockInfo.Load(); blockInfo != nil && blockInfo.BlockNum > lastFrozen {
-return 0, false, nil
+return blockInfo.BlockNum, false, nil
}

blockInfo, ok, err := s.store.LastProcessedBlockInfo(ctx)
if err != nil {
return 0, false, err
}
if ok && blockInfo.BlockNum > lastFrozen {
-// we have all info, no need to replay
-return 0, false, nil
+// we have all info, no need to replay initial block
+return blockInfo.BlockNum, false, nil
}

// replay the last block we have in event snapshots
-return s.store.LastFrozenEventBlockNum(), true, nil
+return lastFrozen, true, nil
}

func (s *Service) ReplayInitialBlock(ctx context.Context, block *types.Block) error {
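
The replay decision above works in two steps: if either the in-memory or the persisted last-processed block is already past the last frozen event snapshot, the service returns that block number with no replay needed; otherwise it returns the last frozen snapshot block and asks the caller to replay it. A minimal sketch of how a caller might wire this on startup follows; the `startBridge` helper, the `blockReader` interface and the import paths are assumptions for illustration, not code from this PR.

```go
package example

import (
	"context"

	"github.com/erigontech/erigon/core/types"
	"github.com/erigontech/erigon/polygon/bridge"
)

// blockReader is a stand-in for whatever component can materialise a block by
// number; it is not part of this PR.
type blockReader interface {
	BlockByNumber(ctx context.Context, num uint64) (*types.Block, error)
}

// startBridge shows the intended call order: ask whether a replay is needed,
// replay the returned block if so, then start the normal sync loop.
func startBridge(ctx context.Context, b *bridge.Service, reader blockReader) error {
	blockNum, replayNeeded, err := b.InitialBlockReplayNeeded(ctx)
	if err != nil {
		return err
	}
	if replayNeeded {
		// the store is at or behind the frozen event snapshots, so replay the
		// last block covered by the snapshots before syncing forward
		block, err := reader.BlockByNumber(ctx, blockNum)
		if err != nil {
			return err
		}
		if err := b.ReplayInitialBlock(ctx, block); err != nil {
			return err
		}
	}
	return b.Run(ctx)
}
```
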
4 changes: 2 additions & 2 deletions polygon/bridge/snapshot_store.go
@@ -83,7 +83,7 @@ func (s *SnapshotStore) LastFrozenEventBlockNum() uint64 {
if len(segments) == 0 {
return 0
}
-// find the last segment which has a built index
+// find the last segment which has a built non-empty index
var lastSegment *snapshotsync.VisibleSegment
for i := len(segments) - 1; i >= 0; i-- {
if segments[i].Src().Index() != nil {
@@ -161,7 +161,7 @@ func (s *SnapshotStore) LastFrozenEventId() uint64 {
if len(segments) == 0 {
return 0
}
-// find the last segment which has a built index
+// find the last segment which has a built non-empty index
var lastSegment *snapshotsync.VisibleSegment
for i := len(segments) - 1; i >= 0; i-- {
if segments[i].Src().Index() != nil {
21 changes: 18 additions & 3 deletions polygon/heimdall/entity_fetcher.go
@@ -94,8 +94,23 @@ func (f *EntityFetcher[TEntity]) FetchEntitiesRange(ctx context.Context, idRange
}

func (f *EntityFetcher[TEntity]) FetchEntitiesRangeSequentially(ctx context.Context, idRange ClosedRange) ([]TEntity, error) {
+progressLogTicker := time.NewTicker(30 * time.Second)
+defer progressLogTicker.Stop()
+
entities := make([]TEntity, 0, idRange.Len())
for id := idRange.Start; id <= idRange.End; id++ {
+select {
+case <-progressLogTicker.C:
+f.logger.Info(
+heimdallLogPrefix(f.name+" fetch entities sequentially periodic progress"),
+"current", id,
+"start", idRange.Start,
+"end", idRange.End,
+)
+default:
+// carry-on
+}
+
entity, err := f.fetchEntity(ctx, int64(id))
if err != nil {
// return fetched entities up to this point in case of transient errors
@@ -134,8 +149,8 @@ func (f *EntityFetcher[TEntity]) FetchAllEntities(ctx context.Context) ([]TEntit

select {
case <-progressLogTicker.C:
-f.logger.Debug(
-heimdallLogPrefix(f.name+" progress"),
+f.logger.Info(
+heimdallLogPrefix(f.name+" fetch all entities periodic progress"),
"page", page,
"len", len(entities),
)
@@ -155,7 +170,7 @@ func (f *EntityFetcher[TEntity]) FetchAllEntities(ctx context.Context) ([]TEntit
}

f.logger.Debug(
heimdallLogPrefix(f.name+" done"),
heimdallLogPrefix(f.name+" fetch all entities done"),
"len", len(entities),
"duration", time.Since(fetchStartTime),
)
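
The sequential fetch loop now reports progress with a non-blocking pattern: a 30-second ticker polled through a select with a default case, so the log fires at most once per tick and never stalls the per-entity fetch. A standalone sketch of the pattern, not code from the PR:

```go
package main

import (
	"fmt"
	"time"
)

// processItems reports progress at most once per tick without blocking the
// loop: the default case turns the select into a cheap, non-blocking poll.
func processItems(items []int) {
	progressLogTicker := time.NewTicker(30 * time.Second)
	defer progressLogTicker.Stop()

	for i := range items {
		select {
		case <-progressLogTicker.C:
			fmt.Printf("progress: %d/%d items\n", i, len(items))
		default:
			// no tick pending, carry on without waiting
		}
		// ... per-item work (the fetchEntity call in the PR) goes here ...
	}
}

func main() {
	processItems(make([]int, 1_000_000))
}
```
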
25 changes: 25 additions & 0 deletions polygon/heimdall/scraper.go
@@ -31,6 +31,7 @@ import (
)

type Scraper[TEntity Entity] struct {
+name string
store EntityStore[TEntity]
fetcher entityFetcher[TEntity]
pollDelay time.Duration
@@ -41,13 +42,15 @@ type Scraper[TEntity Entity] struct {
}

func NewScraper[TEntity Entity](
+name string,
store EntityStore[TEntity],
fetcher entityFetcher[TEntity],
pollDelay time.Duration,
transientErrors []error,
logger log.Logger,
) *Scraper[TEntity] {
return &Scraper[TEntity]{
+name: name,
store: store,
fetcher: fetcher,
pollDelay: pollDelay,
@@ -59,11 +62,16 @@ func NewScraper[TEntity Entity](
}

func (s *Scraper[TEntity]) Run(ctx context.Context) error {
s.logger.Info(heimdallLogPrefix("running scraper component"), "name", s.name)

defer s.store.Close()
if err := s.store.Prepare(ctx); err != nil {
return err
}

+progressLogTicker := time.NewTicker(30 * time.Second)
+defer progressLogTicker.Stop()
+
for ctx.Err() == nil {
lastKnownId, hasLastKnownId, err := s.store.LastEntityId(ctx)
if err != nil {
@@ -116,8 +124,25 @@ func (s *Scraper[TEntity]) Run(ctx context.Context) error {
}

s.observers.NotifySync(entities) // NotifySync preserves order of events

+select {
+case <-progressLogTicker.C:
+if len(entities) > 0 {
+s.logger.Info(
+heimdallLogPrefix("scraper periodic progress"),
+"name", s.name,
+"rangeStart", idRange.Start,
+"rangeEnd", idRange.End,
+"priorLastKnownId", lastKnownId,
+"newLast", entities[len(entities)-1].RawId(),
+)
+}
+default:
+// carry on
+}
}
}

return ctx.Err()
}

2 changes: 1 addition & 1 deletion polygon/heimdall/scraper_test.go
@@ -72,7 +72,7 @@ func TestScrapper_Run_TransientErr(t *testing.T) {
)

transientErrs := []error{ErrNotInMilestoneList, ErrBadGateway}
-scrapper := NewScraper[*Milestone](store, fetcher, time.Millisecond, transientErrs, logger)
+scrapper := NewScraper[*Milestone]("test", store, fetcher, time.Millisecond, transientErrs, logger)

eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
6 changes: 5 additions & 1 deletion polygon/heimdall/service.go
@@ -60,6 +60,7 @@ func NewService(config ServiceConfig) *Service {
spanFetcher := NewSpanFetcher(client, logger)

checkpointScraper := NewScraper(
"checkpoints",
store.Checkpoints(),
checkpointFetcher,
1*time.Second,
@@ -74,6 +75,7 @@ func NewService(config ServiceConfig) *Service {
milestoneScraperTransientErrors := []error{ErrNotInMilestoneList}
milestoneScraperTransientErrors = append(milestoneScraperTransientErrors, TransientErrors...)
milestoneScraper := NewScraper(
"milestones",
store.Milestones(),
milestoneFetcher,
1*time.Second,
@@ -82,6 +84,7 @@
)

spanScraper := NewScraper(
"spans",
store.Spans(),
spanFetcher,
1*time.Second,
@@ -297,8 +300,9 @@ func (s *Service) Ready(ctx context.Context) <-chan error {
}

func (s *Service) Run(ctx context.Context) error {
-defer s.store.Close()
+s.logger.Info(heimdallLogPrefix("running heimdall service component"))

+defer s.store.Close()
if err := s.store.Prepare(ctx); err != nil {
return nil
}
27 changes: 18 additions & 9 deletions polygon/heimdall/snapshot_store.go
@@ -102,12 +102,15 @@ func (s *SpanSnapshotStore) LastFrozenEntityId() uint64 {
if len(segments) == 0 {
return 0
}
-// find the last segment which has a built index
+// find the last segment which has a built non-empty index
var lastSegment *snapshotsync.VisibleSegment
for i := len(segments) - 1; i >= 0; i-- {
if segments[i].Src().Index() != nil {
-lastSegment = segments[i]
-break
+gg := segments[i].Src().MakeGetter()
+if gg.HasNext() {
+lastSegment = segments[i]
+break
+}
}
}
if lastSegment == nil {
@@ -221,12 +224,15 @@ func (r *milestoneSnapshotStore) LastFrozenEntityId() uint64 {
if len(segments) == 0 {
return 0
}
-// find the last segment which has a built index
+// find the last segment which has a built non-empty index
var lastSegment *snapshotsync.VisibleSegment
for i := len(segments) - 1; i >= 0; i-- {
if segments[i].Src().Index() != nil {
-lastSegment = segments[i]
-break
+gg := segments[i].Src().MakeGetter()
+if gg.HasNext() {
+lastSegment = segments[i]
+break
+}
}
}
if lastSegment == nil {
@@ -380,12 +386,15 @@ func (r *checkpointSnapshotStore) LastFrozenEntityId() uint64 {
if len(segments) == 0 {
return 0
}
-// find the last segment which has a built index
+// find the last segment which has a built non-empty index
var lastSegment *snapshotsync.VisibleSegment
for i := len(segments) - 1; i >= 0; i-- {
if segments[i].Src().Index() != nil {
-lastSegment = segments[i]
-break
+gg := segments[i].Src().MakeGetter()
+if gg.HasNext() {
+lastSegment = segments[i]
+break
+}
}
}

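
All three stores now apply the same rule when resolving the last frozen entity: walk the visible segments backwards and take the first one that both has a built index and actually contains data (its getter has at least one item), rather than accepting an indexed but empty segment. A condensed sketch of that selection rule; the `segment` interface below is a stand-in for the relevant parts of `snapshotsync.VisibleSegment`, not a real type from the repo.

```go
package example

// segment abstracts the two properties the stores check on a visible segment.
type segment interface {
	HasIndex() bool // stands in for Src().Index() != nil
	HasData() bool  // stands in for Src().MakeGetter().HasNext()
}

// lastUsableSegment walks the segments backwards and returns the most recent
// one that has both a built index and at least one entry, or nil if none does.
func lastUsableSegment(segments []segment) segment {
	for i := len(segments) - 1; i >= 0; i-- {
		if segments[i].HasIndex() && segments[i].HasData() {
			return segments[i]
		}
	}
	return nil
}
```
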
3 changes: 2 additions & 1 deletion polygon/heimdall/span_block_producers_tracker.go
@@ -61,8 +61,9 @@ type spanBlockProducersTracker struct {
}

func (t *spanBlockProducersTracker) Run(ctx context.Context) error {
-defer close(t.idleSignal)
+t.logger.Info(heimdallLogPrefix("running span block producers tracker component"))

+defer close(t.idleSignal)
for {
select {
case <-ctx.Done():
2 changes: 1 addition & 1 deletion polygon/p2p/message_listener.go
@@ -73,7 +73,7 @@ type MessageListener struct {
}

func (ml *MessageListener) Run(ctx context.Context) error {
-ml.logger.Debug(messageListenerLogPrefix("running p2p message listener component"))
+ml.logger.Info(messageListenerLogPrefix("running p2p message listener component"))

backgroundLoops := []func(ctx context.Context){
ml.listenInboundMessages,
2 changes: 1 addition & 1 deletion polygon/p2p/peer_tracker.go
@@ -63,7 +63,7 @@ type PeerTracker struct {
}

func (pt *PeerTracker) Run(ctx context.Context) error {
-pt.logger.Debug(peerTrackerLogPrefix("running peer tracker component"))
+pt.logger.Info(peerTrackerLogPrefix("running peer tracker component"))

var peerEventUnreg polygoncommon.UnregisterFunc
defer func() { peerEventUnreg() }()
3 changes: 2 additions & 1 deletion polygon/p2p/publisher.go
@@ -73,7 +73,8 @@ func (p Publisher) PublishNewBlockHashes(block *types.Block) {
}

func (p Publisher) Run(ctx context.Context) error {
p.logger.Debug("[p2p-publisher] running publisher")
p.logger.Info("[p2p-publisher] running publisher component")

for {
select {
case <-ctx.Done():
4 changes: 4 additions & 0 deletions polygon/p2p/service.go
@@ -43,6 +43,7 @@ func NewService(logger log.Logger, maxPeers int, sc sentryproto.SentryClient, sd
fetcher = NewTrackingFetcher(fetcher, peerTracker)
publisher := NewPublisher(logger, messageSender, peerTracker)
return &Service{
+logger: logger,
fetcher: fetcher,
messageListener: messageListener,
peerPenalizer: peerPenalizer,
@@ -53,6 +54,7 @@
}

type Service struct {
+logger log.Logger
fetcher Fetcher
messageListener *MessageListener
peerPenalizer *PeerPenalizer
@@ -62,6 +64,8 @@ type Service struct {
}

func (s *Service) Run(ctx context.Context) error {
s.logger.Info("[p2p] running p2p service component")

eg, ctx := errgroup.WithContext(ctx)
eg.Go(func() error {
if err := s.messageListener.Run(ctx); err != nil {
13 changes: 9 additions & 4 deletions polygon/sync/block_downloader.go
@@ -138,7 +138,7 @@ func (d *BlockDownloader) downloadBlocksUsingWaypoints(

waypoints = d.limitWaypoints(waypoints)

-d.logger.Debug(
+d.logger.Info(
syncLogPrefix("downloading blocks using waypoints"),
"waypointsLen", len(waypoints),
"start", waypoints[0].StartBlock().Uint64(),
@@ -283,17 +283,22 @@ func (d *BlockDownloader) downloadBlocksUsingWaypoints(
continue
}

d.logger.Debug(syncLogPrefix("fetched blocks"), "start", blocks[0].NumberU64(), "end", blocks[len(blocks)-1].NumberU64(),
d.logger.Debug(
syncLogPrefix("fetched blocks"),
"start", blocks[0].NumberU64(),
"end", blocks[len(blocks)-1].NumberU64(),
"blocks", len(blocks),
"duration", time.Since(batchFetchStartTime),
"blks/sec", float64(len(blocks))/math.Max(time.Since(batchFetchStartTime).Seconds(), 0.0001))
"blks/sec", float64(len(blocks))/math.Max(time.Since(batchFetchStartTime).Seconds(), 0.0001),
)

batchFetchStartTime = time.Now() // reset for next time

d.logger.Info(
-syncLogPrefix(fmt.Sprintf("inserting %d fetched blocks", len(blocks))),
+syncLogPrefix("inserting fetched blocks"),
+"start", blocks[0].NumberU64(),
+"end", blocks[len(blocks)-1].NumberU64(),
+"blocks", len(blocks),
)
if err := d.store.InsertBlocks(ctx, blocks); err != nil {
return nil, err
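
The fetched-blocks log line derives a blocks-per-second figure; flooring the elapsed seconds with `math.Max` keeps the division well defined even when a batch completes almost instantly. A standalone illustration of that guard, with made-up values:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

func main() {
	batchFetchStartTime := time.Now()
	blocks := 512 // pretend we just fetched this many blocks

	// floor the elapsed seconds so a near-instant batch cannot divide by zero
	elapsed := math.Max(time.Since(batchFetchStartTime).Seconds(), 0.0001)
	fmt.Printf("blks/sec: %.2f\n", float64(blocks)/elapsed)
}
```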