Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chapman <[email protected]>
  • Loading branch information
richardkchapman committed Jul 17, 2024
1 parent 7ff8447 commit fb5d9fd
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 3 deletions.
76 changes: 74 additions & 2 deletions roxie/udplib/udptrr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1256,6 +1256,74 @@ class CReceiveManager : implements IReceiveManager, public CInterface

};

template<unsigned NUMSTATES> class TimeDivisionTracker
{
protected:
unsigned __int64 totals[NUMSTATES] = {0};
const char *stateNames[NUMSTATES];
unsigned __int64 lastTick = nsTick();
unsigned currentState = 0;
StringAttr name;

unsigned switchState(unsigned newState)
{
unsigned prevState = currentState;
if (newState != currentState)
{
unsigned __int64 now = nsTick();
totals[currentState] += now - lastTick;
lastTick = now;
currentState = newState;
}
return prevState;
}

public:
TimeDivisionTracker(const char *_name) : name(_name)
{
}

void report(bool reset)
{
VStringBuffer str("%s spent ", name.str());
for (unsigned i = 0; i < NUMSTATES; i++)
{
if (i)
str.append(", ");
str.appendf("%.3fs in state %s", nanoToMilli(totals[i])/1000.0, stateNames[i]);
if (reset)
totals[i] = 0;
}
DBGLOG("%s", str.str());
if (reset)
lastTick = nsTick();
}

class TimeDivision
{
unsigned prevState = 0;
TimeDivisionTracker &t;
public:
TimeDivision(TimeDivisionTracker &_t, unsigned newState) : t(_t)
{
prevState = t.switchState(newState);
}
~TimeDivision()
{
t.switchState(prevState);
}
};
};
class UdpRdTracker : public TimeDivisionTracker<2>
{
public:
UdpRdTracker() : TimeDivisionTracker<2>("udptrr")
{
stateNames[0] = "processing";
stateNames[1] = "waiting";
}
};

class receive_data : public Thread
{
CReceiveManager &parent;
Expand Down Expand Up @@ -1329,6 +1397,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
unsigned lastPacketsOOO = 0;
unsigned lastUnwantedDiscarded = 0;
unsigned timeout = 5000;
UdpRdTracker timeTracker;
roxiemem::IDataBufferManager * udpBufferManager = bufferManager;
DataBuffer *b = udpBufferManager->allocate();
while (running)
Expand All @@ -1342,8 +1411,10 @@ class CReceiveManager : implements IReceiveManager, public CInterface
//Read at least the size of the smallest packet we can receive
//static assert to check we are reading the smaller of the two possible packet types
static_assert(sizeof(UdpRequestToSendMsg) <= sizeof(UdpPacketHeader));
receive_socket->readtms(b->data, sizeof(UdpRequestToSendMsg), DATA_PAYLOAD, res, timeout);

{
UdpRdTracker::TimeDivision d(timeTracker, 1);
receive_socket->readtms(b->data, sizeof(UdpRequestToSendMsg), DATA_PAYLOAD, res, timeout);
}
//Even if a UDP packet is not split, very occasionally only some of the data may be present for the read.
//Slightly horribly this packet could be one of two different formats(!)
// a UdpRequestToSendMsg, which has a 2 byte command at the start of the header, with a maximum value of max_flow_cmd
Expand Down Expand Up @@ -1399,6 +1470,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
if (now-lastOOOReport > udpStatsReportInterval)
{
lastOOOReport = now;
timeTracker.report(true);
if (unwantedDiscarded > lastUnwantedDiscarded)
{
DBGLOG("%u more unwanted packets discarded by this server (%u total)", unwantedDiscarded - lastUnwantedDiscarded, unwantedDiscarded-0);
Expand Down
2 changes: 1 addition & 1 deletion system/logaccess/Grafana/CurlClient/GrafanaCurlClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -651,7 +651,7 @@ bool GrafanaLogAccessCurlClient::fetchLog(LogQueryResultDetails & resultDetails,
encodeURL(fullQuery, logLineParser.str());

fullQuery.appendf("&start=%s000000000", std::to_string(trange.getStartt().getSimple()).c_str());
if (trange.getEndt().isNull() != -1) //aka 'to' has been initialized
if (!trange.getEndt().isNull()) //aka 'to' has been initialized
{
fullQuery.appendf("&end=%s000000000", std::to_string(trange.getEndt().getSimple()).c_str());
}
Expand Down

0 comments on commit fb5d9fd

Please sign in to comment.