From 0d0cb67028d96b95279769abf244e11a0c4dd557 Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Tue, 7 May 2024 18:31:55 +0300 Subject: [PATCH 01/25] node-data: Extend definition of GetData struct --- node-data/src/message.rs | 128 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 124 insertions(+), 4 deletions(-) diff --git a/node-data/src/message.rs b/node-data/src/message.rs index aa1f6370c1..a330de960d 100644 --- a/node-data/src/message.rs +++ b/node-data/src/message.rs @@ -388,6 +388,10 @@ pub mod payload { use crate::Serializable; use std::fmt; use std::io::{self, Read, Write}; + use std::net::{ + Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, + }; + use std::time::{SystemTime, UNIX_EPOCH}; use super::{ConsensusHeader, SignInfo}; @@ -872,25 +876,141 @@ pub mod payload { } } - #[derive(Default, Debug, Clone)] + #[derive(Debug, Clone)] pub struct GetData { - pub inner: Inv, + /// Inventory/Resource to search for + inventory: Inv, + + /// (requester) Address to which the resource is sent back, if found + requester_addr: SocketAddr, + + /// Request Time to live + ttl_as_sec: u64, + // TODO: Integrity test with hashing??? 
+ } + + impl GetData { + pub fn new( + inventory: Inv, + requester_addr: SocketAddr, + ttl_as_sec: u64, + ) -> Self { + let ttl_as_sec = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + + ttl_as_sec; + + Self { + inventory, + requester_addr, + ttl_as_sec, + } + } + + pub fn get_addr(&self) -> SocketAddr { + self.requester_addr + } + + pub fn get_inv(&self) -> &Inv { + &self.inventory + } + + pub fn is_expired(&self) -> bool { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + > self.ttl_as_sec + } } impl Serializable for GetData { fn write(&self, w: &mut W) -> io::Result<()> { - self.inner.write(w) + self.inventory.write(w)?; + self.requester_addr.write(w)?; + w.write_all(&self.ttl_as_sec.to_le_bytes()[..]) } fn read(r: &mut R) -> io::Result where Self: Sized, { + let inner = Inv::read(r)?; + let requester_addr = SocketAddr::read(r)?; + + let mut buf = [0u8; 8]; + r.read_exact(&mut buf)?; + Ok(GetData { - inner: Inv::read(r)?, + inventory: inner, + requester_addr, + ttl_as_sec: u64::from_le_bytes(buf), }) } } + + impl Serializable for SocketAddr { + fn write(&self, w: &mut W) -> io::Result<()> { + match self { + SocketAddr::V4(addr_v4) => { + w.write_all(&[4])?; + w.write_all(&addr_v4.ip().octets())?; + w.write_all(&addr_v4.port().to_le_bytes())?; + } + SocketAddr::V6(addr_v6) => { + w.write_all(&[6])?; + w.write_all(&addr_v6.ip().octets())?; + w.write_all(&addr_v6.port().to_le_bytes())?; + } + } + Ok(()) + } + + fn read(r: &mut R) -> io::Result + where + Self: Sized, + { + let mut ip_type = [0u8; 1]; + r.read_exact(&mut ip_type)?; + + let ip = match ip_type[0] { + 4 => { + let mut octets = [0u8; 4]; + r.read_exact(&mut octets)?; + + let mut port_bytes = [0u8; 2]; + r.read_exact(&mut port_bytes)?; + + SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::from(octets), + u16::from_le_bytes(port_bytes), + )) + } + 6 => { + let mut octets = [0u8; 16]; + r.read_exact(&mut octets)?; + + let mut port_bytes 
= [0u8; 2]; + r.read_exact(&mut port_bytes)?; + + SocketAddr::V6(SocketAddrV6::new( + Ipv6Addr::from(octets), + u16::from_le_bytes(port_bytes), + 0, + 0, + )) + } + _ => { + return Err(io::Error::new( + io::ErrorKind::InvalidData, + "Invalid IP type", + )) + } + }; + Ok(ip) + } + } } macro_rules! map_topic { From fb00a9f0f01a2121ff9354b27954cb785d5ef0cb Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Tue, 7 May 2024 18:33:23 +0300 Subject: [PATCH 02/25] node: Impl network::flood_request method --- node/src/network.rs | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/node/src/network.rs b/node/src/network.rs index 75cab91684..196358c922 100644 --- a/node/src/network.rs +++ b/node/src/network.rs @@ -13,6 +13,7 @@ use async_trait::async_trait; use kadcast::config::Config; use kadcast::{MessageInfo, Peer}; use metrics::counter; +use node_data::message::payload::{GetData, Inv}; use node_data::message::Metadata; use node_data::message::{AsyncQueue, Topics}; use std::sync::atomic::{AtomicU64, Ordering}; @@ -23,6 +24,7 @@ use tracing::{error, info, trace, warn}; mod frame; const MAX_PENDING_SENDERS: u64 = 1000; +const FLOOD_REQUEST_TTL: u64 = 10; // seconds type RoutesList = [Option>; N]; type FilterList = [Option; N]; @@ -123,6 +125,9 @@ pub struct Kadcast { conf: Config, counter: AtomicU64, + + /// Represents a parsed conf.public_addr + public_addr: SocketAddr, } impl Kadcast { @@ -143,6 +148,10 @@ impl Kadcast { pending_senders: Arc::new(AtomicU64::new(0)), }; let peer = Peer::new(conf.clone(), listener)?; + let public_addr = conf + .public_address + .parse::() + .expect("valid kadcast public address"); Ok(Kadcast { routes, @@ -150,6 +159,7 @@ impl Kadcast { peer, conf, counter: AtomicU64::new(0), + public_addr, }) } @@ -213,6 +223,23 @@ impl crate::Network for Kadcast { Ok(()) } + /// Broadcast a GetData request. 
+ /// + /// By utilizing the randomly selected peers per bucket in Kadcast, this + /// broadcast does follow the so-called "Flood with Random Walk" blind + /// search (resource discovery). + /// + /// A receiver of this message is supposed to either look up and return the + /// resource or rebroadcast it to the next bucket. + async fn flood_request(&self, msg_inv: &Inv) -> anyhow::Result<()> { + self.broadcast(&Message::new_get_data(GetData::new( + msg_inv.clone(), + self.public_addr, + FLOOD_REQUEST_TTL, + ))) + .await + } + /// Sends an encoded message to a given peer. async fn send_to_peer( &self, @@ -322,10 +349,15 @@ impl crate::Network for Kadcast { Ok(()) } + // TODO: Duplicated func fn get_info(&self) -> anyhow::Result { Ok(self.conf.public_address.to_string()) } + fn public_addr(&self) -> &SocketAddr { + &self.public_addr + } + async fn alive_nodes_count(&self) -> usize { // TODO: This call should be replaced with no-copy Kadcast API self.peer.alive_nodes(u16::MAX as usize).await.len() From 46a0a63ab09d86d1dff3c824d9a3e7cbca2847e7 Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Tue, 7 May 2024 18:44:11 +0300 Subject: [PATCH 03/25] node-data: Use absolute instead of relative ttl --- node-data/src/message.rs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/node-data/src/message.rs b/node-data/src/message.rs index a330de960d..42e38a5f7f 100644 --- a/node-data/src/message.rs +++ b/node-data/src/message.rs @@ -895,12 +895,6 @@ pub mod payload { requester_addr: SocketAddr, ttl_as_sec: u64, ) -> Self { - let ttl_as_sec = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_secs() - + ttl_as_sec; - Self { inventory, requester_addr, From fb6f9af1ddf2abf7c213d82e2d5aa495f1479daf Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Tue, 7 May 2024 18:45:45 +0300 Subject: [PATCH 04/25] node: Handle new fields of GetData message --- node/src/databroker.rs | 37 ++++++++++++++++++++++++++++--------- node/src/lib.rs | 10 +++++++++- 
node/src/network.rs | 10 ++++++++-- 3 files changed, 45 insertions(+), 12 deletions(-) diff --git a/node/src/databroker.rs b/node/src/databroker.rs index 174f1e5cc6..9d84d1c9cf 100644 --- a/node/src/databroker.rs +++ b/node/src/databroker.rs @@ -138,7 +138,9 @@ impl // Spawn a task to handle the request asynchronously. tokio::spawn(async move { - match Self::handle_request(&db, &msg, &conf).await { + match Self::handle_request::(&db, &network, &msg, &conf) + .await + { Ok(resp) => { // Send response let net = network.read().await; @@ -177,8 +179,9 @@ impl impl DataBrokerSrv { /// Handles inbound messages. - async fn handle_request( + async fn handle_request( db: &Arc>, + network: &Arc>, msg: &Message, conf: &conf::Params, ) -> anyhow::Result { @@ -211,14 +214,23 @@ impl DataBrokerSrv { } // Handle GetInv requests Payload::GetInv(m) => { - let msg = Self::handle_inv(db, m, conf.max_inv_entries).await?; + let msg = + Self::handle_inv(db, m, conf.max_inv_entries, recv_peer) + .await?; Ok(Response::new_from_msg(msg, recv_peer)) } // Handle GetData requests Payload::GetData(m) => { - let msgs = - Self::handle_get_data(db, m, conf.max_inv_entries).await?; - Ok(Response::new(msgs, recv_peer)) + match Self::handle_get_data(db, m, conf.max_inv_entries).await { + Ok(msg_list) => Ok(Response::new(msg_list, m.get_addr())), + Err(err) => { + if !m.is_expired() { + // Not found resource, rebroadcast its request + let _ = network.read().await.broadcast(msg).await; + } + Err(err) + } + } } _ => Err(anyhow::anyhow!("unhandled message payload")), } @@ -331,6 +343,7 @@ impl DataBrokerSrv { db: &Arc>, m: &node_data::message::payload::Inv, max_entries: usize, + recv_addr: SocketAddr, ) -> Result { let inv = db.read().await.view(|t| { let mut inv = payload::Inv::default(); @@ -374,7 +387,10 @@ impl DataBrokerSrv { return Err(anyhow::anyhow!("no items to fetch")); } - Ok(Message::new_get_data(GetData { inner: inv })) + // Send GetData request with disabled rebroadcast (ttl = 0), Inv 
message + // is part of one-to-one messaging flows (GetBlocks/Mempool) so it + // should not be a flooding request. + Ok(Message::new_get_data(GetData::new(inv, recv_addr, 0))) } /// Handles GetData message request. @@ -387,7 +403,8 @@ impl DataBrokerSrv { max_entries: usize, ) -> Result> { db.read().await.view(|t| { - Ok(m.inner + let res: Vec = m + .get_inv() .inv_list .iter() .filter_map(|i| match i.inv_type { @@ -423,7 +440,9 @@ impl DataBrokerSrv { } }) .take(max_entries) - .collect()) + .collect(); + + Ok(res) }) } } diff --git a/node/src/lib.rs b/node/src/lib.rs index fdeb76bfd2..0c9a12a4e7 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -15,9 +15,11 @@ pub mod telemetry; pub mod vm; use async_trait::async_trait; +use node_data::message::payload::Inv; use node_data::message::AsyncQueue; use node_data::message::Message; use node_data::message::Topics; +use std::net::SocketAddr; use std::sync::Arc; use tokio::signal::unix::{signal, SignalKind}; use tokio::sync::RwLock; @@ -38,9 +40,12 @@ pub type BoxedFilter = Box; #[async_trait] pub trait Network: Send + Sync + 'static { - /// Broadcasts a message. + /// Broadcasts a fire-and-forget message. async fn broadcast(&self, msg: &Message) -> anyhow::Result<()>; + /// Broadcasts a request message + async fn flood_request(&self, msg_inv: &Inv) -> anyhow::Result<()>; + /// Sends a message to a specified peer. async fn send_to_peer( &self, @@ -81,6 +86,9 @@ pub trait Network: Send + Sync + 'static { /// Retrieves information about the network. 
fn get_info(&self) -> anyhow::Result; + /// Returns public address in Kadcast + fn public_addr(&self) -> &SocketAddr; + /// Retrieves number of alive nodes async fn alive_nodes_count(&self) -> usize; } diff --git a/node/src/network.rs b/node/src/network.rs index 196358c922..e710728884 100644 --- a/node/src/network.rs +++ b/node/src/network.rs @@ -6,7 +6,6 @@ use std::net::{AddrParseError, SocketAddr}; use std::sync::Arc; -use std::time::Duration; use crate::{BoxedFilter, Message}; use async_trait::async_trait; @@ -17,6 +16,7 @@ use node_data::message::payload::{GetData, Inv}; use node_data::message::Metadata; use node_data::message::{AsyncQueue, Topics}; use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::RwLock; use tokio::time::{self, Instant}; use tracing::{error, info, trace, warn}; @@ -232,10 +232,16 @@ impl crate::Network for Kadcast { /// A receiver of this message is supposed to either look up and return the /// resource or rebroadcast it to the next bucket. 
async fn flood_request(&self, msg_inv: &Inv) -> anyhow::Result<()> { + let ttl_as_sec = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + + FLOOD_REQUEST_TTL; + self.broadcast(&Message::new_get_data(GetData::new( msg_inv.clone(), self.public_addr, - FLOOD_REQUEST_TTL, + ttl_as_sec, ))) .await } From 82e4dc1f3f3cc231272265e488bfdc416d05d483 Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Tue, 7 May 2024 18:51:31 +0300 Subject: [PATCH 05/25] node: Apply flood_request usage in chain --- node/src/chain/consensus.rs | 2 +- node/src/chain/fsm.rs | 39 +++++++++++++++---------------------- 2 files changed, 17 insertions(+), 24 deletions(-) diff --git a/node/src/chain/consensus.rs b/node/src/chain/consensus.rs index b576ebca59..ead1487a7c 100644 --- a/node/src/chain/consensus.rs +++ b/node/src/chain/consensus.rs @@ -229,7 +229,7 @@ impl dusk_consensus::commons::Database // For redundancy reasons, we send the GetCandidate request to multiple // network peers - let request = Message::new_get_candidate(GetCandidate { hash: *h }); + let request = Message::new_get_candidate(GetCandidate { hash: *h }); // TODO: Use GetData let res = self .network .write() diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index eedb1a7a9c..d839f61e33 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -12,9 +12,7 @@ use crate::{vm, Network}; use crate::database::{Candidate, Ledger}; use metrics::counter; use node_data::ledger::{to_str, Block, Label}; -use node_data::message::payload::{ - GetBlocks, GetData, RatificationResult, Vote, -}; +use node_data::message::payload::{GetBlocks, Inv, RatificationResult, Vote}; use node_data::message::{payload, Message, Metadata}; use std::collections::{HashMap, HashSet}; use std::fmt::{Debug, Formatter}; @@ -224,10 +222,9 @@ impl SimpleFSM { ); // Request by hash - request_block( + flood_request_block( &self.network, BlockRequest::ByHash(hash), - msg.metadata.as_ref().unwrap().src_addr, ) .await; @@ 
-257,11 +254,13 @@ impl SimpleFSM { err = ?err, ); + // Candidate block is not found from local + // storage. + // // Request by hash - request_block( + flood_request_block( &self.network, BlockRequest::ByHash(hash), - msg.metadata.as_ref().unwrap().src_addr, ) .await; @@ -572,10 +571,9 @@ impl InSyncImpl { )); } - request_block( + flood_request_block( &self.network, BlockRequest::ByHeight(local_header.height + 1), - metadata.src_addr, ) .await; } @@ -585,6 +583,7 @@ impl InSyncImpl { async fn on_heartbeat(&mut self) -> anyhow::Result { // TODO: Consider reporting metrics here + // TODO: Consider handling ACCEPT_BLOCK_TIMEOUT event here if let Some(pre_sync) = &mut self.presync { @@ -779,30 +778,24 @@ impl Debug for BlockRequest { } } -/// Requests a block by height from a specified peer -async fn request_block( +/// Requests a block by height/hash from the network with so-called +/// Flood-request approach. +async fn flood_request_block( network: &Arc>, req: BlockRequest, - peer_addr: SocketAddr, ) { - let mut get_data = GetData::default(); + let mut inv = Inv::default(); match req { BlockRequest::ByHeight(height) => { - get_data.inner.add_block_from_height(height); + inv.add_block_from_height(height); } BlockRequest::ByHash(hash) => { - get_data.inner.add_block_from_hash(hash); + inv.add_block_from_hash(hash); } }; - debug!(event = "request block", ?req, ?peer_addr); - - if let Err(err) = network - .read() - .await - .send_to_peer(&Message::new_get_data(get_data), peer_addr) - .await - { + debug!(event = "flood_request block", ?req); + if let Err(err) = network.read().await.flood_request(&inv).await { warn!("could not request block {err}") }; } From 7cb107b03b1bc500f29cb3a22a56f2e2d2f4b146 Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Wed, 8 May 2024 11:04:15 +0300 Subject: [PATCH 06/25] node-data: Rename GetData to GetResource to be aligned with spec --- node-data/src/message.rs | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) 
diff --git a/node-data/src/message.rs b/node-data/src/message.rs index 42e38a5f7f..5d1ac08d66 100644 --- a/node-data/src/message.rs +++ b/node-data/src/message.rs @@ -105,7 +105,7 @@ impl Serializable for Message { Payload::GetMempool(p) => p.write(w), Payload::GetInv(p) => p.write(w), Payload::GetBlocks(p) => p.write(w), - Payload::GetData(p) => p.write(w), + Payload::GetResource(p) => p.write(w), Payload::Ratification(p) => p.write(w), Payload::Empty | Payload::ValidationResult(_) => Ok(()), /* internal message, not sent on the wire */ } @@ -138,8 +138,8 @@ impl Serializable for Message { Topics::GetCandidate => { Message::new_get_candidate(payload::GetCandidate::read(r)?) } - Topics::GetData => { - Message::new_get_data(payload::GetData::read(r)?) + Topics::GetResource => { + Message::new_get_resource(payload::GetResource::read(r)?) } Topics::GetBlocks => { Message::new_get_blocks(payload::GetBlocks::read(r)?) @@ -237,11 +237,11 @@ impl Message { } } - /// Creates topics.GetData message - pub fn new_get_data(p: payload::GetData) -> Message { + /// Creates topics.GetResource message + pub fn new_get_resource(p: payload::GetResource) -> Message { Self { - topic: Topics::GetData, - payload: Payload::GetData(p), + topic: Topics::GetResource, + payload: Payload::GetResource(p), ..Default::default() } } @@ -372,7 +372,7 @@ pub enum Payload { GetMempool(payload::GetMempool), GetInv(payload::Inv), GetBlocks(payload::GetBlocks), - GetData(payload::GetData), + GetResource(payload::GetResource), CandidateResp(Box), // Internal messages payload @@ -877,7 +877,7 @@ pub mod payload { } #[derive(Debug, Clone)] - pub struct GetData { + pub struct GetResource { /// Inventory/Resource to search for inventory: Inv, @@ -889,7 +889,7 @@ pub mod payload { // TODO: Integrity test with hashing??? 
} - impl GetData { + impl GetResource { pub fn new( inventory: Inv, requester_addr: SocketAddr, @@ -919,7 +919,7 @@ pub mod payload { } } - impl Serializable for GetData { + impl Serializable for GetResource { fn write(&self, w: &mut W) -> io::Result<()> { self.inventory.write(w)?; self.requester_addr.write(w)?; @@ -936,7 +936,7 @@ pub mod payload { let mut buf = [0u8; 8]; r.read_exact(&mut buf)?; - Ok(GetData { + Ok(GetResource { inventory: inner, requester_addr, ttl_as_sec: u64::from_le_bytes(buf), @@ -1019,7 +1019,7 @@ macro_rules! map_topic { #[cfg_attr(any(feature = "faker", test), derive(fake::Dummy))] pub enum Topics { // Data exchange topics. - GetData = 8, + GetResource = 8, GetBlocks = 9, GetMempool = 13, // NB: This is aliased as Mempool in the golang impl GetInv = 14, // NB: This is aliased as Inv in the golang impl @@ -1056,7 +1056,7 @@ impl Topics { impl From for Topics { fn from(v: u8) -> Self { - map_topic!(v, Topics::GetData); + map_topic!(v, Topics::GetResource); map_topic!(v, Topics::GetBlocks); map_topic!(v, Topics::Tx); map_topic!(v, Topics::Block); From 1713bfcea3c3908f7bdefe20a0b90b69569c9877 Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Wed, 8 May 2024 11:05:05 +0300 Subject: [PATCH 07/25] node: Rename GetData to GetResource to be aligned with spec --- node/src/chain/consensus.rs | 2 +- node/src/databroker.rs | 38 ++++++++++++++++++++----------------- node/src/network.rs | 6 +++--- 3 files changed, 25 insertions(+), 21 deletions(-) diff --git a/node/src/chain/consensus.rs b/node/src/chain/consensus.rs index ead1487a7c..6a5892451f 100644 --- a/node/src/chain/consensus.rs +++ b/node/src/chain/consensus.rs @@ -229,7 +229,7 @@ impl dusk_consensus::commons::Database // For redundancy reasons, we send the GetCandidate request to multiple // network peers - let request = Message::new_get_candidate(GetCandidate { hash: *h }); // TODO: Use GetData + let request = Message::new_get_candidate(GetCandidate { hash: *h }); // TODO: Use GetResource let res 
= self .network .write() diff --git a/node/src/databroker.rs b/node/src/databroker.rs index 9d84d1c9cf..52d4ba1fb4 100644 --- a/node/src/databroker.rs +++ b/node/src/databroker.rs @@ -11,7 +11,7 @@ use crate::{database, vm, Network}; use crate::{LongLivedService, Message}; use anyhow::{anyhow, Result}; -use node_data::message::payload::{GetData, InvParam, InvType}; +use node_data::message::payload::{GetResource, InvParam, InvType}; use smallvec::SmallVec; use std::net::SocketAddr; use std::sync::Arc; @@ -26,7 +26,7 @@ const TOPICS: &[u8] = &[ Topics::GetBlocks as u8, Topics::GetMempool as u8, Topics::GetInv as u8, - Topics::GetData as u8, + Topics::GetResource as u8, Topics::GetCandidate as u8, ]; @@ -219,9 +219,11 @@ impl DataBrokerSrv { .await?; Ok(Response::new_from_msg(msg, recv_peer)) } - // Handle GetData requests - Payload::GetData(m) => { - match Self::handle_get_data(db, m, conf.max_inv_entries).await { + // Handle GetResource requests + Payload::GetResource(m) => { + match Self::handle_get_resource(db, m, conf.max_inv_entries) + .await + { Ok(msg_list) => Ok(Response::new(msg_list, m.get_addr())), Err(err) => { if !m.is_expired() { @@ -260,7 +262,7 @@ impl DataBrokerSrv { } /// Handles GetMempool requests. - /// Message flow: GetMempool -> Inv -> GetData -> Tx + /// Message flow: GetMempool -> Inv -> GetResource -> Tx async fn handle_get_mempool( db: &Arc>, ) -> Result { @@ -286,7 +288,7 @@ impl DataBrokerSrv { /// Handles GetBlocks message request. /// - /// Message flow: GetBlocks -> Inv -> GetData -> Block + /// Message flow: GetBlocks -> Inv -> GetResource -> Block async fn handle_get_blocks( db: &Arc>, m: &payload::GetBlocks, @@ -335,7 +337,7 @@ impl DataBrokerSrv { /// Handles inventory message request. 
/// /// This takes an inventory message (topics.Inv), checks it for any - /// items that the node state is missing, puts these items in a GetData + /// items that the node state is missing, puts these items in a GetResource /// wire message, and sends it back to request the items in full. /// /// An item is a block or a transaction. @@ -387,19 +389,21 @@ impl DataBrokerSrv { return Err(anyhow::anyhow!("no items to fetch")); } - // Send GetData request with disabled rebroadcast (ttl = 0), Inv message - // is part of one-to-one messaging flows (GetBlocks/Mempool) so it - // should not be a flooding request. - Ok(Message::new_get_data(GetData::new(inv, recv_addr, 0))) + // Send GetResource request with disabled rebroadcast (ttl = 0), Inv + // message is part of one-to-one messaging flows + // (GetBlocks/Mempool) so it should not be a flooding request. + Ok(Message::new_get_resource(GetResource::new( + inv, recv_addr, 0, + ))) } - /// Handles GetData message request. + /// Handles GetResource message request. /// - /// The response to a GetData message is a vector of messages, each of which - /// could be either topics.Block or topics.Tx. - async fn handle_get_data( + /// The response to a GetResource message is a vector of messages, each of + /// which could be either topics.Block or topics.Tx. 
+ async fn handle_get_resource( db: &Arc>, - m: &node_data::message::payload::GetData, + m: &node_data::message::payload::GetResource, max_entries: usize, ) -> Result> { db.read().await.view(|t| { diff --git a/node/src/network.rs b/node/src/network.rs index e710728884..693ee9f140 100644 --- a/node/src/network.rs +++ b/node/src/network.rs @@ -12,7 +12,7 @@ use async_trait::async_trait; use kadcast::config::Config; use kadcast::{MessageInfo, Peer}; use metrics::counter; -use node_data::message::payload::{GetData, Inv}; +use node_data::message::payload::{GetResource, Inv}; use node_data::message::Metadata; use node_data::message::{AsyncQueue, Topics}; use std::sync::atomic::{AtomicU64, Ordering}; @@ -223,7 +223,7 @@ impl crate::Network for Kadcast { Ok(()) } - /// Broadcast a GetData request. + /// Broadcast a GetResource request. /// /// By utilizing the randomly selected peers per bucket in Kadcast, this /// broadcast does follow the so-called "Flood with Random Walk" blind @@ -238,7 +238,7 @@ impl crate::Network for Kadcast { .as_secs() + FLOOD_REQUEST_TTL; - self.broadcast(&Message::new_get_data(GetData::new( + self.broadcast(&Message::new_get_resource(GetResource::new( msg_inv.clone(), self.public_addr, ttl_as_sec, From 164f631db52a962a7b2ee77b7acf0ed05e1ec47e Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Fri, 10 May 2024 11:21:03 +0300 Subject: [PATCH 08/25] node: Return an error to instruct the caller to rebroadcast the request --- node/src/databroker.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/node/src/databroker.rs b/node/src/databroker.rs index 52d4ba1fb4..0b45a28983 100644 --- a/node/src/databroker.rs +++ b/node/src/databroker.rs @@ -446,6 +446,13 @@ impl DataBrokerSrv { .take(max_entries) .collect(); + if res.is_empty() { + // If nothing was found, return an error so that the caller is + // instructed to rebroadcast the request, if needed + debug!("handle_get_resource not found {:?}", m); + return Err(anyhow!("not found")); + } + Ok(res) 
}) } From 2fdf353c86d3df16be6b1628f6b618eb29a470f7 Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Mon, 13 May 2024 14:39:28 +0300 Subject: [PATCH 09/25] node-data: Support GetResource request for a candidate block from hash --- node-data/src/message.rs | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/node-data/src/message.rs b/node-data/src/message.rs index 5d1ac08d66..1cd43bd2fd 100644 --- a/node-data/src/message.rs +++ b/node-data/src/message.rs @@ -748,10 +748,15 @@ pub mod payload { #[derive(Clone, Default, Debug, Copy)] pub enum InvType { + /// A transaction fetched by tx_id MempoolTx, #[default] + /// A full block fetched by block hash BlockFromHash, + /// A full Block fetched by block height BlockFromHeight, + /// A candidate block fetched by block hash, Cert is None + CandidateFromHash, } #[derive(Debug, Clone, Copy)] @@ -775,9 +780,16 @@ pub mod payload { #[derive(Default, Debug, Clone)] pub struct Inv { pub inv_list: Vec, + pub max_entries: u16, } impl Inv { + pub fn new(max_entries: u16) -> Self { + Self { + inv_list: Default::default(), + max_entries, + } + } pub fn add_tx_hash(&mut self, hash: [u8; 32]) { self.inv_list.push(InvVect { inv_type: InvType::MempoolTx, @@ -798,6 +810,13 @@ pub mod payload { param: InvParam::Height(height), }); } + + pub fn add_candidate_from_hash(&mut self, hash: [u8; 32]) { + self.inv_list.push(InvVect { + inv_type: InvType::CandidateFromHash, + param: InvParam::Hash(hash), + }); + } } impl Serializable for Inv { @@ -816,6 +835,7 @@ pub mod payload { }; } + w.write_all(&self.max_entries.to_le_bytes())?; Ok(()) } @@ -833,6 +853,7 @@ pub mod payload { 0 => InvType::MempoolTx, 1 => InvType::BlockFromHash, 2 => InvType::BlockFromHeight, + 3 => InvType::CandidateFromHash, _ => { return Err(io::Error::from(io::ErrorKind::InvalidData)) } @@ -850,9 +871,13 @@ pub mod payload { InvType::BlockFromHeight => { inv.add_block_from_height(Self::read_u64_le(r)?); } + InvType::CandidateFromHash => { + 
inv.add_candidate_from_hash(Self::read_bytes(r)?); + } } } + inv.max_entries = Self::read_u16_le(r)?; Ok(inv) } } From 3f96596758e172ff0d11d97c373d5eeb437dbdf3 Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Mon, 13 May 2024 14:40:49 +0300 Subject: [PATCH 10/25] node: Handle InvType::CandidateFromHash resource type --- node/src/databroker.rs | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/node/src/databroker.rs b/node/src/databroker.rs index 0b45a28983..0a61cedf00 100644 --- a/node/src/databroker.rs +++ b/node/src/databroker.rs @@ -10,6 +10,7 @@ use crate::database::{Candidate, Ledger, Mempool}; use crate::{database, vm, Network}; use crate::{LongLivedService, Message}; use anyhow::{anyhow, Result}; +use std::cmp::min; use node_data::message::payload::{GetResource, InvParam, InvType}; use smallvec::SmallVec; @@ -368,6 +369,15 @@ impl DataBrokerSrv { } } } + InvType::CandidateFromHash => { + if let InvParam::Hash(hash) = &i.param { + if Candidate::fetch_candidate_block(&t, hash)? 
+ .is_none() + { + inv.add_candidate_from_hash(*hash); + } + } + } InvType::MempoolTx => { if let InvParam::Hash(hash) = &i.param { if Mempool::get_tx(&t, *hash)?.is_none() { @@ -406,6 +416,11 @@ impl DataBrokerSrv { m: &node_data::message::payload::GetResource, max_entries: usize, ) -> Result> { + let mut max_entries = max_entries; + if m.get_inv().max_entries > 0 { + max_entries = min(max_entries, m.get_inv().max_entries as usize); + } + db.read().await.view(|t| { let res: Vec = m .get_inv() @@ -432,6 +447,19 @@ impl DataBrokerSrv { None } } + // GetResource CandidateFromHash is identical to + // GetCandidate msg + // TODO: Deprecate both GetCandidate and CandidateResp + InvType::CandidateFromHash => { + if let InvParam::Hash(hash) = &i.param { + Candidate::fetch_candidate_block(&t, hash) + .ok() + .flatten() + .map(Message::new_block) + } else { + None + } + } InvType::MempoolTx => { if let InvParam::Hash(hash) = &i.param { Mempool::get_tx(&t, *hash) From b1a41dde6bf48722e27d2fc3a93b08ae6b1ea572 Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Mon, 13 May 2024 14:41:10 +0300 Subject: [PATCH 11/25] node: Request candidate if block is not found --- node/src/chain/fsm.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index d839f61e33..a5554e1c65 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -784,13 +784,17 @@ async fn flood_request_block( network: &Arc>, req: BlockRequest, ) { - let mut inv = Inv::default(); + // Request only one resource + let mut inv = Inv::new(1); match req { BlockRequest::ByHeight(height) => { inv.add_block_from_height(height); } BlockRequest::ByHash(hash) => { + // Request from network the full block, if it is missing then + // request the candidate block. 
inv.add_block_from_hash(hash); + inv.add_candidate_from_hash(hash); } }; From 33652d82becedafdb326c8692e939d846c8ab821 Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Mon, 13 May 2024 16:33:15 +0300 Subject: [PATCH 12/25] node-data: Add hops_limit in GetRequest --- node-data/src/message.rs | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/node-data/src/message.rs b/node-data/src/message.rs index 1cd43bd2fd..4207b5574e 100644 --- a/node-data/src/message.rs +++ b/node-data/src/message.rs @@ -909,9 +909,11 @@ pub mod payload { /// (requester) Address to which the resource is sent back, if found requester_addr: SocketAddr, - /// Request Time to live + /// Limits request lifespan by absolute (epoch) time ttl_as_sec: u64, - // TODO: Integrity test with hashing??? + + /// Limits request lifespan by number of hops + hops_limit: u16, } impl GetResource { @@ -919,12 +921,23 @@ pub mod payload { inventory: Inv, requester_addr: SocketAddr, ttl_as_sec: u64, + hops_limit: u16, ) -> Self { Self { inventory, requester_addr, ttl_as_sec, + hops_limit, + } + } + + pub fn clone_with_hop_decrement(&self) -> Option { + if self.hops_limit == 1 { + return None; } + let mut req = self.clone(); + req.hops_limit -= 1; + Some(req) } pub fn get_addr(&self) -> SocketAddr { @@ -948,7 +961,8 @@ pub mod payload { fn write(&self, w: &mut W) -> io::Result<()> { self.inventory.write(w)?; self.requester_addr.write(w)?; - w.write_all(&self.ttl_as_sec.to_le_bytes()[..]) + w.write_all(&self.ttl_as_sec.to_le_bytes()[..])?; + w.write_all(&self.hops_limit.to_le_bytes()[..]) } fn read(r: &mut R) -> io::Result @@ -960,11 +974,17 @@ pub mod payload { let mut buf = [0u8; 8]; r.read_exact(&mut buf)?; + let ttl_as_sec = u64::from_le_bytes(buf); + + let mut buf = [0u8; 2]; + r.read_exact(&mut buf)?; + let hops_limit = u16::from_le_bytes(buf); Ok(GetResource { inventory: inner, requester_addr, - ttl_as_sec: u64::from_le_bytes(buf), + ttl_as_sec, + hops_limit, }) } } From 
d550606168aafe15240da5be5e47491cf6a804e7 Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Mon, 13 May 2024 16:34:59 +0300 Subject: [PATCH 13/25] node: Handle GetResource::hops_limit field --- node/src/chain/consensus.rs | 1 - node/src/chain/fsm.rs | 2 +- node/src/databroker.rs | 17 +++++++++++++---- node/src/lib.rs | 7 ++++++- node/src/network.rs | 11 ++++++++--- 5 files changed, 28 insertions(+), 10 deletions(-) diff --git a/node/src/chain/consensus.rs b/node/src/chain/consensus.rs index 6a5892451f..986ee6345e 100644 --- a/node/src/chain/consensus.rs +++ b/node/src/chain/consensus.rs @@ -198,7 +198,6 @@ impl dusk_consensus::commons::Database { fn store_candidate_block(&mut self, b: Block) { tracing::trace!("store candidate block: {:?}", b); - match self.db.try_read() { Ok(db) => { if let Err(e) = db.update(|t| t.store_candidate_block(b)) { diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index a5554e1c65..66741f9197 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -799,7 +799,7 @@ async fn flood_request_block( }; debug!(event = "flood_request block", ?req); - if let Err(err) = network.read().await.flood_request(&inv).await { + if let Err(err) = network.read().await.flood_request(&inv, 0, 100).await { warn!("could not request block {err}") }; } diff --git a/node/src/databroker.rs b/node/src/databroker.rs index 0a61cedf00..55276c0dca 100644 --- a/node/src/databroker.rs +++ b/node/src/databroker.rs @@ -227,9 +227,18 @@ impl DataBrokerSrv { { Ok(msg_list) => Ok(Response::new(msg_list, m.get_addr())), Err(err) => { - if !m.is_expired() { - // Not found resource, rebroadcast its request - let _ = network.read().await.broadcast(msg).await; + // resource is not found, rebroadcast the request if + // the request is neither expired nor out of hops + if let Some(m) = m.clone_with_hop_decrement() { + if !m.is_expired() { + // Construct a new message with same + // Message::metadata but with decremented + // hops_limit + let mut msg = msg.clone(); + 
msg.payload = Payload::GetResource(m); + let _ = + network.read().await.broadcast(&msg).await; + } } Err(err) } @@ -403,7 +412,7 @@ impl DataBrokerSrv { // message is part of one-to-one messaging flows // (GetBlocks/Mempool) so it should not be a flooding request. Ok(Message::new_get_resource(GetResource::new( - inv, recv_addr, 0, + inv, recv_addr, 0, 1, ))) } diff --git a/node/src/lib.rs b/node/src/lib.rs index 0c9a12a4e7..24e0dc0fcb 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -44,7 +44,12 @@ pub trait Network: Send + Sync + 'static { async fn broadcast(&self, msg: &Message) -> anyhow::Result<()>; /// Broadcasts a request message - async fn flood_request(&self, msg_inv: &Inv) -> anyhow::Result<()>; + async fn flood_request( + &self, + msg_inv: &Inv, + ttl_as_sec: u64, + hops_limit: u16, + ) -> anyhow::Result<()>; /// Sends a message to a specified peer. async fn send_to_peer( diff --git a/node/src/network.rs b/node/src/network.rs index 693ee9f140..22012c487b 100644 --- a/node/src/network.rs +++ b/node/src/network.rs @@ -24,7 +24,6 @@ use tracing::{error, info, trace, warn}; mod frame; const MAX_PENDING_SENDERS: u64 = 1000; -const FLOOD_REQUEST_TTL: u64 = 10; // seconds type RoutesList = [Option>; N]; type FilterList = [Option; N]; @@ -231,17 +230,23 @@ impl crate::Network for Kadcast { /// /// A receiver of this message is supposed to either look up and return the /// resource or rebroadcast it to the next bucket. 
- async fn flood_request(&self, msg_inv: &Inv) -> anyhow::Result<()> { + async fn flood_request( + &self, + msg_inv: &Inv, + ttl_as_sec: u64, + hops_limit: u16, + ) -> anyhow::Result<()> { let ttl_as_sec = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_secs() - + FLOOD_REQUEST_TTL; + + ttl_as_sec; self.broadcast(&Message::new_get_resource(GetResource::new( msg_inv.clone(), self.public_addr, ttl_as_sec, + hops_limit, ))) .await } From cbc9c989489a261f13842ad0ec19b2e5e5737614 Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Tue, 14 May 2024 12:27:33 +0300 Subject: [PATCH 14/25] node: Insert and use certificates from Certificates cache --- node/src/chain.rs | 56 ++++++++++++++++++++++++++++++++++++------- node/src/chain/fsm.rs | 2 +- 2 files changed, 48 insertions(+), 10 deletions(-) diff --git a/node/src/chain.rs b/node/src/chain.rs index 34ebb7dd76..b513341e3d 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -19,17 +19,20 @@ use crate::database::rocksdb::MD_HASH_KEY; use crate::database::{Ledger, Metadata}; use crate::{database, vm, Network}; use crate::{LongLivedService, Message}; -use anyhow::Result; +use anyhow::{anyhow, Result}; use async_trait::async_trait; use dusk_consensus::commons::ConsensusError; pub use header_validation::verify_block_cert; -use node_data::ledger::{to_str, BlockWithLabel, Label}; +use node_data::ledger::{to_str, Block, BlockWithLabel, Certificate, Label}; use node_data::message::AsyncQueue; use node_data::message::{Payload, Topics}; +use std::borrow::Cow; +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tokio::sync::RwLock; +use node_data::message::payload::Vote; use tokio::time::{sleep_until, Instant}; use tracing::{error, info, warn}; @@ -49,6 +52,12 @@ pub struct ChainSrv { inbound: AsyncQueue, keys_path: String, acceptor: Option>>>, + + /// Certificates cached from received Quorum messages + /// TODO: Spamming attack will be addressed by adding Quorum/Certificate + /// 
sanity check + /// TODO: Replace this HashMap with Quorum CF in RocksDB, if needed + certificates: HashMap<[u8; 32], Certificate>, } #[async_trait] @@ -164,13 +173,16 @@ impl metadata = ?msg.metadata, ); - // Handle a block that originates from a network peer. - // By disabling block broadcast, a block may be received from a peer - // only after explicit request (on demand). - if let Err(e) = fsm.on_block_event(blk, msg.metadata).await { - error!(event = "fsm::on_event failed", src = "wire", err = format!("{}",e)); - } else { - timeout = Self::next_timeout(); + if let Ok(blk) = self.try_add_cert(blk) { + // Handle a block that originates from a network peer. + // By disabling block broadcast, a block may be received from a peer + // only after explicit request (on demand). + if let Err(e) = fsm.on_block_event(&blk, msg.metadata).await { + error!(event = "fsm::on_event failed", src = "wire", err = format!("{}",e)); + } else { + // TODO: Clean up certificates cache + timeout = Self::next_timeout(); + } } } @@ -178,11 +190,18 @@ impl Payload::Candidate(_) | Payload::Validation(_) | Payload::Ratification(_) => { + let acc = self.acceptor.as_ref().expect("initialize is called"); if let Err(e) = acc.read().await.reroute_msg(msg).await { warn!("msg discarded: {e}"); } }, Payload::Quorum(payload) => { + if let Vote::Valid(hash) = payload.vote() { + // TODO: execute sanity check or full ceriticate verification + self.certificates.insert(*hash, payload.cert); + } + + let acc = self.acceptor.as_ref().expect("initialize is called"); if let Err(e) = acc.read().await.reroute_msg(msg.clone()).await { warn!("msg discarded: {e}"); } @@ -230,6 +249,7 @@ impl ChainSrv { inbound: AsyncQueue::unbounded(), keys_path, acceptor: None, + certificates: Default::default(), } } @@ -293,4 +313,22 @@ impl ChainSrv { .checked_add(ACCEPT_BLOCK_TIMEOUT_SEC) .unwrap() } + + fn try_add_cert<'a>(&self, blk: &'a Block) -> Result> { + if blk.header().cert == Certificate::default() { + // The default 
cert means the block was retrieved from Candidate + // CF thus missing the certificate. If so, we try to set the valid + // certificate from the cache certificates. + + if let Some(cert) = self.certificates.get(&blk.header().hash) { + let mut blk = blk.clone(); + blk.set_certificate(*cert); + Ok(Cow::Owned(blk)) + } else { + Err(anyhow!("cert not found")) + } + } else { + Ok(Cow::Borrowed(blk)) + } + } } diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index 66741f9197..bbd107f441 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -784,7 +784,7 @@ async fn flood_request_block( network: &Arc>, req: BlockRequest, ) { - // Request only one resource + // Request only one resource/block let mut inv = Inv::new(1); match req { BlockRequest::ByHeight(height) => { From 6b0c61f2091e679046b4d189e81941049c8440ed Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Tue, 14 May 2024 13:39:58 +0300 Subject: [PATCH 15/25] node: Ensure GetResource msg is not expired before processing it --- node/src/databroker.rs | 34 +++++++++++++++++++--------------- 1 file changed, 19 insertions(+), 15 deletions(-) diff --git a/node/src/databroker.rs b/node/src/databroker.rs index 55276c0dca..9b91402c8f 100644 --- a/node/src/databroker.rs +++ b/node/src/databroker.rs @@ -222,23 +222,24 @@ impl DataBrokerSrv { } // Handle GetResource requests Payload::GetResource(m) => { + if m.is_expired() { + return Err(anyhow!("message has expired")); + } + match Self::handle_get_resource(db, m, conf.max_inv_entries) .await { Ok(msg_list) => Ok(Response::new(msg_list, m.get_addr())), Err(err) => { - // resource is not found, rebroadcast the request if - // the request is neither expired nor out of hops + // resource is not found, rebroadcast the request only + // if hops_limit is not reached if let Some(m) = m.clone_with_hop_decrement() { - if !m.is_expired() { - // Construct a new message with same - // Message::metadata but with decremented - // hops_limit - let mut msg = msg.clone(); - 
msg.payload = Payload::GetResource(m); - let _ = - network.read().await.broadcast(&msg).await; - } + // Construct a new message with same + // Message::metadata but with decremented + // hops_limit + let mut msg = msg.clone(); + msg.payload = Payload::GetResource(m); + let _ = network.read().await.broadcast(&msg).await; } Err(err) } @@ -408,11 +409,14 @@ impl DataBrokerSrv { return Err(anyhow::anyhow!("no items to fetch")); } - // Send GetResource request with disabled rebroadcast (ttl = 0), Inv - // message is part of one-to-one messaging flows - // (GetBlocks/Mempool) so it should not be a flooding request. + // Send GetResource request with disabled rebroadcast (hops_limit = 1), + // Inv message is part of one-to-one messaging flows + // (GetBlocks/Mempool) so it should not be treated as flooding request Ok(Message::new_get_resource(GetResource::new( - inv, recv_addr, 0, 1, + inv, + recv_addr, + u64::MAX, + 1, ))) } From d8461a8c647a54469697086905252ef244430bcd Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Tue, 14 May 2024 13:55:04 +0300 Subject: [PATCH 16/25] node: Update flood_request doc comment --- node/src/network.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/node/src/network.rs b/node/src/network.rs index 22012c487b..15bc603193 100644 --- a/node/src/network.rs +++ b/node/src/network.rs @@ -228,8 +228,13 @@ impl crate::Network for Kadcast { /// broadcast does follow the so-called "Flood with Random Walk" blind /// search (resource discovery). /// - /// A receiver of this message is supposed to either look up and return the - /// resource or rebroadcast it to the next bucket. 
+ /// A receiver of this message is supposed to look up the resource and + /// either return it or, if not found, rebroadcast the message to the next + /// Kadcast bucket + /// + /// * `ttl_as_sec` - Defines the lifespan of the request in seconds + /// + /// * `hops_limit` - Defines maximum number of hops to receive the request async fn flood_request( &self, msg_inv: &Inv, From 0ee31967daeffe5618c97a8481cfdb384a3bf6fc Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Wed, 15 May 2024 13:00:53 +0300 Subject: [PATCH 17/25] node: Move certificates HashMap from Chain to FSM components --- node/src/chain.rs | 54 ++--------- node/src/chain/consensus.rs | 1 - node/src/chain/fsm.rs | 189 +++++++++++++++++++----------------- node/src/databroker.rs | 2 +- node/src/lib.rs | 2 +- node/src/network.rs | 17 ++-- 6 files changed, 122 insertions(+), 143 deletions(-) diff --git a/node/src/chain.rs b/node/src/chain.rs index b513341e3d..a5bb43693a 100644 --- a/node/src/chain.rs +++ b/node/src/chain.rs @@ -19,20 +19,17 @@ use crate::database::rocksdb::MD_HASH_KEY; use crate::database::{Ledger, Metadata}; use crate::{database, vm, Network}; use crate::{LongLivedService, Message}; -use anyhow::{anyhow, Result}; +use anyhow::Result; use async_trait::async_trait; use dusk_consensus::commons::ConsensusError; pub use header_validation::verify_block_cert; -use node_data::ledger::{to_str, Block, BlockWithLabel, Certificate, Label}; +use node_data::ledger::{to_str, BlockWithLabel, Label}; use node_data::message::AsyncQueue; use node_data::message::{Payload, Topics}; -use std::borrow::Cow; -use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tokio::sync::RwLock; -use node_data::message::payload::Vote; use tokio::time::{sleep_until, Instant}; use tracing::{error, info, warn}; @@ -52,12 +49,6 @@ pub struct ChainSrv { inbound: AsyncQueue, keys_path: String, acceptor: Option>>>, - - /// Certificates cached from received Quorum messages - /// TODO: Spamming attack will be 
addressed by adding Quorum/Certificate - /// sanity check - /// TODO: Replace this HashMap with Quorum CF in RocksDB, if needed - certificates: HashMap<[u8; 32], Certificate>, } #[async_trait] @@ -173,16 +164,13 @@ impl metadata = ?msg.metadata, ); - if let Ok(blk) = self.try_add_cert(blk) { - // Handle a block that originates from a network peer. - // By disabling block broadcast, a block may be received from a peer - // only after explicit request (on demand). - if let Err(e) = fsm.on_block_event(&blk, msg.metadata).await { - error!(event = "fsm::on_event failed", src = "wire", err = format!("{}",e)); - } else { - // TODO: Clean up certificates cache - timeout = Self::next_timeout(); - } + // Handle a block that originates from a network peer. + // By disabling block broadcast, a block may be received from a peer + // only after explicit request (on demand). + if let Err(e) = fsm.on_block_event(blk, msg.metadata).await { + error!(event = "fsm::on_event failed", src = "wire", err = format!("{}",e)); + } else { + timeout = Self::next_timeout(); } } @@ -196,11 +184,6 @@ impl } }, Payload::Quorum(payload) => { - if let Vote::Valid(hash) = payload.vote() { - // TODO: execute sanity check or full ceriticate verification - self.certificates.insert(*hash, payload.cert); - } - let acc = self.acceptor.as_ref().expect("initialize is called"); if let Err(e) = acc.read().await.reroute_msg(msg.clone()).await { warn!("msg discarded: {e}"); @@ -249,7 +232,6 @@ impl ChainSrv { inbound: AsyncQueue::unbounded(), keys_path, acceptor: None, - certificates: Default::default(), } } @@ -313,22 +295,4 @@ impl ChainSrv { .checked_add(ACCEPT_BLOCK_TIMEOUT_SEC) .unwrap() } - - fn try_add_cert<'a>(&self, blk: &'a Block) -> Result> { - if blk.header().cert == Certificate::default() { - // The default cert means the block was retrieved from Candidate - // CF thus missing the certificate. If so, we try to set the valid - // certificate from the cache certificates. 
- - if let Some(cert) = self.certificates.get(&blk.header().hash) { - let mut blk = blk.clone(); - blk.set_certificate(*cert); - Ok(Cow::Owned(blk)) - } else { - Err(anyhow!("cert not found")) - } - } else { - Ok(Cow::Borrowed(blk)) - } - } } diff --git a/node/src/chain/consensus.rs b/node/src/chain/consensus.rs index 4ec36d3a73..78bce0995a 100644 --- a/node/src/chain/consensus.rs +++ b/node/src/chain/consensus.rs @@ -218,7 +218,6 @@ impl dusk_consensus::commons::Database ) -> anyhow::Result { // Make an attempt to fetch the candidate block from local storage let res = self.db.read().await.view(|t| t.fetch_candidate_block(h))?; - if let Some(b) = res { return Ok(b); } diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index bbd107f441..28e53729bb 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -11,11 +11,11 @@ use crate::{vm, Network}; use crate::database::{Candidate, Ledger}; use metrics::counter; -use node_data::ledger::{to_str, Block, Label}; +use node_data::ledger::{to_str, Block, Certificate, Label}; use node_data::message::payload::{GetBlocks, Inv, RatificationResult, Vote}; use node_data::message::{payload, Message, Metadata}; +use std::borrow::Cow; use std::collections::{HashMap, HashSet}; -use std::fmt::{Debug, Formatter}; use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::ops::Deref; use std::time::Duration; @@ -26,6 +26,7 @@ use tracing::{debug, error, info, warn}; const MAX_BLOCKS_TO_REQUEST: i16 = 50; const EXPIRY_TIMEOUT_MILLIS: i16 = 5000; +const DEFAULT_HOPS_LIMIT: u16 = 100; pub(crate) const REDUNDANCY_PEER_FACTOR: usize = 5; @@ -71,6 +72,12 @@ pub(crate) struct SimpleFSM { network: Arc>, blacklisted_blocks: SharedHashSet, + + /// Certificates cached from received Quorum messages + /// TODO: Spamming attack will be addressed by adding Quorum/Certificate + /// sanity check + /// TODO: Replace this HashMap with Quorum CF in RocksDB, if needed + certificates: HashMap<(u64, [u8; 32]), Certificate>, } impl SimpleFSM { 
@@ -89,6 +96,7 @@ impl SimpleFSM { acc, network, blacklisted_blocks, + certificates: Default::default(), } } @@ -158,42 +166,45 @@ impl SimpleFSM { return Ok(()); } - match &mut self.curr { - State::InSync(ref mut curr) => { - if let Some((b, peer_addr)) = - curr.on_block_event(blk, metadata).await? - { - // Transition from InSync to OutOfSync state - curr.on_exiting().await; + if let Some(blk) = self.get_block_with_cert(blk).as_ref() { + match &mut self.curr { + State::InSync(ref mut curr) => { + if let Some((b, peer_addr)) = + curr.on_block_event(blk, metadata).await? + { + // Transition from InSync to OutOfSync state + curr.on_exiting().await; - // Enter new state - let mut next = OutOfSyncImpl::new( - self.acc.clone(), - self.network.clone(), - ); - next.on_entering(&b, peer_addr).await; - self.curr = State::OutOfSync(next); + // Enter new state + let mut next = OutOfSyncImpl::new( + self.acc.clone(), + self.network.clone(), + ); + next.on_entering(&b, peer_addr).await; + self.curr = State::OutOfSync(next); + } } - } - State::OutOfSync(ref mut curr) => { - if curr.on_block_event(blk, metadata).await? { - // Transition from OutOfSync to InSync state - curr.on_exiting().await; - - // Enter new state - let mut next = InSyncImpl::new( - self.acc.clone(), - self.network.clone(), - self.blacklisted_blocks.clone(), - ); - next.on_entering(blk).await.map_err(|e| { - error!("Unable to enter in_sync state: {e}"); - e - })?; - self.curr = State::InSync(next); + State::OutOfSync(ref mut curr) => { + if curr.on_block_event(blk, metadata).await? 
{ + // Transition from OutOfSync to InSync state + curr.on_exiting().await; + + // Enter new state + let mut next = InSyncImpl::new( + self.acc.clone(), + self.network.clone(), + self.blacklisted_blocks.clone(), + ); + next.on_entering(blk).await.map_err(|e| { + error!("Unable to enter in_sync state: {e}"); + e + })?; + self.curr = State::InSync(next); + } } } } + Ok(()) } @@ -221,12 +232,14 @@ impl SimpleFSM { height = remote_height, ); - // Request by hash - flood_request_block( - &self.network, - BlockRequest::ByHash(hash), - ) - .await; + // Save certificate in case candidate block is received only + //self.certificates + // .insert((remote_height, hash), quorum.cert); + + let mut inv = Inv::new(1); + inv.add_candidate_from_hash(hash); + inv.add_block_from_hash(hash); + flood_request(&self.network, &inv).await; Ok(None) } else { @@ -255,14 +268,19 @@ impl SimpleFSM { ); // Candidate block is not found from local - // storage. - // - // Request by hash - flood_request_block( - &self.network, - BlockRequest::ByHash(hash), - ) - .await; + // storage. Cache the certificate and request + // candidate block only. + + // Save certificate in case candidate block is + // received only + //self.certificates + // .insert((remote_height, hash), + // quorum.cert); + + let mut inv = Inv::new(1); + inv.add_candidate_from_hash(hash); + inv.add_block_from_hash(hash); + flood_request(&self.network, &inv).await; Err(err) } @@ -323,6 +341,30 @@ impl SimpleFSM { Ok(()) } + + /// Try to attach the certificate to a block that misses it + fn get_block_with_cert<'a>( + &self, + blk: &'a Block, + ) -> Option> { + if blk.header().cert == Certificate::default() { + // The default cert means the block was retrieved from Candidate + // CF thus missing the certificate. If so, we try to set the valid + // certificate from the cache certificates. 
+ + let key = (blk.header().height, blk.header().hash); + if let Some(cert) = self.certificates.get(&key) { + let mut blk = blk.clone(); + blk.set_certificate(*cert); + Some(Cow::Owned(blk)) + } else { + error!("cert not found for {}", hex::encode(blk.header().hash)); + None + } + } else { + Some(Cow::Borrowed(blk)) + } + } } struct InSyncImpl { @@ -571,11 +613,9 @@ impl InSyncImpl { )); } - flood_request_block( - &self.network, - BlockRequest::ByHeight(local_header.height + 1), - ) - .await; + let mut inv = Inv::new(1); + inv.add_block_from_height(local_header.height + 1); + flood_request(&self.network, &inv).await; } Ok(None) @@ -761,45 +801,16 @@ impl } } -enum BlockRequest { - ByHeight(u64), - ByHash([u8; 32]), -} -impl Debug for BlockRequest { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - BlockRequest::ByHeight(height) => { - write!(f, "BlockRequest::ByHeight({})", height) - } - BlockRequest::ByHash(hash) => { - write!(f, "BlockRequest::ByHash({})", to_str(hash)) - } - } - } -} - /// Requests a block by height/hash from the network with so-called /// Flood-request approach. -async fn flood_request_block( - network: &Arc>, - req: BlockRequest, -) { - // Request only one resource/block - let mut inv = Inv::new(1); - match req { - BlockRequest::ByHeight(height) => { - inv.add_block_from_height(height); - } - BlockRequest::ByHash(hash) => { - // Request from network the full block, if it is missing then - // request the candidate block. 
- inv.add_block_from_hash(hash); - inv.add_candidate_from_hash(hash); - } - }; - - debug!(event = "flood_request block", ?req); - if let Err(err) = network.read().await.flood_request(&inv, 0, 100).await { +async fn flood_request(network: &Arc>, inv: &Inv) { + debug!(event = "flood_request", ?inv); + if let Err(err) = network + .read() + .await + .flood_request(inv, None, DEFAULT_HOPS_LIMIT) + .await + { warn!("could not request block {err}") }; } diff --git a/node/src/databroker.rs b/node/src/databroker.rs index 484b49baa7..1b22a808ac 100644 --- a/node/src/databroker.rs +++ b/node/src/databroker.rs @@ -32,7 +32,7 @@ const TOPICS: &[u8] = &[ ]; struct Response { - /// A response usually consists of a single message. However in case of + /// A response usually consists of a single message. However, in case of /// GetMempool and GetBlocks we may need to send multiple messages in /// response to a single request. msgs: SmallVec<[Message; 1]>, diff --git a/node/src/lib.rs b/node/src/lib.rs index 24e0dc0fcb..422ab1a1a3 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -47,7 +47,7 @@ pub trait Network: Send + Sync + 'static { async fn flood_request( &self, msg_inv: &Inv, - ttl_as_sec: u64, + ttl_as_sec: Option, hops_limit: u16, ) -> anyhow::Result<()>; diff --git a/node/src/network.rs b/node/src/network.rs index 15bc603193..fd6e0ce75d 100644 --- a/node/src/network.rs +++ b/node/src/network.rs @@ -238,14 +238,19 @@ impl crate::Network for Kadcast { async fn flood_request( &self, msg_inv: &Inv, - ttl_as_sec: u64, + ttl_as_sec: Option, hops_limit: u16, ) -> anyhow::Result<()> { - let ttl_as_sec = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_secs() - + ttl_as_sec; + let ttl_as_sec = ttl_as_sec.map_or_else( + || u64::MAX, + |v| { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + + v + }, + ); self.broadcast(&Message::new_get_resource(GetResource::new( msg_inv.clone(), From 
caed8eade91dc1d27e1bc2829262bb171be054eb Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Wed, 15 May 2024 16:54:08 +0300 Subject: [PATCH 18/25] node: Remove expired certificates --- node-data/src/message.rs | 2 +- node/src/chain/fsm.rs | 76 +++++++++++++++++++++------------------- 2 files changed, 40 insertions(+), 38 deletions(-) diff --git a/node-data/src/message.rs b/node-data/src/message.rs index 82ddaa5c6f..067cf893d8 100644 --- a/node-data/src/message.rs +++ b/node-data/src/message.rs @@ -790,7 +790,7 @@ pub mod payload { max_entries, } } - + pub fn add_tx_id(&mut self, id: [u8; 32]) { self.inv_list.push(InvVect { inv_type: InvType::MempoolTx, diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index 28e53729bb..ae5c82f620 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -27,7 +27,7 @@ use tracing::{debug, error, info, warn}; const MAX_BLOCKS_TO_REQUEST: i16 = 50; const EXPIRY_TIMEOUT_MILLIS: i16 = 5000; const DEFAULT_HOPS_LIMIT: u16 = 100; - +const DEFAULT_CERT_CACHE_EXPIRY: Duration = Duration::from_secs(10); pub(crate) const REDUNDANCY_PEER_FACTOR: usize = 5; type SharedHashSet = Arc>>; @@ -74,10 +74,7 @@ pub(crate) struct SimpleFSM { blacklisted_blocks: SharedHashSet, /// Certificates cached from received Quorum messages - /// TODO: Spamming attack will be addressed by adding Quorum/Certificate - /// sanity check - /// TODO: Replace this HashMap with Quorum CF in RocksDB, if needed - certificates: HashMap<(u64, [u8; 32]), Certificate>, + certificates_cache: HashMap<[u8; 32], (Certificate, Instant)>, } impl SimpleFSM { @@ -96,18 +93,18 @@ impl SimpleFSM { acc, network, blacklisted_blocks, - certificates: Default::default(), + certificates_cache: Default::default(), } } pub async fn on_idle(&mut self, timeout: Duration) { let acc = self.acc.read().await; - let height = acc.get_curr_height().await; + let tip_height = acc.get_curr_height().await; let iter = acc.get_curr_iteration().await; if let Ok(last_finalized) = 
acc.get_latest_final_block().await { info!( event = "fsm::idle", - height, + tip_height, iter, timeout_sec = timeout.as_secs(), "finalized_height" = last_finalized.header().height, @@ -132,6 +129,10 @@ impl SimpleFSM { } else { error!("could not request blocks"); } + + let now = Instant::now(); + self.certificates_cache + .retain(|_, (_, expiry)| *expiry > now); } pub async fn on_failed_consensus(&mut self) { @@ -205,9 +206,28 @@ impl SimpleFSM { } } + let now = Instant::now(); + self.certificates_cache + .retain(|_, (_, expiry)| *expiry > now); + self.certificates_cache.remove(&blk.header().hash); + Ok(()) } + async fn request_block(&mut self, hash: [u8; 32], cert: Certificate) { + // Save certificate in case only candidate block is received + let expiry = Instant::now() + .checked_add(DEFAULT_CERT_CACHE_EXPIRY) + .unwrap(); + self.certificates_cache.insert(hash, (cert, expiry)); + + let mut inv = Inv::new(1); + inv.add_block_from_hash(hash); + inv.add_candidate_from_hash(hash); + + flood_request(&self.network, &inv).await; + } + /// Handles a Quorum message. 
/// /// Ideally, the winning block will be built from the quorum certificate @@ -220,8 +240,8 @@ impl SimpleFSM { ) -> anyhow::Result<()> { let res = match quorum.cert.result { RatificationResult::Success(Vote::Valid(hash)) => { - let acc = self.acc.read().await; - let local_header = acc.tip_header().await; + let local_header = self.acc.read().await.tip_header().await; + let db = self.acc.read().await.db.clone(); let remote_height = msg.header.round; // Quorum from future @@ -232,14 +252,7 @@ impl SimpleFSM { height = remote_height, ); - // Save certificate in case candidate block is received only - //self.certificates - // .insert((remote_height, hash), quorum.cert); - - let mut inv = Inv::new(1); - inv.add_candidate_from_hash(hash); - inv.add_block_from_hash(hash); - flood_request(&self.network, &inv).await; + self.request_block(hash, quorum.cert).await; Ok(None) } else { @@ -252,12 +265,12 @@ impl SimpleFSM { || (remote_height == local_header.height && local_header.hash != hash) { - match acc - .db + let res = db .read() .await - .view(|t| t.fetch_candidate_block(&hash)) - { + .view(|t| t.fetch_candidate_block(&hash)); + + match res { Ok(b) => Ok(b), Err(err) => { error!( @@ -270,18 +283,7 @@ impl SimpleFSM { // Candidate block is not found from local // storage. Cache the certificate and request // candidate block only. - - // Save certificate in case candidate block is - // received only - //self.certificates - // .insert((remote_height, hash), - // quorum.cert); - - let mut inv = Inv::new(1); - inv.add_candidate_from_hash(hash); - inv.add_block_from_hash(hash); - flood_request(&self.network, &inv).await; - + self.request_block(hash, quorum.cert).await; Err(err) } } @@ -351,9 +353,9 @@ impl SimpleFSM { // The default cert means the block was retrieved from Candidate // CF thus missing the certificate. If so, we try to set the valid // certificate from the cache certificates. 
- - let key = (blk.header().height, blk.header().hash); - if let Some(cert) = self.certificates.get(&key) { + if let Some((cert, _)) = + self.certificates_cache.get(&blk.header().hash) + { let mut blk = blk.clone(); blk.set_certificate(*cert); Some(Cow::Owned(blk)) From e977c6b3a81b15d527ae1222e63c346fc66f9e1d Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Thu, 16 May 2024 14:14:38 +0300 Subject: [PATCH 19/25] node-data: Print hash fields in hex format --- node-data/src/message.rs | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/node-data/src/message.rs b/node-data/src/message.rs index 067cf893d8..9912e95003 100644 --- a/node-data/src/message.rs +++ b/node-data/src/message.rs @@ -759,7 +759,7 @@ pub mod payload { CandidateFromHash, } - #[derive(Debug, Clone, Copy)] + #[derive(Clone, Copy)] pub enum InvParam { Hash([u8; 32]), Height(u64), @@ -771,6 +771,15 @@ pub mod payload { } } + impl fmt::Debug for InvParam { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::Hash(hash) => write!(f, "Hash: {}", to_str(hash)), + Self::Height(height) => write!(f, "Height: {}", height), + } + } + } + #[derive(Default, Debug, Clone, Copy)] pub struct InvVect { pub inv_type: InvType, @@ -883,11 +892,17 @@ pub mod payload { } } - #[derive(Debug, Clone, Default)] + #[derive(Clone, Default)] pub struct GetBlocks { pub locator: [u8; 32], } + impl fmt::Debug for GetBlocks { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "GetBlocks, locator: {}", to_str(&self.locator)) + } + } + impl Serializable for GetBlocks { fn write(&self, w: &mut W) -> io::Result<()> { w.write_all(&self.locator[..]) From 447df7437e267554ecbe956124e5efc0a7cb3b9e Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Thu, 16 May 2024 14:16:11 +0300 Subject: [PATCH 20/25] node: Pass requester addr in GetResource on handling GetInv --- node/src/databroker.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git
a/node/src/databroker.rs b/node/src/databroker.rs index 1b22a808ac..f004b605ec 100644 --- a/node/src/databroker.rs +++ b/node/src/databroker.rs @@ -195,6 +195,7 @@ impl DataBrokerSrv { .ok_or_else(|| anyhow::anyhow!("invalid metadata src_addr"))?; debug!(event = "handle_request", ?msg); + let this_peer = *network.read().await.public_addr(); match &msg.payload { // Handle GetCandidate requests @@ -216,7 +217,7 @@ impl DataBrokerSrv { // Handle GetInv requests Payload::GetInv(m) => { let msg = - Self::handle_inv(db, m, conf.max_inv_entries, recv_peer) + Self::handle_inv(db, m, conf.max_inv_entries, this_peer) .await?; Ok(Response::new_from_msg(msg, recv_peer)) } @@ -356,7 +357,7 @@ impl DataBrokerSrv { db: &Arc>, m: &node_data::message::payload::Inv, max_entries: usize, - recv_addr: SocketAddr, + requester_addr: SocketAddr, ) -> Result { let inv = db.read().await.view(|t| { let mut inv = payload::Inv::default(); @@ -414,7 +415,7 @@ impl DataBrokerSrv { // (GetBlocks/Mempool) so it should not be treated as flooding request Ok(Message::new_get_resource(GetResource::new( inv, - recv_addr, + requester_addr, u64::MAX, 1, ))) From fec2369043d6e16768ad3a246590141bf8004509 Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Tue, 21 May 2024 11:24:13 +0300 Subject: [PATCH 21/25] node: Address issues and PR comments - Use request_block_by_height to initialize a presync procedure - Update default hops_limit to 128 - Update DEFAULT_CERT_CACHE_EXPIRY to 1min - Fix the condition for detecting sync target is reached --- node/src/chain/fsm.rs | 57 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 47 insertions(+), 10 deletions(-) diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index ae5c82f620..7c315e9a22 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -12,7 +12,10 @@ use crate::{vm, Network}; use crate::database::{Candidate, Ledger}; use metrics::counter; use node_data::ledger::{to_str, Block, Certificate, Label}; -use 
node_data::message::payload::{GetBlocks, Inv, RatificationResult, Vote}; +use node_data::message::payload::{ + GetBlocks, GetResource, Inv, RatificationResult, Vote, +}; + use node_data::message::{payload, Message, Metadata}; use std::borrow::Cow; use std::collections::{HashMap, HashSet}; @@ -26,8 +29,8 @@ use tracing::{debug, error, info, warn}; const MAX_BLOCKS_TO_REQUEST: i16 = 50; const EXPIRY_TIMEOUT_MILLIS: i16 = 5000; -const DEFAULT_HOPS_LIMIT: u16 = 100; -const DEFAULT_CERT_CACHE_EXPIRY: Duration = Duration::from_secs(10); +const DEFAULT_CERT_CACHE_EXPIRY: Duration = Duration::from_secs(60); +const DEFAULT_HOPS_LIMIT: u16 = 128; pub(crate) const REDUNDANCY_PEER_FACTOR: usize = 5; type SharedHashSet = Arc>>; @@ -167,7 +170,7 @@ impl SimpleFSM { return Ok(()); } - if let Some(blk) = self.get_block_with_cert(blk).as_ref() { + if let Some(blk) = self.attach_cert_if_needed(blk).as_ref() { match &mut self.curr { State::InSync(ref mut curr) => { if let Some((b, peer_addr)) = @@ -206,6 +209,7 @@ impl SimpleFSM { } } + // Clean up certificate cache let now = Instant::now(); self.certificates_cache .retain(|_, (_, expiry)| *expiry > now); @@ -215,6 +219,10 @@ impl SimpleFSM { } async fn request_block(&mut self, hash: [u8; 32], cert: Certificate) { + if self.certificates_cache.contains_key(&hash) { + return; + } + // Save certificate in case only candidate block is received let expiry = Instant::now() .checked_add(DEFAULT_CERT_CACHE_EXPIRY) @@ -345,7 +353,7 @@ impl SimpleFSM { } /// Try to attach the certificate to a block that misses it - fn get_block_with_cert<'a>( + fn attach_cert_if_needed<'a>( &self, blk: &'a Block, ) -> Option> { @@ -615,14 +623,39 @@ impl InSyncImpl { )); } - let mut inv = Inv::new(1); - inv.add_block_from_height(local_header.height + 1); - flood_request(&self.network, &inv).await; + Self::request_block_by_height( + &self.network, + local_header.height + 1, + metadata.src_addr, + ) + .await; } Ok(None) } + /// Requests a block by height from a 
`peer_addr` + async fn request_block_by_height( + network: &Arc>, + height: u64, + peer_addr: SocketAddr, + ) { + let mut inv = Inv::new(1); + inv.add_block_from_height(height); + let this_peer = *network.read().await.public_addr(); + let req = GetResource::new(inv, this_peer, u64::MAX, 1); + debug!(event = "request block by height", ?req, ?peer_addr); + + if let Err(err) = network + .read() + .await + .send_to_peer(&Message::new_get_resource(req), peer_addr) + .await + { + warn!("could not request block {err}") + } + } + async fn on_heartbeat(&mut self) -> anyhow::Result { // TODO: Consider reporting metrics here @@ -761,10 +794,12 @@ impl } } + let tip = acc.get_curr_height().await; // Check target height is reached - if acc.get_curr_height().await == self.range.1 { + if tip >= self.range.1 { + debug!(event = "sync target reached", height = tip); + // Block sync-up procedure manages to download all requested - // blocks acc.restart_consensus().await; // Transit to InSync mode @@ -792,6 +827,7 @@ impl .unwrap() <= SystemTime::now() { + debug!(event = "out_of_sync timer expired"); // sync-up has timed out, recover consensus task self.acc.write().await.restart_consensus().await; @@ -807,6 +843,7 @@ impl /// Flood-request approach. 
async fn flood_request(network: &Arc>, inv: &Inv) { debug!(event = "flood_request", ?inv); + if let Err(err) = network .read() .await From e29ce2e3c8e3e4b1349b03691200bc5ce54bacbf Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Tue, 21 May 2024 11:27:20 +0300 Subject: [PATCH 22/25] node: Resend a flood request to a randomly selected alive peer --- node/src/databroker.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/node/src/databroker.rs b/node/src/databroker.rs index f004b605ec..1c74aff2a2 100644 --- a/node/src/databroker.rs +++ b/node/src/databroker.rs @@ -240,7 +240,14 @@ impl DataBrokerSrv { // hops_limit let mut msg = msg.clone(); msg.payload = Payload::GetResource(m); - let _ = network.read().await.broadcast(&msg).await; + + debug!("resend a flood request {:?}", msg); + + let _ = network + .read() + .await + .send_to_alive_peers(&msg, 1) + .await; } Err(err) } From 8370805fda26b13e8df2dbe03ed469aaba51c71f Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Tue, 21 May 2024 11:32:04 +0300 Subject: [PATCH 23/25] node: Send a flood request to 8 randomly selected peers --- node/src/network.rs | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/node/src/network.rs b/node/src/network.rs index fd6e0ce75d..dab267cafa 100644 --- a/node/src/network.rs +++ b/node/src/network.rs @@ -25,6 +25,9 @@ mod frame; const MAX_PENDING_SENDERS: u64 = 1000; +/// Number of alive peers randomly selected which a `flood_request` is sent to +const REDUNDANCY_PEER_COUNT: usize = 8; + type RoutesList = [Option>; N]; type FilterList = [Option; N]; @@ -252,12 +255,15 @@ impl crate::Network for Kadcast { }, ); - self.broadcast(&Message::new_get_resource(GetResource::new( - msg_inv.clone(), - self.public_addr, - ttl_as_sec, - hops_limit, - ))) + self.send_to_alive_peers( + &Message::new_get_resource(GetResource::new( + msg_inv.clone(), + self.public_addr, + ttl_as_sec, + hops_limit / REDUNDANCY_PEER_COUNT as u16, + )), + 
REDUNDANCY_PEER_COUNT, + ) .await } From 88c3197a336949ae80fbead5eb1ba472a7f72ce3 Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Tue, 21 May 2024 11:33:28 +0300 Subject: [PATCH 24/25] node-data: Update comment --- node-data/src/message.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node-data/src/message.rs b/node-data/src/message.rs index 9912e95003..37aecc8f4c 100644 --- a/node-data/src/message.rs +++ b/node-data/src/message.rs @@ -753,7 +753,7 @@ pub mod payload { #[default] /// A full block fetched by block hash BlockFromHash, - /// A full Block fetched by block height + /// A full block fetched by block height BlockFromHeight, /// A candidate block fetched by block hash, Cert is None CandidateFromHash, From 7946a36e1451db0bad4c965b18d221b8a718776b Mon Sep 17 00:00:00 2001 From: goshawk-3 Date: Mon, 27 May 2024 12:01:37 +0300 Subject: [PATCH 25/25] node: Use proper hops_limit config --- node/src/chain/fsm.rs | 5 ++++- node/src/network.rs | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index 7c315e9a22..aae7d399a3 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -30,7 +30,10 @@ use tracing::{debug, error, info, warn}; const MAX_BLOCKS_TO_REQUEST: i16 = 50; const EXPIRY_TIMEOUT_MILLIS: i16 = 5000; const DEFAULT_CERT_CACHE_EXPIRY: Duration = Duration::from_secs(60); -const DEFAULT_HOPS_LIMIT: u16 = 128; + +/// Maximum number of hops between the requester and the node that contains the +/// requested resource +const DEFAULT_HOPS_LIMIT: u16 = 16; pub(crate) const REDUNDANCY_PEER_FACTOR: usize = 5; type SharedHashSet = Arc>>; diff --git a/node/src/network.rs b/node/src/network.rs index dab267cafa..d6eddb6023 100644 --- a/node/src/network.rs +++ b/node/src/network.rs @@ -260,7 +260,7 @@ impl crate::Network for Kadcast { msg_inv.clone(), self.public_addr, ttl_as_sec, - hops_limit / REDUNDANCY_PEER_COUNT as u16, + hops_limit, )), REDUNDANCY_PEER_COUNT, )