Skip to content

Commit

Permalink
consensus: do not call collect_from_past for Candidate
Browse files Browse the repository at this point in the history
  • Loading branch information
fed-franz committed Aug 23, 2024
1 parent d9ea964 commit 576d2c3
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 71 deletions.
3 changes: 1 addition & 2 deletions consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
Phase::Proposal(proposal::step::ProposalStep::new(
executor.clone(),
db.clone(),
proposal_handler.clone(),
proposal_handler,
)),
Phase::Validation(validation::step::ValidationStep::new(
executor.clone(),
Expand All @@ -179,7 +179,6 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
let mut iter_ctx = IterationCtx::new(
ru.round,
iter,
proposal_handler,
validation_handler,
ratification_handler,
ru.base_timeouts.clone(),
Expand Down
75 changes: 37 additions & 38 deletions consensus/src/execution_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//
// Copyright (c) DUSK NETWORK. All rights reserved.

use crate::commons::{ConsensusError, Database, QuorumMsgSender, RoundUpdate};
use crate::commons::{ConsensusError, QuorumMsgSender, RoundUpdate};

use crate::iteration_ctx::IterationCtx;
use crate::msg_handler::{HandleMsgOutput, MsgHandler};
Expand Down Expand Up @@ -34,8 +34,8 @@ use tracing::{debug, error, info, trace, warn};

/// ExecutionCtx encapsulates all data needed in the execution of consensus
/// messages handlers.
pub struct ExecutionCtx<'a, DB: Database, T> {
pub iter_ctx: &'a mut IterationCtx<DB>,
pub struct ExecutionCtx<'a, T> {
pub iter_ctx: &'a mut IterationCtx,

/// Messaging-related fields
pub inbound: AsyncQueue<Message>,
Expand All @@ -57,11 +57,11 @@ pub struct ExecutionCtx<'a, DB: Database, T> {
quorum_sender: QuorumMsgSender,
}

impl<'a, DB: Database, T: Operations + 'static> ExecutionCtx<'a, DB, T> {
impl<'a, T: Operations + 'static> ExecutionCtx<'a, T> {
/// Creates step execution context.
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
iter_ctx: &'a mut IterationCtx<DB>,
iter_ctx: &'a mut IterationCtx,
inbound: AsyncQueue<Message>,
outbound: AsyncQueue<Message>,
future_msgs: Arc<Mutex<MsgRegistry<Message>>>,
Expand Down Expand Up @@ -220,7 +220,7 @@ impl<'a, DB: Database, T: Operations + 'static> ExecutionCtx<'a, DB, T> {

if let Some(committee) = self.iter_ctx.committees.get_committee(step) {
if self.am_member(committee) {
RatificationStep::<DB>::try_vote(
RatificationStep::try_vote(
&self.round_update,
msg_iteration,
validation,
Expand Down Expand Up @@ -248,41 +248,40 @@ impl<'a, DB: Database, T: Operations + 'static> ExecutionCtx<'a, DB, T> {
async fn on_emergency_mode(&mut self, msg: Message) {
self.outbound.try_send(msg.clone());

// Try to cast validation vote for a candidate block from former
// iteration
// Try to vote for a candidate block from former iteration
if let Payload::Candidate(p) = &msg.payload {
self.try_cast_validation_vote(&p.candidate).await;
}

let msg_iteration = msg.header.iteration;

// Collect message from a previous iteration/step.
if let Some(m) = self
.iter_ctx
.collect_past_event(&self.round_update, msg)
.await
{
match &m.payload {
Payload::Quorum(q) => {
debug!(
event = "quorum",
src = "prev_step",
msg_step = m.get_step(),
vote = ?q.vote(),
);

self.quorum_sender.send_quorum(m).await;
}
} else {
let msg_iteration = msg.header.iteration;

// Collect message from a previous iteration/step.
if let Some(m) = self
.iter_ctx
.collect_past_event(&self.round_update, msg)
.await
{
match &m.payload {
Payload::Quorum(q) => {
debug!(
event = "quorum",
src = "prev_step",
msg_step = m.get_step(),
vote = ?q.vote(),
);

self.quorum_sender.send_quorum(m).await;
}

Payload::ValidationResult(validation_result) => {
self.try_cast_ratification_vote(
msg_iteration,
validation_result,
)
.await
}
_ => {
// Not supported.
Payload::ValidationResult(validation_result) => {
self.try_cast_ratification_vote(
msg_iteration,
validation_result,
)
.await
}
_ => {
// Not supported.
}
}
}
}
Expand Down
18 changes: 4 additions & 14 deletions consensus/src/iteration_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
//
// Copyright (c) DUSK NETWORK. All rights reserved.

use crate::commons::Database;
use crate::commons::{RoundUpdate, TimeoutSet};
use std::cmp;

Expand All @@ -14,7 +13,7 @@ use crate::msg_handler::MsgHandler;

use crate::user::committee::Committee;

use crate::{proposal, ratification, validation};
use crate::{ratification, validation};
use node_data::bls::PublicKeyBytes;

use node_data::message::Message;
Expand Down Expand Up @@ -60,11 +59,10 @@ impl RoundCommittees {

/// Represents a shared state within a context of the execution of a single
/// iteration.
pub struct IterationCtx<DB: Database> {
pub struct IterationCtx {
validation_handler: Arc<Mutex<validation::handler::ValidationHandler>>,
ratification_handler:
Arc<Mutex<ratification::handler::RatificationHandler>>,
proposal_handler: Arc<Mutex<proposal::handler::ProposalHandler<DB>>>,

pub join_set: JoinSet<()>,

Expand All @@ -79,11 +77,10 @@ pub struct IterationCtx<DB: Database> {
timeouts: TimeoutSet,
}

impl<D: Database> IterationCtx<D> {
impl IterationCtx {
pub fn new(
round: u64,
iter: u8,
proposal_handler: Arc<Mutex<proposal::handler::ProposalHandler<D>>>,
validation_handler: Arc<Mutex<validation::handler::ValidationHandler>>,
ratification_handler: Arc<
Mutex<ratification::handler::RatificationHandler>,
Expand All @@ -94,7 +91,6 @@ impl<D: Database> IterationCtx<D> {
round,
join_set: JoinSet::new(),
iter,
proposal_handler,
validation_handler,
ratification_handler,
committees: Default::default(),
Expand Down Expand Up @@ -152,12 +148,6 @@ impl<D: Database> IterationCtx<D> {
let generator = self.get_generator(msg.header.iteration);

match msg.topic() {
node_data::message::Topics::Candidate => {
let mut handler = self.proposal_handler.lock().await;
_ = handler
.collect_from_past(msg, ru, committee, generator)
.await;
}
node_data::message::Topics::Validation => {
let mut handler = self.validation_handler.lock().await;
if let Ok(HandleMsgOutput::Ready(m)) = handler
Expand All @@ -183,7 +173,7 @@ impl<D: Database> IterationCtx<D> {
}
}

impl<DB: Database> Drop for IterationCtx<DB> {
impl Drop for IterationCtx {
fn drop(&mut self) {
self.on_close();
}
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/phase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ macro_rules! await_phase {
pub enum Phase<T: Operations, D: Database> {
Proposal(proposal::step::ProposalStep<T, D>),
Validation(validation::step::ValidationStep<T>),
Ratification(ratification::step::RatificationStep<D>),
Ratification(ratification::step::RatificationStep),
}

impl<T: Operations + 'static, D: Database + 'static> Phase<T, D> {
Expand All @@ -54,7 +54,7 @@ impl<T: Operations + 'static, D: Database + 'static> Phase<T, D> {

pub async fn run(
&mut self,
mut ctx: ExecutionCtx<'_, D, T>,
mut ctx: ExecutionCtx<'_, T>,
) -> Result<Message, ConsensusError> {
ctx.set_start_time();

Expand Down
2 changes: 1 addition & 1 deletion consensus/src/proposal/step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl<T: Operations + 'static, D: Database> ProposalStep<T, D> {

pub async fn run(
&mut self,
mut ctx: ExecutionCtx<'_, D, T>,
mut ctx: ExecutionCtx<'_, T>,
) -> Result<Message, ConsensusError> {
let committee = ctx
.get_current_committee()
Expand Down
17 changes: 6 additions & 11 deletions consensus/src/ratification/step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@
//
// Copyright (c) DUSK NETWORK. All rights reserved.

use crate::commons::{ConsensusError, Database, RoundUpdate};
use crate::commons::{ConsensusError, RoundUpdate};
use crate::execution_ctx::ExecutionCtx;
use crate::operations::Operations;
use std::marker::PhantomData;

use crate::msg_handler::{HandleMsgOutput, MsgHandler};
use crate::ratification::handler;
Expand All @@ -19,12 +18,11 @@ use tokio::sync::Mutex;

use tracing::{info, Instrument};

pub struct RatificationStep<DB> {
pub struct RatificationStep {
handler: Arc<Mutex<handler::RatificationHandler>>,
marker: PhantomData<DB>,
}

impl<DB: Database> RatificationStep<DB> {
impl RatificationStep {
pub async fn try_vote(
ru: &RoundUpdate,
iteration: u8,
Expand Down Expand Up @@ -70,14 +68,11 @@ pub fn build_ratification_payload(
ratification
}

impl<DB: Database> RatificationStep<DB> {
impl RatificationStep {
pub(crate) fn new(
handler: Arc<Mutex<handler::RatificationHandler>>,
) -> Self {
Self {
handler,
marker: PhantomData,
}
Self { handler }
}

pub async fn reinitialize(
Expand Down Expand Up @@ -112,7 +107,7 @@ impl<DB: Database> RatificationStep<DB> {

pub async fn run<T: Operations + 'static>(
&mut self,
mut ctx: ExecutionCtx<'_, DB, T>,
mut ctx: ExecutionCtx<'_, T>,
) -> Result<Message, ConsensusError> {
let committee = ctx
.get_current_committee()
Expand Down
6 changes: 3 additions & 3 deletions consensus/src/validation/step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//
// Copyright (c) DUSK NETWORK. All rights reserved.

use crate::commons::{ConsensusError, Database, RoundUpdate};
use crate::commons::{ConsensusError, RoundUpdate};
use crate::config;
use crate::execution_ctx::ExecutionCtx;
use crate::operations::{Operations, Voter};
Expand Down Expand Up @@ -227,9 +227,9 @@ impl<T: Operations + 'static> ValidationStep<T> {
)
}

pub async fn run<DB: Database>(
pub async fn run(
&mut self,
mut ctx: ExecutionCtx<'_, DB, T>,
mut ctx: ExecutionCtx<'_, T>,
) -> Result<Message, ConsensusError> {
let committee = ctx
.get_current_committee()
Expand Down

0 comments on commit 576d2c3

Please sign in to comment.