Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

consensus: Handle messages in emergency mode #1241

Merged
merged 6 commits into from
Jan 12, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions consensus/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,8 @@ pub const DEFAULT_BLOCK_GAS_LIMIT: u64 = 5 * 1_000_000_000;

pub const RELAX_ITERATION_THRESHOLD: u8 = 10;

/// Emergency mode is enabled only for the last N iterations
pub const EMERGENCY_MODE_ITERATION_THRESHOLD: u8 = CONSENSUS_MAX_ITER - 5;
goshawk-3 marked this conversation as resolved.
Show resolved Hide resolved

pub const ROUND_BASE_TIMEOUT: Duration = Duration::from_secs(5);
pub const MAX_STEP_TIMEOUT: Duration = Duration::from_secs(60);
129 changes: 81 additions & 48 deletions consensus/src/execution_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,22 @@ use crate::user::sortition;
use node_data::bls::PublicKeyBytes;
use node_data::ledger::{to_str, Block};
use node_data::message::Payload;
use node_data::message::{AsyncQueue, Message, Topics};
use node_data::message::{AsyncQueue, Message};

use node_data::StepName;

use crate::config::EMERGENCY_MODE_ITERATION_THRESHOLD;
use crate::ratification::step::RatificationStep;
use crate::validation::step::ValidationStep;
use node_data::message::payload::ValidationResult;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time;
use tokio::time::Instant;
use tracing::{debug, error, info, trace};

/// ExecutionCtx encapsulates all data needed by a single step to be fully
/// executed.
/// ExecutionCtx encapsulates all data needed in the execution of consensus
/// messages handlers.
pub struct ExecutionCtx<'a, DB: Database, T> {
pub iter_ctx: &'a mut IterationCtx<DB>,

Expand All @@ -47,7 +51,7 @@ pub struct ExecutionCtx<'a, DB: Database, T> {
pub iteration: u8,
step: StepName,

_executor: Arc<Mutex<T>>,
executor: Arc<Mutex<T>>,

pub sv_registry: SafeCertificateInfoRegistry,
quorum_sender: QuorumMsgSender,
Expand Down Expand Up @@ -78,7 +82,7 @@ impl<'a, DB: Database, T: Operations + 'static> ExecutionCtx<'a, DB, T> {
round_update,
iteration,
step,
_executor: executor,
executor,
sv_registry,
quorum_sender,
}
Expand Down Expand Up @@ -149,79 +153,108 @@ impl<'a, DB: Database, T: Operations + 'static> ExecutionCtx<'a, DB, T> {
}
}

