From aba01b89b19d7d71c53217f03abf12b72ef916f7 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Wed, 20 Nov 2024 11:14:24 +0000 Subject: [PATCH 01/13] execution: remove td checks from execution module getters --- turbo/execution/eth1/ethereum_execution.go | 22 ---------------------- turbo/execution/eth1/getters.go | 7 ------- 2 files changed, 29 deletions(-) diff --git a/turbo/execution/eth1/ethereum_execution.go b/turbo/execution/eth1/ethereum_execution.go index e0a7cacdf5c..6e6c56745ca 100644 --- a/turbo/execution/eth1/ethereum_execution.go +++ b/turbo/execution/eth1/ethereum_execution.go @@ -121,14 +121,6 @@ func NewEthereumExecutionModule(blockReader services.FullBlockReader, db kv.RwDB } func (e *EthereumExecutionModule) getHeader(ctx context.Context, tx kv.Tx, blockHash libcommon.Hash, blockNumber uint64) (*types.Header, error) { - td, err := rawdb.ReadTd(tx, blockHash, blockNumber) - if err != nil { - return nil, err - } - if td == nil { - return nil, nil - } - if e.blockReader == nil { return rawdb.ReadHeader(tx, blockHash, blockNumber), nil } @@ -142,13 +134,6 @@ func (e *EthereumExecutionModule) getTD(_ context.Context, tx kv.Tx, blockHash l } func (e *EthereumExecutionModule) getBody(ctx context.Context, tx kv.Tx, blockHash libcommon.Hash, blockNumber uint64) (*types.Body, error) { - td, err := rawdb.ReadTd(tx, blockHash, blockNumber) - if err != nil { - return nil, err - } - if td == nil { - return nil, nil - } if e.blockReader == nil { body, _, _ := rawdb.ReadBody(tx, blockHash, blockNumber) return body, nil @@ -175,13 +160,6 @@ func (e *EthereumExecutionModule) canonicalHash(ctx context.Context, tx kv.Tx, b } } - td, err := rawdb.ReadTd(tx, canonical, blockNumber) - if err != nil { - return libcommon.Hash{}, err - } - if td == nil { - return libcommon.Hash{}, nil - } return canonical, nil } diff --git a/turbo/execution/eth1/getters.go b/turbo/execution/eth1/getters.go index 9f305daf5fd..ffd9b008a78 100644 --- a/turbo/execution/eth1/getters.go +++ b/turbo/execution/eth1/getters.go @@ -84,13 +84,6 @@ func (e *EthereumExecutionModule) GetBody(ctx context.Context, req *execution.Ge if err != nil { return nil, fmt.Errorf("ethereumExecutionModule.GetBody: parseSegmentRequest error %w", err) } - td, err := rawdb.ReadTd(tx, blockHash, blockNumber) - if err != nil { - return nil, fmt.Errorf("ethereumExecutionModule.GetBody: ReadTd error %w", err) - } - if td == nil { - return &execution.GetBodyResponse{Body: nil}, nil - } body, err := e.getBody(ctx, tx, blockHash, blockNumber) if err != nil { return nil, fmt.Errorf("ethereumExecutionModule.GetBody: getBody error %w", err) From e6d632422039465f12f0fd74c0e104ff0bdd2808 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Wed, 20 Nov 2024 16:26:16 +0000 Subject: [PATCH 02/13] wip --- polygon/bridge/service.go | 6 +-- polygon/heimdall/entity_fetcher.go | 21 +++++++-- polygon/p2p/message_listener.go | 2 +- polygon/p2p/peer_tracker.go | 2 +- polygon/sync/store.go | 76 ++++++++++++++++++++++-------- polygon/sync/tip_events.go | 2 +- 6 files changed, 80 insertions(+), 29 deletions(-) diff --git a/polygon/bridge/service.go b/polygon/bridge/service.go index 82fdb8b9a95..c0a41c20027 100644 --- a/polygon/bridge/service.go +++ b/polygon/bridge/service.go @@ -168,7 +168,7 @@ func (s *Service) Run(ctx context.Context) error { } // start syncing - s.logger.Debug( + s.logger.Info( bridgeLogPrefix("running bridge component"), "lastFetchedEventId", lastFetchedEventId, "lastProcessedEventId", lastProcessedEventId, @@ -238,7 +238,7 @@ func (s *Service) Run(ctx context.Context) error { select { case <-logTicker.C: - s.logger.Debug( + s.logger.Info( bridgeLogPrefix("fetched new events periodic progress"), "count", len(events), "lastFetchedEventId", lastFetchedEventId, @@ -257,7 +257,7 @@ func (s *Service) InitialBlockReplayNeeded(ctx context.Context) (uint64, bool, e lastFrozen := s.store.LastFrozenEventBlockNum() if blockInfo := s.lastProcessedBlockInfo.Load(); blockInfo != nil && blockInfo.BlockNum > lastFrozen { - return 0, false, nil + return blockInfo.BlockNum, false, nil } blockInfo, ok, err := s.store.LastProcessedBlockInfo(ctx) diff --git a/polygon/heimdall/entity_fetcher.go b/polygon/heimdall/entity_fetcher.go index 43a4c27b968..aa01392a6cc 100644 --- a/polygon/heimdall/entity_fetcher.go +++ b/polygon/heimdall/entity_fetcher.go @@ -94,8 +94,23 @@ func (f *EntityFetcher[TEntity]) FetchEntitiesRange(ctx context.Context, idRange } func (f *EntityFetcher[TEntity]) FetchEntitiesRangeSequentially(ctx context.Context, idRange ClosedRange) ([]TEntity, error) { + progressLogTicker := time.NewTicker(30 * time.Second) + defer progressLogTicker.Stop() + entities := make([]TEntity, 0, idRange.Len()) for id := idRange.Start; id <= idRange.End; id++ { + select { + case <-progressLogTicker.C: + f.logger.Info( + heimdallLogPrefix(f.name+" fetch entities sequentially periodic progress"), + "current", id, + "start", idRange.Start, + "end", idRange.End, + ) + default: + // carry-on + } + entity, err := f.fetchEntity(ctx, int64(id)) if err != nil { // return fetched entities up to this point in case of transient errors @@ -134,8 +149,8 @@ func (f *EntityFetcher[TEntity]) FetchAllEntities(ctx context.Context) ([]TEntit select { case <-progressLogTicker.C: - f.logger.Debug( - heimdallLogPrefix(f.name+" progress"), + f.logger.Info( + heimdallLogPrefix(f.name+" fetch all entities periodic progress"), "page", page, "len", len(entities), ) @@ -155,7 +170,7 @@ func (f *EntityFetcher[TEntity]) FetchAllEntities(ctx context.Context) ([]TEntit } f.logger.Debug( - heimdallLogPrefix(f.name+" done"), + heimdallLogPrefix(f.name+" fetch all entities done"), "len", len(entities), "duration", time.Since(fetchStartTime), ) diff --git a/polygon/p2p/message_listener.go b/polygon/p2p/message_listener.go index ceba8f1f2d6..046b3be607c 100644 --- a/polygon/p2p/message_listener.go +++ b/polygon/p2p/message_listener.go @@ -73,7 +73,7 @@ type MessageListener struct { } func (ml *MessageListener) Run(ctx context.Context) error { - ml.logger.Debug(messageListenerLogPrefix("running p2p message listener component")) + ml.logger.Info(messageListenerLogPrefix("running p2p message listener component")) backgroundLoops := []func(ctx context.Context){ ml.listenInboundMessages, diff --git a/polygon/p2p/peer_tracker.go b/polygon/p2p/peer_tracker.go index fbe43f631d4..c54f29b6204 100644 --- a/polygon/p2p/peer_tracker.go +++ b/polygon/p2p/peer_tracker.go @@ -63,7 +63,7 @@ type PeerTracker struct { } func (pt *PeerTracker) Run(ctx context.Context) error { - pt.logger.Debug(peerTrackerLogPrefix("running peer tracker component")) + pt.logger.Info(peerTrackerLogPrefix("running peer tracker component")) var peerEventUnreg polygoncommon.UnregisterFunc defer func() { peerEventUnreg() }() diff --git a/polygon/sync/store.go b/polygon/sync/store.go index 78d216f1a56..235c67eca6b 100644 --- a/polygon/sync/store.go +++ b/polygon/sync/store.go @@ -115,7 +115,7 @@ func (s *ExecutionClientStore) Flush(ctx context.Context) error { } func (s *ExecutionClientStore) Run(ctx context.Context) error { - s.logger.Debug(syncLogPrefix("running execution client store component")) + s.logger.Info(syncLogPrefix("running execution client store component")) for { select { @@ -190,48 +190,84 @@ func (s *ExecutionClientStore) insertBlocks(ctx context.Context, blocks []*types // // The bridge store is in control of determining whether and which block need replaying. func (s *ExecutionClientStore) bridgeReplayInitialBlockIfNeeded(ctx context.Context) error { - initialBlockNum, replayNeeded, err := s.bridgeStore.InitialBlockReplayNeeded(ctx) + initialBlockNum, initialReplayNeeded, err := s.bridgeStore.InitialBlockReplayNeeded(ctx) if err != nil { return err } - if !replayNeeded { - return nil - } - initialHeader, err := s.executionStore.GetHeader(ctx, initialBlockNum) - if err != nil { - return err - } + if initialReplayNeeded { + initialHeader, err := s.executionStore.GetHeader(ctx, initialBlockNum) + if err != nil { + return err + } - s.logger.Debug( - syncLogPrefix("replaying initial block for bridge store"), - "blockNum", initialHeader.Number.Uint64(), - ) + s.logger.Info( + syncLogPrefix("replaying initial block for bridge store"), + "blockNum", initialHeader.Number.Uint64(), + ) - if err = s.bridgeStore.ReplayInitialBlock(ctx, types.NewBlockWithHeader(initialHeader)); err != nil { - return err + if err = s.bridgeStore.ReplayInitialBlock(ctx, types.NewBlockWithHeader(initialHeader)); err != nil { + return err + } } if s.lastQueuedBlock <= initialBlockNum { return nil } + start, end := initialBlockNum+1, s.lastQueuedBlock blocksCount := s.lastQueuedBlock - initialBlockNum - s.logger.Debug( + s.logger.Info( syncLogPrefix("replaying post initial blocks for bridge store to fill gap with execution"), + "start", start, + "end", end, "blocks", blocksCount, - "executionTip", s.lastQueuedBlock, ) - blocks := make([]*types.Block, 0, blocksCount) - for blockNum := initialBlockNum + 1; blockNum <= s.lastQueuedBlock; blockNum++ { + progressLogTicker := time.NewTicker(30 * time.Second) + defer progressLogTicker.Stop() + + blocksBatchSize := 10_000 + blocks := make([]*types.Block, 0, blocksBatchSize) + for blockNum := start; blockNum <= end; blockNum++ { header, err := s.executionStore.GetHeader(ctx, blockNum) if err != nil { return err } + if header == nil { + // this should never happen, check snapshot files and/or db data integrity + //return fmt.Errorf("unexpected block header missing when replying blocks for bridge: %d", blockNum) + + // + // TODO switch back to error + // + s.logger.Error("unexpected block header missing when replying blocks for bridge", "blockNum", blockNum) + continue + } + + select { + case <-progressLogTicker.C: + s.logger.Info( + syncLogPrefix("replaying blocks for bridge periodic progress"), + "current", blockNum, + "start", start, + "end", end, + ) + default: + // carry-on + } blocks = append(blocks, types.NewBlockWithHeader(header)) + if len(blocks) < blocksBatchSize && blockNum != s.lastQueuedBlock { + continue + } + + if err := s.bridgeStore.ProcessNewBlocks(ctx, blocks); err != nil { + return err + } + + blocks = nil } - return s.bridgeStore.ProcessNewBlocks(ctx, blocks) + return nil } diff --git a/polygon/sync/tip_events.go b/polygon/sync/tip_events.go index 4cde4b6e4ac..b839266c97f 100644 --- a/polygon/sync/tip_events.go +++ b/polygon/sync/tip_events.go @@ -141,7 +141,7 @@ func (te *TipEvents) Events() <-chan Event { } func (te *TipEvents) Run(ctx context.Context) error { - te.logger.Debug(syncLogPrefix("running tip events component")) + te.logger.Info(syncLogPrefix("running tip events component")) newBlockObserverCancel := te.p2pObserverRegistrar.RegisterNewBlockObserver(func(message *p2p.DecodedInboundMessage[*eth.NewBlockPacket]) { block := message.Decoded.Block From 74ae79a50f52c0a22cb971c1f255665804d39438 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Fri, 22 Nov 2024 13:54:54 +0000 Subject: [PATCH 03/13] good err --- polygon/sync/store.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/polygon/sync/store.go b/polygon/sync/store.go index 235c67eca6b..06f8c876e55 100644 --- a/polygon/sync/store.go +++ b/polygon/sync/store.go @@ -236,13 +236,9 @@ func (s *ExecutionClientStore) bridgeReplayInitialBlockIfNeeded(ctx context.Cont } if header == nil { // this should never happen, check snapshot files and/or db data integrity - //return fmt.Errorf("unexpected block header missing when replying blocks for bridge: %d", blockNum) - - // - // TODO switch back to error - // - s.logger.Error("unexpected block header missing when replying blocks for bridge", "blockNum", blockNum) - continue + return fmt.Errorf("unexpected block header missing when replying blocks for bridge, "+ + "check snapshot files and/or db data integrity, "+ + "rm -rf datadir/chaindata can fix this: %d", blockNum) } select { From 67735e9780d5ed2840470bb2961b490e5aa320a8 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Fri, 22 Nov 2024 16:33:01 +0000 Subject: [PATCH 04/13] fix stuff, more logging --- polygon/bridge/service.go | 2 +- polygon/heimdall/scraper.go | 5 ++ polygon/heimdall/service.go | 6 +- .../heimdall/span_block_producers_tracker.go | 3 +- polygon/p2p/publisher.go | 3 +- polygon/p2p/service.go | 4 + polygon/sync/service.go | 5 +- polygon/sync/store.go | 73 ++++++++++--------- polygon/sync/store_mock.go | 38 ++++++++++ polygon/sync/sync.go | 4 + txnprovider/txpool/pool_mock.go | 69 +++++++++--------- 11 files changed, 137 insertions(+), 75 deletions(-) diff --git a/polygon/bridge/service.go b/polygon/bridge/service.go index c0a41c20027..6a7daec44cb 100644 --- a/polygon/bridge/service.go +++ b/polygon/bridge/service.go @@ -169,7 +169,7 @@ func (s *Service) Run(ctx context.Context) error { // start syncing s.logger.Info( - bridgeLogPrefix("running bridge component"), + bridgeLogPrefix("running bridge service component"), "lastFetchedEventId", lastFetchedEventId, "lastProcessedEventId", lastProcessedEventId, "lastProcessedBlockNum", lastProcessedBlockInfo.BlockNum, diff --git a/polygon/heimdall/scraper.go b/polygon/heimdall/scraper.go index 016dab34e84..45c4047ec98 100644 --- a/polygon/heimdall/scraper.go +++ b/polygon/heimdall/scraper.go @@ -31,6 +31,7 @@ import ( ) type Scraper[TEntity Entity] struct { + name string store EntityStore[TEntity] fetcher entityFetcher[TEntity] pollDelay time.Duration @@ -41,6 +42,7 @@ type Scraper[TEntity Entity] struct { } func NewScraper[TEntity Entity]( + name string, store EntityStore[TEntity], fetcher entityFetcher[TEntity], pollDelay time.Duration, @@ -48,6 +50,7 @@ func NewScraper[TEntity Entity]( logger log.Logger, ) *Scraper[TEntity] { return &Scraper[TEntity]{ + name: name, store: store, fetcher: fetcher, pollDelay: pollDelay, @@ -59,6 +62,8 @@ func NewScraper[TEntity Entity]( } func (s *Scraper[TEntity]) Run(ctx context.Context) error { + s.logger.Info(heimdallLogPrefix(fmt.Sprintf("running %s scrapper component", s.name))) + defer s.store.Close() if err := s.store.Prepare(ctx); err != nil { return err diff --git a/polygon/heimdall/service.go b/polygon/heimdall/service.go index baf892a46a2..e288cd38fb0 100644 --- a/polygon/heimdall/service.go +++ b/polygon/heimdall/service.go @@ -60,6 +60,7 @@ func NewService(config ServiceConfig) *Service { spanFetcher := NewSpanFetcher(client, logger) checkpointScraper := NewScraper( + "checkpoints", store.Checkpoints(), checkpointFetcher, 1*time.Second, @@ -74,6 +75,7 @@ func NewService(config ServiceConfig) *Service { milestoneScraperTransientErrors := []error{ErrNotInMilestoneList} milestoneScraperTransientErrors = append(milestoneScraperTransientErrors, TransientErrors...) milestoneScraper := NewScraper( + "milestones", store.Milestones(), milestoneFetcher, 1*time.Second, @@ -82,6 +84,7 @@ func NewService(config ServiceConfig) *Service { ) spanScraper := NewScraper( + "spans", store.Spans(), spanFetcher, 1*time.Second, @@ -297,8 +300,9 @@ func (s *Service) Ready(ctx context.Context) <-chan error { } func (s *Service) Run(ctx context.Context) error { - defer s.store.Close() + s.logger.Info(heimdallLogPrefix("running %s heimdall service component")) + defer s.store.Close() if err := s.store.Prepare(ctx); err != nil { return nil } diff --git a/polygon/heimdall/span_block_producers_tracker.go b/polygon/heimdall/span_block_producers_tracker.go index 89e6193935a..3775b52ddbb 100644 --- a/polygon/heimdall/span_block_producers_tracker.go +++ b/polygon/heimdall/span_block_producers_tracker.go @@ -61,8 +61,9 @@ type spanBlockProducersTracker struct { } func (t *spanBlockProducersTracker) Run(ctx context.Context) error { - defer close(t.idleSignal) + t.logger.Info(heimdallLogPrefix("running %s span block producers tracker component")) + defer close(t.idleSignal) for { select { case <-ctx.Done(): diff --git a/polygon/p2p/publisher.go b/polygon/p2p/publisher.go index 5a6c7d50ec3..fbce5d8b5df 100644 --- a/polygon/p2p/publisher.go +++ b/polygon/p2p/publisher.go @@ -73,7 +73,8 @@ func (p Publisher) PublishNewBlockHashes(block *types.Block) { } func (p Publisher) Run(ctx context.Context) error { - p.logger.Debug("[p2p-publisher] running publisher") + p.logger.Info("[p2p-publisher] running publisher component") + for { select { case <-ctx.Done(): diff --git a/polygon/p2p/service.go b/polygon/p2p/service.go index ee3ac5262dc..b21bf38ae99 100644 --- a/polygon/p2p/service.go +++ b/polygon/p2p/service.go @@ -43,6 +43,7 @@ func NewService(logger log.Logger, maxPeers int, sc sentryproto.SentryClient, sd fetcher = NewTrackingFetcher(fetcher, peerTracker) publisher := NewPublisher(logger, messageSender, peerTracker) return &Service{ + logger: logger, fetcher: fetcher, messageListener: messageListener, peerPenalizer: peerPenalizer, @@ -53,6 +54,7 @@ func NewService(logger log.Logger, maxPeers int, sc sentryproto.SentryClient, sd } type Service struct { + logger log.Logger fetcher Fetcher messageListener *MessageListener peerPenalizer *PeerPenalizer @@ -62,6 +64,8 @@ type Service struct { } func (s *Service) Run(ctx context.Context) error { + s.logger.Info("[p2p] running p2p service component") + eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { if err := s.messageListener.Run(ctx); err != nil { diff --git a/polygon/sync/service.go b/polygon/sync/service.go index 8ce5c77bb89..a1eef4132c2 100644 --- a/polygon/sync/service.go +++ b/polygon/sync/service.go @@ -80,6 +80,7 @@ func NewService( notifications, ) return &Service{ + logger: logger, sync: sync, p2pService: p2pService, store: store, @@ -90,6 +91,7 @@ func NewService( } type Service struct { + logger log.Logger sync *Sync p2pService *p2p.Service store Store @@ -99,8 +101,9 @@ type Service struct { } func (s *Service) Run(parentCtx context.Context) error { - group, ctx := errgroup.WithContext(parentCtx) + s.logger.Info(syncLogPrefix("running sync service component")) + group, ctx := errgroup.WithContext(parentCtx) group.Go(func() error { if err := s.p2pService.Run(ctx); err != nil { return fmt.Errorf("pos sync p2p failed: %w", err) diff --git a/polygon/sync/store.go b/polygon/sync/store.go index 06f8c876e55..5493ebd2a4e 100644 --- a/polygon/sync/store.go +++ b/polygon/sync/store.go @@ -30,6 +30,8 @@ import ( //go:generate mockgen -typed=true -destination=./store_mock.go -package=sync . Store type Store interface { + // Prepare runs initialisation of the store + Prepare(ctx context.Context) error // InsertBlocks queues blocks for writing into the local canonical chain. InsertBlocks(ctx context.Context, blocks []*types.Block) error // Flush makes sure that all queued blocks have been written. @@ -69,19 +71,37 @@ type ExecutionClientStore struct { tasksCount atomic.Int32 // tasksDoneSignal gets sent a value when tasksCount becomes 0 tasksDoneSignal chan bool - blockReplayDone bool + prepared bool lastQueuedBlock uint64 } +func (s *ExecutionClientStore) Prepare(ctx context.Context) error { + if s.prepared { + return nil + } + + executionTip, err := s.executionStore.CurrentHeader(ctx) + if err != nil { + return err + } + + executionTipBlockNum := executionTip.Number.Uint64() + if err := s.bridgeReplayInitialBlockIfNeeded(ctx, executionTipBlockNum); err != nil { + return err + } + + s.lastQueuedBlock = executionTipBlockNum + s.prepared = true + return nil +} + func (s *ExecutionClientStore) InsertBlocks(ctx context.Context, blocks []*types.Block) error { if len(blocks) == 0 { return nil } - if s.lastQueuedBlock != 0 { - if blocks[0].NumberU64() > s.lastQueuedBlock+1 { - return fmt.Errorf("block gap inserted: expected: %d, have: %d", s.lastQueuedBlock+1, blocks[0].NumberU64()) - } + if blocks[0].NumberU64() > s.lastQueuedBlock+1 { + return fmt.Errorf("block gap inserted: expected: %d, have: %d", s.lastQueuedBlock+1, blocks[0].NumberU64()) } if lastInserted := blocks[len(blocks)-1].NumberU64(); lastInserted > s.lastQueuedBlock { @@ -139,28 +159,6 @@ func (s *ExecutionClientStore) Run(ctx context.Context) error { func (s *ExecutionClientStore) insertBlocks(ctx context.Context, blocks []*types.Block) error { defer s.tasksCount.Add(-1) insertStartTime := time.Now() - - if !s.blockReplayDone { - if s.lastQueuedBlock == 0 { - executionTip, err := s.executionStore.CurrentHeader(ctx) - if err != nil { - return err - } - - if executionTip == nil { - return nil - } - - s.lastQueuedBlock = executionTip.Number.Uint64() - } - - if err := s.bridgeReplayInitialBlockIfNeeded(ctx); err != nil { - return err - } - - s.blockReplayDone = true - } - err := s.executionStore.InsertBlocks(ctx, blocks) if err != nil { return err @@ -172,7 +170,10 @@ func (s *ExecutionClientStore) insertBlocks(ctx context.Context, blocks []*types } if len(blocks) > 0 { - s.logger.Debug(syncLogPrefix("inserted blocks"), "from", blocks[0].NumberU64(), "to", blocks[len(blocks)-1].NumberU64(), + s.logger.Debug( + syncLogPrefix("inserted blocks"), + "from", blocks[0].NumberU64(), + "to", blocks[len(blocks)-1].NumberU64(), "blocks", len(blocks), "duration", time.Since(insertStartTime), "blks/sec", float64(len(blocks))/math.Max(time.Since(insertStartTime).Seconds(), 0.0001)) @@ -189,7 +190,7 @@ func (s *ExecutionClientStore) insertBlocks(ctx context.Context, blocks []*types // a conscious design decision. // // The bridge store is in control of determining whether and which block need replaying. -func (s *ExecutionClientStore) bridgeReplayInitialBlockIfNeeded(ctx context.Context) error { +func (s *ExecutionClientStore) bridgeReplayInitialBlockIfNeeded(ctx context.Context, executionTipBlockNum uint64) error { initialBlockNum, initialReplayNeeded, err := s.bridgeStore.InitialBlockReplayNeeded(ctx) if err != nil { return err @@ -211,12 +212,12 @@ func (s *ExecutionClientStore) bridgeReplayInitialBlockIfNeeded(ctx context.Cont } } - if s.lastQueuedBlock <= initialBlockNum { + if executionTipBlockNum <= initialBlockNum { return nil } - start, end := initialBlockNum+1, s.lastQueuedBlock - blocksCount := s.lastQueuedBlock - initialBlockNum + start, end := initialBlockNum+1, executionTipBlockNum + blocksCount := executionTipBlockNum - initialBlockNum s.logger.Info( syncLogPrefix("replaying post initial blocks for bridge store to fill gap with execution"), "start", start, @@ -235,10 +236,10 @@ func (s *ExecutionClientStore) bridgeReplayInitialBlockIfNeeded(ctx context.Cont return err } if header == nil { - // this should never happen, check snapshot files and/or db data integrity - return fmt.Errorf("unexpected block header missing when replying blocks for bridge, "+ - "check snapshot files and/or db data integrity, "+ - "rm -rf datadir/chaindata can fix this: %d", blockNum) + return fmt.Errorf("unexpected block header missing when replaying blocks for bridge, "+ + "likely due to a gap in snapshot files and db data after restart, "+ + "rm -rf datadir/chaindata can fix this "+ + "(note this is quick to recover from and not equivalent to a full re-sync): %d", blockNum) } select { diff --git a/polygon/sync/store_mock.go b/polygon/sync/store_mock.go index 5d5e7e4c70a..c8f941f9f56 100644 --- a/polygon/sync/store_mock.go +++ b/polygon/sync/store_mock.go @@ -117,6 +117,44 @@ func (c *MockStoreInsertBlocksCall) DoAndReturn(f func(context.Context, []*types return c } +// Prepare mocks base method. +func (m *MockStore) Prepare(ctx context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Prepare", ctx) + ret0, _ := ret[0].(error) + return ret0 +} + +// Prepare indicates an expected call of Prepare. +func (mr *MockStoreMockRecorder) Prepare(ctx any) *MockStorePrepareCall { + mr.mock.ctrl.T.Helper() + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Prepare", reflect.TypeOf((*MockStore)(nil).Prepare), ctx) + return &MockStorePrepareCall{Call: call} +} + +// MockStorePrepareCall wrap *gomock.Call +type MockStorePrepareCall struct { + *gomock.Call +} + +// Return rewrite *gomock.Call.Return +func (c *MockStorePrepareCall) Return(arg0 error) *MockStorePrepareCall { + c.Call = c.Call.Return(arg0) + return c +} + +// Do rewrite *gomock.Call.Do +func (c *MockStorePrepareCall) Do(f func(context.Context) error) *MockStorePrepareCall { + c.Call = c.Call.Do(f) + return c +} + +// DoAndReturn rewrite *gomock.Call.DoAndReturn +func (c *MockStorePrepareCall) DoAndReturn(f func(context.Context) error) *MockStorePrepareCall { + c.Call = c.Call.DoAndReturn(f) + return c +} + // Run mocks base method. func (m *MockStore) Run(ctx context.Context) error { m.ctrl.T.Helper() diff --git a/polygon/sync/sync.go b/polygon/sync/sync.go index c2e5788a595..10ad46d2798 100644 --- a/polygon/sync/sync.go +++ b/polygon/sync/sync.go @@ -610,6 +610,10 @@ func (s *Sync) Run(ctx context.Context) error { return err } + if err := s.store.Prepare(ctx); err != nil { + return err + } + s.logger.Info(syncLogPrefix("running sync component")) result, err := s.syncToTip(ctx) diff --git a/txnprovider/txpool/pool_mock.go b/txnprovider/txpool/pool_mock.go index 7c3baf01d83..35e3136df48 100644 --- a/txnprovider/txpool/pool_mock.go +++ b/txnprovider/txpool/pool_mock.go @@ -23,6 +23,7 @@ import ( type MockPool struct { ctrl *gomock.Controller recorder *MockPoolMockRecorder + isgomock struct{} } // MockPoolMockRecorder is the mock recorder for MockPool. @@ -43,18 +44,18 @@ func (m *MockPool) EXPECT() *MockPoolMockRecorder { } // AddLocalTxns mocks base method. -func (m *MockPool) AddLocalTxns(arg0 context.Context, arg1 TxnSlots) ([]txpoolcfg.DiscardReason, error) { +func (m *MockPool) AddLocalTxns(ctx context.Context, newTxns TxnSlots) ([]txpoolcfg.DiscardReason, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "AddLocalTxns", arg0, arg1) + ret := m.ctrl.Call(m, "AddLocalTxns", ctx, newTxns) ret0, _ := ret[0].([]txpoolcfg.DiscardReason) ret1, _ := ret[1].(error) return ret0, ret1 } // AddLocalTxns indicates an expected call of AddLocalTxns. -func (mr *MockPoolMockRecorder) AddLocalTxns(arg0, arg1 any) *MockPoolAddLocalTxnsCall { +func (mr *MockPoolMockRecorder) AddLocalTxns(ctx, newTxns any) *MockPoolAddLocalTxnsCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddLocalTxns", reflect.TypeOf((*MockPool)(nil).AddLocalTxns), arg0, arg1) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddLocalTxns", reflect.TypeOf((*MockPool)(nil).AddLocalTxns), ctx, newTxns) return &MockPoolAddLocalTxnsCall{Call: call} } @@ -82,15 +83,15 @@ func (c *MockPoolAddLocalTxnsCall) DoAndReturn(f func(context.Context, TxnSlots) } // AddNewGoodPeer mocks base method. -func (m *MockPool) AddNewGoodPeer(arg0 PeerID) { +func (m *MockPool) AddNewGoodPeer(peerID PeerID) { m.ctrl.T.Helper() - m.ctrl.Call(m, "AddNewGoodPeer", arg0) + m.ctrl.Call(m, "AddNewGoodPeer", peerID) } // AddNewGoodPeer indicates an expected call of AddNewGoodPeer. -func (mr *MockPoolMockRecorder) AddNewGoodPeer(arg0 any) *MockPoolAddNewGoodPeerCall { +func (mr *MockPoolMockRecorder) AddNewGoodPeer(peerID any) *MockPoolAddNewGoodPeerCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddNewGoodPeer", reflect.TypeOf((*MockPool)(nil).AddNewGoodPeer), arg0) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddNewGoodPeer", reflect.TypeOf((*MockPool)(nil).AddNewGoodPeer), peerID) return &MockPoolAddNewGoodPeerCall{Call: call} } @@ -118,15 +119,15 @@ func (c *MockPoolAddNewGoodPeerCall) DoAndReturn(f func(PeerID)) *MockPoolAddNew } // AddRemoteTxns mocks base method. -func (m *MockPool) AddRemoteTxns(arg0 context.Context, arg1 TxnSlots) { +func (m *MockPool) AddRemoteTxns(ctx context.Context, newTxns TxnSlots) { m.ctrl.T.Helper() - m.ctrl.Call(m, "AddRemoteTxns", arg0, arg1) + m.ctrl.Call(m, "AddRemoteTxns", ctx, newTxns) } // AddRemoteTxns indicates an expected call of AddRemoteTxns. -func (mr *MockPoolMockRecorder) AddRemoteTxns(arg0, arg1 any) *MockPoolAddRemoteTxnsCall { +func (mr *MockPoolMockRecorder) AddRemoteTxns(ctx, newTxns any) *MockPoolAddRemoteTxnsCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRemoteTxns", reflect.TypeOf((*MockPool)(nil).AddRemoteTxns), arg0, arg1) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRemoteTxns", reflect.TypeOf((*MockPool)(nil).AddRemoteTxns), ctx, newTxns) return &MockPoolAddRemoteTxnsCall{Call: call} } @@ -154,18 +155,18 @@ func (c *MockPoolAddRemoteTxnsCall) DoAndReturn(f func(context.Context, TxnSlots } // FilterKnownIdHashes mocks base method. -func (m *MockPool) FilterKnownIdHashes(arg0 kv.Tx, arg1 Hashes) (Hashes, error) { +func (m *MockPool) FilterKnownIdHashes(tx kv.Tx, hashes Hashes) (Hashes, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FilterKnownIdHashes", arg0, arg1) + ret := m.ctrl.Call(m, "FilterKnownIdHashes", tx, hashes) ret0, _ := ret[0].(Hashes) ret1, _ := ret[1].(error) return ret0, ret1 } // FilterKnownIdHashes indicates an expected call of FilterKnownIdHashes. -func (mr *MockPoolMockRecorder) FilterKnownIdHashes(arg0, arg1 any) *MockPoolFilterKnownIdHashesCall { +func (mr *MockPoolMockRecorder) FilterKnownIdHashes(tx, hashes any) *MockPoolFilterKnownIdHashesCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FilterKnownIdHashes", reflect.TypeOf((*MockPool)(nil).FilterKnownIdHashes), arg0, arg1) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FilterKnownIdHashes", reflect.TypeOf((*MockPool)(nil).FilterKnownIdHashes), tx, hashes) return &MockPoolFilterKnownIdHashesCall{Call: call} } @@ -175,8 +176,8 @@ type MockPoolFilterKnownIdHashesCall struct { } // Return rewrite *gomock.Call.Return -func (c *MockPoolFilterKnownIdHashesCall) Return(arg0 Hashes, arg1 error) *MockPoolFilterKnownIdHashesCall { - c.Call = c.Call.Return(arg0, arg1) +func (c *MockPoolFilterKnownIdHashesCall) Return(unknownHashes Hashes, err error) *MockPoolFilterKnownIdHashesCall { + c.Call = c.Call.Return(unknownHashes, err) return c } @@ -193,18 +194,18 @@ func (c *MockPoolFilterKnownIdHashesCall) DoAndReturn(f func(kv.Tx, Hashes) (Has } // GetRlp mocks base method. -func (m *MockPool) GetRlp(arg0 kv.Tx, arg1 []byte) ([]byte, error) { +func (m *MockPool) GetRlp(tx kv.Tx, hash []byte) ([]byte, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetRlp", arg0, arg1) + ret := m.ctrl.Call(m, "GetRlp", tx, hash) ret0, _ := ret[0].([]byte) ret1, _ := ret[1].(error) return ret0, ret1 } // GetRlp indicates an expected call of GetRlp. -func (mr *MockPoolMockRecorder) GetRlp(arg0, arg1 any) *MockPoolGetRlpCall { +func (mr *MockPoolMockRecorder) GetRlp(tx, hash any) *MockPoolGetRlpCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRlp", reflect.TypeOf((*MockPool)(nil).GetRlp), arg0, arg1) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRlp", reflect.TypeOf((*MockPool)(nil).GetRlp), tx, hash) return &MockPoolGetRlpCall{Call: call} } @@ -232,18 +233,18 @@ func (c *MockPoolGetRlpCall) DoAndReturn(f func(kv.Tx, []byte) ([]byte, error)) } // IdHashKnown mocks base method. -func (m *MockPool) IdHashKnown(arg0 kv.Tx, arg1 []byte) (bool, error) { +func (m *MockPool) IdHashKnown(tx kv.Tx, hash []byte) (bool, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "IdHashKnown", arg0, arg1) + ret := m.ctrl.Call(m, "IdHashKnown", tx, hash) ret0, _ := ret[0].(bool) ret1, _ := ret[1].(error) return ret0, ret1 } // IdHashKnown indicates an expected call of IdHashKnown. -func (mr *MockPoolMockRecorder) IdHashKnown(arg0, arg1 any) *MockPoolIdHashKnownCall { +func (mr *MockPoolMockRecorder) IdHashKnown(tx, hash any) *MockPoolIdHashKnownCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IdHashKnown", reflect.TypeOf((*MockPool)(nil).IdHashKnown), arg0, arg1) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IdHashKnown", reflect.TypeOf((*MockPool)(nil).IdHashKnown), tx, hash) return &MockPoolIdHashKnownCall{Call: call} } @@ -271,17 +272,17 @@ func (c *MockPoolIdHashKnownCall) DoAndReturn(f func(kv.Tx, []byte) (bool, error } // OnNewBlock mocks base method. -func (m *MockPool) OnNewBlock(arg0 context.Context, arg1 *remoteproto.StateChangeBatch, arg2, arg3, arg4 TxnSlots) error { +func (m *MockPool) OnNewBlock(ctx context.Context, stateChanges *remoteproto.StateChangeBatch, unwindTxns, unwindBlobTxns, minedTxns TxnSlots) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "OnNewBlock", arg0, arg1, arg2, arg3, arg4) + ret := m.ctrl.Call(m, "OnNewBlock", ctx, stateChanges, unwindTxns, unwindBlobTxns, minedTxns) ret0, _ := ret[0].(error) return ret0 } // OnNewBlock indicates an expected call of OnNewBlock. -func (mr *MockPoolMockRecorder) OnNewBlock(arg0, arg1, arg2, arg3, arg4 any) *MockPoolOnNewBlockCall { +func (mr *MockPoolMockRecorder) OnNewBlock(ctx, stateChanges, unwindTxns, unwindBlobTxns, minedTxns any) *MockPoolOnNewBlockCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnNewBlock", reflect.TypeOf((*MockPool)(nil).OnNewBlock), arg0, arg1, arg2, arg3, arg4) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnNewBlock", reflect.TypeOf((*MockPool)(nil).OnNewBlock), ctx, stateChanges, unwindTxns, unwindBlobTxns, minedTxns) return &MockPoolOnNewBlockCall{Call: call} } @@ -347,17 +348,17 @@ func (c *MockPoolStartedCall) DoAndReturn(f func() bool) *MockPoolStartedCall { } // ValidateSerializedTxn mocks base method. -func (m *MockPool) ValidateSerializedTxn(arg0 []byte) error { +func (m *MockPool) ValidateSerializedTxn(serializedTxn []byte) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ValidateSerializedTxn", arg0) + ret := m.ctrl.Call(m, "ValidateSerializedTxn", serializedTxn) ret0, _ := ret[0].(error) return ret0 } // ValidateSerializedTxn indicates an expected call of ValidateSerializedTxn. -func (mr *MockPoolMockRecorder) ValidateSerializedTxn(arg0 any) *MockPoolValidateSerializedTxnCall { +func (mr *MockPoolMockRecorder) ValidateSerializedTxn(serializedTxn any) *MockPoolValidateSerializedTxnCall { mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ValidateSerializedTxn", reflect.TypeOf((*MockPool)(nil).ValidateSerializedTxn), arg0) + call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ValidateSerializedTxn", reflect.TypeOf((*MockPool)(nil).ValidateSerializedTxn), serializedTxn) return &MockPoolValidateSerializedTxnCall{Call: call} } From 069f050ac757de2477216312cb386e39ba04d86a Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Fri, 22 Nov 2024 17:07:21 +0000 Subject: [PATCH 05/13] logging fix --- polygon/heimdall/span_block_producers_tracker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polygon/heimdall/span_block_producers_tracker.go b/polygon/heimdall/span_block_producers_tracker.go index 3775b52ddbb..de4df15ad5d 100644 --- a/polygon/heimdall/span_block_producers_tracker.go +++ b/polygon/heimdall/span_block_producers_tracker.go @@ -61,7 +61,7 @@ type spanBlockProducersTracker struct { } func (t *spanBlockProducersTracker) Run(ctx context.Context) error { - t.logger.Info(heimdallLogPrefix("running %s span block producers tracker component")) + t.logger.Info(heimdallLogPrefix("running span block producers tracker component")) defer close(t.idleSignal) for { From a7b348af481f91656a7dcf9ecc9d564eb8584062 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Fri, 22 Nov 2024 17:41:04 +0000 Subject: [PATCH 06/13] logging fix --- polygon/heimdall/scraper.go | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/polygon/heimdall/scraper.go b/polygon/heimdall/scraper.go index 45c4047ec98..70be2e83470 100644 --- a/polygon/heimdall/scraper.go +++ b/polygon/heimdall/scraper.go @@ -62,13 +62,16 @@ func NewScraper[TEntity Entity]( } func (s *Scraper[TEntity]) Run(ctx context.Context) error { - s.logger.Info(heimdallLogPrefix(fmt.Sprintf("running %s scrapper component", s.name))) + s.logger.Info(heimdallLogPrefix("running scraper component"), "name", s.name) defer s.store.Close() if err := s.store.Prepare(ctx); err != nil { return err } + progressLogTicker := time.NewTicker(30 * time.Second) + defer progressLogTicker.Stop() + for ctx.Err() == nil { lastKnownId, hasLastKnownId, err := s.store.LastEntityId(ctx) if err != nil { @@ -121,8 +124,24 @@ func (s *Scraper[TEntity]) Run(ctx context.Context) error { } s.observers.NotifySync(entities) // NotifySync preserves order of events + + select { + case <-progressLogTicker.C: + if len(entities) > 0 { + s.logger.Info( + heimdallLogPrefix("scraper periodic progress"), + "name", s.name, + "last", entities[len(entities)-1].RawId(), + "rangeStart", idRange.Start, + "rangeEnd", idRange.End, + ) + } + default: + // carry on + } } } + return ctx.Err() } From ba6742746228f513b7ecca2614aa99150b391f4d Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Fri, 22 Nov 2024 18:23:39 +0000 Subject: [PATCH 07/13] logging fix --- polygon/heimdall/scraper.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/polygon/heimdall/scraper.go b/polygon/heimdall/scraper.go index 70be2e83470..060b42e9cdf 100644 --- a/polygon/heimdall/scraper.go +++ b/polygon/heimdall/scraper.go @@ -131,9 +131,10 @@ func (s *Scraper[TEntity]) Run(ctx context.Context) error { s.logger.Info( heimdallLogPrefix("scraper periodic progress"), "name", s.name, - "last", entities[len(entities)-1].RawId(), "rangeStart", idRange.Start, "rangeEnd", idRange.End, + "priorLastKnownId", lastKnownId, + "newLast", entities[len(entities)-1].RawId(), ) } default: From da53f226132369688acaa41b7e5fdf82f0bca0a0 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Sat, 23 Nov 2024 13:21:47 +0000 Subject: [PATCH 08/13] fix bridgeReplayInitialBlockIfNeeded and logging --- polygon/bridge/service.go | 6 +++--- turbo/snapshotsync/freezeblocks/bor_snapshots.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/polygon/bridge/service.go b/polygon/bridge/service.go index 6a7daec44cb..a42623e1f6a 100644 --- a/polygon/bridge/service.go +++ b/polygon/bridge/service.go @@ -265,12 +265,12 @@ func (s *Service) InitialBlockReplayNeeded(ctx context.Context) (uint64, bool, e return 0, false, err } if ok && blockInfo.BlockNum > lastFrozen { - // we have all info, no need to replay - return 0, false, nil + // we have all info, no need to replay initial block + return blockInfo.BlockNum, false, nil } // replay the last block we have in event snapshots - return s.store.LastFrozenEventBlockNum(), true, nil + return lastFrozen, true, nil } func (s *Service) ReplayInitialBlock(ctx context.Context, block *types.Block) error { diff --git a/turbo/snapshotsync/freezeblocks/bor_snapshots.go b/turbo/snapshotsync/freezeblocks/bor_snapshots.go index 5b681954f2c..35b2f4e73ed 100644 --- a/turbo/snapshotsync/freezeblocks/bor_snapshots.go +++ b/turbo/snapshotsync/freezeblocks/bor_snapshots.go @@ -67,7 +67,7 @@ func (br *BlockRetire) retireBorBlocks(ctx context.Context, minBlockNum uint64, return false, nil } - logger.Log(log.LvlInfo /*lvl*/, "[bor snapshots] Retire Bor Blocks", "type", snap, + logger.Log(lvl, "[bor snapshots] Retire Bor Blocks", "type", snap, "range", fmt.Sprintf("%s-%s", common.PrettyCounter(blockFrom), common.PrettyCounter(blockTo))) var firstKeyGetter snaptype.FirstKeyGetter From 4769d11190dcbb999f6669d550d59cb21a5649fc Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Sat, 23 Nov 2024 14:08:05 +0000 Subject: [PATCH 09/13] fix empty index underflow --- polygon/bridge/snapshot_store.go | 4 ++-- polygon/heimdall/snapshot_store.go | 27 ++++++++++++++++++--------- polygon/sync/block_downloader.go | 8 ++++++-- 3 files changed, 26 insertions(+), 13 deletions(-) diff --git a/polygon/bridge/snapshot_store.go b/polygon/bridge/snapshot_store.go index dd4f2318712..f25ccd93a3c 100644 --- a/polygon/bridge/snapshot_store.go +++ b/polygon/bridge/snapshot_store.go @@ -83,7 +83,7 @@ func (s *SnapshotStore) LastFrozenEventBlockNum() uint64 { if len(segments) == 0 { return 0 } - // find the last segment which has a built index + // find the last segment which has a built non-empty index var lastSegment *snapshotsync.VisibleSegment for i := len(segments) - 1; i >= 0; i-- { if segments[i].Src().Index() != nil { @@ -161,7 +161,7 @@ func (s *SnapshotStore) LastFrozenEventId() uint64 { if len(segments) == 0 { return 0 } - // find the last segment which has a built index + // find the last segment which has a built non-empty index var lastSegment *snapshotsync.VisibleSegment for i := len(segments) - 1; i >= 0; i-- { if segments[i].Src().Index() != nil { diff --git a/polygon/heimdall/snapshot_store.go b/polygon/heimdall/snapshot_store.go index 7d382ca6b81..2048b83a734 100644 --- a/polygon/heimdall/snapshot_store.go +++ b/polygon/heimdall/snapshot_store.go @@ -102,12 +102,15 @@ func (s *SpanSnapshotStore) LastFrozenEntityId() uint64 { if len(segments) == 0 { return 0 } - // find the last segment which has a built index + // find the last segment which has a built non-empty index var lastSegment *snapshotsync.VisibleSegment for i := len(segments) - 1; i >= 0; i-- { if segments[i].Src().Index() != nil { - lastSegment = segments[i] - break + gg := segments[i].Src().MakeGetter() + if gg.HasNext() { + lastSegment = segments[i] + break + } } } if lastSegment == nil { @@ -221,12 +224,15 @@ func (r *milestoneSnapshotStore) LastFrozenEntityId() uint64 { if len(segments) == 0 { return 0 } - // find the last segment which has a built index + // find the last segment which has a built non-empty index var lastSegment *snapshotsync.VisibleSegment for i := len(segments) - 1; i >= 0; i-- { if segments[i].Src().Index() != nil { - lastSegment = segments[i] - break + gg := segments[i].Src().MakeGetter() + if gg.HasNext() { + lastSegment = segments[i] + break + } } } if lastSegment == nil { @@ -380,12 +386,15 @@ func (r *checkpointSnapshotStore) LastFrozenEntityId() uint64 { if len(segments) == 0 { return 0 } - // find the last segment which has a built index + // find the last segment which has a built non-empty index var lastSegment *snapshotsync.VisibleSegment for i := len(segments) - 1; i >= 0; i-- { if segments[i].Src().Index() != nil { - lastSegment = segments[i] - break + gg := segments[i].Src().MakeGetter() + if gg.HasNext() { + lastSegment = segments[i] + break + } } } diff --git a/polygon/sync/block_downloader.go b/polygon/sync/block_downloader.go index 424f93ae120..798cda29128 100644 --- a/polygon/sync/block_downloader.go +++ b/polygon/sync/block_downloader.go @@ -283,10 +283,14 @@ func (d *BlockDownloader) downloadBlocksUsingWaypoints( continue } - d.logger.Debug(syncLogPrefix("fetched blocks"), "start", blocks[0].NumberU64(), "end", blocks[len(blocks)-1].NumberU64(), + d.logger.Debug( + syncLogPrefix("fetched blocks"), + "start", blocks[0].NumberU64(), + "end", blocks[len(blocks)-1].NumberU64(), "blocks", len(blocks), "duration", time.Since(batchFetchStartTime), - "blks/sec", float64(len(blocks))/math.Max(time.Since(batchFetchStartTime).Seconds(), 0.0001)) + "blks/sec", float64(len(blocks))/math.Max(time.Since(batchFetchStartTime).Seconds(), 0.0001), + ) batchFetchStartTime = time.Now() // reset for next time From 16d23de4cd2916eae4c64f0541576306c9f416f5 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Sat, 23 Nov 2024 14:13:28 +0000 Subject: [PATCH 10/13] fix bridgeReplayInitialBlockIfNeeded --- polygon/sync/store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polygon/sync/store.go b/polygon/sync/store.go index 5493ebd2a4e..d49408c7a41 100644 --- a/polygon/sync/store.go +++ b/polygon/sync/store.go @@ -255,7 +255,7 @@ func (s *ExecutionClientStore) bridgeReplayInitialBlockIfNeeded(ctx context.Cont } blocks = append(blocks, types.NewBlockWithHeader(header)) - if len(blocks) < blocksBatchSize && blockNum != s.lastQueuedBlock { + if len(blocks) < blocksBatchSize && blockNum != end { continue } From 1e32d3868a619ca3d218574f3b71a35193e7562d Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Sat, 23 Nov 2024 14:24:20 +0000 Subject: [PATCH 11/13] fix logging --- polygon/sync/block_downloader.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/polygon/sync/block_downloader.go b/polygon/sync/block_downloader.go index 798cda29128..f14344a9cfc 100644 --- a/polygon/sync/block_downloader.go +++ b/polygon/sync/block_downloader.go @@ -138,7 +138,7 @@ func (d *BlockDownloader) downloadBlocksUsingWaypoints( waypoints = d.limitWaypoints(waypoints) - d.logger.Debug( + d.logger.Info( syncLogPrefix("downloading blocks using waypoints"), "waypointsLen", len(waypoints), "start", waypoints[0].StartBlock().Uint64(), @@ -295,9 +295,10 @@ func (d *BlockDownloader) downloadBlocksUsingWaypoints( batchFetchStartTime = time.Now() // reset for next time d.logger.Info( - syncLogPrefix(fmt.Sprintf("inserting %d fetched blocks", len(blocks))), + syncLogPrefix("inserting fetched blocks"), "start", blocks[0].NumberU64(), "end", blocks[len(blocks)-1].NumberU64(), + "blocks", len(blocks), ) if err := d.store.InsertBlocks(ctx, blocks); err != nil { return nil, err From 81639d451e3fbba25b5a3ffc15a1bf69f1c559b7 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Sat, 23 Nov 2024 14:40:18 +0000 Subject: [PATCH 12/13] fix logging --- polygon/heimdall/service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polygon/heimdall/service.go b/polygon/heimdall/service.go index e288cd38fb0..5012619b6b1 100644 --- a/polygon/heimdall/service.go +++ b/polygon/heimdall/service.go @@ -300,7 +300,7 @@ func (s *Service) Ready(ctx context.Context) <-chan error { } func (s *Service) Run(ctx context.Context) error { - s.logger.Info(heimdallLogPrefix("running %s heimdall service component")) + s.logger.Info(heimdallLogPrefix("running heimdall service component")) defer s.store.Close() if err := s.store.Prepare(ctx); err != nil { From 8eb13bda99a354f20371041d594acc2bd6ec3ec5 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Sat, 23 Nov 2024 19:18:46 +0000 Subject: [PATCH 13/13] fix test --- polygon/heimdall/scraper_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polygon/heimdall/scraper_test.go b/polygon/heimdall/scraper_test.go index c4dc51ee426..8e127feee87 100644 --- a/polygon/heimdall/scraper_test.go +++ b/polygon/heimdall/scraper_test.go @@ -72,7 +72,7 @@ func TestScrapper_Run_TransientErr(t *testing.T) { ) transientErrs := []error{ErrNotInMilestoneList, ErrBadGateway} - scrapper := NewScraper[*Milestone](store, fetcher, time.Millisecond, transientErrs, logger) + scrapper := NewScraper[*Milestone]("test", store, fetcher, time.Millisecond, transientErrs, logger) eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error {