From efb2b18c17ea591063226cec77d5066f0b5db63a Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Tue, 2 Apr 2024 20:46:06 -0500 Subject: [PATCH 1/3] use services.NewTicker --- common/client/multi_node.go | 4 +-- common/txmgr/reaper.go | 7 ++--- common/txmgr/resender.go | 4 +-- .../evm/forwarders/forwarder_manager.go | 10 +++---- core/chains/evm/gas/arbitrum_estimator.go | 21 +++++++------- .../evm/gas/rollups/arbitrum_l1_oracle.go | 29 ++++++++++--------- core/chains/evm/gas/rollups/op_l1_oracle.go | 29 ++++++++++--------- .../evm/gas/rollups/zkSync_l1_oracle.go | 1 + .../evm/gas/suggested_price_estimator.go | 19 ++++++------ core/chains/evm/logpoller/log_poller.go | 15 ++++++---- core/services/blockhashstore/delegate.go | 3 +- core/services/blockheaderfeeder/delegate.go | 3 +- .../llo/onchain_channel_definition_cache.go | 4 +-- .../evmregistry/v21/logprovider/recoverer.go | 4 +-- .../evmregistry/v21/upkeepstate/store.go | 4 +-- core/services/pipeline/runner.go | 5 ++-- .../relay/evm/mercury/persistence_manager.go | 9 +++--- core/services/relay/evm/mercury/queue.go | 3 +- .../relay/evm/mercury/wsrpc/cache/cache.go | 6 ++-- 19 files changed, 90 insertions(+), 90 deletions(-) diff --git a/common/client/multi_node.go b/common/client/multi_node.go index 0fc095c2931..1223626b13c 100644 --- a/common/client/multi_node.go +++ b/common/client/multi_node.go @@ -15,8 +15,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/assets" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" - "github.com/smartcontractkit/chainlink-common/pkg/utils" - feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" "github.com/smartcontractkit/chainlink/v2/common/types" ) @@ -321,7 +319,7 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP c.report() - monitor := time.NewTicker(utils.WithJitter(c.reportInterval)) + monitor := services.NewTicker(c.reportInterval) defer monitor.Stop() for { diff --git a/common/txmgr/reaper.go b/common/txmgr/reaper.go index 3ed05b2caee..932b58f6430 100644 --- a/common/txmgr/reaper.go +++ b/common/txmgr/reaper.go @@ -7,8 +7,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" - "github.com/smartcontractkit/chainlink-common/pkg/utils" - txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" "github.com/smartcontractkit/chainlink/v2/common/types" ) @@ -58,7 +56,7 @@ func (r *Reaper[CHAIN_ID]) Stop() { func (r *Reaper[CHAIN_ID]) runLoop() { defer close(r.chDone) - ticker := time.NewTicker(utils.WithJitter(r.txConfig.ReaperInterval())) + ticker := services.NewTicker(r.txConfig.ReaperInterval()) defer ticker.Stop() for { select { @@ -66,10 +64,9 @@ func (r *Reaper[CHAIN_ID]) runLoop() { return case <-ticker.C: r.work() - ticker.Reset(utils.WithJitter(r.txConfig.ReaperInterval())) case <-r.trigger: r.work() - ticker.Reset(utils.WithJitter(r.txConfig.ReaperInterval())) + ticker.Reset() } } } diff --git a/common/txmgr/resender.go b/common/txmgr/resender.go index 8483b7a0264..d4f1b4275a1 100644 --- a/common/txmgr/resender.go +++ b/common/txmgr/resender.go @@ -9,8 +9,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/chains/label" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" - "github.com/smartcontractkit/chainlink-common/pkg/utils" - "github.com/smartcontractkit/chainlink/v2/common/client" feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types" @@ -120,7 +118,7 @@ func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop() er.logger.Warnw("Failed to resend unconfirmed transactions", "err", err) } - ticker := time.NewTicker(utils.WithJitter(er.interval)) + ticker := services.NewTicker(er.interval) defer ticker.Stop() for { select { diff --git a/core/chains/evm/forwarders/forwarder_manager.go b/core/chains/evm/forwarders/forwarder_manager.go index 638836094dc..9181ef334c1 100644 --- a/core/chains/evm/forwarders/forwarder_manager.go +++ b/core/chains/evm/forwarders/forwarder_manager.go @@ -16,8 +16,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" - "github.com/smartcontractkit/chainlink-common/pkg/utils" - evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" evmlogpoller "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types" @@ -276,10 +274,12 @@ func (f *FwdMgr) runLoop() { ctx, cancel := f.stopCh.NewCtx() defer cancel() - tick := time.After(0) - for ; ; tick = time.After(utils.WithJitter(time.Minute)) { + ticker := services.NewTicker(time.Minute) + defer ticker.Stop() + + for { select { - case <-tick: + case <-ticker.C: if err := f.logpoller.Ready(); err != nil { f.logger.Warnw("Skipping log syncing", "err", err) continue diff --git a/core/chains/evm/gas/arbitrum_estimator.go b/core/chains/evm/gas/arbitrum_estimator.go index 0cd4bbcdd0b..6a11932878b 100644 --- a/core/chains/evm/gas/arbitrum_estimator.go +++ b/core/chains/evm/gas/arbitrum_estimator.go @@ -11,8 +11,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" - "github.com/smartcontractkit/chainlink-common/pkg/utils" - feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas/rollups" @@ -168,27 +166,31 @@ func (a *arbitrumEstimator) getPricesInArbGas() (perL2Tx uint32, perL1CalldataUn func (a *arbitrumEstimator) run() { defer close(a.chDone) - t := a.refreshPricesInArbGas() + a.refreshPricesInArbGas() close(a.chInitialised) + t := services.TickerConfig{ + Initial: a.pollPeriod, + JitterPct: services.DefaultJitter, + }.NewTicker(a.pollPeriod) + defer t.Stop() + for { select { case <-a.chStop: return case ch := <-a.chForceRefetch: - t.Stop() - t = a.refreshPricesInArbGas() + a.refreshPricesInArbGas() + t.Reset() close(ch) case <-t.C: - t = a.refreshPricesInArbGas() + a.refreshPricesInArbGas() } } } // refreshPricesInArbGas calls getPricesInArbGas() and caches the refreshed prices. -func (a *arbitrumEstimator) refreshPricesInArbGas() (t *time.Timer) { - t = time.NewTimer(utils.WithJitter(a.pollPeriod)) - +func (a *arbitrumEstimator) refreshPricesInArbGas() { perL2Tx, perL1CalldataUnit, err := a.l1Oracle.GetPricesInArbGas() if err != nil { a.logger.Warnw("Failed to refresh prices", "err", err) @@ -201,5 +203,4 @@ func (a *arbitrumEstimator) refreshPricesInArbGas() (t *time.Timer) { a.perL2Tx = perL2Tx a.perL1CalldataUnit = perL1CalldataUnit a.getPricesInArbGasMu.Unlock() - return } diff --git a/core/chains/evm/gas/rollups/arbitrum_l1_oracle.go b/core/chains/evm/gas/rollups/arbitrum_l1_oracle.go index 9e15cfe4c93..09ce8aa7d56 100644 --- a/core/chains/evm/gas/rollups/arbitrum_l1_oracle.go +++ b/core/chains/evm/gas/rollups/arbitrum_l1_oracle.go @@ -13,11 +13,10 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" + gethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" - "github.com/smartcontractkit/chainlink-common/pkg/utils" - - gethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/smartcontractkit/chainlink/v2/common/client" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" @@ -135,41 +134,45 @@ func (o *arbitrumL1Oracle) HealthReport() map[string]error { func (o *arbitrumL1Oracle) run() { defer close(o.chDone) - t := o.refresh() + o.refresh() close(o.chInitialised) + t := services.TickerConfig{ + Initial: o.pollPeriod, + JitterPct: services.DefaultJitter, + }.NewTicker(o.pollPeriod) + defer t.Stop() + for { select { case <-o.chStop: return case <-t.C: - t = o.refresh() + o.refresh() } } } -func (o *arbitrumL1Oracle) refresh() (t *time.Timer) { - t, err := o.refreshWithError() +func (o *arbitrumL1Oracle) refresh() { + err := o.refreshWithError() if err != nil { + o.logger.Criticalw("Failed to refresh gas price", "err", err) o.SvcErrBuffer.Append(err) } - return } -func (o *arbitrumL1Oracle) refreshWithError() (t *time.Timer, err error) { - t = time.NewTimer(utils.WithJitter(o.pollPeriod)) - +func (o *arbitrumL1Oracle) refreshWithError() error { ctx, cancel := o.chStop.CtxCancel(evmclient.ContextWithDefaultTimeout()) defer cancel() price, err := o.fetchL1GasPrice(ctx) if err != nil { - return t, err + return err } o.l1GasPriceMu.Lock() defer o.l1GasPriceMu.Unlock() o.l1GasPrice = priceEntry{price: assets.NewWei(price), timestamp: time.Now()} - return + return nil } func (o *arbitrumL1Oracle) fetchL1GasPrice(ctx context.Context) (price *big.Int, err error) { diff --git a/core/chains/evm/gas/rollups/op_l1_oracle.go b/core/chains/evm/gas/rollups/op_l1_oracle.go index 1c4db432cea..48e38e077a7 100644 --- a/core/chains/evm/gas/rollups/op_l1_oracle.go +++ b/core/chains/evm/gas/rollups/op_l1_oracle.go @@ -14,11 +14,10 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/rpc" + gethtypes "github.com/ethereum/go-ethereum/core/types" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" - "github.com/smartcontractkit/chainlink-common/pkg/utils" - - gethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/smartcontractkit/chainlink/v2/common/client" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets" @@ -208,41 +207,45 @@ func (o *OptimismL1Oracle) HealthReport() map[string]error { func (o *OptimismL1Oracle) run() { defer close(o.chDone) - t := o.refresh() + o.refresh() close(o.chInitialised) + t := services.TickerConfig{ + Initial: o.pollPeriod, + JitterPct: services.DefaultJitter, + }.NewTicker(o.pollPeriod) + defer t.Stop() + for { select { case <-o.chStop: return case <-t.C: - t = o.refresh() + o.refresh() } } } -func (o *OptimismL1Oracle) refresh() (t *time.Timer) { - t, err := o.refreshWithError() +func (o *OptimismL1Oracle) refresh() { + err := o.refreshWithError() if err != nil { + o.logger.Criticalw("Failed to refresh gas price", "err", err) o.SvcErrBuffer.Append(err) } - return } -func (o *OptimismL1Oracle) refreshWithError() (t *time.Timer, err error) { - t = time.NewTimer(utils.WithJitter(o.pollPeriod)) - +func (o *OptimismL1Oracle) refreshWithError() error { ctx, cancel := o.chStop.CtxCancel(evmclient.ContextWithDefaultTimeout()) defer cancel() price, err := o.GetDAGasPrice(ctx) if err != nil { - return t, err + return err } o.l1GasPriceMu.Lock() defer o.l1GasPriceMu.Unlock() o.l1GasPrice = priceEntry{price: assets.NewWei(price), timestamp: time.Now()} - return + return nil } func (o *OptimismL1Oracle) GasPrice(_ context.Context) (l1GasPrice *assets.Wei, err error) { diff --git a/core/chains/evm/gas/rollups/zkSync_l1_oracle.go b/core/chains/evm/gas/rollups/zkSync_l1_oracle.go index 00ee91b3f8a..48186f088fd 100644 --- a/core/chains/evm/gas/rollups/zkSync_l1_oracle.go +++ b/core/chains/evm/gas/rollups/zkSync_l1_oracle.go @@ -120,6 +120,7 @@ func (o *zkSyncL1Oracle) run() { func (o *zkSyncL1Oracle) refresh() (t *time.Timer) { t, err := o.refreshWithError() if err != nil { + o.logger.Criticalw("Failed to refresh gas price", "err", err) o.SvcErrBuffer.Append(err) } return diff --git a/core/chains/evm/gas/suggested_price_estimator.go b/core/chains/evm/gas/suggested_price_estimator.go index e947e9109d1..f3bfa6e5be6 100644 --- a/core/chains/evm/gas/suggested_price_estimator.go +++ b/core/chains/evm/gas/suggested_price_estimator.go @@ -12,7 +12,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" - "github.com/smartcontractkit/chainlink-common/pkg/utils" bigmath "github.com/smartcontractkit/chainlink-common/pkg/utils/big_math" "github.com/smartcontractkit/chainlink/v2/common/fee" @@ -101,26 +100,29 @@ func (o *SuggestedPriceEstimator) HealthReport() map[string]error { func (o *SuggestedPriceEstimator) run() { defer close(o.chDone) - t := o.refreshPrice() + o.refreshPrice() close(o.chInitialised) + t := services.TickerConfig{ + Initial: o.pollPeriod, + JitterPct: services.DefaultJitter, + }.NewTicker(o.pollPeriod) + for { select { case <-o.chStop: return case ch := <-o.chForceRefetch: - t.Stop() - t = o.refreshPrice() + o.refreshPrice() + t.Reset() close(ch) case <-t.C: - t = o.refreshPrice() + o.refreshPrice() } } } -func (o *SuggestedPriceEstimator) refreshPrice() (t *time.Timer) { - t = time.NewTimer(utils.WithJitter(o.pollPeriod)) - +func (o *SuggestedPriceEstimator) refreshPrice() { var res hexutil.Big ctx, cancel := o.chStop.CtxCancel(evmclient.ContextWithDefaultTimeout()) defer cancel() @@ -136,7 +138,6 @@ func (o *SuggestedPriceEstimator) refreshPrice() (t *time.Timer) { o.gasPriceMu.Lock() defer o.gasPriceMu.Unlock() o.GasPrice = bi - return } // Uses the force refetch chan to trigger a price update and blocks until complete diff --git a/core/chains/evm/logpoller/log_poller.go b/core/chains/evm/logpoller/log_poller.go index 26978b18d48..564650703dc 100644 --- a/core/chains/evm/logpoller/log_poller.go +++ b/core/chains/evm/logpoller/log_poller.go @@ -552,9 +552,14 @@ func (lp *logPoller) run() { defer lp.wg.Done() ctx, cancel := lp.stopCh.NewCtx() defer cancel() - logPollTick := time.After(0) + logPollTicker := services.NewTicker(lp.pollPeriod) + defer logPollTicker.Stop() // stagger these somewhat, so they don't all run back-to-back - backupLogPollTick := time.After(100 * time.Millisecond) + backupLogPollTicker := services.TickerConfig{ + Initial: 100 * time.Millisecond, + JitterPct: services.DefaultJitter, + }.NewTicker(time.Duration(lp.backupPollerBlockDelay) * lp.pollPeriod) + defer backupLogPollTicker.Stop() filtersLoaded := false for { @@ -563,8 +568,7 @@ func (lp *logPoller) run() { return case fromBlockReq := <-lp.replayStart: lp.handleReplayRequest(ctx, fromBlockReq, filtersLoaded) - case <-logPollTick: - logPollTick = time.After(utils.WithJitter(lp.pollPeriod)) + case <-logPollTicker.C: if !filtersLoaded { if err := lp.loadFilters(ctx); err != nil { lp.lggr.Errorw("Failed loading filters in main logpoller loop, retrying later", "err", err) @@ -602,7 +606,7 @@ func (lp *logPoller) run() { start = lastProcessed.BlockNumber + 1 } lp.PollAndSaveLogs(ctx, start) - case <-backupLogPollTick: + case <-backupLogPollTicker.C: if lp.backupPollerBlockDelay == 0 { continue // backup poller is disabled } @@ -614,7 +618,6 @@ func (lp *logPoller) run() { // frequently than the primary log poller (instead of roughly once per block it runs once roughly once every // lp.backupPollerDelay blocks--with default settings about 100x less frequently). - backupLogPollTick = time.After(utils.WithJitter(time.Duration(lp.backupPollerBlockDelay) * lp.pollPeriod)) if !filtersLoaded { lp.lggr.Warnw("Backup log poller ran before filters loaded, skipping") continue diff --git a/core/services/blockhashstore/delegate.go b/core/services/blockhashstore/delegate.go index 172dbafc4a4..7c3d6074c66 100644 --- a/core/services/blockhashstore/delegate.go +++ b/core/services/blockhashstore/delegate.go @@ -21,7 +21,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) var _ job.ServiceCtx = &service{} @@ -234,7 +233,7 @@ func (s *service) Start(context.Context) error { defer s.wg.Done() ctx, cancel := s.stopCh.NewCtx() defer cancel() - ticker := time.NewTicker(utils.WithJitter(s.pollPeriod)) + ticker := services.NewTicker(s.pollPeriod) defer ticker.Stop() for { select { diff --git a/core/services/blockheaderfeeder/delegate.go b/core/services/blockheaderfeeder/delegate.go index 830c2e23377..046941aa154 100644 --- a/core/services/blockheaderfeeder/delegate.go +++ b/core/services/blockheaderfeeder/delegate.go @@ -21,7 +21,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/blockhashstore" "github.com/smartcontractkit/chainlink/v2/core/services/job" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) var _ job.ServiceCtx = &service{} @@ -241,7 +240,7 @@ func (s *service) Start(context.Context) error { defer close(s.done) ctx, cancel := s.stopCh.NewCtx() defer cancel() - ticker := time.NewTicker(utils.WithJitter(s.pollPeriod)) + ticker := services.NewTicker(s.pollPeriod) defer ticker.Stop() for { select { diff --git a/core/services/llo/onchain_channel_definition_cache.go b/core/services/llo/onchain_channel_definition_cache.go index d72079d0b1e..5c89561481a 100644 --- a/core/services/llo/onchain_channel_definition_cache.go +++ b/core/services/llo/onchain_channel_definition_cache.go @@ -19,7 +19,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/llo-feeds/generated/channel_config_store" "github.com/smartcontractkit/chainlink/v2/core/logger" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) type ChannelDefinitionCacheORM interface { @@ -114,7 +113,8 @@ const pollInterval = 1 * time.Second func (c *channelDefinitionCache) poll() { defer c.wg.Done() - pollT := time.NewTicker(utils.WithJitter(pollInterval)) + pollT := services.NewTicker(pollInterval) + defer pollT.Stop() for { select { diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go index f00459dee67..9e41008ed83 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/logprovider/recoverer.go @@ -149,14 +149,14 @@ func (r *logRecoverer) Start(ctx context.Context) error { }) r.threadCtrl.Go(func(ctx context.Context) { - cleanupTicker := time.NewTicker(utils.WithJitter(GCInterval)) + cleanupTicker := services.NewTicker(GCInterval) defer cleanupTicker.Stop() for { select { case <-cleanupTicker.C: r.clean(ctx) - cleanupTicker.Reset(utils.WithJitter(GCInterval)) + cleanupTicker.Reset() case <-ctx.Done(): return } diff --git a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go index bf7a62aad48..e6486ca56ae 100644 --- a/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go +++ b/core/services/ocr2/plugins/ocr2keeper/evmregistry/v21/upkeepstate/store.go @@ -108,7 +108,7 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { u.lggr.Debug("Starting upkeep state store") u.threadCtrl.Go(func(ctx context.Context) { - ticker := time.NewTicker(utils.WithJitter(u.cleanCadence)) + ticker := services.NewTicker(u.cleanCadence) defer ticker.Stop() flushTicker := newTickerFn(utils.WithJitter(flushCadence)) @@ -120,7 +120,7 @@ func (u *upkeepStateStore) Start(pctx context.Context) error { if err := u.cleanup(ctx); err != nil { u.lggr.Errorw("unable to clean old state values", "err", err) } - ticker.Reset(utils.WithJitter(u.cleanCadence)) + ticker.Reset() case <-flushTicker.C: u.flush(ctx) flushTicker.Reset(utils.WithJitter(flushCadence)) diff --git a/core/services/pipeline/runner.go b/core/services/pipeline/runner.go index e816e96dad8..92d72944976 100644 --- a/core/services/pipeline/runner.go +++ b/core/services/pipeline/runner.go @@ -25,7 +25,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/recovery" "github.com/smartcontractkit/chainlink/v2/core/store/models" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) //go:generate mockery --quiet --name Runner --output ./mocks/ --case=underscore @@ -173,7 +172,7 @@ func (r *runner) runReaperLoop() { return } - runReaperTicker := time.NewTicker(utils.WithJitter(r.config.ReaperInterval())) + runReaperTicker := services.NewTicker(r.config.ReaperInterval()) defer runReaperTicker.Stop() for { select { @@ -181,7 +180,7 @@ func (r *runner) runReaperLoop() { return case <-runReaperTicker.C: r.runReaperWorker.WakeUp() - runReaperTicker.Reset(utils.WithJitter(r.config.ReaperInterval())) + runReaperTicker.Reset() } } } diff --git a/core/services/relay/evm/mercury/persistence_manager.go b/core/services/relay/evm/mercury/persistence_manager.go index 38576174423..d7f3d8eaa0d 100644 --- a/core/services/relay/evm/mercury/persistence_manager.go +++ b/core/services/relay/evm/mercury/persistence_manager.go @@ -11,7 +11,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) var ( @@ -90,11 +89,11 @@ func (pm *PersistenceManager) runFlushDeletesLoop() { ctx, cancel := pm.stopCh.Ctx(context.Background()) defer cancel() - ticker := time.NewTicker(utils.WithJitter(pm.flushDeletesFrequency)) + ticker := services.NewTicker(pm.flushDeletesFrequency) + defer ticker.Stop() for { select { case <-ctx.Done(): - ticker.Stop() return case <-ticker.C: queuedReqs := pm.resetDeleteQueue() @@ -114,11 +113,11 @@ func (pm *PersistenceManager) runPruneLoop() { ctx, cancel := pm.stopCh.NewCtx() defer cancel() - ticker := time.NewTicker(utils.WithJitter(pm.pruneFrequency)) + ticker := services.NewTicker(pm.pruneFrequency) + defer ticker.Stop() for { select { case <-ctx.Done(): - ticker.Stop() return case <-ticker.C: func(ctx context.Context) { diff --git a/core/services/relay/evm/mercury/queue.go b/core/services/relay/evm/mercury/queue.go index 1923b091531..93dc0bfc689 100644 --- a/core/services/relay/evm/mercury/queue.go +++ b/core/services/relay/evm/mercury/queue.go @@ -17,7 +17,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) //go:generate mockery --quiet --name asyncDeleter --output ./mocks/ --case=underscore --structname=AsyncDeleter @@ -143,7 +142,7 @@ func (tq *transmitQueue) IsEmpty() bool { func (tq *transmitQueue) Start(context.Context) error { return tq.StartOnce("TransmitQueue", func() error { - t := time.NewTicker(utils.WithJitter(promInterval)) + t := services.NewTicker(promInterval) wg := new(sync.WaitGroup) chStop := make(chan struct{}) tq.stopMonitor = func() { diff --git a/core/services/relay/evm/mercury/wsrpc/cache/cache.go b/core/services/relay/evm/mercury/wsrpc/cache/cache.go index adc439e802b..58e04c5d730 100644 --- a/core/services/relay/evm/mercury/wsrpc/cache/cache.go +++ b/core/services/relay/evm/mercury/wsrpc/cache/cache.go @@ -15,7 +15,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/logger" mercuryutils "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/utils" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/pb" - "github.com/smartcontractkit/chainlink/v2/core/utils" ) var ( @@ -346,13 +345,14 @@ func (m *memCache) runloop() { if m.cfg.MaxStaleAge == 0 { return } - t := time.NewTicker(utils.WithJitter(m.cfg.MaxStaleAge)) + t := services.NewTicker(m.cfg.MaxStaleAge) + defer t.Stop() for { select { case <-t.C: m.cleanup() - t.Reset(utils.WithJitter(m.cfg.MaxStaleAge)) + t.Reset() case <-m.chStop: return } From 990b571c8596c52293276ab5c0fa06c74b4f90ed Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Fri, 14 Jun 2024 09:43:28 -0500 Subject: [PATCH 2/3] adjust timing to fix flake --- core/services/relay/evm/chain_reader_test.go | 4 ++-- core/services/relay/evm/evmtesting/run_tests.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/services/relay/evm/chain_reader_test.go b/core/services/relay/evm/chain_reader_test.go index 9e133428bf3..afe301990f5 100644 --- a/core/services/relay/evm/chain_reader_test.go +++ b/core/services/relay/evm/chain_reader_test.go @@ -87,8 +87,8 @@ func (h *helper) Context(t *testing.T) context.Context { } func (h *helper) MaxWaitTimeForEvents() time.Duration { - // From trial and error, when running on CI, sometimes the boxes get slow - maxWaitTime := time.Second * 30 + // There are no scheduling guarantees in a busy CI test suite. 30s was failing frequently. + maxWaitTime := 10 * time.Minute maxWaitTimeStr, ok := os.LookupEnv("MAX_WAIT_TIME_FOR_EVENTS_S") if ok { waitS, err := strconv.ParseInt(maxWaitTimeStr, 10, 64) diff --git a/core/services/relay/evm/evmtesting/run_tests.go b/core/services/relay/evm/evmtesting/run_tests.go index 2c492966729..3058784b30a 100644 --- a/core/services/relay/evm/evmtesting/run_tests.go +++ b/core/services/relay/evm/evmtesting/run_tests.go @@ -39,7 +39,7 @@ func RunChainReaderEvmTests[T TestingT[T]](t T, it *EVMChainReaderInterfaceTeste require.Eventually(t, func() bool { return cr.GetLatestValue(ctx, AnyContractName, triggerWithDynamicTopic, input, output) == nil - }, it.MaxWaitTimeForEvents(), time.Millisecond*10) + }, it.MaxWaitTimeForEvents(), 100*time.Millisecond) assert.Equal(t, &anyString, rOutput.FieldByName("Field").Interface()) topic, err := abi.MakeTopics([]any{anyString}) From 04468a9b1582e42609c0ecec5dfebfd3c316944a Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Fri, 28 Jun 2024 12:48:51 -0500 Subject: [PATCH 3/3] revert MaxWaitTimeForEvents --- core/services/relay/evm/chain_reader_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/services/relay/evm/chain_reader_test.go b/core/services/relay/evm/chain_reader_test.go index 691f370d3d9..1e32753bc52 100644 --- a/core/services/relay/evm/chain_reader_test.go +++ b/core/services/relay/evm/chain_reader_test.go @@ -200,8 +200,8 @@ func (h *helper) Context(t *testing.T) context.Context { } func (h *helper) MaxWaitTimeForEvents() time.Duration { - // There are no scheduling guarantees in a busy CI test suite. 30s was failing frequently. - maxWaitTime := 10 * time.Minute + // From trial and error, when running on CI, sometimes the boxes get slow + maxWaitTime := time.Second * 30 maxWaitTimeStr, ok := os.LookupEnv("MAX_WAIT_TIME_FOR_EVENTS_S") if ok { waitS, err := strconv.ParseInt(maxWaitTimeStr, 10, 64)