fix(consensus): improve block fullness when tx rate is high (#916)
Description
---
- Remove calls to `beat` when receiving a local proposal
- Only call `beat` on new transactions when there are no transactions pending execution
- Reduce some info log noisiness

Motivation and Context
---
`beat` was called when receiving a local proposal, but this is only
necessary if the node is the next leader. Since `beat` is already called
when receiving votes (including our own) as the next leader, this PR
only calls `beat` on receipt of a proposal if the node is the next leader.

If there is only a single transaction, we avoid waiting for the full block
time: the mempool now sends the number of transactions still awaiting
execution along with each executed transaction, allowing consensus to
process a single transaction immediately, or to wait for multiple
transactions to finish executing before proposing.
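
For illustration, a minimal sketch of the new flow (assumptions: `TransactionId` is stubbed as a `u64`, and the pacemaker beat is reduced to a log line; the real types and call sites are in the diff below):

```rust
use tokio::sync::mpsc;

type TransactionId = u64; // stand-in for the real TransactionId type

// Mempool side: after a transaction finishes executing, send its id along
// with the number of transactions still awaiting execution.
async fn notify_consensus(
    tx: &mpsc::Sender<(TransactionId, usize)>,
    executed_id: TransactionId,
    pending_executions: usize,
) {
    let _ = tx.send((executed_id, pending_executions)).await;
}

// Consensus side: only trigger a proposal (a pacemaker beat) once nothing
// else is pending, so a lone transaction is proposed immediately while a
// burst of transactions lands together in a fuller block.
async fn consensus_loop(mut rx: mpsc::Receiver<(TransactionId, usize)>) {
    while let Some((tx_id, pending)) = rx.recv().await {
        println!("transaction {tx_id} executed, {pending} still pending");
        if pending == 0 {
            println!("beat: propose now"); // self.pacemaker.beat() in the real code
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(16);
    let consensus = tokio::spawn(consensus_loop(rx));
    // Three transactions finish in quick succession; only the last one
    // (pending == 0) triggers an immediate proposal.
    for (id, pending) in [(1, 2), (2, 1), (3, 0)] {
        notify_consensus(&tx, id, pending).await;
    }
    drop(tx); // close the channel so the consensus loop exits
    consensus.await.unwrap();
}
```

The pacemaker's block timer still fires at the fixed `BLOCK_TIME`, so waiting on pending executions can delay a proposal by at most one block interval.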

How Has This Been Tested?
---
Manually, with stress tests and single transactions.

250-transaction stress test with 2 shards:

![image](https://github.com/tari-project/tari-dan/assets/1057902/b614aedc-4633-4463-bd09-f3638f6da80d)

What process can a PR reviewer use to test or verify this change?
---
Run a stress test and check how many commands are proposed in each block.

Breaking Changes
---

- [x] None
- [ ] Requires data directory to be deleted
- [ ] Other - Please specify
sdbondi authored Jan 25, 2024
1 parent 7448b4f commit dc84898
Showing 11 changed files with 42 additions and 33 deletions.
2 changes: 1 addition & 1 deletion applications/tari_validator_node/src/consensus/mod.rs
@@ -41,7 +41,7 @@ pub async fn spawn(
     store: SqliteStateStore<PeerAddress>,
     keypair: RistrettoKeypair,
     epoch_manager: EpochManagerHandle<PeerAddress>,
-    rx_new_transactions: mpsc::Receiver<TransactionId>,
+    rx_new_transactions: mpsc::Receiver<(TransactionId, usize)>,
     inbound_messaging: ConsensusInboundMessaging<SqliteMessageLogger>,
     outbound_messaging: ConsensusOutboundMessaging<SqliteMessageLogger>,
     client_factory: TariValidatorNodeRpcClientFactory,
@@ -39,7 +39,7 @@ use crate::{
 
 pub fn spawn<TExecutor, TValidator, TExecutedValidator, TSubstateResolver>(
     gossip: Gossip,
-    tx_executed_transactions: mpsc::Sender<TransactionId>,
+    tx_executed_transactions: mpsc::Sender<(TransactionId, usize)>,
     epoch_manager: EpochManagerHandle<PeerAddress>,
     transaction_executor: TExecutor,
     substate_resolver: TSubstateResolver,
@@ -66,7 +66,7 @@ pub struct MempoolService<TValidator, TExecutedValidator, TExecutor, TSubstateRe
     transactions: HashSet<TransactionId>,
     pending_executions: FuturesUnordered<BoxFuture<'static, MempoolTransactionExecution>>,
     mempool_requests: mpsc::Receiver<MempoolRequest>,
-    tx_executed_transactions: mpsc::Sender<TransactionId>,
+    tx_executed_transactions: mpsc::Sender<(TransactionId, usize)>,
     epoch_manager: EpochManagerHandle<PeerAddress>,
     before_execute_validator: TValidator,
     after_execute_validator: TExecutedValidator,
@@ -90,7 +90,7 @@ where
     pub(super) fn new(
         mempool_requests: mpsc::Receiver<MempoolRequest>,
         gossip: Gossip,
-        tx_executed_transactions: mpsc::Sender<TransactionId>,
+        tx_executed_transactions: mpsc::Sender<(TransactionId, usize)>,
         epoch_manager: EpochManagerHandle<PeerAddress>,
         transaction_executor: TExecutor,
         substate_resolver: TSubstateResolver,
@@ -624,7 +624,13 @@ where
         }
 
         // Notify consensus that a transaction is ready to go!
-        if is_consensus_running && self.tx_executed_transactions.send(*executed.id()).await.is_err() {
+        let pending_exec_size = self.pending_executions.len();
+        if is_consensus_running &&
+            self.tx_executed_transactions
+                .send((*executed.id(), pending_exec_size))
+                .await
+                .is_err()
+        {
             debug!(
                 target: LOG_TARGET,
                 "Executed transaction channel closed before executed transaction could be sent"
@@ -31,21 +31,21 @@ where TStateStore: StateStore + Send + Sync
 
     async fn validate(&self, executed: &ExecutedTransaction) -> Result<(), Self::Error> {
         if executed.resulting_outputs().is_empty() {
-            info!(target: LOG_TARGET, "OutputsDontExistLocally - OK");
+            debug!(target: LOG_TARGET, "OutputsDontExistLocally - OK");
             return Ok(());
         }
 
         if self
             .store
             .with_read_tx(|tx| SubstateRecord::any_exist(tx, executed.resulting_outputs()))?
         {
-            info!(target: LOG_TARGET, "OutputsDontExistLocally - FAIL");
+            warn!(target: LOG_TARGET, "OutputsDontExistLocally - FAIL");
             return Err(MempoolError::OutputSubstateExists {
                 transaction_id: *executed.id(),
             });
         }
 
-        info!(target: LOG_TARGET, "OutputsDontExistLocally - OK");
+        debug!(target: LOG_TARGET, "OutputsDontExistLocally - OK");
         Ok(())
     }
 }
6 changes: 1 addition & 5 deletions dan_layer/consensus/src/hotstuff/on_inbound_message.rs
@@ -19,7 +19,7 @@ use tokio::{sync::mpsc, time};
 
 use crate::{
     block_validations::{check_hash_and_height, check_proposed_by_leader, check_quorum_certificate, check_signature},
-    hotstuff::{error::HotStuffError, pacemaker_handle::PaceMakerHandle},
+    hotstuff::error::HotStuffError,
     messages::{HotstuffMessage, ProposalMessage, RequestMissingTransactionsMessage},
     traits::{ConsensusSpec, OutboundMessaging},
 };
@@ -32,7 +32,6 @@ pub struct OnInboundMessage<TConsensusSpec: ConsensusSpec> {
     store: TConsensusSpec::StateStore,
     epoch_manager: TConsensusSpec::EpochManager,
     leader_strategy: TConsensusSpec::LeaderStrategy,
-    pacemaker: PaceMakerHandle,
     vote_signing_service: TConsensusSpec::SignatureService,
     outbound_messaging: TConsensusSpec::OutboundMessaging,
     tx_msg_ready: mpsc::UnboundedSender<(TConsensusSpec::Addr, HotstuffMessage)>,
@@ -46,7 +45,6 @@ where TConsensusSpec: ConsensusSpec
         store: TConsensusSpec::StateStore,
         epoch_manager: TConsensusSpec::EpochManager,
         leader_strategy: TConsensusSpec::LeaderStrategy,
-        pacemaker: PaceMakerHandle,
         vote_signing_service: TConsensusSpec::SignatureService,
         outbound_messaging: TConsensusSpec::OutboundMessaging,
     ) -> Self {
@@ -55,7 +53,6 @@ where TConsensusSpec: ConsensusSpec
             store,
             epoch_manager,
             leader_strategy,
-            pacemaker,
             vote_signing_service,
             outbound_messaging,
             tx_msg_ready,
@@ -176,7 +173,6 @@ where TConsensusSpec: ConsensusSpec
                 HotstuffMessage::Proposal(ProposalMessage { block: unparked_block }),
             )?;
         }
-        self.pacemaker.beat();
         Ok(())
     }
 
2 changes: 0 additions & 2 deletions dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs
@@ -126,8 +126,6 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveLocalProposalHandler<TConsensusSpec
                 .await?;
 
             self.on_ready_to_vote_on_local_block.handle(valid_block).await?;
-
-            self.pacemaker.beat();
         }
 
         Ok(())
16 changes: 7 additions & 9 deletions dan_layer/consensus/src/hotstuff/pacemaker.rs
@@ -20,11 +20,11 @@ use crate::hotstuff::{
 
 const LOG_TARGET: &str = "tari::dan::consensus::hotstuff::pacemaker";
 const MAX_DELTA: Duration = Duration::from_secs(300);
+const BLOCK_TIME: Duration = Duration::from_secs(10);
 
 pub struct PaceMaker {
     pace_maker_handle: PaceMakerHandle,
     handle_receiver: mpsc::Receiver<PacemakerRequest>,
-    block_time: Duration,
     current_height: CurrentHeight,
     current_high_qc_height: NodeHeight,
 }
@@ -47,8 +47,6 @@ impl PaceMaker {
                 on_leader_timeout,
                 current_height.clone(),
             ),
-            // TODO: make network constant. We're starting slow with 10s but should be 1s in the future
-            block_time: Duration::from_secs(10),
             current_height,
             current_high_qc_height: NodeHeight(0),
         }
@@ -101,7 +99,7 @@ impl PaceMaker {
                             info!(target: LOG_TARGET, "Reset! Current height: {}, Delta: {:.2?}", self.current_height, delta);
                             leader_timeout.as_mut().reset(tokio::time::Instant::now() + delta);
                             // set a timer for when we must send a block...
-                            block_timer.as_mut().reset(tokio::time::Instant::now() + self.block_time);
+                            block_timer.as_mut().reset(tokio::time::Instant::now() + BLOCK_TIME);
                         },
                         PacemakerRequest::Start { high_qc_height } => {
                             info!(target: LOG_TARGET, "🚀 Starting pacemaker at leaf height {} and high QC: {}", self.current_height, high_qc_height);
@@ -112,7 +110,7 @@ impl PaceMaker {
                             let delta = self.delta_time();
                             info!(target: LOG_TARGET, "Reset! Current height: {}, Delta: {:.2?}", self.current_height, delta);
                             leader_timeout.as_mut().reset(tokio::time::Instant::now() + delta);
-                            block_timer.as_mut().reset(tokio::time::Instant::now() + self.block_time);
+                            block_timer.as_mut().reset(tokio::time::Instant::now() + BLOCK_TIME);
                             on_beat.beat();
                             started = true;
                         }
@@ -130,11 +128,11 @@ impl PaceMaker {
                     }
                },
                () = &mut block_timer => {
-                    block_timer.as_mut().reset(tokio::time::Instant::now() + self.block_time);
+                    block_timer.as_mut().reset(tokio::time::Instant::now() + BLOCK_TIME);
                    on_force_beat.beat(None);
                }
                () = &mut leader_timeout => {
-                    block_timer.as_mut().reset(tokio::time::Instant::now() + self.block_time);
+                    block_timer.as_mut().reset(tokio::time::Instant::now() + BLOCK_TIME);
 
                    let delta = self.delta_time();
                    leader_timeout.as_mut().reset(tokio::time::Instant::now() + delta);
@@ -156,7 +154,7 @@ impl PaceMaker {
        let current_height = self.current_height.get();
        if current_height.is_zero() {
            // Allow extra time for the first block
-            return self.block_time * 2;
+            return BLOCK_TIME * 2;
        }
        let exp = u32::try_from(cmp::min(
            u64::from(u32::MAX),
@@ -169,7 +167,7 @@ impl PaceMaker {
        );
        // TODO: get real avg latency
        let avg_latency = Duration::from_secs(2);
-        self.block_time + delta + avg_latency
+        BLOCK_TIME + delta + avg_latency
    }
 }
 
11 changes: 7 additions & 4 deletions dan_layer/consensus/src/hotstuff/worker.rs
@@ -49,7 +49,7 @@ pub struct HotstuffWorker<TConsensusSpec: ConsensusSpec> {
     tx_events: broadcast::Sender<HotstuffEvent>,
     outbound_messaging: TConsensusSpec::OutboundMessaging,
     inbound_messaging: TConsensusSpec::InboundMessaging,
-    rx_new_transactions: mpsc::Receiver<TransactionId>,
+    rx_new_transactions: mpsc::Receiver<(TransactionId, usize)>,
 
     on_inbound_message: OnInboundMessage<TConsensusSpec>,
     on_next_sync_view: OnNextSyncViewHandler<TConsensusSpec>,
@@ -77,7 +77,7 @@ impl<TConsensusSpec: ConsensusSpec> HotstuffWorker<TConsensusSpec> {
         validator_addr: TConsensusSpec::Addr,
         inbound_messaging: TConsensusSpec::InboundMessaging,
         outbound_messaging: TConsensusSpec::OutboundMessaging,
-        rx_new_transactions: mpsc::Receiver<TransactionId>,
+        rx_new_transactions: mpsc::Receiver<(TransactionId, usize)>,
         state_store: TConsensusSpec::StateStore,
         epoch_manager: TConsensusSpec::EpochManager,
         leader_strategy: TConsensusSpec::LeaderStrategy,
@@ -109,7 +109,6 @@ impl<TConsensusSpec: ConsensusSpec> HotstuffWorker<TConsensusSpec> {
                 state_store.clone(),
                 epoch_manager.clone(),
                 leader_strategy.clone(),
-                pacemaker.clone_handle(),
                 signing_service.clone(),
                 outbound_messaging.clone(),
             ),
@@ -234,10 +233,14 @@ where TConsensusSpec: ConsensusSpec
                     }
                 },
 
-                Some(tx_id) = self.rx_new_transactions.recv() => {
+                Some((tx_id, pending)) = self.rx_new_transactions.recv() => {
                     if let Err(err) = self.on_inbound_message.update_parked_blocks(current_height, &tx_id).await {
                         error!(target: LOG_TARGET, "Error checking parked blocks: {}", err);
                     }
+                    // Only propose now if there are no pending transactions
+                    if pending == 0 {
+                        self.pacemaker.beat();
+                    }
                 },
 
                 Ok(event) = epoch_manager_events.recv() => {
14 changes: 11 additions & 3 deletions dan_layer/consensus_tests/src/support/network.rs
@@ -172,7 +172,15 @@ impl TestNetworkDestination {
 
 pub struct TestNetworkWorker {
     rx_new_transaction: Option<mpsc::Receiver<(TestNetworkDestination, ExecutedTransaction)>>,
-    tx_new_transactions: HashMap<TestAddress, (Shard, mpsc::Sender<TransactionId>, SqliteStateStore<TestAddress>)>,
+    #[allow(clippy::type_complexity)]
+    tx_new_transactions: HashMap<
+        TestAddress,
+        (
+            Shard,
+            mpsc::Sender<(TransactionId, usize)>,
+            SqliteStateStore<TestAddress>,
+        ),
+    >,
     tx_hs_message: HashMap<TestAddress, mpsc::Sender<(TestAddress, HotstuffMessage)>>,
     #[allow(clippy::type_complexity)]
     rx_broadcast: Option<HashMap<TestAddress, mpsc::Receiver<(Vec<TestAddress>, HotstuffMessage)>>>,
@@ -225,7 +233,7 @@ impl TestNetworkWorker {
                     })
                     .unwrap();
                 log::info!("🐞 New transaction {}", executed.id());
-                tx_new_transaction_to_consensus.send(*executed.id()).await.unwrap();
+                tx_new_transaction_to_consensus.send((*executed.id(), 0)).await.unwrap();
             }
         }
     }
@@ -352,6 +360,6 @@ impl TestNetworkWorker {
             })
             .unwrap();
 
-        sender.send(*existing_executed_tx.id()).await.unwrap();
+        sender.send((*existing_executed_tx.id(), 0)).await.unwrap();
     }
 }
@@ -24,7 +24,7 @@ pub struct ValidatorChannels {
     pub bucket: Shard,
     pub state_store: SqliteStateStore<TestAddress>,
 
-    pub tx_new_transactions: mpsc::Sender<TransactionId>,
+    pub tx_new_transactions: mpsc::Sender<(TransactionId, usize)>,
     pub tx_hs_message: mpsc::Sender<(TestAddress, HotstuffMessage)>,
     pub rx_broadcast: mpsc::Receiver<(Vec<TestAddress>, HotstuffMessage)>,
     pub rx_leader: mpsc::Receiver<(TestAddress, HotstuffMessage)>,
2 changes: 1 addition & 1 deletion networking/core/src/worker.rs
@@ -652,7 +652,7 @@ where
             match event {
                 mdns::Event::Discovered(peers_and_addrs) => {
                     for (peer, addr) in peers_and_addrs {
-                        info!(target: LOG_TARGET, "📡 mDNS discovered peer {} at {}", peer, addr);
+                        debug!(target: LOG_TARGET, "📡 mDNS discovered peer {} at {}", peer, addr);
                         self.swarm
                             .dial(DialOpts::peer_id(peer).addresses(vec![addr]).build())
                             .or_else(|err| {
