diff --git a/roxie/udplib/udptrr.cpp b/roxie/udplib/udptrr.cpp index e4598a84974..8dec8182b4f 100644 --- a/roxie/udplib/udptrr.cpp +++ b/roxie/udplib/udptrr.cpp @@ -1303,7 +1303,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface Thread::start(); started.wait(); } - + ~receive_data() { DBGLOG("Total data packets seen = %u OOO(%u) Requests(%u) Permits(%u)", dataPacketsReceived.load(), packetsOOO.load(), flowRequestsReceived.load(), flowRequestsSent.load()); @@ -1336,11 +1336,37 @@ class CReceiveManager : implements IReceiveManager, public CInterface try { unsigned int res; + UdpPacketHeader &hdr = *(UdpPacketHeader *) b->data; while (true) { - receive_socket->readtms(b->data, 1, DATA_PAYLOAD, res, timeout); - if (res!=sizeof(UdpRequestToSendMsg)) - break; + //Read at least the size of the smallest packet we can receive + //static assert to check we are reading the smaller of the two possible packet types + static_assert(sizeof(UdpRequestToSendMsg) <= sizeof(UdpPacketHeader)); + receive_socket->readtms(b->data, sizeof(UdpRequestToSendMsg), DATA_PAYLOAD, res, timeout); + + //Even if a UDP packet is not split, very occasionally only some of the data may be present for the read. + //Slightly horribly this packet could be one of two different formats(!) + // a UdpRequestToSendMsg, which has a 2 byte command at the start of the header, with a maximum value of max_flow_cmd + // a UdpPacketHeader which has a 2 byte length. This length must be > sizeof(UdpPacketHeader). + //Since max_flow_cmd < sizeof(UdpPacketHeader) this can be used to distinguish a true data packet(!) + static_assert(flowType::max_flow_cmd < sizeof(UdpPacketHeader)); // assert to check the above comment is correct + + if (hdr.length >= sizeof(UdpPacketHeader)) + { + if (res == hdr.length) + break; + + //Very rare situation - log it so that there is some evidence that it is occurring + OWARNLOG("Received partial network packet - %u bytes out of %u received", res, hdr.length); + + //Because we are reading UDP datgrams rather than tcp packets, if we failed to read the whole datagram + //the rest of the datgram is lost - you cannot call readtms to read the rest of the datagram. + //Therefore throw this incomplete datagram away and allow the resend mechanism to retransmit it. + continue; + } + + //Sanity check + assertex(res == sizeof(UdpRequestToSendMsg)); //Sending flow packets (eg send_completed) to the data thread ensures they do not overtake the data //Redirect them to the flow thread to process them. @@ -1348,8 +1374,6 @@ class CReceiveManager : implements IReceiveManager, public CInterface } dataPacketsReceived++; - UdpPacketHeader &hdr = *(UdpPacketHeader *) b->data; - assert(hdr.length == res && hdr.length > sizeof(hdr)); UdpSenderEntry *sender = &parent.sendersTable[hdr.node]; if (sender->noteSeen(hdr)) { diff --git a/roxie/udplib/udptrs.cpp b/roxie/udplib/udptrs.cpp index 5ed9996118f..96912bc604f 100644 --- a/roxie/udplib/udptrs.cpp +++ b/roxie/udplib/udptrs.cpp @@ -573,7 +573,7 @@ class UdpReceiverEntry : public IUdpReceiverEntry aesEncrypt(udpkey.get(), udpkey.length(), data, length, encryptBuffer); header->length = encryptBuffer.length(); encryptBuffer.writeDirect(0, sizeof(UdpPacketHeader), header); // Only really need length updating - assert(length <= DATA_PAYLOAD); + assertex(encryptBuffer.length() <= DATA_PAYLOAD); data_socket->write(encryptBuffer.toByteArray(), encryptBuffer.length()); } else