Skip to content

Commit

Permalink
pkg/timeutil: add Ticker
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Apr 3, 2024
1 parent c01e8d1 commit c595a7a
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 30 deletions.
21 changes: 21 additions & 0 deletions pkg/services/ticker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package services

import (
"time"

"github.com/smartcontractkit/chainlink-common/pkg/timeutil"
)

// NewTicker returns a new timeutil.Ticker configured to:
// - fire the first tick immediately
// - apply jitter to each period via timeutil.WithJitter
func NewTicker(d time.Duration) *timeutil.Ticker {
first := true
return timeutil.NewTicker(func() time.Duration {
if first {
first = false
return 0
}
return timeutil.WithJitter(d)
})
}
22 changes: 22 additions & 0 deletions pkg/timeutil/jitter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package timeutil

import (
"math"
mrand "math/rand"
"time"
)

// WithJitter adds +/- 10% to a duration.
// TODO better name? JitterDuration; ApplyJitter
// TODO customizable percentage?
func WithJitter(d time.Duration) time.Duration {
// #nosec
if d == 0 {
return 0
}
// ensure non-zero arg to Intn to avoid panic
max := math.Max(float64(d.Abs())/5.0, 1.)
// #nosec - non critical randomness
jitter := mrand.Intn(int(max)) / 2
return time.Duration(int(d) + jitter)
}
28 changes: 28 additions & 0 deletions pkg/timeutil/jitter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package timeutil

import (
"testing"
"time"
)

func TestWithJitter(t *testing.T) {
for _, tt := range []struct {
dur time.Duration
from, to time.Duration
}{
{0, 0, 0},
{time.Second, 900 * time.Millisecond, 1100 * time.Millisecond},
{time.Minute, 54 * time.Second, 66 * time.Second},
{24 * time.Hour, 21*time.Hour + 36*time.Minute, 26*time.Hour + 24*time.Minute},
} {
t.Run(tt.dur.String(), func(t *testing.T) {
for i := 0; i < 100; i++ {
got := WithJitter(tt.dur)
t.Logf("%d: %s", i, got)
if got < tt.from || got > tt.to {
t.Errorf("expected duration %s with jitter to be between (%s, %s) but got: %s", tt.dur, tt.from, tt.to, got)
}
}
})
}
}
41 changes: 41 additions & 0 deletions pkg/timeutil/ticker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package timeutil

import (
"time"
)

// Ticker is like time.Ticker, but with a variable period.
type Ticker struct {
C <-chan time.Time
stop chan struct{}
}

// NewTicker returns a started Ticker which calls nextDur for each period.
func NewTicker(nextDur func() time.Duration) *Ticker {
c := make(chan time.Time) // unbuffered so we block and delay if not being handled
t := Ticker{C: c, stop: make(chan struct{})}
go t.run(c, nextDur)
return &t
}

func (t *Ticker) run(c chan<- time.Time, nextDur func() time.Duration) {
var timer *time.Timer
defer timer.Stop()
for {
timer = time.NewTimer(nextDur())
select {
case <-t.stop:
return

case <-timer.C:
timer.Stop()
select {
case <-t.stop:
return
case c <- time.Now():
}
}
}
}

func (t *Ticker) Stop() { close(t.stop) }
24 changes: 9 additions & 15 deletions pkg/utils/mailbox/mailbox_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/utils"
)

var mailboxLoad = promauto.NewGaugeVec(prometheus.GaugeOpts{
Expand All @@ -30,33 +29,26 @@ type Monitor struct {
lggr logger.Logger

mailboxes sync.Map
stop func()
stopCh services.StopChan
done chan struct{}
}

func NewMonitor(appID string, lggr logger.Logger) *Monitor {
return &Monitor{appID: appID, lggr: logger.Named(lggr, "Monitor")}
return &Monitor{appID: appID, lggr: logger.Named(lggr, "Monitor"), stopCh: make(services.StopChan), done: make(chan struct{})}
}

func (m *Monitor) Name() string { return m.lggr.Name() }

func (m *Monitor) Start(context.Context) error {
return m.StartOnce("Monitor", func() error {
t := time.NewTicker(utils.WithJitter(mailboxPromInterval))
ctx, cancel := context.WithCancel(context.Background())
m.stop = func() {
t.Stop()
cancel()
}
m.done = make(chan struct{})
go m.monitorLoop(ctx, t.C)
go m.monitorLoop()
return nil
})
}

func (m *Monitor) Close() error {
return m.StopOnce("Monitor", func() error {
m.stop()
close(m.stopCh)
<-m.done
return nil
})
Expand All @@ -66,13 +58,15 @@ func (m *Monitor) HealthReport() map[string]error {
return map[string]error{m.Name(): m.Healthy()}
}

func (m *Monitor) monitorLoop(ctx context.Context, c <-chan time.Time) {
func (m *Monitor) monitorLoop() {
defer close(m.done)
t := services.NewTicker(mailboxPromInterval)
defer t.Stop()
for {
select {
case <-ctx.Done():
case <-m.stopCh:
return
case <-c:
case <-t.C:
m.mailboxes.Range(func(k, v any) bool {
name, mb := k.(string), v.(mailbox)
c, p := mb.load()
Expand Down
19 changes: 4 additions & 15 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,16 @@ package utils
import (
"context"
"fmt"
"math"
mrand "math/rand"
"sync"
"time"

"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/timeutil"
)

// WithJitter adds +/- 10% to a duration
func WithJitter(d time.Duration) time.Duration {
// #nosec
if d == 0 {
return 0
}
// ensure non-zero arg to Intn to avoid panic
max := math.Max(float64(d.Abs())/5.0, 1.)
// #nosec - non critical randomness
jitter := mrand.Intn(int(max))
jitter = jitter - (jitter / 2)
return time.Duration(int(d) + jitter)
}
// WithJitter adds +/- 10% to a duration.
// Deprecated: use timeutil.WithJitter
func WithJitter(d time.Duration) time.Duration { return timeutil.WithJitter(d) }

// ContextFromChan creates a context that finishes when the provided channel
// receives or is closed.
Expand Down

0 comments on commit c595a7a

Please sign in to comment.