From c23b07890d15dd3750d86f56cebdfff06736abee Mon Sep 17 00:00:00 2001 From: Akshay Aggarwal Date: Mon, 11 Sep 2023 21:41:58 +0100 Subject: [PATCH] Small improvements to log recoverer (#10575) * Small improvements to log recoverer * improve comments * add performFinaliltyBuffer * update comment * fix tests * resovle nit * use finality depth instead of new constant * fix tests --- .../ocr2keeper/evm21/logprovider/factory.go | 5 ++++ .../ocr2keeper/evm21/logprovider/recoverer.go | 24 +++++++++++++++---- .../evm21/logprovider/recoverer_test.go | 18 +++++++------- 3 files changed, 33 insertions(+), 14 deletions(-) diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/factory.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/factory.go index 121c6036ddd..4b3fa8cb404 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/factory.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/factory.go @@ -36,6 +36,8 @@ type LogTriggersOptions struct { BlockRateLimit rate.Limit // blockLimitBurst is the burst upper limit on the range of blocks the we fetch logs for. BlockLimitBurst int + // Finality depth is the number of blocks to wait before considering a block final. + FinalityDepth int64 } func NewOptions(finalityDepth int64) LogTriggersOptions { @@ -63,4 +65,7 @@ func (o *LogTriggersOptions) Defaults(finalityDepth int64) { if o.BlockRateLimit == 0 { o.BlockRateLimit = rate.Every(o.ReadInterval) } + if o.FinalityDepth == 0 { + o.FinalityDepth = finalityDepth + } } diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go index 06a3b257065..c5b06701737 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer.go @@ -79,6 +79,8 @@ type logRecoverer struct { poller logpoller.LogPoller client client.Client blockTimeResolver *blockTimeResolver + + finalityDepth int64 } var _ LogRecoverer = &logRecoverer{} @@ -101,6 +103,8 @@ func NewLogRecoverer(lggr logger.Logger, poller logpoller.LogPoller, client clie packer: packer, client: client, blockTimeResolver: newBlockTimeResolver(poller), + + finalityDepth: opts.FinalityDepth, } rec.lookbackBlocks.Store(opts.LookbackBlocks) @@ -309,7 +313,7 @@ func (r *logRecoverer) GetRecoveryProposals(ctx context.Context) ([]ocr2keepers. r.pending = pending - r.lggr.Debugf("found %d pending payloads", len(pending)) + r.lggr.Debugf("found %d recoverable payloads", len(results)) return results, nil } @@ -353,10 +357,10 @@ func (r *logRecoverer) recover(ctx context.Context) error { // recoverFilter recovers logs for a single upkeep filter. func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startBlock, offsetBlock int64) error { - start := f.lastRePollBlock + start := f.lastRePollBlock + 1 // NOTE: we expect f.lastRePollBlock + 1 <= offsetBlock, as others would have been filtered out // ensure we don't recover logs from before the filter was created - // NOTE: we expect that filter with configUpdateBlock > offsetBlock were already filtered out. if configUpdateBlock := int64(f.configUpdateBlock); start < configUpdateBlock { + // NOTE: we expect that configUpdateBlock <= offsetBlock, as others would have been filtered out start = configUpdateBlock } if start < startBlock { @@ -367,6 +371,7 @@ func (r *logRecoverer) recoverFilter(ctx context.Context, f upkeepFilter, startB // If recoverer is lagging by a lot (more than 100x recoveryLogsBuffer), allow // a range of recoveryLogsBurst // Exploratory: Store lastRePollBlock in DB to prevent bursts during restarts + // (while also taking into account exisitng pending payloads) end = start + recoveryLogsBurst } if end > offsetBlock { @@ -477,7 +482,16 @@ func (r *logRecoverer) getRecoveryWindow(latest int64) (int64, int64) { lookbackBlocks := r.lookbackBlocks.Load() blockTime := r.blockTime.Load() blocksInDay := int64(24*time.Hour) / blockTime - return latest - blocksInDay, latest - lookbackBlocks + start := latest - blocksInDay + // Exploratory: Instead of subtracting finality depth to account for finalized performs + // keep two pointers of lastRePollBlock for soft and hard finalization, i.e. manage + // unfinalized perform logs better + end := latest - lookbackBlocks - r.finalityDepth + if start > end { + // In this case, allow starting from more than a day behind + start = end + } + return start, end } // getFilterBatch returns a batch of filters that are ready to be recovered. @@ -485,7 +499,7 @@ func (r *logRecoverer) getFilterBatch(offsetBlock int64) []upkeepFilter { filters := r.filterStore.GetFilters(func(f upkeepFilter) bool { // ensure we work only on filters that are ready to be recovered // no need to recover in case f.configUpdateBlock is after offsetBlock - return f.lastRePollBlock <= offsetBlock && int64(f.configUpdateBlock) <= offsetBlock + return f.lastRePollBlock < offsetBlock && int64(f.configUpdateBlock) <= offsetBlock }) sort.Slice(filters, func(i, j int) bool { diff --git a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go index 2654f8f3690..59c4244304a 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go +++ b/core/services/ocr2/plugins/ocr2keeper/evm21/logprovider/recoverer_test.go @@ -352,7 +352,7 @@ func TestLogRecoverer_Recover(t *testing.T) { nil, nil, []string{"c207451fa897f9bb13b09d54d8655edf0644e027c53521b4a92eafbb64ba4d14"}, - []int64{200, 0, 450}, + []int64{201, 0, 450}, }, { "lastRePollBlock updated with burst when lagging behind", @@ -366,7 +366,7 @@ func TestLogRecoverer_Recover(t *testing.T) { topics: []common.Hash{ common.HexToHash("0x1"), }, - lastRePollBlock: 100, // Should be updated with burst + lastRePollBlock: 99, // Should be updated with burst }, }, []ocr2keepers.UpkeepState{ocr2keepers.UnknownState}, @@ -778,7 +778,7 @@ func TestLogRecoverer_GetProposalData(t *testing.T) { }, logPoller: &mockLogPoller{ LatestBlockFn: func(qopts ...pg.QOpt) (int64, error) { - return 100, nil + return 300, nil }, }, stateReader: &mockStateReader{ @@ -813,7 +813,7 @@ func TestLogRecoverer_GetProposalData(t *testing.T) { }, logPoller: &mockLogPoller{ LatestBlockFn: func(qopts ...pg.QOpt) (int64, error) { - return 100, nil + return 300, nil }, }, client: &mockClient{ @@ -853,7 +853,7 @@ func TestLogRecoverer_GetProposalData(t *testing.T) { }, logPoller: &mockLogPoller{ LatestBlockFn: func(qopts ...pg.QOpt) (int64, error) { - return 100, nil + return 300, nil }, }, client: &mockClient{ @@ -885,7 +885,7 @@ func TestLogRecoverer_GetProposalData(t *testing.T) { }, logPoller: &mockLogPoller{ LatestBlockFn: func(qopts ...pg.QOpt) (int64, error) { - return 100, nil + return 300, nil }, LogsWithSigsFn: func(start, end int64, eventSigs []common.Hash, address common.Address, qopts ...pg.QOpt) ([]logpoller.Log, error) { return nil, errors.New("logs with sigs boom") @@ -920,7 +920,7 @@ func TestLogRecoverer_GetProposalData(t *testing.T) { }, logPoller: &mockLogPoller{ LatestBlockFn: func(qopts ...pg.QOpt) (int64, error) { - return 100, nil + return 300, nil }, LogsWithSigsFn: func(start, end int64, eventSigs []common.Hash, address common.Address, qopts ...pg.QOpt) ([]logpoller.Log, error) { return []logpoller.Log{ @@ -968,7 +968,7 @@ func TestLogRecoverer_GetProposalData(t *testing.T) { }, logPoller: &mockLogPoller{ LatestBlockFn: func(qopts ...pg.QOpt) (int64, error) { - return 100, nil + return 300, nil }, LogsWithSigsFn: func(start, end int64, eventSigs []common.Hash, address common.Address, qopts ...pg.QOpt) ([]logpoller.Log, error) { return []logpoller.Log{ @@ -1019,7 +1019,7 @@ func TestLogRecoverer_GetProposalData(t *testing.T) { }, logPoller: &mockLogPoller{ LatestBlockFn: func(qopts ...pg.QOpt) (int64, error) { - return 100, nil + return 300, nil }, LogsWithSigsFn: func(start, end int64, eventSigs []common.Hash, address common.Address, qopts ...pg.QOpt) ([]logpoller.Log, error) { return []logpoller.Log{