Skip to content

Commit

Permalink
fix(consensus): vote handling improvement, bug fixes (#1157)
Browse files Browse the repository at this point in the history
Description
---
fix(consensus): vote handling improvement, bug fixes
Set preshards to 256 
Fix sending duplicate transaction gossips
Wait for foreign proposal before sequencing an output-only transaction
that only currently involves the transaction receipt substate

Motivation and Context
---

Removed unnecessary view change calls when receiving votes from newviews
Removed several epoch manager calls that can use committee info
Fixed bug in vote processing
Set number of preshards to 256 to test that it works (which it does, so
I've kept it)
Add checks for valid block ids to a number of pending data database
calls
Fix propose bug where incorrect state could be used when the parent block is
a dummy
  • Loading branch information
sdbondi authored Sep 27, 2024
1 parent 49c82f7 commit f7038d3
Show file tree
Hide file tree
Showing 53 changed files with 599 additions and 596 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl ConsensusConstants {
committee_size: 7,
max_base_layer_blocks_ahead: 5,
max_base_layer_blocks_behind: 5,
num_preshards: NumPreshards::P64,
num_preshards: NumPreshards::P256,
pacemaker_max_base_time: Duration::from_secs(10),
}
}
Expand Down
4 changes: 2 additions & 2 deletions applications/tari_indexer/src/transaction_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ where
target: LOG_TARGET,
"Submitting transaction with hash {} to the validator node", tx_hash
);
let transaction_substate_address = SubstateAddress::for_transaction_receipt(tx_hash.into_array().into());
let transaction_substate_address = tx_hash.to_substate_address();

