Skip to content

Commit

Permalink
Merge pull request #17960 from richardkchapman/roxie-log-reduction
Browse files Browse the repository at this point in the history
HPCC-30629 Clean up Roxie logs

Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Nov 22, 2023
2 parents 637ecf2 + 8e2030a commit c40c2fa
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 165 deletions.
96 changes: 0 additions & 96 deletions roxie/udplib/udpmsgpk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ class PackageSequencer : public CInterface, implements IInterface

PackageSequencer(bool _encrypted) : encrypted(_encrypted)
{
if (checkTraceLevel(TRACE_MSGPACK, 3))
DBGLOG("UdpCollator: PackageSequencer::PackageSequencer this=%p", this);
metaSize = 0;
headerSize = 0;
header = NULL;
Expand All @@ -93,8 +91,6 @@ class PackageSequencer : public CInterface, implements IInterface

~PackageSequencer()
{
if (checkTraceLevel(TRACE_MSGPACK, 3))
DBGLOG("UdpCollator: PackageSequencer::~PackageSequencer this=%p", this);
DataBuffer *finger = firstPacket;
while (finger)
{
Expand All @@ -112,18 +108,6 @@ class PackageSequencer : public CInterface, implements IInterface
ret = after->msgNext;
else
ret = firstPacket;
if (checkTraceLevel(TRACE_MSGPACK, 5))
{
if (ret)
{
StringBuffer s;
UdpPacketHeader *pktHdr = (UdpPacketHeader*) ret->data;
DBGLOG("UdpCollator: PackageSequencer::next returns ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X node=%s dataBuffer=%p this=%p",
pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->node.getTraceText(s).str(), ret, this);
}
else
DBGLOG("UdpCollator: PackageSequencer::next returns NULL this=%p", this);
}
return ret;
}

Expand All @@ -142,13 +126,6 @@ class PackageSequencer : public CInterface, implements IInterface
resends++;
}

if (checkTraceLevel(TRACE_MSGPACK, 5))
{
StringBuffer s;
DBGLOG("UdpCollator: PackageSequencer::insert ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X sendSeq=%" SEQF "u node=%s dataBuffer=%p this=%p",
pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->sendSeq, pktHdr->node.getTraceText(s).str(), dataBuff, this);
}

// Optimize the (very) common case where I need to add to the end
DataBuffer *finger;
DataBuffer *prev;
Expand All @@ -168,8 +145,6 @@ class PackageSequencer : public CInterface, implements IInterface
if (pktHdr->pktSeq <= oldHdr->pktSeq)
{
// discard duplicated incoming packet - should be uncommon unless we requested a resend for a packet already in flight
if (checkTraceLevel(TRACE_MSGPACK, 5))
DBGLOG("UdpCollator: Discarding duplicate incoming packet %u (we have all up to %u)", pktHdr->pktSeq, oldHdr->pktSeq);
dataBuff->Release();
duplicates++;
return false;
Expand All @@ -184,28 +159,10 @@ class PackageSequencer : public CInterface, implements IInterface
}
while (finger)
{
#ifdef _DEBUG
scans++;
if (scans==1000000)
{
overscans++;
DBGLOG("%u million scans in UdpCollator insert(DataBuffer *dataBuff", overscans);
if (lastContiguousPacket)
{
UdpPacketHeader *oldHdr = (UdpPacketHeader*) lastContiguousPacket->data;
DBGLOG("lastContiguousPacket is at %u , last packet seen is %u", oldHdr->pktSeq & UDP_PACKET_SEQUENCE_MASK, pktHdr->pktSeq & UDP_PACKET_SEQUENCE_MASK);
}
else
DBGLOG("lastContiguousPacket is NULL , last packet seen is %u", pktHdr->pktSeq & UDP_PACKET_SEQUENCE_MASK);
scans = 0;
}
#endif
UdpPacketHeader *oldHdr = (UdpPacketHeader*) finger->data;
if (pktHdr->pktSeq == oldHdr->pktSeq)
{
// discard duplicated incoming packet - should be uncommon unless we requested a resend for a packet already in flight
if (checkTraceLevel(TRACE_MSGPACK, 5))
DBGLOG("UdpCollator: Discarding duplicate incoming packet %u", pktHdr->pktSeq);
dataBuff->Release();
return false;
}
Expand Down Expand Up @@ -243,8 +200,6 @@ class PackageSequencer : public CInterface, implements IInterface
// might need to rethink this
const MemoryAttr &udpkey = getSecretUdpKey(true);
size_t decryptedSize = aesDecrypt(udpkey.get(), udpkey.length(), pktHdr+1, pktHdr->length-sizeof(UdpPacketHeader), pktHdr+1, DATA_PAYLOAD-sizeof(UdpPacketHeader));
if (checkTraceLevel(TRACE_MSGPACK, 5))
DBGLOG("Decrypted %u bytes at %p resulting in %u bytes", (unsigned) (pktHdr->length-sizeof(UdpPacketHeader)), pktHdr+1, (unsigned) decryptedSize);
pktHdr->length = decryptedSize + sizeof(UdpPacketHeader);
}

Expand Down Expand Up @@ -318,16 +273,6 @@ class PackageSequencer : public CInterface, implements IInterface
return headerSize;
}

