From 42896560bf9b642b01f6504c163cc50617604683 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Wed, 2 Oct 2024 21:03:38 -0700 Subject: [PATCH] Revert countBasedPruningActive flag --- core/chains/evm/logpoller/log_poller.go | 72 ++++++++----------------- 1 file changed, 22 insertions(+), 50 deletions(-) diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index e592060030..eadfd0d28e 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -132,8 +132,7 @@ type logPoller struct { // Usually the only way to recover is to manually remove the offending logs and block from the database. // LogPoller keeps running in infinite loop, so whenever the invalid state is removed from the database it should // recover automatically without needing to restart the LogPoller. - finalityViolated *atomic.Bool - countBasedLogPruningActive *atomic.Bool + finalityViolated *atomic.Bool } type Opts struct { @@ -159,25 +158,24 @@ type Opts struct { // support chain, polygon, which has 2s block times, we need RPCs roughly with <= 500ms latency func NewLogPoller(orm ORM, ec Client, lggr logger.Logger, headTracker HeadTracker, opts Opts) *logPoller { return &logPoller{ - stopCh: make(chan struct{}), - ec: ec, - orm: orm, - headTracker: headTracker, - lggr: logger.Sugared(logger.Named(lggr, "LogPoller")), - replayStart: make(chan int64), - replayComplete: make(chan error), - pollPeriod: opts.PollPeriod, - backupPollerBlockDelay: opts.BackupPollerBlockDelay, - finalityDepth: opts.FinalityDepth, - useFinalityTag: opts.UseFinalityTag, - backfillBatchSize: opts.BackfillBatchSize, - rpcBatchSize: opts.RpcBatchSize, - keepFinalizedBlocksDepth: opts.KeepFinalizedBlocksDepth, - logPrunePageSize: opts.LogPrunePageSize, - filters: make(map[string]Filter), - filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet. - finalityViolated: new(atomic.Bool), - countBasedLogPruningActive: new(atomic.Bool), + stopCh: make(chan struct{}), + ec: ec, + orm: orm, + headTracker: headTracker, + lggr: logger.Sugared(logger.Named(lggr, "LogPoller")), + replayStart: make(chan int64), + replayComplete: make(chan error), + pollPeriod: opts.PollPeriod, + backupPollerBlockDelay: opts.BackupPollerBlockDelay, + finalityDepth: opts.FinalityDepth, + useFinalityTag: opts.UseFinalityTag, + backfillBatchSize: opts.BackfillBatchSize, + rpcBatchSize: opts.RpcBatchSize, + keepFinalizedBlocksDepth: opts.KeepFinalizedBlocksDepth, + logPrunePageSize: opts.LogPrunePageSize, + filters: make(map[string]Filter), + filterDirty: true, // Always build Filter on first call to cache an empty filter if nothing registered yet. + finalityViolated: new(atomic.Bool), } } @@ -294,9 +292,6 @@ func (lp *logPoller) RegisterFilter(ctx context.Context, filter Filter) error { } lp.filters[filter.Name] = filter lp.filterDirty = true - if filter.MaxLogsKept > 0 { - lp.countBasedLogPruningActive.Store(true) - } return nil } @@ -552,37 +547,18 @@ func (lp *logPoller) GetReplayFromBlock(ctx context.Context, requested int64) (i return mathutil.Min(requested, lastProcessed.BlockNumber), nil } -// loadFilters loads the filters from db, and activates count-based Log Pruning -// if required by any of the filters func (lp *logPoller) loadFilters(ctx context.Context) error { - filters, err := lp._loadFilters(ctx) - if err != nil { - return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying") - } - if lp.countBasedLogPruningActive.Load() { - return nil - } - for _, filter := range filters { - if filter.MaxLogsKept != 0 { - lp.countBasedLogPruningActive.Store(true) - } - } - return nil -} - -// _loadFilters is the part of loadFilters() requiring a filterMu lock -func (lp *logPoller) _loadFilters(ctx context.Context) (filters map[string]Filter, err error) { lp.filterMu.Lock() defer lp.filterMu.Unlock() + filters, err := lp.orm.LoadFilters(ctx) - filters, err = lp.orm.LoadFilters(ctx) if err != nil { - return filters, err + return pkgerrors.Wrapf(err, "Failed to load initial filters from db, retrying") } lp.filters = filters lp.filterDirty = true - return filters, nil + return nil } // tickStaggeredDelay chooses a uniformly random amount of time to delay between minDelay and minDelay + period @@ -1174,10 +1150,6 @@ func (lp *logPoller) PruneExpiredLogs(ctx context.Context) (bool, error) { done = false } - if !lp.countBasedLogPruningActive.Load() { - return done, err - } - rowIDs, err := lp.orm.SelectExcessLogIDs(ctx, lp.logPrunePageSize) if err != nil { lp.lggr.Errorw("Unable to find excess logs for pruning", "err", err)