Skip to content

Commit

Permalink
fix: multi committee transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
Cifko committed Nov 15, 2023
1 parent e3d4c02 commit 6c156b9
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: BSD-3-Clause

use tari_consensus::traits::StateManager;
use tari_dan_common_types::ShardId;
use tari_dan_common_types::{committee::CommitteeShard, ShardId};
use tari_dan_storage::{
consensus_models::{Block, ExecutedTransaction, SubstateRecord},
StateStore,
Expand All @@ -25,6 +25,7 @@ impl<TStateStore: StateStore> StateManager<TStateStore> for TariStateManager {
tx: &mut TStateStore::WriteTransaction<'_>,
block: &Block<TStateStore::Addr>,
transaction: &ExecutedTransaction,
local_committee_shard: &CommitteeShard,
) -> Result<(), Self::Error> {
let Some(diff) = transaction.result().finalize.result.accept() else {
// We should only commit accepted transactions, might want to change this API to reflect that
Expand All @@ -33,7 +34,8 @@ impl<TStateStore: StateStore> StateManager<TStateStore> for TariStateManager {

let down_shards = diff
.down_iter()
.map(|(addr, version)| ShardId::from_address(addr, *version));
.map(|(addr, version)| ShardId::from_address(addr, *version))
.filter(|shard| local_committee_shard.includes_shard(shard));
SubstateRecord::destroy_many(
tx,
down_shards,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -586,21 +586,14 @@ where
};

let current_epoch = self.epoch_manager.current_epoch().await?;
let local_committee_shard = self.epoch_manager.get_local_committee_shard(current_epoch).await?;
let is_input_shard = local_committee_shard.includes_any_shard(executed.transaction().all_inputs_iter());

if should_propagate && is_input_shard {
// Forward the transaction to any output shards that are not part of the input shard set as these have
// already been forwarded
let num_committees = self.epoch_manager.get_num_committees(current_epoch).await?;
let input_buckets = executed
.involved_shards_iter()
.map(|s| s.to_committee_bucket(num_committees))
.collect::<HashSet<_>>();

// TODO: Should we always propagate? The TX is initially send only to single output shard (TxReceipt).
if should_propagate {
// Should we propagate to everyone?
let output_shards = executed
.resulting_outputs()
.iter()
.filter(|s| !input_buckets.contains(&s.to_committee_bucket(num_committees)))
// .filter(|s| !input_buckets.contains(&s.to_committee_bucket(num_committees)))
.copied()
.collect();

Expand Down
33 changes: 20 additions & 13 deletions dan_layer/consensus/src/hotstuff/on_propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,26 +196,33 @@ where TConsensusSpec: ConsensusSpec
let mut total_leader_fee = 0;
let commands = batch
.into_iter()
.map(|t| match t.current_stage() {
.filter_map(|t| match t.current_stage() {
// If the transaction is New, propose to Prepare it
TransactionPoolStage::New => Ok(Command::Prepare(t.get_local_transaction_atom())),
TransactionPoolStage::New => Some(Ok(Command::Prepare(t.get_local_transaction_atom()))),
// The transaction is Prepared, this stage is only _ready_ once we know that all local nodes
// accepted Prepared so we propose LocalPrepared
TransactionPoolStage::Prepared => Ok(Command::LocalPrepared(t.get_local_transaction_atom())),
TransactionPoolStage::Prepared => Some(Ok(Command::LocalPrepared(t.get_local_transaction_atom()))),
// The transaction is LocalPrepared, meaning that we know that all foreign and local nodes have
// prepared. We can now propose to Accept it. We also propose the decision change which everyone should
// agree with if they received the same foreign LocalPrepare.
TransactionPoolStage::LocalPrepared => {
let involved = local_committee_shard.count_distinct_buckets(t.transaction().evidence.shards_iter());
let involved = NonZeroU64::new(involved as u64).ok_or_else(|| {
HotStuffError::InvariantError(format!(
"Number of involved shards is zero for transaction {}",
t.transaction_id(),
))
})?;
let leader_fee = t.calculate_leader_fee(involved, EXHAUST_DIVISOR);
total_leader_fee += leader_fee;
Ok(Command::Accept(t.get_final_transaction_atom(leader_fee)))
if t.transaction().evidence.all_shards_complete() {
let involved =
local_committee_shard.count_distinct_buckets(t.transaction().evidence.shards_iter());
match NonZeroU64::new(involved as u64) {
Some(involved) => {
let leader_fee = t.calculate_leader_fee(involved, EXHAUST_DIVISOR);
total_leader_fee += leader_fee;
Some(Ok(Command::Accept(t.get_final_transaction_atom(leader_fee))))
},
None => Some(Err(HotStuffError::InvariantError(format!(
"Number of involved shards is zero for transaction {}",
t.transaction_id(),
)))),
}
} else {
None
}
},
// Not reachable as there is nothing to propose for these stages. To confirm that all local nodes agreed
// with the Accept, more (possibly empty) blocks with QCs will be proposed and accepted,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ where TConsensusSpec: ConsensusSpec
}

self.state_manager
.commit_transaction(tx, block, &executed)
.commit_transaction(tx, block, &executed, local_committee_shard)
.map_err(|e| HotStuffError::StateManagerError(e.into()))?;
}

Expand Down
2 changes: 2 additions & 0 deletions dan_layer/consensus/src/traits/state_manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use tari_dan_common_types::committee::CommitteeShard;
use tari_dan_storage::{
consensus_models::{Block, ExecutedTransaction},
StateStore,
Expand All @@ -14,5 +15,6 @@ pub trait StateManager<TStateStore: StateStore> {
tx: &mut TStateStore::WriteTransaction<'_>,
block: &Block<TStateStore::Addr>,
transaction: &ExecutedTransaction,
local_committee_shard: &CommitteeShard,
) -> Result<(), Self::Error>;
}
2 changes: 2 additions & 0 deletions dan_layer/consensus_tests/src/support/state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::sync::{atomic::AtomicBool, Arc};

use tari_consensus::traits::StateManager;
use tari_dan_common_types::committee::CommitteeShard;
use tari_dan_storage::{
consensus_models::{Block, ExecutedTransaction},
StateStore,
Expand Down Expand Up @@ -35,6 +36,7 @@ impl<TStateStore: StateStore> StateManager<TStateStore> for NoopStateManager {
_tx: &mut TStateStore::WriteTransaction<'_>,
_block: &Block<TStateStore::Addr>,
_transaction: &ExecutedTransaction,
_local_committee_shard: &CommitteeShard,
) -> Result<(), Self::Error> {
self.0.store(true, std::sync::atomic::Ordering::Relaxed);
Ok(())
Expand Down

0 comments on commit 6c156b9

Please sign in to comment.