if transaction.all_inputs_iter().next().is_none() {
self.try_with_committee(iter::once(transaction_substate_address), 2, |mut client| {
Expand Down Expand Up @@ -122,7 +122,7 @@ where
&self,
transaction_id: TransactionId,
) -> Result<TransactionResultStatus, TransactionManagerError> {
let transaction_substate_address = SubstateAddress::for_transaction_receipt(transaction_id.into_array().into());
let transaction_substate_address = transaction_id.to_substate_address();
self.try_with_committee(iter::once(transaction_substate_address), 1, |mut client| async move {
client.get_finalized_transaction_result(transaction_id).await.optional()
})
Expand Down
4 changes: 2 additions & 2 deletions applications/tari_swarm_daemon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async-trait = { workspace = true }
axum = { workspace = true, features = ["multipart"] }
axum-jrpc = { workspace = true }
base64 = "0.22.1"
clap = { workspace = true, features = ["derive"] }
clap = { workspace = true, features = ["derive", "env"] }
fern = { workspace = true, features = ["colored"] }
futures = { workspace = true }
humantime = { workspace = true }
Expand All @@ -43,7 +43,7 @@ thiserror = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal", "process", "time", "fs"] }
toml = "0.8.12"
tonic = { workspace = true }
tower-http = { workspace = true, features = ["fs"] }
tower-http = { workspace = true, features = ["fs", "cors"] }
url = { workspace = true }

[target.'cfg(unix)'.dependencies]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ fn cargo_build<P: AsRef<Path>>(working_dir: P, package: &str) -> io::Result<Chil
Command::new("cargo")
.args(["build", "--release", "--bin", package])
.current_dir(working_dir)
.kill_on_drop(true)
.spawn()
}

Expand Down
29 changes: 10 additions & 19 deletions applications/tari_validator_node/src/consensus/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use prometheus::{core::Collector, IntCounter, IntGauge, IntGaugeVec, Opts, Regis
use tari_consensus::{hotstuff::HotStuffError, messages::HotstuffMessage, traits::hooks::ConsensusHooks};
use tari_dan_common_types::{NodeHeight, PeerAddress};
use tari_dan_storage::{
consensus_models::{Decision, QuorumDecision, TransactionAtom, TransactionPool, ValidBlock},
consensus_models::{Decision, QuorumDecision, TransactionAtom, ValidBlock},
StateStore,
};
use tari_state_store_sqlite::SqliteStateStore;
Expand All @@ -17,7 +17,7 @@ use crate::metrics::{CollectorRegister, LabelledCollector};

#[derive(Debug, Clone)]
pub struct PrometheusConsensusMetrics<S = SqliteStateStore<PeerAddress>> {
state_store: S,
_state_store: S,
local_blocks_received: IntCounter,
blocks_accepted: IntCounter,
blocks_rejected: IntCounter,
Expand All @@ -33,7 +33,7 @@ pub struct PrometheusConsensusMetrics<S = SqliteStateStore<PeerAddress>> {
pacemaker_leader_failures: IntCounter,
needs_sync: IntCounter,

transactions_pool_size: IntGauge,
_transactions_pool_size: IntGauge,
transactions_ready_for_consensus: IntCounter,
transactions_finalized_committed: IntCounter,
transactions_finalized_aborted: IntCounter,
Expand All @@ -42,7 +42,7 @@ pub struct PrometheusConsensusMetrics<S = SqliteStateStore<PeerAddress>> {
impl<S: StateStore> PrometheusConsensusMetrics<S> {
pub fn new(state_store: S, registry: &Registry) -> Self {
Self {
state_store,
_state_store: state_store,
local_blocks_received: IntCounter::new("consensus_blocks_received", "Number of blocks added")
.unwrap()
.register_at(registry),
Expand Down Expand Up @@ -97,9 +97,12 @@ impl<S: StateStore> PrometheusConsensusMetrics<S> {
)
.unwrap()
.register_at(registry),
transactions_pool_size: IntGauge::new("consensus_transactions_pool_size", "Number of transactions in pool")
.unwrap()
.register_at(registry),
_transactions_pool_size: IntGauge::new(
"consensus_transactions_pool_size",
"Number of transactions in pool",
)
.unwrap()
.register_at(registry),
}
}

Expand Down Expand Up @@ -168,18 +171,6 @@ impl<S: StateStore> ConsensusHooks for PrometheusConsensusMetrics<S> {
self.pacemaker_leader_failures.inc()
}

fn on_beat(&mut self) {
let Some(count) = self
.state_store
.with_read_tx(|tx| TransactionPool::<S>::new().count(tx))
.ok()
else {
return;
};

self.transactions_pool_size.set(count as i64);
}

fn on_needs_sync(&mut self, _local_height: NodeHeight, _remote_qc_height: NodeHeight) {
self.needs_sync.inc();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: BSD-3-Clause

use rand::rngs::OsRng;
use tari_common_types::types::{FixedHash, PublicKey};
use tari_common_types::types::PublicKey;
use tari_consensus::traits::{ValidatorSignatureService, VoteSignatureService};
use tari_dan_app_utilities::keypair::RistrettoKeypair;
use tari_dan_storage::consensus_models::{BlockId, QuorumDecision, ValidatorSchnorrSignature, ValidatorSignature};
Expand All @@ -29,14 +29,8 @@ impl ValidatorSignatureService for TariSignatureService {
}

impl VoteSignatureService for TariSignatureService {
fn verify(
&self,
signature: &ValidatorSignature,
leaf_hash: &FixedHash,
block_id: &BlockId,
decision: &QuorumDecision,
) -> bool {
let message = self.create_message(leaf_hash, block_id, decision);
fn verify(&self, signature: &ValidatorSignature, block_id: &BlockId, decision: &QuorumDecision) -> bool {
let message = self.create_message(block_id, decision);
signature.verify(message)
}
}
6 changes: 1 addition & 5 deletions applications/tari_validator_node/src/p2p/rpc/service_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -319,11 +319,7 @@ impl ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl {
}

async fn get_high_qc(&self, _request: Request<GetHighQcRequest>) -> Result<Response<GetHighQcResponse>, RpcStatus> {
let current_epoch = self
.epoch_manager
.current_epoch()
.await
.map_err(RpcStatus::log_internal_error(LOG_TARGET))?;
let current_epoch = self.consensus.current_epoch();
let high_qc = self
.shard_state_store
.with_read_tx(|tx| {
Expand Down
44 changes: 24 additions & 20 deletions applications/tari_validator_node/src/p2p/services/mempool/gossip.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::collections::HashSet;
use std::{collections::HashSet, iter};

use log::*;
use tari_dan_common_types::{Epoch, NumPreshards, PeerAddress, ShardGroup, SubstateAddress};
use tari_dan_p2p::{proto, DanMessage};
use tari_dan_common_types::{Epoch, NumPreshards, PeerAddress, ShardGroup, ToSubstateAddress};
use tari_dan_p2p::{proto, DanMessage, NewTransactionMessage};
use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerReader};

use crate::p2p::services::{mempool::MempoolError, messaging::Gossip};
Expand All @@ -30,7 +30,7 @@ impl MempoolGossip<PeerAddress> {
}
}

pub async fn next_message(&mut self) -> Option<Result<(PeerAddress, DanMessage), MempoolError>> {
pub async fn next_message(&mut self) -> Option<Result<(PeerAddress, DanMessage, usize), MempoolError>> {
self.gossip
.next_message()
.await
Expand Down Expand Up @@ -79,19 +79,33 @@ impl MempoolGossip<PeerAddress> {
Ok(())
}

pub async fn forward_to_foreign_replicas<T: Into<DanMessage>>(
pub fn get_num_incoming_messages(&self) -> usize {
self.gossip.get_num_incoming_messages()
}

pub async fn forward_to_foreign_replicas(
&mut self,
epoch: Epoch,
substate_addresses: HashSet<SubstateAddress>,
msg: T,
msg: NewTransactionMessage,
exclude_shard_group: Option<ShardGroup>,
) -> Result<(), MempoolError> {
let n = self.epoch_manager.get_num_committees(epoch).await?;
let committee_shard = self.epoch_manager.get_local_committee_info(epoch).await?;
let local_shard_group = committee_shard.shard_group();
let shard_groups = substate_addresses
.into_iter()
.map(|s| s.to_shard_group(self.num_preshards, n))
let shard_groups = msg
.transaction
.all_inputs_iter()
.map(|s| {
s.or_zero_version()
.to_substate_address()
.to_shard_group(self.num_preshards, n)
})
.chain(iter::once(
msg.transaction
.id()
.to_substate_address()
.to_shard_group(self.num_preshards, n),
))
.filter(|sg| exclude_shard_group.as_ref() != Some(sg) && sg != &local_shard_group)
.collect::<HashSet<_>>();

Expand All @@ -108,16 +122,6 @@ impl MempoolGossip<PeerAddress> {

Ok(())
}

pub async fn gossip_to_foreign_replicas<T: Into<DanMessage>>(
&mut self,
epoch: Epoch,
addresses: HashSet<SubstateAddress>,
msg: T,
) -> Result<(), MempoolError> {
self.forward_to_foreign_replicas(epoch, addresses, msg, None).await?;
Ok(())
}
}

fn shard_group_to_topic(shard_group: ShardGroup) -> String {
Expand Down
Loading

0 comments on commit f7038d3

Please sign in to comment.