Skip to content

Commit

Permalink
[FIXED] LeafNode: queue distribution with daisy-chain and gateway (#6126
Browse files Browse the repository at this point in the history
)

In complex setup, a message produced from a cluster that had queue
interest from leafnodes (either hub or spoke) would sometimes not
deliver a message if the interest was a leafnode that had the interest
on behalf of a gateway.

In the setup described in the issue this PR fixes, "Cluster B" may have
picked "Cluster C", but that cluster does not have local queue interest,
only the leafnode interest from "Cluster B", and would pick a LEAF
connection to this cluster, but then suppress the message since it came
from "B" so "C" cannot send it back there.

But picking a queue sub for "B" in "C" would then prevent the message to
be delivered to the gateway "D".

Resolves #6125

Signed-off-by: Ivan Kozlovic <[email protected]>
  • Loading branch information
derekcollison authored Nov 13, 2024
2 parents a024273 + cd43b70 commit da1dcda
Show file tree
Hide file tree
Showing 2 changed files with 354 additions and 13 deletions.
47 changes: 34 additions & 13 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4752,6 +4752,21 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
// Declared here because of goto.
var queues [][]byte

var leafOrigin string
switch c.kind {
case ROUTER:
if len(c.pa.origin) > 0 {
// Picture a message sent from a leafnode to a server that then routes
// this message: CluserA -leaf-> HUB1 -route-> HUB2
// Here we are in HUB2, so c.kind is a ROUTER, but the message will
// contain a c.pa.origin set to "ClusterA" to indicate that this message
// originated from that leafnode cluster.
leafOrigin = bytesToString(c.pa.origin)
}
case LEAF:
leafOrigin = c.remoteCluster()
}

// For all routes/leaf/gateway connections, we may still want to send messages to
// leaf nodes or routes even if there are no queue filters since we collect
// them above and do not process inline like normal clients.
Expand Down Expand Up @@ -4791,7 +4806,13 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
for i := 0; i < len(qsubs); i++ {
sub = qsubs[i]
if dst := sub.client.kind; dst == LEAF || dst == ROUTER {
// If we have assigned an ROUTER rsub already, replace if
// If the destination is a LEAF, we first need to make sure
// that we would not pick one that was the origin of this
// message.
if dst == LEAF && leafOrigin != _EMPTY_ && leafOrigin == sub.client.remoteCluster() {
continue
}
// If we have assigned a ROUTER rsub already, replace if
// the destination is a LEAF since we want to favor that.
if rsub == nil || (rsub.client.kind == ROUTER && dst == LEAF) {
rsub = sub
Expand All @@ -4817,6 +4838,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
}

// Find a subscription that is able to deliver this message starting at a random index.
// Note that if the message came from a ROUTER, we will only have CLIENT or LEAF
// queue subs here, otherwise we can have all types.
for i := 0; i < lqs; i++ {
if sindex+i < lqs {
sub = qsubs[sindex+i]
Expand All @@ -4837,6 +4860,11 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
// Here we just care about a client or leaf and skipping a leaf and preferring locals.
if dst := sub.client.kind; dst == ROUTER || dst == LEAF {
if (src == LEAF || src == CLIENT) && dst == LEAF {
// If we come from a LEAF and are about to pick a LEAF connection,
// make sure this is not the same leaf cluster.
if src == LEAF && leafOrigin != _EMPTY_ && leafOrigin == sub.client.remoteCluster() {
continue
}
// Remember that leaf in case we don't find any other candidate.
if rsub == nil {
rsub = sub
Expand Down Expand Up @@ -4980,18 +5008,11 @@ sendToRoutesOrLeafs:
// If so make sure we do not send it back to the same cluster for a different
// leafnode. Cluster wide no echo.
if dc.kind == LEAF {
// Check two scenarios. One is inbound from a route (c.pa.origin)
if c.kind == ROUTER && len(c.pa.origin) > 0 {
if bytesToString(c.pa.origin) == dc.remoteCluster() {
continue
}
}
// The other is leaf to leaf.
if c.kind == LEAF {
src, dest := c.remoteCluster(), dc.remoteCluster()
if src != _EMPTY_ && src == dest {
continue
}
// Check two scenarios. One is inbound from a route (c.pa.origin),
// and the other is leaf to leaf. In both case, leafOrigin is the one
// to use for the comparison.
if leafOrigin != _EMPTY_ && leafOrigin == dc.remoteCluster() {
continue
}

// We need to check if this is a request that has a stamped client information header.
Expand Down
Loading

0 comments on commit da1dcda

Please sign in to comment.