Skip to content

Commit

Permalink
[FIXED] Backoff not respected with multiple inflight redeliveries
Browse files Browse the repository at this point in the history
Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
MauriceVanVeen committed Nov 11, 2024
1 parent fe2c72f commit f594966
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 31 deletions.
76 changes: 45 additions & 31 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ type consumer struct {
outq *jsOutQ
pending map[uint64]*Pending
ptmr *time.Timer
ptmrEnd time.Time
rdq []uint64
rdqi avl.SequenceSet
rdc map[uint64]uint64
Expand Down Expand Up @@ -1508,7 +1509,7 @@ func (o *consumer) setLeader(isLeader bool) {
// Stop any unpause timers. Should only be running on leaders.
stopAndClearTimer(&o.uptmr)
// Make sure to clear out any re-deliver queues
stopAndClearTimer(&o.ptmr)
o.stopAndClearPtmr()
o.rdq = nil
o.rdqi.Empty()
o.pending = nil
Expand Down Expand Up @@ -1979,7 +1980,7 @@ func (o *consumer) forceExpirePending() {
p.Timestamp += off
}
}
o.ptmr.Reset(o.ackWait(0))
o.resetPtmr(o.ackWait(0))
}
o.signalNewMessages()
}
Expand Down Expand Up @@ -2128,7 +2129,7 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
// AckWait
if cfg.AckWait != o.cfg.AckWait {
if o.ptmr != nil {
o.ptmr.Reset(100 * time.Millisecond)
o.resetPtmr(100 * time.Millisecond)
}
}
// Rate Limit
Expand Down Expand Up @@ -2675,7 +2676,7 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) {
if o.ptmr != nil {
// Want checkPending to run and figure out the next timer ttl.
// TODO(dlc) - We could optimize this maybe a bit more and track when we expect the timer to fire.
o.ptmr.Reset(10 * time.Millisecond)
o.resetPtmr(10 * time.Millisecond)
}
}
// Nothing else for us to do now so return.
Expand Down Expand Up @@ -2809,11 +2810,7 @@ func (o *consumer) applyState(state *ConsumerState) {
if o.cfg.AckWait < delay {
delay = o.ackWait(0)
}
if o.ptmr == nil {
o.ptmr = time.AfterFunc(delay, o.checkPending)
} else {
o.ptmr.Reset(delay)
}
o.resetPtmr(delay)
}
}