pub(crate) async fn vote_for_former_candidate(
/// Cast a validation vote for a candidate that originates from former
/// iteration
pub(crate) async fn try_cast_validation_vote(
&mut self,
msg_step: u16,
msg_iteration: u8,
candidate: &Block,
) {
debug!(
event = "former candidate received",
hash = to_str(&candidate.header().hash),
msg_step,
);

if msg_step < self.step() {
self.try_vote(msg_step + 1, candidate, Topics::Validation);
}
let step = StepName::Validation.to_step(msg_iteration);

if msg_step + 2 <= self.step() {
self.try_vote(msg_step + 2, candidate, Topics::Ratification);
if let Some(committee) = self.iter_ctx.committees.get_committee(step) {
if self.am_member(committee) {
ValidationStep::try_vote(
candidate,
&self.round_update,
msg_iteration,
self.outbound.clone(),
self.inbound.clone(),
self.executor.clone(),
)
.await;
};
} else {
error!(event = "committee not found", msg_iteration);
}
}

fn try_vote(&mut self, msg_step: u16, candidate: &Block, topic: Topics) {
if let Some(committee) =
self.iter_ctx.committees.get_committee(msg_step)
{
if self.am_member(committee) {
debug!(
event = "vote for former candidate",
step_topic = format!("{:?}", topic),
hash = to_str(&candidate.header().hash),
msg_step,
);
pub(crate) async fn try_cast_ratification_vote(
&self,
msg_iteration: u8,
validation: &ValidationResult,
) {
let step = StepName::Ratification.to_step(msg_iteration);

// TODO: Verify
};
} else {
error!(event = "committee not found", step = self.step(), msg_step);
if let Some(committee) = self.iter_ctx.committees.get_committee(step) {
if self.am_member(committee) {
RatificationStep::<T, DB>::try_vote(
&self.round_update,
msg_iteration,
validation,
self.outbound.clone(),
)
.await;
}
}
}

/// Process messages from past
async fn process_past_events(&mut self, msg: Message) -> Option<Message> {
if msg.header.round != self.round_update.round {
if msg.header.round != self.round_update.round
|| self.iteration < EMERGENCY_MODE_ITERATION_THRESHOLD
{
// Discard messages from past if current iteration is not considered
// an emergency iteration
return None;
}

self.on_emergency_mode(msg).await
}

/// Handles a consensus message in emergency mode
async fn on_emergency_mode(&mut self, msg: Message) -> Option<Message> {
if let Err(e) = self.outbound.send(msg.clone()).await {
error!("could not send msg due to {:?}", e);
}

// Try to vote for candidate block from former iteration
// Try to cast validation vote for a candidate block from former
// iteration
if let Payload::Candidate(p) = &msg.payload {
// TODO: Perform block header/ Certificate full verification
// To be addressed with another PR

self.vote_for_former_candidate(msg.header.get_step(), &p.candidate)
self.try_cast_validation_vote(msg.header.iteration, &p.candidate)
.await;
}

// Collect message from a previous reduction step/iteration.
let msg_iteration = msg.header.iteration;

// Collect message from a previous iteration/step.
fed-franz marked this conversation as resolved.
Show resolved Hide resolved
if let Some(m) = self
fed-franz marked this conversation as resolved.
Show resolved Hide resolved
.iter_ctx
.collect_past_event(&self.round_update, msg)
.await
{
if m.header.topic == Topics::Quorum {
debug!(
event = "quorum",
src = "prev_step",
msg_step = m.header.get_step(),
hash = to_str(&m.header.block_hash),
);
match &m.payload {
fed-franz marked this conversation as resolved.
Show resolved Hide resolved
Payload::Quorum(_) => {
debug!(
event = "quorum",
src = "prev_step",
msg_step = m.header.get_step(),
hash = to_str(&m.header.block_hash),
);

self.quorum_sender.send(m).await;
self.quorum_sender.send(m).await;
}

Payload::ValidationResult(validation_result) => {
self.try_cast_ratification_vote(
msg_iteration,
validation_result,
)
.await
}
_ => {
// Not supported.
}
}
}

Expand Down
16 changes: 7 additions & 9 deletions consensus/src/ratification/step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ pub struct RatificationStep<T, DB> {

impl<T: Operations + 'static, DB: Database> RatificationStep<T, DB> {
pub async fn try_vote(
&self,
ru: &RoundUpdate,
iteration: u8,
result: &ValidationResult,
Expand Down Expand Up @@ -125,14 +124,13 @@ impl<T: Operations + 'static, DB: Database> RatificationStep<T, DB> {
if ctx.am_member(committee) {
let mut handler = self.handler.lock().await;

let vote_msg = self
.try_vote(
&ctx.round_update,
ctx.iteration,
handler.validation_result(),
ctx.outbound.clone(),
)
.await;
let vote_msg = Self::try_vote(
&ctx.round_update,
ctx.iteration,
handler.validation_result(),
ctx.outbound.clone(),
)
.await;

// Collect my own vote
let res = handler
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/validation/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ impl MsgHandler<Message> for ValidationHandler {
{
return Ok(HandleMsgOutput::Ready(quorum_msg));
}

return Ok(final_result(sv, hash, QuorumType::ValidQuorum));
}

Ok(HandleMsgOutput::Pending(msg))
Expand Down
39 changes: 23 additions & 16 deletions consensus/src/validation/step.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// Copyright (c) DUSK NETWORK. All rights reserved.

use crate::commons::{ConsensusError, Database, RoundUpdate};
use crate::config;
use crate::contract_state::{CallParams, Operations};
use crate::execution_ctx::ExecutionCtx;
use crate::validation::handler;
Expand All @@ -24,8 +25,7 @@ pub struct ValidationStep<T> {
}

impl<T: Operations + 'static> ValidationStep<T> {
#[allow(clippy::too_many_arguments)]
pub fn spawn_try_vote(
pub(crate) fn spawn_try_vote(
join_set: &mut JoinSet<()>,
candidate: Block,
ru: RoundUpdate,
Expand All @@ -38,27 +38,28 @@ impl<T: Operations + 'static> ValidationStep<T> {
join_set.spawn(
async move {
Self::try_vote(
candidate, ru, iteration, outbound, inbound, executor,
&candidate, &ru, iteration, outbound, inbound, executor,
)
.await
}
.instrument(tracing::info_span!("voting", hash)),
);
}

async fn try_vote(
candidate: Block,
ru: RoundUpdate,
pub(crate) async fn try_vote(
candidate: &Block,
ru: &RoundUpdate,
iteration: u8,
outbound: AsyncQueue<Message>,
inbound: AsyncQueue<Message>,
executor: Arc<Mutex<T>>,
) {
// TODO: Verify Block Header
let hash = candidate.header().hash;

// Call VST for non-empty blocks
if hash != [0u8; 32] {
if let Err(err) = Self::call_vst(&candidate, &ru, executor).await {
if let Err(err) = Self::call_vst(candidate, ru, executor).await {
error!(
event = "failed_vst_call",
reason = format!("{:?}", err)
Expand Down Expand Up @@ -197,15 +198,21 @@ impl<T: Operations + 'static> ValidationStep<T> {
if ctx.am_member(committee) {
let candidate = self.handler.lock().await.candidate.clone();

Self::spawn_try_vote(
&mut ctx.iter_ctx.join_set,
candidate,
ctx.round_update.clone(),
ctx.iteration,
ctx.outbound.clone(),
ctx.inbound.clone(),
self.executor.clone(),
);
// Casting a NIL vote is disabled in Emergency Mode
fed-franz marked this conversation as resolved.
Show resolved Hide resolved
let voting_enabled = candidate.header().hash != [0u8; 32]
|| ctx.iteration < config::EMERGENCY_MODE_ITERATION_THRESHOLD;

if voting_enabled {
Self::spawn_try_vote(
&mut ctx.iter_ctx.join_set,
candidate,
ctx.round_update.clone(),
ctx.iteration,
ctx.outbound.clone(),
ctx.inbound.clone(),
self.executor.clone(),
);
}
}

// handle queued messages for current round and step.
Expand Down