diff --git a/Cargo.toml b/Cargo.toml index 6734350144..c7488d32b6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,7 +56,7 @@ dusk-poseidon = "=0.40.0" jubjub-schnorr = { version = "=0.5.0", default-features = false } # we leave kadcast open until a stable release is out -kadcast = "0.7.0-rc.8" +kadcast = "0.7.0-rc.10" phoenix-circuits = { version = "=0.4.0", default-features = false } phoenix-core = { version = "=0.32.0", default-features = false } # we leave piecrust open until a stable release is out diff --git a/node/src/network.rs b/node/src/network.rs index 498286cec4..330ea68595 100644 --- a/node/src/network.rs +++ b/node/src/network.rs @@ -18,7 +18,7 @@ use node_data::message::payload::{GetResource, Inv, Nonce}; use node_data::message::{AsyncQueue, Metadata, PROTOCOL_VERSION}; use node_data::{get_current_timestamp, Serializable}; use tokio::sync::RwLock; -use tracing::{debug, error, info, trace}; +use tracing::{debug, error, info, trace, warn}; /// Number of alive peers randomly selected which a `flood_request` is sent to const REDUNDANCY_PEER_COUNT: usize = 8; @@ -169,9 +169,16 @@ impl Kadcast { &self.conf } - async fn send_with_metrics(&self, bytes: &Vec, recv_addr: SocketAddr) { - counter!("dusk_bytes_sent").increment(bytes.len() as u64); - self.peer.send(bytes, recv_addr).await; + async fn send_with_metrics( + &self, + bytes: &Vec, + recv_addr: Vec, + ) { + if !recv_addr.is_empty() { + let bytes_sent = bytes.len() * recv_addr.len(); + counter!("dusk_bytes_sent").increment(bytes_sent as u64); + self.peer.send_to_peers(bytes, recv_addr).await; + } } } @@ -258,7 +265,7 @@ impl crate::Network for Kadcast { let topic = msg.topic(); info!("sending msg ({topic:?}) to peer {recv_addr}"); - self.send_with_metrics(&encoded, recv_addr).await; + self.send_with_metrics(&encoded, vec![recv_addr]).await; Ok(()) } @@ -281,10 +288,31 @@ impl crate::Network for Kadcast { counter!(format!("dusk_requests_{:?}", topic)).increment(1); - for recv_addr in self.peer.alive_nodes(amount).await { - trace!("sending msg ({topic:?}) to peer {recv_addr}"); - self.send_with_metrics(&encoded, recv_addr).await; + let mut alive_nodes = self.peer.alive_nodes(amount).await; + + if alive_nodes.len() < amount { + let current = alive_nodes.len(); + + let route_table = self.peer.to_route_table().await; + let new_nodes: Vec<_> = route_table + .into_values() + .flatten() + .map(|(s, _)| s) + .filter(|s| !alive_nodes.contains(s)) + .take(amount - current) + .collect(); + + alive_nodes.extend(new_nodes); + warn!( + event = "Not enought alive peers to send msg, increased", + ?topic, + requested = amount, + current, + increased = alive_nodes.len(), + ); } + trace!("sending msg ({topic:?}) to peers {alive_nodes:?}"); + self.send_with_metrics(&encoded, alive_nodes).await; Ok(()) }