From b60a0ec45af30b364476ed59213810b3725a09b8 Mon Sep 17 00:00:00 2001 From: Al Liu Date: Tue, 23 Apr 2024 22:38:31 +0800 Subject: [PATCH] Add overhead to decide to use short message or stream (#1062) --- .../core/src/transport/connection_handler.rs | 8 +++- crates/core/src/transport/peer_connection.rs | 2 +- .../core/src/transport/symmetric_message.rs | 45 ++++++++++++++++--- 3 files changed, 46 insertions(+), 9 deletions(-) diff --git a/crates/core/src/transport/connection_handler.rs b/crates/core/src/transport/connection_handler.rs index fa7dd05d5..3e3010667 100644 --- a/crates/core/src/transport/connection_handler.rs +++ b/crates/core/src/transport/connection_handler.rs @@ -1333,19 +1333,22 @@ mod test { .await } + /// This one is the maximum size (1324 currently) of a short message from user side + /// by using public send API can be directly sent #[tokio::test] async fn simulate_send_max_short_message() -> Result<(), DynError> { + // crate::config::set_logger(Some(tracing::level_filters::LevelFilter::ERROR)); let (peer_a_pub, mut peer_a, peer_a_addr) = set_peer_connection(Default::default()).await?; let (peer_b_pub, mut peer_b, peer_b_addr) = set_peer_connection(Default::default()).await?; let peer_b = tokio::spawn(async move { let peer_a_conn = peer_b.connect(peer_a_pub, peer_a_addr).await; let mut conn = tokio::time::timeout(Duration::from_secs(500), peer_a_conn).await??; - let data = vec![0u8; 1432]; + let data = vec![0u8; 1324]; let data = tokio::task::spawn_blocking(move || bincode::serialize(&data).unwrap()) .await .unwrap(); - conn.outbound_short_message(data).await?; + conn.send(data).await?; Ok::<_, DynError>(()) }); @@ -1366,6 +1369,7 @@ mod test { #[test] #[should_panic] fn simulate_send_max_short_message_plus_1() { + // crate::config::set_logger(Some(tracing::level_filters::LevelFilter::ERROR)); tokio::runtime::Runtime::new() .unwrap() .block_on(async move { diff --git a/crates/core/src/transport/peer_connection.rs b/crates/core/src/transport/peer_connection.rs index 1428c5b33..79aab3303 100644 --- a/crates/core/src/transport/peer_connection.rs +++ b/crates/core/src/transport/peer_connection.rs @@ -115,7 +115,7 @@ impl PeerConnection { let data = tokio::task::spawn_blocking(move || bincode::serialize(&data).unwrap()) .await .unwrap(); - if data.len() > MAX_DATA_SIZE { + if data.len() + SymmetricMessage::short_message_overhead() > MAX_DATA_SIZE { self.outbound_stream(data).await; } else { self.outbound_short_message(data).await?; diff --git a/crates/core/src/transport/symmetric_message.rs b/crates/core/src/transport/symmetric_message.rs index 158c101c9..2ef93eab4 100644 --- a/crates/core/src/transport/symmetric_message.rs +++ b/crates/core/src/transport/symmetric_message.rs @@ -38,18 +38,36 @@ impl SymmetricMessage { }, }; - pub(crate) fn max_num_of_confirm_receipts_of_noop_message() -> usize { - static MAX_NUM_CONFIRM_RECEIPTS: Lazy = Lazy::new(|| { - // try to find the maximum number of confirm_receipts that can be serialized within the MAX_DATA_SIZE + pub(crate) fn short_message_overhead() -> usize { + static OVERHEAD: Lazy = Lazy::new(|| { + let blank = SymmetricMessage { + packet_id: u32::MAX, + confirm_receipt: vec![], + payload: SymmetricMessagePayload::ShortMessage { payload: vec![] }, + }; + bincode::serialized_size(&blank).unwrap() as usize + }); + + *OVERHEAD + } + + pub(crate) fn noop_message_overhead() -> usize { + static OVERHEAD: Lazy = Lazy::new(|| { let blank = SymmetricMessage { packet_id: u32::MAX, confirm_receipt: vec![], payload: SymmetricMessagePayload::NoOp, }; - let overhead = bincode::serialized_size(&blank).unwrap(); + bincode::serialized_size(&blank).unwrap() as usize + }); - let max_elems = (MAX_DATA_SIZE as u64 - overhead) / core::mem::size_of::() as u64; + *OVERHEAD + } + pub(crate) fn max_num_of_confirm_receipts_of_noop_message() -> usize { + static MAX_NUM_CONFIRM_RECEIPTS: Lazy = Lazy::new(|| { + let overhead = SymmetricMessage::noop_message_overhead() as u64; + let max_elems = (MAX_DATA_SIZE as u64 - overhead) / core::mem::size_of::() as u64; max_elems as usize }); @@ -351,6 +369,21 @@ mod test { payload: SymmetricMessagePayload::NoOp, }; let size = bincode::serialized_size(&msg).unwrap(); - assert!(size <= MAX_DATA_SIZE as u64); + assert_eq!(size, MAX_DATA_SIZE as u64); + } + + #[test] + fn max_short_message() { + let overhead = SymmetricMessage::short_message_overhead(); + + let msg = SymmetricMessage { + packet_id: u32::MAX, + confirm_receipt: vec![], + payload: SymmetricMessagePayload::ShortMessage { + payload: vec![0; MAX_DATA_SIZE - overhead], + }, + }; + let size = bincode::serialized_size(&msg).unwrap(); + assert_eq!(size, MAX_DATA_SIZE as u64); } }