Skip to content

Commit

Permalink
Merge pull request #18944 from jakesmith/HPCC-32313-smartjoin-heap-is…
Browse files Browse the repository at this point in the history
…sues

HPCC-32313 Limit lookup broadcast queue size (preventing mem build up)

Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Aug 2, 2024
2 parents b5d36a8 + 7dde8d8 commit eb81ac5
Showing 1 changed file with 42 additions and 3 deletions.
45 changes: 42 additions & 3 deletions thorlcr/activities/lookupjoin/thlookupjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
limitations under the License.
############################################################################## */

#include <cmath>
#include "thactivityutil.ipp"
#include "thcompressutil.hpp"
#include "thexception.hpp"
Expand Down Expand Up @@ -155,6 +156,8 @@ class CBroadcaster : public CSimpleInterface
CThreadedPersistent threaded;
SimpleInterThreadQueueOf<CSendItem, true> broadcastQueue;
Owned<IException> exception;
unsigned myNode;
unsigned nodes;
bool aborted;
void clearQueue()
{
Expand All @@ -169,6 +172,12 @@ class CBroadcaster : public CSimpleInterface
CSend(CBroadcaster &_broadcaster) : threaded("CBroadcaster::CSend", this), broadcaster(_broadcaster)
{
aborted = false;
myNode = broadcaster.activity.queryJob().queryMyNodeRank()-1; // 0 based
nodes = broadcaster.activity.queryJob().queryNodes();

// in theory each worker could be sending log(n) packets, with the broadcaster on each blocking waiting for acks
unsigned limit = nodes * std::ceil(std::log2(nodes));
broadcastQueue.setLimit(limit);
}
~CSend()
{
Expand All @@ -182,7 +191,22 @@ class CBroadcaster : public CSimpleInterface
sendItem->Release();
throw exception.getClear();
}
broadcastQueue.enqueue(sendItem); // will block if queue full
// check if anywhere else to send to
if (sendItem)
{
unsigned origin = sendItem->queryNode();
unsigned pseudoNode = (myNode<origin) ? nodes-origin+myNode : myNode-origin;
unsigned t = broadcaster.target(0, pseudoNode);
if (t>=nodes)
{
sendItem->Release();
return;
}
}
while (!broadcastQueue.enqueue(sendItem, 5000)) // will block if queue full
{
DBGLOG("CSend::addBlock() - broadcastQueue full, waiting for space");
}
}
void start()
{
Expand Down Expand Up @@ -272,6 +296,8 @@ class CBroadcaster : public CSimpleInterface
mptag_t rt = ::createReplyTag();
unsigned origin = sendItem->queryNode();
unsigned pseudoNode = (myNode<origin) ? nodes-origin+myNode : myNode-origin;

// NB: could in theory spot if all descendents that this would end up propagating to have stopped, and not send
CMessageBuffer replyMsg;
// sends to all in 1st pass, then waits for ack from all
for (unsigned sendRecv=0; sendRecv<2 && !activity.queryAbortSoon(); sendRecv++)
Expand Down Expand Up @@ -306,6 +332,10 @@ class CBroadcaster : public CSimpleInterface
#endif
if (!activity.receiveMsg(comm, replyMsg, t, rt))
break;
bool isStopping;
replyMsg.read(isStopping);
if (isStopping) // effectively a shortcut to mark asap that sender is stopping because the receiver has been requested to stop
setStopping(t-1);
#ifdef _TRACEBROADCAST
ActPrintLog(&activity, "Broadcast node %d Sent to node %d, origin node %d, origin slave %d, size %d, code=%d - received ack", myNode+1, t, origin+1, sendItem->querySlave()+1, sendLen, (unsigned)sendItem->queryCode());
#endif
Expand All @@ -330,6 +360,11 @@ class CBroadcaster : public CSimpleInterface
}
return false;
}
// recieve loop, receives CSendItem packets, adds them to broadcaster thread ('sender'), and processes the packet via 'bCastReceive'.
// bcast_sendStopping are regular row packets that inform us that the sender is stopping (something upstream has asked it to stop())
// - If all workers have signalled stopping, 'allRequestStop' will be set and will curtail the broadcast of more packets.
// - Or, if the broadcaster has explicitly been stopped (occurs via failover to local lookup), this will also curtail the broadcast of more packets.
// bcast_stop contains no row data, it signals that the sender has finished sending data.
void recvLoop()
{
// my sender is implicitly stopped (never sends to self)
Expand All @@ -349,16 +384,20 @@ class CBroadcaster : public CSimpleInterface
break;
}
mptag_t replyTag = msg.getReplyTag();
CMessageBuffer ackMsg;
Owned<CSendItem> sendItem = new CSendItem(msg);
#ifdef _TRACEBROADCAST
ActPrintLog(&activity, "Broadcast node %d received from node %d, origin node %d, origin slave %d, size %d, code=%d", myNode+1, (unsigned)sendRank, sendItem->queryNode()+1, sendItem->querySlave()+1, sendItem->length(), (unsigned)sendItem->queryCode());
#endif
CMessageBuffer ackMsg;
bool stopping = isStopping(); // this is effectively a shortcut to inform sender asap. bcast_sendStopping/bcast_stop will be queued soon
ackMsg.append(stopping);
comm.send(ackMsg, sendRank, replyTag); // send ack
#ifdef _TRACEBROADCAST
ActPrintLog(&activity, "Broadcast node %d, sent ack to node %d, replyTag=%d", myNode+1, (unsigned)sendRank, (unsigned)replyTag);
#endif
sender.addBlock(sendItem.getLink());
// if all stopping, then suppress broadcasting (except stop packets)
if (!allRequestStop || (bcast_stop == sendItem->queryCode()))
sender.addBlock(sendItem.getLink());
assertex(myNode != sendItem->queryNode());
switch (sendItem->queryCode())
{
Expand Down

0 comments on commit eb81ac5

Please sign in to comment.