Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chapman <[email protected]>
  • Loading branch information
richardkchapman committed Jul 17, 2024
1 parent 7ff8447 commit 3776c04
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 5 deletions.
93 changes: 89 additions & 4 deletions roxie/udplib/udptrr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1256,6 +1256,85 @@ class CReceiveManager : implements IReceiveManager, public CInterface

};

template<unsigned NUMSTATES> class TimeDivisionTracker
{
protected:
unsigned __int64 totals[NUMSTATES] = {0};
unsigned counts[NUMSTATES] = {0};
const char *stateNames[NUMSTATES];
unsigned __int64 lastTick = nsTick();
unsigned currentState = 0;
StringAttr name;

unsigned switchState(unsigned newState)
{
unsigned prevState = currentState;
if (newState != currentState)
{
unsigned __int64 now = nsTick();
totals[currentState] += now - lastTick;
counts[currentState]++;
lastTick = now;
currentState = newState;
}
return prevState;
}

public:
TimeDivisionTracker(const char *_name) : name(_name)
{
}

void report(bool reset)
{
VStringBuffer str("%s spent ", name.str());
for (unsigned i = 0; i < NUMSTATES; i++)
{
if (i)
str.append(", ");
str.appendf("%.3fs in state %s (%u times)", nanoToMilli(totals[i])/1000.0, stateNames[i], counts[i]);
if (reset)
totals[i] = 0;
}
DBGLOG("%s", str.str());
if (reset)
lastTick = nsTick();
}

class TimeDivision
{
unsigned prevState = 0;
TimeDivisionTracker &t;
public:
TimeDivision(TimeDivisionTracker &_t, unsigned newState) : t(_t)
{
prevState = t.switchState(newState);
}
~TimeDivision()
{
t.switchState(prevState);
}
};
};
class UdpRdTracker : public TimeDivisionTracker<3>
{
public:
enum
{
processing,
waiting,
allocating
};

UdpRdTracker() : TimeDivisionTracker<3>("udptrr")
{
stateNames[processing] = "processing";
stateNames[waiting] = "waiting";
stateNames[allocating] = "allocating";
}

};

class receive_data : public Thread
{
CReceiveManager &parent;
Expand Down Expand Up @@ -1329,6 +1408,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
unsigned lastPacketsOOO = 0;
unsigned lastUnwantedDiscarded = 0;
unsigned timeout = 5000;
UdpRdTracker timeTracker;
roxiemem::IDataBufferManager * udpBufferManager = bufferManager;
DataBuffer *b = udpBufferManager->allocate();
while (running)
Expand All @@ -1342,8 +1422,10 @@ class CReceiveManager : implements IReceiveManager, public CInterface
//Read at least the size of the smallest packet we can receive
//static assert to check we are reading the smaller of the two possible packet types
static_assert(sizeof(UdpRequestToSendMsg) <= sizeof(UdpPacketHeader));
receive_socket->readtms(b->data, sizeof(UdpRequestToSendMsg), DATA_PAYLOAD, res, timeout);

{
UdpRdTracker::TimeDivision d(timeTracker, UdpRdTracker::waiting);
receive_socket->readtms(b->data, sizeof(UdpRequestToSendMsg), DATA_PAYLOAD, res, timeout);
}
//Even if a UDP packet is not split, very occasionally only some of the data may be present for the read.
//Slightly horribly this packet could be one of two different formats(!)
// a UdpRequestToSendMsg, which has a 2 byte command at the start of the header, with a maximum value of max_flow_cmd
Expand Down Expand Up @@ -1391,14 +1473,17 @@ class CReceiveManager : implements IReceiveManager, public CInterface
}
}
parent.input_queue->pushOwn(b);
b = udpBufferManager->allocate();

{
UdpRdTracker::TimeDivision d(timeTracker, UdpRdTracker::allocating);
b = udpBufferManager->allocate();
}
if (udpStatsReportInterval)
{
unsigned now = msTick();
if (now-lastOOOReport > udpStatsReportInterval)
{
lastOOOReport = now;
timeTracker.report(true);
if (unwantedDiscarded > lastUnwantedDiscarded)
{
DBGLOG("%u more unwanted packets discarded by this server (%u total)", unwantedDiscarded - lastUnwantedDiscarded, unwantedDiscarded-0);
Expand Down
2 changes: 1 addition & 1 deletion system/logaccess/Grafana/CurlClient/GrafanaCurlClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ bool GrafanaLogAccessCurlClient::fetchLog(LogQueryResultDetails & resultDetails,
encodeURL(fullQuery, logLineParser.str());

fullQuery.appendf("&start=%s000000000", std::to_string(trange.getStartt().getSimple()).c_str());
if (trange.getEndt().isNull() != -1) //aka 'to' has been initialized
if (!trange.getEndt().isNull()) //aka 'to' has been initialized
{
fullQuery.appendf("&end=%s000000000", std::to_string(trange.getEndt().getSimple()).c_str());
}
Expand Down

0 comments on commit 3776c04

Please sign in to comment.