Skip to content

Commit

Permalink
fix(consensus): correct handling lagging behind epoch
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Dec 9, 2024
1 parent aefa5a0 commit 5b200bc
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 8 deletions.
38 changes: 37 additions & 1 deletion dan_layer/consensus/src/block_validations.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use log::debug;
use log::{debug, warn};
use tari_common::configuration::Network;
use tari_crypto::{ristretto::RistrettoPublicKey, tari_utilities::ByteArray};
use tari_dan_common_types::{
committee::{Committee, CommitteeInfo},
DerivableFromPublicKey,
Epoch,
ExtraFieldKey,
};
use tari_dan_storage::consensus_models::Block;
Expand All @@ -18,6 +19,27 @@ use crate::{
};

const LOG_TARGET: &str = "tari::dan::consensus::hotstuff::block_validations";
pub fn check_local_proposal<TConsensusSpec: ConsensusSpec>(
current_epoch: Epoch,
block: &Block,
committee_info: &CommitteeInfo,
committee_for_block: &Committee<TConsensusSpec::Addr>,
vote_signing_service: &TConsensusSpec::SignatureService,
leader_strategy: &TConsensusSpec::LeaderStrategy,
config: &HotstuffConfig,
) -> Result<(), HotStuffError> {
check_proposal::<TConsensusSpec>(
block,
committee_info,
committee_for_block,
vote_signing_service,
leader_strategy,
config,
)?;
// This proposal is valid, if it is for an epoch ahead of us, we need to sync
check_current_epoch(block, current_epoch)?;
Ok(())
}

