[FIXED] Ghost consumers during meta recovery (#6092)
During meta recovery, `ru.updateConsumers` and `ru.removeConsumers` were
not properly cleared since the move from
`map[string]*consumerAssignment` to
`map[string]map[string]*consumerAssignment`: the code still deleted by
consumer key on the outer map, which is keyed by stream. This meant that
a consumer that needed to be removed was both in `ru.removeConsumers`
and left in `ru.updateConsumers`, resulting in a ghost consumer.

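Note: the shape of that bug, as a minimal sketch (the key literals below are hypothetical stand-ins; the real code derives them via `ca.recoveryKey()` and `ca.streamRecoveryKey()`):

	// The recovery maps are nested: stream recovery key -> consumer recovery key.
	updateConsumers := map[string]map[string]*consumerAssignment{
		":TEST": {":TEST:consumer": {}},
	}
	key, skey := ":TEST:consumer", ":TEST"

	// Buggy: deletes by consumer key on the OUTER map, which is keyed
	// by stream, so nothing matches and the stale entry survives recovery.
	delete(updateConsumers, key)

	// Fixed: look up the inner map for the stream first, then delete there.
	if consumers, ok := updateConsumers[skey]; ok {
		delete(consumers, key)
	}
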
Also, don't clear the recovering state while there are still items to
process as part of recovery.

De-flakes `TestJetStreamClusterLostConsumers`, and makes
`TestJetStreamClusterConsumerLeak` more reliable by re-introducing the
`ca.pending` flag. The consumer leader responds to consumer creation,
but the meta leader responds to consumer deletion, so the consumer
assignment needs to be available for the meta leader to respond
successfully.

Signed-off-by: Maurice van Veen <[email protected]>

MauriceVanVeen authored Nov 9, 2024
1 parent eed9455 commit 37d4461
Showing 6 changed files with 168 additions and 25 deletions.
57 changes: 36 additions & 21 deletions server/jetstream_cluster.go
@@ -138,11 +138,10 @@ type streamAssignment struct {
 	Reply   string       `json:"reply"`
 	Restore *StreamState `json:"restore_state,omitempty"`
 	// Internal
-	consumers        map[string]*consumerAssignment
-	pendingConsumers map[string]struct{}
-	responded        bool
-	recovering       bool
-	err              error
+	consumers  map[string]*consumerAssignment
+	responded  bool
+	recovering bool
+	err        error
 }
 
 // consumerAssignment is what the meta controller uses to assign consumers to streams.
@@ -159,6 +158,7 @@ type consumerAssignment struct {
 	// Internal
 	responded  bool
 	recovering bool
+	pending    bool
 	deleted    bool
 	err        error
 }
@@ -1376,8 +1376,6 @@ func (js *jetStream) monitorCluster() {
 			ces := aq.pop()
 			for _, ce := range ces {
 				if ce == nil {
-					// Signals we have replayed all of our metadata.
-					js.clearMetaRecovering()
 					// Process any removes that are still valid after recovery.
 					for _, cas := range ru.removeConsumers {
 						for _, ca := range cas {
@@ -1401,6 +1399,8 @@ func (js *jetStream) monitorCluster() {
 							js.processConsumerAssignment(ca)
 						}
 					}
+					// Signals we have replayed all of our metadata.
+					js.clearMetaRecovering()
 					// Clear.
 					ru = nil
 					s.Debugf("Recovered JetStream cluster metadata")
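
Note: with this move, `js.clearMetaRecovering()` only runs after the queued recovery updates have been drained, so nothing observes the server as recovered while items are still pending. A condensed sketch of the corrected ordering, assuming the elided loops call the matching process functions as elsewhere in this diff:

	if ce == nil {
		// Drain everything accumulated in recoveryUpdates first...
		for _, cas := range ru.removeConsumers {
			for _, ca := range cas {
				js.processConsumerRemoval(ca)
			}
		}
		// ... removeStreams / addStreams / updateStreams handled the same way ...
		for _, cas := range ru.updateConsumers {
			for _, ca := range cas {
				js.processConsumerAssignment(ca)
			}
		}
		// ... and only then signal that metadata replay is complete.
		js.clearMetaRecovering()
		ru = nil
		s.Debugf("Recovered JetStream cluster metadata")
	}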
@@ -1552,6 +1552,11 @@ func (js *jetStream) metaSnapshot() []byte {
 			Consumers: make([]*consumerAssignment, 0, len(sa.consumers)),
 		}
 		for _, ca := range sa.consumers {
+			// Skip if the consumer is pending, we can't include it in our snapshot.
+			// If the proposal fails after we marked it pending, it would result in a ghost consumer.
+			if ca.pending {
+				continue
+			}
 			wsa.Consumers = append(wsa.Consumers, ca)
 		}
 		streams = append(streams, wsa)
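
Note: the comment in the hunk is the whole argument. A snapshot is replayed verbatim on recovery, so an uncommitted assignment must not be captured. The window this guards against, sketched as a timeline using names from this diff:

	// t0: create request arrives; assignment stored optimistically.
	ca.pending = true
	sa.consumers[ca.Name] = ca

	// t1: metaSnapshot() runs; without the ca.pending check above, the
	// snapshot would capture this not-yet-committed assignment.

	// t2: the meta proposal fails; the optimistic assignment goes away.

	// t3: a server recovering from the t1 snapshot would recreate a
	// consumer that was never committed: a ghost consumer.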
@@ -1650,9 +1655,10 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecovering bool) error {
 			if isRecovering {
 				key := sa.recoveryKey()
 				ru.removeStreams[key] = sa
-				delete(ru.updateConsumers, key)
 				delete(ru.addStreams, key)
 				delete(ru.updateStreams, key)
+				delete(ru.updateConsumers, key)
+				delete(ru.removeConsumers, key)
 			} else {
 				js.processStreamRemoval(sa)
 			}
@@ -1693,7 +1699,9 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecovering bool) error {
 					ru.removeConsumers[skey] = map[string]*consumerAssignment{}
 				}
 				ru.removeConsumers[skey][key] = ca
-				delete(ru.updateConsumers, key)
+				if consumers, ok := ru.updateConsumers[skey]; ok {
+					delete(consumers, key)
+				}
 			} else {
 				js.processConsumerRemoval(ca)
 			}
@@ -1703,7 +1711,9 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecovering bool) error {
 			if isRecovering {
 				key := ca.recoveryKey()
 				skey := ca.streamRecoveryKey()
-				delete(ru.removeConsumers, key)
+				if consumers, ok := ru.removeConsumers[skey]; ok {
+					delete(consumers, key)
+				}
 				if _, ok := ru.updateConsumers[skey]; !ok {
 					ru.updateConsumers[skey] = map[string]*consumerAssignment{}
 				}
@@ -1980,6 +1990,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bool, bool, bool, error) {
 				delete(ru.addStreams, key)
 				delete(ru.updateStreams, key)
 				delete(ru.updateConsumers, key)
+				delete(ru.removeConsumers, key)
 			} else {
 				js.processStreamRemoval(sa)
 				didRemoveStream = true
@@ -1994,7 +2005,9 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bool, bool, bool, error) {
 				js.setConsumerAssignmentRecovering(ca)
 				key := ca.recoveryKey()
 				skey := ca.streamRecoveryKey()
-				delete(ru.removeConsumers, key)
+				if consumers, ok := ru.removeConsumers[skey]; ok {
+					delete(consumers, key)
+				}
 				if _, ok := ru.updateConsumers[skey]; !ok {
 					ru.updateConsumers[skey] = map[string]*consumerAssignment{}
 				}
@@ -2012,7 +2025,9 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bool, bool, bool, error) {
 				js.setConsumerAssignmentRecovering(ca)
 				key := ca.recoveryKey()
 				skey := ca.streamRecoveryKey()
-				delete(ru.removeConsumers, key)
+				if consumers, ok := ru.removeConsumers[skey]; ok {
+					delete(consumers, key)
+				}
 				if _, ok := ru.updateConsumers[skey]; !ok {
 					ru.updateConsumers[skey] = map[string]*consumerAssignment{}
 				}
@@ -2034,7 +2049,9 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bool, bool, bool, error) {
 					ru.removeConsumers[skey] = map[string]*consumerAssignment{}
 				}
 				ru.removeConsumers[skey][key] = ca
-				delete(ru.updateConsumers, key)
+				if consumers, ok := ru.updateConsumers[skey]; ok {
+					delete(consumers, key)
+				}
 			} else {
 				js.processConsumerRemoval(ca)
 				didRemoveConsumer = true
@@ -4265,10 +4282,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
 	// Place into our internal map under the stream assignment.
 	// Ok to replace an existing one, we check on process call below.
 	sa.consumers[ca.Name] = ca
-	delete(sa.pendingConsumers, ca.Name)
-	if len(sa.pendingConsumers) == 0 {
-		sa.pendingConsumers = nil
-	}
+	ca.pending = false
 	js.mu.Unlock()
 
 	acc, err := s.LookupAccount(accName)
@@ -7420,7 +7434,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, stream string, cfg *ConsumerConfig, action ConsumerAction) {
 	}
 	if maxc > 0 {
 		// Don't count DIRECTS.
-		total := len(sa.pendingConsumers)
+		total := 0
 		for cn, ca := range sa.consumers {
 			if action == ActionCreateOrUpdate {
 				// If the consumer name is specified and we think it already exists, then
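
Note: since in-flight creates now live in `sa.consumers` (flagged `pending`), the max-consumers check counts them by iterating the map instead of starting from `len(sa.pendingConsumers)`. A rough sketch; `isDirect` and `isSelfUpdate` are hypothetical stand-ins for the real per-consumer checks:

	total := 0
	for cn, ca := range sa.consumers {
		// DIRECT consumers don't count toward the limit, and an update
		// replacing an existing consumer of the same name is skipped.
		if isDirect(ca) || isSelfUpdate(action, cn) {
			continue
		}
		total++
	}
	if total >= maxc {
		// reject the request with a "maximum consumers limit reached" error
	}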
@@ -7681,10 +7695,11 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subject, reply string, rmsg []byte, stream string, cfg *ConsumerConfig, action ConsumerAction) {
 	// Do formal proposal.
 	if err := cc.meta.Propose(encodeAddConsumerAssignment(ca)); err == nil {
 		// Mark this as pending.
-		if sa.pendingConsumers == nil {
-			sa.pendingConsumers = make(map[string]struct{})
+		if sa.consumers == nil {
+			sa.consumers = make(map[string]*consumerAssignment)
 		}
-		sa.pendingConsumers[ca.Name] = struct{}{}
+		ca.pending = true
+		sa.consumers[ca.Name] = ca
 	}
 }
 
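Note: taken together, `ca.pending` now has a clear lifecycle. Storing the assignment at propose time matters because of the asymmetry described in the commit message: the consumer leader answers creates, but the meta leader answers deletes, so a delete racing an in-flight create must be able to find the assignment. Condensed from the hunks above:

	// 1. jsClusteredConsumerRequest: proposal sent; store optimistically so
	//    the meta leader can respond to a racing delete.
	ca.pending = true
	sa.consumers[ca.Name] = ca

	// 2. metaSnapshot: skip it; it is not committed meta state yet.
	if ca.pending {
		continue
	}

	// 3. processConsumerAssignment: proposal applied; the assignment is real.
	sa.consumers[ca.Name] = ca
	ca.pending = false
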
50 changes: 50 additions & 0 deletions server/jetstream_cluster_1_test.go
@@ -6614,6 +6614,56 @@ func TestJetStreamClusterMetaRecoveryRecreateFileStreamAsMemory(t *testing.T) {
 	require_Len(t, len(ru.updateConsumers), 1)
 }
 
+func TestJetStreamClusterMetaRecoveryConsumerCreateAndRemove(t *testing.T) {
+	tests := []struct {
+		title                       string
+		encodeAddConsumerAssignment func(ca *consumerAssignment) []byte
+	}{
+		{title: "simple", encodeAddConsumerAssignment: encodeAddConsumerAssignment},
+		{title: "compressed", encodeAddConsumerAssignment: encodeAddConsumerAssignmentCompressed},
+	}
+	for _, test := range tests {
+		t.Run(test.title, func(t *testing.T) {
+			c := createJetStreamClusterExplicit(t, "R3S", 3)
+			defer c.shutdown()
+
+			js := c.leader().getJetStream()
+
+			ca := &consumerAssignment{Stream: "TEST", Name: "consumer"}
+			createConsumer := []*Entry{{EntryNormal, test.encodeAddConsumerAssignment(ca)}}
+			deleteConsumer := []*Entry{{EntryNormal, encodeDeleteConsumerAssignment(ca)}}
+
+			// Need to be recovering so that we accumulate recoveryUpdates.
+			js.setMetaRecovering()
+			ru := &recoveryUpdates{
+				removeStreams:   make(map[string]*streamAssignment),
+				removeConsumers: make(map[string]map[string]*consumerAssignment),
+				addStreams:      make(map[string]*streamAssignment),
+				updateStreams:   make(map[string]*streamAssignment),
+				updateConsumers: make(map[string]map[string]*consumerAssignment),
+			}
+
+			// Creating the consumer should append to update consumers list.
+			_, _, _, err := js.applyMetaEntries(createConsumer, ru)
+			require_NoError(t, err)
+			require_Len(t, len(ru.updateConsumers[":TEST"]), 1)
+			require_Len(t, len(ru.removeConsumers), 0)
+
+			// Deleting the consumer should append to remove consumers list and remove from update list.
+			_, _, _, err = js.applyMetaEntries(deleteConsumer, ru)
+			require_NoError(t, err)
+			require_Len(t, len(ru.removeConsumers[":TEST"]), 1)
+			require_Len(t, len(ru.updateConsumers[":TEST"]), 0)
+
+			// When re-creating the consumer, add to update list and remove from remove list.
+			_, _, _, err = js.applyMetaEntries(createConsumer, ru)
+			require_NoError(t, err)
+			require_Len(t, len(ru.updateConsumers[":TEST"]), 1)
+			require_Len(t, len(ru.removeConsumers[":TEST"]), 0)
+		})
+	}
+}
+
 //
 // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
 // Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
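Note on the `":TEST"` literals in the assertions above: that is the stream recovery key for an assignment with no account set, and the expected map transitions are what the three `applyMetaEntries` calls assert. Sketched (the key format is inferred from the test fixture, not from the implementation):

	// skey is assumed to join account and stream; with an empty account:
	skey := "" + ":" + "TEST" // => ":TEST"

	// create:   ru.updateConsumers[skey] gains the consumer
	// delete:   ru.removeConsumers[skey] gains it; updateConsumers[skey] drops it
	// recreate: ru.updateConsumers[skey] gains it; removeConsumers[skey] drops it
	_ = skey
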
4 changes: 3 additions & 1 deletion server/jetstream_cluster_2_test.go
@@ -2077,7 +2077,9 @@ func TestJetStreamClusterMaxConsumersMultipleConcurrentRequests(t *testing.T) {
 	mjs := metaLeader.getJetStream()
 	sa := mjs.streamAssignment(globalAccountName, "MAXCC")
 	require_NotNil(t, sa)
-	require_True(t, sa.pendingConsumers == nil)
+	for _, ca := range sa.consumers {
+		require_False(t, ca.pending)
+	}
 }
 
 func TestJetStreamClusterAccountMaxStreamsAndConsumersMultipleConcurrentRequests(t *testing.T) {
7 changes: 4 additions & 3 deletions server/jetstream_cluster_3_test.go
@@ -2385,18 +2385,19 @@ func TestJetStreamClusterLostConsumers(t *testing.T) {
 		Stream: "TEST",
 		Config: ConsumerConfig{
 			AckPolicy: AckExplicit,
+			Replicas:  1,
 		},
 	}
 	req, err := json.Marshal(cc)
 	require_NoError(t, err)
 
 	reqSubj := fmt.Sprintf(JSApiConsumerCreateT, "TEST")
 
-	// Now create 50 consumers. We do not wait for the answer.
+	// Now create 50 consumers. Ensure they are successfully created, so they're included in our snapshot.
 	for i := 0; i < 50; i++ {
-		nc.Publish(reqSubj, req)
+		_, err = nc.Request(reqSubj, req, time.Second)
+		require_NoError(t, err)
 	}
-	nc.Flush()
 
 	// Grab the meta leader.
 	ml := c.leader()
51 changes: 51 additions & 0 deletions server/jetstream_cluster_4_test.go
@@ -4390,3 +4390,54 @@ func TestJetStreamClusterKeepRaftStateIfStreamCreationFailedDuringShutdown(t *testing.T) {
 	require_NoError(t, err)
 	require_True(t, len(files) > 0)
 }
+
+func TestJetStreamClusterMetaSnapshotMustNotIncludePendingConsumers(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	nc, js := jsClientConnect(t, c.randomServer())
+	defer nc.Close()
+
+	_, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Replicas: 3})
+	require_NoError(t, err)
+
+	// We're creating an R3 consumer, just so we can copy its state and turn it into pending below.
+	_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Name: "consumer", Replicas: 3})
+	require_NoError(t, err)
+	nc.Close()
+
+	// Bypass normal API so we can simulate having a consumer pending to be created.
+	// A snapshot should never create pending consumers, as that would result
+	// in ghost consumers if the meta proposal failed.
+	ml := c.leader()
+	mjs := ml.getJetStream()
+	cc := mjs.cluster
+	consumers := cc.streams[globalAccountName]["TEST"].consumers
+	sampleCa := *consumers["consumer"]
+	sampleCa.Name, sampleCa.pending = "pending-consumer", true
+	consumers[sampleCa.Name] = &sampleCa
+
+	// Create snapshot, this should not contain pending consumers.
+	snap := mjs.metaSnapshot()
+
+	ru := &recoveryUpdates{
+		removeStreams:   make(map[string]*streamAssignment),
+		removeConsumers: make(map[string]map[string]*consumerAssignment),
+		addStreams:      make(map[string]*streamAssignment),
+		updateStreams:   make(map[string]*streamAssignment),
+		updateConsumers: make(map[string]map[string]*consumerAssignment),
+	}
+	err = mjs.applyMetaSnapshot(snap, ru, true)
+	require_NoError(t, err)
+	require_Len(t, len(ru.updateStreams), 1)
+	for _, sa := range ru.updateStreams {
+		for _, ca := range sa.consumers {
+			require_NotEqual(t, ca.Name, "pending-consumer")
+		}
+	}
+	for _, cas := range ru.updateConsumers {
+		for _, ca := range cas {
+			require_NotEqual(t, ca.Name, "pending-consumer")
+		}
+	}
+}
24 changes: 24 additions & 0 deletions server/norace_test.go
@@ -6677,6 +6677,7 @@ func TestNoRaceJetStreamClusterGhostConsumers(t *testing.T) {
 	time.Sleep(5 * time.Second)
 	cancel()
 
+	// Check we don't report missing consumers.
 	subj := fmt.Sprintf(JSApiConsumerListT, "TEST")
 	checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
 		// Request will take at most 4 seconds if some consumers can't be found.
@@ -6691,6 +6692,29 @@ func TestNoRaceJetStreamClusterGhostConsumers(t *testing.T) {
 		}
 		return fmt.Errorf("Still have missing: %+v", resp.Missing)
 	})
+
+	// Also check all servers agree on the available consumer assignments.
+	// It could be the above check passes, i.e. our meta leader thinks all is okay, but other servers actually drifted.
+	checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
+		var previousConsumers []string
+		for _, s := range c.servers {
+			sjs := s.getJetStream()
+			sjs.mu.Lock()
+			cc := sjs.cluster
+			sa := cc.streams[globalAccountName]["TEST"]
+			var consumers []string
+			for cName, _ := range sa.consumers {
+				consumers = append(consumers, cName)
+			}
+			sjs.mu.Unlock()
+			slices.Sort(consumers)
+			if previousConsumers != nil && !slices.Equal(previousConsumers, consumers) {
+				return fmt.Errorf("Consumer mismatch:\n- previous: %v\n- actual : %v\n", previousConsumers, consumers)
+			}
+			previousConsumers = consumers
+		}
+		return nil
+	})
 }
 
 // This is to test a publish slowdown and general instability experienced in a setup similar to this.
