Skip to content

Commit

Permalink
[FIXED] LeafNode: queue distribution with daisy-chain and gateway
Browse files Browse the repository at this point in the history
In complex setup, a message produced from a cluster that had queue
interest from leafnodes (either hub or spoke) would sometimes not
deliver a message if the interest was a leafnode that had the
interest on behalf of a gateway.

In the setup described in the issue this PR fixes, "Cluster B"
may have picked "Cluster C", but that cluster does not have local
queue interest, only the leafnode interest from "Cluster B", and
would pick a LEAF connection to this cluster, but then suppress
the message since it came from "B" so "C" cannot send it back there.

But picking a queue sub for "B" in "C" would then prevent the
message to be delivered to the gateway "D".

Resolves #6125

Signed-off-by: Ivan Kozlovic <[email protected]>
  • Loading branch information
kozlovic committed Nov 13, 2024
1 parent a024273 commit cd43b70
Show file tree
Hide file tree
Showing 2 changed files with 354 additions and 13 deletions.
47 changes: 34 additions & 13 deletions server/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4752,6 +4752,21 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
// Declared here because of goto.
var queues [][]byte

var leafOrigin string
switch c.kind {
case ROUTER:
if len(c.pa.origin) > 0 {
// Picture a message sent from a leafnode to a server that then routes
// this message: CluserA -leaf-> HUB1 -route-> HUB2
// Here we are in HUB2, so c.kind is a ROUTER, but the message will
// contain a c.pa.origin set to "ClusterA" to indicate that this message
// originated from that leafnode cluster.
leafOrigin = bytesToString(c.pa.origin)
}
case LEAF:
leafOrigin = c.remoteCluster()
}

// For all routes/leaf/gateway connections, we may still want to send messages to
// leaf nodes or routes even if there are no queue filters since we collect
// them above and do not process inline like normal clients.
Expand Down Expand Up @@ -4791,7 +4806,13 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
for i := 0; i < len(qsubs); i++ {
sub = qsubs[i]
if dst := sub.client.kind; dst == LEAF || dst == ROUTER {
// If we have assigned an ROUTER rsub already, replace if
// If the destination is a LEAF, we first need to make sure
// that we would not pick one that was the origin of this
// message.
if dst == LEAF && leafOrigin != _EMPTY_ && leafOrigin == sub.client.remoteCluster() {
continue
}
// If we have assigned a ROUTER rsub already, replace if
// the destination is a LEAF since we want to favor that.
if rsub == nil || (rsub.client.kind == ROUTER && dst == LEAF) {
rsub = sub
Expand All @@ -4817,6 +4838,8 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
}

// Find a subscription that is able to deliver this message starting at a random index.
// Note that if the message came from a ROUTER, we will only have CLIENT or LEAF
// queue subs here, otherwise we can have all types.
for i := 0; i < lqs; i++ {
if sindex+i < lqs {
sub = qsubs[sindex+i]
Expand All @@ -4837,6 +4860,11 @@ func (c *client) processMsgResults(acc *Account, r *SublistResult, msg, deliver,
// Here we just care about a client or leaf and skipping a leaf and preferring locals.
if dst := sub.client.kind; dst == ROUTER || dst == LEAF {
if (src == LEAF || src == CLIENT) && dst == LEAF {
// If we come from a LEAF and are about to pick a LEAF connection,
// make sure this is not the same leaf cluster.
if src == LEAF && leafOrigin != _EMPTY_ && leafOrigin == sub.client.remoteCluster() {
continue
}
// Remember that leaf in case we don't find any other candidate.
if rsub == nil {
rsub = sub
Expand Down Expand Up @@ -4980,18 +5008,11 @@ sendToRoutesOrLeafs:
// If so make sure we do not send it back to the same cluster for a different
// leafnode. Cluster wide no echo.
if dc.kind == LEAF {
// Check two scenarios. One is inbound from a route (c.pa.origin)
if c.kind == ROUTER && len(c.pa.origin) > 0 {
if bytesToString(c.pa.origin) == dc.remoteCluster() {
continue
}
}
// The other is leaf to leaf.
if c.kind == LEAF {
src, dest := c.remoteCluster(), dc.remoteCluster()
if src != _EMPTY_ && src == dest {
continue
}
// Check two scenarios. One is inbound from a route (c.pa.origin),
// and the other is leaf to leaf. In both case, leafOrigin is the one
// to use for the comparison.
if leafOrigin != _EMPTY_ && leafOrigin == dc.remoteCluster() {
continue
}

// We need to check if this is a request that has a stamped client information header.
Expand Down
Loading

0 comments on commit cd43b70

Please sign in to comment.