From c595a7ad614d53a825e2d374ac93c78c64a97fbe Mon Sep 17 00:00:00 2001 From: Jordan Krage Date: Fri, 12 Jan 2024 08:05:56 -0600 Subject: [PATCH] pkg/timeutil: add Ticker --- pkg/services/ticker.go | 21 ++++++++++++++++ pkg/timeutil/jitter.go | 22 +++++++++++++++++ pkg/timeutil/jitter_test.go | 28 +++++++++++++++++++++ pkg/timeutil/ticker.go | 41 +++++++++++++++++++++++++++++++ pkg/utils/mailbox/mailbox_prom.go | 24 +++++++----------- pkg/utils/utils.go | 19 +++----------- 6 files changed, 125 insertions(+), 30 deletions(-) create mode 100644 pkg/services/ticker.go create mode 100644 pkg/timeutil/jitter.go create mode 100644 pkg/timeutil/jitter_test.go create mode 100644 pkg/timeutil/ticker.go diff --git a/pkg/services/ticker.go b/pkg/services/ticker.go new file mode 100644 index 000000000..7a567ecf1 --- /dev/null +++ b/pkg/services/ticker.go @@ -0,0 +1,21 @@ +package services + +import ( + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/timeutil" +) + +// NewTicker returns a new timeutil.Ticker configured to: +// - fire the first tick immediately +// - apply jitter to each period via timeutil.WithJitter +func NewTicker(d time.Duration) *timeutil.Ticker { + first := true + return timeutil.NewTicker(func() time.Duration { + if first { + first = false + return 0 + } + return timeutil.WithJitter(d) + }) +} diff --git a/pkg/timeutil/jitter.go b/pkg/timeutil/jitter.go new file mode 100644 index 000000000..424f6cb19 --- /dev/null +++ b/pkg/timeutil/jitter.go @@ -0,0 +1,22 @@ +package timeutil + +import ( + "math" + mrand "math/rand" + "time" +) + +// WithJitter adds +/- 10% to a duration. +// TODO better name? JitterDuration; ApplyJitter +// TODO customizable percentage? +func WithJitter(d time.Duration) time.Duration { + // #nosec + if d == 0 { + return 0 + } + // ensure non-zero arg to Intn to avoid panic + max := math.Max(float64(d.Abs())/5.0, 1.) + // #nosec - non critical randomness + jitter := mrand.Intn(int(max)) / 2 + return time.Duration(int(d) + jitter) +} diff --git a/pkg/timeutil/jitter_test.go b/pkg/timeutil/jitter_test.go new file mode 100644 index 000000000..dbf46a0f0 --- /dev/null +++ b/pkg/timeutil/jitter_test.go @@ -0,0 +1,28 @@ +package timeutil + +import ( + "testing" + "time" +) + +func TestWithJitter(t *testing.T) { + for _, tt := range []struct { + dur time.Duration + from, to time.Duration + }{ + {0, 0, 0}, + {time.Second, 900 * time.Millisecond, 1100 * time.Millisecond}, + {time.Minute, 54 * time.Second, 66 * time.Second}, + {24 * time.Hour, 21*time.Hour + 36*time.Minute, 26*time.Hour + 24*time.Minute}, + } { + t.Run(tt.dur.String(), func(t *testing.T) { + for i := 0; i < 100; i++ { + got := WithJitter(tt.dur) + t.Logf("%d: %s", i, got) + if got < tt.from || got > tt.to { + t.Errorf("expected duration %s with jitter to be between (%s, %s) but got: %s", tt.dur, tt.from, tt.to, got) + } + } + }) + } +} diff --git a/pkg/timeutil/ticker.go b/pkg/timeutil/ticker.go new file mode 100644 index 000000000..268c7b4cb --- /dev/null +++ b/pkg/timeutil/ticker.go @@ -0,0 +1,41 @@ +package timeutil + +import ( + "time" +) + +// Ticker is like time.Ticker, but with a variable period. +type Ticker struct { + C <-chan time.Time + stop chan struct{} +} + +// NewTicker returns a started Ticker which calls nextDur for each period. +func NewTicker(nextDur func() time.Duration) *Ticker { + c := make(chan time.Time) // unbuffered so we block and delay if not being handled + t := Ticker{C: c, stop: make(chan struct{})} + go t.run(c, nextDur) + return &t +} + +func (t *Ticker) run(c chan<- time.Time, nextDur func() time.Duration) { + var timer *time.Timer + defer timer.Stop() + for { + timer = time.NewTimer(nextDur()) + select { + case <-t.stop: + return + + case <-timer.C: + timer.Stop() + select { + case <-t.stop: + return + case c <- time.Now(): + } + } + } +} + +func (t *Ticker) Stop() { close(t.stop) } diff --git a/pkg/utils/mailbox/mailbox_prom.go b/pkg/utils/mailbox/mailbox_prom.go index 36744b420..cb6d916f4 100644 --- a/pkg/utils/mailbox/mailbox_prom.go +++ b/pkg/utils/mailbox/mailbox_prom.go @@ -12,7 +12,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" - "github.com/smartcontractkit/chainlink-common/pkg/utils" ) var mailboxLoad = promauto.NewGaugeVec(prometheus.GaugeOpts{ @@ -30,33 +29,26 @@ type Monitor struct { lggr logger.Logger mailboxes sync.Map - stop func() + stopCh services.StopChan done chan struct{} } func NewMonitor(appID string, lggr logger.Logger) *Monitor { - return &Monitor{appID: appID, lggr: logger.Named(lggr, "Monitor")} + return &Monitor{appID: appID, lggr: logger.Named(lggr, "Monitor"), stopCh: make(services.StopChan), done: make(chan struct{})} } func (m *Monitor) Name() string { return m.lggr.Name() } func (m *Monitor) Start(context.Context) error { return m.StartOnce("Monitor", func() error { - t := time.NewTicker(utils.WithJitter(mailboxPromInterval)) - ctx, cancel := context.WithCancel(context.Background()) - m.stop = func() { - t.Stop() - cancel() - } - m.done = make(chan struct{}) - go m.monitorLoop(ctx, t.C) + go m.monitorLoop() return nil }) } func (m *Monitor) Close() error { return m.StopOnce("Monitor", func() error { - m.stop() + close(m.stopCh) <-m.done return nil }) @@ -66,13 +58,15 @@ func (m *Monitor) HealthReport() map[string]error { return map[string]error{m.Name(): m.Healthy()} } -func (m *Monitor) monitorLoop(ctx context.Context, c <-chan time.Time) { +func (m *Monitor) monitorLoop() { defer close(m.done) + t := services.NewTicker(mailboxPromInterval) + defer t.Stop() for { select { - case <-ctx.Done(): + case <-m.stopCh: return - case <-c: + case <-t.C: m.mailboxes.Range(func(k, v any) bool { name, mb := k.(string), v.(mailbox) c, p := mb.load() diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 6be8e3df1..4a1f2b9ac 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -3,27 +3,16 @@ package utils import ( "context" "fmt" - "math" - mrand "math/rand" "sync" "time" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/timeutil" ) -// WithJitter adds +/- 10% to a duration -func WithJitter(d time.Duration) time.Duration { - // #nosec - if d == 0 { - return 0 - } - // ensure non-zero arg to Intn to avoid panic - max := math.Max(float64(d.Abs())/5.0, 1.) - // #nosec - non critical randomness - jitter := mrand.Intn(int(max)) - jitter = jitter - (jitter / 2) - return time.Duration(int(d) + jitter) -} +// WithJitter adds +/- 10% to a duration. +// Deprecated: use timeutil.WithJitter +func WithJitter(d time.Duration) time.Duration { return timeutil.WithJitter(d) } // ContextFromChan creates a context that finishes when the provided channel // receives or is closed.