Skip to content

Commit

Permalink
[FIXED] Ghost consumers after failed meta proposal (#6088)
Browse files Browse the repository at this point in the history
If the `cc.meta.Propose` fails, for example due to restarts, the
`sa.consumers[ca.Name] = ca` would have staged a consumer that will
never be successfully added. That consumer would then become part of the
next snapshot (for example during shutdown), which meant that that
server had a ghost consumer it was unable to clean up.

This does not fully solve the ghost consumers issue, but this should
contribute to the final solution.

Signed-off-by: Maurice van Veen <[email protected]>
  • Loading branch information
derekcollison authored Nov 7, 2024
2 parents 8f16f1b + 3c0afd2 commit 73a481d
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 12 deletions.
29 changes: 17 additions & 12 deletions server/jetstream_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,11 @@ type streamAssignment struct {
Reply string `json:"reply"`
Restore *StreamState `json:"restore_state,omitempty"`
// Internal
consumers map[string]*consumerAssignment
responded bool
recovering bool
err error
consumers map[string]*consumerAssignment
pendingConsumers map[string]struct{}
responded bool
recovering bool
err error
}

// consumerAssignment is what the meta controller uses to assign consumers to streams.
Expand Down Expand Up @@ -4264,6 +4265,10 @@ func (js *jetStream) processConsumerAssignment(ca *consumerAssignment) {
// Place into our internal map under the stream assignment.
// Ok to replace an existing one, we check on process call below.
sa.consumers[ca.Name] = ca
delete(sa.pendingConsumers, ca.Name)
if len(sa.pendingConsumers) == 0 {
sa.pendingConsumers = nil
}
js.mu.Unlock()

acc, err := s.LookupAccount(accName)
Expand Down Expand Up @@ -7415,7 +7420,7 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
}
if maxc > 0 {
// Don't count DIRECTS.
total := 0
total := len(sa.pendingConsumers)
for cn, ca := range sa.consumers {
if action == ActionCreateOrUpdate {
// If the consumer name is specified and we think it already exists, then
Expand Down Expand Up @@ -7673,14 +7678,14 @@ func (s *Server) jsClusteredConsumerRequest(ci *ClientInfo, acc *Account, subjec
ca = nca
}

// Mark this as pending.
if sa.consumers == nil {
sa.consumers = make(map[string]*consumerAssignment)
}
sa.consumers[ca.Name] = ca

// Do formal proposal.
cc.meta.Propose(encodeAddConsumerAssignment(ca))
if err := cc.meta.Propose(encodeAddConsumerAssignment(ca)); err == nil {
// Mark this as pending.
if sa.pendingConsumers == nil {
sa.pendingConsumers = make(map[string]struct{})
}
sa.pendingConsumers[ca.Name] = struct{}{}
}
}

func encodeAddConsumerAssignment(ca *consumerAssignment) []byte {
Expand Down
6 changes: 6 additions & 0 deletions server/jetstream_cluster_2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2072,6 +2072,12 @@ func TestJetStreamClusterMaxConsumersMultipleConcurrentRequests(t *testing.T) {
if nc := len(names); nc > 1 {
t.Fatalf("Expected only 1 consumer, got %d", nc)
}

metaLeader := c.leader()
mjs := metaLeader.getJetStream()
sa := mjs.streamAssignment(globalAccountName, "MAXCC")
require_NotNil(t, sa)
require_True(t, sa.pendingConsumers == nil)
}

func TestJetStreamClusterAccountMaxStreamsAndConsumersMultipleConcurrentRequests(t *testing.T) {
Expand Down

0 comments on commit 73a481d

Please sign in to comment.