fix(consensus): reduce output-only messaging, fix JMT error (#1155)
Description
---
- Output-only committees do not send LocalPrepare
- Always use the parent block ID to fetch JMT pending state
- Simplify how evidence updates are applied to transaction pool records
- Improve performance of missing-transaction processing
- Fix leader failure
- Reduce/simplify prepare-phase evidence
- Avoid block-proposal delay when changing epochs

Motivation and Context
---
Output-only shard groups do not need to send prepare-phase cross-shard
exchanges, since they don't involve any inputs. This PR removes input
pledging for output-only nodes, as sketched below.
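A minimal sketch of the behaviour change, assuming hypothetical `CommitteeRole` and `broadcast_local_prepare` stand-ins (the real consensus types and API differ):

```rust
/// Illustrative stand-in for a committee's involvement in a transaction.
enum CommitteeRole {
    /// The committee holds some of the transaction's input substates.
    InputInvolved,
    /// The committee only produces outputs, so it has no inputs to pledge.
    OutputOnly,
}

fn maybe_send_local_prepare(role: &CommitteeRole, tx_id: &str) {
    match role {
        // Output-only committees now skip the prepare-phase cross-shard
        // exchange entirely: with no inputs there is nothing to pledge.
        CommitteeRole::OutputOnly => {}
        CommitteeRole::InputInvolved => broadcast_local_prepare(tx_id),
    }
}

fn broadcast_local_prepare(tx_id: &str) {
    println!("broadcasting LocalPrepare for {tx_id}");
}

fn main() {
    maybe_send_local_prepare(&CommitteeRole::OutputOnly, "tx-1"); // no-op
    maybe_send_local_prepare(&CommitteeRole::InputInvolved, "tx-2"); // broadcasts
}
```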

The previous pending JMT diffs query used the justify block_id. However,
when the justify block is not the parent, the JMT should still include
pending state for the parent block, since those state changes will still
be committed. The sketch below illustrates the difference.
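A hedged sketch of the JMT fix; `Block` and `Store` here are illustrative stand-ins rather than the actual state-store API:

```rust
use std::collections::HashMap;

type BlockId = u64;

/// Illustrative block header: `justify` points at the QC'd block, which is
/// not necessarily the direct parent (e.g. after a leader failure).
struct Block {
    parent: BlockId,
    justify: BlockId,
}

/// Illustrative stand-in for the store of per-block pending JMT diffs.
struct Store {
    pending_diffs: HashMap<BlockId, Vec<&'static str>>,
}

impl Store {
    fn pending_jmt_diffs(&self, block_id: BlockId) -> Vec<&'static str> {
        self.pending_diffs.get(&block_id).cloned().unwrap_or_default()
    }
}

fn main() {
    let mut pending_diffs = HashMap::new();
    pending_diffs.insert(2, vec!["substate change pending in parent"]);
    let store = Store { pending_diffs };

    // justify (block 1) != parent (block 2), and the parent carries pending state.
    let block = Block { parent: 2, justify: 1 };

    // The old lookup keyed off the justify block and missed the parent's state.
    assert!(store.pending_jmt_diffs(block.justify).is_empty());
    // The fixed lookup always keys off the parent block, whose state changes
    // will still be committed.
    assert_eq!(store.pending_jmt_diffs(block.parent).len(), 1);
}
```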

Fixes #1154 

How Has This Been Tested?
---
Manually ran the tariswap test, plus the existing test suite

What process can a PR reviewer use to test or verify this change?
---
Bug fixes 

Breaking Changes
---

- [x] None
- [ ] Requires data directory to be deleted
- [ ] Other - Please specify
sdbondi authored Sep 24, 2024
1 parent 70fcc69 commit 49c82f7
Showing 65 changed files with 1,173 additions and 692 deletions.
2 changes: 1 addition & 1 deletion applications/tari_indexer/src/dry_run/processor.rs
@@ -251,7 +251,7 @@ where TSubstateCache: SubstateCache + 'static
for (epoch, public_key) in claim_instructions {
let vn = self
.epoch_manager
- .get_validator_node_by_public_key(epoch, &public_key)
+ .get_validator_node_by_public_key(epoch, public_key.clone())
.await?;
let address = VirtualSubstateId::UnclaimedValidatorFee {
epoch: epoch.as_u64(),
42 changes: 42 additions & 0 deletions applications/tari_swarm_daemon/src/webserver/rpc/instances.rs
@@ -8,6 +8,27 @@ use serde::{Deserialize, Serialize};

use crate::{config::InstanceType, process_manager::InstanceId, webserver::context::HandlerContext};

+ #[derive(Debug, Clone, Deserialize)]
+ pub struct StartAllRequest {
+     instance_type: Option<InstanceType>,
+ }
+
+ #[derive(Debug, Clone, Serialize)]
+ pub struct StartAllResponse {
+     pub num_instances: u32,
+ }
+
+ pub async fn start_all(context: &HandlerContext, req: StartAllRequest) -> Result<StartAllResponse, anyhow::Error> {
+     let instances = context.process_manager().list_instances(req.instance_type).await?;
+
+     let num_instances = instances.len() as u32;
+     for instance in instances {
+         context.process_manager().start_instance(instance.id).await?;
+     }
+
+     Ok(StartAllResponse { num_instances })
+ }

pub type StartInstanceRequest = String;

#[derive(Debug, Clone, Serialize)]
@@ -65,6 +86,27 @@ pub async fn stop(context: &HandlerContext, req: StopInstanceRequest) -> Result<
Ok(StopInstanceResponse { success: true })
}

+ #[derive(Debug, Clone, Deserialize)]
+ pub struct StopAllRequest {
+     instance_type: Option<InstanceType>,
+ }
+
+ #[derive(Debug, Clone, Serialize)]
+ pub struct StopAllResponse {
+     pub num_instances: u32,
+ }
+
+ pub async fn stop_all(context: &HandlerContext, req: StopAllRequest) -> Result<StopAllResponse, anyhow::Error> {
+     let instances = context.process_manager().list_instances(req.instance_type).await?;
+
+     let num_instances = instances.len() as u32;
+     for instance in instances {
+         context.process_manager().stop_instance(instance.id).await?;
+     }
+
+     Ok(StopAllResponse { num_instances })
+ }

#[derive(Debug, Clone, Deserialize)]
pub struct ListInstancesRequest {
pub by_type: Option<InstanceType>,
2 changes: 2 additions & 0 deletions applications/tari_swarm_daemon/src/webserver/server.rs
@@ -111,7 +111,9 @@ async fn json_rpc_handler(Extension(context): Extension<Arc<HandlerContext>>, va
"add_indexer" => call_handler(context, value, rpc::indexers::create).await,
"add_validator_node" => call_handler(context, value, rpc::validator_nodes::create).await,
"start" => call_handler(context, value, rpc::instances::start).await,
"start_all" => call_handler(context, value, rpc::instances::start_all).await,
"stop" => call_handler(context, value, rpc::instances::stop).await,
"stop_all" => call_handler(context, value, rpc::instances::stop_all).await,
"list_instances" => call_handler(context, value, rpc::instances::list).await,
"delete_data" => call_handler(context, value, rpc::instances::delete_data).await,
"burn_funds" => call_handler(context, value, rpc::minotari_wallets::burn_funds).await,
13 changes: 12 additions & 1 deletion applications/tari_swarm_daemon/webui/src/routes/Main.tsx
@@ -173,7 +173,7 @@ function ExtraInfoVN({ name, url, addTxToPool, autoRefresh, state, horizontal }:
}
return (<>
<hr />
- <h3>Pool transaction</h3>
+ <h3>Pool transactions {pool.length}</h3>
<table style={{
width: "100%",
}}>
@@ -493,8 +493,19 @@ export default function Main() {
console.log("resp", resp);
});
};

+ const stopAll = () => {
+   jsonRpc("stop_all", { instance_type: "TariValidatorNode" }).then(getInfo);
+ };
+
+ const startAll = () => {
+   jsonRpc("start_all", { instance_type: "TariValidatorNode" }).then(getInfo);
+ };

return (
<div className="main">
+ <button onClick={() => stopAll()}>Stop all VNs</button>
+ <button onClick={() => startAll()}>Start all VNs</button>
<button onClick={() => setShowLogs(!showLogs)}>{showLogs && "Hide" || "Show"} logs</button>
<button onClick={() => setAutoRefresh(!autoRefresh)}>{autoRefresh && "Disable" || "Enable"} autorefresh
</button>
3 changes: 3 additions & 0 deletions applications/tari_validator_node/src/bootstrap.rs
@@ -341,6 +341,7 @@ pub async fn spawn_services(
state_store.clone(),
mempool.clone(),
virtual_substate_manager,
+ consensus_handle.clone(),
)
.await?;
// Save final node identity after comms has initialized. This is required because the public_address can be
Expand Down Expand Up @@ -434,6 +435,7 @@ async fn spawn_p2p_rpc(
shard_store_store: SqliteStateStore<PeerAddress>,
mempool: MempoolHandle,
virtual_substate_manager: VirtualSubstateManager<SqliteStateStore<PeerAddress>, EpochManagerHandle<PeerAddress>>,
+ consensus: ConsensusHandle,
) -> anyhow::Result<()> {
let rpc_server = RpcServer::builder()
.with_maximum_simultaneous_sessions(config.validator_node.rpc.max_simultaneous_sessions)
@@ -444,6 +446,7 @@
shard_store_store,
mempool,
virtual_substate_manager,
+ consensus,
));

let (notify_tx, notify_rx) = mpsc::unbounded_channel();
5 changes: 5 additions & 0 deletions applications/tari_validator_node/src/consensus/handle.rs
@@ -2,6 +2,7 @@
// SPDX-License-Identifier: BSD-3-Clause

use tari_consensus::hotstuff::{ConsensusCurrentState, CurrentView, HotstuffEvent};
+ use tari_dan_common_types::Epoch;
use tari_transaction::Transaction;
use tokio::sync::{broadcast, mpsc, watch};

Expand Down Expand Up @@ -30,6 +31,10 @@ impl ConsensusHandle {
}
}

+ pub fn current_epoch(&self) -> Epoch {
+     self.current_view.get_epoch()
+ }

pub async fn notify_new_transaction(
&self,
transaction: Transaction,
8 changes: 7 additions & 1 deletion applications/tari_validator_node/src/p2p/rpc/mod.rs
@@ -30,18 +30,24 @@ use tari_epoch_manager::base_layer::EpochManagerHandle;
use tari_state_store_sqlite::SqliteStateStore;
use tari_validator_node_rpc::rpc_service::ValidatorNodeRpcServer;

- use crate::{p2p::services::mempool::MempoolHandle, virtual_substate::VirtualSubstateManager};
+ use crate::{
+     consensus::ConsensusHandle,
+     p2p::services::mempool::MempoolHandle,
+     virtual_substate::VirtualSubstateManager,
+ };

pub fn create_tari_validator_node_rpc_service(
epoch_manager: EpochManagerHandle<PeerAddress>,
shard_store_store: SqliteStateStore<PeerAddress>,
mempool: MempoolHandle,
virtual_substate_manager: VirtualSubstateManager<SqliteStateStore<PeerAddress>, EpochManagerHandle<PeerAddress>>,
+ consensus: ConsensusHandle,
) -> ValidatorNodeRpcServer<ValidatorNodeRpcServiceImpl> {
ValidatorNodeRpcServer::new(ValidatorNodeRpcServiceImpl::new(
epoch_manager,
shard_store_store,
mempool,
virtual_substate_manager,
+ consensus,
))
}
10 changes: 5 additions & 5 deletions applications/tari_validator_node/src/p2p/rpc/service_impl.rs
@@ -66,6 +66,7 @@ use tari_validator_node_rpc::rpc_service::ValidatorNodeRpcService;
use tokio::{sync::mpsc, task};

use crate::{
+ consensus::ConsensusHandle,
p2p::{
rpc::{block_sync_task::BlockSyncTask, state_sync_task::StateSyncTask},
services::mempool::MempoolHandle,
@@ -80,6 +81,7 @@ pub struct ValidatorNodeRpcServiceImpl {
shard_state_store: SqliteStateStore<PeerAddress>,
mempool: MempoolHandle,
virtual_substate_manager: VirtualSubstateManager<SqliteStateStore<PeerAddress>, EpochManagerHandle<PeerAddress>>,
+ consensus: ConsensusHandle,
}

impl ValidatorNodeRpcServiceImpl {
@@ -91,12 +93,14 @@ impl ValidatorNodeRpcServiceImpl {
SqliteStateStore<PeerAddress>,
EpochManagerHandle<PeerAddress>,
>,
+ consensus: ConsensusHandle,
) -> Self {
Self {
epoch_manager,
shard_state_store,
mempool,
virtual_substate_manager,
+ consensus,
}
}
}
@@ -340,11 +344,7 @@ impl ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl {
request: Request<GetCheckpointRequest>,
) -> Result<Response<GetCheckpointResponse>, RpcStatus> {
let msg = request.into_message();
- let current_epoch = self
-     .epoch_manager
-     .current_epoch()
-     .await
-     .map_err(RpcStatus::log_internal_error(LOG_TARGET))?;
+ let current_epoch = self.consensus.current_epoch();
if msg.current_epoch != current_epoch {
// This may occur if one of the nodes has not fully scanned the base layer
return Err(RpcStatus::bad_request(format!(
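Note: the checkpoint RPC above now reads the epoch synchronously from the consensus handle rather than issuing an async query to the epoch manager. A minimal sketch of the pattern, assuming the current view is published over a tokio watch channel (the real `CurrentView` internals may differ):

```rust
use tokio::sync::watch;

/// Illustrative sketch: the consensus worker publishes (epoch, height) over
/// a watch channel, so handles can read the current epoch with a cheap,
/// non-blocking borrow instead of an async epoch-manager round-trip.
#[derive(Clone)]
struct CurrentView {
    rx: watch::Receiver<(u64, u64)>, // (epoch, height)
}

impl CurrentView {
    fn get_epoch(&self) -> u64 {
        self.rx.borrow().0
    }
}

fn main() {
    let (tx, rx) = watch::channel((0u64, 0u64));
    let view = CurrentView { rx };

    // The worker advances to epoch 3 at height 42; readers see it immediately.
    tx.send((3, 42)).unwrap();
    assert_eq!(view.get_epoch(), 3);
}
```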
12 changes: 0 additions & 12 deletions dan_layer/common_types/src/committee.rs
@@ -259,18 +259,6 @@ impl CommitteeInfo {
.into_iter()
.filter(|substate_address| self.includes_substate_address(substate_address.borrow()))
}

- /// Calculates the number of distinct shard groups for the given addresses
- pub fn count_distinct_shard_groups<B: Borrow<SubstateAddress>, I: IntoIterator<Item = B>>(
-     &self,
-     addresses: I,
- ) -> usize {
-     addresses
-         .into_iter()
-         .map(|addr| addr.borrow().to_shard_group(self.num_shards, self.num_committees))
-         .collect::<std::collections::HashSet<_>>()
-         .len()
- }
}

#[derive(Debug, Clone, Serialize)]
16 changes: 5 additions & 11 deletions dan_layer/common_types/src/versioned_substate_id.rs
@@ -59,6 +59,10 @@ impl SubstateRequirement {
.map(|v| SubstateAddress::from_substate_id(self.substate_id(), v))
}

+ pub fn to_substate_address_zero_version(&self) -> SubstateAddress {
+     SubstateAddress::from_substate_id(self.substate_id(), 0)
+ }

/// Calculates and returns the shard number that this SubstateAddress belongs.
/// A shard is a fixed division of the 256-bit shard space.
/// If the substate version is not known, None is returned.
@@ -118,7 +122,7 @@ impl Display for SubstateRequirement {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self.version {
Some(v) => write!(f, "{}:{}", self.substate_id, v),
- None => write!(f, "{}", self.substate_id),
+ None => write!(f, "{}:?", self.substate_id),
}
}
}
@@ -180,16 +184,6 @@ impl VersionedSubstateId {
self.version
}

- /// Calculates and returns the shard number that this SubstateAddress belongs.
- /// A shard is an equal division of the 256-bit shard space.
- pub fn to_shard(&self, num_shards: NumPreshards) -> Shard {
-     self.to_substate_address().to_shard(num_shards)
- }
-
- pub fn to_shard_group(&self, num_shards: NumPreshards, num_committees: u32) -> ShardGroup {
-     self.to_substate_address().to_shard_group(num_shards, num_committees)
- }

pub fn to_previous_version(&self) -> Option<Self> {
self.version
.checked_sub(1)
7 changes: 4 additions & 3 deletions dan_layer/consensus/src/block_validations.rs
@@ -27,7 +27,7 @@ pub async fn check_proposal<TConsensusSpec: ConsensusSpec>(
check_sidechain_id(block, config)?;
check_hash_and_height(block)?;
let committee_for_block = epoch_manager
- .get_committee_by_validator_public_key(block.epoch(), block.proposed_by())
+ .get_committee_by_validator_public_key(block.epoch(), block.proposed_by().clone())
.await?;
check_proposed_by_leader(leader_strategy, &committee_for_block, block)?;
check_signature(block)?;
@@ -181,7 +181,7 @@ pub async fn check_quorum_certificate<TConsensusSpec: ConsensusSpec>(
let mut vns = vec![];
for signature in qc.signatures() {
let vn = epoch_manager
- .get_validator_node_by_public_key(qc.epoch(), signature.public_key())
+ .get_validator_node_by_public_key(qc.epoch(), signature.public_key().clone())
.await?;
let committee_info = epoch_manager
.get_committee_info_for_substate(qc.epoch(), vn.shard_key)
@@ -209,7 +209,8 @@
qc.signatures()
.first()
.ok_or::<HotStuffError>(ProposalValidationError::QuorumWasNotReached { qc: qc.clone() }.into())?
- .public_key(),
+ .public_key()
+ .clone(),
)
.await?;

(diffs for the remaining changed files not loaded)
