Skip to content

Commit

Permalink
[FIXED] Backoff not respected with multiple inflight redeliveries (#6104
Browse files Browse the repository at this point in the history
)

Resolves #6085

With two messages, one being scheduled to be redelivered after max
backoff, and another new message that just came in. The new message
would only be redelivered after the max backoff due to the first message
having set the timer as such.

Make sure that we can reset the pending timer to trigger earlier in this
scenario.

Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
derekcollison authored Nov 11, 2024
2 parents 626603c + f594966 commit 7ac6d48
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 31 deletions.
76 changes: 45 additions & 31 deletions server/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ type consumer struct {
outq *jsOutQ
pending map[uint64]*Pending
ptmr *time.Timer
ptmrEnd time.Time
rdq []uint64
rdqi avl.SequenceSet
rdc map[uint64]uint64
Expand Down Expand Up @@ -1508,7 +1509,7 @@ func (o *consumer) setLeader(isLeader bool) {
// Stop any unpause timers. Should only be running on leaders.
stopAndClearTimer(&o.uptmr)
// Make sure to clear out any re-deliver queues
stopAndClearTimer(&o.ptmr)
o.stopAndClearPtmr()
o.rdq = nil
o.rdqi.Empty()
o.pending = nil
Expand Down Expand Up @@ -1979,7 +1980,7 @@ func (o *consumer) forceExpirePending() {
p.Timestamp += off
}
}
o.ptmr.Reset(o.ackWait(0))
o.resetPtmr(o.ackWait(0))
}
o.signalNewMessages()
}
Expand Down Expand Up @@ -2128,7 +2129,7 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
// AckWait
if cfg.AckWait != o.cfg.AckWait {
if o.ptmr != nil {
o.ptmr.Reset(100 * time.Millisecond)
o.resetPtmr(100 * time.Millisecond)
}
}
// Rate Limit
Expand Down Expand Up @@ -2675,7 +2676,7 @@ func (o *consumer) processNak(sseq, dseq, dc uint64, nak []byte) {
if o.ptmr != nil {
// Want checkPending to run and figure out the next timer ttl.
// TODO(dlc) - We could optimize this maybe a bit more and track when we expect the timer to fire.
o.ptmr.Reset(10 * time.Millisecond)
o.resetPtmr(10 * time.Millisecond)
}
}
// Nothing else for use to do now so return.
Expand Down Expand Up @@ -2809,11 +2810,7 @@ func (o *consumer) applyState(state *ConsumerState) {
if o.cfg.AckWait < delay {
delay = o.ackWait(0)
}
if o.ptmr == nil {
o.ptmr = time.AfterFunc(delay, o.checkPending)
} else {
o.ptmr.Reset(delay)
}
o.resetPtmr(delay)
}
}

