Skip to content

Commit

Permalink
Improve received RTPS packet header checking and ACKNACK sanity check…
Browse files Browse the repository at this point in the history
…ing.
  • Loading branch information
jhelovuo committed Aug 2, 2024
1 parent 7b9a38a commit b26b697
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 3 deletions.
20 changes: 20 additions & 0 deletions src/rtps/message_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,26 @@ impl MessageReceiver {
return;
}

// Check that the 4-byte magic string matches "RTPS"
if msg_bytes.len() >= 4 {
let magic = &msg_bytes[0..4];
if *magic == b"RTPS"[..] {
// go ahead and try to decode message
} else if *magic == b"RTPX"[..] {
// RTI Connext sends also packets with magic RTPX in the header.
// We do not know are these really same as RTPS or different, so let's
// ignore those.
info!("Received message with RTPX header. Ignoring.");
return;
} else {
warn!(
"Received message with unknown start of header {:x?}. Ignoring.",
magic
);
return;
}
}

// call Speedy reader
// Bytes .clone() is cheap, so no worries
let rtps_message = match Message::read_from_buffer(msg_bytes) {
Expand Down
20 changes: 18 additions & 2 deletions src/rtps/rtps_reader_proxy.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use std::collections::{BTreeMap, BTreeSet};
use std::{
cmp::max,
collections::{BTreeMap, BTreeSet},
};

use bit_vec::BitVec;
#[allow(unused_imports)]
Expand Down Expand Up @@ -229,7 +232,20 @@ impl RtpsReaderProxy {
) {
match ack_submessage {
AckSubmessage::AckNack(acknack) => {
let new_all_acked_before = acknack.reader_sn_state.base();
// Eliminate case that base = 0
let new_all_acked_before = max(acknack.reader_sn_state.base(), SequenceNumber::from(1));
// Sending acknack with sn_state base = 0 should not happen.
// This is not allowed by SequenceNumberSet
// validity rules (RTPS Spec v2.5 "8.3.5.5 SequenceNumberSet")
//
// The correct way to acknowledge that nothing has been received is to
// send ACKNACK with reader_sn_state.base = 1 and empty set contents.
// This means everything before 1 has been received, but since
// sequence numbering starts at 1 by definition
// (in Section 8.3.5.4 SequenceNumber), it means "nothing"
//
// This is logged in `writer` object.

// sanity check:
if new_all_acked_before < self.all_acked_before {
error!(
Expand Down
20 changes: 19 additions & 1 deletion src/rtps/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -990,8 +990,26 @@ impl Writer {
Some(0) => warn!("Request for SN zero! : {:?}", an),
Some(_) | None => (), // ok
}
let my_topic = self.my_topic_name.clone(); // for debugging

let reader_guid = GUID::new(reader_guid_prefix, an.reader_id);

// sanity check
if an.reader_sn_state.base() < SequenceNumber::from(1) {
// This check is based on RTPS v2.5 Spec
// Section "8.3.5.5 SequenceNumberSet" and
// Section "8.3.8.1.3 Validity".
// But apparently some RTPS implementations send ACKNACK with
// reader_sn_state.base = 0 to indicate they have matched the writer,
// so seeing these once per new writer should be ok.
info!(
"ACKNACK SequenceNumberSet minimum must be >= 1, got {:?} from {:?} topic {:?}",
an.reader_sn_state.base(),
reader_guid,
self.topic_name()
);
}

let my_topic = self.my_topic_name.clone(); // for debugging
self.update_ack_waiters(reader_guid, Some(an.reader_sn_state.base()));

if let Some(reader_proxy) = self.lookup_reader_proxy_mut(reader_guid) {
Expand Down

0 comments on commit b26b697

Please sign in to comment.