Skip to content

Commit

Permalink
feat(consensus): sequence transaction from foreign LocalPrepare/Accept
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Aug 19, 2024
1 parent 4278f20 commit baebd2a
Show file tree
Hide file tree
Showing 23 changed files with 648 additions and 225 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ impl ProcessManager {
// inputs for a transaction.
sleep(Duration::from_secs(2)).await;
}
self.mine(10).await?;
self.mine(20).await?;
Ok(())
}

Expand Down
11 changes: 9 additions & 2 deletions dan_layer/common_types/src/committee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{borrow::Borrow, cmp, ops::RangeInclusive};
use rand::{rngs::OsRng, seq::SliceRandom};
use serde::{Deserialize, Serialize};
use tari_common_types::types::PublicKey;
use tari_engine_types::substate::SubstateId;

use crate::{shard::Shard, Epoch, NumPreshards, ShardGroup, SubstateAddress};

Expand Down Expand Up @@ -203,7 +204,7 @@ impl CommitteeInfo {
(len - 1) / 3
}

pub fn num_shards(&self) -> NumPreshards {
pub fn num_preshards(&self) -> NumPreshards {
self.num_shards
}

Expand All @@ -215,12 +216,18 @@ impl CommitteeInfo {
self.shard_group.to_substate_address_range(self.num_shards)
}

// TODO: change these to take in a SubstateId
pub fn includes_substate_address(&self, substate_address: &SubstateAddress) -> bool {
let s = substate_address.to_shard(self.num_shards);
self.shard_group.contains(&s)
}

pub fn includes_substate_id(&self, substate_id: &SubstateId) -> bool {
// version doesnt affect shard
let addr = SubstateAddress::from_substate_id(substate_id, 0);
let shard = addr.to_shard(self.num_shards);
self.shard_group.contains(&shard)
}

pub fn includes_all_substate_addresses<I: IntoIterator<Item = B>, B: Borrow<SubstateAddress>>(
&self,
substate_addresses: I,
Expand Down
123 changes: 95 additions & 28 deletions dan_layer/consensus/src/hotstuff/on_message_validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,20 @@ use std::collections::HashSet;

use log::*;
use tari_common_types::types::PublicKey;
use tari_dan_common_types::{Epoch, NodeHeight};
use tari_dan_common_types::{committee::CommitteeInfo, Epoch, NodeHeight};
use tari_dan_storage::{
consensus_models::{Block, BlockId, TransactionRecord},
consensus_models::{Block, BlockId, ForeignParkedProposal, TransactionRecord},
StateStore,
StateStoreWriteTransaction,
};
use tari_epoch_manager::EpochManagerReader;
use tari_transaction::TransactionId;
use tokio::sync::broadcast;

use super::config::HotstuffConfig;
use crate::{
block_validations,
hotstuff::{error::HotStuffError, HotstuffEvent},
messages::{HotstuffMessage, MissingTransactionsRequest, ProposalMessage},
messages::{ForeignProposalMessage, HotstuffMessage, MissingTransactionsRequest, ProposalMessage},
traits::{ConsensusSpec, OutboundMessaging},
};

Expand Down Expand Up @@ -64,23 +63,15 @@ impl<TConsensusSpec: ConsensusSpec> OnMessageValidate<TConsensusSpec> {
pub async fn handle(
&mut self,
current_height: NodeHeight,
local_committee_info: &CommitteeInfo,
from: TConsensusSpec::Addr,
msg: HotstuffMessage,
) -> Result<MessageValidationResult<TConsensusSpec::Addr>, HotStuffError> {
match msg {
HotstuffMessage::Proposal(msg) => self.process_local_proposal(current_height, from, msg).await,
HotstuffMessage::ForeignProposal(proposal) => {
if let Err(err) = self.check_proposal(&proposal.block).await {
return Ok(MessageValidationResult::Invalid {
from,
message: HotstuffMessage::ForeignProposal(proposal),
err,
});
}
Ok(MessageValidationResult::Ready {
from,
message: HotstuffMessage::ForeignProposal(proposal),
})
self.process_foreign_proposal(local_committee_info, from, proposal)
.await
},
HotstuffMessage::MissingTransactionsResponse(msg) => {
if !self.active_missing_transaction_requests.remove_element(&msg.request_id) {
Expand Down Expand Up @@ -163,14 +154,14 @@ impl<TConsensusSpec: ConsensusSpec> OnMessageValidate<TConsensusSpec> {
});
}

self.handle_missing_transactions(from, block).await
self.handle_missing_transactions_local_block(from, block).await
}

pub async fn update_parked_blocks(
pub fn update_local_parked_blocks(
&self,
current_height: NodeHeight,
transaction_id: &TransactionId,
) -> Result<Option<(TConsensusSpec::Addr, HotstuffMessage)>, HotStuffError> {
) -> Result<Option<ProposalMessage>, HotStuffError> {
let maybe_unparked_block = self
.store
.with_write_tx(|tx| tx.missing_transactions_remove(current_height, transaction_id))?;
Expand All @@ -181,19 +172,28 @@ impl<TConsensusSpec: ConsensusSpec> OnMessageValidate<TConsensusSpec> {

info!(target: LOG_TARGET, "♻️ all transactions for block {unparked_block} are ready for consensus");

let vn = self
.epoch_manager
.get_validator_node_by_public_key(unparked_block.epoch(), unparked_block.proposed_by())
.await?;

let _ignore = self.tx_events.send(HotstuffEvent::ParkedBlockReady {
block: unparked_block.as_leaf_block(),
});

Ok(Some((
vn.address,
HotstuffMessage::Proposal(ProposalMessage { block: unparked_block }),
)))
Ok(Some(ProposalMessage { block: unparked_block }))
}

pub fn update_foreign_parked_blocks(
&self,
transaction_id: &TransactionId,
) -> Result<Vec<ForeignParkedProposal>, HotStuffError> {
let unparked_foreign_blocks = self
.store
.with_write_tx(|tx| ForeignParkedProposal::remove_by_transaction_id(tx, transaction_id))?;

if unparked_foreign_blocks.is_empty() {
return Ok(vec![]);
};

info!(target: LOG_TARGET, "♻️ all transactions for {} foreign block(s) are ready for consensus", unparked_foreign_blocks.len());

Ok(unparked_foreign_blocks)
}

async fn check_proposal(&self, block: &Block) -> Result<(), HotStuffError> {
Expand All @@ -208,7 +208,7 @@ impl<TConsensusSpec: ConsensusSpec> OnMessageValidate<TConsensusSpec> {
Ok(())
}

async fn handle_missing_transactions(
async fn handle_missing_transactions_local_block(
&mut self,
from: TConsensusSpec::Addr,
block: Block,
Expand Down Expand Up @@ -270,6 +270,73 @@ impl<TConsensusSpec: ConsensusSpec> OnMessageValidate<TConsensusSpec> {

Ok(missing_tx_ids)
}

async fn process_foreign_proposal(
&mut self,
local_committee_info: &CommitteeInfo,
from: TConsensusSpec::Addr,
msg: ForeignProposalMessage,
) -> Result<MessageValidationResult<TConsensusSpec::Addr>, HotStuffError> {
info!(
target: LOG_TARGET,
"🧩 new unvalidated FOREIGN PROPOSAL message {} from {}",
msg,
from
);

if let Err(err) = self.check_proposal(&msg.block).await {
return Ok(MessageValidationResult::Invalid {
from,
message: HotstuffMessage::ForeignProposal(msg),
err,
});
}

if msg.block.commands().is_empty() {
debug!(
target: LOG_TARGET,
"✅ Block {} is empty (no missing transactions)", msg.block
);
return Ok(MessageValidationResult::Ready {
from,
message: HotstuffMessage::ForeignProposal(msg),
});
}

self.store.with_write_tx(|tx| {
let missing_tx_ids = TransactionRecord::get_missing(
&**tx,
msg.block.all_transaction_ids_in_committee(local_committee_info),
)?;

if missing_tx_ids.is_empty() {
debug!(
target: LOG_TARGET,
"✅ Foreign Block {} has no missing transactions", msg.block
);
return Ok(MessageValidationResult::Ready {
from,
message: HotstuffMessage::ForeignProposal(msg),
});
}

info!(
target: LOG_TARGET,
"⏳ Foreign Block {} has {} missing transactions", msg.block, missing_tx_ids.len(),
);

let parked_block = ForeignParkedProposal::from(msg);
parked_block.insert(tx)?;
parked_block.add_missing_transactions(tx, &missing_tx_ids)?;

Ok(MessageValidationResult::ParkedProposal {
block_id: *parked_block.block.id(),
epoch: parked_block.block.epoch(),
proposed_by: parked_block.block.proposed_by().clone(),
missing_txs: missing_tx_ids,
})
})
}
}

#[derive(Debug)]
Expand Down
16 changes: 9 additions & 7 deletions dan_layer/consensus/src/hotstuff/on_propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,14 @@ where TConsensusSpec: ConsensusSpec
)?;

// Add executions for this block
debug!(
target: LOG_TARGET,
"Adding {} executed transaction(s) to block {}",
executed_transactions.len(),
next_block.id()
);
if !executed_transactions.is_empty() {
debug!(
target: LOG_TARGET,
"Saving {} executed transaction(s) for block {}",
executed_transactions.len(),
next_block.id()
);
}
for executed in executed_transactions.into_values() {
executed.for_block(*next_block.id()).insert_if_required(tx)?;
}
Expand Down Expand Up @@ -645,7 +647,7 @@ pub fn get_non_local_shards(diff: &[SubstateChange], local_committee_info: &Comm
.map(|ch| {
ch.versioned_substate_id()
.to_substate_address()
.to_shard(local_committee_info.num_shards())
.to_shard(local_committee_info.num_preshards())
})
.filter(|shard| local_committee_info.shard_group().contains(shard))
.collect()
Expand Down
Loading

0 comments on commit baebd2a

Please sign in to comment.