Expand Down Expand Up @@ -4903,9 +4900,24 @@ func (o *consumer) trackPending(sseq, dseq uint64) {
if o.pending == nil {
o.pending = make(map[uint64]*Pending)
}
if o.ptmr == nil {
o.ptmr = time.AfterFunc(o.ackWait(0), o.checkPending)

// We could have a backoff that set a timer higher than what we need for this message.
// In that case, reset to lowest backoff required for a message redelivery.
minDelay := o.ackWait(0)
if l := len(o.cfg.BackOff); l > 0 {
bi := int(o.rdc[sseq])
if bi < 0 {
bi = 0
} else if bi >= l {
bi = l - 1
}
minDelay = o.ackWait(o.cfg.BackOff[bi])
}
minDeadline := time.Now().Add(minDelay)
if o.ptmr == nil || o.ptmrEnd.After(minDeadline) {
o.resetPtmr(minDelay)
}

if p, ok := o.pending[sseq]; ok {
// Update timestamp but keep original consumer delivery sequence.
// So do not update p.Sequence.
Expand Down Expand Up @@ -5028,24 +5040,21 @@ func (o *consumer) removeFromRedeliverQueue(seq uint64) bool {

// Checks the pending messages.
func (o *consumer) checkPending() {
o.mu.RLock()
o.mu.Lock()
defer o.mu.Unlock()

mset := o.mset
// On stop, mset and timer will be nil.
if o.closed || mset == nil || o.ptmr == nil {
stopAndClearTimer(&o.ptmr)
o.mu.RUnlock()
o.stopAndClearPtmr()
return
}
o.mu.RUnlock()

var shouldUpdateState bool
var state StreamState
mset.store.FastState(&state)
fseq := state.FirstSeq

o.mu.Lock()
defer o.mu.Unlock()

now := time.Now().UnixNano()
ttl := int64(o.cfg.AckWait)
next := int64(o.ackWait(0))
Expand All @@ -5061,11 +5070,7 @@ func (o *consumer) checkPending() {
check := len(o.pending) > 1024
for seq, p := range o.pending {
if check && atomic.LoadInt64(&o.awl) > 0 {
if o.ptmr == nil {
o.ptmr = time.AfterFunc(100*time.Millisecond, o.checkPending)
} else {
o.ptmr.Reset(100 * time.Millisecond)
}
o.resetPtmr(100 * time.Millisecond)
return
}
// Check if these are no longer valid.
Expand Down Expand Up @@ -5132,15 +5137,10 @@ func (o *consumer) checkPending() {
}

if len(o.pending) > 0 {
delay := time.Duration(next)
if o.ptmr == nil {
o.ptmr = time.AfterFunc(delay, o.checkPending)
} else {
o.ptmr.Reset(o.ackWait(delay))
}
o.resetPtmr(time.Duration(next))
} else {
// Make sure to stop timer and clear out any re delivery queues
stopAndClearTimer(&o.ptmr)
o.stopAndClearPtmr()
o.rdq = nil
o.rdqi.Empty()
o.pending = nil
Expand Down Expand Up @@ -5626,7 +5626,7 @@ func (o *consumer) stopWithFlags(dflag, sdflag, doSignal, advisory bool) error {
o.client = nil
sysc := o.sysc
o.sysc = nil
stopAndClearTimer(&o.ptmr)
o.stopAndClearPtmr()
stopAndClearTimer(&o.dtmr)
stopAndClearTimer(&o.gwdtmr)
delivery := o.cfg.DeliverSubject
Expand Down Expand Up @@ -6049,3 +6049,17 @@ func (o *consumer) checkStateForInterestStream(ss *StreamState) error {
}
return nil
}

func (o *consumer) resetPtmr(delay time.Duration) {
if o.ptmr == nil {
o.ptmr = time.AfterFunc(delay, o.checkPending)
} else {
o.ptmr.Reset(delay)
}
o.ptmrEnd = time.Now().Add(delay)
}

func (o *consumer) stopAndClearPtmr() {
stopAndClearTimer(&o.ptmr)
o.ptmrEnd = time.Time{}
}
70 changes: 70 additions & 0 deletions server/jetstream_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2386,3 +2386,73 @@ func Benchmark____JetStreamConsumerIsFilteredMatch(b *testing.B) {
})
}
}

// https://github.com/nats-io/nats-server/issues/6085
func TestJetStreamConsumerBackoffNotRespectedWithMultipleInflightRedeliveries(t *testing.T) {
s := RunBasicJetStreamServer(t)
defer s.Shutdown()

nc, js := jsClientConnect(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"events.>"},
})
require_NoError(t, err)

maxDeliver := 3
backoff := []time.Duration{2 * time.Second, 4 * time.Second}
sub, err := js.SubscribeSync(
"events.>",
nats.MaxDeliver(maxDeliver),
nats.BackOff(backoff),
nats.AckExplicit(),
)
require_NoError(t, err)

calculateExpectedBackoff := func(numDelivered int) time.Duration {
expectedBackoff := 500 * time.Millisecond
for i := 0; i < numDelivered-1 && i < len(backoff); i++ {
expectedBackoff += backoff[i]
}
return expectedBackoff
}

// We get one message to be redelivered using the final backoff duration.
firstMsgSent := time.Now()
sendStreamMsg(t, nc, "events.first", "msg-1")
_, err = sub.NextMsg(time.Second)
require_NoError(t, err)
require_LessThan(t, time.Since(firstMsgSent), calculateExpectedBackoff(1))
_, err = sub.NextMsg(5 * time.Second)
require_NoError(t, err)
require_LessThan(t, time.Since(firstMsgSent), calculateExpectedBackoff(2))
// This message will be redelivered with the final/highest backoff below.

// If we now send a new message, the pending timer should be reset to the first backoff.
// Otherwise, if it remains at the final backoff duration we'll get this message redelivered too late.
sendStreamMsg(t, nc, "events.second", "msg-2")

for {
msg, err := sub.NextMsg(5 * time.Second)
require_NoError(t, err)
if msg.Subject == "events.first" {
require_LessThan(t, time.Since(firstMsgSent), calculateExpectedBackoff(3))
continue
}

// We expect the second message to be redelivered using the specified backoff strategy.
// Before, the first redelivery of the second message would be sent after the highest backoff duration.
metadata, err := msg.Metadata()
require_NoError(t, err)
numDelivered := int(metadata.NumDelivered)
expectedBackoff := calculateExpectedBackoff(numDelivered)
require_LessThan(t, time.Since(metadata.Timestamp), expectedBackoff)

// We've received all message, test passed.
if numDelivered >= maxDeliver {
break
}
}
}

0 comments on commit 7ac6d48

Please sign in to comment.