Skip to content

Commit

Permalink
Add overhead to decide to use short message or stream (#1062)
Browse files Browse the repository at this point in the history
  • Loading branch information
al8n authored Apr 23, 2024
1 parent 02f873d commit b60a0ec
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 9 deletions.
8 changes: 6 additions & 2 deletions crates/core/src/transport/connection_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1333,19 +1333,22 @@ mod test {
.await
}

/// This one is the maximum size (1324 currently) of a short message from user side
/// by using public send API can be directly sent
#[tokio::test]
async fn simulate_send_max_short_message() -> Result<(), DynError> {
// crate::config::set_logger(Some(tracing::level_filters::LevelFilter::ERROR));
let (peer_a_pub, mut peer_a, peer_a_addr) = set_peer_connection(Default::default()).await?;
let (peer_b_pub, mut peer_b, peer_b_addr) = set_peer_connection(Default::default()).await?;

let peer_b = tokio::spawn(async move {
let peer_a_conn = peer_b.connect(peer_a_pub, peer_a_addr).await;
let mut conn = tokio::time::timeout(Duration::from_secs(500), peer_a_conn).await??;
let data = vec![0u8; 1432];
let data = vec![0u8; 1324];
let data = tokio::task::spawn_blocking(move || bincode::serialize(&data).unwrap())
.await
.unwrap();
conn.outbound_short_message(data).await?;
conn.send(data).await?;
Ok::<_, DynError>(())
});

Expand All @@ -1366,6 +1369,7 @@ mod test {
#[test]
#[should_panic]
fn simulate_send_max_short_message_plus_1() {
// crate::config::set_logger(Some(tracing::level_filters::LevelFilter::ERROR));
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async move {
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/transport/peer_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl PeerConnection {
let data = tokio::task::spawn_blocking(move || bincode::serialize(&data).unwrap())
.await
.unwrap();
if data.len() > MAX_DATA_SIZE {
if data.len() + SymmetricMessage::short_message_overhead() > MAX_DATA_SIZE {
self.outbound_stream(data).await;
} else {
self.outbound_short_message(data).await?;
Expand Down
45 changes: 39 additions & 6 deletions crates/core/src/transport/symmetric_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,36 @@ impl SymmetricMessage {
},
};

pub(crate) fn max_num_of_confirm_receipts_of_noop_message() -> usize {
static MAX_NUM_CONFIRM_RECEIPTS: Lazy<usize> = Lazy::new(|| {
// try to find the maximum number of confirm_receipts that can be serialized within the MAX_DATA_SIZE
pub(crate) fn short_message_overhead() -> usize {
static OVERHEAD: Lazy<usize> = Lazy::new(|| {
let blank = SymmetricMessage {
packet_id: u32::MAX,
confirm_receipt: vec![],
payload: SymmetricMessagePayload::ShortMessage { payload: vec![] },
};
bincode::serialized_size(&blank).unwrap() as usize
});

*OVERHEAD
}

pub(crate) fn noop_message_overhead() -> usize {
static OVERHEAD: Lazy<usize> = Lazy::new(|| {
let blank = SymmetricMessage {
packet_id: u32::MAX,
confirm_receipt: vec![],
payload: SymmetricMessagePayload::NoOp,
};
let overhead = bincode::serialized_size(&blank).unwrap();
bincode::serialized_size(&blank).unwrap() as usize
});

let max_elems = (MAX_DATA_SIZE as u64 - overhead) / core::mem::size_of::<u32>() as u64;
*OVERHEAD
}

pub(crate) fn max_num_of_confirm_receipts_of_noop_message() -> usize {
static MAX_NUM_CONFIRM_RECEIPTS: Lazy<usize> = Lazy::new(|| {
let overhead = SymmetricMessage::noop_message_overhead() as u64;
let max_elems = (MAX_DATA_SIZE as u64 - overhead) / core::mem::size_of::<u32>() as u64;
max_elems as usize
});

Expand Down Expand Up @@ -351,6 +369,21 @@ mod test {
payload: SymmetricMessagePayload::NoOp,
};
let size = bincode::serialized_size(&msg).unwrap();
assert!(size <= MAX_DATA_SIZE as u64);
assert_eq!(size, MAX_DATA_SIZE as u64);
}

#[test]
fn max_short_message() {
let overhead = SymmetricMessage::short_message_overhead();

let msg = SymmetricMessage {
packet_id: u32::MAX,
confirm_receipt: vec![],
payload: SymmetricMessagePayload::ShortMessage {
payload: vec![0; MAX_DATA_SIZE - overhead],
},
};
let size = bincode::serialized_size(&msg).unwrap();
assert_eq!(size, MAX_DATA_SIZE as u64);
}
}

0 comments on commit b60a0ec

Please sign in to comment.