Skip to content

Commit

Permalink
feat: add reliability counter
Browse files Browse the repository at this point in the history
  • Loading branch information
Cifko committed Nov 23, 2023
1 parent 42452d4 commit 71acf0a
Show file tree
Hide file tree
Showing 23 changed files with 464 additions and 34 deletions.
4 changes: 3 additions & 1 deletion applications/tari_validator_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use tari_dan_app_utilities::{
use tari_dan_common_types::{Epoch, NodeAddressable, NodeHeight, ShardId};
use tari_dan_engine::fees::FeeTable;
use tari_dan_storage::{
consensus_models::{Block, ExecutedTransaction, SubstateRecord},
consensus_models::{Block, ExecutedTransaction, ForeignReceiveCounters, SubstateRecord},
global::GlobalDb,
StateStore,
StateStoreReadTransaction,
Expand Down Expand Up @@ -205,6 +205,7 @@ pub async fn spawn_services(

// Consensus
let (tx_executed_transaction, rx_executed_transaction) = mpsc::channel(10);
let foreign_receive_counter = state_store.with_read_tx(|tx| ForeignReceiveCounters::get(tx))?;
let (consensus_join_handle, consensus_handle, rx_consensus_to_mempool) = consensus::spawn(
state_store.clone(),
node_identity.clone(),
Expand All @@ -213,6 +214,7 @@ pub async fn spawn_services(
rx_consensus_message,
outbound_messaging.clone(),
validator_node_client_factory.clone(),
foreign_receive_counter,
shutdown.clone(),
)
.await;
Expand Down
4 changes: 3 additions & 1 deletion applications/tari_validator_node/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tari_consensus::{
};
use tari_dan_common_types::committee::Committee;
use tari_dan_p2p::{Message, OutboundService};
use tari_dan_storage::consensus_models::TransactionPool;
use tari_dan_storage::consensus_models::{ForeignReceiveCounters, TransactionPool};
use tari_epoch_manager::base_layer::EpochManagerHandle;
use tari_shutdown::ShutdownSignal;
use tari_state_store_sqlite::SqliteStateStore;
Expand Down Expand Up @@ -50,6 +50,7 @@ pub async fn spawn(
rx_hs_message: mpsc::Receiver<(CommsPublicKey, HotstuffMessage<PublicKey>)>,
outbound_messaging: OutboundMessaging,
client_factory: TariCommsValidatorNodeClientFactory,
foreign_receive_counter: ForeignReceiveCounters,
shutdown_signal: ShutdownSignal,
) -> (
JoinHandle<Result<(), anyhow::Error>>,
Expand Down Expand Up @@ -81,6 +82,7 @@ pub async fn spawn(
tx_leader,
tx_hotstuff_events.clone(),
tx_mempool,
foreign_receive_counter,
shutdown_signal.clone(),
);

Expand Down
4 changes: 4 additions & 0 deletions dan_layer/consensus/src/hotstuff/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ pub enum ProposalValidationError {
},
#[error("Node proposed by {proposed_by} with hash {hash} did not satisfy the safeNode predicate")]
NotSafeBlock { proposed_by: String, hash: BlockId },
#[error("Node proposed by {proposed_by} with hash {hash} is missing foreign index")]
MissingForeignCounters { proposed_by: String, hash: BlockId },
#[error("Node proposed by {proposed_by} with hash {hash} has invalid foreign counters")]
InvalidForeignCounters { proposed_by: String, hash: BlockId },
#[error("Node proposed by {proposed_by} with hash {hash} is the genesis block")]
ProposingGenesisBlock { proposed_by: String, hash: BlockId },
#[error("Justification block {justify_block} for proposed block {block_description} by {proposed_by} not found")]
Expand Down
29 changes: 26 additions & 3 deletions dan_layer/consensus/src/hotstuff/on_propose.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::{collections::BTreeSet, num::NonZeroU64};
use std::{
collections::{BTreeSet, HashMap},
num::NonZeroU64,
ops::DerefMut,
};

use log::*;
use tari_dan_common_types::{
committee::{Committee, CommitteeShard},
optional::Optional,
shard_bucket::ShardBucket,
Epoch,
NodeHeight,
};
use tari_dan_storage::{
consensus_models::{
Block,
Command,
ForeignSendCounters,
HighQc,
LastProposed,
LeafBlock,
Expand All @@ -29,7 +35,7 @@ use tokio::sync::mpsc;

use super::common::CommitteeAndMessage;
use crate::{
hotstuff::{common::EXHAUST_DIVISOR, error::HotStuffError},
hotstuff::{common::EXHAUST_DIVISOR, error::HotStuffError, proposer},
messages::{HotstuffMessage, ProposalMessage},
traits::ConsensusSpec,
};
Expand Down Expand Up @@ -109,7 +115,7 @@ where TConsensusSpec: ConsensusSpec
let mut tx = self.store.create_write_tx()?;
let high_qc = HighQc::get(&mut *tx)?;
let high_qc = high_qc.get_quorum_certificate(&mut *tx)?;

let mut foreign_counters = ForeignSendCounters::get(tx.deref_mut(), leaf_block.block_id())?;
next_block = self.build_next_block(
&mut tx,
epoch,
Expand All @@ -120,6 +126,7 @@ where TConsensusSpec: ConsensusSpec
// TODO: This just avoids issues with proposed transactions causing leader failures. Not sure if this
// is a good idea.
is_newview_propose,
&mut foreign_counters,
)?;

next_block.as_last_proposed().set(&mut tx)?;
Expand Down Expand Up @@ -184,6 +191,7 @@ where TConsensusSpec: ConsensusSpec
proposed_by: <TConsensusSpec::EpochManager as EpochManagerReader>::Addr,
local_committee_shard: &CommitteeShard,
empty_block: bool,
foreign_counters: &mut ForeignSendCounters,
) -> Result<Block<TConsensusSpec::Addr>, HotStuffError> {
// TODO: Configure
const TARGET_BLOCK_SIZE: usize = 1000;
Expand Down Expand Up @@ -235,6 +243,20 @@ where TConsensusSpec: ConsensusSpec
commands.iter().map(|c| c.to_string()).collect::<Vec<_>>().join(",")
);

let non_local_buckets = proposer::get_non_local_buckets_from_commands(
tx,
&commands,
local_committee_shard.num_committees(),
local_committee_shard.bucket(),
)?;

// let mut foreign_indexes = HashMap::new();

let foreign_indexes = non_local_buckets
.iter()
.map(|bucket| (*bucket, foreign_counters.increment_counter(*bucket)))
.collect();

let next_block = Block::new(
*parent_block.block_id(),
high_qc,
Expand All @@ -243,6 +265,7 @@ where TConsensusSpec: ConsensusSpec
proposed_by,
commands,
total_leader_fee,
foreign_indexes,
);

Ok(next_block)
Expand Down
41 changes: 35 additions & 6 deletions dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
use std::ops::DerefMut;

use log::*;
use tari_dan_common_types::{committee::CommitteeShard, optional::Optional, NodeHeight};
use tari_dan_common_types::{committee::CommitteeShard, optional::Optional, shard_bucket::ShardBucket, NodeHeight};
use tari_dan_storage::{
consensus_models::{Block, LeafBlock, TransactionPool, TransactionPoolStage},
consensus_models::{Block, ForeignReceiveCounters, LeafBlock, TransactionPool, TransactionPoolStage},
StateStore,
};
use tari_epoch_manager::EpochManagerReader;
Expand All @@ -23,6 +23,7 @@ pub struct OnReceiveForeignProposalHandler<TConsensusSpec: ConsensusSpec> {
epoch_manager: TConsensusSpec::EpochManager,
transaction_pool: TransactionPool<TConsensusSpec::StateStore>,
pacemaker: PaceMakerHandle,
foreign_receive_counter: ForeignReceiveCounters,
}

impl<TConsensusSpec> OnReceiveForeignProposalHandler<TConsensusSpec>
Expand All @@ -33,17 +34,19 @@ where TConsensusSpec: ConsensusSpec
epoch_manager: TConsensusSpec::EpochManager,
transaction_pool: TransactionPool<TConsensusSpec::StateStore>,
pacemaker: PaceMakerHandle,
foreign_receive_counter: ForeignReceiveCounters,
) -> Self {
Self {
store,
epoch_manager,
transaction_pool,
pacemaker,
foreign_receive_counter,
}
}

pub async fn handle(
&self,
&mut self,
from: TConsensusSpec::Addr,
message: ProposalMessage<TConsensusSpec::Addr>,
) -> Result<(), HotStuffError> {
Expand All @@ -63,9 +66,14 @@ where TConsensusSpec: ConsensusSpec
.epoch_manager
.get_committee_shard(block.epoch(), vn.shard_key)
.await?;
self.validate_proposed_block(&from, &block)?;
self.store
.with_write_tx(|tx| self.on_receive_foreign_block(tx, &block, &committee_shard))?;
let local_shard = self.epoch_manager.get_local_committee_shard(block.epoch()).await?;
self.validate_proposed_block(&from, &block, committee_shard.bucket(), local_shard.bucket())?;
// Is this ok? Can a foreign node send an invalid block that should still increment the counter?
self.foreign_receive_counter.increment(&committee_shard.bucket());
self.store.with_write_tx(|tx| {
self.foreign_receive_counter.save(tx)?;
self.on_receive_foreign_block(tx, &block, &committee_shard)
})?;

// We could have ready transactions at this point, so if we're the leader for the next block we can propose
self.pacemaker.beat();
Expand Down Expand Up @@ -141,7 +149,28 @@ where TConsensusSpec: ConsensusSpec
&self,
from: &TConsensusSpec::Addr,
candidate_block: &Block<TConsensusSpec::Addr>,
foreign_bucket: ShardBucket,
local_bucket: ShardBucket,
) -> Result<(), ProposalValidationError> {
let incoming_index = match candidate_block.get_foreign_index(&local_bucket) {
Some(i) => *i,
None => {
debug!(target:LOG_TARGET, "Our bucket {local_bucket:?} is missing reliability index in the proposed block {candidate_block:?}");
return Err(ProposalValidationError::MissingForeignCounters {
proposed_by: from.to_string(),
hash: *candidate_block.id(),
});
},
};
let current_index = self.foreign_receive_counter.get_index(&foreign_bucket);
if current_index + 1 != incoming_index {
debug!(target:LOG_TARGET, "We were expecting the index to be {expected_index}, but the index was
{incoming_index}", expected_index = current_index + 1);
return Err(ProposalValidationError::InvalidForeignCounters {
proposed_by: from.to_string(),
hash: *candidate_block.id(),
});
}
if candidate_block.height() == NodeHeight::zero() || candidate_block.id().is_genesis() {
return Err(ProposalValidationError::ProposingGenesisBlock {
proposed_by: from.to_string(),
Expand Down
58 changes: 49 additions & 9 deletions dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,23 @@
// ----[foreign:LocalPrepared]--->(LocalPrepared, true) ----cmd:AllPrepare ---> (AllPrepared, true) ---cmd:Accept --->
// Complete

use std::{collections::HashMap, ops::DerefMut};

use log::*;
use tari_dan_common_types::{committee::Committee, optional::Optional, NodeHeight};
use tari_dan_common_types::{
committee::{Committee, CommitteeShard},
optional::Optional,
shard_bucket::ShardBucket,
NodeHeight,
};
use tari_dan_storage::{
consensus_models::{Block, HighQc, TransactionPool, ValidBlock},
consensus_models::{Block, ForeignSendCounters, HighQc, TransactionPool, ValidBlock},
StateStore,
};
use tari_epoch_manager::EpochManagerReader;
use tokio::sync::{broadcast, mpsc};

use super::proposer::Proposer;
use super::proposer::{self, Proposer};
use crate::{
hotstuff::{
error::HotStuffError,
Expand Down Expand Up @@ -128,9 +135,10 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveProposalHandler<TConsensusSpec> {
.epoch_manager
.get_committee_by_validator_address(block.epoch(), block.proposed_by())
.await?;
let local_committee_shard = self.epoch_manager.get_local_committee_shard(block.epoch()).await?;
// First save the block in one db transaction
self.store.with_read_tx(|tx| {
match self.validate_local_proposed_block(tx, block, &local_committee) {
self.store.with_write_tx(|tx| {
match self.validate_local_proposed_block(tx, block, &local_committee, &local_committee_shard) {
Ok(validated) => Ok(Some(validated)),
// Validation errors should not cause a FAILURE state transition
Err(HotStuffError::ProposalValidationError(err)) => {
Expand All @@ -143,15 +151,35 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveProposalHandler<TConsensusSpec> {
})
}

/// Checks that the foreign indexes carried by `block` match the ones this node derives itself.
///
/// The send counters recorded for the block's parent are loaded and incremented once for every
/// non-local bucket involved in the block's commands; the resulting bucket -> index map must be
/// equal to `block.get_foreign_indexes()`.
///
/// Returns `Ok(true)` when the indexes match, in which case the advanced counters are stored under
/// this block's id. Returns `Ok(false)` on a mismatch; nothing is stored in that case so that a
/// rejected proposal does not leave counter state behind.
///
/// # Errors
/// Propagates any error from loading the counters, resolving the non-local buckets or storing the
/// updated counters.
fn check_foreign_indexes(
    &self,
    tx: &mut <TConsensusSpec::StateStore as StateStore>::WriteTransaction<'_>,
    num_committees: u32,
    local_bucket: ShardBucket,
    block: &Block<TConsensusSpec::Addr>,
) -> Result<bool, HotStuffError> {
    let mut foreign_counters = ForeignSendCounters::get(tx.deref_mut(), block.parent())?;
    let non_local_buckets = proposer::get_non_local_buckets(tx.deref_mut(), block, num_committees, local_bucket)?;

    // `non_local_buckets` is a set, so every bucket is visited (and its counter incremented) exactly once.
    // This mirrors how the proposer builds the indexes when it creates the block.
    let expected_indexes: HashMap<_, _> = non_local_buckets
        .into_iter()
        .map(|bucket| (bucket, foreign_counters.increment_counter(bucket)))
        .collect();

    if expected_indexes != *block.get_foreign_indexes() {
        return Ok(false);
    }

    // Only persist the advanced counters once the block's indexes have been confirmed.
    foreign_counters.set(tx, block.id())?;
    Ok(true)
}

/// Perform final block validations (TODO: implement all validations)
/// We assume at this point that initial stateless validations have been done (in inbound messages)
fn validate_local_proposed_block(
&self,
tx: &mut <TConsensusSpec::StateStore as StateStore>::ReadTransaction<'_>,
tx: &mut <TConsensusSpec::StateStore as StateStore>::WriteTransaction<'_>,
candidate_block: Block<TConsensusSpec::Addr>,
local_committee: &Committee<TConsensusSpec::Addr>,
local_committee_shard: &CommitteeShard,
) -> Result<ValidBlock<TConsensusSpec::Addr>, HotStuffError> {
if Block::has_been_processed(tx, candidate_block.id())? {
if Block::has_been_processed(tx.deref_mut(), candidate_block.id())? {
return Err(ProposalValidationError::BlockAlreadyProcessed {
block_id: *candidate_block.id(),
height: candidate_block.height(),
Expand All @@ -160,7 +188,7 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveProposalHandler<TConsensusSpec> {
}

// Check that details included in the justify match previously added blocks
let Some(justify_block) = candidate_block.justify().get_block(tx).optional()? else {
let Some(justify_block) = candidate_block.justify().get_block(tx.deref_mut()).optional()? else {
// This will trigger a sync
return Err(ProposalValidationError::JustifyBlockNotFound {
proposed_by: candidate_block.proposed_by().to_string(),
Expand Down Expand Up @@ -241,13 +269,25 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveProposalHandler<TConsensusSpec> {

// Now that we have all dummy blocks (if any) in place, we can check if the candidate block is safe.
// Specifically, it should extend the locked block via the dummy blocks.
if !candidate_block.is_safe(tx)? {
if !candidate_block.is_safe(tx.deref_mut())? {
return Err(ProposalValidationError::NotSafeBlock {
proposed_by: candidate_block.proposed_by().to_string(),
hash: *candidate_block.id(),
}
.into());
}
if !self.check_foreign_indexes(
tx,
local_committee_shard.num_committees(),
local_committee_shard.bucket(),
&candidate_block,
)? {
return Err(ProposalValidationError::InvalidForeignCounters {
proposed_by: candidate_block.proposed_by().to_string(),
hash: *candidate_block.id(),
}
.into());
}

Ok(ValidBlock::new(candidate_block))
}
Expand Down
19 changes: 12 additions & 7 deletions dan_layer/consensus/src/hotstuff/proposer.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::collections::HashSet;
use std::collections::{BTreeSet, HashSet};

use log::{debug, info};
use tari_dan_common_types::shard_bucket::ShardBucket;
use tari_dan_storage::{
consensus_models::{Block, ExecutedTransaction},
consensus_models::{Block, Command, ExecutedTransaction},
StateStore,
StateStoreReadTransaction,
};
Expand Down Expand Up @@ -95,11 +95,16 @@ pub fn get_non_local_buckets<TTx: StateStoreReadTransaction>(
num_committees: u32,
local_bucket: ShardBucket,
) -> Result<HashSet<ShardBucket>, HotStuffError> {
let prepared_iter = block
.commands()
.iter()
.filter_map(|cmd| cmd.local_prepared())
.map(|t| &t.id);
get_non_local_buckets_from_commands(tx, block.commands(), num_committees, local_bucket)
}

pub fn get_non_local_buckets_from_commands<TTx: StateStoreReadTransaction>(
tx: &mut TTx,
commands: &BTreeSet<Command>,
num_committees: u32,
local_bucket: ShardBucket,
) -> Result<HashSet<ShardBucket>, HotStuffError> {
let prepared_iter = commands.iter().filter_map(|cmd| cmd.local_prepared()).map(|t| &t.id);
let prepared_txs = ExecutedTransaction::get_involved_shards(tx, prepared_iter)?;
let non_local_buckets = prepared_txs
.into_iter()
Expand Down
Loading

0 comments on commit 71acf0a

Please sign in to comment.