Skip to content

Commit

Permalink
HPCC-32259 Track time spent in different parts of Roxie receive threads
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chapman <[email protected]>
  • Loading branch information
richardkchapman committed Jul 24, 2024
1 parent 14ee3f5 commit 65432ed
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 25 deletions.
85 changes: 62 additions & 23 deletions roxie/udplib/udptrr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "jthread.hpp"
#include "jlog.hpp"
#include "jmisc.hpp"
#include "jisem.hpp"
#include "jsocket.hpp"
#include "udplib.hpp"
Expand Down Expand Up @@ -853,6 +854,31 @@ class CReceiveManager : implements IReceiveManager, public CInterface

IpMapOf<UdpSenderEntry> sendersTable;

class UdpRdTracker : public TimeDivisionTracker<6, false>
{
public:
enum
{
other,
waiting,
allocating,
processing,
pushing,
checkingPending
};

UdpRdTracker(const char *name, unsigned reportIntervalSeconds) : TimeDivisionTracker<6, false>(name, reportIntervalSeconds)
{
stateNames[other] = "other";
stateNames[waiting] = "waiting";
stateNames[allocating] = "allocating";
stateNames[processing] = "processing";
stateNames[pushing] = "pushing";
stateNames[checkingPending] = "checking pending";
}

};

