Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: foreign broadcast reliability counter #757

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion applications/tari_validator_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use tari_dan_app_utilities::{
use tari_dan_common_types::{Epoch, NodeAddressable, NodeHeight, ShardId};
use tari_dan_engine::fees::FeeTable;
use tari_dan_storage::{
consensus_models::{Block, BlockId, ExecutedTransaction, SubstateRecord},
consensus_models::{Block, BlockId, ExecutedTransaction, ForeignReceiveCounters, SubstateRecord},
global::GlobalDb,
StateStore,
StateStoreReadTransaction,
Expand Down Expand Up @@ -206,6 +206,7 @@ pub async fn spawn_services(

// Consensus
let (tx_executed_transaction, rx_executed_transaction) = mpsc::channel(10);
let foreign_receive_counter = state_store.with_read_tx(|tx| ForeignReceiveCounters::get(tx))?;
let (consensus_join_handle, consensus_handle, rx_consensus_to_mempool) = consensus::spawn(
state_store.clone(),
node_identity.clone(),
Expand All @@ -214,6 +215,7 @@ pub async fn spawn_services(
rx_consensus_message,
outbound_messaging.clone(),
validator_node_client_factory.clone(),
foreign_receive_counter,
shutdown.clone(),
)
.await;
Expand Down
4 changes: 3 additions & 1 deletion applications/tari_validator_node/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tari_consensus::{
};
use tari_dan_common_types::committee::Committee;
use tari_dan_p2p::{Message, OutboundService};
use tari_dan_storage::consensus_models::TransactionPool;
use tari_dan_storage::consensus_models::{ForeignReceiveCounters, TransactionPool};
use tari_epoch_manager::base_layer::EpochManagerHandle;
use tari_shutdown::ShutdownSignal;
use tari_state_store_sqlite::SqliteStateStore;
Expand Down Expand Up @@ -50,6 +50,7 @@ pub async fn spawn(
rx_hs_message: mpsc::Receiver<(CommsPublicKey, HotstuffMessage<PublicKey>)>,
outbound_messaging: OutboundMessaging,
client_factory: TariCommsValidatorNodeClientFactory,
foreign_receive_counter: ForeignReceiveCounters,
shutdown_signal: ShutdownSignal,
) -> (
JoinHandle<Result<(), anyhow::Error>>,
Expand Down Expand Up @@ -81,6 +82,7 @@ pub async fn spawn(
tx_leader,
tx_hotstuff_events.clone(),
tx_mempool,
foreign_receive_counter,
shutdown_signal.clone(),
);

Expand Down
4 changes: 4 additions & 0 deletions dan_layer/consensus/src/hotstuff/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ pub enum ProposalValidationError {
},
#[error("Node proposed by {proposed_by} with hash {hash} did not satisfy the safeNode predicate")]
NotSafeBlock { proposed_by: String, hash: BlockId },
#[error("Node proposed by {proposed_by} with hash {hash} is missing foreign index")]
MissingForeignCounters { proposed_by: String, hash: BlockId },
#[error("Node proposed by {proposed_by} with hash {hash} has invalid foreign counters")]
InvalidForeignCounters { proposed_by: String, hash: BlockId },
#[error("Node proposed by {proposed_by} with hash {hash} is the genesis block")]
ProposingGenesisBlock { proposed_by: String, hash: BlockId },
#[error("Justification block {justify_block} for proposed block {block_description} by {proposed_by} not found")]
Expand Down
24 changes: 21 additions & 3 deletions dan_layer/consensus/src/hotstuff/on_propose.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::{collections::BTreeSet, num::NonZeroU64};
use std::{collections::BTreeSet, num::NonZeroU64, ops::DerefMut};

use log::*;
use tari_dan_common_types::{
Expand All @@ -14,6 +14,7 @@ use tari_dan_storage::{
consensus_models::{
Block,
Command,
ForeignSendCounters,
HighQc,
LastProposed,
LeafBlock,
Expand All @@ -29,7 +30,7 @@ use tokio::sync::mpsc;

use super::common::CommitteeAndMessage;
use crate::{
hotstuff::{common::EXHAUST_DIVISOR, error::HotStuffError},
hotstuff::{common::EXHAUST_DIVISOR, error::HotStuffError, proposer},
messages::{HotstuffMessage, ProposalMessage},
traits::ConsensusSpec,
};
Expand Down Expand Up @@ -109,7 +110,7 @@ where TConsensusSpec: ConsensusSpec
let mut tx = self.store.create_write_tx()?;
let high_qc = HighQc::get(&mut *tx)?;
let high_qc = high_qc.get_quorum_certificate(&mut *tx)?;

let mut foreign_counters = ForeignSendCounters::get(tx.deref_mut(), leaf_block.block_id())?;
next_block = self.build_next_block(
&mut tx,
epoch,
Expand All @@ -120,6 +121,7 @@ where TConsensusSpec: ConsensusSpec
// TODO: This just avoids issues with proposed transactions causing leader failures. Not sure if this
// is a good idea.
is_newview_propose,
&mut foreign_counters,
)?;

next_block.as_last_proposed().set(&mut tx)?;
Expand Down Expand Up @@ -184,6 +186,7 @@ where TConsensusSpec: ConsensusSpec
proposed_by: <TConsensusSpec::EpochManager as EpochManagerReader>::Addr,
local_committee_shard: &CommitteeShard,
empty_block: bool,
foreign_counters: &mut ForeignSendCounters,
) -> Result<Block<TConsensusSpec::Addr>, HotStuffError> {
// TODO: Configure
const TARGET_BLOCK_SIZE: usize = 1000;
Expand Down Expand Up @@ -235,6 +238,20 @@ where TConsensusSpec: ConsensusSpec
commands.iter().map(|c| c.to_string()).collect::<Vec<_>>().join(",")
);

let non_local_buckets = proposer::get_non_local_buckets_from_commands(
tx,
&commands,
local_committee_shard.num_committees(),
local_committee_shard.bucket(),
)?;

// let mut foreign_indexes = HashMap::new();

let foreign_indexes = non_local_buckets
.iter()
.map(|bucket| (*bucket, foreign_counters.increment_counter(*bucket)))
.collect();

let next_block = Block::new(
*parent_block.block_id(),
high_qc,
Expand All @@ -243,6 +260,7 @@ where TConsensusSpec: ConsensusSpec
proposed_by,
commands,
total_leader_fee,
foreign_indexes,
);

Ok(next_block)
Expand Down
41 changes: 35 additions & 6 deletions dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
use std::ops::DerefMut;

use log::*;
use tari_dan_common_types::{committee::CommitteeShard, optional::Optional, NodeHeight};
use tari_dan_common_types::{committee::CommitteeShard, optional::Optional, shard_bucket::ShardBucket, NodeHeight};
use tari_dan_storage::{
consensus_models::{Block, LeafBlock, TransactionPool, TransactionPoolStage},
consensus_models::{Block, ForeignReceiveCounters, LeafBlock, TransactionPool, TransactionPoolStage},
StateStore,
};
use tari_epoch_manager::EpochManagerReader;
Expand All @@ -23,6 +23,7 @@ pub struct OnReceiveForeignProposalHandler<TConsensusSpec: ConsensusSpec> {
epoch_manager: TConsensusSpec::EpochManager,
transaction_pool: TransactionPool<TConsensusSpec::StateStore>,
pacemaker: PaceMakerHandle,
foreign_receive_counter: ForeignReceiveCounters,
}

impl<TConsensusSpec> OnReceiveForeignProposalHandler<TConsensusSpec>
Expand All @@ -33,17 +34,19 @@ where TConsensusSpec: ConsensusSpec
epoch_manager: TConsensusSpec::EpochManager,
transaction_pool: TransactionPool<TConsensusSpec::StateStore>,
pacemaker: PaceMakerHandle,
foreign_receive_counter: ForeignReceiveCounters,
) -> Self {
Self {
store,
epoch_manager,
transaction_pool,
pacemaker,
foreign_receive_counter,
}
}

pub async fn handle(
&self,
&mut self,
from: TConsensusSpec::Addr,
message: ProposalMessage<TConsensusSpec::Addr>,
) -> Result<(), HotStuffError> {
Expand All @@ -63,9 +66,14 @@ where TConsensusSpec: ConsensusSpec
.epoch_manager
.get_committee_shard(block.epoch(), vn.shard_key)
.await?;
self.validate_proposed_block(&from, &block)?;
self.store
.with_write_tx(|tx| self.on_receive_foreign_block(tx, &block, &committee_shard))?;
let local_shard = self.epoch_manager.get_local_committee_shard(block.epoch()).await?;
self.validate_proposed_block(&from, &block, committee_shard.bucket(), local_shard.bucket())?;
// Is this ok? Can foreign node send invalid block that should still increment the counter?
self.foreign_receive_counter.increment(&committee_shard.bucket());
self.store.with_write_tx(|tx| {
self.foreign_receive_counter.save(tx)?;
self.on_receive_foreign_block(tx, &block, &committee_shard)
})?;

// We could have ready transactions at this point, so if we're the leader for the next block we can propose
self.pacemaker.beat();
Expand Down Expand Up @@ -141,7 +149,28 @@ where TConsensusSpec: ConsensusSpec
&self,
from: &TConsensusSpec::Addr,
candidate_block: &Block<TConsensusSpec::Addr>,
foreign_bucket: ShardBucket,
local_bucket: ShardBucket,
) -> Result<(), ProposalValidationError> {
let incoming_index = match candidate_block.get_foreign_index(&local_bucket) {
Some(i) => *i,
None => {
debug!(target:LOG_TARGET, "Our bucket {local_bucket:?} is missing reliability index in the proposed block {candidate_block:?}");
return Err(ProposalValidationError::MissingForeignCounters {
proposed_by: from.to_string(),
hash: *candidate_block.id(),
});
},
};
let current_index = self.foreign_receive_counter.get_index(&foreign_bucket);
if current_index + 1 != incoming_index {
debug!(target:LOG_TARGET, "We were expecting the index to be {expected_index}, but the index was
{incoming_index}", expected_index = current_index + 1);
return Err(ProposalValidationError::InvalidForeignCounters {
proposed_by: from.to_string(),
hash: *candidate_block.id(),
});
}
if candidate_block.height() == NodeHeight::zero() || candidate_block.id().is_genesis() {
return Err(ProposalValidationError::ProposingGenesisBlock {
proposed_by: from.to_string(),
Expand Down
58 changes: 49 additions & 9 deletions dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,23 @@
// ----[foreign:LocalPrepared]--->(LocalPrepared, true) ----cmd:AllPrepare ---> (AllPrepared, true) ---cmd:Accept --->
// Complete

use std::{collections::HashMap, ops::DerefMut};

use log::*;
use tari_dan_common_types::{committee::Committee, optional::Optional, NodeHeight};
use tari_dan_common_types::{
committee::{Committee, CommitteeShard},
optional::Optional,
shard_bucket::ShardBucket,
NodeHeight,
};
use tari_dan_storage::{
consensus_models::{Block, HighQc, TransactionPool, ValidBlock},
consensus_models::{Block, ForeignSendCounters, HighQc, TransactionPool, ValidBlock},
StateStore,
};
use tari_epoch_manager::EpochManagerReader;
use tokio::sync::{broadcast, mpsc};

use super::proposer::Proposer;
use super::proposer::{self, Proposer};
use crate::{
hotstuff::{
error::HotStuffError,
Expand Down Expand Up @@ -128,9 +135,10 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveProposalHandler<TConsensusSpec> {
.epoch_manager
.get_committee_by_validator_address(block.epoch(), block.proposed_by())
.await?;
let local_committee_shard = self.epoch_manager.get_local_committee_shard(block.epoch()).await?;
// First save the block in one db transaction
self.store.with_read_tx(|tx| {
match self.validate_local_proposed_block(tx, block, &local_committee) {
self.store.with_write_tx(|tx| {
match self.validate_local_proposed_block(tx, block, &local_committee, &local_committee_shard) {
Ok(validated) => Ok(Some(validated)),
// Validation errors should not cause a FAILURE state transition
Err(HotStuffError::ProposalValidationError(err)) => {
Expand All @@ -143,15 +151,35 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveProposalHandler<TConsensusSpec> {
})
}

fn check_foreign_indexes(
&self,
tx: &mut <TConsensusSpec::StateStore as StateStore>::WriteTransaction<'_>,
num_committees: u32,
local_bucket: ShardBucket,
block: &Block<TConsensusSpec::Addr>,
) -> Result<bool, HotStuffError> {
let mut foreign_counters = ForeignSendCounters::get(tx.deref_mut(), block.parent())?;
let non_local_buckets = proposer::get_non_local_buckets(tx.deref_mut(), block, num_committees, local_bucket)?;
let mut foreign_indexes = HashMap::new();
for non_local_bucket in non_local_buckets {
foreign_indexes
.entry(non_local_bucket)
.or_insert(foreign_counters.increment_counter(non_local_bucket));
}
foreign_counters.set(tx, block.id())?;
Ok(foreign_indexes == *block.get_foreign_indexes())
}

/// Perform final block validations (TODO: implement all validations)
/// We assume at this point that initial stateless validations have been done (in inbound messages)
fn validate_local_proposed_block(
&self,
tx: &mut <TConsensusSpec::StateStore as StateStore>::ReadTransaction<'_>,
tx: &mut <TConsensusSpec::StateStore as StateStore>::WriteTransaction<'_>,
candidate_block: Block<TConsensusSpec::Addr>,
local_committee: &Committee<TConsensusSpec::Addr>,
local_committee_shard: &CommitteeShard,
) -> Result<ValidBlock<TConsensusSpec::Addr>, HotStuffError> {
if Block::has_been_processed(tx, candidate_block.id())? {
if Block::has_been_processed(tx.deref_mut(), candidate_block.id())? {
return Err(ProposalValidationError::BlockAlreadyProcessed {
block_id: *candidate_block.id(),
height: candidate_block.height(),
Expand All @@ -160,7 +188,7 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveProposalHandler<TConsensusSpec> {
}

// Check that details included in the justify match previously added blocks
let Some(justify_block) = candidate_block.justify().get_block(tx).optional()? else {
let Some(justify_block) = candidate_block.justify().get_block(tx.deref_mut()).optional()? else {
// This will trigger a sync
return Err(ProposalValidationError::JustifyBlockNotFound {
proposed_by: candidate_block.proposed_by().to_string(),
Expand Down Expand Up @@ -241,13 +269,25 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveProposalHandler<TConsensusSpec> {

// Now that we have all dummy blocks (if any) in place, we can check if the candidate block is safe.
// Specifically, it should extend the locked block via the dummy blocks.
if !candidate_block.is_safe(tx)? {
if !candidate_block.is_safe(tx.deref_mut())? {
return Err(ProposalValidationError::NotSafeBlock {
proposed_by: candidate_block.proposed_by().to_string(),
hash: *candidate_block.id(),
}
.into());
}
if !self.check_foreign_indexes(
tx,
local_committee_shard.num_committees(),
local_committee_shard.bucket(),
&candidate_block,
)? {
return Err(ProposalValidationError::InvalidForeignCounters {
proposed_by: candidate_block.proposed_by().to_string(),
hash: *candidate_block.id(),
}
.into());
}

Ok(ValidBlock::new(candidate_block))
}
Expand Down
19 changes: 12 additions & 7 deletions dan_layer/consensus/src/hotstuff/proposer.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::collections::HashSet;
use std::collections::{BTreeSet, HashSet};

use log::{debug, info};
use tari_dan_common_types::shard_bucket::ShardBucket;
use tari_dan_storage::{
consensus_models::{Block, ExecutedTransaction},
consensus_models::{Block, Command, ExecutedTransaction},
StateStore,
StateStoreReadTransaction,
};
Expand Down Expand Up @@ -95,11 +95,16 @@ pub fn get_non_local_buckets<TTx: StateStoreReadTransaction>(
num_committees: u32,
local_bucket: ShardBucket,
) -> Result<HashSet<ShardBucket>, HotStuffError> {
let prepared_iter = block
.commands()
.iter()
.filter_map(|cmd| cmd.local_prepared())
.map(|t| &t.id);
get_non_local_buckets_from_commands(tx, block.commands(), num_committees, local_bucket)
}

pub fn get_non_local_buckets_from_commands<TTx: StateStoreReadTransaction>(
tx: &mut TTx,
commands: &BTreeSet<Command>,
num_committees: u32,
local_bucket: ShardBucket,
) -> Result<HashSet<ShardBucket>, HotStuffError> {
let prepared_iter = commands.iter().filter_map(|cmd| cmd.local_prepared()).map(|t| &t.id);
let prepared_txs = ExecutedTransaction::get_involved_shards(tx, prepared_iter)?;
let non_local_buckets = prepared_txs
.into_iter()
Expand Down
Loading
Loading