From 7dde8d81cbbd8c451fa92a60e6c8b3b91c1ed81c Mon Sep 17 00:00:00 2001 From: Jake Smith Date: Wed, 31 Jul 2024 17:39:17 +0100 Subject: [PATCH] HPCC-32313 Limit lookup broadcast queue size (preventing mem build up) As a smart join is failing over to a local smart join and stopping the broadcaster, an excessive number of in-flight packets can be received and queued on the broadcaster queue. If the queue is unbounded, it can cause a large amount of heap memory to be consumed. Also: 1) add logic into the broadcaster so that receivers send back notification that they are stopping to the sender, so the sender can stop as quickly as possible. 2) Prevent adding packets to the broadcaster thread if it can be deemed that they do not need broadcasting anywhere else. Signed-off-by: Jake Smith --- .../lookupjoin/thlookupjoinslave.cpp | 45 +++++++++++++++++-- 1 file changed, 42 insertions(+), 3 deletions(-) diff --git a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp index 3a0da40d7f3..0441534af72 100644 --- a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp +++ b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp @@ -15,6 +15,7 @@ limitations under the License. 
############################################################################## */ +#include #include "thactivityutil.ipp" #include "thcompressutil.hpp" #include "thexception.hpp" @@ -155,6 +156,8 @@ class CBroadcaster : public CSimpleInterface CThreadedPersistent threaded; SimpleInterThreadQueueOf broadcastQueue; Owned exception; + unsigned myNode; + unsigned nodes; bool aborted; void clearQueue() { @@ -169,6 +172,12 @@ class CBroadcaster : public CSimpleInterface CSend(CBroadcaster &_broadcaster) : threaded("CBroadcaster::CSend", this), broadcaster(_broadcaster) { aborted = false; + myNode = broadcaster.activity.queryJob().queryMyNodeRank()-1; // 0 based + nodes = broadcaster.activity.queryJob().queryNodes(); + + // in theory each worker could be sending log(n) packets, with the broadcaster on each blocking waiting for acks + unsigned limit = nodes * std::ceil(std::log2(nodes)); + broadcastQueue.setLimit(limit); } ~CSend() { @@ -182,7 +191,22 @@ class CBroadcaster : public CSimpleInterface sendItem->Release(); throw exception.getClear(); } - broadcastQueue.enqueue(sendItem); // will block if queue full + // check if anywhere else to send to + if (sendItem) + { + unsigned origin = sendItem->queryNode(); + unsigned pseudoNode = (myNode=nodes) + { + sendItem->Release(); + return; + } + } + while (!broadcastQueue.enqueue(sendItem, 5000)) // will block if queue full + { + DBGLOG("CSend::addBlock() - broadcastQueue full, waiting for space"); + } } void start() { @@ -272,6 +296,8 @@ class CBroadcaster : public CSimpleInterface mptag_t rt = ::createReplyTag(); unsigned origin = sendItem->queryNode(); unsigned pseudoNode = (myNodequerySlave()+1, sendLen, (unsigned)sendItem->queryCode()); #endif @@ -330,6 +360,11 @@ class CBroadcaster : public CSimpleInterface } return false; } + // receive loop, receives CSendItem packets, adds them to broadcaster thread ('sender'), and processes the packet via 'bCastReceive'. 
+ // bcast_sendStopping are regular row packets that inform us that the sender is stopping (something upstream has asked it to stop()) + // - If all workers have signalled stopping, 'allRequestStop' will be set and will curtail the broadcast of more packets. + // - Or, if the broadcaster has explicitly been stopped (occurs via failover to local lookup), this will also curtail the broadcast of more packets. + // bcast_stop contains no row data, it signals that the sender has finished sending data. void recvLoop() { // my sender is implicitly stopped (never sends to self) @@ -349,16 +384,20 @@ class CBroadcaster : public CSimpleInterface break; } mptag_t replyTag = msg.getReplyTag(); - CMessageBuffer ackMsg; Owned sendItem = new CSendItem(msg); #ifdef _TRACEBROADCAST ActPrintLog(&activity, "Broadcast node %d received from node %d, origin node %d, origin slave %d, size %d, code=%d", myNode+1, (unsigned)sendRank, sendItem->queryNode()+1, sendItem->querySlave()+1, sendItem->length(), (unsigned)sendItem->queryCode()); #endif + CMessageBuffer ackMsg; + bool stopping = isStopping(); // this is effectively a shortcut to inform sender asap. bcast_sendStopping/bcast_stop will be queued soon + ackMsg.append(stopping); comm.send(ackMsg, sendRank, replyTag); // send ack #ifdef _TRACEBROADCAST ActPrintLog(&activity, "Broadcast node %d, sent ack to node %d, replyTag=%d", myNode+1, (unsigned)sendRank, (unsigned)replyTag); #endif - sender.addBlock(sendItem.getLink()); + // if all stopping, then suppress broadcasting (except stop packets) + if (!allRequestStop || (bcast_stop == sendItem->queryCode())) + sender.addBlock(sendItem.getLink()); assertex(myNode != sendItem->queryNode()); switch (sendItem->queryCode()) {