diff --git a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp index 3a0da40d7f3..0441534af72 100644 --- a/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp +++ b/thorlcr/activities/lookupjoin/thlookupjoinslave.cpp @@ -15,6 +15,7 @@ limitations under the License. ############################################################################## */ +#include #include "thactivityutil.ipp" #include "thcompressutil.hpp" #include "thexception.hpp" @@ -155,6 +156,8 @@ class CBroadcaster : public CSimpleInterface CThreadedPersistent threaded; SimpleInterThreadQueueOf broadcastQueue; Owned exception; + unsigned myNode; + unsigned nodes; bool aborted; void clearQueue() { @@ -169,6 +172,12 @@ class CBroadcaster : public CSimpleInterface CSend(CBroadcaster &_broadcaster) : threaded("CBroadcaster::CSend", this), broadcaster(_broadcaster) { aborted = false; + myNode = broadcaster.activity.queryJob().queryMyNodeRank()-1; // 0 based + nodes = broadcaster.activity.queryJob().queryNodes(); + + // in theory each worker could be sending log(n) packets, with the broadcaster on each blocking waiting for acks + unsigned limit = nodes * std::ceil(std::log2(nodes)); + broadcastQueue.setLimit(limit); } ~CSend() { @@ -182,7 +191,22 @@ class CBroadcaster : public CSimpleInterface sendItem->Release(); throw exception.getClear(); } - broadcastQueue.enqueue(sendItem); // will block if queue full + // check if anywhere else to send to + if (sendItem) + { + unsigned origin = sendItem->queryNode(); + unsigned pseudoNode = (myNode=nodes) + { + sendItem->Release(); + return; + } + } + while (!broadcastQueue.enqueue(sendItem, 5000)) // will block if queue full + { + DBGLOG("CSend::addBlock() - broadcastQueue full, waiting for space"); + } } void start() { @@ -272,6 +296,8 @@ class CBroadcaster : public CSimpleInterface mptag_t rt = ::createReplyTag(); unsigned origin = sendItem->queryNode(); unsigned pseudoNode = (myNodequerySlave()+1, sendLen, (unsigned)sendItem->queryCode()); #endif @@ -330,6 +360,11 @@ class CBroadcaster : public CSimpleInterface } return false; } + // recieve loop, receives CSendItem packets, adds them to broadcaster thread ('sender'), and processes the packet via 'bCastReceive'. + // bcast_sendStopping are regular row packets that inform us that the sender is stopping (something upstream has asked it to stop()) + // - If all workers have signalled stopping, 'allRequestStop' will be set and will curtail the broadcast of more packets. + // - Or, if the broadcaster has explicitly been stopped (occurs via failover to local lookup), this will also curtail the broadcast of more packets. + // bcast_stop contains no row data, it signals that the sender has finished sending data. void recvLoop() { // my sender is implicitly stopped (never sends to self) @@ -349,16 +384,20 @@ class CBroadcaster : public CSimpleInterface break; } mptag_t replyTag = msg.getReplyTag(); - CMessageBuffer ackMsg; Owned sendItem = new CSendItem(msg); #ifdef _TRACEBROADCAST ActPrintLog(&activity, "Broadcast node %d received from node %d, origin node %d, origin slave %d, size %d, code=%d", myNode+1, (unsigned)sendRank, sendItem->queryNode()+1, sendItem->querySlave()+1, sendItem->length(), (unsigned)sendItem->queryCode()); #endif + CMessageBuffer ackMsg; + bool stopping = isStopping(); // this is effectively a shortcut to inform sender asap. bcast_sendStopping/bcast_stop will be queued soon + ackMsg.append(stopping); comm.send(ackMsg, sendRank, replyTag); // send ack #ifdef _TRACEBROADCAST ActPrintLog(&activity, "Broadcast node %d, sent ack to node %d, replyTag=%d", myNode+1, (unsigned)sendRank, (unsigned)replyTag); #endif - sender.addBlock(sendItem.getLink()); + // if all stopping, then suppress broadcasting (except stop packets) + if (!allRequestStop || (bcast_stop == sendItem->queryCode())) + sender.addBlock(sendItem.getLink()); assertex(myNode != sendItem->queryNode()); switch (sendItem->queryCode()) {