Skip to content

Commit

Permalink
consensus: pre-verify future messages from current iter
Browse files Browse the repository at this point in the history
  • Loading branch information
fed-franz committed Sep 9, 2024
1 parent 681ab42 commit 7068eab
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 24 deletions.
29 changes: 8 additions & 21 deletions consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use crate::operations::Operations;
use crate::phase::Phase;

use node_data::message::{AsyncQueue, Message, Topics};
use node_data::StepName;

use crate::execution_ctx::ExecutionCtx;
use crate::proposal;
Expand Down Expand Up @@ -193,19 +192,8 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
// in case they are needed to process past-iteration messages in
// Emergency Mode
while iter <= saved_iter {
iter_ctx.on_begin(iter);
iter_ctx.generate_committee(
StepName::Proposal,
provisioners.as_ref(),
ru.seed(),
);
iter_ctx.generate_committee(
StepName::Validation,
provisioners.as_ref(),
ru.seed(),
);
iter_ctx.generate_committee(
StepName::Ratification,
iter_ctx.generate_iteration_committees(
iter,
provisioners.as_ref(),
ru.seed(),
);
Expand All @@ -221,6 +209,12 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {

iter_ctx.on_begin(iter);

iter_ctx.generate_iteration_committees(
iter,
provisioners.as_ref(),
ru.seed(),
);

let mut msg = Message::empty();
// Execute a single iteration
for phase in phases.iter_mut() {
Expand All @@ -229,13 +223,6 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
// phase.
phase.reinitialize(msg, ru.round, iter).await;

// Generate step committee
iter_ctx.generate_committee(
step_name,
provisioners.as_ref(),
ru.seed(),
);

// Construct phase execution context
let ctx = ExecutionCtx::new(
&mut iter_ctx,
Expand Down
14 changes: 14 additions & 0 deletions consensus/src/execution_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,20 @@ impl<'a, T: Operations + 'static> ExecutionCtx<'a, T> {
phase: Arc<Mutex<C>>,
msg: Message,
) -> Option<Message> {
// If it's a message from a future iteration of the current round, we
// generate the committees so that we can pre-verify its validity.
// We do it here because we need the IterationCtx
if msg.header.round == self.round_update.round
&& msg.header.iteration > self.iteration
{
// Generate committees for the iteration
self.iter_ctx.generate_iteration_committees(
msg.header.iteration,
self.provisioners,
self.round_update.seed(),
);
}

let committee = self
.get_current_committee()
.expect("committee to be created before run");
Expand Down
19 changes: 18 additions & 1 deletion consensus/src/iteration_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,11 @@ impl IterationCtx {

pub(crate) fn generate_committee(
&mut self,
iteration: u8,
step_name: StepName,
provisioners: &Provisioners,
seed: Seed,
) {
let iteration = self.iter;
let step = step_name.to_step(iteration);

// Check if we already generated the committee.
Expand Down Expand Up @@ -229,6 +229,23 @@ impl IterationCtx {
self.committees.insert(step, step_committee);
}

pub(crate) fn generate_iteration_committees(
&mut self,
iteration: u8,
provisioners: &Provisioners,
seed: Seed,
) {
let stepnames = [
StepName::Proposal,
StepName::Validation,
StepName::Ratification,
];

for stepname in &stepnames {
self.generate_committee(iteration, *stepname, provisioners, seed);
}
}

pub(crate) fn get_generator(&self, iter: u8) -> Option<PublicKeyBytes> {
let step = StepName::Proposal.to_step(iter);
self.committees
Expand Down
30 changes: 28 additions & 2 deletions consensus/src/msg_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ pub trait MsgHandler {

trace!(event = "msg received", msg = format!("{:#?}", msg),);

let msg_tip = msg.header.prev_block_hash;
match msg.compare(ru.round, iteration, step) {
Status::Past => Err(ConsensusError::PastEvent),
Status::Present => {
let msg_tip = msg.header.prev_block_hash;
if msg_tip != ru.hash() {
return Err(ConsensusError::InvalidPrevBlockHash(msg_tip));
}
Expand All @@ -66,7 +66,33 @@ pub trait MsgHandler {
// it is valid or not.
self.verify(msg, iteration, round_committees)
}
Status::Future => Err(ConsensusError::FutureEvent),
Status::Future => {
// Pre-verify future messages for the current round
if msg.header.round == ru.round {
if msg_tip != ru.hash() {
return Err(ConsensusError::InvalidPrevBlockHash(
msg_tip,
));
}

let future_committee = round_committees
.get_committee(msg.get_step())
.expect("committee to be created before run");

// Ensure the message originates from a committee member.
if !future_committee.is_member(signer) {
return Err(ConsensusError::NotCommitteeMember);
}

// Delegate message final verification to the phase
// instance. It is the phase that knows
// what message type to expect and if it
// is valid or not.
self.verify(msg, iteration, round_committees)?;
}

Err(ConsensusError::FutureEvent)
}
}
}

Expand Down

0 comments on commit 7068eab

Please sign in to comment.