Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-32259 Track time spent in different parts of Roxie receive thread #18885

Merged
merged 1 commit into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 62 additions & 23 deletions roxie/udplib/udptrr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include "jthread.hpp"
#include "jlog.hpp"
#include "jmisc.hpp"
#include "jisem.hpp"
#include "jsocket.hpp"
#include "udplib.hpp"
Expand Down Expand Up @@ -853,6 +854,31 @@ class CReceiveManager : implements IReceiveManager, public CInterface

IpMapOf<UdpSenderEntry> sendersTable;

class UdpRdTracker : public TimeDivisionTracker<6, false>
{
public:
enum
{
other,
waiting,
allocating,
processing,
pushing,
checkingPending
};

UdpRdTracker(const char *name, unsigned reportIntervalSeconds) : TimeDivisionTracker<6, false>(name, reportIntervalSeconds)
{
stateNames[other] = "other";
stateNames[waiting] = "waiting";
stateNames[allocating] = "allocating";
stateNames[processing] = "processing";
stateNames[pushing] = "pushing";
stateNames[checkingPending] = "checking pending";
}

};

class receive_receive_flow : public Thread
{
CReceiveManager &parent;
Expand All @@ -863,7 +889,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
std::atomic<bool> running = { false };
SenderList pendingRequests; // List of senders requesting permission to send
PermitList pendingPermits; // List of active permits

UdpRdTracker timeTracker;
private:
void noteRequest(UdpSenderEntry *requester, sequence_t flowSeq, sequence_t sendSeq)
{
Expand Down Expand Up @@ -966,7 +992,8 @@ class CReceiveManager : implements IReceiveManager, public CInterface

public:
receive_receive_flow(CReceiveManager &_parent, unsigned flow_p, unsigned _maxSlotsPerSender)
: Thread("UdpLib::receive_receive_flow"), parent(_parent), flow_port(flow_p), maxSlotsPerSender(_maxSlotsPerSender), maxPermits(_parent.input_queue_size)
: Thread("UdpLib::receive_receive_flow"), parent(_parent), flow_port(flow_p), maxSlotsPerSender(_maxSlotsPerSender), maxPermits(_parent.input_queue_size),
timeTracker("receive_receive_flow", 60)
{
}

Expand Down Expand Up @@ -1210,15 +1237,18 @@ class CReceiveManager : implements IReceiveManager, public CInterface
{
DBGLOG("UdpReceiver: wait_read(%u)", timeout);
}
UdpRdTracker::TimeDivision d(timeTracker, UdpRdTracker::waiting);
bool dataAvail = flow_socket->wait_read(timeout);
if (dataAvail)
{
const unsigned l = sizeof(msg);
unsigned int res ;
flow_socket->readtms(&msg, l, l, res, 0);
assert(res==l);
d.switchState(UdpRdTracker::processing);
doFlowRequest(msg);
}
d.switchState(UdpRdTracker::checkingPending);
timeout = checkPendingRequests();
}
catch (IException *e)
Expand Down Expand Up @@ -1263,9 +1293,10 @@ class CReceiveManager : implements IReceiveManager, public CInterface
ISocket *selfFlowSocket = nullptr;
std::atomic<bool> running = { false };
Semaphore started;
UdpRdTracker timeTracker;

public:
receive_data(CReceiveManager &_parent) : Thread("UdpLib::receive_data"), parent(_parent)
receive_data(CReceiveManager &_parent) : Thread("UdpLib::receive_data"), parent(_parent), timeTracker("receive_data", 60)
{
unsigned ip_buffer = parent.input_queue_size*DATA_PAYLOAD*2;
if (ip_buffer < udpFlowSocketsSize) ip_buffer = udpFlowSocketsSize;
Expand Down Expand Up @@ -1325,7 +1356,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
adjustPriority(2);
#endif
started.signal();
unsigned lastOOOReport = 0;
unsigned lastOOOReport = msTick();
unsigned lastPacketsOOO = 0;
unsigned lastUnwantedDiscarded = 0;
unsigned timeout = 5000;
Expand All @@ -1342,8 +1373,10 @@ class CReceiveManager : implements IReceiveManager, public CInterface
//Read at least the size of the smallest packet we can receive
//static assert to check we are reading the smaller of the two possible packet types
static_assert(sizeof(UdpRequestToSendMsg) <= sizeof(UdpPacketHeader));
receive_socket->readtms(b->data, sizeof(UdpRequestToSendMsg), DATA_PAYLOAD, res, timeout);

{
UdpRdTracker::TimeDivision d(timeTracker, UdpRdTracker::waiting);
receive_socket->readtms(b->data, sizeof(UdpRequestToSendMsg), DATA_PAYLOAD, res, timeout);
}
//Even if a UDP packet is not split, very occasionally only some of the data may be present for the read.
//Slightly horribly this packet could be one of two different formats(!)
// a UdpRequestToSendMsg, which has a 2 byte command at the start of the header, with a maximum value of max_flow_cmd
Expand Down Expand Up @@ -1372,27 +1405,30 @@ class CReceiveManager : implements IReceiveManager, public CInterface
//Redirect them to the flow thread to process them.
selfFlowSocket->write(b->data, res);
}

dataPacketsReceived++;
UdpSenderEntry *sender = &parent.sendersTable[hdr.node];
if (sender->noteSeen(hdr))
{
// We should perhaps track how often this happens, but it's not the same as unwantedDiscarded
hdr.node.clear(); // Used to indicate a duplicate that collate thread should discard. We don't discard on this thread as don't want to do anything that requires locks...
}
else
{
//Decrease the number of active reservations to balance having received a new data packet (otherwise they will be double counted)
sender->decPermit(hdr.msgSeq);
if (udpTraceLevel > 5) // don't want to interrupt this thread if we can help it
UdpRdTracker::TimeDivision d(timeTracker, UdpRdTracker::processing);
dataPacketsReceived++;
UdpSenderEntry *sender = &parent.sendersTable[hdr.node];
if (sender->noteSeen(hdr))
{
StringBuffer s;
DBGLOG("UdpReceiver: %u bytes received packet %" SEQF "u %x from %s", res, hdr.sendSeq, hdr.pktSeq, hdr.node.getTraceText(s).str());
// We should perhaps track how often this happens, but it's not the same as unwantedDiscarded
hdr.node.clear(); // Used to indicate a duplicate that collate thread should discard. We don't discard on this thread as don't want to do anything that requires locks...
}
else
{
//Decrease the number of active reservations to balance having received a new data packet (otherwise they will be double counted)
sender->decPermit(hdr.msgSeq);
if (udpTraceLevel > 5) // don't want to interrupt this thread if we can help it
{
StringBuffer s;
DBGLOG("UdpReceiver: %u bytes received packet %" SEQF "u %x from %s", res, hdr.sendSeq, hdr.pktSeq, hdr.node.getTraceText(s).str());
}
}
d.switchState(UdpRdTracker::pushing);
parent.input_queue->pushOwn(b);
d.switchState(UdpRdTracker::allocating);
b = udpBufferManager->allocate();
}
parent.input_queue->pushOwn(b);
b = udpBufferManager->allocate();

if (udpStatsReportInterval)
{
unsigned now = msTick();
Expand Down Expand Up @@ -1537,11 +1573,14 @@ class CReceiveManager : implements IReceiveManager, public CInterface

void collatePackets()
{
UdpRdTracker timeTracker("collatePackets", 60);
while(running)
{
try
{
UdpRdTracker::TimeDivision d(timeTracker, UdpRdTracker::waiting);
DataBuffer *dataBuff = input_queue->pop(true);
d.switchState(UdpRdTracker::processing);
dataBuff->changeState(roxiemem::DBState::queued, roxiemem::DBState::unowned, __func__);
collatePacket(dataBuff);
}
Expand Down
2 changes: 1 addition & 1 deletion roxie/udplib/udptrs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ RelaxedAtomic<unsigned> flowRequestsSent;
RelaxedAtomic<unsigned> flowPermitsReceived;
RelaxedAtomic<unsigned> dataPacketsSent;

static unsigned lastResentReport = 0;
static unsigned lastResentReport = msTick();
static unsigned lastOkToSendTimeouts = 0;
static unsigned lastPacketsResent = 0;
static unsigned lastFlowRequestsSent = 0;
Expand Down
103 changes: 103 additions & 0 deletions system/jlib/jmisc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,4 +334,107 @@ extern jlib_decl char **getSystemEnv();

extern jlib_decl char *getHPCCEnvVal(const char *name, const char *defaultValue);

// class TimeDivisionTracker is useful for working out what a thread spends its time doing. See udptrrr.cpp for an example
// of its usage

template<unsigned NUMSTATES, bool reportOther> class TimeDivisionTracker
{
protected:
unsigned __int64 totals[NUMSTATES] = {0};
unsigned counts[NUMSTATES] = {0};
const char *stateNames[NUMSTATES];
unsigned __int64 lastTick = get_cycles_now();
unsigned currentState = 0;
StringAttr name;
unsigned __int64 reportIntervalCycles = 0;
unsigned __int64 lastReport = 0;

unsigned enterState(unsigned newState)
{
unsigned prevState = currentState;
unsigned __int64 now = get_cycles_now();
if (reportIntervalCycles && now - lastReport >= reportIntervalCycles)
{
report(true);
now = get_cycles_now();
}
if (newState != prevState)
{
totals[currentState] += now - lastTick;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If report() has been called, this will cause totals to be set to a very high number - because lastTick will be set to the latest value of get_cycles_now() - which will be larger than 'now'.
Probably simplest to pass now into the report function, and have a public helper which does not require it to be passed (or similar).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: This will self correct later, so maybe not worth worrying about - but if so, it is worth adding a comment to clarify.

currentState = newState;
counts[newState]++;
lastTick = now;
}
return prevState;
}

void leaveState(unsigned backToState)
{
unsigned __int64 now = get_cycles_now();
if (reportIntervalCycles && now - lastReport >= reportIntervalCycles)
report(true);
if (backToState != currentState)
{
totals[currentState] += now - lastTick;
lastTick = now;
currentState = backToState;
}
}

public:
TimeDivisionTracker(const char *_name, unsigned reportIntervalSeconds) : name(_name)
{
if (reportIntervalSeconds)
reportIntervalCycles = millisec_to_cycle(reportIntervalSeconds * 1000);
}

void report(bool reset)
{
VStringBuffer str("%s spent ", name.str());
auto now = get_cycles_now();
totals[currentState] += now - lastTick;
lastTick = now;
lastReport = now;
bool doneOne = false;
for (unsigned i = reportOther ? 0 : 1; i < NUMSTATES; i++)
{
if (counts[i])
{
if (doneOne)
str.append(", ");
formatTime(str, cycle_to_nanosec(totals[i]));
str.appendf(" %s (%u times)", stateNames[i], counts[i]);
doneOne = true;
}
if (reset)
{
totals[i] = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also reset counts[i]?

counts[i] = 0;
}
}
if (doneOne)
DBGLOG("%s", str.str());
}

class TimeDivision
{
unsigned prevState = 0;
TimeDivisionTracker &t;
public:
TimeDivision(TimeDivisionTracker &_t, unsigned newState) : t(_t)
{
prevState = t.enterState(newState);
}
~TimeDivision()
{
t.leaveState(prevState);
}
void switchState(unsigned newState)
{
t.enterState(newState);
}
};
};


#endif
2 changes: 1 addition & 1 deletion system/jlib/jstats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ const static unsigned __int64 oneMinute = I64C(60000000000);
const static unsigned __int64 oneHour = I64C(3600000000000);
const static unsigned __int64 oneDay = 24 * I64C(3600000000000);

static void formatTime(StringBuffer & out, unsigned __int64 value)
void formatTime(StringBuffer & out, unsigned __int64 value)
{
//Aim to display at least 3 significant digits in the result string
if (value < oneMicroSecond)
Expand Down
2 changes: 2 additions & 0 deletions system/jlib/jstats.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ inline constexpr stat_type statPercentageOf(stat_type value, stat_type per) { re
inline StatisticKind queryStatsVariant(StatisticKind kind) { return (StatisticKind)(kind & ~StKindMask); }
inline cost_type money2cost_type(double money) { return money * 1E6; }
inline double cost_type2money(cost_type cost) { return ((double) cost) / 1E6; }

extern jlib_decl void formatTime(StringBuffer & out, unsigned __int64 value);
//---------------------------------------------------------------------------------------------------------------------

//Represents a single level of a scope
Expand Down