diff --git a/roxie/udplib/udptrr.cpp b/roxie/udplib/udptrr.cpp index 9a31fc0d287..984b5a164cc 100644 --- a/roxie/udplib/udptrr.cpp +++ b/roxie/udplib/udptrr.cpp @@ -22,6 +22,7 @@ #include "jthread.hpp" #include "jlog.hpp" +#include "jmisc.hpp" #include "jisem.hpp" #include "jsocket.hpp" #include "udplib.hpp" @@ -853,6 +854,31 @@ class CReceiveManager : implements IReceiveManager, public CInterface IpMapOf sendersTable; + class UdpRdTracker : public TimeDivisionTracker<6, false> + { + public: + enum + { + other, + waiting, + allocating, + processing, + pushing, + checkingPending + }; + + UdpRdTracker(const char *name, unsigned reportIntervalSeconds) : TimeDivisionTracker<6, false>(name, reportIntervalSeconds) + { + stateNames[other] = "other"; + stateNames[waiting] = "waiting"; + stateNames[allocating] = "allocating"; + stateNames[processing] = "processing"; + stateNames[pushing] = "pushing"; + stateNames[checkingPending] = "checking pending"; + } + + }; + class receive_receive_flow : public Thread { CReceiveManager &parent; @@ -863,7 +889,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface std::atomic running = { false }; SenderList pendingRequests; // List of senders requesting permission to send PermitList pendingPermits; // List of active permits - + UdpRdTracker timeTracker; private: void noteRequest(UdpSenderEntry *requester, sequence_t flowSeq, sequence_t sendSeq) { @@ -966,7 +992,8 @@ class CReceiveManager : implements IReceiveManager, public CInterface public: receive_receive_flow(CReceiveManager &_parent, unsigned flow_p, unsigned _maxSlotsPerSender) - : Thread("UdpLib::receive_receive_flow"), parent(_parent), flow_port(flow_p), maxSlotsPerSender(_maxSlotsPerSender), maxPermits(_parent.input_queue_size) + : Thread("UdpLib::receive_receive_flow"), parent(_parent), flow_port(flow_p), maxSlotsPerSender(_maxSlotsPerSender), maxPermits(_parent.input_queue_size), + timeTracker("receive_receive_flow", 60) { } @@ -1210,6 +1237,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface { DBGLOG("UdpReceiver: wait_read(%u)", timeout); } + UdpRdTracker::TimeDivision d(timeTracker, UdpRdTracker::waiting); bool dataAvail = flow_socket->wait_read(timeout); if (dataAvail) { @@ -1217,8 +1245,10 @@ class CReceiveManager : implements IReceiveManager, public CInterface unsigned int res ; flow_socket->readtms(&msg, l, l, res, 0); assert(res==l); + d.switchState(UdpRdTracker::processing); doFlowRequest(msg); } + d.switchState(UdpRdTracker::checkingPending); timeout = checkPendingRequests(); } catch (IException *e) @@ -1263,9 +1293,10 @@ class CReceiveManager : implements IReceiveManager, public CInterface ISocket *selfFlowSocket = nullptr; std::atomic running = { false }; Semaphore started; + UdpRdTracker timeTracker; public: - receive_data(CReceiveManager &_parent) : Thread("UdpLib::receive_data"), parent(_parent) + receive_data(CReceiveManager &_parent) : Thread("UdpLib::receive_data"), parent(_parent), timeTracker("receive_data", 60) { unsigned ip_buffer = parent.input_queue_size*DATA_PAYLOAD*2; if (ip_buffer < udpFlowSocketsSize) ip_buffer = udpFlowSocketsSize; @@ -1325,7 +1356,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface adjustPriority(2); #endif started.signal(); - unsigned lastOOOReport = 0; + unsigned lastOOOReport = msTick(); unsigned lastPacketsOOO = 0; unsigned lastUnwantedDiscarded = 0; unsigned timeout = 5000; @@ -1342,8 +1373,10 @@ class CReceiveManager : implements IReceiveManager, public CInterface //Read at least the size of the smallest packet we can receive //static assert to check we are reading the smaller of the two possible packet types static_assert(sizeof(UdpRequestToSendMsg) <= sizeof(UdpPacketHeader)); - receive_socket->readtms(b->data, sizeof(UdpRequestToSendMsg), DATA_PAYLOAD, res, timeout); - + { + UdpRdTracker::TimeDivision d(timeTracker, UdpRdTracker::waiting); + receive_socket->readtms(b->data, sizeof(UdpRequestToSendMsg), DATA_PAYLOAD, res, timeout); + } //Even if a UDP packet is not split, very occasionally only some of the data may be present for the read. //Slightly horribly this packet could be one of two different formats(!) // a UdpRequestToSendMsg, which has a 2 byte command at the start of the header, with a maximum value of max_flow_cmd @@ -1372,27 +1405,30 @@ class CReceiveManager : implements IReceiveManager, public CInterface //Redirect them to the flow thread to process them. selfFlowSocket->write(b->data, res); } - - dataPacketsReceived++; - UdpSenderEntry *sender = &parent.sendersTable[hdr.node]; - if (sender->noteSeen(hdr)) - { - // We should perhaps track how often this happens, but it's not the same as unwantedDiscarded - hdr.node.clear(); // Used to indicate a duplicate that collate thread should discard. We don't discard on this thread as don't want to do anything that requires locks... - } - else { - //Decrease the number of active reservations to balance having received a new data packet (otherwise they will be double counted) - sender->decPermit(hdr.msgSeq); - if (udpTraceLevel > 5) // don't want to interrupt this thread if we can help it + UdpRdTracker::TimeDivision d(timeTracker, UdpRdTracker::processing); + dataPacketsReceived++; + UdpSenderEntry *sender = &parent.sendersTable[hdr.node]; + if (sender->noteSeen(hdr)) { - StringBuffer s; - DBGLOG("UdpReceiver: %u bytes received packet %" SEQF "u %x from %s", res, hdr.sendSeq, hdr.pktSeq, hdr.node.getTraceText(s).str()); + // We should perhaps track how often this happens, but it's not the same as unwantedDiscarded + hdr.node.clear(); // Used to indicate a duplicate that collate thread should discard. We don't discard on this thread as don't want to do anything that requires locks... + } + else + { + //Decrease the number of active reservations to balance having received a new data packet (otherwise they will be double counted) + sender->decPermit(hdr.msgSeq); + if (udpTraceLevel > 5) // don't want to interrupt this thread if we can help it + { + StringBuffer s; + DBGLOG("UdpReceiver: %u bytes received packet %" SEQF "u %x from %s", res, hdr.sendSeq, hdr.pktSeq, hdr.node.getTraceText(s).str()); + } } + d.switchState(UdpRdTracker::pushing); + parent.input_queue->pushOwn(b); + d.switchState(UdpRdTracker::allocating); + b = udpBufferManager->allocate(); } - parent.input_queue->pushOwn(b); - b = udpBufferManager->allocate(); - if (udpStatsReportInterval) { unsigned now = msTick(); @@ -1537,11 +1573,14 @@ class CReceiveManager : implements IReceiveManager, public CInterface void collatePackets() { + UdpRdTracker timeTracker("collatePackets", 60); while(running) { try { + UdpRdTracker::TimeDivision d(timeTracker, UdpRdTracker::waiting); DataBuffer *dataBuff = input_queue->pop(true); + d.switchState(UdpRdTracker::processing); dataBuff->changeState(roxiemem::DBState::queued, roxiemem::DBState::unowned, __func__); collatePacket(dataBuff); } diff --git a/roxie/udplib/udptrs.cpp b/roxie/udplib/udptrs.cpp index c223094c064..fe7c46ae868 100644 --- a/roxie/udplib/udptrs.cpp +++ b/roxie/udplib/udptrs.cpp @@ -80,7 +80,7 @@ RelaxedAtomic flowRequestsSent; RelaxedAtomic flowPermitsReceived; RelaxedAtomic dataPacketsSent; -static unsigned lastResentReport = 0; +static unsigned lastResentReport = msTick(); static unsigned lastOkToSendTimeouts = 0; static unsigned lastPacketsResent = 0; static unsigned lastFlowRequestsSent = 0; diff --git a/system/jlib/jmisc.hpp b/system/jlib/jmisc.hpp index bc3cdd07205..6e6fe9acafe 100644 --- a/system/jlib/jmisc.hpp +++ b/system/jlib/jmisc.hpp @@ -334,4 +334,107 @@ extern jlib_decl char **getSystemEnv(); extern jlib_decl char *getHPCCEnvVal(const char *name, const char *defaultValue); +// class TimeDivisionTracker is useful for working out what a thread spends its time doing. See udptrrr.cpp for an example +// of its usage + +template class TimeDivisionTracker +{ +protected: + unsigned __int64 totals[NUMSTATES] = {0}; + unsigned counts[NUMSTATES] = {0}; + const char *stateNames[NUMSTATES]; + unsigned __int64 lastTick = get_cycles_now(); + unsigned currentState = 0; + StringAttr name; + unsigned __int64 reportIntervalCycles = 0; + unsigned __int64 lastReport = 0; + + unsigned enterState(unsigned newState) + { + unsigned prevState = currentState; + unsigned __int64 now = get_cycles_now(); + if (reportIntervalCycles && now - lastReport >= reportIntervalCycles) + { + report(true); + now = get_cycles_now(); + } + if (newState != prevState) + { + totals[currentState] += now - lastTick; + currentState = newState; + counts[newState]++; + lastTick = now; + } + return prevState; + } + + void leaveState(unsigned backToState) + { + unsigned __int64 now = get_cycles_now(); + if (reportIntervalCycles && now - lastReport >= reportIntervalCycles) + report(true); + if (backToState != currentState) + { + totals[currentState] += now - lastTick; + lastTick = now; + currentState = backToState; + } + } + +public: + TimeDivisionTracker(const char *_name, unsigned reportIntervalSeconds) : name(_name) + { + if (reportIntervalSeconds) + reportIntervalCycles = millisec_to_cycle(reportIntervalSeconds * 1000); + } + + void report(bool reset) + { + VStringBuffer str("%s spent ", name.str()); + auto now = get_cycles_now(); + totals[currentState] += now - lastTick; + lastTick = now; + lastReport = now; + bool doneOne = false; + for (unsigned i = reportOther ? 0 : 1; i < NUMSTATES; i++) + { + if (counts[i]) + { + if (doneOne) + str.append(", "); + formatTime(str, cycle_to_nanosec(totals[i])); + str.appendf(" %s (%u times)", stateNames[i], counts[i]); + doneOne = true; + } + if (reset) + { + totals[i] = 0; + counts[i] = 0; + } + } + if (doneOne) + DBGLOG("%s", str.str()); + } + + class TimeDivision + { + unsigned prevState = 0; + TimeDivisionTracker &t; + public: + TimeDivision(TimeDivisionTracker &_t, unsigned newState) : t(_t) + { + prevState = t.enterState(newState); + } + ~TimeDivision() + { + t.leaveState(prevState); + } + void switchState(unsigned newState) + { + t.enterState(newState); + } + }; +}; + + #endif diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index 92f02d1bdec..af280d1685c 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -197,7 +197,7 @@ const static unsigned __int64 oneMinute = I64C(60000000000); const static unsigned __int64 oneHour = I64C(3600000000000); const static unsigned __int64 oneDay = 24 * I64C(3600000000000); -static void formatTime(StringBuffer & out, unsigned __int64 value) +void formatTime(StringBuffer & out, unsigned __int64 value) { //Aim to display at least 3 significant digits in the result string if (value < oneMicroSecond) diff --git a/system/jlib/jstats.h b/system/jlib/jstats.h index 253d640c94d..b25cb768b71 100644 --- a/system/jlib/jstats.h +++ b/system/jlib/jstats.h @@ -44,6 +44,8 @@ inline constexpr stat_type statPercentageOf(stat_type value, stat_type per) { re inline StatisticKind queryStatsVariant(StatisticKind kind) { return (StatisticKind)(kind & ~StKindMask); } inline cost_type money2cost_type(double money) { return money * 1E6; } inline double cost_type2money(cost_type cost) { return ((double) cost) / 1E6; } + +extern jlib_decl void formatTime(StringBuffer & out, unsigned __int64 value); //--------------------------------------------------------------------------------------------------------------------- //Represents a single level of a scope