Expand Down Expand Up @@ -4903,9 +4900,24 @@ func (o *consumer) trackPending(sseq, dseq uint64) {
if o.pending == nil {
o.pending = make(map[uint64]*Pending)
}
if o.ptmr == nil {
o.ptmr = time.AfterFunc(o.ackWait(0), o.checkPending)

// We could have a backoff that set a timer higher than what we need for this message.
// In that case, reset to lowest backoff required for a message redelivery.
minDelay := o.ackWait(0)
if l := len(o.cfg.BackOff); l > 0 {
bi := int(o.rdc[sseq])
if bi < 0 {
bi = 0
} else if bi >= l {
bi = l - 1
}
minDelay = o.ackWait(o.cfg.BackOff[bi])
}
minDeadline := time.Now().Add(minDelay)
if o.ptmr == nil || o.ptmrEnd.After(minDeadline) {
o.resetPtmr(minDelay)
}

if p, ok := o.pending[sseq]; ok {
// Update timestamp but keep original consumer delivery sequence.
// So do not update p.Sequence.
Expand Down Expand Up @@ -5028,24 +5040,21 @@ func (o *consumer) removeFromRedeliverQueue(seq uint64) bool {

// Checks the pending messages.
func (o *consumer) checkPending() {
o.mu.RLock()
o.mu.Lock()
defer o.mu.Unlock()

mset := o.mset
// On stop, mset and timer will be nil.
if o.closed || mset == nil || o.ptmr == nil {
stopAndClearTimer(&o.ptmr)
o.mu.RUnlock()
o.stopAndClearPtmr()
return
}
o.mu.RUnlock()

var shouldUpdateState bool
var state StreamState
mset.store.FastState(&state)
fseq := state.FirstSeq

o.mu.Lock()
defer o.mu.Unlock()

now := time.Now().UnixNano()
ttl := int64(o.cfg.AckWait)
next := int64(o.ackWait(0))
Expand All @@ -5061,11 +5070,7 @@ func (o *consumer) checkPending() {
check := len(o.pending) > 1024
for seq, p := range o.pending {
if check && atomic.LoadInt64(&o.awl) > 0 {
if o.ptmr == nil {
o.ptmr = time.AfterFunc(100*time.Millisecond, o.checkPending)
} else {
o.ptmr.Reset(100 * time.Millisecond)
}
o.resetPtmr(100 * time.Millisecond)
return
}
// Check if these are no longer valid.
Expand Down Expand Up @@ -5132,15 +5137,10 @@ func (o *consumer) checkPending() {
}

if len(o.pending) > 0 {
delay := time.Duration(next)
if o.ptmr == nil {
o.ptmr = time.AfterFunc(delay, o.checkPending)
} else {
o.ptmr.Reset(o.ackWait(delay))
}
o.resetPtmr(time.Duration(next))
} else {
// Make sure to stop timer and clear out any re delivery queues
stopAndClearTimer(&o.ptmr)
o.stopAndClearPtmr()
o.rdq = nil
o.rdqi.Empty()
o.pending = nil
Expand Down Expand Up @@ -5626,7 +5626,7 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
o.client = nil
sysc := o.sysc
o.sysc = nil
stopAndClearTimer(&o.ptmr)
o.stopAndClearPtmr()
stopAndClearTimer(&o.dtmr)
stopAndClearTimer(&o.gwdtmr)
delivery := o.cfg.DeliverSubject
Expand Down Expand Up @@ -6049,3 +6049,17 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
}
return nil
}

// resetPtmr arms the pending timer so that checkPending runs after delay.
// The timer is created on first use and re-armed afterwards. The expected
// fire time is recorded in ptmrEnd so other code can compare it against a
// required deadline.
// NOTE(review): visible callers invoke this while holding o.mu — confirm that
// the lock is required for all callers.
func (o *consumer) resetPtmr(delay time.Duration) {
	if tmr := o.ptmr; tmr != nil {
		// Timer already exists, just push it out to the new delay.
		tmr.Reset(delay)
	} else {
		o.ptmr = time.AfterFunc(delay, o.checkPending)
	}
	// Remember when we expect the timer to fire.
	o.ptmrEnd = time.Now().Add(delay)
}

// stopAndClearPtmr hands the pending timer to stopAndClearTimer and resets
// ptmrEnd to the zero time, so no stale deadline is left behind.
func (o *consumer) stopAndClearPtmr() {
	// The two steps are independent: one clears the recorded deadline, the
	// other deals with the timer itself.
	o.ptmrEnd = time.Time{}
	stopAndClearTimer(&o.ptmr)
}
70 changes: 70 additions & 0 deletions server/jetstream_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2386,3 +2386,73 @@ func Benchmark____JetStreamConsumerIsFilteredMatch(b *testing.B) {
})
}
}

// https://github.com/nats-io/nats-server/issues/6085
//
// Verifies that a newly delivered message is redelivered according to the
// configured backoff even while another message is already waiting on a
// later (larger) backoff step.
func TestJetStreamConsumerBackoffNotRespectedWithMultipleInflightRedeliveries(t *testing.T) {
	s := RunBasicJetStreamServer(t)
	defer s.Shutdown()

	nc, js := jsClientConnect(t, s)
	defer nc.Close()

	streamCfg := &nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"events.>"},
	}
	_, err := js.AddStream(streamCfg)
	require_NoError(t, err)

	const maxDeliver = 3
	backoff := []time.Duration{2 * time.Second, 4 * time.Second}
	sub, err := js.SubscribeSync(
		"events.>",
		nats.MaxDeliver(maxDeliver),
		nats.BackOff(backoff),
		nats.AckExplicit(),
	)
	require_NoError(t, err)

	// Upper bound on the elapsed time by which the numDelivered-th delivery
	// must have happened: the sum of the backoff steps used so far plus
	// 500ms of slack.
	calculateExpectedBackoff := func(numDelivered int) time.Duration {
		total := 500 * time.Millisecond
		for i, step := range backoff {
			if i >= numDelivered-1 {
				break
			}
			total += step
		}
		return total
	}

	// Deliver the first message twice, so that its next redelivery is
	// scheduled using the final (highest) backoff duration.
	firstMsgSent := time.Now()
	sendStreamMsg(t, nc, "events.first", "msg-1")

	_, err = sub.NextMsg(time.Second)
	require_NoError(t, err)
	require_LessThan(t, time.Since(firstMsgSent), calculateExpectedBackoff(1))

	_, err = sub.NextMsg(5 * time.Second)
	require_NoError(t, err)
	require_LessThan(t, time.Since(firstMsgSent), calculateExpectedBackoff(2))

	// Sending a new message now should reset the pending timer to the first
	// backoff. If it stayed at the final backoff duration, this message
	// would be redelivered too late.
	sendStreamMsg(t, nc, "events.second", "msg-2")

	// Keep receiving until the second message has used up all its deliveries.
	for numDelivered := 0; numDelivered < maxDeliver; {
		msg, err := sub.NextMsg(5 * time.Second)
		require_NoError(t, err)

		if msg.Subject == "events.first" {
			// Final redelivery of the first message, still bound by its own schedule.
			require_LessThan(t, time.Since(firstMsgSent), calculateExpectedBackoff(3))
			continue
		}

		// The second message must follow the specified backoff strategy.
		// Before the fix, its first redelivery was only sent after the
		// highest backoff duration.
		metadata, err := msg.Metadata()
		require_NoError(t, err)
		numDelivered = int(metadata.NumDelivered)
		require_LessThan(t, time.Since(metadata.Timestamp), calculateExpectedBackoff(numDelivered))

		// Once numDelivered reaches maxDeliver we've received all messages
		// and the loop condition ends the test.
	}
}

0 comments on commit f594966

Please sign in to comment.