diff --git a/applications/tari_validator_node/src/consensus/state_manager.rs b/applications/tari_validator_node/src/consensus/state_manager.rs index c4ba8583a5..d0deeffb79 100644 --- a/applications/tari_validator_node/src/consensus/state_manager.rs +++ b/applications/tari_validator_node/src/consensus/state_manager.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: BSD-3-Clause use tari_consensus::traits::StateManager; -use tari_dan_common_types::ShardId; +use tari_dan_common_types::{committee::CommitteeShard, ShardId}; use tari_dan_storage::{ consensus_models::{Block, ExecutedTransaction, SubstateRecord}, StateStore, @@ -25,6 +25,7 @@ impl StateManager for TariStateManager { tx: &mut TStateStore::WriteTransaction<'_>, block: &Block, transaction: &ExecutedTransaction, + local_committee_shard: &CommitteeShard, ) -> Result<(), Self::Error> { let Some(diff) = transaction.result().finalize.result.accept() else { // We should only commit accepted transactions, might want to change this API to reflect that @@ -33,7 +34,8 @@ impl StateManager for TariStateManager { let down_shards = diff .down_iter() - .map(|(addr, version)| ShardId::from_address(addr, *version)); + .map(|(addr, version)| ShardId::from_address(addr, *version)) + .filter(|shard| local_committee_shard.includes_shard(shard)); SubstateRecord::destroy_many( tx, down_shards, @@ -44,17 +46,22 @@ impl StateManager for TariStateManager { true, )?; - let to_up = diff.up_iter().map(|(addr, substate)| { - SubstateRecord::new( - addr.clone(), - substate.version(), - substate.substate_value().clone(), - block.epoch(), - block.height(), - *block.id(), - *transaction.id(), - *block.justify().id(), - ) + let to_up = diff.up_iter().filter_map(|(addr, substate)| { + let shard_id = ShardId::from_address(addr, substate.version()); + if local_committee_shard.includes_shard(&shard_id) { + Some(SubstateRecord::new( + addr.clone(), + substate.version(), + substate.substate_value().clone(), + block.epoch(), + block.height(), + *block.id(), + *transaction.id(), + *block.justify().id(), + )) + } else { + None + } }); for up in to_up { diff --git a/applications/tari_validator_node/src/p2p/services/mempool/service.rs b/applications/tari_validator_node/src/p2p/services/mempool/service.rs index 6ff003f2d4..c3b0f450cb 100644 --- a/applications/tari_validator_node/src/p2p/services/mempool/service.rs +++ b/applications/tari_validator_node/src/p2p/services/mempool/service.rs @@ -344,7 +344,7 @@ where ); } - // Only input shards propagate to foreign shards + // Only tx receipt shards propagate to foreign shards if is_input_shard { // Forward to foreign replicas. // We assume that at least f other local replicas receive this transaction and also forward to their @@ -568,15 +568,19 @@ where }; let current_epoch = self.epoch_manager.current_epoch().await?; + + self.epoch_manager.get_local_committee_shard(current_epoch).await?; let local_committee_shard = self.epoch_manager.get_local_committee_shard(current_epoch).await?; - let is_input_shard = local_committee_shard.includes_any_shard(executed.transaction().all_inputs_iter()); + let is_input_shard = local_committee_shard.includes_any_shard(executed.transaction().all_inputs_iter()) | + (executed.transaction().inputs().len() + executed.transaction().input_refs().len() == 0); if should_propagate && is_input_shard { // Forward the transaction to any output shards that are not part of the input shard set as these have // already been forwarded let num_committees = self.epoch_manager.get_num_committees(current_epoch).await?; - let input_buckets = executed - .involved_shards_iter() + let input_buckets: HashSet = executed + .transaction() + .all_inputs_iter() .map(|s| s.to_committee_bucket(num_committees)) .collect::>(); let output_shards = executed diff --git a/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs b/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs index 82f8377b5d..63720ec2fc 100644 --- a/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs +++ b/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs @@ -977,7 +977,7 @@ where TConsensusSpec: ConsensusSpec } self.state_manager - .commit_transaction(tx, block, &executed) + .commit_transaction(tx, block, &executed, local_committee_shard) .map_err(|e| HotStuffError::StateManagerError(e.into()))?; } diff --git a/dan_layer/consensus/src/traits/state_manager.rs b/dan_layer/consensus/src/traits/state_manager.rs index 995aa359d5..ad664e971a 100644 --- a/dan_layer/consensus/src/traits/state_manager.rs +++ b/dan_layer/consensus/src/traits/state_manager.rs @@ -1,6 +1,7 @@ // Copyright 2023 The Tari Project // SPDX-License-Identifier: BSD-3-Clause +use tari_dan_common_types::committee::CommitteeShard; use tari_dan_storage::{ consensus_models::{Block, ExecutedTransaction}, StateStore, @@ -14,5 +15,6 @@ pub trait StateManager { tx: &mut TStateStore::WriteTransaction<'_>, block: &Block, transaction: &ExecutedTransaction, + local_committee_shard: &CommitteeShard, ) -> Result<(), Self::Error>; } diff --git a/dan_layer/consensus_tests/src/support/state_manager.rs b/dan_layer/consensus_tests/src/support/state_manager.rs index 8966d47f4b..12afc550d8 100644 --- a/dan_layer/consensus_tests/src/support/state_manager.rs +++ b/dan_layer/consensus_tests/src/support/state_manager.rs @@ -4,6 +4,7 @@ use std::sync::{atomic::AtomicBool, Arc}; use tari_consensus::traits::StateManager; +use tari_dan_common_types::committee::CommitteeShard; use tari_dan_storage::{ consensus_models::{Block, ExecutedTransaction}, StateStore, @@ -35,6 +36,7 @@ impl StateManager for NoopStateManager { _tx: &mut TStateStore::WriteTransaction<'_>, _block: &Block, _transaction: &ExecutedTransaction, + _local_committee_shard: &CommitteeShard, ) -> Result<(), Self::Error> { self.0.store(true, std::sync::atomic::Ordering::Relaxed); Ok(()) diff --git a/dan_layer/state_store_sqlite/src/reader.rs b/dan_layer/state_store_sqlite/src/reader.rs index 8274104756..b1dfcce8b8 100644 --- a/dan_layer/state_store_sqlite/src/reader.rs +++ b/dan_layer/state_store_sqlite/src/reader.rs @@ -1079,7 +1079,7 @@ impl StateStoreReadTransa let maybe_update = updates.remove(&rec.transaction_id); match rec.try_convert(maybe_update) { Ok(rec) => { - if rec.is_ready() || rec.stage().is_new() { + if rec.is_ready() { Some(Ok(rec)) } else { None