From 135b4de77bd98a5e2ef9a56262c4507a2163eda2 Mon Sep 17 00:00:00 2001 From: Federico Franzoni <8609060+fed-franz@users.noreply.github.com> Date: Mon, 9 Sep 2024 16:37:18 +0200 Subject: [PATCH 1/5] consensus: propagate future messages for the current round This commit changes the behavior from re-propagating all future messages to re-propagating only those from future iterations of the current round --- consensus/src/execution_ctx.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/consensus/src/execution_ctx.rs b/consensus/src/execution_ctx.rs index 49ed6b43bc..ec3ff19373 100644 --- a/consensus/src/execution_ctx.rs +++ b/consensus/src/execution_ctx.rs @@ -322,7 +322,11 @@ impl<'a, T: Operations + 'static> ExecutionCtx<'a, T> { Err(ConsensusError::FutureEvent) => { trace!("future msg {:?}", msg); - self.outbound.try_send(msg.clone()); + // Re-propagate messages from future iterations of the current + // round + if msg.header.round == self.round_update.round { + self.outbound.try_send(msg.clone()); + } self.future_msgs.lock().await.put_msg( msg.header.round, From 53fd78d267afb0b60beb049c8795ac7c4a19536d Mon Sep 17 00:00:00 2001 From: Federico Franzoni <8609060+fed-franz@users.noreply.github.com> Date: Mon, 9 Sep 2024 17:59:16 +0200 Subject: [PATCH 2/5] consensus: pre-verify future messages from current iter --- consensus/src/consensus.rs | 29 ++++++++-------------------- consensus/src/execution_ctx.rs | 14 ++++++++++++++ consensus/src/iteration_ctx.rs | 19 +++++++++++++++++- consensus/src/msg_handler.rs | 35 +++++++++++++++++++++++++++++++--- 4 files changed, 72 insertions(+), 25 deletions(-) diff --git a/consensus/src/consensus.rs b/consensus/src/consensus.rs index 23efa671d4..9c818ffa50 100644 --- a/consensus/src/consensus.rs +++ b/consensus/src/consensus.rs @@ -10,7 +10,6 @@ use crate::operations::Operations; use crate::phase::Phase; use node_data::message::{AsyncQueue, Message, Topics}; -use node_data::StepName; use crate::execution_ctx::ExecutionCtx; use crate::proposal; @@ -193,19 +192,8 @@ impl Consensus { // in case they are needed to process past-iteration messages in // Emergency Mode while iter <= saved_iter { - iter_ctx.on_begin(iter); - iter_ctx.generate_committee( - StepName::Proposal, - provisioners.as_ref(), - ru.seed(), - ); - iter_ctx.generate_committee( - StepName::Validation, - provisioners.as_ref(), - ru.seed(), - ); - iter_ctx.generate_committee( - StepName::Ratification, + iter_ctx.generate_iteration_committees( + iter, provisioners.as_ref(), ru.seed(), ); @@ -221,6 +209,12 @@ impl Consensus { iter_ctx.on_begin(iter); + iter_ctx.generate_iteration_committees( + iter, + provisioners.as_ref(), + ru.seed(), + ); + let mut msg = Message::empty(); // Execute a single iteration for phase in phases.iter_mut() { @@ -229,13 +223,6 @@ impl Consensus { // phase. phase.reinitialize(msg, ru.round, iter).await; - // Generate step committee - iter_ctx.generate_committee( - step_name, - provisioners.as_ref(), - ru.seed(), - ); - // Construct phase execution context let ctx = ExecutionCtx::new( &mut iter_ctx, diff --git a/consensus/src/execution_ctx.rs b/consensus/src/execution_ctx.rs index ec3ff19373..2c2e73b65a 100644 --- a/consensus/src/execution_ctx.rs +++ b/consensus/src/execution_ctx.rs @@ -295,6 +295,20 @@ impl<'a, T: Operations + 'static> ExecutionCtx<'a, T> { phase: Arc>, msg: Message, ) -> Option { + // If it's a message from a future iteration of the current round, we + // generate the committees so that we can pre-verify its validity. + // We do it here because we need the IterationCtx + if msg.header.round == self.round_update.round + && msg.header.iteration > self.iteration + { + // Generate committees for the iteration + self.iter_ctx.generate_iteration_committees( + msg.header.iteration, + self.provisioners, + self.round_update.seed(), + ); + } + let committee = self .get_current_committee() .expect("committee to be created before run"); diff --git a/consensus/src/iteration_ctx.rs b/consensus/src/iteration_ctx.rs index 9625abd524..be399dee32 100644 --- a/consensus/src/iteration_ctx.rs +++ b/consensus/src/iteration_ctx.rs @@ -147,11 +147,11 @@ impl IterationCtx { pub(crate) fn generate_committee( &mut self, + iteration: u8, step_name: StepName, provisioners: &Provisioners, seed: Seed, ) { - let iteration = self.iter; let step = step_name.to_step(iteration); // Check if we already generated the committee. @@ -229,6 +229,23 @@ impl IterationCtx { self.committees.insert(step, step_committee); } + pub(crate) fn generate_iteration_committees( + &mut self, + iteration: u8, + provisioners: &Provisioners, + seed: Seed, + ) { + let stepnames = [ + StepName::Proposal, + StepName::Validation, + StepName::Ratification, + ]; + + for stepname in &stepnames { + self.generate_committee(iteration, *stepname, provisioners, seed); + } + } + pub(crate) fn get_generator(&self, iter: u8) -> Option { let step = StepName::Proposal.to_step(iter); self.committees diff --git a/consensus/src/msg_handler.rs b/consensus/src/msg_handler.rs index bfbfe22480..dd06958a66 100644 --- a/consensus/src/msg_handler.rs +++ b/consensus/src/msg_handler.rs @@ -11,7 +11,7 @@ use async_trait::async_trait; use node_data::bls::PublicKeyBytes; use node_data::message::{Message, Status}; use node_data::StepName; -use tracing::{debug, trace}; +use tracing::{debug, trace, warn}; /// Indicates whether an output value is available for current step execution /// (Step is Ready) or needs to collect data (Step is Pending) @@ -48,10 +48,10 @@ pub trait MsgHandler { trace!(event = "msg received", msg = format!("{:#?}", msg),); + let msg_tip = msg.header.prev_block_hash; match msg.compare(ru.round, iteration, step) { Status::Past => Err(ConsensusError::PastEvent), Status::Present => { - let msg_tip = msg.header.prev_block_hash; if msg_tip != ru.hash() { return Err(ConsensusError::InvalidPrevBlockHash(msg_tip)); } @@ -66,7 +66,36 @@ pub trait MsgHandler { // it is valid or not. self.verify(msg, iteration, round_committees) } - Status::Future => Err(ConsensusError::FutureEvent), + Status::Future => { + // Pre-verify future messages for the current round + if msg.header.round == ru.round { + if msg_tip != ru.hash() { + return Err(ConsensusError::InvalidPrevBlockHash( + msg_tip, + )); + } + + if let Some(future_committee) = + round_committees.get_committee(msg.get_step()) + { + // Ensure the message originates from a committee + // member. + if !future_committee.is_member(signer) { + return Err(ConsensusError::NotCommitteeMember); + } + + // Delegate message final verification to the phase + // instance. It is the phase that knows + // what message type to expect and if it + // is valid or not. + self.verify(msg, iteration, round_committees)?; + } else { + warn!("Future committee for iteration {iteration} not generated; skipping pre-verification for {:?} message", msg.topic()); + } + } + + Err(ConsensusError::FutureEvent) + } } } From 33e80c501f8d4567dfc3df4b656afece23bfe1a5 Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Tue, 10 Sep 2024 15:18:40 +0200 Subject: [PATCH 3/5] consensus: fix future message validation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Use a “stateless” version of “is_valid” that checks the message without context. This change is relevant, because we are not going to check if a vote is already collected before broadcast it --- consensus/src/msg_handler.rs | 37 +++++++-- consensus/src/proposal/handler.rs | 115 ++++++++++++++------------ consensus/src/ratification/handler.rs | 30 ++++++- consensus/src/validation/handler.rs | 29 +++++++ 4 files changed, 153 insertions(+), 58 deletions(-) diff --git a/consensus/src/msg_handler.rs b/consensus/src/msg_handler.rs index dd06958a66..7ab704c9dc 100644 --- a/consensus/src/msg_handler.rs +++ b/consensus/src/msg_handler.rs @@ -6,7 +6,10 @@ use crate::commons::{ConsensusError, RoundUpdate}; use crate::iteration_ctx::RoundCommittees; +use crate::proposal; +use crate::ratification::handler::RatificationHandler; use crate::user::committee::Committee; +use crate::validation::handler::ValidationHandler; use async_trait::async_trait; use node_data::bls::PublicKeyBytes; use node_data::message::{Message, Status}; @@ -84,11 +87,35 @@ pub trait MsgHandler { return Err(ConsensusError::NotCommitteeMember); } - // Delegate message final verification to the phase - // instance. It is the phase that knows - // what message type to expect and if it - // is valid or not. - self.verify(msg, iteration, round_committees)?; + match &msg.payload { + node_data::message::Payload::Ratification(_) => { + RatificationHandler::verify_stateless( + msg, + round_committees, + )?; + } + node_data::message::Payload::Validation(_) => { + ValidationHandler::verify_stateless( + msg, + round_committees, + )?; + } + node_data::message::Payload::Candidate(c) => { + proposal::handler::verify_stateless( + c, + round_committees, + )?; + } + node_data::message::Payload::Quorum(_) => {} + node_data::message::Payload::Block(_) => {} + _ => { + warn!( + "future message not repropagated {:?}", + msg.topic() + ); + Err(ConsensusError::InvalidMsgType)?; + } + } } else { warn!("Future committee for iteration {iteration} not generated; skipping pre-verification for {:?} message", msg.topic()); } diff --git a/consensus/src/proposal/handler.rs b/consensus/src/proposal/handler.rs index b60a0928fa..687243df22 100644 --- a/consensus/src/proposal/handler.rs +++ b/consensus/src/proposal/handler.rs @@ -16,7 +16,7 @@ use node_data::bls::PublicKeyBytes; use node_data::message::payload::Candidate; use crate::iteration_ctx::RoundCommittees; -use node_data::message::{Message, Payload, StepMessage}; +use node_data::message::{Message, Payload, StepMessage, WireMessage}; use std::sync::Arc; use tokio::sync::Mutex; @@ -36,7 +36,8 @@ impl MsgHandler for ProposalHandler { let generator = round_committees .get_generator(iteration) .expect("committee to be created before run"); - self.verify_new_block(msg, &generator)?; + let p = Self::unwrap_msg(msg)?; + super::handler::verify_new_block(p, &generator)?; Ok(()) } @@ -80,65 +81,75 @@ impl ProposalHandler { Self { db } } - fn verify_new_block( - &self, - msg: &Message, - expected_generator: &PublicKeyBytes, - ) -> Result<(), ConsensusError> { - let p = Self::unwrap_msg(msg)?; - - if expected_generator != p.sign_info.signer.bytes() { - return Err(ConsensusError::NotCommitteeMember); + fn unwrap_msg(msg: &Message) -> Result<&Candidate, ConsensusError> { + match &msg.payload { + Payload::Candidate(c) => Ok(c), + _ => Err(ConsensusError::InvalidMsgType), } + } +} - let candidate_size = p - .candidate - .size() - .map_err(|_| ConsensusError::UnknownBlockSize)?; - if candidate_size > MAX_BLOCK_SIZE { - return Err(ConsensusError::InvalidBlockSize(candidate_size)); - } +fn verify_new_block( + p: &Candidate, + expected_generator: &PublicKeyBytes, +) -> Result<(), ConsensusError> { + if expected_generator != p.sign_info.signer.bytes() { + return Err(ConsensusError::NotCommitteeMember); + } - // Verify new_block msg signature - p.verify_signature()?; + let candidate_size = p + .candidate + .size() + .map_err(|_| ConsensusError::UnknownBlockSize)?; + if candidate_size > MAX_BLOCK_SIZE { + return Err(ConsensusError::InvalidBlockSize(candidate_size)); + } - if msg.header.prev_block_hash != p.candidate.header().prev_block_hash { - return Err(ConsensusError::InvalidBlockHash); - } + // Verify new_block msg signature + p.verify_signature()?; - if p.candidate.txs().len() > MAX_NUMBER_OF_TRANSACTIONS { - return Err(ConsensusError::TooManyTransactions( - p.candidate.txs().len(), - )); - } - - let tx_hashes: Vec<_> = - p.candidate.txs().iter().map(|t| t.hash()).collect(); - let tx_root = merkle_root(&tx_hashes[..]); - if tx_root != p.candidate.header().txroot { - return Err(ConsensusError::InvalidBlock); - } + if p.consensus_header().prev_block_hash + != p.candidate.header().prev_block_hash + { + return Err(ConsensusError::InvalidBlockHash); + } - if p.candidate.faults().len() > MAX_NUMBER_OF_FAULTS { - return Err(ConsensusError::TooManyFaults( - p.candidate.faults().len(), - )); - } + if p.candidate.txs().len() > MAX_NUMBER_OF_TRANSACTIONS { + return Err(ConsensusError::TooManyTransactions( + p.candidate.txs().len(), + )); + } - let fault_hashes: Vec<_> = - p.candidate.faults().iter().map(|t| t.hash()).collect(); - let fault_root = merkle_root(&fault_hashes[..]); - if fault_root != p.candidate.header().faultroot { - return Err(ConsensusError::InvalidBlock); - } + let tx_hashes: Vec<_> = + p.candidate.txs().iter().map(|t| t.hash()).collect(); + let tx_root = merkle_root(&tx_hashes[..]); + if tx_root != p.candidate.header().txroot { + return Err(ConsensusError::InvalidBlock); + } - Ok(()) + if p.candidate.faults().len() > MAX_NUMBER_OF_FAULTS { + return Err(ConsensusError::TooManyFaults(p.candidate.faults().len())); } - fn unwrap_msg(msg: &Message) -> Result<&Candidate, ConsensusError> { - match &msg.payload { - Payload::Candidate(c) => Ok(c), - _ => Err(ConsensusError::InvalidMsgType), - } + let fault_hashes: Vec<_> = + p.candidate.faults().iter().map(|t| t.hash()).collect(); + let fault_root = merkle_root(&fault_hashes[..]); + if fault_root != p.candidate.header().faultroot { + return Err(ConsensusError::InvalidBlock); } + + Ok(()) +} + +pub fn verify_stateless( + c: &Candidate, + round_committees: &RoundCommittees, +) -> Result<(), ConsensusError> { + let iteration = c.header.iteration; + let generator = round_committees + .get_generator(iteration) + .expect("committee to be created before run"); + verify_new_block(c, &generator)?; + + Ok(()) } diff --git a/consensus/src/ratification/handler.rs b/consensus/src/ratification/handler.rs index d2015e9e98..462bff3d99 100644 --- a/consensus/src/ratification/handler.rs +++ b/consensus/src/ratification/handler.rs @@ -11,7 +11,7 @@ use async_trait::async_trait; use node_data::bls::PublicKeyBytes; use node_data::ledger::Attestation; use node_data::{ledger, StepName}; -use tracing::{error, warn}; +use tracing::{error, info, warn}; use crate::aggregator::{Aggregator, StepVote}; @@ -39,6 +39,34 @@ impl StepVote for Ratification { } } +impl RatificationHandler { + pub fn verify_stateless( + msg: &Message, + round_committees: &RoundCommittees, + ) -> Result<(), ConsensusError> { + match &msg.payload { + Payload::Ratification(p) => { + p.verify_signature()?; + let signer = &p.sign_info.signer; + let committee = round_committees + .get_committee(msg.get_step()) + .expect("committee to be created before run"); + + committee + .votes_for(signer) + .ok_or(ConsensusError::NotCommitteeMember)?; + } + Payload::Empty => (), + _ => { + info!("cannot verify in validation handler"); + Err(ConsensusError::InvalidMsgType)? + } + } + + Ok(()) + } +} + #[async_trait] impl MsgHandler for RatificationHandler { fn verify( diff --git a/consensus/src/validation/handler.rs b/consensus/src/validation/handler.rs index 5d0b37dbc1..5da200c260 100644 --- a/consensus/src/validation/handler.rs +++ b/consensus/src/validation/handler.rs @@ -45,6 +45,35 @@ impl StepVote for Validation { } } +impl ValidationHandler { + pub fn verify_stateless( + msg: &Message, + round_committees: &RoundCommittees, + ) -> Result<(), ConsensusError> { + match &msg.payload { + Payload::Validation(p) => { + p.verify_signature()?; + + let signer = &p.sign_info.signer; + let committee = round_committees + .get_committee(msg.get_step()) + .expect("committee to be created before run"); + + committee + .votes_for(signer) + .ok_or(ConsensusError::NotCommitteeMember)?; + } + Payload::Empty => (), + _ => { + info!("cannot verify in validation handler"); + Err(ConsensusError::InvalidMsgType)? + } + } + + Ok(()) + } +} + impl ValidationHandler { pub(crate) fn new(sv_registry: SafeAttestationInfoRegistry) -> Self { Self { From 8ae3cfcd8257f0d5beceb10d1b322c3bae6fcb0e Mon Sep 17 00:00:00 2001 From: Herr Seppia Date: Tue, 10 Sep 2024 17:49:22 +0200 Subject: [PATCH 4/5] consensus: fix future committee generation --- consensus/src/iteration_ctx.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/consensus/src/iteration_ctx.rs b/consensus/src/iteration_ctx.rs index be399dee32..b7747ef2b7 100644 --- a/consensus/src/iteration_ctx.rs +++ b/consensus/src/iteration_ctx.rs @@ -216,10 +216,12 @@ impl IterationCtx { // If the step is Proposal, the only extracted member is the generator // For Validation and Ratification steps, extracted members are // delegated to vote on the candidate block - let step_committee = Committee::new( - provisioners, - &self.get_sortition_config(seed, step_name, exclusion), - ); + + let sortition_step = step_name.to_step(iteration); + let mut config_step = + self.get_sortition_config(seed, step_name, exclusion); + config_step.step = sortition_step; + let step_committee = Committee::new(provisioners, &config_step); debug!( event = "committee_generated", From 85703059b9d74d3fc46332cfe613f18152a00b1d Mon Sep 17 00:00:00 2001 From: Federico Franzoni <8609060+fed-franz@users.noreply.github.com> Date: Tue, 10 Sep 2024 17:57:29 +0200 Subject: [PATCH 5/5] consensus: check prev_block before generating future committee --- consensus/src/execution_ctx.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/consensus/src/execution_ctx.rs b/consensus/src/execution_ctx.rs index 2c2e73b65a..41da044e29 100644 --- a/consensus/src/execution_ctx.rs +++ b/consensus/src/execution_ctx.rs @@ -300,6 +300,7 @@ impl<'a, T: Operations + 'static> ExecutionCtx<'a, T> { // We do it here because we need the IterationCtx if msg.header.round == self.round_update.round && msg.header.iteration > self.iteration + && msg.header.prev_block_hash == self.round_update.hash() { // Generate committees for the iteration self.iter_ctx.generate_iteration_committees(