class receive_receive_flow : public Thread
{
CReceiveManager &parent;
Expand All @@ -863,7 +889,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
std::atomic<bool> running = { false };
SenderList pendingRequests; // List of senders requesting permission to send
PermitList pendingPermits; // List of active permits

UdpRdTracker timeTracker;
private:
void noteRequest(UdpSenderEntry *requester, sequence_t flowSeq, sequence_t sendSeq)
{
Expand Down Expand Up @@ -966,7 +992,8 @@ class CReceiveManager : implements IReceiveManager, public CInterface

public:
receive_receive_flow(CReceiveManager &_parent, unsigned flow_p, unsigned _maxSlotsPerSender)
: Thread("UdpLib::receive_receive_flow"), parent(_parent), flow_port(flow_p), maxSlotsPerSender(_maxSlotsPerSender), maxPermits(_parent.input_queue_size)
: Thread("UdpLib::receive_receive_flow"), parent(_parent), flow_port(flow_p), maxSlotsPerSender(_maxSlotsPerSender), maxPermits(_parent.input_queue_size),
timeTracker("receive_receive_flow", 60)
{
}

Expand Down Expand Up @@ -1210,15 +1237,18 @@ class CReceiveManager : implements IReceiveManager, public CInterface
{
DBGLOG("UdpReceiver: wait_read(%u)", timeout);
}
UdpRdTracker::TimeDivision d(timeTracker, UdpRdTracker::waiting);
bool dataAvail = flow_socket->wait_read(timeout);
if (dataAvail)
{
const unsigned l = sizeof(msg);
unsigned int res ;
flow_socket->readtms(&msg, l, l, res, 0);
assert(res==l);
d.switchState(UdpRdTracker::processing);
doFlowRequest(msg);
}
d.switchState(UdpRdTracker::checkingPending);
timeout = checkPendingRequests();
}
catch (IException *e)
Expand Down Expand Up @@ -1263,9 +1293,10 @@ class CReceiveManager : implements IReceiveManager, public CInterface
ISocket *selfFlowSocket = nullptr;
std::atomic<bool> running = { false };
Semaphore started;
UdpRdTracker timeTracker;

public:
receive_data(CReceiveManager &_parent) : Thread("UdpLib::receive_data"), parent(_parent)
receive_data(CReceiveManager &_parent) : Thread("UdpLib::receive_data"), parent(_parent), timeTracker("receive_data", 60)
{
unsigned ip_buffer = parent.input_queue_size*DATA_PAYLOAD*2;
if (ip_buffer < udpFlowSocketsSize) ip_buffer = udpFlowSocketsSize;
Expand Down Expand Up @@ -1325,7 +1356,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
adjustPriority(2);
#endif
started.signal();
unsigned lastOOOReport = 0;
unsigned lastOOOReport = msTick();
unsigned lastPacketsOOO = 0;
unsigned lastUnwantedDiscarded = 0;
unsigned timeout = 5000;
Expand All @@ -1342,8 +1373,10 @@ class CReceiveManager : implements IReceiveManager, public CInterface
//Read at least the size of the smallest packet we can receive
//static assert to check we are reading the smaller of the two possible packet types
static_assert(sizeof(UdpRequestToSendMsg) <= sizeof(UdpPacketHeader));
receive_socket->readtms(b->data, sizeof(UdpRequestToSendMsg), DATA_PAYLOAD, res, timeout);

{
UdpRdTracker::TimeDivision d(timeTracker, UdpRdTracker::waiting);
receive_socket->readtms(b->data, sizeof(UdpRequestToSendMsg), DATA_PAYLOAD, res, timeout);
}
//Even if a UDP packet is not split, very occasionally only some of the data may be present for the read.
//Slightly horribly this packet could be one of two different formats(!)
// a UdpRequestToSendMsg, which has a 2 byte command at the start of the header, with a maximum value of max_flow_cmd
Expand Down Expand Up @@ -1372,27 +1405,30 @@ class CReceiveManager : implements IReceiveManager, public CInterface
//Redirect them to the flow thread to process them.
selfFlowSocket->write(b->data, res);
}

dataPacketsReceived++;
UdpSenderEntry *sender = &parent.sendersTable[hdr.node];
if (sender->noteSeen(hdr))
{
// We should perhaps track how often this happens, but it's not the same as unwantedDiscarded
hdr.node.clear(); // Used to indicate a duplicate that collate thread should discard. We don't discard on this thread as don't want to do anything that requires locks...
}
else
{
//Decrease the number of active reservations to balance having received a new data packet (otherwise they will be double counted)
sender->decPermit(hdr.msgSeq);
if (udpTraceLevel > 5) // don't want to interrupt this thread if we can help it
UdpRdTracker::TimeDivision d(timeTracker, UdpRdTracker::processing);
dataPacketsReceived++;
UdpSenderEntry *sender = &parent.sendersTable[hdr.node];
if (sender->noteSeen(hdr))
{
StringBuffer s;
DBGLOG("UdpReceiver: %u bytes received packet %" SEQF "u %x from %s", res, hdr.sendSeq, hdr.pktSeq, hdr.node.getTraceText(s).str());
// We should perhaps track how often this happens, but it's not the same as unwantedDiscarded
hdr.node.clear(); // Used to indicate a duplicate that collate thread should discard. We don't discard on this thread as don't want to do anything that requires locks...
}
else
{
//Decrease the number of active reservations to balance having received a new data packet (otherwise they will be double counted)
sender->decPermit(hdr.msgSeq);
if (udpTraceLevel > 5) // don't want to interrupt this thread if we can help it
{
StringBuffer s;
DBGLOG("UdpReceiver: %u bytes received packet %" SEQF "u %x from %s", res, hdr.sendSeq, hdr.pktSeq, hdr.node.getTraceText(s).str());
}
}
d.switchState(UdpRdTracker::pushing);
parent.input_queue->pushOwn(b);
d.switchState(UdpRdTracker::allocating);
b = udpBufferManager->allocate();
}
parent.input_queue->pushOwn(b);
b = udpBufferManager->allocate();

if (udpStatsReportInterval)
{
unsigned now = msTick();
Expand Down Expand Up @@ -1537,11 +1573,14 @@ class CReceiveManager : implements IReceiveManager, public CInterface

void collatePackets()
{
UdpRdTracker timeTracker("collatePackets", 60);
while(running)
{
try
{
UdpRdTracker::TimeDivision d(timeTracker, UdpRdTracker::waiting);
DataBuffer *dataBuff = input_queue->pop(true);
d.switchState(UdpRdTracker::processing);
dataBuff->changeState(roxiemem::DBState::queued, roxiemem::DBState::unowned, __func__);
collatePacket(dataBuff);
}
Expand Down
2 changes: 1 addition & 1 deletion roxie/udplib/udptrs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ RelaxedAtomic<unsigned> flowRequestsSent;
RelaxedAtomic<unsigned> flowPermitsReceived;
RelaxedAtomic<unsigned> dataPacketsSent;

static unsigned lastResentReport = 0;
static unsigned lastResentReport = msTick();
static unsigned lastOkToSendTimeouts = 0;
static unsigned lastPacketsResent = 0;
static unsigned lastFlowRequestsSent = 0;
Expand Down
103 changes: 103 additions & 0 deletions system/jlib/jmisc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,4 +334,107 @@ extern jlib_decl char **getSystemEnv();

extern jlib_decl char *getHPCCEnvVal(const char *name, const char *defaultValue);

// class TimeDivisionTracker is useful for working out what a thread spends its time doing. See udptrrr.cpp for an example
// of its usage

template<unsigned NUMSTATES, bool reportOther> class TimeDivisionTracker
{
protected:
unsigned __int64 totals[NUMSTATES] = {0};
unsigned counts[NUMSTATES] = {0};
const char *stateNames[NUMSTATES];
unsigned __int64 lastTick = get_cycles_now();
unsigned currentState = 0;
StringAttr name;
unsigned __int64 reportIntervalCycles = 0;
unsigned __int64 lastReport = 0;

unsigned enterState(unsigned newState)
{
unsigned prevState = currentState;
unsigned __int64 now = get_cycles_now();
if (reportIntervalCycles && now - lastReport >= reportIntervalCycles)
{
report(true);
now = get_cycles_now();
}
if (newState != prevState)
{
totals[currentState] += now - lastTick;
currentState = newState;
counts[newState]++;
lastTick = now;
}
return prevState;
}

void leaveState(unsigned backToState)
{
unsigned __int64 now = get_cycles_now();
if (reportIntervalCycles && now - lastReport >= reportIntervalCycles)
report(true);
if (backToState != currentState)
{
totals[currentState] += now - lastTick;
lastTick = now;
currentState = backToState;
}
}

public:
TimeDivisionTracker(const char *_name, unsigned reportIntervalSeconds) : name(_name)
{
if (reportIntervalSeconds)
reportIntervalCycles = millisec_to_cycle(reportIntervalSeconds * 1000);
}

void report(bool reset)
{
VStringBuffer str("%s spent ", name.str());
auto now = get_cycles_now();
totals[currentState] += now - lastTick;
lastTick = now;
lastReport = now;
bool doneOne = false;
for (unsigned i = reportOther ? 0 : 1; i < NUMSTATES; i++)
{
if (counts[i])
{
if (doneOne)
str.append(", ");
formatTime(str, cycle_to_nanosec(totals[i]));
str.appendf(" %s (%u times)", stateNames[i], counts[i]);
doneOne = true;
}
if (reset)
{
totals[i] = 0;
counts[i] = 0;
}
}
if (doneOne)
DBGLOG("%s", str.str());
}

class TimeDivision
{
unsigned prevState = 0;
TimeDivisionTracker &t;
public:
TimeDivision(TimeDivisionTracker &_t, unsigned newState) : t(_t)
{
prevState = t.enterState(newState);
}
~TimeDivision()
{
t.leaveState(prevState);
}
void switchState(unsigned newState)
{
t.enterState(newState);
}
};
};


#endif
2 changes: 1 addition & 1 deletion system/jlib/jstats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ const static unsigned __int64 oneMinute = I64C(60000000000);
const static unsigned __int64 oneHour = I64C(3600000000000);
const static unsigned __int64 oneDay = 24 * I64C(3600000000000);

static void formatTime(StringBuffer & out, unsigned __int64 value)
void formatTime(StringBuffer & out, unsigned __int64 value)
{
//Aim to display at least 3 significant digits in the result string
if (value < oneMicroSecond)
Expand Down
2 changes: 2 additions & 0 deletions system/jlib/jstats.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ inline constexpr stat_type statPercentageOf(stat_type value, stat_type per) { re
inline StatisticKind queryStatsVariant(StatisticKind kind) { return (StatisticKind)(kind & ~StKindMask); }
inline cost_type money2cost_type(double money) { return money * 1E6; }
inline double cost_type2money(cost_type cost) { return ((double) cost) / 1E6; }

extern jlib_decl void formatTime(StringBuffer & out, unsigned __int64 value);
//---------------------------------------------------------------------------------------------------------------------

//Represents a single level of a scope
Expand Down

0 comments on commit 65432ed

Please sign in to comment.