Skip to content

Commit

Permalink
HPCC-32313 Limit lookup broadcast queue size (preventing mem build up)
Browse files Browse the repository at this point in the history
As a smart join is failing over to a local smart join and stopping
the broadcaster, an excessive number of inflight packets can be
received and queued on the broadcaster queue. If this is unbound
it can cause a large amount of heap memory to be consumed.

Also:
1) add logic into the broadcaster so that receivers send back
notification that they are stopping to the sender, so the sender
can stop as quickly as possible.
2) Prevent adding packages to the broadcaster thread if it
can be deemed that they do not need broadcasting anywhere else.

Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Aug 2, 2024
1 parent dc060d4 commit 7dde8d8
Showing 1 changed file with 42 additions and 3 deletions.
45 changes: 42 additions & 3 deletions thorlcr/activities/lookupjoin/thlookupjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
limitations under the License.
############################################################################## */

#include <cmath>
#include "thactivityutil.ipp"
#include "thcompressutil.hpp"
#include "thexception.hpp"
Expand Down Expand Up @@ -155,6 +156,8 @@ class CBroadcaster : public CSimpleInterface
CThreadedPersistent threaded;
SimpleInterThreadQueueOf<CSendItem, true> broadcastQueue;
Owned<IException> exception;
unsigned myNode;
unsigned nodes;
bool aborted;
void clearQueue()
{
Expand All @@ -169,6 +172,12 @@ class CBroadcaster : public CSimpleInterface
CSend(CBroadcaster &_broadcaster) : threaded("CBroadcaster::CSend", this), broadcaster(_broadcaster)
{
aborted = false;
myNode = broadcaster.activity.queryJob().queryMyNodeRank()-1; // 0 based
nodes = broadcaster.activity.queryJob().queryNodes();

// in theory each worker could be sending log(n) packets, with the broadcaster on each blocking waiting for acks
unsigned limit = nodes * std::ceil(std::log2(nodes));
broadcastQueue.setLimit(limit);
}
~CSend()
{
Expand All @@ -182,7 +191,22 @@ class CBroadcaster : public CSimpleInterface
sendItem->Release();
throw exception.getClear();
}
broadcastQueue.enqueue(sendItem); // will block if queue full
// check if anywhere else to send to
if (sendItem)
{
unsigned origin = sendItem->queryNode();
unsigned pseudoNode = (myNode<origin) ? nodes-origin+myNode : myNode-origin;
unsigned t = broadcaster.target(0, pseudoNode);
if (t>=nodes)
{
sendItem->Release();
return;
}
}
while (!broadcastQueue.enqueue(sendItem, 5000)) // will block if queue full
{
DBGLOG("CSend::addBlock() - broadcastQueue full, waiting for space");
}
}
void start()
{
Expand Down Expand Up @@ -272,6 +296,8 @@ class CBroadcaster : public CSimpleInterface
mptag_t rt = ::createReplyTag();
unsigned origin = sendItem->queryNode();
unsigned pseudoNode = (myNode<origin) ? nodes-origin+myNode : myNode-origin;

// NB: could in theory spot if all descendents that this would end up propagating to have stopped, and not send
CMessageBuffer replyMsg;
// sends to all in 1st pass, then waits for ack from all
for (unsigned sendRecv=0; sendRecv<2 && !activity.queryAbortSoon(); sendRecv++)
Expand Down Expand Up @@ -306,6 +332,10 @@ class CBroadcaster : public CSimpleInterface
#endif
if (!activity.receiveMsg(comm, replyMsg, t, rt))
break;
bool isStopping;
replyMsg.read(isStopping);
if (isStopping) // effectively a shortcut to mark asap that sender is stopping because the receiver has been requested to stop
setStopping(t-1);
#ifdef _TRACEBROADCAST
ActPrintLog(&activity, "Broadcast node %d Sent to node %d, origin node %d, origin slave %d, size %d, code=%d - received ack", myNode+1, t, origin+1, sendItem->querySlave()+1, sendLen, (unsigned)sendItem->queryCode());
#endif
Expand All @@ -330,6 +360,11 @@ class CBroadcaster : public CSimpleInterface
}
return false;
}
// recieve loop, receives CSendItem packets, adds them to broadcaster thread ('sender'), and processes the packet via 'bCastReceive'.
// bcast_sendStopping are regular row packets that inform us that the sender is stopping (something upstream has asked it to stop())
// - If all workers have signalled stopping, 'allRequestStop' will be set and will curtail the broadcast of more packets.
// - Or, if the broadcaster has explicitly been stopped (occurs via failover to local lookup), this will also curtail the broadcast of more packets.
// bcast_stop contains no row data, it signals that the sender has finished sending data.
void recvLoop()
{
// my sender is implicitly stopped (never sends to self)
Expand All @@ -349,16 +384,20 @@ class CBroadcaster : public CSimpleInterface
break;
}
mptag_t replyTag = msg.getReplyTag();
CMessageBuffer ackMsg;
Owned<CSendItem> sendItem = new CSendItem(msg);
#ifdef _TRACEBROADCAST
ActPrintLog(&activity, "Broadcast node %d received from node %d, origin node %d, origin slave %d, size %d, code=%d", myNode+1, (unsigned)sendRank, sendItem->queryNode()+1, sendItem->querySlave()+1, sendItem->length(), (unsigned)sendItem->queryCode());
#endif
CMessageBuffer ackMsg;
bool stopping = isStopping(); // this is effectively a shortcut to inform sender asap. bcast_sendStopping/bcast_stop will be queued soon
ackMsg.append(stopping);
comm.send(ackMsg, sendRank, replyTag); // send ack
#ifdef _TRACEBROADCAST
ActPrintLog(&activity, "Broadcast node %d, sent ack to node %d, replyTag=%d", myNode+1, (unsigned)sendRank, (unsigned)replyTag);
#endif
sender.addBlock(sendItem.getLink());
// if all stopping, then suppress broadcasting (except stop packets)
if (!allRequestStop || (bcast_stop == sendItem->queryCode()))
sender.addBlock(sendItem.getLink());
assertex(myNode != sendItem->queryNode());
switch (sendItem->queryCode())
{
Expand Down

0 comments on commit 7dde8d8

Please sign in to comment.