diff --git a/pkg/services/ticker.go b/pkg/services/ticker.go new file mode 100644 index 000000000..5ce92f911 --- /dev/null +++ b/pkg/services/ticker.go @@ -0,0 +1,39 @@ +package services + +import ( + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/timeutil" +) + +// DefaultJitter is +/-10% +const DefaultJitter timeutil.JitterPct = 0.1 + +// NewTicker returns a new timeutil.Ticker configured to: +// - fire the first tick immediately +// - apply DefaultJitter to each period +func NewTicker(period time.Duration) *timeutil.Ticker { + return TickerConfig{JitterPct: DefaultJitter}.NewTicker(period) +} + +type TickerConfig struct { + // Initial delay before the first tick. + Initial time.Duration + // JitterPct to apply to each period. + JitterPct timeutil.JitterPct +} + +func (c TickerConfig) NewTicker(period time.Duration) *timeutil.Ticker { + first := true + return timeutil.NewTicker(func() time.Duration { + if first { + first = false + return c.Initial + } + p := period + if c.JitterPct != 0.0 { + p = c.JitterPct.Apply(p) + } + return p + }) +} diff --git a/pkg/timeutil/jitter.go b/pkg/timeutil/jitter.go new file mode 100644 index 000000000..5f651930f --- /dev/null +++ b/pkg/timeutil/jitter.go @@ -0,0 +1,22 @@ +package timeutil + +import ( + mrand "math/rand" + "time" +) + +// JitterPct is a percent by which to scale a duration up or down. +// For example, 0.1 will result in +/- 10%. +type JitterPct float64 + +func (p JitterPct) Apply(d time.Duration) time.Duration { + // #nosec + if d == 0 { + return 0 + } + // ensure non-zero arg to Intn to avoid panic + ub := max(1, int(float64(d.Abs())*float64(p))) + // #nosec - non critical randomness + jitter := mrand.Intn(2*ub) - ub + return time.Duration(int(d) + jitter) +} diff --git a/pkg/timeutil/jitter_test.go b/pkg/timeutil/jitter_test.go new file mode 100644 index 000000000..251744967 --- /dev/null +++ b/pkg/timeutil/jitter_test.go @@ -0,0 +1,29 @@ +package timeutil + +import ( + "testing" + "time" +) + +func TestJitterPct(t *testing.T) { + for _, tt := range []struct { + pct JitterPct + dur time.Duration + from, to time.Duration + }{ + {0.1, 0, 0, 0}, + {0.1, time.Second, 900 * time.Millisecond, 1100 * time.Millisecond}, + {0.1, time.Minute, 54 * time.Second, 66 * time.Second}, + {0.1, 24 * time.Hour, 21*time.Hour + 36*time.Minute, 26*time.Hour + 24*time.Minute}, + } { + t.Run(tt.dur.String(), func(t *testing.T) { + for i := 0; i < 100; i++ { + got := tt.pct.Apply(tt.dur) + t.Logf("%d: %s", i, got) + if got < tt.from || got > tt.to { + t.Errorf("expected duration %s with jitter to be between (%s, %s) but got: %s", tt.dur, tt.from, tt.to, got) + } + } + }) + } +} diff --git a/pkg/timeutil/ticker.go b/pkg/timeutil/ticker.go new file mode 100644 index 000000000..6d4a07dac --- /dev/null +++ b/pkg/timeutil/ticker.go @@ -0,0 +1,58 @@ +package timeutil + +import ( + "time" +) + +// Ticker is like time.Ticker, but with a variable period. +type Ticker struct { + C <-chan time.Time + stop chan struct{} + reset chan struct{} +} + +// NewTicker returns a started Ticker which calls nextDur for each period. +// Ticker.Stop should be called to prevent goroutine leaks. +func NewTicker(nextDur func() time.Duration) *Ticker { + c := make(chan time.Time) // unbuffered so we block and delay if not being handled + t := Ticker{C: c, stop: make(chan struct{}), reset: make(chan struct{})} + go t.run(c, nextDur) + return &t +} + +// Stop permanently stops the Ticker. It cannot be Reset. +func (t *Ticker) Stop() { close(t.stop) } + +func (t *Ticker) run(c chan<- time.Time, nextDur func() time.Duration) { + for { + timer := time.NewTimer(nextDur()) + select { + case <-t.stop: + timer.Stop() + return + + case <-t.reset: + timer.Stop() + + case <-timer.C: + timer.Stop() + select { + case <-t.stop: + return + case c <- time.Now(): + case <-t.reset: + } + } + } +} + +// Reset starts a new period. +func (t *Ticker) Reset() { + select { + case <-t.stop: + case t.reset <- struct{}{}: + default: + // unnecessary + return + } +} diff --git a/pkg/utils/mailbox/mailbox_prom.go b/pkg/utils/mailbox/mailbox_prom.go index 36744b420..cb6d916f4 100644 --- a/pkg/utils/mailbox/mailbox_prom.go +++ b/pkg/utils/mailbox/mailbox_prom.go @@ -12,7 +12,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" - "github.com/smartcontractkit/chainlink-common/pkg/utils" ) var mailboxLoad = promauto.NewGaugeVec(prometheus.GaugeOpts{ @@ -30,33 +29,26 @@ type Monitor struct { lggr logger.Logger mailboxes sync.Map - stop func() + stopCh services.StopChan done chan struct{} } func NewMonitor(appID string, lggr logger.Logger) *Monitor { - return &Monitor{appID: appID, lggr: logger.Named(lggr, "Monitor")} + return &Monitor{appID: appID, lggr: logger.Named(lggr, "Monitor"), stopCh: make(services.StopChan), done: make(chan struct{})} } func (m *Monitor) Name() string { return m.lggr.Name() } func (m *Monitor) Start(context.Context) error { return m.StartOnce("Monitor", func() error { - t := time.NewTicker(utils.WithJitter(mailboxPromInterval)) - ctx, cancel := context.WithCancel(context.Background()) - m.stop = func() { - t.Stop() - cancel() - } - m.done = make(chan struct{}) - go m.monitorLoop(ctx, t.C) + go m.monitorLoop() return nil }) } func (m *Monitor) Close() error { return m.StopOnce("Monitor", func() error { - m.stop() + close(m.stopCh) <-m.done return nil }) @@ -66,13 +58,15 @@ func (m *Monitor) HealthReport() map[string]error { return map[string]error{m.Name(): m.Healthy()} } -func (m *Monitor) monitorLoop(ctx context.Context, c <-chan time.Time) { +func (m *Monitor) monitorLoop() { defer close(m.done) + t := services.NewTicker(mailboxPromInterval) + defer t.Stop() for { select { - case <-ctx.Done(): + case <-m.stopCh: return - case <-c: + case <-t.C: m.mailboxes.Range(func(k, v any) bool { name, mb := k.(string), v.(mailbox) c, p := mb.load() diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 6be8e3df1..6a60d2629 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -3,27 +3,16 @@ package utils import ( "context" "fmt" - "math" - mrand "math/rand" "sync" "time" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/timeutil" ) -// WithJitter adds +/- 10% to a duration -func WithJitter(d time.Duration) time.Duration { - // #nosec - if d == 0 { - return 0 - } - // ensure non-zero arg to Intn to avoid panic - max := math.Max(float64(d.Abs())/5.0, 1.) - // #nosec - non critical randomness - jitter := mrand.Intn(int(max)) - jitter = jitter - (jitter / 2) - return time.Duration(int(d) + jitter) -} +// WithJitter adds +/- 10% to a duration. +// Deprecated: use timeutil.WithJitter +func WithJitter(d time.Duration) time.Duration { return timeutil.JitterPct(0.1).Apply(d) } // ContextFromChan creates a context that finishes when the provided channel // receives or is closed.