[FIXED] Don't SendSnapshot on becoming consumer leader (#6151)
The apply queue can be empty while uncommitted entries are still stored in
the WAL. If we then call `SendSnapshot` when becoming consumer leader, we
would be reverting back to a previous state.

This was also an issue for meta leader changes, which was fixed in
#5700. This PR fixes it for
consumer leader changes.

Signed-off-by: Maurice van Veen <[email protected]>
derekcollison authored Nov 20, 2024
2 parents 1e017cd + c0e0046 commit 28c581b
Showing 2 changed files with 129 additions and 11 deletions.
12 changes: 1 addition & 11 deletions server/jetstream_cluster.go
@@ -4970,23 +4970,13 @@ func (js *jetStream) monitorConsumer(o *consumer, ca *consumerAssignment) {
 			}

 			// Process the change.
-			if err := js.processConsumerLeaderChange(o, isLeader); err == nil && isLeader {
+			if err := js.processConsumerLeaderChange(o, isLeader); err == nil {
 				// Check our state if we are under an interest based stream.
 				if mset := o.getStream(); mset != nil {
 					var ss StreamState
 					mset.store.FastState(&ss)
 					o.checkStateForInterestStream(&ss)
 				}
-				// Do a snapshot.
-				doSnapshot(true)
-				// Synchronize followers to our state. Only send out if we have state and nothing pending.
-				if n != nil {
-					if _, _, applied := n.Progress(); applied > 0 && aq.len() == 0 {
-						if snap, err := o.store.EncodedState(); err == nil {
-							n.SendSnapshot(snap)
-						}
-					}
-				}
 			}

 			// We may receive a leader change after the consumer assignment which would cancel us
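For context, the removed guard is reproduced below as a minimal annotated sketch (the comments are added here and are not part of the commit); `n`, `aq`, and `o` are the consumer's RAFT node, its apply queue, and the consumer itself in the surrounding code:

// Pre-#6151 logic, annotated.
if n != nil {
	// applied > 0 && aq.len() == 0 only guarantees that everything already
	// committed has also been applied; it says nothing about entries that are
	// stored in the WAL but not yet committed.
	if _, _, applied := n.Progress(); applied > 0 && aq.len() == 0 {
		// EncodedState() reflects applied state only, so this snapshot can lag
		// behind the stored-but-uncommitted entries, and installing it on
		// followers reverts that state.
		if snap, err := o.store.EncodedState(); err == nil {
			n.SendSnapshot(snap)
		}
	}
}

The fix drops this block entirely, along with the unconditional `doSnapshot(true)` above it.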
128 changes: 128 additions & 0 deletions server/jetstream_cluster_4_test.go
@@ -18,6 +18,7 @@ package server

 import (
 	"context"
+	"encoding/binary"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -4457,3 +4458,130 @@ func TestJetStreamClusterMetaSnapshotMustNotIncludePendingConsumers(t *testing.T
 		}
 	}
 }

func TestJetStreamClusterConsumerDontSendSnapshotOnLeaderChange(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	nc, js := jsClientConnect(t, c.randomServer())
	defer nc.Close()

	_, err := js.AddStream(&nats.StreamConfig{
		Name:     "TEST",
		Subjects: []string{"foo"},
		Replicas: 3,
	})
	require_NoError(t, err)

	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{
		Durable:   "CONSUMER",
		Replicas:  3,
		AckPolicy: nats.AckExplicitPolicy,
	})
	require_NoError(t, err)

	// Add a message and let the consumer ack it, this moves the consumer's RAFT applied up to 1.
	_, err = js.Publish("foo", nil)
	require_NoError(t, err)
	sub, err := js.PullSubscribe("foo", "CONSUMER")
	require_NoError(t, err)
	msgs, err := sub.Fetch(1)
	require_NoError(t, err)
	require_Len(t, len(msgs), 1)
	err = msgs[0].AckSync()
	require_NoError(t, err)

	// We don't need the client anymore.
	nc.Close()

	lookupConsumer := func(s *Server) *consumer {
		t.Helper()
		mset, err := s.lookupAccount(globalAccountName)
		require_NoError(t, err)
		acc, err := mset.lookupStream("TEST")
		require_NoError(t, err)
		o := acc.lookupConsumer("CONSUMER")
		require_NotNil(t, o)
		return o
	}

	// Grab current consumer leader before moving all into observer mode.
	cl := c.consumerLeader(globalAccountName, "TEST", "CONSUMER")
	for _, s := range c.servers {
		// Put all consumer's RAFT into observer mode, this will prevent all servers from trying to become leader.
		o := lookupConsumer(s)
		o.node.SetObserver(true)
		if s != cl {
			// For all followers, pause apply so they only store messages in WAL but not apply and possibly snapshot.
			err = o.node.PauseApply()
			require_NoError(t, err)
		}
	}

	updateDeliveredBuffer := func() []byte {
		var b [4*binary.MaxVarintLen64 + 1]byte
		b[0] = byte(updateDeliveredOp)
		n := 1
		n += binary.PutUvarint(b[n:], 100) // Delivered sequence.
		n += binary.PutUvarint(b[n:], 100) // Stream sequence.
		n += binary.PutUvarint(b[n:], 1)   // Delivery count.
		n += binary.PutVarint(b[n:], time.Now().UnixNano()) // Timestamp.
		return b[:n]
	}

	updateAcksBuffer := func() []byte {
		var b [2*binary.MaxVarintLen64 + 1]byte
		b[0] = byte(updateAcksOp)
		n := 1
		n += binary.PutUvarint(b[n:], 100) // Delivered sequence.
		n += binary.PutUvarint(b[n:], 100) // Stream sequence.
		return b[:n]
	}

	// Store an uncommitted entry into our WAL, which will be committed and applied later.
	co := lookupConsumer(cl)
	rn := co.node.(*raft)
	rn.Lock()
	entries := []*Entry{{EntryNormal, updateDeliveredBuffer()}, {EntryNormal, updateAcksBuffer()}}
	ae := encode(t, rn.buildAppendEntry(entries))
	err = rn.storeToWAL(ae)
	minPindex := rn.pindex
	rn.Unlock()
	require_NoError(t, err)

	// Simulate a leader change so we can check what happens in the upper-layer logic.
	rn.leadc <- true
	rn.SetObserver(false)

	// Since the upper layer is async, we don't know whether it will act on the leader change.
	// Wait for some time to check if it does.
	time.Sleep(2 * time.Second)
	rn.RLock()
	maxPindex := rn.pindex
	rn.RUnlock()

	r := c.randomNonConsumerLeader(globalAccountName, "TEST", "CONSUMER")
	ro := lookupConsumer(r)
	rn = ro.node.(*raft)

	checkFor(t, 5*time.Second, time.Second, func() error {
		rn.RLock()
		defer rn.RUnlock()
		if rn.pindex < maxPindex {
			return fmt.Errorf("rn.pindex too low, expected %d, got %d", maxPindex, rn.pindex)
		}
		return nil
	})

	// We should only have 'Normal' entries.
	// If we got a 'Snapshot' entry, that would mean a snapshot was built from incomplete state, reverting committed state.
	var state StreamState
	rn.wal.FastState(&state)
	for seq := minPindex; seq <= maxPindex; seq++ {
		ae, err = rn.loadEntry(seq)
		require_NoError(t, err)
		for _, entry := range ae.entries {
			require_Equal(t, entry.Type, EntryNormal)
		}
	}
}
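
To run just this new test locally, a plain `go test` invocation from the repository root should be enough (a suggested command, not part of the commit; adjust the timeout for slower machines):

go test ./server -run TestJetStreamClusterConsumerDontSendSnapshotOnLeaderChange -count=1 -v -timeout 10m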
