From da189fea11282456d223fa8176a0fb5d7331810c Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Thu, 10 Oct 2024 12:04:21 +0200 Subject: [PATCH 1/2] workspace: upgrade kadcast to 0.7.0-rc.10` --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 7609152ef6..075798a758 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,7 +56,7 @@ dusk-poseidon = "=0.40.0" jubjub-schnorr = { version = "=0.5.0", default-features = false } # we leave kadcast open until a stable release is out -kadcast = "0.7.0-rc.8" +kadcast = "0.7.0-rc.10" phoenix-circuits = { version = "=0.4.0", default-features = false } phoenix-core = { version = "=0.32.0", default-features = false } piecrust = "=0.25.0" From 7799885c82dbbf5f7ffb6190ca43f574ee59fb8e Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Thu, 10 Oct 2024 12:37:33 +0200 Subject: [PATCH 2/2] node: improve send_to_alive_peers --- node/src/network.rs | 44 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 36 insertions(+), 8 deletions(-) diff --git a/node/src/network.rs b/node/src/network.rs index 498286cec4..330ea68595 100644 --- a/node/src/network.rs +++ b/node/src/network.rs @@ -18,7 +18,7 @@ use node_data::message::payload::{GetResource, Inv, Nonce}; use node_data::message::{AsyncQueue, Metadata, PROTOCOL_VERSION}; use node_data::{get_current_timestamp, Serializable}; use tokio::sync::RwLock; -use tracing::{debug, error, info, trace}; +use tracing::{debug, error, info, trace, warn}; /// Number of alive peers randomly selected which a `flood_request` is sent to const REDUNDANCY_PEER_COUNT: usize = 8; @@ -169,9 +169,16 @@ impl Kadcast { &self.conf } - async fn send_with_metrics(&self, bytes: &Vec, recv_addr: SocketAddr) { - counter!("dusk_bytes_sent").increment(bytes.len() as u64); - self.peer.send(bytes, recv_addr).await; + async fn send_with_metrics( + &self, + bytes: &Vec, + recv_addr: Vec, + ) { + if !recv_addr.is_empty() { + let bytes_sent = bytes.len() * recv_addr.len(); + counter!("dusk_bytes_sent").increment(bytes_sent as u64); + self.peer.send_to_peers(bytes, recv_addr).await; + } } } @@ -258,7 +265,7 @@ impl crate::Network for Kadcast { let topic = msg.topic(); info!("sending msg ({topic:?}) to peer {recv_addr}"); - self.send_with_metrics(&encoded, recv_addr).await; + self.send_with_metrics(&encoded, vec![recv_addr]).await; Ok(()) } @@ -281,10 +288,31 @@ impl crate::Network for Kadcast { counter!(format!("dusk_requests_{:?}", topic)).increment(1); - for recv_addr in self.peer.alive_nodes(amount).await { - trace!("sending msg ({topic:?}) to peer {recv_addr}"); - self.send_with_metrics(&encoded, recv_addr).await; + let mut alive_nodes = self.peer.alive_nodes(amount).await; + + if alive_nodes.len() < amount { + let current = alive_nodes.len(); + + let route_table = self.peer.to_route_table().await; + let new_nodes: Vec<_> = route_table + .into_values() + .flatten() + .map(|(s, _)| s) + .filter(|s| !alive_nodes.contains(s)) + .take(amount - current) + .collect(); + + alive_nodes.extend(new_nodes); + warn!( + event = "Not enought alive peers to send msg, increased", + ?topic, + requested = amount, + current, + increased = alive_nodes.len(), + ); } + trace!("sending msg ({topic:?}) to peers {alive_nodes:?}"); + self.send_with_metrics(&encoded, alive_nodes).await; Ok(()) }