From 1df5f45c75eada79a557c9a3882ce3140cc70a94 Mon Sep 17 00:00:00 2001 From: Richard Chapman Date: Wed, 17 Jul 2024 10:26:55 +0100 Subject: [PATCH] WIP Signed-off-by: Richard Chapman --- roxie/udplib/udptrr.cpp | 100 +++++++++++++++++- .../Grafana/CurlClient/GrafanaCurlClient.cpp | 2 +- 2 files changed, 97 insertions(+), 5 deletions(-) diff --git a/roxie/udplib/udptrr.cpp b/roxie/udplib/udptrr.cpp index 9a31fc0d287..64f95658f7d 100644 --- a/roxie/udplib/udptrr.cpp +++ b/roxie/udplib/udptrr.cpp @@ -1256,6 +1256,92 @@ class CReceiveManager : implements IReceiveManager, public CInterface }; + template class TimeDivisionTracker + { + protected: + unsigned __int64 totals[NUMSTATES] = {0}; + unsigned counts[NUMSTATES] = {0}; + const char *stateNames[NUMSTATES]; + unsigned __int64 lastTick = get_cycles_now(); + unsigned currentState = 0; + StringAttr name; + + unsigned switchState(unsigned newState) + { + unsigned prevState = currentState; + if (newState != currentState) + { + unsigned __int64 now = get_cycles_now(); + totals[currentState] += now - lastTick; + counts[currentState]++; + lastTick = now; + currentState = newState; + } + return prevState; + } + + public: + TimeDivisionTracker(const char *_name) : name(_name) + { + } + + void report(bool reset) + { + VStringBuffer str("%s spent ", name.str()); + auto now = get_cycles_now(); + totals[currentState] += now - lastTick; + lastTick = now; + for (unsigned i = 0; i < NUMSTATES; i++) + { + if (i) + str.append(", "); + str.appendf("%.3fs in state %s (%u times)", cycle_to_millisec(totals[i])/1000.0, stateNames[i], counts[i]); + if (reset) + totals[i] = 0; + } + DBGLOG("%s", str.str()); + if (reset) + lastTick = now; + } + + class TimeDivision + { + unsigned prevState = 0; + TimeDivisionTracker &t; + public: + TimeDivision(TimeDivisionTracker &_t, unsigned newState) : t(_t) + { + prevState = t.switchState(newState); + } + ~TimeDivision() + { + t.switchState(prevState); + } + void switchState(unsigned newState) + { + t.switchState(newState); + } + }; + }; + class UdpRdTracker : public TimeDivisionTracker<3> + { + public: + enum + { + processing, + waiting, + allocating + }; + + UdpRdTracker() : TimeDivisionTracker<3>("udptrr") + { + stateNames[processing] = "processing"; + stateNames[waiting] = "waiting"; + stateNames[allocating] = "allocating"; + } + + }; + class receive_data : public Thread { CReceiveManager &parent; @@ -1329,6 +1415,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface unsigned lastPacketsOOO = 0; unsigned lastUnwantedDiscarded = 0; unsigned timeout = 5000; + UdpRdTracker timeTracker; roxiemem::IDataBufferManager * udpBufferManager = bufferManager; DataBuffer *b = udpBufferManager->allocate(); while (running) @@ -1342,8 +1429,10 @@ class CReceiveManager : implements IReceiveManager, public CInterface //Read at least the size of the smallest packet we can receive //static assert to check we are reading the smaller of the two possible packet types static_assert(sizeof(UdpRequestToSendMsg) <= sizeof(UdpPacketHeader)); - receive_socket->readtms(b->data, sizeof(UdpRequestToSendMsg), DATA_PAYLOAD, res, timeout); - + { + UdpRdTracker::TimeDivision d(timeTracker, UdpRdTracker::waiting); + receive_socket->readtms(b->data, sizeof(UdpRequestToSendMsg), DATA_PAYLOAD, res, timeout); + } //Even if a UDP packet is not split, very occasionally only some of the data may be present for the read. //Slightly horribly this packet could be one of two different formats(!) // a UdpRequestToSendMsg, which has a 2 byte command at the start of the header, with a maximum value of max_flow_cmd @@ -1391,14 +1480,17 @@ class CReceiveManager : implements IReceiveManager, public CInterface } } parent.input_queue->pushOwn(b); - b = udpBufferManager->allocate(); - + { + UdpRdTracker::TimeDivision d(timeTracker, UdpRdTracker::allocating); + b = udpBufferManager->allocate(); + } if (udpStatsReportInterval) { unsigned now = msTick(); if (now-lastOOOReport > udpStatsReportInterval) { lastOOOReport = now; + timeTracker.report(true); if (unwantedDiscarded > lastUnwantedDiscarded) { DBGLOG("%u more unwanted packets discarded by this server (%u total)", unwantedDiscarded - lastUnwantedDiscarded, unwantedDiscarded-0); diff --git a/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.cpp b/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.cpp index 5ada0237838..355c047e37a 100644 --- a/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.cpp +++ b/system/logaccess/Grafana/CurlClient/GrafanaCurlClient.cpp @@ -651,7 +651,7 @@ bool GrafanaLogAccessCurlClient::fetchLog(LogQueryResultDetails & resultDetails, encodeURL(fullQuery, logLineParser.str()); fullQuery.appendf("&start=%s000000000", std::to_string(trange.getStartt().getSimple()).c_str()); - if (trange.getEndt().isNull() != -1) //aka 'to' has been initialized + if (!trange.getEndt().isNull()) //aka 'to' has been initialized { fullQuery.appendf("&end=%s000000000", std::to_string(trange.getEndt().getSimple()).c_str()); }