Skip to content

Commit

Permalink
feat: count votes as newview (#678)
Browse files Browse the repository at this point in the history
Description
---

Motivation and Context
---

How Has This Been Tested?
---

What process can a PR reviewer use to test or verify this change?
---


Breaking Changes
---

- [x] None
- [ ] Requires data directory to be deleted
- [ ] Other - Please specify

---------

Co-authored-by: Stan Bondi <[email protected]>
  • Loading branch information
Cifko and sdbondi authored Oct 13, 2023
1 parent 4d97fff commit 30d4a87
Show file tree
Hide file tree
Showing 44 changed files with 684 additions and 340 deletions.
3 changes: 0 additions & 3 deletions applications/tari_validator_node/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ pub async fn spawn(
let state_manager = TariStateManager::new();
let (tx_hotstuff_events, _) = broadcast::channel(100);

let epoch_events = epoch_manager.subscribe().await.unwrap();

let hotstuff_worker = HotstuffWorker::<TariConsensusSpec>::new(
validator_addr,
rx_new_transactions,
Expand All @@ -89,7 +87,6 @@ pub async fn spawn(
let (tx_current_state, rx_current_state) = watch::channel(Default::default());
let context = ConsensusWorkerContext {
epoch_manager: epoch_manager.clone(),
epoch_events,
hotstuff: hotstuff_worker,
state_sync: CommsRpcStateSyncManager::new(epoch_manager, store, client_factory),
tx_current_state,
Expand Down
1 change: 1 addition & 0 deletions applications/tari_validator_node/src/consensus/spec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::consensus::{
state_manager::TariStateManager,
};

#[derive(Clone)]
pub struct TariConsensusSpec;

impl ConsensusSpec for TariConsensusSpec {
Expand Down
7 changes: 4 additions & 3 deletions applications/tari_validator_node/src/p2p/rpc/sync_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::collections::HashSet;
use log::*;
use tari_comms::protocol::rpc::RpcStatus;
use tari_dan_storage::{
consensus_models::{Block, BlockId, LockedBlock, QuorumCertificate, SubstateUpdate, TransactionRecord},
consensus_models::{Block, BlockId, LeafBlock, QuorumCertificate, SubstateUpdate, TransactionRecord},
StateStore,
StateStoreReadTransaction,
StorageError,
Expand Down Expand Up @@ -143,8 +143,9 @@ impl<TStateStore: StateStore> BlockSyncTask<TStateStore> {
current_block_id: &BlockId,
) -> Result<(), StorageError> {
self.store.with_read_tx(|tx| {
let locked_block = LockedBlock::get(tx)?;
let blocks = Block::get_all_blocks_between(tx, current_block_id, locked_block.block_id())?;
// TODO: if there are any transactions this will break the syncing node.
let leaf_block = LeafBlock::get(tx)?;
let blocks = Block::get_all_blocks_between(tx, current_block_id, leaf_block.block_id())?;
for block in blocks {
debug!(
target: LOG_TARGET,
Expand Down
2 changes: 2 additions & 0 deletions dan_layer/consensus/src/hotstuff/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ pub enum HotStuffError {
InternalChannelClosed { context: &'static str },
#[error("Epoch {epoch} is not active. {details}")]
EpochNotActive { epoch: Epoch, details: String },
#[error("Not registered for current epoch {epoch}")]
NotRegisteredForCurrentEpoch { epoch: Epoch },
#[error("Received message from non-committee member. Epoch: {epoch}, Sender: {sender}, {context}")]
ReceivedMessageFromNonCommitteeMember {
epoch: Epoch,
Expand Down
1 change: 1 addition & 0 deletions dan_layer/consensus/src/hotstuff/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ mod on_sync_request;
mod pacemaker;
mod pacemaker_handle;
mod state_machine;
mod vote_receiver;
mod worker;

pub use error::*;
Expand Down
34 changes: 25 additions & 9 deletions dan_layer/consensus/src/hotstuff/on_next_sync_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@
// SPDX-License-Identifier: BSD-3-Clause

use log::*;
use tari_dan_common_types::{Epoch, NodeHeight};
use tari_dan_storage::{consensus_models::HighQc, StateStore};
use tari_dan_common_types::{optional::Optional, NodeHeight};
use tari_dan_storage::{
consensus_models::{HighQc, LastSentVote},
StateStore,
};
use tari_epoch_manager::EpochManagerReader;
use tokio::sync::mpsc;

use crate::{
hotstuff::HotStuffError,
messages::{HotstuffMessage, NewViewMessage},
messages::{HotstuffMessage, NewViewMessage, VoteMessage},
traits::{ConsensusSpec, LeaderStrategy},
};

Expand Down Expand Up @@ -37,15 +40,27 @@ impl<TConsensusSpec: ConsensusSpec> OnNextSyncViewHandler<TConsensusSpec> {
}
}

pub async fn handle(&mut self, epoch: Epoch, new_height: NodeHeight) -> Result<(), HotStuffError> {
info!(target: LOG_TARGET, "⚠️ Leader failure: NEXTSYNCVIEW for epoch {} and node height {}", epoch, new_height);
let local_committee = self.epoch_manager.get_local_committee(epoch).await?;
pub async fn handle(&mut self, new_height: NodeHeight) -> Result<(), HotStuffError> {
let current_epoch = self.epoch_manager.current_epoch().await?;
info!(target: LOG_TARGET, "⚠️ Leader failure: NEXTSYNCVIEW for epoch {} and node height {}", current_epoch, new_height);
// Is the VN registered?
if !self.epoch_manager.is_epoch_active(current_epoch).await? {
info!(
target: LOG_TARGET,
"[on_leader_timeout] Validator is not active within this epoch"
);
return Ok(());
}

let high_qc = self
.store
.with_read_tx(|tx| HighQc::get(tx)?.get_quorum_certificate(tx))?;
let (high_qc, last_sent_vote) = self.store.with_read_tx(|tx| {
let high_qc = HighQc::get(tx)?.get_quorum_certificate(tx)?;
let last_sent_vote = LastSentVote::get(tx)
.optional()?
.filter(|vote| high_qc.block_height() < vote.block_height);
Ok::<_, HotStuffError>((high_qc, last_sent_vote))
})?;

let local_committee = self.epoch_manager.get_local_committee(current_epoch).await?;
let next_leader = self
.leader_strategy
.get_leader_for_next_block(&local_committee, new_height);
Expand All @@ -55,6 +70,7 @@ impl<TConsensusSpec: ConsensusSpec> OnNextSyncViewHandler<TConsensusSpec> {
high_qc,
new_height,
epoch: current_epoch,
last_vote: last_sent_vote.map(VoteMessage::from),
};

self.tx_leader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use tari_dan_storage::{
ExecutedTransaction,
HighQc,
LastExecuted,
LastSentVote,
LastVoted,
LockedBlock,
LockedOutput,
Expand Down Expand Up @@ -131,7 +132,7 @@ where TConsensusSpec: ConsensusSpec

let vote = self.generate_vote_message(valid_block.block(), decision).await?;
self.send_vote_to_leader(&local_committee, vote, valid_block.block())
.await;
.await?;
} else {
info!(
target: LOG_TARGET,
Expand Down Expand Up @@ -302,7 +303,7 @@ where TConsensusSpec: ConsensusSpec
local_committee: &Committee<TConsensusSpec::Addr>,
vote: VoteMessage<TConsensusSpec::Addr>,
block: &Block<TConsensusSpec::Addr>,
) {
) -> Result<(), HotStuffError> {
let leader = self
.leader_strategy
.get_leader_for_next_block(local_committee, block.height());
Expand All @@ -316,7 +317,7 @@ where TConsensusSpec: ConsensusSpec
);
if self
.tx_leader
.send((leader.clone(), HotstuffMessage::Vote(vote)))
.send((leader.clone(), HotstuffMessage::Vote(vote.clone())))
.await
.is_err()
{
Expand All @@ -325,6 +326,18 @@ where TConsensusSpec: ConsensusSpec
"tx_leader in OnLocalProposalReady::send_vote_to_leader is closed",
);
}
self.store.with_write_tx(|tx| {
let last_sent_vote = LastSentVote {
epoch: vote.epoch,
block_id: vote.block_id,
block_height: vote.block_height,
decision: vote.decision,
signature: vote.signature,
merkle_proof: vote.merkle_proof,
};
last_sent_vote.set(tx)
})?;
Ok(())
}

#[allow(clippy::too_many_lines)]
Expand Down
2 changes: 2 additions & 0 deletions dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveProposalHandler<TConsensusSpec> {
.await?;

self.on_ready_to_vote_on_local_block.handle(valid_block).await?;

self.pacemaker.beat();
}

Ok(())
Expand Down
13 changes: 13 additions & 0 deletions dan_layer/consensus/src/hotstuff/on_receive_new_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tari_dan_storage::{
};
use tari_epoch_manager::EpochManagerReader;

use super::vote_receiver::VoteReceiver;
use crate::{
hotstuff::{common::calculate_dummy_blocks, error::HotStuffError, pacemaker_handle::PaceMakerHandle},
messages::NewViewMessage,
Expand All @@ -28,6 +29,7 @@ pub struct OnReceiveNewViewHandler<TConsensusSpec: ConsensusSpec> {
epoch_manager: TConsensusSpec::EpochManager,
newview_message_counts: HashMap<(NodeHeight, BlockId), HashSet<TConsensusSpec::Addr>>,
pacemaker: PaceMakerHandle,
vote_receiver: VoteReceiver<TConsensusSpec>,
}

impl<TConsensusSpec> OnReceiveNewViewHandler<TConsensusSpec>
Expand All @@ -38,13 +40,15 @@ where TConsensusSpec: ConsensusSpec
leader_strategy: TConsensusSpec::LeaderStrategy,
epoch_manager: TConsensusSpec::EpochManager,
pacemaker: PaceMakerHandle,
vote_receiver: VoteReceiver<TConsensusSpec>,
) -> Self {
Self {
store,
leader_strategy,
epoch_manager,
newview_message_counts: HashMap::default(),
pacemaker,
vote_receiver,
}
}

Expand Down Expand Up @@ -76,6 +80,7 @@ where TConsensusSpec: ConsensusSpec
high_qc,
new_height,
epoch,
last_vote,
} = message;
debug!(
target: LOG_TARGET,
Expand Down Expand Up @@ -139,6 +144,14 @@ where TConsensusSpec: ConsensusSpec
});
}

if let Some(vote) = last_vote {
debug!(
target: LOG_TARGET,
"🔥 Receive VOTE with NEWVIEW for node {} from {}", vote.block_id, from,
);
self.vote_receiver.handle(vote, false).await?;
}

// Are nodes requesting to create more than the minimum number of dummy blocks?
if high_qc.block_height().saturating_sub(new_height).as_u64() > local_committee.len() as u64 {
return Err(HotStuffError::BadNewViewMessage {
Expand Down
Loading

0 comments on commit 30d4a87

Please sign in to comment.