diff --git a/CHANGELOG.md b/CHANGELOG.md index a099fcc..22bf42d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add network version to handshake messages - Add Ray-ID to MessageInfo for message tracking - Add warning when discarding incomplete messages +- Add tracing when broadcasting to an eclipsed network ### Fixed @@ -23,6 +24,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Fix ObjectTransmissionInformation deserialization - Fix duplicate processing for messages with different RaptorQ configurations - Fix idle nodes removal on maintainance +- Fix `find_new_nodes` to query the proper buckets ### Changed diff --git a/Cargo.toml b/Cargo.toml index 02afe63..c3733e1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "kadcast" authors = ["herr-seppia "] -version = "0.7.0-rc.10" +version = "0.7.0-rc.11" edition = "2018" description = "Implementation of the Kadcast Network Protocol." categories = ["network-programming"] diff --git a/src/kbucket.rs b/src/kbucket.rs index 87ff9e3..b50149d 100644 --- a/src/kbucket.rs +++ b/src/kbucket.rs @@ -11,6 +11,7 @@ pub use bucket::InsertError; pub use bucket::InsertOk; pub use bucket::{NodeInsertError, NodeInsertOk}; use itertools::Itertools; +pub use key::MAX_BUCKET_HEIGHT; pub use key::{BinaryID, BinaryKey, BinaryNonce}; pub use node::Node; use std::collections::hash_map::Entry; @@ -20,7 +21,6 @@ mod bucket; mod key; mod node; use crate::config::BucketConfig; -use crate::K_ALPHA; use crate::K_BETA; pub type BucketHeight = u8; @@ -118,25 +118,12 @@ impl Tree { .map(|(&height, bucket)| (height, bucket.peers())) } - #[allow(dead_code)] - pub(crate) fn idle_or_empty_heigth( - &'static self, - ) -> impl Iterator { - let max_buckets = (crate::K_ID_LEN_BYTES * 8) as BucketHeight; - (0..max_buckets).filter(move |h| { - self.buckets.get(h).map_or_else(|| true, |b| b.has_idle()) - }) - } - - // pick at most Alpha nodes for each idle bucket - pub(crate) fn idle_buckets( - &self, - ) -> impl Iterator>)> - { - self.buckets - .iter() - .filter(|(_, bucket)| bucket.has_idle()) - .map(|(&height, bucket)| (height, bucket.pick::())) + pub(crate) fn idle_or_empty_height(&self) -> Vec { + (0..MAX_BUCKET_HEIGHT as u8) + .filter(|h| { + self.buckets.get(h).map_or_else(|| true, |b| b.has_idle()) + }) + .collect() } // Return the height of a Peer @@ -178,6 +165,13 @@ impl Tree { .map_or(false, |bucket| bucket.is_full()) } + pub(crate) fn bucket_size(&self, height: BucketHeight) -> usize { + self.buckets + .get(&height) + .map(|bucket| bucket.peers().count()) + .unwrap_or_default() + } + pub(crate) fn new(root: Node, config: BucketConfig) -> Tree { info!( "Building table [K={}] with root: {:?}", diff --git a/src/kbucket/key.rs b/src/kbucket/key.rs index d6b3a7c..c8bc744 100644 --- a/src/kbucket/key.rs +++ b/src/kbucket/key.rs @@ -19,8 +19,11 @@ use crate::{K_DIFF_MIN_BIT, K_DIFF_PRODUCED_BIT}; use super::BucketHeight; +pub const MAX_BUCKET_HEIGHT: usize = + K_ID_LEN_BYTES * BucketHeight::BITS as usize; + const _: () = assert!( - (K_ID_LEN_BYTES * BucketHeight::BITS as usize) < BucketHeight::MAX as usize, + MAX_BUCKET_HEIGHT < BucketHeight::MAX as usize, "K_ID_LEN_BYTES must be lower than BucketHeight::MAX" ); @@ -84,6 +87,27 @@ impl BinaryID { .map(|(i, b)| BinaryID::msb(b).expect("to be Some") + (i << 3) - 1) } + /// Given a specific `kadcast` distance, this method generates a `BinaryKey` + /// that has the requested XOR-based distance from `self`. + /// + /// The method works by flipping the bit at the specified `distance` in the + /// binary representation of the `BinaryId`. The distance is used to + /// identify both the byte (`idx`) and the bit within that byte + /// (`bit_to_change`) to be modified. The bit is toggled (flipped) using + /// an XOR operation, resulting in a new `BinaryKey` that differs from + /// `self` at exactly the requested distance. + pub fn get_at_distance(&self, distance: BucketHeight) -> BinaryKey { + let mut new_key = self.bytes; + + let distance = distance as usize; + let idx = distance / 8; + let bit_to_change = distance % 8; + + new_key[idx] ^= 1 << bit_to_change; + + new_key + } + /// Returns the position of the most significant bit set in a byte. /// /// Returns `None` if no bit is set. @@ -168,6 +192,8 @@ impl BinaryID { #[cfg(test)] mod tests { + use itertools::Itertools; + use super::*; use crate::kbucket::BucketHeight; use crate::peer::PeerNode; @@ -206,6 +232,25 @@ mod tests { Ok(()) } + fn key_as_string(key: BinaryKey) -> String { + key.iter().map(|b| format!("{b:08b}")).join(" ") + } + + #[test] + fn test_get_at_distance() -> Result<()> { + let current = PeerNode::generate("192.168.0.1:666", 0)?; + let current_str = key_as_string(current.as_peer_info().id); + for i in 0..(8 * K_ID_LEN_BYTES) { + let other = current.id().get_at_distance(i as u8); + let other_str = key_as_string(other); + println!("current {current_str}"); + println!("other {other_str}"); + println!("distance {i:?}"); + assert_eq!(current.id().calculate_distance(&other), Some(i as u8)) + } + Ok(()) + } + #[test] fn test_id_nonce() -> Result<()> { let root = PeerNode::generate("192.168.0.1:666", 0)?; diff --git a/src/lib.rs b/src/lib.rs index 5734f75..9da3380 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,6 +16,7 @@ use encoding::payload::BroadcastPayload; use handling::MessageHandler; pub use handling::MessageInfo; use itertools::Itertools; +use kbucket::MAX_BUCKET_HEIGHT; use kbucket::{BucketHeight, Tree}; use maintainer::TableMaintainer; use peer::{PeerInfo, PeerNode}; @@ -23,6 +24,7 @@ use rand::prelude::IteratorRandom; pub(crate) use rwlock::RwLock; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::task; +use tracing::warn; use tracing::{error, info}; use transport::{MessageBeanOut, WireNetwork}; @@ -226,10 +228,24 @@ impl Peer { return; } - let tosend: Vec<_> = self - .ktable - .read() - .await + for i in self.extract(message, height).await { + self.outbound_sender.send(i).await.unwrap_or_else(|e| { + error!("Unable to send from broadcast {e}") + }); + } + } + + async fn extract( + &self, + message: &[u8], + height: Option, + ) -> Vec<(Message, Vec)> { + const LAST_BUCKET_IDX: u8 = MAX_BUCKET_HEIGHT as u8 - 1; + let ktable = self.ktable.read().await; + if height.is_none() && ktable.bucket_size(LAST_BUCKET_IDX) == 0 { + warn!("Broadcasting a new message with empty bucket height {LAST_BUCKET_IDX}") + } + ktable .extract(height) .map(|(height, nodes)| { let msg = Message::broadcast( @@ -243,13 +259,7 @@ impl Peer { nodes.map(|node| *node.value().address()).collect(); (msg, targets) }) - .collect(); - - for i in tosend { - self.outbound_sender.send(i).await.unwrap_or_else(|e| { - error!("Unable to send from broadcast {e}") - }); - } + .collect() } /// Send a message to a peer in the network diff --git a/src/maintainer.rs b/src/maintainer.rs index a33fa01..00be55f 100644 --- a/src/maintainer.rs +++ b/src/maintainer.rs @@ -15,7 +15,7 @@ use crate::encoding::message::{Header, Message}; use crate::kbucket::Tree; use crate::peer::PeerInfo; use crate::transport::MessageBeanOut; -use crate::RwLock; +use crate::{RwLock, K_ALPHA}; pub(crate) struct TableMaintainer { bootstrapping_nodes: Vec, @@ -132,27 +132,39 @@ impl TableMaintainer { self.ktable.write().await.remove_idle_nodes(); } - /// Search for idle buckets (no message received) and try to contact some of - /// the belonging nodes + /// Searches for idle or empty buckets (those without received messages) in + /// the routing table and requests information about the nodes in these + /// buckets from active peers. + /// + /// For each identified idle or empty bucket, it calculates a target binary + /// key using the `get_at_distance` method, which flips a specific bit + /// in the node's binary identifier based on the given distance. This + /// generates a new target key that is used to search for additional + /// nodes. + /// + /// A set of active peers, up to `K_ALPHA`, is gathered from the current + /// routing table and combined with the bootstrapping nodes to form the + /// list of peers to contact. + /// + /// The purpose of this method is to keep the routing table active and up to + /// date by finding new peers whenever buckets are empty or nodes become + /// unresponsive. async fn find_new_nodes(&self) { let table_lock_read = self.ktable.read().await; - - let find_node_messages = table_lock_read - .idle_buckets() - .flat_map(|(_, idle_nodes)| idle_nodes) - .map(|target| { - ( - Message::FindNodes( - self.header, - self.version.clone(), - *target.id().as_binary(), - ), - //TODO: Extract alpha nodes - vec![*target.value().address()], - ) - }); - for find_node in find_node_messages { - self.send(find_node).await; + let buckets_to_refresh = table_lock_read.idle_or_empty_height(); + + let alive_peers = table_lock_read + .alive_nodes() + .map(|n| n.as_peer_info().to_socket_address()) + .take(K_ALPHA) + .chain(self.bootstrapping_nodes_addr().into_iter()) + .collect::>(); + + for bucket_h in buckets_to_refresh { + let target = self.header.binary_id().get_at_distance(bucket_h); + let msg = + Message::FindNodes(self.header, self.version.clone(), target); + self.send((msg, alive_peers.clone())).await; } } }