From 682b28feadc5d1235679056f45b7e2fcb0f45e22 Mon Sep 17 00:00:00 2001 From: Foivos Filippopoulos Date: Tue, 25 May 2021 12:34:30 +0100 Subject: [PATCH 1/2] Add simple backoff mechanism for lease retries --- backoff.go | 81 ++++++++++++++++++++++++++++++++++++++++++++++++ backoff_test.go | 22 +++++++++++++ devicemanager.go | 23 ++++++++++++-- 3 files changed, 123 insertions(+), 3 deletions(-) create mode 100644 backoff.go create mode 100644 backoff_test.go diff --git a/backoff.go b/backoff.go new file mode 100644 index 0000000..c9408c3 --- /dev/null +++ b/backoff.go @@ -0,0 +1,81 @@ +package main + +import ( + "math" + "sync" + "time" +) + +type backoff struct { + lock sync.Mutex + attempt uint64 + // Factor is the multiplying factor for each increment step + Factor float64 + // Min and Max are the minimum and maximum values of the counter + Min, Max time.Duration +} + +const ( + maxInt64 = float64(math.MaxInt64 - 512) + minDuration = 1 * time.Second + maxDuration = 100 * time.Second + defaultFactor = 2 +) + +// newBackoff initialises and returns a new Backoff +func newBackoff(min, max time.Duration, factor float64) *backoff { + // In case of 0 min/max values apply defaults + if min <= 0 { + min = minDuration + } + if max <= 0 { + max = maxDuration + } + if factor <= 0 { + factor = defaultFactor + } + return &backoff{ + lock: sync.Mutex{}, + Min: min, + Max: max, + Factor: factor, + } +} + +// Duration returns the next backoff duration and increments attempt +func (b *backoff) Duration() time.Duration { + b.lock.Lock() + d := b.forAttempt(float64(b.attempt)) + b.attempt = b.attempt + 1 + b.lock.Unlock() + return d +} + +func (b *backoff) forAttempt(attempt float64) time.Duration { + if b.Min >= b.Max { + return b.Max + } + //calculate this duration + minf := float64(b.Min) + durf := minf * math.Pow(b.Factor, attempt) + //ensure float64 wont overflow int64 + if durf > maxInt64 { + return b.Max + } + dur := time.Duration(durf) + //keep within bounds + if dur < b.Min { + return b.Min + } + if dur > b.Max { + return b.Max + } + return dur +} + +// Reset restarts the current attempt counter at zero. +func (b *backoff) Reset() { + b.lock.Lock() + b.attempt = 0 + b.lock.Unlock() +} diff --git a/backoff_test.go b/backoff_test.go new file mode 100644 index 0000000..b30b1fb --- /dev/null +++ b/backoff_test.go @@ -0,0 +1,22 @@ +package main + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestBackoff(t *testing.T) { + b := newBackoff(time.Second, 10*time.Second, 2) + + assert.Equal(t, b.Duration(), 1*time.Second) + assert.Equal(t, b.Duration(), 2*time.Second) + assert.Equal(t, b.Duration(), 4*time.Second) + assert.Equal(t, b.Duration(), 8*time.Second) + // Reaching max means that we should always return the max value + assert.Equal(t, b.Duration(), 10*time.Second) + assert.Equal(t, b.Duration(), 10*time.Second) + b.Reset() + assert.Equal(t, b.Duration(), 1*time.Second) +} diff --git a/devicemanager.go b/devicemanager.go index d06487c..9692e8a 100644 --- a/devicemanager.go +++ b/devicemanager.go @@ -27,9 +27,12 @@ type DeviceManager struct { configMutex sync.Mutex config *WirestewardPeerConfig // To keep the current config serverURLs []string + backoff *backoff // backoff timer for retries to get a new lease healthCheck *healthCheck // Pointer to the device manager running healthchek healthCheckConf agentHealthCheckConfig renewLeaseChan chan struct{} + stopLeaseBackoff chan struct{} + inBackoffLoop bool // bool to signal if there is a backoff loop in progress httpClientTimeout Duration } @@ -43,9 +46,12 @@ func newDeviceManager(deviceName string, mtu int, wirestewardURLs []string, http return &DeviceManager{ agentDevice: device, serverURLs: wirestewardURLs, + backoff: newBackoff(1*time.Second, 64*time.Second, 2), healthCheck: &healthCheck{running: false}, healthCheckConf: hcc, renewLeaseChan: make(chan struct{}), + stopLeaseBackoff: make(chan struct{}), + inBackoffLoop: false, httpClientTimeout: httpClientTimeout, } } @@ -97,11 +103,18 @@ func (dm *DeviceManager) renewLoop() { logger.Info.Printf("Renewing lease for device:%s\n", dm.Name()) if err := dm.renewLease(); err != nil { logger.Error.Printf("Cannot update lease, will retry in one sec: %s", err) - // Wait a second in a goroutine so we do not block here and try again go func() { - time.Sleep(1 * time.Second) - dm.renewLeaseChan <- struct{}{} + dm.inBackoffLoop = true + select { + case <-time.After(dm.backoff.Duration()): + dm.renewLeaseChan <- struct{}{} + case <-dm.stopLeaseBackoff: + break + } + dm.inBackoffLoop = false }() + } else { + dm.backoff.Reset() } } } @@ -116,6 +129,10 @@ func (dm *DeviceManager) nextServer() string { func (dm *DeviceManager) RenewTokenAndLease(token string) { dm.cachedToken = token dm.healthCheck.Stop() // stop a running healthcheck that could also trigger renewals + if dm.inBackoffLoop { + dm.stopLeaseBackoff <- struct{}{} // Stop existing backoff loops + } + dm.backoff.Reset() // Reset backoff timer dm.renewLeaseChan <- struct{}{} } From 9d7d276952556636e498a3c08002c756c6b36234 Mon Sep 17 00:00:00 2001 From: Foivos Filippopoulos Date: Tue, 25 May 2021 12:56:45 +0100 Subject: [PATCH 2/2] Log next backoff duration, update default vars names --- backoff.go | 12 ++++++------ devicemanager.go | 5 +++-- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/backoff.go b/backoff.go index c9408c3..04ab378 100644 --- a/backoff.go +++ b/backoff.go @@ -16,20 +16,20 @@ type backoff struct { } const ( - maxInt64 = float64(math.MaxInt64 - 512) - minDuration = 1 * time.Second - maxDuration = 100 * time.Second - defaultFactor = 2 + maxInt64 = float64(math.MaxInt64 - 512) + defaultMinDuration = 1 * time.Second + defaultMaxDuration = 100 * time.Second + defaultFactor = 2 ) // newBackoff initialises and returns a new Backoff func newBackoff(min, max time.Duration, factor float64) *backoff { // In case of 0 min/max values apply defaults if min <= 0 { - min = minDuration + min = defaultMinDuration } if max <= 0 { - max = maxDuration + max = defaultMaxDuration } if factor <= 0 { factor = defaultFactor diff --git a/devicemanager.go b/devicemanager.go index 9692e8a..c0fa408 100644 --- a/devicemanager.go +++ b/devicemanager.go @@ -102,11 +102,12 @@ func (dm *DeviceManager) renewLoop() { case <-dm.renewLeaseChan: logger.Info.Printf("Renewing lease for device:%s\n", dm.Name()) if err := dm.renewLease(); err != nil { - logger.Error.Printf("Cannot update lease, will retry in one sec: %s", err) go func() { dm.inBackoffLoop = true + duration := dm.backoff.Duration() + logger.Error.Printf("Cannot update lease, will retry in %s: %s", duration, err) select { - case <-time.After(dm.backoff.Duration()): + case <-time.After(duration): dm.renewLeaseChan <- struct{}{} case <-dm.stopLeaseBackoff: break