diff --git a/dan_layer/consensus/src/block_validations.rs b/dan_layer/consensus/src/block_validations.rs index 64aa7d511..51403db1e 100644 --- a/dan_layer/consensus/src/block_validations.rs +++ b/dan_layer/consensus/src/block_validations.rs @@ -1,12 +1,13 @@ // Copyright 2023 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use log::debug; +use log::{debug, warn}; use tari_common::configuration::Network; use tari_crypto::{ristretto::RistrettoPublicKey, tari_utilities::ByteArray}; use tari_dan_common_types::{ committee::{Committee, CommitteeInfo}, DerivableFromPublicKey, + Epoch, ExtraFieldKey, }; use tari_dan_storage::consensus_models::Block; @@ -18,6 +19,27 @@ use crate::{ }; const LOG_TARGET: &str = "tari::dan::consensus::hotstuff::block_validations"; +pub fn check_local_proposal( + current_epoch: Epoch, + block: &Block, + committee_info: &CommitteeInfo, + committee_for_block: &Committee, + vote_signing_service: &TConsensusSpec::SignatureService, + leader_strategy: &TConsensusSpec::LeaderStrategy, + config: &HotstuffConfig, +) -> Result<(), HotStuffError> { + check_proposal::( + block, + committee_info, + committee_for_block, + vote_signing_service, + leader_strategy, + config, + )?; + // This proposal is valid, if it is for an epoch ahead of us, we need to sync + check_current_epoch(block, current_epoch)?; + Ok(()) +} pub fn check_proposal( block: &Block, @@ -30,6 +52,7 @@ pub fn check_proposal( // TODO: in order to do the base layer block has validation, we need to ensure that we have synced to the tip. // If not, we need some strategy for "parking" the blocks until we are at least at the provided hash or the // tip. Without this, the check has a race condition between the base layer scanner and consensus. + // A simpler suggestion is to use the BL epoch block which does not change within epochs // check_base_layer_block_hash::(block, epoch_manager, config).await?; check_network(block, config.network)?; if block.is_genesis() { @@ -49,6 +72,19 @@ pub fn check_proposal( Ok(()) } +pub fn check_current_epoch(candidate_block: &Block, current_epoch: Epoch) -> Result<(), ProposalValidationError> { + if candidate_block.epoch() > current_epoch { + warn!(target: LOG_TARGET, "⚠️ Proposal for future epoch {} received. Current epoch is {}", candidate_block.epoch(), current_epoch); + return Err(ProposalValidationError::FutureEpoch { + block_id: *candidate_block.id(), + current_epoch, + block_epoch: candidate_block.epoch(), + }); + } + + Ok(()) +} + pub fn check_dummy(candidate_block: &Block) -> Result<(), ProposalValidationError> { if candidate_block.signature().is_some() { return Err(ProposalValidationError::DummyBlockWithSignature { diff --git a/dan_layer/consensus/src/hotstuff/error.rs b/dan_layer/consensus/src/hotstuff/error.rs index 67af6fa6f..5d66aac47 100644 --- a/dan_layer/consensus/src/hotstuff/error.rs +++ b/dan_layer/consensus/src/hotstuff/error.rs @@ -257,4 +257,10 @@ pub enum ProposalValidationError { DummyBlockWithCommands { block_id: BlockId }, #[error("Malformed block {block_id}: {details}")] MalformedBlock { block_id: BlockId, details: String }, + #[error("Block {block_id} is for a future epoch. Current epoch: {current_epoch}, block epoch: {block_epoch}")] + FutureEpoch { + block_id: BlockId, + current_epoch: Epoch, + block_epoch: Epoch, + }, } diff --git a/dan_layer/consensus/src/hotstuff/on_inbound_message.rs b/dan_layer/consensus/src/hotstuff/on_inbound_message.rs index 86ecc4576..0a1955627 100644 --- a/dan_layer/consensus/src/hotstuff/on_inbound_message.rs +++ b/dan_layer/consensus/src/hotstuff/on_inbound_message.rs @@ -94,6 +94,15 @@ impl MessageBuffer { while let Some(result) = self.inbound_messaging.next_message().await { let (from, msg) = result?; + + // If we receive an FP that is greater than our current epoch, we buffer it + if let HotstuffMessage::ForeignProposal(ref m) = msg { + if m.justify_qc.epoch() > current_epoch { + self.push_to_buffer(m.justify_qc.epoch(), NodeHeight::zero(), from, msg); + continue; + } + } + match msg_epoch_and_height(&msg) { // Discard old message Some((e, h)) if e < current_epoch || (e == current_epoch && h < next_height) => { @@ -101,7 +110,7 @@ impl MessageBuffer { continue; }, // Buffer message for future epoch/height - Some((epoch, height)) if epoch > current_epoch || height > next_height => { + Some((epoch, height)) if epoch == current_epoch && height > next_height => { if msg.proposal().is_some() { info!(target: LOG_TARGET, "🦴Proposal {msg} is for future view (Current view: {current_epoch}, {next_height})"); } else { @@ -110,6 +119,17 @@ impl MessageBuffer { self.push_to_buffer(epoch, height, from, msg); continue; }, + Some((epoch, height)) if epoch > current_epoch => { + warn!(target: LOG_TARGET, "⚠️ Message {msg} is for future epoch {epoch}. Current epoch {current_epoch}"); + if matches!(&msg, HotstuffMessage::Vote(_)) { + // Buffer VOTE messages. As it does not contain a QC we can use to prove that a BFT-majority has + // reached the epoch + self.push_to_buffer(epoch, height, from, msg); + continue; + } + // Return the message, it will be validated and if valid, will kick consensus into sync + return Ok(Some((from, msg))); + }, // Height is irrelevant or current, return message _ => return Ok(Some((from, msg))), } diff --git a/dan_layer/consensus/src/hotstuff/on_message_validate.rs b/dan_layer/consensus/src/hotstuff/on_message_validate.rs index bce67d70e..987459fd2 100644 --- a/dan_layer/consensus/src/hotstuff/on_message_validate.rs +++ b/dan_layer/consensus/src/hotstuff/on_message_validate.rs @@ -21,7 +21,7 @@ use tokio::sync::broadcast; use super::config::HotstuffConfig; use crate::{ block_validations, - hotstuff::{error::HotStuffError, HotstuffEvent, ProposalValidationError}, + hotstuff::{error::HotStuffError, CurrentView, HotstuffEvent, ProposalValidationError}, messages::{ForeignProposalMessage, HotstuffMessage, MissingTransactionsRequest, ProposalMessage}, tracing::TraceTimer, traits::{ConsensusSpec, OutboundMessaging}, @@ -33,6 +33,7 @@ pub struct OnMessageValidate { config: HotstuffConfig, store: TConsensusSpec::StateStore, epoch_manager: TConsensusSpec::EpochManager, + current_view: CurrentView, leader_strategy: TConsensusSpec::LeaderStrategy, vote_signing_service: TConsensusSpec::SignatureService, outbound_messaging: TConsensusSpec::OutboundMessaging, @@ -47,6 +48,7 @@ impl OnMessageValidate { config: HotstuffConfig, store: TConsensusSpec::StateStore, epoch_manager: TConsensusSpec::EpochManager, + current_view: CurrentView, leader_strategy: TConsensusSpec::LeaderStrategy, vote_signing_service: TConsensusSpec::SignatureService, outbound_messaging: TConsensusSpec::OutboundMessaging, @@ -56,6 +58,7 @@ impl OnMessageValidate { config, store, epoch_manager, + current_view, leader_strategy, vote_signing_service, outbound_messaging, @@ -146,6 +149,7 @@ impl OnMessageValidate { ); if proposal.block.height() < current_height { + // Should never happen since the on_inbound_message handler filters these out info!( target: LOG_TARGET, "🔥 Block {} is lower than current height {}. Ignoring.", @@ -155,7 +159,7 @@ impl OnMessageValidate { return Ok(MessageValidationResult::Discard); } - if let Err(err) = self.check_proposal(&proposal.block, local_committee, local_committee_info) { + if let Err(err) = self.check_local_proposal(&proposal.block, local_committee, local_committee_info) { return Ok(MessageValidationResult::Invalid { from, message: HotstuffMessage::Proposal(proposal), @@ -202,7 +206,24 @@ impl OnMessageValidate { }) } - fn check_proposal( + fn check_local_proposal( + &self, + block: &Block, + committee_for_block: &Committee, + committee_info: &CommitteeInfo, + ) -> Result<(), HotStuffError> { + block_validations::check_local_proposal::( + self.current_view.get_epoch(), + block, + committee_info, + committee_for_block, + &self.vote_signing_service, + &self.leader_strategy, + &self.config, + ) + } + + fn check_foreign_proposal( &self, block: &Block, committee_for_block: &Committee, @@ -325,7 +346,7 @@ impl OnMessageValidate { .get_committee_info_by_validator_public_key(msg.block.epoch(), msg.block.proposed_by().clone()) .await?; - if let Err(err) = self.check_proposal(&msg.block, &committee, &committee_info) { + if let Err(err) = self.check_foreign_proposal(&msg.block, &committee, &committee_info) { return Ok(MessageValidationResult::Invalid { from, message: HotstuffMessage::ForeignProposal(msg), diff --git a/dan_layer/consensus/src/hotstuff/state_machine/running.rs b/dan_layer/consensus/src/hotstuff/state_machine/running.rs index f1d39000a..3abf25f98 100644 --- a/dan_layer/consensus/src/hotstuff/state_machine/running.rs +++ b/dan_layer/consensus/src/hotstuff/state_machine/running.rs @@ -12,6 +12,7 @@ use crate::{ worker::ConsensusWorkerContext, }, HotStuffError, + ProposalValidationError, }, traits::ConsensusSpec, }; @@ -39,7 +40,8 @@ where TSpec: ConsensusSpec info!(target: LOG_TARGET, "Not registered for current epoch ({err})"); Ok(ConsensusStateEvent::NotRegisteredForEpoch { epoch }) }, - Err(err @ HotStuffError::FallenBehind { .. }) => { + Err(err @ HotStuffError::FallenBehind { .. }) | + Err(err @ HotStuffError::ProposalValidationError(ProposalValidationError::FutureEpoch { .. })) => { info!(target: LOG_TARGET, "⚠️ Behind peers, starting sync ({err})"); Ok(ConsensusStateEvent::NeedSync) }, diff --git a/dan_layer/consensus/src/hotstuff/worker.rs b/dan_layer/consensus/src/hotstuff/worker.rs index 1f96b9123..21f3e8d09 100644 --- a/dan_layer/consensus/src/hotstuff/worker.rs +++ b/dan_layer/consensus/src/hotstuff/worker.rs @@ -139,6 +139,7 @@ impl HotstuffWorker { config.clone(), state_store.clone(), epoch_manager.clone(), + pacemaker.clone_handle().current_view().clone(), leader_strategy.clone(), signing_service.clone(), outbound_messaging.clone(), @@ -438,6 +439,18 @@ impl HotstuffWorker { Ok(()) }, MessageValidationResult::Discard => Ok(()), + // In these cases, we want to propagate the error back to the state machine, to allow sync + MessageValidationResult::Invalid { + err: err @ HotStuffError::FallenBehind { .. }, + .. + } | + MessageValidationResult::Invalid { + err: err @ HotStuffError::ProposalValidationError(ProposalValidationError::FutureEpoch { .. }), + .. + } => { + self.hooks.on_error(&err); + Err(err) + }, MessageValidationResult::Invalid { err, from, message } => { self.hooks.on_error(&err); error!(target: LOG_TARGET, "🚨 Invalid new message from {from}: {err} - {message}"); diff --git a/dan_layer/consensus_tests/src/consensus.rs b/dan_layer/consensus_tests/src/consensus.rs index 7079ddcd9..028cbb4d2 100644 --- a/dan_layer/consensus_tests/src/consensus.rs +++ b/dan_layer/consensus_tests/src/consensus.rs @@ -1220,7 +1220,6 @@ async fn leader_failure_node_goes_down_and_gets_evicted() { let mut test = Test::builder() // Allow enough time for leader failures .with_test_timeout(Duration::from_secs(30)) - .debug_sql("/tmp/test{}.db") .modify_consensus_constants(|config_mut| { // The node will be evicted after three missed proposals config_mut.missed_proposal_suspend_threshold = 1;