From 7551bb8d83ac257c5f65b37a625709a1e32ea2ce Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Tue, 3 Sep 2024 19:27:47 +0200 Subject: [PATCH 01/10] node-data: refactor Message using `WireMessage` trait --- node-data/src/message.rs | 243 +++++++++++++++++++++------------------ 1 file changed, 131 insertions(+), 112 deletions(-) diff --git a/node-data/src/message.rs b/node-data/src/message.rs index 30405dbb39..d5a54972f4 100644 --- a/node-data/src/message.rs +++ b/node-data/src/message.rs @@ -53,6 +53,16 @@ pub struct Message { pub metadata: Option, } +pub trait WireMessage: Into { + const TOPIC: Topics; + fn consensus_header(&self) -> ConsensusHeader { + ConsensusHeader::default() + } + fn payload(self) -> Payload { + self.into() + } +} + impl Message { pub fn compare(&self, round: u64, iteration: u8, step: StepName) -> Status { self.header @@ -120,31 +130,17 @@ impl Serializable for Message { { // Read topic let topic = Topics::from(Self::read_u8(r)?); - let message = match topic { - Topics::Candidate => { - Message::new_candidate(payload::Candidate::read(r)?) - } - Topics::Validation => { - Message::new_validation(payload::Validation::read(r)?) - } - Topics::Ratification => { - Message::new_ratification(payload::Ratification::read(r)?) - } - Topics::Quorum => Message::new_quorum(payload::Quorum::read(r)?), - Topics::Block => Message::new_block(ledger::Block::read(r)?), - Topics::Tx => { - Message::new_transaction(ledger::Transaction::read(r)?) - } - Topics::GetResource => { - Message::new_get_resource(payload::GetResource::read(r)?) - } - Topics::GetBlocks => { - Message::new_get_blocks(payload::GetBlocks::read(r)?) - } - Topics::GetMempool => { - Message::new_get_mempool(payload::GetMempool::read(r)?) - } - Topics::Inv => Message::new_inv(payload::Inv::read(r)?), + let message: Message = match topic { + Topics::Candidate => payload::Candidate::read(r)?.into(), + Topics::Validation => payload::Validation::read(r)?.into(), + Topics::Ratification => payload::Ratification::read(r)?.into(), + Topics::Quorum => payload::Quorum::read(r)?.into(), + Topics::Block => ledger::Block::read(r)?.into(), + Topics::Tx => ledger::Transaction::read(r)?.into(), + Topics::GetResource => payload::GetResource::read(r)?.into(), + Topics::GetBlocks => payload::GetBlocks::read(r)?.into(), + Topics::GetMempool => payload::GetMempool::read(r)?.into(), + Topics::Inv => payload::Inv::read(r)?.into(), Topics::Unknown => { return Err(io::Error::new( io::ErrorKind::InvalidData, @@ -157,110 +153,74 @@ impl Serializable for Message { } } -impl Message { - /// Creates topics.Candidate message - pub fn new_candidate(payload: payload::Candidate) -> Message { +impl From for Message { + fn from(wire_msg: W) -> Self { Self { - header: payload.header.clone(), - topic: Topics::Candidate, - payload: Payload::Candidate(Box::new(payload)), + header: wire_msg.consensus_header(), + topic: W::TOPIC, + payload: wire_msg.payload(), ..Default::default() } } +} - /// Creates topics.Ratification message - pub fn new_ratification(payload: payload::Ratification) -> Message { - Self { - header: payload.header.clone(), - topic: Topics::Ratification, - payload: Payload::Ratification(payload), - ..Default::default() - } +impl WireMessage for Candidate { + const TOPIC: Topics = Topics::Candidate; + fn consensus_header(&self) -> ConsensusHeader { + self.header.clone() } +} - /// Creates topics.Validation message - pub fn new_validation(payload: payload::Validation) -> Message { - Self { - header: payload.header.clone(), - topic: Topics::Validation, - payload: Payload::Validation(payload), - ..Default::default() - } +impl WireMessage for Validation { + const TOPIC: Topics = Topics::Validation; + fn consensus_header(&self) -> ConsensusHeader { + self.header.clone() } +} - /// Creates topics.Quorum message - pub fn new_quorum(payload: payload::Quorum) -> Message { - Self { - header: payload.header.clone(), - topic: Topics::Quorum, - payload: Payload::Quorum(payload), - ..Default::default() - } +impl WireMessage for Ratification { + const TOPIC: Topics = Topics::Ratification; + fn consensus_header(&self) -> ConsensusHeader { + self.header.clone() } +} - /// Creates topics.Block message - pub fn new_block(payload: ledger::Block) -> Message { - Self { - topic: Topics::Block, - payload: Payload::Block(Box::new(payload)), - ..Default::default() - } +impl WireMessage for payload::Quorum { + const TOPIC: Topics = Topics::Quorum; + fn consensus_header(&self) -> ConsensusHeader { + self.header.clone() } +} - /// Creates topics.Inv (inventory) message - pub fn new_inv(p: payload::Inv) -> Message { - Self { - topic: Topics::Inv, - payload: Payload::Inv(p), - ..Default::default() - } - } +impl WireMessage for payload::GetMempool { + const TOPIC: Topics = Topics::GetMempool; +} - /// Creates topics.GetResource message - pub fn new_get_resource(p: payload::GetResource) -> Message { - Self { - topic: Topics::GetResource, - payload: Payload::GetResource(p), - ..Default::default() - } - } +impl WireMessage for payload::Inv { + const TOPIC: Topics = Topics::Inv; +} - /// Creates topics.GetMempool message - pub fn new_get_mempool(p: payload::GetMempool) -> Message { - Self { - topic: Topics::GetMempool, - payload: Payload::GetMempool(p), - ..Default::default() - } - } +impl WireMessage for payload::GetBlocks { + const TOPIC: Topics = Topics::GetBlocks; +} - /// Creates topics.GetBlocks message - pub fn new_get_blocks(p: payload::GetBlocks) -> Message { - Self { - topic: Topics::GetBlocks, - payload: Payload::GetBlocks(p), - ..Default::default() - } - } +impl WireMessage for payload::GetResource { + const TOPIC: Topics = Topics::GetResource; +} - /// Creates topics.Tx message - pub fn new_transaction(tx: ledger::Transaction) -> Message { - Self { - topic: Topics::Tx, - payload: Payload::Transaction(Box::new(tx)), - ..Default::default() - } - } +impl WireMessage for ledger::Block { + const TOPIC: Topics = Topics::Block; +} - /// Creates a message with a validation_result - pub fn from_validation_result(p: payload::ValidationResult) -> Message { - Self { - topic: Topics::default(), - payload: Payload::ValidationResult(Box::new(p)), - ..Default::default() - } - } +impl WireMessage for ledger::Transaction { + const TOPIC: Topics = Topics::Tx; +} + +impl WireMessage for payload::ValidationResult { + const TOPIC: Topics = Topics::Unknown; +} +impl Message { /// Creates a unknown message with empty payload pub fn empty() -> Message { Self { @@ -360,6 +320,65 @@ pub enum Payload { Empty, } +impl From for Payload { + fn from(value: payload::Ratification) -> Self { + Self::Ratification(value) + } +} + +impl From for Payload { + fn from(value: payload::Validation) -> Self { + Self::Validation(value) + } +} + +impl From for Payload { + fn from(value: payload::Candidate) -> Self { + Self::Candidate(Box::new(value)) + } +} +impl From for Payload { + fn from(value: payload::Quorum) -> Self { + Self::Quorum(value) + } +} +impl From for Payload { + fn from(value: ledger::Block) -> Self { + Self::Block(Box::new(value)) + } +} +impl From for Payload { + fn from(value: ledger::Transaction) -> Self { + Self::Transaction(Box::new(value)) + } +} +impl From for Payload { + fn from(value: payload::GetMempool) -> Self { + Self::GetMempool(value) + } +} +impl From for Payload { + fn from(value: payload::Inv) -> Self { + Self::Inv(value) + } +} +impl From for Payload { + fn from(value: payload::GetBlocks) -> Self { + Self::GetBlocks(value) + } +} +impl From for Payload { + fn from(value: payload::GetResource) -> Self { + Self::GetResource(value) + } +} + +impl From for Payload { + fn from(value: payload::ValidationResult) -> Self { + Self::ValidationResult(Box::new(value)) + } +} + pub mod payload { use crate::ledger::{self, to_str, Attestation, Block, Hash, StepVotes}; use crate::{get_current_timestamp, Serializable}; @@ -711,7 +730,7 @@ pub mod payload { } #[derive(Debug, Clone, Default)] - pub struct GetMempool {} + pub struct GetMempool; impl Serializable for GetMempool { fn write(&self, _w: &mut W) -> io::Result<()> { @@ -722,7 +741,7 @@ pub mod payload { where Self: Sized, { - Ok(GetMempool::default()) + Ok(GetMempool) } } From 9806d6be254e42a160034461c2d57832902695fd Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Tue, 3 Sep 2024 19:30:16 +0200 Subject: [PATCH 02/10] consensus: use generic `Message::from` --- consensus/src/proposal/block_generator.rs | 2 +- consensus/src/ratification/handler.rs | 2 +- consensus/src/ratification/step.rs | 2 +- consensus/src/step_votes_reg.rs | 2 +- consensus/src/validation/handler.rs | 2 +- consensus/src/validation/step.rs | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/consensus/src/proposal/block_generator.rs b/consensus/src/proposal/block_generator.rs index e53a0585e4..43f1f45c4d 100644 --- a/consensus/src/proposal/block_generator.rs +++ b/consensus/src/proposal/block_generator.rs @@ -80,7 +80,7 @@ impl Generator { candidate.sign(&ru.secret_key, ru.pubkey_bls.inner()); - Ok(Message::new_candidate(candidate)) + Ok(candidate.into()) } async fn generate_block( diff --git a/consensus/src/ratification/handler.rs b/consensus/src/ratification/handler.rs index 2f04e885e0..d2015e9e98 100644 --- a/consensus/src/ratification/handler.rs +++ b/consensus/src/ratification/handler.rs @@ -205,7 +205,7 @@ impl RatificationHandler { }, }; - Message::new_quorum(quorum) + quorum.into() } pub(crate) fn reset(&mut self, iter: u8, validation: ValidationResult) { diff --git a/consensus/src/ratification/step.rs b/consensus/src/ratification/step.rs index 8154089b42..998d5f28c7 100644 --- a/consensus/src/ratification/step.rs +++ b/consensus/src/ratification/step.rs @@ -33,7 +33,7 @@ impl RatificationStep { let ratification = self::build_ratification_payload(ru, iteration, result); - let msg = Message::new_ratification(ratification); + let msg = Message::from(ratification); // Publish ratification vote info!(event = "send_vote", validation_bitset = result.sv().bitset); diff --git a/consensus/src/step_votes_reg.rs b/consensus/src/step_votes_reg.rs index 69e3bb89a0..39f81c106c 100644 --- a/consensus/src/step_votes_reg.rs +++ b/consensus/src/step_votes_reg.rs @@ -202,7 +202,7 @@ impl AttInfoRegistry { let payload = payload::Quorum { header, att }; - Message::new_quorum(payload) + payload.into() } pub(crate) fn get_failed_atts(&self, to: u8) -> Vec> { diff --git a/consensus/src/validation/handler.rs b/consensus/src/validation/handler.rs index 1a7d236c6f..5d0b37dbc1 100644 --- a/consensus/src/validation/handler.rs +++ b/consensus/src/validation/handler.rs @@ -26,7 +26,7 @@ fn final_result( quorum: QuorumType, ) -> HandleMsgOutput { let p = payload::ValidationResult::new(sv, vote, quorum); - let msg = Message::from_validation_result(p); + let msg = Message::from(p); HandleMsgOutput::Ready(msg) } diff --git a/consensus/src/validation/step.rs b/consensus/src/validation/step.rs index a62e09c6fc..c8ef8b15e8 100644 --- a/consensus/src/validation/step.rs +++ b/consensus/src/validation/step.rs @@ -124,7 +124,7 @@ impl ValidationStep { // Sign and construct validation message let validation = self::build_validation_payload(vote, ru, iteration); info!(event = "send_vote", vote = ?validation.vote); - let msg = Message::new_validation(validation); + let msg = Message::from(validation); // Publish outbound.try_send(msg.clone()); From c96e09301dbc5b3685d2e6d44972a363119bb646 Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Tue, 3 Sep 2024 19:32:26 +0200 Subject: [PATCH 03/10] node: use generic `Message::from` --- node/src/chain/fsm.rs | 15 +++++++-------- node/src/databroker.rs | 19 +++++++------------ node/src/mempool.rs | 4 ++-- node/src/network.rs | 16 +++++++--------- 4 files changed, 23 insertions(+), 31 deletions(-) diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index 0d86668390..8e4c690372 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -119,9 +119,10 @@ impl SimpleFSM { self.blacklisted_blocks.write().await.clear(); // Request missing blocks since my last finalized block - let get_blocks = Message::new_get_blocks(GetBlocks { + let get_blocks = GetBlocks { locator: last_finalized.header().hash, - }); + } + .into(); if let Err(e) = self .network .read() @@ -674,11 +675,9 @@ impl InSyncImpl { let req = GetResource::new(inv, this_peer, u64::MAX, 1); debug!(event = "request block by height", ?req, ?peer_addr); - if let Err(err) = network - .read() - .await - .send_to_peer(&Message::new_get_resource(req), peer_addr) - .await + let req = req.into(); + if let Err(err) = + network.read().await.send_to_peer(&req, peer_addr).await { warn!("could not request block {err}") } @@ -745,7 +744,7 @@ impl ); // Request missing blocks from source peer - let gb_msg = Message::new_get_blocks(GetBlocks { locator }); + let gb_msg = GetBlocks { locator }.into(); if let Err(e) = self .network diff --git a/node/src/databroker.rs b/node/src/databroker.rs index 07a1cf0e91..37bcafea22 100644 --- a/node/src/databroker.rs +++ b/node/src/databroker.rs @@ -268,7 +268,7 @@ impl DataBrokerSrv { }) .map_err(|e| anyhow::anyhow!(e))?; - Ok(Message::new_inv(inv)) + Ok(inv.into()) } /// Handles GetBlocks message request. @@ -316,7 +316,7 @@ impl DataBrokerSrv { }) .map_err(|e| anyhow::anyhow!(e))?; - Ok(Message::new_inv(inv)) + Ok(inv.into()) } /// Handles inventory message request. @@ -391,12 +391,7 @@ impl DataBrokerSrv { // Send GetResource request with disabled rebroadcast (hops_limit = 1), // Inv message is part of one-to-one messaging flows // (GetBlocks/Mempool) so it should not be treated as flooding request - Ok(Message::new_get_resource(GetResource::new( - inv, - requester_addr, - u64::MAX, - 1, - ))) + Ok(GetResource::new(inv, requester_addr, u64::MAX, 1).into()) } /// Handles GetResource message request. @@ -424,7 +419,7 @@ impl DataBrokerSrv { Ledger::fetch_block_by_height(&t, *height) .ok() .flatten() - .map(Message::new_block) + .map(Message::from) } else { None } @@ -434,7 +429,7 @@ impl DataBrokerSrv { Ledger::fetch_block(&t, hash) .ok() .flatten() - .map(Message::new_block) + .map(Message::from) } else { None } @@ -449,7 +444,7 @@ impl DataBrokerSrv { .ok() .flatten() }) - .map(Message::new_block) + .map(Message::from) } else { None } @@ -459,7 +454,7 @@ impl DataBrokerSrv { Mempool::get_tx(&t, *tx_id) .ok() .flatten() - .map(Message::new_transaction) + .map(Message::from) } else { None } diff --git a/node/src/mempool.rs b/node/src/mempool.rs index 7ddf67184a..90226c8a8f 100644 --- a/node/src/mempool.rs +++ b/node/src/mempool.rs @@ -247,11 +247,11 @@ impl MempoolSrv { .mempool_download_redundancy .unwrap_or(DEFAULT_DOWNLOAD_REDUNDANCY); - let payload = payload::GetMempool {}; + let msg = payload::GetMempool.into(); if let Err(err) = network .read() .await - .send_to_alive_peers(&Message::new_get_mempool(payload), max_peers) + .send_to_alive_peers(&msg, max_peers) .await { error!("could not request mempool from network: {err}"); diff --git a/node/src/network.rs b/node/src/network.rs index abb05f71e1..3310ae532f 100644 --- a/node/src/network.rs +++ b/node/src/network.rs @@ -208,16 +208,14 @@ impl crate::Network for Kadcast { let ttl_as_sec = ttl_as_sec .map_or_else(|| u64::MAX, |v| get_current_timestamp() + v); - self.send_to_alive_peers( - &Message::new_get_resource(GetResource::new( - msg_inv.clone(), - self.public_addr, - ttl_as_sec, - hops_limit, - )), - REDUNDANCY_PEER_COUNT, + let msg = GetResource::new( + msg_inv.clone(), + self.public_addr, + ttl_as_sec, + hops_limit, ) - .await + .into(); + self.send_to_alive_peers(&msg, REDUNDANCY_PEER_COUNT).await } /// Sends an encoded message to a given peer. From 1f9e1f2822163fc50bb6d736980dae31f80cb797 Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Tue, 3 Sep 2024 19:32:58 +0200 Subject: [PATCH 04/10] rusk: use generic `Message::from` for Transactions --- rusk/src/lib/http/chain.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/rusk/src/lib/http/chain.rs b/rusk/src/lib/http/chain.rs index 7a47dcddb7..8f038df49f 100644 --- a/rusk/src/lib/http/chain.rs +++ b/rusk/src/lib/http/chain.rs @@ -10,6 +10,7 @@ use std::collections::HashMap; use std::net::SocketAddr; use std::sync::Arc; +use execution_core::transfer::Transaction as ProtocolTransaction; use node::database::rocksdb::{Backend, DBTransaction}; use node::database::{Mempool, DB}; use node::network::Kadcast; @@ -153,10 +154,10 @@ impl RuskNode { } async fn propagate_tx(&self, tx: &[u8]) -> anyhow::Result { - let tx = execution_core::transfer::Transaction::from_slice(tx) + let tx: Transaction = ProtocolTransaction::from_slice(tx) .map_err(|e| anyhow::anyhow!("Invalid Data {e:?}"))? .into(); - let tx_message = Message::new_transaction(tx); + let tx_message = tx.into(); let network = self.network(); network.read().await.route_internal(tx_message); From 0cf20ae6770091091e1551a25bb0a464d5038db7 Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Thu, 5 Sep 2024 11:51:52 +0200 Subject: [PATCH 05/10] node-data: add protocol version to Message See also #2197 --- node-data/src/message.rs | 57 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 55 insertions(+), 2 deletions(-) diff --git a/node-data/src/message.rs b/node-data/src/message.rs index d5a54972f4..3c2de3124f 100644 --- a/node-data/src/message.rs +++ b/node-data/src/message.rs @@ -16,6 +16,7 @@ use crate::bls::PublicKey; use crate::ledger::{to_str, Hash, Signature}; use crate::StepName; use crate::{bls, ledger, Serializable}; +use core::fmt; use std::cmp::Ordering; use std::io::{self, Read, Write}; use std::net::SocketAddr; @@ -25,7 +26,46 @@ use async_channel::TrySendError; use self::payload::{Candidate, Ratification, Validation}; /// Topic field position in the message binary representation -pub const TOPIC_FIELD_POS: usize = 8 + 8 + 4; +pub const TOPIC_FIELD_POS: usize = 1 + 2 + 2; +pub const PROTOCOL_VERSION: Version = Version(1, 0, 0); + +#[derive(Debug, Clone)] +/// Represent version (major, minor, patch) +pub struct Version(pub u8, pub u16, pub u16); + +impl Default for Version { + fn default() -> Self { + PROTOCOL_VERSION + } +} + +impl fmt::Display for Version { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let Version(maj, min, patch) = self; + write!(f, "{maj}.{min}.{patch}") + } +} + +impl crate::Serializable for Version { + fn write(&self, w: &mut W) -> io::Result<()> { + let Version(maj, min, patch) = self; + w.write_all(&[*maj])?; + w.write_all(&min.to_le_bytes())?; + w.write_all(&patch.to_le_bytes())?; + + Ok(()) + } + + fn read(r: &mut R) -> io::Result + where + Self: Sized, + { + let maj = Self::read_u8(r)?; + let min = Self::read_u16_le(r)?; + let patch = Self::read_u16_le(r)?; + Ok(Self(maj, min, patch)) + } +} pub enum Status { Past, @@ -46,6 +86,7 @@ impl From for Status { /// Message definition #[derive(Debug, Default, Clone)] pub struct Message { + pub version: Version, topic: Topics, pub header: ConsensusHeader, pub payload: Payload, @@ -95,6 +136,15 @@ impl Message { _ => StepName::Proposal.to_step(self.header.iteration), } } + + pub fn version(&self) -> &Version { + &self.version + } + + pub fn with_version(mut self, v: Version) -> Self { + self.version = v; + self + } } /// Defines a transport-related properties that determines how the message @@ -107,6 +157,7 @@ pub struct Metadata { impl Serializable for Message { fn write(&self, w: &mut W) -> io::Result<()> { + self.version.write(w)?; w.write_all(&[self.topic as u8])?; match &self.payload { @@ -128,6 +179,8 @@ impl Serializable for Message { where Self: Sized, { + let version = Version::read(r)?; + // Read topic let topic = Topics::from(Self::read_u8(r)?); let message: Message = match topic { @@ -149,7 +202,7 @@ impl Serializable for Message { } }; - Ok(message) + Ok(message.with_version(version)) } } From dd00cc91d86bfbf2cbae239dd5afc2b16a3811aa Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Tue, 3 Sep 2024 19:34:45 +0200 Subject: [PATCH 06/10] node: remove `PDU` and use `Message` directly --- node/src/network.rs | 25 ++++------ node/src/network/frame.rs | 97 --------------------------------------- 2 files changed, 9 insertions(+), 113 deletions(-) delete mode 100644 node/src/network/frame.rs diff --git a/node/src/network.rs b/node/src/network.rs index 3310ae532f..3f4a2a1b73 100644 --- a/node/src/network.rs +++ b/node/src/network.rs @@ -12,15 +12,12 @@ use async_trait::async_trait; use kadcast::config::Config; use kadcast::{MessageInfo, Peer}; use metrics::counter; -use node_data::get_current_timestamp; use node_data::message::payload::{GetResource, Inv}; use node_data::message::{AsyncQueue, Metadata}; -use std::sync::atomic::{AtomicU64, Ordering}; +use node_data::{get_current_timestamp, Serializable}; use tokio::sync::RwLock; use tracing::{error, info, trace}; -mod frame; - /// Number of alive peers randomly selected which a `flood_request` is sent to const REDUNDANCY_PEER_COUNT: usize = 8; @@ -59,10 +56,8 @@ impl Listener { impl kadcast::NetworkListen for Listener { fn on_message(&self, blob: Vec, md: MessageInfo) { let msg_size = blob.len(); - match frame::Pdu::decode(&mut &blob.to_vec()[..]) { - Ok(d) => { - let mut msg = d.payload; - + match Message::read(&mut &blob.to_vec()[..]) { + Ok(mut msg) => { counter!("dusk_bytes_recv").increment(msg_size as u64); counter!(format!("dusk_inbound_{:?}_size", msg.topic())) .increment(msg_size as u64); @@ -99,8 +94,6 @@ pub struct Kadcast { filters: Arc>>, conf: Config, - counter: AtomicU64, - /// Represents a parsed conf.public_addr public_addr: SocketAddr, } @@ -132,7 +125,6 @@ impl Kadcast { filters, peer, conf, - counter: AtomicU64::new(0), public_addr, }) } @@ -171,7 +163,8 @@ impl crate::Network for Kadcast { None => None, }; - let encoded = frame::Pdu::encode(msg, 0).map_err(|err| { + let mut encoded = vec![]; + msg.write(&mut encoded).map_err(|err| { error!("could not encode message {msg:?}: {err}"); anyhow::anyhow!("failed to broadcast: {err}") })?; @@ -224,9 +217,8 @@ impl crate::Network for Kadcast { msg: &Message, recv_addr: SocketAddr, ) -> anyhow::Result<()> { - // rnd_count is added to bypass kadcast dupemap - let rnd_count = self.counter.fetch_add(1, Ordering::SeqCst); - let encoded = frame::Pdu::encode(msg, rnd_count) + let mut encoded = vec![]; + msg.write(&mut encoded) .map_err(|err| anyhow::anyhow!("failed to send_to_peer: {err}"))?; let topic = msg.topic(); @@ -242,7 +234,8 @@ impl crate::Network for Kadcast { msg: &Message, amount: usize, ) -> anyhow::Result<()> { - let encoded = frame::Pdu::encode(msg, 0) + let mut encoded = vec![]; + msg.write(&mut encoded) .map_err(|err| anyhow::anyhow!("failed to encode: {err}"))?; let topic = msg.topic(); diff --git a/node/src/network/frame.rs b/node/src/network/frame.rs deleted file mode 100644 index 12d485e2c4..0000000000 --- a/node/src/network/frame.rs +++ /dev/null @@ -1,97 +0,0 @@ -// This Source Code Form is subject to the terms of the Mozilla Public -// License, v. 2.0. If a copy of the MPL was not distributed with this -// file, You can obtain one at http://mozilla.org/MPL/2.0/. -// -// Copyright (c) DUSK NETWORK. All rights reserved. - -use node_data::message::Message; -use node_data::Serializable; -use std::io::{self, ErrorKind, Read, Write}; - -const PROTOCOL_VERSION: [u8; 8] = [0, 0, 0, 0, 1, 0, 0, 0]; - -/// Defines PDU (Protocol Data Unit) structure. -#[derive(Debug, Default)] -pub struct Pdu { - pub header: Header, - pub payload: node_data::message::Message, -} - -/// Frame Header definition. -#[derive(Debug, Default)] -pub struct Header { - version: [u8; 8], - reserved: u64, - checksum: [u8; 4], -} - -impl Pdu { - pub fn encode(msg: &Message, reserved: u64) -> io::Result> { - let mut payload_buf = vec![]; - msg.write(&mut payload_buf)?; - - let mut header_buf = vec![]; - Header { - checksum: calc_checksum(&payload_buf[..]), - version: PROTOCOL_VERSION, - reserved, - } - .write(&mut header_buf)?; - - Ok([header_buf, payload_buf].concat()) - } - - pub fn decode(r: &mut R) -> io::Result - where - Self: Sized, - { - let header = Header::read(r)?; - - let mut payload_buf = vec![]; - r.read_to_end(&mut payload_buf)?; - - if header.checksum != calc_checksum(&payload_buf[..]) { - return Err(io::Error::new(ErrorKind::Other, "Checksum is wrong")); - } - - let payload = Message::read(&mut &payload_buf[..])?; - Ok(Pdu { header, payload }) - } -} - -impl Serializable for Header { - fn write(&self, w: &mut W) -> io::Result<()> { - w.write_all(&self.version)?; - w.write_all(&self.reserved.to_le_bytes())?; - w.write_all(&self.checksum)?; - Ok(()) - } - - /// Deserialize struct from buf by consuming N bytes. - fn read(r: &mut R) -> io::Result - where - Self: Sized, - { - let version = Self::read_bytes(r)?; - let reserved = Self::read_u64_le(r)?; - let checksum = Self::read_bytes(r)?; - - Ok(Header { - version, - reserved, - checksum, - }) - } -} - -fn calc_checksum(buf: &[u8]) -> [u8; 4] { - use blake2::{digest::consts::U32, Blake2b, Digest}; - - let mut h = Blake2b::::new(); - h.update(buf); - let res = h.finalize(); - - let mut v = [0u8; 4]; - v.clone_from_slice(&res[0..4]); - v -} From 619b06d70abd40e5488a18a0467f47f7a4503ebf Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Thu, 5 Sep 2024 11:56:07 +0200 Subject: [PATCH 07/10] node-data: add `Nonce` to GetMempool and GetBlocks This is required to allow messages to not be filtered by the kadcast dupemap. --- node-data/src/message.rs | 112 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 104 insertions(+), 8 deletions(-) diff --git a/node-data/src/message.rs b/node-data/src/message.rs index 3c2de3124f..ff0c66eb8c 100644 --- a/node-data/src/message.rs +++ b/node-data/src/message.rs @@ -10,6 +10,7 @@ use execution_core::signatures::bls::{ MultisigSignature as BlsMultisigSignature, PublicKey as BlsPublicKey, SecretKey as BlsSecretKey, }; +use payload::Nonce; use tracing::{error, warn}; use crate::bls::PublicKey; @@ -373,6 +374,16 @@ pub enum Payload { Empty, } +impl Payload { + pub fn set_nonce>(&mut self, nonce: N) { + match self { + Payload::GetMempool(p) => p.set_nonce(nonce), + Payload::GetBlocks(p) => p.set_nonce(nonce), + _ => {} + } + } +} + impl From for Payload { fn from(value: payload::Ratification) -> Self { Self::Ratification(value) @@ -438,7 +449,7 @@ pub mod payload { use std::fmt; use std::io::{self, Read, Write}; use std::net::{ - Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, + IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6, }; use super::{ConsensusHeader, SignInfo}; @@ -781,20 +792,89 @@ pub mod payload { }) } } + #[derive(Debug, Clone, Default)] + pub struct Nonce([u8; 8]); + + impl Serializable for Nonce { + fn write(&self, w: &mut W) -> io::Result<()> { + w.write_all(&self.0) + } + + fn read(r: &mut R) -> io::Result + where + Self: Sized, + { + let nonce = Self::read_bytes(r)?; + Ok(Self(nonce)) + } + } + + impl From for u64 { + fn from(value: Nonce) -> Self { + u64::from_le_bytes(value.0) + } + } + + impl From for Nonce { + fn from(value: u64) -> Self { + Self(value.to_le_bytes()) + } + } + + impl From for Nonce { + fn from(value: IpAddr) -> Self { + match value { + IpAddr::V4(v4) => v4.into(), + IpAddr::V6(v6) => v6.into(), + } + } + } + + impl From for Nonce { + fn from(value: Ipv4Addr) -> Self { + let num = u32::from_le_bytes(value.octets()); + (num as u64).into() + } + } + + impl From for Nonce { + fn from(value: Ipv6Addr) -> Self { + let mut ret = [0u8; 8]; + let value = value.octets(); + let (a, b) = value.split_at(8); + a.iter() + .zip(b) + .map(|(a, b)| a.wrapping_add(*b)) + .enumerate() + .for_each(|(idx, v)| ret[idx] = v); + + Self(ret) + } + } #[derive(Debug, Clone, Default)] - pub struct GetMempool; + pub struct GetMempool { + pub(crate) nonce: Nonce, + } + + impl GetMempool { + pub fn set_nonce>(&mut self, nonce: N) { + self.nonce = nonce.into() + } + } impl Serializable for GetMempool { - fn write(&self, _w: &mut W) -> io::Result<()> { + fn write(&self, w: &mut W) -> io::Result<()> { + self.nonce.write(w)?; Ok(()) } - fn read(_r: &mut R) -> io::Result + fn read(r: &mut R) -> io::Result where Self: Sized, { - Ok(GetMempool) + let nonce = Nonce::read(r)?; + Ok(GetMempool { nonce }) } } @@ -944,9 +1024,22 @@ pub mod payload { } } - #[derive(Clone, Default)] + #[derive(Clone)] pub struct GetBlocks { pub locator: [u8; 32], + pub(crate) nonce: Nonce, + } + + impl GetBlocks { + pub fn new(locator: [u8; 32]) -> Self { + Self { + locator, + nonce: Nonce::default(), + } + } + pub fn set_nonce>(&mut self, nonce: N) { + self.nonce = nonce.into() + } } impl fmt::Debug for GetBlocks { @@ -957,7 +1050,9 @@ pub mod payload { impl Serializable for GetBlocks { fn write(&self, w: &mut W) -> io::Result<()> { - w.write_all(&self.locator[..]) + w.write_all(&self.locator[..])?; + self.nonce.write(w)?; + Ok(()) } fn read(r: &mut R) -> io::Result @@ -965,7 +1060,8 @@ pub mod payload { Self: Sized, { let locator = Self::read_bytes(r)?; - Ok(Self { locator }) + let nonce = Nonce::read(r)?; + Ok(Self { locator, nonce }) } } From fa1adaa6f8a2be70aec83ca04ea6a7804dc95837 Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Thu, 5 Sep 2024 11:58:14 +0200 Subject: [PATCH 08/10] node: set Nonce for direct requests --- node/src/chain/fsm.rs | 38 +++++++++++++++++--------------------- node/src/databroker.rs | 4 ++-- node/src/lib.rs | 4 ++-- node/src/mempool.rs | 5 ++--- node/src/network.rs | 28 ++++++++++++++++++++++------ 5 files changed, 45 insertions(+), 34 deletions(-) diff --git a/node/src/chain/fsm.rs b/node/src/chain/fsm.rs index 8e4c690372..20eef7c2cd 100644 --- a/node/src/chain/fsm.rs +++ b/node/src/chain/fsm.rs @@ -119,15 +119,13 @@ impl SimpleFSM { self.blacklisted_blocks.write().await.clear(); // Request missing blocks since my last finalized block - let get_blocks = GetBlocks { - locator: last_finalized.header().hash, - } - .into(); + let locator = last_finalized.header().hash; + let get_blocks = GetBlocks::new(locator).into(); if let Err(e) = self .network .read() .await - .send_to_alive_peers(&get_blocks, REDUNDANCY_PEER_FACTOR) + .send_to_alive_peers(get_blocks, REDUNDANCY_PEER_FACTOR) .await { warn!("Unable to request GetBlocks {e}"); @@ -188,7 +186,7 @@ impl SimpleFSM { self.acc.clone(), self.network.clone(), ); - next.on_entering(&b, peer_addr).await; + next.on_entering(b, peer_addr).await; self.curr = State::OutOfSync(next); } } @@ -675,9 +673,11 @@ impl InSyncImpl { let req = GetResource::new(inv, this_peer, u64::MAX, 1); debug!(event = "request block by height", ?req, ?peer_addr); - let req = req.into(); - if let Err(err) = - network.read().await.send_to_peer(&req, peer_addr).await + if let Err(err) = network + .read() + .await + .send_to_peer(req.into(), peer_addr) + .await { warn!("could not request block {err}") } @@ -729,7 +729,7 @@ impl } } /// performed when entering the OutOfSync state - async fn on_entering(&mut self, blk: &Block, dest_addr: SocketAddr) { + async fn on_entering(&mut self, blk: Block, peer_addr: SocketAddr) { let (curr_height, locator) = { let acc = self.acc.read().await; (acc.get_curr_height().await, acc.get_curr_hash().await) @@ -744,13 +744,13 @@ impl ); // Request missing blocks from source peer - let gb_msg = GetBlocks { locator }.into(); + let gb_msg = GetBlocks::new(locator).into(); if let Err(e) = self .network .read() .await - .send_to_peer(&gb_msg, dest_addr) + .send_to_peer(gb_msg, peer_addr) .await { warn!("Unable to send GetBlocks: {e}") @@ -759,15 +759,11 @@ impl // add to the pool let key = blk.header().height; self.pool.clear(); - self.pool.insert(key, blk.clone()); - self.peer_addr = dest_addr; - - info!( - event = "entering out-of-sync", - from = self.range.0, - to = self.range.1, - peer = format!("{:?}", dest_addr), - ); + self.pool.insert(key, blk); + self.peer_addr = peer_addr; + + let (from, to) = &self.range; + info!(event = "entering out-of-sync", from, to, ?peer_addr); } /// performed when exiting the state diff --git a/node/src/databroker.rs b/node/src/databroker.rs index 37bcafea22..83860e69ea 100644 --- a/node/src/databroker.rs +++ b/node/src/databroker.rs @@ -140,7 +140,7 @@ impl // Send response let net = network.read().await; for msg in resp.msgs { - let send = net.send_to_peer(&msg, resp.recv_peer); + let send = net.send_to_peer(msg, resp.recv_peer); if let Err(e) = send.await { warn!("Unable to send_to_peer {e}") }; @@ -235,7 +235,7 @@ impl DataBrokerSrv { let _ = network .read() .await - .send_to_alive_peers(&msg, 1) + .send_to_alive_peers(msg, 1) .await; } Err(err) diff --git a/node/src/lib.rs b/node/src/lib.rs index 240a6ccbdb..ac504c63ac 100644 --- a/node/src/lib.rs +++ b/node/src/lib.rs @@ -53,14 +53,14 @@ pub trait Network: Send + Sync + 'static { /// Sends a message to a specified peer. async fn send_to_peer( &self, - msg: &Message, + msg: Message, peer_addr: std::net::SocketAddr, ) -> anyhow::Result<()>; /// Sends to random set of alive peers. async fn send_to_alive_peers( &self, - msg: &Message, + msg: Message, amount: usize, ) -> anyhow::Result<()>; diff --git a/node/src/mempool.rs b/node/src/mempool.rs index 90226c8a8f..9225860281 100644 --- a/node/src/mempool.rs +++ b/node/src/mempool.rs @@ -246,12 +246,11 @@ impl MempoolSrv { .conf .mempool_download_redundancy .unwrap_or(DEFAULT_DOWNLOAD_REDUNDANCY); - - let msg = payload::GetMempool.into(); + let msg = payload::GetMempool::default().into(); if let Err(err) = network .read() .await - .send_to_alive_peers(&msg, max_peers) + .send_to_alive_peers(msg, max_peers) .await { error!("could not request mempool from network: {err}"); diff --git a/node/src/network.rs b/node/src/network.rs index 3f4a2a1b73..19c2054f31 100644 --- a/node/src/network.rs +++ b/node/src/network.rs @@ -5,6 +5,7 @@ // Copyright (c) DUSK NETWORK. All rights reserved. use std::net::{AddrParseError, SocketAddr}; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use crate::{BoxedFilter, Message}; @@ -12,7 +13,7 @@ use async_trait::async_trait; use kadcast::config::Config; use kadcast::{MessageInfo, Peer}; use metrics::counter; -use node_data::message::payload::{GetResource, Inv}; +use node_data::message::payload::{GetResource, Inv, Nonce}; use node_data::message::{AsyncQueue, Metadata}; use node_data::{get_current_timestamp, Serializable}; use tokio::sync::RwLock; @@ -96,6 +97,8 @@ pub struct Kadcast { /// Represents a parsed conf.public_addr public_addr: SocketAddr, + + counter: AtomicU64, } impl Kadcast { @@ -120,12 +123,15 @@ impl Kadcast { .parse::() .expect("valid kadcast public address"); + let nonce = Nonce::from(public_addr.ip()); + Ok(Kadcast { routes, filters, peer, conf, public_addr, + counter: AtomicU64::new(nonce.into()), }) } @@ -206,17 +212,22 @@ impl crate::Network for Kadcast { self.public_addr, ttl_as_sec, hops_limit, - ) - .into(); - self.send_to_alive_peers(&msg, REDUNDANCY_PEER_COUNT).await + ); + self.send_to_alive_peers(msg.into(), REDUNDANCY_PEER_COUNT) + .await } /// Sends an encoded message to a given peer. async fn send_to_peer( &self, - msg: &Message, + mut msg: Message, recv_addr: SocketAddr, ) -> anyhow::Result<()> { + // rnd_count is added to bypass kadcast dupemap + let rnd_count = self.counter.fetch_add(1, Ordering::SeqCst); + + msg.payload.set_nonce(rnd_count); + let mut encoded = vec![]; msg.write(&mut encoded) .map_err(|err| anyhow::anyhow!("failed to send_to_peer: {err}"))?; @@ -231,9 +242,14 @@ impl crate::Network for Kadcast { /// Sends to random set of alive peers. async fn send_to_alive_peers( &self, - msg: &Message, + mut msg: Message, amount: usize, ) -> anyhow::Result<()> { + // rnd_count is added to bypass kadcast dupemap + let rnd_count = self.counter.fetch_add(1, Ordering::SeqCst); + + msg.payload.set_nonce(rnd_count); + let mut encoded = vec![]; msg.write(&mut encoded) .map_err(|err| anyhow::anyhow!("failed to encode: {err}"))?; From f3d9b51c268265dcd02ec378ac29fa31f6965d0e Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Thu, 5 Sep 2024 16:02:42 +0200 Subject: [PATCH 09/10] node: inject protocol version into network level --- node/Cargo.toml | 2 +- node/src/network.rs | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/node/Cargo.toml b/node/Cargo.toml index c83175e6ea..5e6eecc55e 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -12,7 +12,7 @@ license = "MPL-2.0" tracing = "0.1" hex = "0.4" dusk-consensus = { version = "0.1.1-rc.3", path = "../consensus" } -kadcast = "0.7.0-rc" +kadcast = "0.7.0-rc.5" sha3 = { version = "0.10" } anyhow = "1.0" async-trait = "0.1" diff --git a/node/src/network.rs b/node/src/network.rs index 19c2054f31..994970e7f0 100644 --- a/node/src/network.rs +++ b/node/src/network.rs @@ -14,7 +14,7 @@ use kadcast::config::Config; use kadcast::{MessageInfo, Peer}; use metrics::counter; use node_data::message::payload::{GetResource, Inv, Nonce}; -use node_data::message::{AsyncQueue, Metadata}; +use node_data::message::{AsyncQueue, Metadata, PROTOCOL_VERSION}; use node_data::{get_current_timestamp, Serializable}; use tokio::sync::RwLock; use tracing::{error, info, trace}; @@ -102,7 +102,7 @@ pub struct Kadcast { } impl Kadcast { - pub fn new(conf: Config) -> Result { + pub fn new(mut conf: Config) -> Result { const INIT: Option> = None; let routes = Arc::new(RwLock::new([INIT; N])); @@ -117,6 +117,8 @@ impl Kadcast { routes: routes.clone(), filters: filters.clone(), }; + conf.version = format!("{PROTOCOL_VERSION}"); + conf.version_match = format!("{PROTOCOL_VERSION}"); let peer = Peer::new(conf.clone(), listener)?; let public_addr = conf .public_address From 34785600c559df90e8eee1cf24e5992b800a9966 Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Thu, 5 Sep 2024 16:27:35 +0200 Subject: [PATCH 10/10] rusk: bump kadcast dep to `0.7.0-rc.5` --- rusk/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rusk/Cargo.toml b/rusk/Cargo.toml index 4918e1d2ef..81fceff1fd 100644 --- a/rusk/Cargo.toml +++ b/rusk/Cargo.toml @@ -44,7 +44,7 @@ blake2b_simd = { version = "1", default-features = false } poseidon-merkle = { version = "0.7", features = ["rkyv-impl", "size_32"] } sha3 = "0.10" dusk-bytes = "0.1" -kadcast = "0.7.0-rc" +kadcast = "0.7.0-rc.5" pin-project = "1" tungstenite = "0.21" hyper-tungstenite = "0.13"