pub fn check_proposal<TConsensusSpec: ConsensusSpec>(
block: &Block,
Expand All @@ -30,6 +52,7 @@ pub fn check_proposal<TConsensusSpec: ConsensusSpec>(
// TODO: in order to do the base layer block has validation, we need to ensure that we have synced to the tip.
// If not, we need some strategy for "parking" the blocks until we are at least at the provided hash or the
// tip. Without this, the check has a race condition between the base layer scanner and consensus.
// A simpler suggestion is to use the BL epoch block which does not change within epochs
// check_base_layer_block_hash::<TConsensusSpec>(block, epoch_manager, config).await?;
check_network(block, config.network)?;
if block.is_genesis() {
Expand All @@ -49,6 +72,19 @@ pub fn check_proposal<TConsensusSpec: ConsensusSpec>(
Ok(())
}

pub fn check_current_epoch(candidate_block: &Block, current_epoch: Epoch) -> Result<(), ProposalValidationError> {
if candidate_block.epoch() > current_epoch {
warn!(target: LOG_TARGET, "⚠️ Proposal for future epoch {} received. Current epoch is {}", candidate_block.epoch(), current_epoch);
return Err(ProposalValidationError::FutureEpoch {
block_id: *candidate_block.id(),
current_epoch,
block_epoch: candidate_block.epoch(),
});
}

Ok(())
}

pub fn check_dummy(candidate_block: &Block) -> Result<(), ProposalValidationError> {
if candidate_block.signature().is_some() {
return Err(ProposalValidationError::DummyBlockWithSignature {
Expand Down
6 changes: 6 additions & 0 deletions dan_layer/consensus/src/hotstuff/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,4 +257,10 @@ pub enum ProposalValidationError {
DummyBlockWithCommands { block_id: BlockId },
#[error("Malformed block {block_id}: {details}")]
MalformedBlock { block_id: BlockId, details: String },
#[error("Block {block_id} is for a future epoch. Current epoch: {current_epoch}, block epoch: {block_epoch}")]
FutureEpoch {
block_id: BlockId,
current_epoch: Epoch,
block_epoch: Epoch,
},
}
22 changes: 21 additions & 1 deletion dan_layer/consensus/src/hotstuff/on_inbound_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,23 @@ impl<TConsensusSpec: ConsensusSpec> MessageBuffer<TConsensusSpec> {

while let Some(result) = self.inbound_messaging.next_message().await {
let (from, msg) = result?;

// If we receive an FP that is greater than our current epoch, we buffer it
if let HotstuffMessage::ForeignProposal(ref m) = msg {
if m.justify_qc.epoch() > current_epoch {
self.push_to_buffer(m.justify_qc.epoch(), NodeHeight::zero(), from, msg);
continue;
}
}

match msg_epoch_and_height(&msg) {
// Discard old message
Some((e, h)) if e < current_epoch || (e == current_epoch && h < next_height) => {
info!(target: LOG_TARGET, "🗑️ Discard message {} is for previous view {}/{}. Current view {}/{}", msg, e, h, current_epoch, next_height);
continue;
},
// Buffer message for future epoch/height
Some((epoch, height)) if epoch > current_epoch || height > next_height => {
Some((epoch, height)) if epoch == current_epoch && height > next_height => {
if msg.proposal().is_some() {
info!(target: LOG_TARGET, "🦴Proposal {msg} is for future view (Current view: {current_epoch}, {next_height})");
} else {
Expand All @@ -110,6 +119,17 @@ impl<TConsensusSpec: ConsensusSpec> MessageBuffer<TConsensusSpec> {
self.push_to_buffer(epoch, height, from, msg);
continue;
},
Some((epoch, height)) if epoch > current_epoch => {
warn!(target: LOG_TARGET, "⚠️ Message {msg} is for future epoch {epoch}. Current epoch {current_epoch}");
if matches!(&msg, HotstuffMessage::Vote(_)) {
// Buffer VOTE messages. As it does not contain a QC we can use to prove that a BFT-majority has
// reached the epoch
self.push_to_buffer(epoch, height, from, msg);
continue;
}
// Return the message, it will be validated and if valid, will kick consensus into sync
return Ok(Some((from, msg)));
},
// Height is irrelevant or current, return message
_ => return Ok(Some((from, msg))),
}
Expand Down
29 changes: 25 additions & 4 deletions dan_layer/consensus/src/hotstuff/on_message_validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use tokio::sync::broadcast;
use super::config::HotstuffConfig;
use crate::{
block_validations,
hotstuff::{error::HotStuffError, HotstuffEvent, ProposalValidationError},
hotstuff::{error::HotStuffError, CurrentView, HotstuffEvent, ProposalValidationError},
messages::{ForeignProposalMessage, HotstuffMessage, MissingTransactionsRequest, ProposalMessage},
tracing::TraceTimer,
traits::{ConsensusSpec, OutboundMessaging},
Expand All @@ -33,6 +33,7 @@ pub struct OnMessageValidate<TConsensusSpec: ConsensusSpec> {
config: HotstuffConfig,
store: TConsensusSpec::StateStore,
epoch_manager: TConsensusSpec::EpochManager,
current_view: CurrentView,
leader_strategy: TConsensusSpec::LeaderStrategy,
vote_signing_service: TConsensusSpec::SignatureService,
outbound_messaging: TConsensusSpec::OutboundMessaging,
Expand All @@ -47,6 +48,7 @@ impl<TConsensusSpec: ConsensusSpec> OnMessageValidate<TConsensusSpec> {
config: HotstuffConfig,
store: TConsensusSpec::StateStore,
epoch_manager: TConsensusSpec::EpochManager,
current_view: CurrentView,
leader_strategy: TConsensusSpec::LeaderStrategy,
vote_signing_service: TConsensusSpec::SignatureService,
outbound_messaging: TConsensusSpec::OutboundMessaging,
Expand All @@ -56,6 +58,7 @@ impl<TConsensusSpec: ConsensusSpec> OnMessageValidate<TConsensusSpec> {
config,
store,
epoch_manager,
current_view,
leader_strategy,
vote_signing_service,
outbound_messaging,
Expand Down Expand Up @@ -146,6 +149,7 @@ impl<TConsensusSpec: ConsensusSpec> OnMessageValidate<TConsensusSpec> {
);

if proposal.block.height() < current_height {
// Should never happen since the on_inbound_message handler filters these out
info!(
target: LOG_TARGET,
"🔥 Block {} is lower than current height {}. Ignoring.",
Expand All @@ -155,7 +159,7 @@ impl<TConsensusSpec: ConsensusSpec> OnMessageValidate<TConsensusSpec> {
return Ok(MessageValidationResult::Discard);
}

if let Err(err) = self.check_proposal(&proposal.block, local_committee, local_committee_info) {
if let Err(err) = self.check_local_proposal(&proposal.block, local_committee, local_committee_info) {
return Ok(MessageValidationResult::Invalid {
from,
message: HotstuffMessage::Proposal(proposal),
Expand Down Expand Up @@ -202,7 +206,24 @@ impl<TConsensusSpec: ConsensusSpec> OnMessageValidate<TConsensusSpec> {
})
}

fn check_proposal(
fn check_local_proposal(
&self,
block: &Block,
committee_for_block: &Committee<TConsensusSpec::Addr>,
committee_info: &CommitteeInfo,
) -> Result<(), HotStuffError> {
block_validations::check_local_proposal::<TConsensusSpec>(
self.current_view.get_epoch(),
block,
committee_info,
committee_for_block,
&self.vote_signing_service,
&self.leader_strategy,
&self.config,
)
}

fn check_foreign_proposal(
&self,
block: &Block,
committee_for_block: &Committee<TConsensusSpec::Addr>,
Expand Down Expand Up @@ -325,7 +346,7 @@ impl<TConsensusSpec: ConsensusSpec> OnMessageValidate<TConsensusSpec> {
.get_committee_info_by_validator_public_key(msg.block.epoch(), msg.block.proposed_by().clone())
.await?;

if let Err(err) = self.check_proposal(&msg.block, &committee, &committee_info) {
if let Err(err) = self.check_foreign_proposal(&msg.block, &committee, &committee_info) {
return Ok(MessageValidationResult::Invalid {
from,
message: HotstuffMessage::ForeignProposal(msg),
Expand Down
4 changes: 3 additions & 1 deletion dan_layer/consensus/src/hotstuff/state_machine/running.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{
worker::ConsensusWorkerContext,
},
HotStuffError,
ProposalValidationError,
},
traits::ConsensusSpec,
};
Expand Down Expand Up @@ -39,7 +40,8 @@ where TSpec: ConsensusSpec
info!(target: LOG_TARGET, "Not registered for current epoch ({err})");
Ok(ConsensusStateEvent::NotRegisteredForEpoch { epoch })
},
Err(err @ HotStuffError::FallenBehind { .. }) => {
Err(err @ HotStuffError::FallenBehind { .. }) |
Err(err @ HotStuffError::ProposalValidationError(ProposalValidationError::FutureEpoch { .. })) => {
info!(target: LOG_TARGET, "⚠️ Behind peers, starting sync ({err})");
Ok(ConsensusStateEvent::NeedSync)
},
Expand Down
13 changes: 13 additions & 0 deletions dan_layer/consensus/src/hotstuff/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ impl<TConsensusSpec: ConsensusSpec> HotstuffWorker<TConsensusSpec> {
config.clone(),
state_store.clone(),
epoch_manager.clone(),
pacemaker.clone_handle().current_view().clone(),
leader_strategy.clone(),
signing_service.clone(),
outbound_messaging.clone(),
Expand Down Expand Up @@ -438,6 +439,18 @@ impl<TConsensusSpec: ConsensusSpec> HotstuffWorker<TConsensusSpec> {
Ok(())
},
MessageValidationResult::Discard => Ok(()),
// In these cases, we want to propagate the error back to the state machine, to allow sync
MessageValidationResult::Invalid {
err: err @ HotStuffError::FallenBehind { .. },
..
} |
MessageValidationResult::Invalid {
err: err @ HotStuffError::ProposalValidationError(ProposalValidationError::FutureEpoch { .. }),
..
} => {
self.hooks.on_error(&err);
Err(err)
},
MessageValidationResult::Invalid { err, from, message } => {
self.hooks.on_error(&err);
error!(target: LOG_TARGET, "🚨 Invalid new message from {from}: {err} - {message}");
Expand Down
1 change: 0 additions & 1 deletion dan_layer/consensus_tests/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1220,7 +1220,6 @@ async fn leader_failure_node_goes_down_and_gets_evicted() {
let mut test = Test::builder()
// Allow enough time for leader failures
.with_test_timeout(Duration::from_secs(30))
.debug_sql("/tmp/test{}.db")
.modify_consensus_constants(|config_mut| {
// The node will be evicted after three missed proposals
config_mut.missed_proposal_suspend_threshold = 1;
Expand Down

0 comments on commit 5b200bc

Please sign in to comment.