Skip to content

Commit

Permalink
fix: multi committee transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
Cifko committed Nov 16, 2023
1 parent 9d06a84 commit e0831ae
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 19 deletions.
33 changes: 20 additions & 13 deletions applications/tari_validator_node/src/consensus/state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: BSD-3-Clause

use tari_consensus::traits::StateManager;
use tari_dan_common_types::ShardId;
use tari_dan_common_types::{committee::CommitteeShard, ShardId};
use tari_dan_storage::{
consensus_models::{Block, ExecutedTransaction, SubstateRecord},
StateStore,
Expand All @@ -25,6 +25,7 @@ impl<TStateStore: StateStore> StateManager<TStateStore> for TariStateManager {
tx: &mut TStateStore::WriteTransaction<'_>,
block: &Block<TStateStore::Addr>,
transaction: &ExecutedTransaction,
local_committee_shard: &CommitteeShard,
) -> Result<(), Self::Error> {
let Some(diff) = transaction.result().finalize.result.accept() else {
// We should only commit accepted transactions, might want to change this API to reflect that
Expand All @@ -33,7 +34,8 @@ impl<TStateStore: StateStore> StateManager<TStateStore> for TariStateManager {

let down_shards = diff
.down_iter()
.map(|(addr, version)| ShardId::from_address(addr, *version));
.map(|(addr, version)| ShardId::from_address(addr, *version))
.filter(|shard| local_committee_shard.includes_shard(shard));
SubstateRecord::destroy_many(
tx,
down_shards,
Expand All @@ -44,17 +46,22 @@ impl<TStateStore: StateStore> StateManager<TStateStore> for TariStateManager {
true,
)?;

let to_up = diff.up_iter().map(|(addr, substate)| {
SubstateRecord::new(
addr.clone(),
substate.version(),
substate.substate_value().clone(),
block.epoch(),
block.height(),
*block.id(),
*transaction.id(),
*block.justify().id(),
)
let to_up = diff.up_iter().filter_map(|(addr, substate)| {
let shard_id = ShardId::from_address(addr, substate.version());
if local_committee_shard.includes_shard(&shard_id) {
Some(SubstateRecord::new(
addr.clone(),
substate.version(),
substate.substate_value().clone(),
block.epoch(),
block.height(),
*block.id(),
*transaction.id(),
*block.justify().id(),
))
} else {
None
}
});

for up in to_up {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ where
);
}

// Only input shards propagate to foreign shards
// Only tx receipt shards propagate to foreign shards
if is_input_shard {
// Forward to foreign replicas.
// We assume that at least f other local replicas receive this transaction and also forward to their
Expand Down Expand Up @@ -568,15 +568,19 @@ where
};

let current_epoch = self.epoch_manager.current_epoch().await?;

self.epoch_manager.get_local_committee_shard(current_epoch).await?;
let local_committee_shard = self.epoch_manager.get_local_committee_shard(current_epoch).await?;
let is_input_shard = local_committee_shard.includes_any_shard(executed.transaction().all_inputs_iter());
let is_input_shard = local_committee_shard.includes_any_shard(executed.transaction().all_inputs_iter()) |
(executed.transaction().inputs().len() + executed.transaction().input_refs().len() == 0);

if should_propagate && is_input_shard {
// Forward the transaction to any output shards that are not part of the input shard set as these have
// already been forwarded
let num_committees = self.epoch_manager.get_num_committees(current_epoch).await?;
let input_buckets = executed
.involved_shards_iter()
let input_buckets: HashSet<ShardBucket> = executed
.transaction()
.all_inputs_iter()
.map(|s| s.to_committee_bucket(num_committees))
.collect::<HashSet<_>>();
let output_shards = executed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -977,7 +977,7 @@ where TConsensusSpec: ConsensusSpec
}

self.state_manager
.commit_transaction(tx, block, &executed)
.commit_transaction(tx, block, &executed, local_committee_shard)
.map_err(|e| HotStuffError::StateManagerError(e.into()))?;
}

Expand Down
2 changes: 2 additions & 0 deletions dan_layer/consensus/src/traits/state_manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use tari_dan_common_types::committee::CommitteeShard;
use tari_dan_storage::{
consensus_models::{Block, ExecutedTransaction},
StateStore,
Expand All @@ -14,5 +15,6 @@ pub trait StateManager<TStateStore: StateStore> {
tx: &mut TStateStore::WriteTransaction<'_>,
block: &Block<TStateStore::Addr>,
transaction: &ExecutedTransaction,
local_committee_shard: &CommitteeShard,
) -> Result<(), Self::Error>;
}
2 changes: 2 additions & 0 deletions dan_layer/consensus_tests/src/support/state_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use std::sync::{atomic::AtomicBool, Arc};

use tari_consensus::traits::StateManager;
use tari_dan_common_types::committee::CommitteeShard;
use tari_dan_storage::{
consensus_models::{Block, ExecutedTransaction},
StateStore,
Expand Down Expand Up @@ -35,6 +36,7 @@ impl<TStateStore: StateStore> StateManager<TStateStore> for NoopStateManager {
_tx: &mut TStateStore::WriteTransaction<'_>,
_block: &Block<TStateStore::Addr>,
_transaction: &ExecutedTransaction,
_local_committee_shard: &CommitteeShard,
) -> Result<(), Self::Error> {
self.0.store(true, std::sync::atomic::Ordering::Relaxed);
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion dan_layer/state_store_sqlite/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1079,7 +1079,7 @@ impl<TAddr: NodeAddressable + Serialize + DeserializeOwned> StateStoreReadTransa
let maybe_update = updates.remove(&rec.transaction_id);
match rec.try_convert(maybe_update) {
Ok(rec) => {
if rec.is_ready() || rec.stage().is_new() {
if rec.is_ready() {
Some(Ok(rec))
} else {
None
Expand Down

0 comments on commit e0831ae

Please sign in to comment.