Skip to content

Commit

Permalink
feat(consensus): sequence transaction from foreign LocalPrepare/Accept
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Aug 19, 2024
1 parent 4278f20 commit 8d83588
Show file tree
Hide file tree
Showing 25 changed files with 647 additions and 262 deletions.
10 changes: 0 additions & 10 deletions applications/tari_dan_app_utilities/src/transaction_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,16 +130,6 @@ where TTemplateProvider: TemplateProvider<Template = LoadedTemplate>
self.network,
);
let result = processor.execute(transaction.clone())?;
// Ok(result) => result,
// // TODO: This may occur due to an internal error (e.g. OOM, etc).
// Err(err) => ExecuteResult {
// finalize: FinalizeResult::new_rejected(
// tx_id,
// RejectReason::ExecutionFailure(format!("BUG: {err}")),
// ),
// execution_time: Duration::default(),
// },
// };

Ok(ExecutionOutput { transaction, result })
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ impl ProcessManager {
// inputs for a transaction.
sleep(Duration::from_secs(2)).await;
}
self.mine(10).await?;
self.mine(20).await?;
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,33 +85,6 @@ where
) -> Result<ExecutedTransaction, BlockTransactionExecutorError> {
let id = *transaction.id();

// if let Some(abort_reason) = transaction.abort_reason() {
// // TODO: Hacky - if a transaction uses DOWNed/non-existent inputs we error here. This changes the hard
// // error to a propose REJECT. So that we have involved shards, we use the inputs as resolved inputs and
// // assume v0 if version is not provided.
// let inputs = transaction
// .transaction()
// .all_inputs_iter()
// .map(|input| VersionedSubstateId::new(input.substate_id, input.version.unwrap_or(0)))
// .map(|id| VersionedSubstateIdLockIntent::new(id, SubstateLockFlag::Write))
// .collect();
// return Ok(ExecutedTransaction::new(
// transaction.into_transaction(),
// ExecuteResult {
// finalize: FinalizeResult {
// transaction_hash: id.into_array().into(),
// events: vec![],
// logs: vec![],
// execution_results: vec![],
// result: TransactionResult::Reject(abort_reason.clone()),
// fee_receipt: Default::default(),
// },
// },
// inputs,
// vec![],
// Duration::from_secs(0),
// ));
// }
info!(target: LOG_TARGET, "Transaction {} executing. Inputs: {:?}", id, resolved_inputs);

// Create a memory db with all the input substates, needed for the transaction execution
Expand Down
11 changes: 9 additions & 2 deletions dan_layer/common_types/src/committee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{borrow::Borrow, cmp, ops::RangeInclusive};
use rand::{rngs::OsRng, seq::SliceRandom};
use serde::{Deserialize, Serialize};
use tari_common_types::types::PublicKey;
use tari_engine_types::substate::SubstateId;

use crate::{shard::Shard, Epoch, NumPreshards, ShardGroup, SubstateAddress};

Expand Down Expand Up @@ -203,7 +204,7 @@ impl CommitteeInfo {
(len - 1) / 3
}

pub fn num_shards(&self) -> NumPreshards {
pub fn num_preshards(&self) -> NumPreshards {
self.num_shards
}

Expand All @@ -215,12 +216,18 @@ impl CommitteeInfo {
self.shard_group.to_substate_address_range(self.num_shards)
}

// TODO: change these to take in a SubstateId
pub fn includes_substate_address(&self, substate_address: &SubstateAddress) -> bool {
let s = substate_address.to_shard(self.num_shards);
self.shard_group.contains(&s)
}

pub fn includes_substate_id(&self, substate_id: &SubstateId) -> bool {
// version doesnt affect shard
let addr = SubstateAddress::from_substate_id(substate_id, 0);
let shard = addr.to_shard(self.num_shards);
self.shard_group.contains(&shard)
}

pub fn includes_all_substate_addresses<I: IntoIterator<Item = B>, B: Borrow<SubstateAddress>>(
&self,
substate_addresses: I,
Expand Down
123 changes: 95 additions & 28 deletions dan_layer/consensus/src/hotstuff/on_message_validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,20 @@ use std::collections::HashSet;

use log::*;
use tari_common_types::types::PublicKey;
use tari_dan_common_types::{Epoch, NodeHeight};
use tari_dan_common_types::{committee::CommitteeInfo, Epoch, NodeHeight};
use tari_dan_storage::{
consensus_models::{Block, BlockId, TransactionRecord},
consensus_models::{Block, BlockId, ForeignParkedProposal, TransactionRecord},
StateStore,
StateStoreWriteTransaction,
};
use tari_epoch_manager::EpochManagerReader;
use tari_transaction::TransactionId;
use tokio::sync::broadcast;

use super::config::HotstuffConfig;
use crate::{
block_validations,
hotstuff::{error::HotStuffError, HotstuffEvent},
messages::{HotstuffMessage, MissingTransactionsRequest, ProposalMessage},
messages::{ForeignProposalMessage, HotstuffMessage, MissingTransactionsRequest, ProposalMessage},
traits::{ConsensusSpec, OutboundMessaging},
};

Expand Down Expand Up @@ -64,23 +63,15 @@ impl<TConsensusSpec: ConsensusSpec> OnMessageValidate<TConsensusSpec> {
pub async fn handle(
&mut self,
current_height: NodeHeight,
local_committee_info: &CommitteeInfo,
from: TConsensusSpec::Addr,
msg: HotstuffMessage,
) -> Result<MessageValidationResult<TConsensusSpec::Addr>, HotStuffError> {
match msg {
HotstuffMessage::Proposal(msg) => self.process_local_proposal(current_height, from, msg).await,
HotstuffMessage::ForeignProposal(proposal) => {
if let Err(err) = self.check_proposal(&proposal.block).await {
return Ok(MessageValidationResult::Invalid {
from,
message: HotstuffMessage::ForeignProposal(proposal),
err,
});
}
Ok(MessageValidationResult::Ready {
from,
message: HotstuffMessage::ForeignProposal(proposal),
})
self.process_foreign_proposal(local_committee_info, from, proposal)
.await
},
HotstuffMessage::MissingTransactionsResponse(msg) => {
if !self.active_missing_transaction_requests.remove_element(&msg.request_id) {
Expand Down Expand Up @@ -163,14 +154,14 @@ impl<TConsensusSpec: ConsensusSpec> OnMessageValidate<TConsensusSpec> {
});
}

self.handle_missing_transactions(from, block).await
self.handle_missing_transactions_local_block(from, block).await
}

pub async fn update_parked_blocks(
pub fn update_local_parked_blocks(
&self,
current_height: NodeHeight,
transaction_id: &TransactionId,
) -> Result<Option<(TConsensusSpec::Addr, HotstuffMessage)>, HotStuffError> {
) -> Result<Option<ProposalMessage>, HotStuffError> {
let maybe_unparked_block = self
.store
.with_write_tx(|tx| tx.missing_transactions_remove(current_height, transaction_id))?;
Expand All @@ -181,19 +172,28 @@ impl<TConsensusSpec: ConsensusSpec> OnMessageValidate<TConsensusSpec> {

info!(target: LOG_TARGET, "♻️ all transactions for block {unparked_block} are ready for consensus");

let vn = self
.epoch_manager
.get_validator_node_by_public_key(unparked_block.epoch(), unparked_block.proposed_by())
.await?;

let _ignore = self.tx_events.send(HotstuffEvent::ParkedBlockReady {
block: unparked_block.as_leaf_block(),
});

Ok(Some((
vn.address,
HotstuffMessage::Proposal(ProposalMessage { block: unparked_block }),
)))
Ok(Some(ProposalMessage { block: unparked_block }))
}

pub fn update_foreign_parked_blocks(
&self,
transaction_id: &TransactionId,
) -> Result<Vec<ForeignParkedProposal>, HotStuffError> {
let unparked_foreign_blocks = self
.store
.with_write_tx(|tx| ForeignParkedProposal::remove_by_transaction_id(tx, transaction_id))?;

if unparked_foreign_blocks.is_empty() {
return Ok(vec![]);
};

info!(target: LOG_TARGET, "♻️ all transactions for {} foreign block(s) are ready for consensus", unparked_foreign_blocks.len());

Ok(unparked_foreign_blocks)
}

async fn check_proposal(&self, block: &Block) -> Result<(), HotStuffError> {
Expand All @@ -208,7 +208,7 @@ impl<TConsensusSpec: ConsensusSpec> OnMessageValidate<TConsensusSpec> {
Ok(())
}

async fn handle_missing_transactions(
async fn handle_missing_transactions_local_block(
&mut self,
from: TConsensusSpec::Addr,
block: Block,
Expand Down Expand Up @@ -270,6 +270,73 @@ impl<TConsensusSpec: ConsensusSpec> OnMessageValidate<TConsensusSpec> {

Ok(missing_tx_ids)
}

async fn process_foreign_proposal(
&mut self,
local_committee_info: &CommitteeInfo,
from: TConsensusSpec::Addr,
msg: ForeignProposalMessage,
) -> Result<MessageValidationResult<TConsensusSpec::Addr>, HotStuffError> {
info!(
target: LOG_TARGET,
"🧩 new unvalidated FOREIGN PROPOSAL message {} from {}",
msg,
from
);

if let Err(err) = self.check_proposal(&msg.block).await {
return Ok(MessageValidationResult::Invalid {
from,
message: HotstuffMessage::ForeignProposal(msg),
err,
});
}

if msg.block.commands().is_empty() {
debug!(
target: LOG_TARGET,
"✅ Block {} is empty (no missing transactions)", msg.block
);
return Ok(MessageValidationResult::Ready {
from,
message: HotstuffMessage::ForeignProposal(msg),
});
}

self.store.with_write_tx(|tx| {
let missing_tx_ids = TransactionRecord::get_missing(
&**tx,
msg.block.all_transaction_ids_in_committee(local_committee_info),
)?;

if missing_tx_ids.is_empty() {
debug!(
target: LOG_TARGET,
"✅ Foreign Block {} has no missing transactions", msg.block
);
return Ok(MessageValidationResult::Ready {
from,
message: HotstuffMessage::ForeignProposal(msg),
});
}

info!(
target: LOG_TARGET,
"⏳ Foreign Block {} has {} missing transactions", msg.block, missing_tx_ids.len(),
);

let parked_block = ForeignParkedProposal::from(msg);
parked_block.insert(tx)?;
parked_block.add_missing_transactions(tx, &missing_tx_ids)?;

Ok(MessageValidationResult::ParkedProposal {
block_id: *parked_block.block.id(),
epoch: parked_block.block.epoch(),
proposed_by: parked_block.block.proposed_by().clone(),
missing_txs: missing_tx_ids,
})
})
}
}

#[derive(Debug)]
Expand Down
16 changes: 9 additions & 7 deletions dan_layer/consensus/src/hotstuff/on_propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,14 @@ where TConsensusSpec: ConsensusSpec
)?;

// Add executions for this block
debug!(
target: LOG_TARGET,
"Adding {} executed transaction(s) to block {}",
executed_transactions.len(),
next_block.id()
);
if !executed_transactions.is_empty() {
debug!(
target: LOG_TARGET,
"Saving {} executed transaction(s) for block {}",
executed_transactions.len(),
next_block.id()
);
}
for executed in executed_transactions.into_values() {
executed.for_block(*next_block.id()).insert_if_required(tx)?;
}
Expand Down Expand Up @@ -645,7 +647,7 @@ pub fn get_non_local_shards(diff: &[SubstateChange], local_committee_info: &Comm
.map(|ch| {
ch.versioned_substate_id()
.to_substate_address()
.to_shard(local_committee_info.num_shards())
.to_shard(local_committee_info.num_preshards())
})
.filter(|shard| local_committee_info.shard_group().contains(shard))
.collect()
Expand Down
Loading

0 comments on commit 8d83588

Please sign in to comment.