Skip to content

Commit

Permalink
Merge pull request #18226 from ghalliday/issuer31131
Browse files Browse the repository at this point in the history
HPCC-31131 Allow roxie to recover from partially read datagrams

Reviewed-by: Mark Kelly [email protected]
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Jan 16, 2024
2 parents 53d72d2 + acc5f54 commit 71d4b75
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 7 deletions.
36 changes: 30 additions & 6 deletions roxie/udplib/udptrr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1303,7 +1303,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
Thread::start();
started.wait();
}

~receive_data()
{
DBGLOG("Total data packets seen = %u OOO(%u) Requests(%u) Permits(%u)", dataPacketsReceived.load(), packetsOOO.load(), flowRequestsReceived.load(), flowRequestsSent.load());
Expand Down Expand Up @@ -1336,20 +1336,44 @@ class CReceiveManager : implements IReceiveManager, public CInterface
try
{
unsigned int res;
UdpPacketHeader &hdr = *(UdpPacketHeader *) b->data;
while (true)
{
receive_socket->readtms(b->data, 1, DATA_PAYLOAD, res, timeout);
if (res!=sizeof(UdpRequestToSendMsg))
break;
//Read at least the size of the smallest packet we can receive
//static assert to check we are reading the smaller of the two possible packet types
static_assert(sizeof(UdpRequestToSendMsg) <= sizeof(UdpPacketHeader));
receive_socket->readtms(b->data, sizeof(UdpRequestToSendMsg), DATA_PAYLOAD, res, timeout);

//Even if a UDP packet is not split, very occasionally only some of the data may be present for the read.
//Slightly horribly this packet could be one of two different formats(!)
// a UdpRequestToSendMsg, which has a 2 byte command at the start of the header, with a maximum value of max_flow_cmd
// a UdpPacketHeader which has a 2 byte length. This length must be > sizeof(UdpPacketHeader).
//Since max_flow_cmd < sizeof(UdpPacketHeader) this can be used to distinguish a true data packet(!)
static_assert(flowType::max_flow_cmd < sizeof(UdpPacketHeader)); // assert to check the above comment is correct

if (hdr.length >= sizeof(UdpPacketHeader))
{
if (res == hdr.length)
break;

//Very rare situation - log it so that there is some evidence that it is occurring
OWARNLOG("Received partial network packet - %u bytes out of %u received", res, hdr.length);

//Because we are reading UDP datgrams rather than tcp packets, if we failed to read the whole datagram
//the rest of the datgram is lost - you cannot call readtms to read the rest of the datagram.
//Therefore throw this incomplete datagram away and allow the resend mechanism to retransmit it.
continue;
}

//Sanity check
assertex(res == sizeof(UdpRequestToSendMsg));

//Sending flow packets (eg send_completed) to the data thread ensures they do not overtake the data
//Redirect them to the flow thread to process them.
selfFlowSocket->write(b->data, res);
}

dataPacketsReceived++;
UdpPacketHeader &hdr = *(UdpPacketHeader *) b->data;
assert(hdr.length == res && hdr.length > sizeof(hdr));
UdpSenderEntry *sender = &parent.sendersTable[hdr.node];
if (sender->noteSeen(hdr))
{
Expand Down
2 changes: 1 addition & 1 deletion roxie/udplib/udptrs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ class UdpReceiverEntry : public IUdpReceiverEntry
aesEncrypt(udpkey.get(), udpkey.length(), data, length, encryptBuffer);
header->length = encryptBuffer.length();
encryptBuffer.writeDirect(0, sizeof(UdpPacketHeader), header); // Only really need length updating
assert(length <= DATA_PAYLOAD);
assertex(encryptBuffer.length() <= DATA_PAYLOAD);
data_socket->write(encryptBuffer.toByteArray(), encryptBuffer.length());
}
else
Expand Down

0 comments on commit 71d4b75

Please sign in to comment.