Skip to content

Commit

Permalink
Merge pull request #3157 from dusk-network/refactor_execution_ctx
Browse files Browse the repository at this point in the history
Refactor and fix event_loop
  • Loading branch information
fed-franz authored Dec 20, 2024
2 parents 4c8fca7 + d590d60 commit 9099531
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 104 deletions.
7 changes: 4 additions & 3 deletions consensus/src/aggregator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,11 @@ impl<V: StepVote> Aggregator<V> {
let quorum_reached = total >= quorum_target;
if quorum_reached {
tracing::info!(
event = "quorum reached",
?vote,
iter = v.header().iteration,
event = "Quorum reached",
step = ?V::STEP_NAME,
round = v.header().round,
iter = v.header().iteration,
?vote,
total,
target = quorum_target,
bitset,
Expand Down
34 changes: 1 addition & 33 deletions consensus/src/commons.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ use std::time::Duration;
use dusk_core::signatures::bls::SecretKey as BlsSecretKey;
use node_data::bls::PublicKey;
use node_data::ledger::*;
use node_data::message::{
payload, AsyncQueue, ConsensusHeader, Message, Payload,
};
use node_data::message::{payload, ConsensusHeader};
use node_data::StepName;

use crate::operations::Voter;
Expand Down Expand Up @@ -100,33 +98,3 @@ pub trait Database: Send + Sync {
async fn get_last_iter(&self) -> (Hash, u8);
async fn store_last_iter(&mut self, data: (Hash, u8));
}

#[derive(Clone)]
pub(crate) struct QuorumMsgSender {
queue: AsyncQueue<Message>,
}

impl QuorumMsgSender {
pub(crate) fn new(queue: AsyncQueue<Message>) -> Self {
Self { queue }
}

/// Sends a quorum (internally) to the lower layer.
pub(crate) async fn send_quorum(&self, msg: Message) {
match &msg.payload {
Payload::Quorum(q) if !q.att.ratification.is_empty() => {
tracing::debug!(
event = "send quorum_msg",
vote = ?q.vote(),
round = msg.header.round,
iteration = msg.header.iteration,
validation = ?q.att.validation,
ratification = ?q.att.ratification,
);
}
_ => return,
}

self.queue.try_send(msg);
}
}
9 changes: 3 additions & 6 deletions consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tokio::sync::{oneshot, Mutex};
use tokio::task::JoinHandle;
use tracing::{debug, error, warn, Instrument};

use crate::commons::{Database, QuorumMsgSender, RoundUpdate};
use crate::commons::{Database, RoundUpdate};
use crate::config::{CONSENSUS_MAX_ITER, EMERGENCY_MODE_ITERATION_THRESHOLD};
use crate::errors::ConsensusError;
use crate::execution_ctx::ExecutionCtx;
Expand Down Expand Up @@ -86,10 +86,9 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
) -> Result<(), ConsensusError> {
let round = ru.round;
debug!(event = "consensus started", round);
let sender = QuorumMsgSender::new(self.outbound.clone());

// proposal-validation-ratification loop
let mut handle = self.spawn_consensus(ru, provisioners, sender);
let mut handle = self.spawn_consensus(ru, provisioners);

// Usually this select will be terminated due to cancel signal however
// it may also be terminated due to unrecoverable error in the main loop
Expand Down Expand Up @@ -122,7 +121,6 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
&self,
ru: RoundUpdate,
provisioners: Arc<Provisioners>,
sender: QuorumMsgSender,
) -> JoinHandle<()> {
let inbound = self.inbound.clone();
let outbound = self.outbound.clone();
Expand Down Expand Up @@ -237,7 +235,6 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
step_name,
executor.clone(),
sv_registry.clone(),
sender.clone(),
);

// Execute a phase
Expand Down Expand Up @@ -265,7 +262,7 @@ impl<T: Operations + 'static, D: Database + 'static> Consensus<T, D> {
);

// Broadcast/Rebroadcast
sender.send_quorum(msg.clone()).await;
outbound.try_send(msg.clone());

// INFO: we keep running consensus even with Success
// Quorum in case we fail to accept the block.
Expand Down
82 changes: 44 additions & 38 deletions consensus/src/execution_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use tokio::time;
use tokio::time::Instant;
use tracing::{debug, error, info, trace, warn};

use crate::commons::{Database, QuorumMsgSender, RoundUpdate};
use crate::commons::{Database, RoundUpdate};
use crate::config::{
is_emergency_iter, CONSENSUS_MAX_ITER, MAX_ROUND_DISTANCE,
};
Expand Down Expand Up @@ -56,7 +56,6 @@ pub struct ExecutionCtx<'a, T, DB: Database> {
pub client: Arc<T>,

pub sv_registry: SafeAttestationInfoRegistry,
quorum_sender: QuorumMsgSender,
}

impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> {
Expand All @@ -73,7 +72,6 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> {
step: StepName,
client: Arc<T>,
sv_registry: SafeAttestationInfoRegistry,
quorum_sender: QuorumMsgSender,
) -> Self {
Self {
iter_ctx,
Expand All @@ -86,7 +84,6 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> {
step,
client,
sv_registry,
quorum_sender,
step_start_time: None,
}
}
Expand All @@ -113,7 +110,7 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> {
}

/// Returns true if the last step of last iteration is currently running
fn last_step_running(&self) -> bool {
fn is_last_step(&self) -> bool {
self.iteration == CONSENSUS_MAX_ITER - 1
&& self.step_name() == StepName::Ratification
}
Expand All @@ -132,22 +129,25 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> {
phase: Arc<Mutex<C>>,
additional_timeout: Option<Duration>,
) -> Message {
let open_consensus_mode = self.last_step_running();

// When consensus is in open_consensus_mode then it keeps Ratification
// step running indefinitely until either a valid block or
// emergency block is accepted
let timeout = if open_consensus_mode {
let dur = Duration::new(u32::MAX as u64, 0);
info!(event = "run event_loop", ?dur, mode = "open_consensus",);
dur
} else {
let dur = self.iter_ctx.get_timeout(self.step_name());
debug!(event = "run event_loop", ?dur, ?additional_timeout);
dur + additional_timeout.unwrap_or_default()
};
let round = self.round_update.round;
let iter = self.iteration;
let step = self.step_name();

let mut open_consensus_mode = false;

let step_timeout = self.iter_ctx.get_timeout(step);
let timeout = step_timeout + additional_timeout.unwrap_or_default();

debug!(
event = "Start step loop",
?step,
round,
iter,
?step_timeout,
?additional_timeout
);

let deadline = Instant::now().checked_add(timeout).unwrap();
let mut deadline = Instant::now().checked_add(timeout).unwrap();
let inbound = self.inbound.clone();

// Handle both timeout event and messages from inbound queue.
Expand All @@ -166,12 +166,6 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> {
.process_inbound_msg(phase.clone(), msg.clone())
.await
{
info!(
event = "Step completed",
step = ?self.step_name(),
info = ?msg.header
);

// In the normal case, we just return the result
// to Consensus
if !open_consensus_mode {
Expand All @@ -197,9 +191,8 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> {
is_local = true
);

self.quorum_sender
.send_quorum(msg)
.await;
// Broadcast Quorum
self.outbound.try_send(msg);
}
RatificationResult::Fail(vote) => {
debug!(
Expand Down Expand Up @@ -259,10 +252,8 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> {
is_local = false
);

// Repropagate Success Quorum
self.quorum_sender
.send_quorum(msg.clone())
.await;
// Broadcast Success Quorum
self.outbound.try_send(msg.clone());
}
RatificationResult::Fail(vote) => {
debug!(
Expand Down Expand Up @@ -345,15 +336,29 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> {
}
}
}

Ok(Err(e)) => {
warn!("Error while receiving msg: {e}");
}

// Timeout event. Phase could not reach its final goal.
// Increase timeout for next execution of this step and move on.
Err(_) => {
info!(event = "timeout-ed");
if open_consensus_mode {
error!("Timeout detected during last step running. This should never happen")
info!(event = "Step timeout expired", ?step, round, iter);

if self.is_last_step() {
info!(event = "Step ended", ?step, round, iter);

// If the last step expires, we enter Open Consensus
// mode. In this mode, the last step (Ratification)
// keeps running indefinitely, until a block is
// accepted.
info!(event = "Entering Open Consensus mode", round);

let timeout = Duration::new(u32::MAX as u64, 0);
deadline = Instant::now().checked_add(timeout).unwrap();

open_consensus_mode = true;
} else {
self.process_timeout_event(phase).await;
return Message::empty();
Expand Down Expand Up @@ -490,12 +495,13 @@ impl<'a, T: Operations + 'static, DB: Database> ExecutionCtx<'a, T, DB> {
info!(
event = "New Quorum",
mode = "emergency",
inf = ?m.header,
round = q.header.round,
iter = q.header.iteration,
vote = ?q.vote(),
);

// Broadcast Quorum
self.quorum_sender.send_quorum(m).await;
self.outbound.try_send(m);
}
}

Expand Down
14 changes: 10 additions & 4 deletions consensus/src/phase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use node_data::message::Message;
use node_data::StepName;
use tracing::{debug, trace};
use tracing::{info, trace};

use crate::commons::Database;
use crate::execution_ctx::ExecutionCtx;
Expand Down Expand Up @@ -54,10 +54,16 @@ impl<T: Operations + 'static, D: Database + 'static> Phase<T, D> {
pub async fn run(&mut self, mut ctx: ExecutionCtx<'_, T, D>) -> Message {
ctx.set_start_time();

let timeout = ctx.iter_ctx.get_timeout(ctx.step_name());
debug!(event = "execute_step", ?timeout);
let step = ctx.step_name();
let round = ctx.round_update.round;
let iter = ctx.iteration;
let timeout = ctx.iter_ctx.get_timeout(step);

// Execute step
await_phase!(self, run(ctx))
info!(event = "Step started", ?step, round, iter, ?timeout);
let msg = await_phase!(self, run(ctx));
info!(event = "Step ended", ?step, round, iter);

msg
}
}
3 changes: 3 additions & 0 deletions consensus/src/proposal/block_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ impl<T: Operations> Generator<T> {
info!(
event = "Candidate generated",
hash = &to_str(&candidate.header().hash),
round = candidate.header().height,
iter = candidate.header().iteration,
prev_block = &to_str(&candidate.header().prev_block_hash),
gas_limit = candidate.header().gas_limit,
state_hash = &to_str(&candidate.header().state_hash),
dur = format!("{:?}ms", start.elapsed().as_millis()),
Expand Down
16 changes: 16 additions & 0 deletions consensus/src/proposal/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ impl<D: Database> MsgHandler for ProposalHandler<D> {
.store_candidate_block(p.candidate.clone())
.await;

info!(
event = "New Candidate",
hash = &to_str(&p.candidate.header().hash),
round = p.candidate.header().height,
iter = p.candidate.header().iteration,
prev_block = &to_str(&p.candidate.header().prev_block_hash)
);

Ok(StepOutcome::Ready(msg))
}

Expand All @@ -84,6 +92,14 @@ impl<D: Database> MsgHandler for ProposalHandler<D> {
.store_candidate_block(p.candidate.clone())
.await;

info!(
event = "New Candidate",
hash = &to_str(&p.candidate.header().hash),
round = p.candidate.header().height,
iter = p.candidate.header().iteration,
prev_block = &to_str(&p.candidate.header().prev_block_hash)
);

Ok(StepOutcome::Ready(msg))
}

Expand Down
Loading

0 comments on commit 9099531

Please sign in to comment.