Skip to content

Commit

Permalink
send foreign proposals with local proposals
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Aug 23, 2024
1 parent 828f6c3 commit cba2ab0
Show file tree
Hide file tree
Showing 30 changed files with 856 additions and 590 deletions.
14 changes: 14 additions & 0 deletions dan_layer/consensus/src/hotstuff/block_change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use tari_dan_storage::{
consensus_models::{
Block,
BlockDiff,
BlockId,
BlockTransactionExecution,
ForeignProposal,
LeafBlock,
PendingShardStateTreeDiff,
QuorumCertificate,
Expand Down Expand Up @@ -52,6 +54,7 @@ pub struct ProposedBlockChangeSet {
state_tree_diffs: IndexMap<Shard, VersionedStateHashTreeDiff>,
substate_locks: IndexMap<SubstateId, Vec<SubstateLock>>,
transaction_changes: IndexMap<TransactionId, TransactionChangeSet>,
proposed_foreign_proposals: Vec<BlockId>,
}

impl ProposedBlockChangeSet {
Expand All @@ -63,6 +66,7 @@ impl ProposedBlockChangeSet {
substate_locks: IndexMap::new(),
transaction_changes: IndexMap::new(),
state_tree_diffs: IndexMap::new(),
proposed_foreign_proposals: Vec::new(),
}
}

Expand All @@ -72,6 +76,7 @@ impl ProposedBlockChangeSet {
self.transaction_changes = IndexMap::new();
self.state_tree_diffs = IndexMap::new();
self.substate_locks = IndexMap::new();
self.proposed_foreign_proposals = Vec::new();
self
}

Expand All @@ -95,6 +100,11 @@ impl ProposedBlockChangeSet {
self
}

pub fn set_foreign_proposal_proposed_in(&mut self, foreign_proposal_block_id: BlockId) -> &mut Self {
self.proposed_foreign_proposals.push(foreign_proposal_block_id);
self
}

// TODO: this is a hack to allow the update to be modified after the fact. This should be removed.
pub fn next_update_mut(&mut self, transaction_id: &TransactionId) -> Option<&mut TransactionPoolStatusUpdate> {
self.transaction_changes
Expand Down Expand Up @@ -213,6 +223,10 @@ impl ProposedBlockChangeSet {
}
}

for block_id in self.proposed_foreign_proposals {
ForeignProposal::set_proposed_in(tx, &block_id, &self.block.block_id)?;
}

Ok(())
}
}
Expand Down
57 changes: 29 additions & 28 deletions dan_layer/consensus/src/hotstuff/on_message_validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,35 +126,33 @@ impl<TConsensusSpec: ConsensusSpec> OnMessageValidate<TConsensusSpec> {
from: TConsensusSpec::Addr,
proposal: ProposalMessage,
) -> Result<MessageValidationResult<TConsensusSpec::Addr>, HotStuffError> {
let ProposalMessage { block } = proposal;

info!(
target: LOG_TARGET,
"📜 new unvalidated PROPOSAL message {} from {} (current height = {})",
block,
block.proposed_by(),
proposal.block,
proposal.block.proposed_by(),
current_height,
);

if block.height() < current_height {
if proposal.block.height() < current_height {
info!(
target: LOG_TARGET,
"🔥 Block {} is lower than current height {}. Ignoring.",
block,
proposal.block,
current_height
);
return Ok(MessageValidationResult::Discard);
}

if let Err(err) = self.check_proposal(&block).await {
if let Err(err) = self.check_proposal(&proposal.block).await {
return Ok(MessageValidationResult::Invalid {
from,
message: HotstuffMessage::Proposal(ProposalMessage { block }),
message: HotstuffMessage::Proposal(proposal),
err,
});
}

self.handle_missing_transactions_local_block(from, block).await
self.handle_missing_transactions_local_block(from, proposal).await
}

pub fn update_local_parked_blocks(
Expand All @@ -166,7 +164,7 @@ impl<TConsensusSpec: ConsensusSpec> OnMessageValidate<TConsensusSpec> {
.store
.with_write_tx(|tx| tx.missing_transactions_remove(current_height, transaction_id))?;

let Some(unparked_block) = maybe_unparked_block else {
let Some((unparked_block, foreign_proposals)) = maybe_unparked_block else {
return Ok(None);
};

Expand All @@ -176,7 +174,10 @@ impl<TConsensusSpec: ConsensusSpec> OnMessageValidate<TConsensusSpec> {
block: unparked_block.as_leaf_block(),
});

Ok(Some(ProposalMessage { block: unparked_block }))
Ok(Some(ProposalMessage {
block: unparked_block,
foreign_proposals,
}))
}

pub fn update_foreign_parked_blocks(
Expand Down Expand Up @@ -211,62 +212,62 @@ impl<TConsensusSpec: ConsensusSpec> OnMessageValidate<TConsensusSpec> {
async fn handle_missing_transactions_local_block(
&mut self,
from: TConsensusSpec::Addr,
block: Block,
proposal: ProposalMessage,
) -> Result<MessageValidationResult<TConsensusSpec::Addr>, HotStuffError> {
let missing_tx_ids = self
.store
.with_write_tx(|tx| self.check_for_missing_transactions(tx, &block))?;
.with_write_tx(|tx| self.check_for_missing_transactions(tx, &proposal))?;

if missing_tx_ids.is_empty() {
return Ok(MessageValidationResult::Ready {
from,
message: HotstuffMessage::Proposal(ProposalMessage { block }),
message: HotstuffMessage::Proposal(proposal),
});
}

let _ignore = self.tx_events.send(HotstuffEvent::ProposedBlockParked {
block: block.as_leaf_block(),
block: proposal.block.as_leaf_block(),
num_missing_txs: missing_tx_ids.len(),
// TODO: remove
num_awaiting_txs: 0,
});

Ok(MessageValidationResult::ParkedProposal {
block_id: *block.id(),
epoch: block.epoch(),
proposed_by: block.proposed_by().clone(),
block_id: *proposal.block.id(),
epoch: proposal.block.epoch(),
proposed_by: proposal.block.proposed_by().clone(),
missing_txs: missing_tx_ids,
})
}

fn check_for_missing_transactions(
&self,
tx: &mut <TConsensusSpec::StateStore as StateStore>::WriteTransaction<'_>,
block: &Block,
proposal: &ProposalMessage,
) -> Result<HashSet<TransactionId>, HotStuffError> {
if block.commands().is_empty() {
if proposal.block.commands().is_empty() {
debug!(
target: LOG_TARGET,
"✅ Block {} is empty (no missing transactions)", block
"✅ Block {} is empty (no missing transactions)", proposal.block
);
return Ok(HashSet::new());
}
let missing_tx_ids = TransactionRecord::get_missing(&**tx, block.all_transaction_ids())?;
let missing_tx_ids = TransactionRecord::get_missing(&**tx, proposal.block.all_transaction_ids())?;

if missing_tx_ids.is_empty() {
debug!(
target: LOG_TARGET,
"✅ Block {} has no missing transactions", block
"✅ Block {} has no missing transactions", proposal.block
);
return Ok(HashSet::new());
}

info!(
target: LOG_TARGET,
"⏳ Block {} has {} missing transactions", block, missing_tx_ids.len(),
"⏳ Block {} has {} missing transactions", proposal.block, missing_tx_ids.len(),
);

tx.missing_transactions_insert(block, &missing_tx_ids, &[])?;
tx.missing_transactions_insert(&proposal.block, &proposal.foreign_proposals, &missing_tx_ids)?;

Ok(missing_tx_ids)
}
Expand Down Expand Up @@ -353,9 +354,9 @@ impl<TConsensusSpec: ConsensusSpec> OnMessageValidate<TConsensusSpec> {
parked_block.add_missing_transactions(tx, &missing_tx_ids)?;

Ok(MessageValidationResult::ParkedProposal {
block_id: *parked_block.block.id(),
epoch: parked_block.block.epoch(),
proposed_by: parked_block.block.proposed_by().clone(),
block_id: *parked_block.block().id(),
epoch: parked_block.block().epoch(),
proposed_by: parked_block.block().proposed_by().clone(),
missing_txs: missing_tx_ids,
})
})
Expand Down
101 changes: 57 additions & 44 deletions dan_layer/consensus/src/hotstuff/on_propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use tari_dan_storage::{
HighQc,
LastProposed,
LeafBlock,
LockedBlock,
PendingShardStateTreeDiff,
QuorumCertificate,
SubstateChange,
Expand Down Expand Up @@ -67,6 +66,12 @@ use crate::{

const LOG_TARGET: &str = "tari::dan::consensus::hotstuff::on_local_propose";

type NextBlock = (
Block,
Vec<ForeignProposal>,
HashMap<TransactionId, TransactionExecution>,
);

pub struct OnPropose<TConsensusSpec: ConsensusSpec> {
config: HotstuffConfig,
store: TConsensusSpec::StateStore,
Expand Down Expand Up @@ -118,21 +123,25 @@ where TConsensusSpec: ConsensusSpec
// is_newview_propose means that a NEWVIEW has reached quorum and nodes are expecting us to propose.
// Re-broadcast the previous proposal
if is_newview_propose {
if let Some(next_block) = self.store.with_read_tx(|tx| last_proposed.get_block(tx)).optional()? {
info!(
target: LOG_TARGET,
"🌿 RE-BROADCASTING local block {}({}) to {} validators. {} command(s), justify: {} ({}), parent: {}",
next_block.id(),
next_block.height(),
local_committee.len(),
next_block.commands().len(),
next_block.justify().block_id(),
next_block.justify().block_height(),
next_block.parent(),
);
self.broadcast_local_proposal(next_block, local_committee).await?;
return Ok(());
}
warn!(
target: LOG_TARGET,
"⚠️ Newview propose {leaf_block} but we already proposed block {last_proposed}.",
);
// if let Some(next_block) = self.store.with_read_tx(|tx| last_proposed.get_block(tx)).optional()? {
// info!(
// target: LOG_TARGET,
// "🌿 RE-BROADCASTING local block {}({}) to {} validators. {} command(s), justify: {} ({}),
// parent: {}", next_block.id(),
// next_block.height(),
// local_committee.len(),
// next_block.commands().len(),
// next_block.justify().block_id(),
// next_block.justify().block_height(),
// next_block.parent(),
// );
// self.broadcast_local_proposal(next_block, local_committee).await?;
// return Ok(());
// }
}

info!(
Expand All @@ -153,10 +162,10 @@ where TConsensusSpec: ConsensusSpec
let base_layer_block_hash = current_base_layer_block_hash;
let base_layer_block_height = current_base_layer_block_height;

let next_block = self.store.with_write_tx(|tx| {
let (next_block, foreign_proposals) = self.store.with_write_tx(|tx| {
let high_qc = HighQc::get(&**tx)?;
let high_qc_cert = high_qc.get_quorum_certificate(&**tx)?;
let (next_block, executed_transactions) = self.build_next_block(
let (next_block, foreign_proposals, executed_transactions) = self.build_next_block(
tx,
epoch,
&leaf_block,
Expand Down Expand Up @@ -185,7 +194,7 @@ where TConsensusSpec: ConsensusSpec
}

next_block.as_last_proposed().set(tx)?;
Ok::<_, HotStuffError>(next_block)
Ok::<_, HotStuffError>((next_block, foreign_proposals))
})?;

info!(
Expand All @@ -199,14 +208,16 @@ where TConsensusSpec: ConsensusSpec
next_block.parent()
);

self.broadcast_local_proposal(next_block, local_committee).await?;
self.broadcast_local_proposal(next_block, foreign_proposals, local_committee)
.await?;

Ok(())
}

pub async fn broadcast_local_proposal(
&mut self,
next_block: Block,
foreign_proposals: Vec<ForeignProposal>,
local_committee: &Committee<TConsensusSpec::Addr>,
) -> Result<(), HotStuffError> {
info!(
Expand All @@ -221,7 +232,8 @@ where TConsensusSpec: ConsensusSpec
.multicast(
local_committee.iter().map(|(addr, _)| addr),
HotstuffMessage::Proposal(ProposalMessage {
block: next_block.clone(),
block: next_block,
foreign_proposals,
}),
)
.await?;
Expand Down Expand Up @@ -300,38 +312,39 @@ where TConsensusSpec: ConsensusSpec
base_layer_block_height: u64,
base_layer_block_hash: FixedHash,
propose_epoch_end: bool,
) -> Result<(Block, HashMap<TransactionId, TransactionExecution>), HotStuffError> {
) -> Result<NextBlock, HotStuffError> {
// TODO: Configure
const TARGET_BLOCK_SIZE: usize = 500;

let next_height = parent_block.height() + NodeHeight(1);

let mut total_leader_fee = 0;

let foreign_proposals = if propose_epoch_end {
vec![]
} else {
ForeignProposal::get_all_new(tx, base_layer_block_height, parent_block.block_id())?
};

let batch = if dont_propose_transactions || propose_epoch_end {
vec![]
} else {
self.transaction_pool.get_batch_for_next_block(tx, TARGET_BLOCK_SIZE)?
TARGET_BLOCK_SIZE
// Each foreign proposal is "heavier" than a transaction command
.checked_sub(foreign_proposals.len() * 4)
.map(|size| self.transaction_pool.get_batch_for_next_block(tx, size))
.transpose()?
.unwrap_or_default()
};
let next_height = parent_block.height() + NodeHeight(1);

let mut total_leader_fee = 0;
let locked_block = LockedBlock::get(tx)?;
let pending_proposals = ForeignProposal::get_all_pending(tx, locked_block.block_id(), parent_block.block_id())?;
let mut commands = if propose_epoch_end {
BTreeSet::from_iter([Command::EndEpoch])
} else {
ForeignProposal::get_all_new(tx)?
.into_iter()
.filter(|foreign_proposal| {
// If the proposal base layer height is too high, ignore for now.
foreign_proposal.base_layer_block_height <= base_layer_block_height &&
// If the foreign proposal is already pending, don't propose it again
!pending_proposals.iter().any(|pending_proposal| {
pending_proposal.shard_group == foreign_proposal.shard_group &&
pending_proposal.block_id == foreign_proposal.block_id
})
})
.map(|mut foreign_proposal| {
foreign_proposal.set_proposed_height(parent_block.height().saturating_add(NodeHeight(1)));
Command::ForeignProposal(foreign_proposal)
})
.collect()
BTreeSet::from_iter(
foreign_proposals
.iter()
.map(|fp| Command::ForeignProposal(fp.to_atom())),
)
};

// batch is empty for is_empty, is_epoch_end and is_epoch_start blocks
Expand Down Expand Up @@ -405,7 +418,7 @@ where TConsensusSpec: ConsensusSpec
let signature = self.signing_service.sign(next_block.id());
next_block.set_signature(signature);

Ok((next_block, executed_transactions))
Ok((next_block, foreign_proposals, executed_transactions))
}

fn prepare_transaction(
Expand Down
Loading

0 comments on commit cba2ab0

Please sign in to comment.