void dump()
{
DBGLOG("Contains %u packets, lastSeq = %u", numPackets, maxSeqSeen);
if (lastContiguousPacket)
{
UdpPacketHeader *hdr = (UdpPacketHeader*) lastContiguousPacket->data;
DBGLOG("lastContiguousPacket is %u %" SEQF "u", hdr->pktSeq, hdr->sendSeq);
}
}

};

// MessageResult ====================================================================================
Expand All @@ -344,8 +289,6 @@ class CMessageUnpackCursor: implements IMessageUnpackCursor, public CInterface

CMessageUnpackCursor(PackageSequencer *_pkSqncr, IRowManager *_rowMgr) : rowMgr(_rowMgr)
{
if (checkTraceLevel(TRACE_MSGPACK, 3))
DBGLOG("UdpCollator: CMessageUnpackCursor::CMessageUnpackCursor this=%p", this);
pkSequencer = _pkSqncr;
dataBuff = pkSequencer->next(NULL);
UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
Expand All @@ -367,8 +310,6 @@ class CMessageUnpackCursor: implements IMessageUnpackCursor, public CInterface

~CMessageUnpackCursor()
{
if (checkTraceLevel(TRACE_MSGPACK, 3))
DBGLOG("UdpCollator: CMessageUnpackCursor::~CMessageUnpackCursor this=%p", this);
pkSequencer->Release();
}

Expand All @@ -389,14 +330,6 @@ class CMessageUnpackCursor: implements IMessageUnpackCursor, public CInterface
if (dataBuff)
{
UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
if (checkTraceLevel(TRACE_MSGPACK, 4))
{
StringBuffer s;
DBGLOG("UdpCollator: CMessageUnpackCursor::getNext(%u) pos=%u pktLength=%u metaLen=%u ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X node=%s dataBuff=%p this=%p",
length, current_pos, pktHdr->length, pktHdr->metalength,
pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq,
pktHdr->node.getTraceText(s).str(), dataBuff, this);
}
unsigned packetDataLimit = pktHdr->length - pktHdr->metalength;
if ((packetDataLimit - current_pos) >= (unsigned) length)
{
Expand Down Expand Up @@ -493,15 +426,11 @@ class CMessageResult : public IMessageResult, CInterface {

CMessageResult(PackageSequencer *_pkSqncr)
{
if (checkTraceLevel(TRACE_MSGPACK, 3))
DBGLOG("UdpCollator: CMessageResult::CMessageResult pkSqncr=%p, this=%p", _pkSqncr, this);
pkSequencer = _pkSqncr;
}

~CMessageResult()
{
if (checkTraceLevel(TRACE_MSGPACK, 3))
DBGLOG("UdpCollator: CMessageResult::~CMessageResult this=%p", this);
pkSequencer->Release();
}

Expand All @@ -523,8 +452,6 @@ class CMessageResult : public IMessageResult, CInterface {

virtual void discard() const
{
if (checkTraceLevel(TRACE_MSGPACK, 2))
DBGLOG("UdpCollator: CMessageResult - Roxie server discarded a packet");
unwantedDiscarded++;
}

Expand All @@ -545,17 +472,13 @@ PUID GETPUID(DataBuffer *dataBuff)

CMessageCollator::CMessageCollator(IRowManager *_rowMgr, unsigned _ruid, bool _encrypted) : rowMgr(_rowMgr), ruid(_ruid), encrypted(_encrypted)
{
if (checkTraceLevel(TRACE_MSGPACK, 3))
DBGLOG("UdpCollator: CMessageCollator::CMessageCollator rowMgr=%p this=%p ruid=" RUIDF "", _rowMgr, this, ruid);
memLimitExceeded = false;
activity = false;
totalBytesReceived = 0;
}

CMessageCollator::~CMessageCollator()
{
if (checkTraceLevel(TRACE_MSGPACK, 3))
DBGLOG("UdpCollator: CMessageCollator::~CMessageCollator ruid=" RUIDF ", this=%p", ruid, this);
while (!queue.empty())
{
PackageSequencer *pkSqncr = queue.front();
Expand Down Expand Up @@ -671,9 +594,6 @@ void CMessageCollator::collate(DataBuffer *dataBuff)

IMessageResult *CMessageCollator::getNextResult(unsigned time_out, bool &anyActivity)
{
if (checkTraceLevel(TRACE_MSGPACK, 3))
DBGLOG("UdpCollator: CMessageCollator::getNextResult() timeout=%.8X ruid=%u rowMgr=%p this=%p", time_out, ruid, (void*) rowMgr, this);

if (memLimitExceeded)
{
DBGLOG("UdpCollator: CMessageCollator::getNextResult() throwing memory limit exceeded exception - rowMgr=%p this=%p", (void*) rowMgr, this);
Expand All @@ -696,22 +616,6 @@ IMessageResult *CMessageCollator::getNextResult(unsigned time_out, bool &anyActi
}
anyActivity = activity;
activity = false;
if (!anyActivity && ruid>=RUID_FIRST && checkTraceLevel(TRACE_MSGPACK, 2)) // suppress the tracing for pings where we expect the timeout...
{
#ifdef _DEBUG
DBGLOG("GetNextResult timeout: mapping has %d partial results", mapping.ordinality());
HashIterator h(mapping);
ForEach(h)
{
auto *r = mapping.mapToValue(&h.query());
PUID puid = *(PUID *) h.query().getKey();
DBGLOG("puid=%" I64F "x:", puid);
PackageSequencer *pkSqncr = mapping.getValue(puid);
pkSqncr->dump();
}
#endif
DBGLOG("UdpCollator: CMessageCollator::GetNextResult timeout");
}
return 0;
}

Expand Down
19 changes: 0 additions & 19 deletions roxie/udplib/udpsha.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,6 @@ void queue_t::doEnqueue(DataBuffer *buf)
}
tail = buf;
count.fastAdd(1); // inside a critical section, so no need for atomic inc.
#ifdef _DEBUG
if (count > limit)
DBGLOG("queue_t::pushOwn set count to %u", count.load());
#endif
}

void queue_t::pushOwnWait(DataBuffer * buf)
Expand Down Expand Up @@ -376,11 +372,6 @@ bool PacketTracker::noteSeen(UdpPacketHeader &hdr)
// Be careful to think about wrapping. Less than and higher can't really be distinguished, but we treat resent differently from original
bool duplicate = false;
unsigned delta = seq - base;
if (udpTraceLevel > 5)
{
DBGLOG("PacketTracker::noteSeen %" SEQF "u: delta %d", hdr.sendSeq, delta);
dump();
}
if (delta < TRACKER_BITS)
{
unsigned idx = (seq / 64) % TRACKER_DWORDS;
Expand Down Expand Up @@ -447,11 +438,6 @@ bool PacketTracker::hasSeen(sequence_t seq) const
// Accessed only on sender side where these are not modified, so no need for locking
// Careful about wrapping!
unsigned delta = seq - base;
if (udpTraceLevel > 5)
{
DBGLOG("PacketTracker::hasSeen - have I seen %" SEQF "u, %d", seq, delta);
dump();
}
if (delta < TRACKER_BITS)
{
unsigned idx = (seq / 64) % TRACKER_DWORDS;
Expand All @@ -468,11 +454,6 @@ bool PacketTracker::canRecord(sequence_t seq) const
{
// Careful about wrapping!
unsigned delta = seq - base;
if (udpTraceLevel > 5)
{
DBGLOG("PacketTracker::hasSeen - can I record %" SEQF "u, %d", seq, delta);
dump();
}
return (delta < TRACKER_BITS);
}

Expand Down
8 changes: 0 additions & 8 deletions roxie/udplib/udpsha.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,14 +312,6 @@ int check_max_socket_write_buffer(int size);
int check_set_max_socket_read_buffer(int size);
int check_set_max_socket_write_buffer(int size);

#define TRACE_RETRY_DATA 0x08
#define TRACE_MSGPACK 0x10

inline bool checkTraceLevel(unsigned category, unsigned level)
{
return (udpTraceLevel >= level);
}

extern UDPLIB_API void sanityCheckUdpSettings(unsigned receiveQueueSize, unsigned sendQueueSize, unsigned numSenders, __uint64 networkSpeedBitsPerSecond);


Expand Down
29 changes: 6 additions & 23 deletions roxie/udplib/udptrr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
msg.seen = packetsSeen.copy();
}

if (udpTraceLevel > 3 || udpTraceFlow)
if (udpTraceFlow)
{
StringBuffer ipStr;
DBGLOG("UdpReceiver: sending request_received msg seq %" SEQF "u to node=%s", _flowSeq, dest.getHostText(ipStr).str());
Expand Down Expand Up @@ -707,7 +707,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
CriticalBlock b(psCrit);
msg.seen = packetsSeen.copy();
}
if (udpTraceLevel > 3 || udpTraceFlow)
if (udpTraceFlow)
{
StringBuffer ipStr;
DBGLOG("UdpReceiver: sending ok_to_send %u msg seq %" SEQF "u to node=%s", maxTransfer, flowSeq, dest.getHostText(ipStr).str());
Expand Down Expand Up @@ -1003,7 +1003,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
void doFlowRequest(const UdpRequestToSendMsg &msg)
{
flowRequestsReceived++;
if (udpTraceLevel > 5 || udpTraceFlow)
if (udpTraceFlow)
{
StringBuffer ipStr;
DBGLOG("UdpReceiver: received %s msg flowSeq %" SEQF "u sendSeq %" SEQF "u from node=%s", flowType::name(msg.cmd), msg.flowSeq, msg.sendSeq, msg.sourceNode.getTraceText(ipStr).str());
Expand Down Expand Up @@ -1054,7 +1054,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
if (elapsed >= udpPermitTimeout)
{
UdpSenderEntry * sender = finger->owner;
if (udpTraceLevel || udpTraceFlow || udpTraceTimeouts)
if (udpTraceFlow || udpTraceTimeouts)
{
StringBuffer s;
DBGLOG("permit %" SEQF "u to node %s (%u packets) timed out after %u ms, rescheduling", sender->flowSeq, sender->dest.getHostText(s).str(), sender->getTotalReserved(), elapsed);
Expand All @@ -1066,7 +1066,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface

if (++sender->timeouts > udpMaxPermitDeadTimeouts && udpMaxPermitDeadTimeouts != 0)
{
if (udpTraceLevel || udpTraceFlow || udpTraceTimeouts)
if (udpTraceFlow || udpTraceTimeouts)
{
StringBuffer s;
DBGLOG("permit to send %" SEQF "u to node %s timed out %u times - abandoning", sender->flowSeq, sender->dest.getHostText(s).str(), sender->timeouts);
Expand Down Expand Up @@ -1206,7 +1206,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
{
try
{
if (udpTraceLevel > 5 || udpTraceFlow)
if (udpTraceFlow)
{
DBGLOG("UdpReceiver: wait_read(%u)", timeout);
}
Expand Down Expand Up @@ -1353,11 +1353,6 @@ class CReceiveManager : implements IReceiveManager, public CInterface
UdpSenderEntry *sender = &parent.sendersTable[hdr.node];
if (sender->noteSeen(hdr))
{
if (udpTraceLevel > 5) // don't want to interrupt this thread if we can help it
{
StringBuffer s;
DBGLOG("UdpReceiver: discarding unwanted resent packet %" SEQF "u %x from %s", hdr.sendSeq, hdr.pktSeq, hdr.node.getTraceText(s).str());
}
// We should perhaps track how often this happens, but it's not the same as unwantedDiscarded
hdr.node.clear(); // Used to indicate a duplicate that collate thread should discard. We don't discard on this thread as don't want to do anything that requires locks...
}
Expand Down Expand Up @@ -1509,8 +1504,6 @@ class CReceiveManager : implements IReceiveManager, public CInterface
virtual void detachCollator(const IMessageCollator *msgColl)
{
ruid_t ruid = msgColl->queryRUID();
if (udpTraceLevel >= 2)
DBGLOG("UdpReceiver: detach %p %u", msgColl, ruid);
{
CriticalBlock b(collatorsLock);
collators.erase(ruid);
Expand Down Expand Up @@ -1541,14 +1534,6 @@ class CReceiveManager : implements IReceiveManager, public CInterface
void collatePacket(DataBuffer *dataBuff)
{
const UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;

if (udpTraceLevel >= 4)
{
StringBuffer s;
DBGLOG("UdpReceiver: CPacketCollator - unQed packet - ruid=" RUIDF " id=0x%.8X mseq=%u pkseq=0x%.8X len=%d node=%s",
pktHdr->ruid, pktHdr->msgId, pktHdr->msgSeq, pktHdr->pktSeq, pktHdr->length, pktHdr->node.getTraceText(s).str());
}

Linked <CMessageCollator> msgColl;
bool isDefault = false;
{
Expand Down Expand Up @@ -1591,8 +1576,6 @@ class CReceiveManager : implements IReceiveManager, public CInterface
virtual IMessageCollator *createMessageCollator(IRowManager *rowManager, ruid_t ruid)
{
CMessageCollator *msgColl = new CMessageCollator(rowManager, ruid, encrypted);
if (udpTraceLevel > 2)
DBGLOG("UdpReceiver: createMessageCollator %p %u", msgColl, ruid);
{
CriticalBlock b(collatorsLock);
collators[ruid] = msgColl;
Expand Down
Loading

0 comments on commit c40c2fa

Please sign in to comment.