Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use services.Ticker #12668

Merged
merged 9 commits into from
Jul 3, 2024
4 changes: 1 addition & 3 deletions common/client/multi_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/assets"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
"github.com/smartcontractkit/chainlink/v2/common/types"
)
Expand Down Expand Up @@ -321,7 +319,7 @@ func (c *multiNode[CHAIN_ID, SEQ, ADDR, BLOCK_HASH, TX, TX_HASH, EVENT, EVENT_OP

c.report()

monitor := time.NewTicker(utils.WithJitter(c.reportInterval))
monitor := services.NewTicker(c.reportInterval)
defer monitor.Stop()

for {
Expand Down
7 changes: 2 additions & 5 deletions common/txmgr/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
"github.com/smartcontractkit/chainlink/v2/common/types"
)
Expand Down Expand Up @@ -58,18 +56,17 @@ func (r *Reaper[CHAIN_ID]) Stop() {

func (r *Reaper[CHAIN_ID]) runLoop() {
defer close(r.chDone)
ticker := time.NewTicker(utils.WithJitter(r.txConfig.ReaperInterval()))
ticker := services.NewTicker(r.txConfig.ReaperInterval())
defer ticker.Stop()
for {
select {
case <-r.chStop:
return
case <-ticker.C:
r.work()
ticker.Reset(utils.WithJitter(r.txConfig.ReaperInterval()))
case <-r.trigger:
r.work()
ticker.Reset(utils.WithJitter(r.txConfig.ReaperInterval()))
ticker.Reset()
}
}
}
Expand Down
4 changes: 1 addition & 3 deletions common/txmgr/resender.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/chains/label"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

"github.com/smartcontractkit/chainlink/v2/common/client"
feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
txmgrtypes "github.com/smartcontractkit/chainlink/v2/common/txmgr/types"
Expand Down Expand Up @@ -120,7 +118,7 @@ func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop()
er.logger.Warnw("Failed to resend unconfirmed transactions", "err", err)
}

ticker := time.NewTicker(utils.WithJitter(er.interval))
ticker := services.NewTicker(er.interval)
defer ticker.Stop()
for {
select {
Expand Down
10 changes: 5 additions & 5 deletions core/chains/evm/forwarders/forwarder_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
evmlogpoller "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
Expand Down Expand Up @@ -276,10 +274,12 @@ func (f *FwdMgr) runLoop() {
ctx, cancel := f.stopCh.NewCtx()
defer cancel()

tick := time.After(0)
for ; ; tick = time.After(utils.WithJitter(time.Minute)) {
ticker := services.NewTicker(time.Minute)
defer ticker.Stop()

for {
select {
case <-tick:
case <-ticker.C:
if err := f.logpoller.Ready(); err != nil {
f.logger.Warnw("Skipping log syncing", "err", err)
continue
Expand Down
21 changes: 11 additions & 10 deletions core/chains/evm/gas/arbitrum_estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/gas/rollups"
Expand Down Expand Up @@ -168,27 +166,31 @@ func (a *arbitrumEstimator) getPricesInArbGas() (perL2Tx uint32, perL1CalldataUn
func (a *arbitrumEstimator) run() {
defer close(a.chDone)

t := a.refreshPricesInArbGas()
a.refreshPricesInArbGas()
close(a.chInitialised)

t := services.TickerConfig{
Initial: a.pollPeriod,
JitterPct: services.DefaultJitter,
}.NewTicker(a.pollPeriod)
defer t.Stop()

for {
select {
case <-a.chStop:
return
case ch := <-a.chForceRefetch:
t.Stop()
t = a.refreshPricesInArbGas()
a.refreshPricesInArbGas()
t.Reset()
close(ch)
case <-t.C:
t = a.refreshPricesInArbGas()
a.refreshPricesInArbGas()
}
}
}

// refreshPricesInArbGas calls getPricesInArbGas() and caches the refreshed prices.
func (a *arbitrumEstimator) refreshPricesInArbGas() (t *time.Timer) {
t = time.NewTimer(utils.WithJitter(a.pollPeriod))

func (a *arbitrumEstimator) refreshPricesInArbGas() {
perL2Tx, perL1CalldataUnit, err := a.l1Oracle.GetPricesInArbGas()
if err != nil {
a.logger.Warnw("Failed to refresh prices", "err", err)
Expand All @@ -201,5 +203,4 @@ func (a *arbitrumEstimator) refreshPricesInArbGas() (t *time.Timer) {
a.perL2Tx = perL2Tx
a.perL1CalldataUnit = perL1CalldataUnit
a.getPricesInArbGasMu.Unlock()
return
}
29 changes: 16 additions & 13 deletions core/chains/evm/gas/rollups/arbitrum_l1_oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"

gethtypes "github.com/ethereum/go-ethereum/core/types"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

gethtypes "github.com/ethereum/go-ethereum/core/types"

"github.com/smartcontractkit/chainlink/v2/common/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets"
Expand Down Expand Up @@ -135,41 +134,45 @@ func (o *arbitrumL1Oracle) HealthReport() map[string]error {
func (o *arbitrumL1Oracle) run() {
defer close(o.chDone)

t := o.refresh()
o.refresh()
close(o.chInitialised)

t := services.TickerConfig{
Initial: o.pollPeriod,
JitterPct: services.DefaultJitter,
}.NewTicker(o.pollPeriod)
defer t.Stop()

for {
select {
case <-o.chStop:
return
case <-t.C:
t = o.refresh()
o.refresh()
}
}
}
func (o *arbitrumL1Oracle) refresh() (t *time.Timer) {
t, err := o.refreshWithError()
func (o *arbitrumL1Oracle) refresh() {
err := o.refreshWithError()
if err != nil {
o.logger.Criticalw("Failed to refresh gas price", "err", err)
o.SvcErrBuffer.Append(err)
}
return
}

func (o *arbitrumL1Oracle) refreshWithError() (t *time.Timer, err error) {
t = time.NewTimer(utils.WithJitter(o.pollPeriod))

func (o *arbitrumL1Oracle) refreshWithError() error {
ctx, cancel := o.chStop.CtxCancel(evmclient.ContextWithDefaultTimeout())
defer cancel()

price, err := o.fetchL1GasPrice(ctx)
if err != nil {
return t, err
return err
}

o.l1GasPriceMu.Lock()
defer o.l1GasPriceMu.Unlock()
o.l1GasPrice = priceEntry{price: assets.NewWei(price), timestamp: time.Now()}
return
return nil
}

func (o *arbitrumL1Oracle) fetchL1GasPrice(ctx context.Context) (price *big.Int, err error) {
Expand Down
29 changes: 16 additions & 13 deletions core/chains/evm/gas/rollups/op_l1_oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/rpc"

gethtypes "github.com/ethereum/go-ethereum/core/types"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"

gethtypes "github.com/ethereum/go-ethereum/core/types"

"github.com/smartcontractkit/chainlink/v2/common/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets"
Expand Down Expand Up @@ -208,41 +207,45 @@ func (o *OptimismL1Oracle) HealthReport() map[string]error {
func (o *OptimismL1Oracle) run() {
defer close(o.chDone)

t := o.refresh()
o.refresh()
close(o.chInitialised)

t := services.TickerConfig{
Initial: o.pollPeriod,
JitterPct: services.DefaultJitter,
}.NewTicker(o.pollPeriod)
defer t.Stop()

for {
select {
case <-o.chStop:
return
case <-t.C:
t = o.refresh()
o.refresh()
}
}
}
func (o *OptimismL1Oracle) refresh() (t *time.Timer) {
t, err := o.refreshWithError()
func (o *OptimismL1Oracle) refresh() {
err := o.refreshWithError()
if err != nil {
o.logger.Criticalw("Failed to refresh gas price", "err", err)
o.SvcErrBuffer.Append(err)
}
return
}

func (o *OptimismL1Oracle) refreshWithError() (t *time.Timer, err error) {
t = time.NewTimer(utils.WithJitter(o.pollPeriod))

func (o *OptimismL1Oracle) refreshWithError() error {
ctx, cancel := o.chStop.CtxCancel(evmclient.ContextWithDefaultTimeout())
defer cancel()

price, err := o.GetDAGasPrice(ctx)
if err != nil {
return t, err
return err
}

o.l1GasPriceMu.Lock()
defer o.l1GasPriceMu.Unlock()
o.l1GasPrice = priceEntry{price: assets.NewWei(price), timestamp: time.Now()}
return
return nil
}

func (o *OptimismL1Oracle) GasPrice(_ context.Context) (l1GasPrice *assets.Wei, err error) {
Expand Down
1 change: 1 addition & 0 deletions core/chains/evm/gas/rollups/zkSync_l1_oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func (o *zkSyncL1Oracle) run() {
func (o *zkSyncL1Oracle) refresh() (t *time.Timer) {
t, err := o.refreshWithError()
if err != nil {
o.logger.Criticalw("Failed to refresh gas price", "err", err)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@simsonraj I had trouble debugging this PR, because these refresh methods are pushing errors to mark the service unhealthy, but without logging anything. These are typically paired with a critical log. Is that appropriate to do in this case, and the other files in this PR?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, I think its appropriate to append an error, but not sure if it needs to be critical if it is polling. At least for zkSync case, it may not be since we dont need gasPrice exactly real-time & not sure how much impact an RPC malfunction will have here. Looking at Arbitrum it seems the intent was to log it as an error.
May be @amit-momin @FelixFan1992, any opinions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we looked at the timestamp, and only took this action (both log and unhealthy) if we cannot refresh after some threshold of time has passed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logging the error definitely makes sense here. Following the example of other gas estimator code, I think it's ok to log Critical here. If there's an RPC issue, it's likely a connectivity issue which we'd want to surface to NOPs urgently. Although I'm sure other components would also start making some noise for it.

o.SvcErrBuffer.Append(err)
}
return
Expand Down
19 changes: 10 additions & 9 deletions core/chains/evm/gas/suggested_price_estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"
bigmath "github.com/smartcontractkit/chainlink-common/pkg/utils/big_math"

"github.com/smartcontractkit/chainlink/v2/common/fee"
Expand Down Expand Up @@ -101,26 +100,29 @@ func (o *SuggestedPriceEstimator) HealthReport() map[string]error {
func (o *SuggestedPriceEstimator) run() {
defer close(o.chDone)

t := o.refreshPrice()
o.refreshPrice()
close(o.chInitialised)

t := services.TickerConfig{
Initial: o.pollPeriod,
JitterPct: services.DefaultJitter,
}.NewTicker(o.pollPeriod)

for {
select {
case <-o.chStop:
return
case ch := <-o.chForceRefetch:
t.Stop()
t = o.refreshPrice()
o.refreshPrice()
t.Reset()
close(ch)
case <-t.C:
t = o.refreshPrice()
o.refreshPrice()
}
}
}

func (o *SuggestedPriceEstimator) refreshPrice() (t *time.Timer) {
t = time.NewTimer(utils.WithJitter(o.pollPeriod))

func (o *SuggestedPriceEstimator) refreshPrice() {
var res hexutil.Big
ctx, cancel := o.chStop.CtxCancel(evmclient.ContextWithDefaultTimeout())
defer cancel()
Expand All @@ -136,7 +138,6 @@ func (o *SuggestedPriceEstimator) refreshPrice() (t *time.Timer) {
o.gasPriceMu.Lock()
defer o.gasPriceMu.Unlock()
o.GasPrice = bi
return
}

// Uses the force refetch chan to trigger a price update and blocks until complete
Expand Down
15 changes: 9 additions & 6 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,9 +552,14 @@ func (lp *logPoller) run() {
defer lp.wg.Done()
ctx, cancel := lp.stopCh.NewCtx()
defer cancel()
logPollTick := time.After(0)
logPollTicker := services.NewTicker(lp.pollPeriod)
defer logPollTicker.Stop()
// stagger these somewhat, so they don't all run back-to-back
backupLogPollTick := time.After(100 * time.Millisecond)
backupLogPollTicker := services.TickerConfig{
Initial: 100 * time.Millisecond,
JitterPct: services.DefaultJitter,
}.NewTicker(time.Duration(lp.backupPollerBlockDelay) * lp.pollPeriod)
defer backupLogPollTicker.Stop()
filtersLoaded := false

for {
Expand All @@ -563,8 +568,7 @@ func (lp *logPoller) run() {
return
case fromBlockReq := <-lp.replayStart:
lp.handleReplayRequest(ctx, fromBlockReq, filtersLoaded)
case <-logPollTick:
logPollTick = time.After(utils.WithJitter(lp.pollPeriod))
case <-logPollTicker.C:
if !filtersLoaded {
if err := lp.loadFilters(ctx); err != nil {
lp.lggr.Errorw("Failed loading filters in main logpoller loop, retrying later", "err", err)
Expand Down Expand Up @@ -602,7 +606,7 @@ func (lp *logPoller) run() {
start = lastProcessed.BlockNumber + 1
}
lp.PollAndSaveLogs(ctx, start)
case <-backupLogPollTick:
case <-backupLogPollTicker.C:
if lp.backupPollerBlockDelay == 0 {
continue // backup poller is disabled
}
Expand All @@ -614,7 +618,6 @@ func (lp *logPoller) run() {
// frequently than the primary log poller (instead of roughly once per block it runs once roughly once every
// lp.backupPollerDelay blocks--with default settings about 100x less frequently).

backupLogPollTick = time.After(utils.WithJitter(time.Duration(lp.backupPollerBlockDelay) * lp.pollPeriod))
if !filtersLoaded {
lp.lggr.Warnw("Backup log poller ran before filters loaded, skipping")
continue
Expand Down
Loading
Loading