From 37d4461b24e23ffd95d910c86d7e9af6d3b8403d Mon Sep 17 00:00:00 2001
From: Maurice van Veen
Date: Sat, 9 Nov 2024 03:39:15 +0100
Subject: [PATCH] [FIXED] Ghost consumers during meta recovery (#6092)

During meta recovery, `ru.updateConsumers` and `ru.removeConsumers` would
not be properly cleared after the move from `map[string]*consumerAssignment`
to `map[string]map[string]*consumerAssignment`. This meant that a consumer
that needed to be removed could end up in `ru.removeConsumers` while also
being left behind in `ru.updateConsumers`, resulting in a ghost consumer.
(A minimal sketch of the corrected bookkeeping follows the patch.)

Also don't clear the recovering state while we still have items to process
as part of recovery.

De-flakes `TestJetStreamClusterLostConsumers`, and makes
`TestJetStreamClusterConsumerLeak` more reliable by re-introducing the
`ca.pending` flag. The consumer leader responds to consumer creation, but
the meta leader responds to consumer deletion, so the consumer assignment
needs to remain available so the meta leader can respond successfully.

Signed-off-by: Maurice van Veen

---------

Signed-off-by: Maurice van Veen
---
 server/jetstream_cluster.go        | 57 +++++++++++++++++++-----------
 server/jetstream_cluster_1_test.go | 50 ++++++++++++++++++++++++++
 server/jetstream_cluster_2_test.go |  4 ++-
 server/jetstream_cluster_3_test.go |  7 ++--
 server/jetstream_cluster_4_test.go | 51 ++++++++++++++++++++++++++
 server/norace_test.go              | 24 +++++++++++++
 6 files changed, 168 insertions(+), 25 deletions(-)

diff --git a/server/jetstream_cluster.go b/server/jetstream_cluster.go
index c4430829b74..cf5321dfb1d 100644
--- a/server/jetstream_cluster.go
+++ b/server/jetstream_cluster.go
@@ -138,11 +138,10 @@ type streamAssignment struct {
 	Reply   string       `json:"reply"`
 	Restore *StreamState `json:"restore_state,omitempty"`
 	// Internal
-	consumers        map[string]*consumerAssignment
-	pendingConsumers map[string]struct{}
-	responded        bool
-	recovering       bool
-	err              error
+	consumers  map[string]*consumerAssignment
+	responded  bool
+	recovering bool
+	err        error
 }
 
 // consumerAssignment is what the meta controller uses to assign consumers to streams.
@@ -159,6 +158,7 @@ type consumerAssignment struct {
 	// Internal
 	responded  bool
 	recovering bool
+	pending    bool
 	deleted    bool
 	err        error
 }
@@ -1376,8 +1376,6 @@ func (js *jetStream) monitorCluster() {
 			ces := aq.pop()
 			for _, ce := range ces {
 				if ce == nil {
-					// Signals we have replayed all of our metadata.
-					js.clearMetaRecovering()
 					// Process any removes that are still valid after recovery.
 					for _, cas := range ru.removeConsumers {
 						for _, ca := range cas {
@@ -1401,6 +1399,8 @@
 							js.processConsumerAssignment(ca)
 						}
 					}
+					// Signals we have replayed all of our metadata.
+					js.clearMetaRecovering()
 					// Clear.
 					ru = nil
 					s.Debugf("Recovered JetStream cluster metadata")
@@ -1552,6 +1552,11 @@ func (js *jetStream) metaSnapshot() []byte {
 				Consumers: make([]*consumerAssignment, 0, len(sa.consumers)),
 			}
 			for _, ca := range sa.consumers {
+				// Skip if the consumer is pending, we can't include it in our snapshot.
+				// If the proposal fails after we marked it pending, it would result in a ghost consumer.
+ if ca.pending { + continue + } wsa.Consumers = append(wsa.Consumers, ca) } streams = append(streams, wsa) @@ -1650,9 +1655,10 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove if isRecovering { key := sa.recoveryKey() ru.removeStreams[key] = sa - delete(ru.updateConsumers, key) delete(ru.addStreams, key) delete(ru.updateStreams, key) + delete(ru.updateConsumers, key) + delete(ru.removeConsumers, key) } else { js.processStreamRemoval(sa) } @@ -1693,7 +1699,9 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove ru.removeConsumers[skey] = map[string]*consumerAssignment{} } ru.removeConsumers[skey][key] = ca - delete(ru.updateConsumers, key) + if consumers, ok := ru.updateConsumers[skey]; ok { + delete(consumers, key) + } } else { js.processConsumerRemoval(ca) } @@ -1703,7 +1711,9 @@ func (js *jetStream) applyMetaSnapshot(buf []byte, ru *recoveryUpdates, isRecove if isRecovering { key := ca.recoveryKey() skey := ca.streamRecoveryKey() - delete(ru.removeConsumers, key) + if consumers, ok := ru.removeConsumers[skey]; ok { + delete(consumers, key) + } if _, ok := ru.updateConsumers[skey]; !ok { ru.updateConsumers[skey] = map[string]*consumerAssignment{} } @@ -1980,6 +1990,7 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo delete(ru.addStreams, key) delete(ru.updateStreams, key) delete(ru.updateConsumers, key) + delete(ru.removeConsumers, key) } else { js.processStreamRemoval(sa) didRemoveStream = true @@ -1994,7 +2005,9 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo js.setConsumerAssignmentRecovering(ca) key := ca.recoveryKey() skey := ca.streamRecoveryKey() - delete(ru.removeConsumers, key) + if consumers, ok := ru.removeConsumers[skey]; ok { + delete(consumers, key) + } if _, ok := ru.updateConsumers[skey]; !ok { ru.updateConsumers[skey] = map[string]*consumerAssignment{} } @@ -2012,7 +2025,9 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo js.setConsumerAssignmentRecovering(ca) key := ca.recoveryKey() skey := ca.streamRecoveryKey() - delete(ru.removeConsumers, key) + if consumers, ok := ru.removeConsumers[skey]; ok { + delete(consumers, key) + } if _, ok := ru.updateConsumers[skey]; !ok { ru.updateConsumers[skey] = map[string]*consumerAssignment{} } @@ -2034,7 +2049,9 @@ func (js *jetStream) applyMetaEntries(entries []*Entry, ru *recoveryUpdates) (bo ru.removeConsumers[skey] = map[string]*consumerAssignment{} } ru.removeConsumers[skey][key] = ca - delete(ru.updateConsumers, key) + if consumers, ok := ru.updateConsumers[skey]; ok { + delete(consumers, key) + } } else { js.processConsumerRemoval(ca) didRemoveConsumer = true @@ -4265,10 +4282,7 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) { // Place into our internal map under the stream assignment. // Ok to replace an existing one, we check on process call below. sa.consumers[ca.Name] = ca - delete(sa.pendingConsumers, ca.Name) - if len(sa.pendingConsumers) == 0 { - sa.pendingConsumers = nil - } + ca.pending = false js.mu.Unlock() acc, err := s.LookupAccount(accName) @@ -7420,7 +7434,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec } if maxc > 0 { // Don't count DIRECTS. 
-		total := len(sa.pendingConsumers)
+		total := 0
 		for cn, ca := range sa.consumers {
 			if action == ActionCreateOrUpdate {
 				// If the consumer name is specified and we think it already exists, then
@@ -7681,10 +7695,11 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
 	// Do formal proposal.
 	if err := cc.meta.Propose(encodeAddConsumerAssignment(ca)); err == nil {
 		// Mark this as pending.
-		if sa.pendingConsumers == nil {
-			sa.pendingConsumers = make(map[string]struct{})
+		if sa.consumers == nil {
+			sa.consumers = make(map[string]*consumerAssignment)
 		}
-		sa.pendingConsumers[ca.Name] = struct{}{}
+		ca.pending = true
+		sa.consumers[ca.Name] = ca
 	}
 }
 
diff --git a/server/jetstream_cluster_1_test.go b/server/jetstream_cluster_1_test.go
index 31d68eaab1c..14987a7a384 100644
--- a/server/jetstream_cluster_1_test.go
+++ b/server/jetstream_cluster_1_test.go
@@ -6614,6 +6614,56 @@ func TestJetStreamClusterMetaRecoveryRecreateFileStreamAsMemory(t *testing.T) {
 	require_Len(t, len(ru.updateConsumers), 1)
 }
 
+func TestJetStreamClusterMetaRecoveryConsumerCreateAndRemove(t *testing.T) {
+	tests := []struct {
+		title                       string
+		encodeAddConsumerAssignment func(ca *consumerAssignment) []byte
+	}{
+		{title: "simple", encodeAddConsumerAssignment: encodeAddConsumerAssignment},
+		{title: "compressed", encodeAddConsumerAssignment: encodeAddConsumerAssignmentCompressed},
+	}
+	for _, test := range tests {
+		t.Run(test.title, func(t *testing.T) {
+			c := createJetStreamClusterExplicit(t, "R3S", 3)
+			defer c.shutdown()
+
+			js := c.leader().getJetStream()
+
+			ca := &consumerAssignment{Stream: "TEST", Name: "consumer"}
+			createConsumer := []*Entry{{EntryNormal, test.encodeAddConsumerAssignment(ca)}}
+			deleteConsumer := []*Entry{{EntryNormal, encodeDeleteConsumerAssignment(ca)}}
+
+			// Need to be recovering so that we accumulate recoveryUpdates.
+			js.setMetaRecovering()
+			ru := &recoveryUpdates{
+				removeStreams:   make(map[string]*streamAssignment),
+				removeConsumers: make(map[string]map[string]*consumerAssignment),
+				addStreams:      make(map[string]*streamAssignment),
+				updateStreams:   make(map[string]*streamAssignment),
+				updateConsumers: make(map[string]map[string]*consumerAssignment),
+			}
+
+			// Creating the consumer should append to update consumers list.
+			_, _, _, err := js.applyMetaEntries(createConsumer, ru)
+			require_NoError(t, err)
+			require_Len(t, len(ru.updateConsumers[":TEST"]), 1)
+			require_Len(t, len(ru.removeConsumers), 0)
+
+			// Deleting the consumer should append to remove consumers list and remove from update list.
+			_, _, _, err = js.applyMetaEntries(deleteConsumer, ru)
+			require_NoError(t, err)
+			require_Len(t, len(ru.removeConsumers[":TEST"]), 1)
+			require_Len(t, len(ru.updateConsumers[":TEST"]), 0)
+
+			// When re-creating the consumer, add to update list and remove from remove list.
+			_, _, _, err = js.applyMetaEntries(createConsumer, ru)
+			require_NoError(t, err)
+			require_Len(t, len(ru.updateConsumers[":TEST"]), 1)
+			require_Len(t, len(ru.removeConsumers[":TEST"]), 0)
+		})
+	}
+}
+
 //
 // DO NOT ADD NEW TESTS IN THIS FILE (unless to balance test times)
 // Add at the end of jetstream_cluster_<n>_test.go, with <n> being the highest value.
diff --git a/server/jetstream_cluster_2_test.go b/server/jetstream_cluster_2_test.go index 2b0364444f9..bbfe5288df5 100644 --- a/server/jetstream_cluster_2_test.go +++ b/server/jetstream_cluster_2_test.go @@ -2077,7 +2077,9 @@ func TestJetStreamClusterMaxConsumersMultipleConcurrentRequests(t *testing.T) { mjs := metaLeader.getJetStream() sa := mjs.streamAssignment(globalAccountName, "MAXCC") require_NotNil(t, sa) - require_True(t, sa.pendingConsumers == nil) + for _, ca := range sa.consumers { + require_False(t, ca.pending) + } } func TestJetStreamClusterAccountMaxStreamsAndConsumersMultipleConcurrentRequests(t *testing.T) { diff --git a/server/jetstream_cluster_3_test.go b/server/jetstream_cluster_3_test.go index 6fc1102baa3..ed6e265220d 100644 --- a/server/jetstream_cluster_3_test.go +++ b/server/jetstream_cluster_3_test.go @@ -2385,6 +2385,7 @@ func TestJetStreamClusterLostConsumers(t *testing.T) { Stream: "TEST", Config: ConsumerConfig{ AckPolicy: AckExplicit, + Replicas: 1, }, } req, err := json.Marshal(cc) @@ -2392,11 +2393,11 @@ func TestJetStreamClusterLostConsumers(t *testing.T) { reqSubj := fmt.Sprintf(JSApiConsumerCreateT, "TEST") - // Now create 50 consumers. We do not wait for the answer. + // Now create 50 consumers. Ensure they are successfully created, so they're included in our snapshot. for i := 0; i < 50; i++ { - nc.Publish(reqSubj, req) + _, err = nc.Request(reqSubj, req, time.Second) + require_NoError(t, err) } - nc.Flush() // Grab the meta leader. ml := c.leader() diff --git a/server/jetstream_cluster_4_test.go b/server/jetstream_cluster_4_test.go index 79d57fcf758..74d30807fd3 100644 --- a/server/jetstream_cluster_4_test.go +++ b/server/jetstream_cluster_4_test.go @@ -4390,3 +4390,54 @@ func TestJetStreamClusterKeepRaftStateIfStreamCreationFailedDuringShutdown(t *te require_NoError(t, err) require_True(t, len(files) > 0) } + +func TestJetStreamClusterMetaSnapshotMustNotIncludePendingConsumers(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + nc, js := jsClientConnect(t, c.randomServer()) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Replicas: 3}) + require_NoError(t, err) + + // We're creating an R3 consumer, just so we can copy its state and turn it into pending below. + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Name: "consumer", Replicas: 3}) + require_NoError(t, err) + nc.Close() + + // Bypass normal API so we can simulate having a consumer pending to be created. + // A snapshot should never create pending consumers, as that would result + // in ghost consumers if the meta proposal failed. + ml := c.leader() + mjs := ml.getJetStream() + cc := mjs.cluster + consumers := cc.streams[globalAccountName]["TEST"].consumers + sampleCa := *consumers["consumer"] + sampleCa.Name, sampleCa.pending = "pending-consumer", true + consumers[sampleCa.Name] = &sampleCa + + // Create snapshot, this should not contain pending consumers. 
+	snap := mjs.metaSnapshot()
+
+	ru := &recoveryUpdates{
+		removeStreams:   make(map[string]*streamAssignment),
+		removeConsumers: make(map[string]map[string]*consumerAssignment),
+		addStreams:      make(map[string]*streamAssignment),
+		updateStreams:   make(map[string]*streamAssignment),
+		updateConsumers: make(map[string]map[string]*consumerAssignment),
+	}
+	err = mjs.applyMetaSnapshot(snap, ru, true)
+	require_NoError(t, err)
+	require_Len(t, len(ru.updateStreams), 1)
+	for _, sa := range ru.updateStreams {
+		for _, ca := range sa.consumers {
+			require_NotEqual(t, ca.Name, "pending-consumer")
+		}
+	}
+	for _, cas := range ru.updateConsumers {
+		for _, ca := range cas {
+			require_NotEqual(t, ca.Name, "pending-consumer")
+		}
+	}
+}
diff --git a/server/norace_test.go b/server/norace_test.go
index 5499cb3ae3b..48375a9cedc 100644
--- a/server/norace_test.go
+++ b/server/norace_test.go
@@ -6677,6 +6677,7 @@ func TestNoRaceJetStreamClusterGhostConsumers(t *testing.T) {
 	time.Sleep(5 * time.Second)
 	cancel()
 
+	// Check we don't report missing consumers.
 	subj := fmt.Sprintf(JSApiConsumerListT, "TEST")
 	checkFor(t, 20*time.Second, 200*time.Millisecond, func() error {
 		// Request will take at most 4 seconds if some consumers can't be found.
@@ -6691,6 +6692,29 @@ func TestNoRaceJetStreamClusterGhostConsumers(t *testing.T) {
 		}
 		return fmt.Errorf("Still have missing: %+v", resp.Missing)
 	})
+
+	// Also check all servers agree on the available consumer assignments.
+	// It could be that the above check passes, i.e. our meta leader thinks all is okay, but other servers actually drifted.
+	checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
+		var previousConsumers []string
+		for _, s := range c.servers {
+			sjs := s.getJetStream()
+			sjs.mu.Lock()
+			cc := sjs.cluster
+			sa := cc.streams[globalAccountName]["TEST"]
+			var consumers []string
+			for cName := range sa.consumers {
+				consumers = append(consumers, cName)
+			}
+			sjs.mu.Unlock()
+			slices.Sort(consumers)
+			if previousConsumers != nil && !slices.Equal(previousConsumers, consumers) {
+				return fmt.Errorf("Consumer mismatch:\n- previous: %v\n- actual : %v\n", previousConsumers, consumers)
+			}
+			previousConsumers = consumers
+		}
+		return nil
+	})
 }
 
 // This is to test a publish slowdown and general instability experienced in a setup similar to this.
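
The crux of the fix is easy to lose in the hunks above: once `ru.updateConsumers` and `ru.removeConsumers` became two-level maps (stream recovery key -> consumer recovery key -> assignment), a bare `delete(ru.updateConsumers, key)` keyed by the consumer matches nothing in the outer map and silently deletes nothing. Below is a minimal, self-contained sketch of the broken and corrected bookkeeping, plus the snapshot filtering that the re-introduced `ca.pending` flag enables. The types are simplified stand-ins and the key strings are inferred from the tests above (`":TEST"` for the empty account); treat it as an illustration, not as server code.

```go
package main

import "fmt"

// Simplified stand-ins for the server's internal types. The real maps hold
// *consumerAssignment values keyed by recovery keys; the exact key formats
// here are inferred from the tests above and are illustrative only.
type consumerAssignment struct {
	Stream, Name string
	pending      bool
}

type recoveryUpdates struct {
	updateConsumers map[string]map[string]*consumerAssignment
	removeConsumers map[string]map[string]*consumerAssignment
}

// snapshotConsumers mirrors the metaSnapshot hunk: assignments still marked
// pending are excluded, so a proposal that fails after being marked pending
// cannot be resurrected from a snapshot as a ghost consumer.
func snapshotConsumers(consumers map[string]*consumerAssignment) []*consumerAssignment {
	out := make([]*consumerAssignment, 0, len(consumers))
	for _, ca := range consumers {
		if ca.pending {
			continue
		}
		out = append(out, ca)
	}
	return out
}

func main() {
	ca := &consumerAssignment{Stream: "TEST", Name: "consumer"}
	skey := ":TEST"         // stream recovery key (empty account, as in the tests)
	key := ":TEST:consumer" // consumer recovery key (inferred format)

	ru := &recoveryUpdates{
		updateConsumers: map[string]map[string]*consumerAssignment{skey: {key: ca}},
		removeConsumers: map[string]map[string]*consumerAssignment{},
	}

	// Buggy variant: with two-level maps, deleting by the consumer key on the
	// OUTER map matches nothing, so the consumer stays queued for creation
	// even though a removal was also recorded; that is the ghost consumer.
	delete(ru.updateConsumers, key)
	fmt.Println(len(ru.updateConsumers[skey])) // 1: still queued for creation

	// Fixed variant: delete from the inner map for that stream, then record
	// the removal under the stream's key.
	if consumers, ok := ru.updateConsumers[skey]; ok {
		delete(consumers, key)
	}
	if _, ok := ru.removeConsumers[skey]; !ok {
		ru.removeConsumers[skey] = map[string]*consumerAssignment{}
	}
	ru.removeConsumers[skey][key] = ca
	fmt.Println(len(ru.updateConsumers[skey])) // 0: the removal wins

	fmt.Println(len(snapshotConsumers(map[string]*consumerAssignment{
		"pending": {Stream: "TEST", Name: "pending", pending: true},
		"applied": {Stream: "TEST", Name: "applied"},
	}))) // 1: only the applied assignment is snapshotted
}
```

Storing pending assignments directly in `sa.consumers` (rather than in the removed `pendingConsumers` set) keeps them visible to the meta leader, which must respond to consumer deletions, while the `pending` flag keeps them out of snapshots until the proposal is actually applied.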