Skip to content

Commit

Permalink
Merge pull request #2319 from dusk-network/2247-disable-future-messag…
Browse files Browse the repository at this point in the history
…e-repropagation

Disable future message repropagation
  • Loading branch information
fed-franz authored Sep 10, 2024
2 parents 5b9e98a + 8570305 commit 81f31c4
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 83 deletions.
29 changes: 8 additions & 21 deletions consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use crate::operations::Operations;
use crate::phase::Phase;

use node_data::message::{AsyncQueue, Message, Topics};
use node_data::StepName;

use crate::execution_ctx::ExecutionCtx;
use crate::proposal;
Expand Down Expand Up @@ -193,19 +192,8 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
// in case they are needed to process past-iteration messages in
// Emergency Mode
while iter <= saved_iter {
iter_ctx.on_begin(iter);
iter_ctx.generate_committee(
StepName::Proposal,
provisioners.as_ref(),
ru.seed(),
);
iter_ctx.generate_committee(
StepName::Validation,
provisioners.as_ref(),
ru.seed(),
);
iter_ctx.generate_committee(
StepName::Ratification,
iter_ctx.generate_iteration_committees(
iter,
provisioners.as_ref(),
ru.seed(),
);
Expand All @@ -221,6 +209,12 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {

iter_ctx.on_begin(iter);

iter_ctx.generate_iteration_committees(
iter,
provisioners.as_ref(),
ru.seed(),
);

let mut msg = Message::empty();
// Execute a single iteration
for phase in phases.iter_mut() {
Expand All @@ -229,13 +223,6 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
// phase.
phase.reinitialize(msg, ru.round, iter).await;

// Generate step committee
iter_ctx.generate_committee(
step_name,
provisioners.as_ref(),
ru.seed(),
);

// Construct phase execution context
let ctx = ExecutionCtx::new(
&mut iter_ctx,
Expand Down
21 changes: 20 additions & 1 deletion consensus/src/execution_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,21 @@ impl<'a, T: Operations + 'static> ExecutionCtx<'a, T> {
phase: Arc<Mutex<C>>,
msg: Message,
) -> Option<Message> {
// If it's a message from a future iteration of the current round, we
// generate the committees so that we can pre-verify its validity.
// We do it here because we need the IterationCtx
if msg.header.round == self.round_update.round
&& msg.header.iteration > self.iteration
&& msg.header.prev_block_hash == self.round_update.hash()
{
// Generate committees for the iteration
self.iter_ctx.generate_iteration_committees(
msg.header.iteration,
self.provisioners,
self.round_update.seed(),
);
}

let committee = self
.get_current_committee()
.expect("committee to be created before run");
Expand Down Expand Up @@ -322,7 +337,11 @@ impl<'a, T: Operations + 'static> ExecutionCtx<'a, T> {
Err(ConsensusError::FutureEvent) => {
trace!("future msg {:?}", msg);

self.outbound.try_send(msg.clone());
// Re-propagate messages from future iterations of the current
// round
if msg.header.round == self.round_update.round {
self.outbound.try_send(msg.clone());
}

self.future_msgs.lock().await.put_msg(
msg.header.round,
Expand Down
29 changes: 24 additions & 5 deletions consensus/src/iteration_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,11 @@ impl IterationCtx {

pub(crate) fn generate_committee(
&mut self,
iteration: u8,
step_name: StepName,
provisioners: &Provisioners,
seed: Seed,
) {
let iteration = self.iter;
let step = step_name.to_step(iteration);

// Check if we already generated the committee.
Expand Down Expand Up @@ -216,10 +216,12 @@ impl IterationCtx {
// If the step is Proposal, the only extracted member is the generator
// For Validation and Ratification steps, extracted members are
// delegated to vote on the candidate block
let step_committee = Committee::new(
provisioners,
&self.get_sortition_config(seed, step_name, exclusion),
);

let sortition_step = step_name.to_step(iteration);
let mut config_step =
self.get_sortition_config(seed, step_name, exclusion);
config_step.step = sortition_step;
let step_committee = Committee::new(provisioners, &config_step);

debug!(
event = "committee_generated",
Expand All @@ -229,6 +231,23 @@ impl IterationCtx {
self.committees.insert(step, step_committee);
}

pub(crate) fn generate_iteration_committees(
&mut self,
iteration: u8,
provisioners: &Provisioners,
seed: Seed,
) {
let stepnames = [
StepName::Proposal,
StepName::Validation,
StepName::Ratification,
];

for stepname in &stepnames {
self.generate_committee(iteration, *stepname, provisioners, seed);
}
}

pub(crate) fn get_generator(&self, iter: u8) -> Option<PublicKeyBytes> {
let step = StepName::Proposal.to_step(iter);
self.committees
Expand Down
62 changes: 59 additions & 3 deletions consensus/src/msg_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,15 @@

use crate::commons::{ConsensusError, RoundUpdate};
use crate::iteration_ctx::RoundCommittees;
use crate::proposal;
use crate::ratification::handler::RatificationHandler;
use crate::user::committee::Committee;
use crate::validation::handler::ValidationHandler;
use async_trait::async_trait;
use node_data::bls::PublicKeyBytes;
use node_data::message::{Message, Status};
use node_data::StepName;
use tracing::{debug, trace};
use tracing::{debug, trace, warn};

/// Indicates whether an output value is available for current step execution
/// (Step is Ready) or needs to collect data (Step is Pending)
Expand Down Expand Up @@ -48,10 +51,10 @@ pub trait MsgHandler {

trace!(event = "msg received", msg = format!("{:#?}", msg),);

let msg_tip = msg.header.prev_block_hash;
match msg.compare(ru.round, iteration, step) {
Status::Past => Err(ConsensusError::PastEvent),
Status::Present => {
let msg_tip = msg.header.prev_block_hash;
if msg_tip != ru.hash() {
return Err(ConsensusError::InvalidPrevBlockHash(msg_tip));
}
Expand All @@ -66,7 +69,60 @@ pub trait MsgHandler {
// it is valid or not.
self.verify(msg, iteration, round_committees)
}
Status::Future => Err(ConsensusError::FutureEvent),
Status::Future => {
// Pre-verify future messages for the current round
if msg.header.round == ru.round {
if msg_tip != ru.hash() {
return Err(ConsensusError::InvalidPrevBlockHash(
msg_tip,
));
}

if let Some(future_committee) =
round_committees.get_committee(msg.get_step())
{
// Ensure the message originates from a committee
// member.
if !future_committee.is_member(signer) {
return Err(ConsensusError::NotCommitteeMember);
}

match &msg.payload {
node_data::message::Payload::Ratification(_) => {
RatificationHandler::verify_stateless(
msg,
round_committees,
)?;
}
node_data::message::Payload::Validation(_) => {
ValidationHandler::verify_stateless(
msg,
round_committees,
)?;
}
node_data::message::Payload::Candidate(c) => {
proposal::handler::verify_stateless(
c,
round_committees,
)?;
}
node_data::message::Payload::Quorum(_) => {}
node_data::message::Payload::Block(_) => {}
_ => {
warn!(
"future message not repropagated {:?}",
msg.topic()
);
Err(ConsensusError::InvalidMsgType)?;
}
}
} else {
warn!("Future committee for iteration {iteration} not generated; skipping pre-verification for {:?} message", msg.topic());
}
}

Err(ConsensusError::FutureEvent)
}
}
}

Expand Down
115 changes: 63 additions & 52 deletions consensus/src/proposal/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use node_data::bls::PublicKeyBytes;
use node_data::message::payload::Candidate;

use crate::iteration_ctx::RoundCommittees;
use node_data::message::{Message, Payload, StepMessage};
use node_data::message::{Message, Payload, StepMessage, WireMessage};
use std::sync::Arc;
use tokio::sync::Mutex;

Expand All @@ -36,7 +36,8 @@ impl<D: Database> MsgHandler for ProposalHandler<D> {
let generator = round_committees
.get_generator(iteration)
.expect("committee to be created before run");
self.verify_new_block(msg, &generator)?;
let p = Self::unwrap_msg(msg)?;
super::handler::verify_new_block(p, &generator)?;

Ok(())
}
Expand Down Expand Up @@ -80,65 +81,75 @@ impl<D: Database> ProposalHandler<D> {
Self { db }
}

fn verify_new_block(
&self,
msg: &Message,
expected_generator: &PublicKeyBytes,
) -> Result<(), ConsensusError> {
let p = Self::unwrap_msg(msg)?;

if expected_generator != p.sign_info.signer.bytes() {
return Err(ConsensusError::NotCommitteeMember);
fn unwrap_msg(msg: &Message) -> Result<&Candidate, ConsensusError> {
match &msg.payload {
Payload::Candidate(c) => Ok(c),
_ => Err(ConsensusError::InvalidMsgType),
}
}
}

let candidate_size = p
.candidate
.size()
.map_err(|_| ConsensusError::UnknownBlockSize)?;
if candidate_size > MAX_BLOCK_SIZE {
return Err(ConsensusError::InvalidBlockSize(candidate_size));
}
fn verify_new_block(
p: &Candidate,
expected_generator: &PublicKeyBytes,
) -> Result<(), ConsensusError> {
if expected_generator != p.sign_info.signer.bytes() {
return Err(ConsensusError::NotCommitteeMember);
}

// Verify new_block msg signature
p.verify_signature()?;
let candidate_size = p
.candidate
.size()
.map_err(|_| ConsensusError::UnknownBlockSize)?;
if candidate_size > MAX_BLOCK_SIZE {
return Err(ConsensusError::InvalidBlockSize(candidate_size));
}

if msg.header.prev_block_hash != p.candidate.header().prev_block_hash {
return Err(ConsensusError::InvalidBlockHash);
}
// Verify new_block msg signature
p.verify_signature()?;

if p.candidate.txs().len() > MAX_NUMBER_OF_TRANSACTIONS {
return Err(ConsensusError::TooManyTransactions(
p.candidate.txs().len(),
));
}

let tx_hashes: Vec<_> =
p.candidate.txs().iter().map(|t| t.hash()).collect();
let tx_root = merkle_root(&tx_hashes[..]);
if tx_root != p.candidate.header().txroot {
return Err(ConsensusError::InvalidBlock);
}
if p.consensus_header().prev_block_hash
!= p.candidate.header().prev_block_hash
{
return Err(ConsensusError::InvalidBlockHash);
}

if p.candidate.faults().len() > MAX_NUMBER_OF_FAULTS {
return Err(ConsensusError::TooManyFaults(
p.candidate.faults().len(),
));
}
if p.candidate.txs().len() > MAX_NUMBER_OF_TRANSACTIONS {
return Err(ConsensusError::TooManyTransactions(
p.candidate.txs().len(),
));
}

let fault_hashes: Vec<_> =
p.candidate.faults().iter().map(|t| t.hash()).collect();
let fault_root = merkle_root(&fault_hashes[..]);
if fault_root != p.candidate.header().faultroot {
return Err(ConsensusError::InvalidBlock);
}
let tx_hashes: Vec<_> =
p.candidate.txs().iter().map(|t| t.hash()).collect();
let tx_root = merkle_root(&tx_hashes[..]);
if tx_root != p.candidate.header().txroot {
return Err(ConsensusError::InvalidBlock);
}

Ok(())
if p.candidate.faults().len() > MAX_NUMBER_OF_FAULTS {
return Err(ConsensusError::TooManyFaults(p.candidate.faults().len()));
}

fn unwrap_msg(msg: &Message) -> Result<&Candidate, ConsensusError> {
match &msg.payload {
Payload::Candidate(c) => Ok(c),
_ => Err(ConsensusError::InvalidMsgType),
}
let fault_hashes: Vec<_> =
p.candidate.faults().iter().map(|t| t.hash()).collect();
let fault_root = merkle_root(&fault_hashes[..]);
if fault_root != p.candidate.header().faultroot {
return Err(ConsensusError::InvalidBlock);
}

Ok(())
}

pub fn verify_stateless(
c: &Candidate,
round_committees: &RoundCommittees,
) -> Result<(), ConsensusError> {
let iteration = c.header.iteration;
let generator = round_committees
.get_generator(iteration)
.expect("committee to be created before run");
verify_new_block(c, &generator)?;

Ok(())
}
Loading

0 comments on commit 81f31c4

Please sign in to comment.