Merge pull request #113 from utilitywarehouse/agent-backoff
Add simple backoff mechanism for lease retries
ffilippopoulos authored May 25, 2021
2 parents d168277 + 9d7d276 commit acd6917
Showing 3 changed files with 125 additions and 4 deletions.
81 changes: 81 additions & 0 deletions backoff.go
@@ -0,0 +1,81 @@
package main

import (
	"math"
	"sync"
	"time"
)

type backoff struct {
	lock    sync.Mutex
	attempt uint64
	// Factor is the multiplying factor for each increment step
	Factor float64
	// Min and Max are the minimum and maximum values of the counter
	Min, Max time.Duration
}

const (
	maxInt64           = float64(math.MaxInt64 - 512)
	defaultMinDuration = 1 * time.Second
	defaultMaxDuration = 100 * time.Second
	defaultFactor      = 2
)

// newBackoff initialises and returns a new backoff
func newBackoff(min, max time.Duration, factor float64) *backoff {
	// In case of zero or negative min/max/factor values, apply defaults
	if min <= 0 {
		min = defaultMinDuration
	}
	if max <= 0 {
		max = defaultMaxDuration
	}
	if factor <= 0 {
		factor = defaultFactor
	}
	return &backoff{
		lock:   sync.Mutex{},
		Min:    min,
		Max:    max,
		Factor: factor,
	}
}

// Duration returns the next backoff duration and increments the attempt counter
func (b *backoff) Duration() time.Duration {
	b.lock.Lock()
	d := b.forAttempt(float64(b.attempt))
	b.attempt = b.attempt + 1
	b.lock.Unlock()
	return d
}

func (b *backoff) forAttempt(attempt float64) time.Duration {
	if b.Min >= b.Max {
		return b.Max
	}
	// calculate the duration for this attempt
	minf := float64(b.Min)
	durf := minf * math.Pow(b.Factor, attempt)
	// ensure the float64 won't overflow int64
	if durf > maxInt64 {
		return b.Max
	}
	dur := time.Duration(durf)
	// keep within bounds
	if dur < b.Min {
		return b.Min
	}
	if dur > b.Max {
		return b.Max
	}
	return dur
}

// Reset restarts the current attempt counter at zero.
func (b *backoff) Reset() {
	b.lock.Lock()
	b.attempt = 0
	b.lock.Unlock()
}
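
To show how the type is intended to be consumed, here is a minimal usage sketch (not part of the commit; doWork stands in for any hypothetical fallible operation):

package main

import "time"

func doWork() error { return nil } // hypothetical fallible operation

func main() {
	b := newBackoff(1*time.Second, 64*time.Second, 2)
	for {
		if err := doWork(); err != nil {
			// Sleep 1s, 2s, 4s, ... capped at 64s, then try again.
			time.Sleep(b.Duration())
			continue
		}
		// On success, reset so the next failure starts from Min again.
		b.Reset()
		break
	}
}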
22 changes: 22 additions & 0 deletions backoff_test.go
@@ -0,0 +1,22 @@
package main

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

func TestBackoff(t *testing.T) {
	b := newBackoff(time.Second, 10*time.Second, 2)

	assert.Equal(t, b.Duration(), 1*time.Second)
	assert.Equal(t, b.Duration(), 2*time.Second)
	assert.Equal(t, b.Duration(), 4*time.Second)
	assert.Equal(t, b.Duration(), 8*time.Second)
	// Reaching max means that we should always return the max value
	assert.Equal(t, b.Duration(), 10*time.Second)
	assert.Equal(t, b.Duration(), 10*time.Second)
	b.Reset()
	assert.Equal(t, b.Duration(), 1*time.Second)
}
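
The asserted sequence follows directly from forAttempt: the n-th call (counting from zero) returns Min * Factor^n clamped to [Min, Max], so with Min = 1s, Factor = 2 and Max = 10s the durations are 1s, 2s, 4s, 8s, then 10s on every subsequent call until Reset is called.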
26 changes: 22 additions & 4 deletions devicemanager.go
@@ -27,9 +27,12 @@ type DeviceManager struct {
	configMutex sync.Mutex
	config      *WirestewardPeerConfig // To keep the current config
	serverURLs  []string
	backoff          *backoff     // backoff timer for retries to get a new lease
	healthCheck      *healthCheck // Pointer to the device manager's running healthcheck
	healthCheckConf  agentHealthCheckConfig
	renewLeaseChan   chan struct{}
	stopLeaseBackoff chan struct{}
	inBackoffLoop    bool // signals whether a backoff loop is in progress
	httpClientTimeout Duration
}

@@ -43,9 +46,12 @@ func newDeviceManager(deviceName string, mtu int, wirestewardURLs []string, http
	return &DeviceManager{
		agentDevice:       device,
		serverURLs:        wirestewardURLs,
		backoff:           newBackoff(1*time.Second, 64*time.Second, 2),
		healthCheck:       &healthCheck{running: false},
		healthCheckConf:   hcc,
		renewLeaseChan:    make(chan struct{}),
		stopLeaseBackoff:  make(chan struct{}),
		inBackoffLoop:     false,
		httpClientTimeout: httpClientTimeout,
	}
}
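
With these parameters, retries for a new lease back off at 1s, 2s, 4s, 8s, 16s, 32s, then stay at the 64s cap until a renewal succeeds and the counter is reset.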
@@ -96,12 +102,20 @@ func (dm *DeviceManager) renewLoop() {
		case <-dm.renewLeaseChan:
			logger.Info.Printf("Renewing lease for device:%s\n", dm.Name())
			if err := dm.renewLease(); err != nil {
				logger.Error.Printf("Cannot update lease, will retry in one sec: %s", err)
				// Wait a second in a goroutine so we do not block here and try again
				go func() {
					time.Sleep(1 * time.Second)
					dm.renewLeaseChan <- struct{}{}
					dm.inBackoffLoop = true
					duration := dm.backoff.Duration()
					logger.Error.Printf("Cannot update lease, will retry in %s: %s", duration, err)
					select {
					case <-time.After(duration):
						dm.renewLeaseChan <- struct{}{}
					case <-dm.stopLeaseBackoff:
						break
					}
					dm.inBackoffLoop = false
				}()
			} else {
				dm.backoff.Reset()
			}
		}
	}
Expand All @@ -116,6 +130,10 @@ func (dm *DeviceManager) nextServer() string {
func (dm *DeviceManager) RenewTokenAndLease(token string) {
dm.cachedToken = token
dm.healthCheck.Stop() // stop a running healthcheck that could also trigger renewals
if dm.inBackoffLoop {
dm.stopLeaseBackoff <- struct{}{} // Stop existing backoff loops
}
dm.backoff.Reset() // Reset backoff timer
dm.renewLeaseChan <- struct{}{}
}
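
The retry goroutine in renewLoop and the stopLeaseBackoff channel together implement an interruptible wait. As a standalone sketch of the pattern (the waitOrStop name is illustrative, not from the codebase):

package main

import "time"

// waitOrStop is an illustrative helper, not part of the commit: it
// waits for d but returns early if stop is signalled, the same select
// shape renewLoop uses so that RenewTokenAndLease can cancel a
// pending retry.
func waitOrStop(d time.Duration, stop <-chan struct{}) bool {
	select {
	case <-time.After(d):
		return true // waited the full backoff duration
	case <-stop:
		return false // cancelled before the timer fired
	}
}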

