diff --git a/applications/tari_dan_app_utilities/src/base_layer_scanner.rs b/applications/tari_dan_app_utilities/src/base_layer_scanner.rs index 9d407a121..02ef8be6f 100644 --- a/applications/tari_dan_app_utilities/src/base_layer_scanner.rs +++ b/applications/tari_dan_app_utilities/src/base_layer_scanner.rs @@ -475,6 +475,8 @@ impl BaseLayerScanner { target: LOG_TARGET, "🌠 new template found with address {} at height {}", template_address, block_info.height ); + let consensus_constants = self.epoch_manager.get_base_layer_consensus_constants().await?; + let epoch = consensus_constants.height_to_epoch(block_info.height); self.template_manager .add_template( registration.author_public_key, @@ -484,6 +486,7 @@ impl BaseLayerScanner { registration.binary_sha, ), Some(template_name), + epoch, ) .await?; diff --git a/applications/tari_dan_app_utilities/src/template_manager/implementation/manager.rs b/applications/tari_dan_app_utilities/src/template_manager/implementation/manager.rs index f65504c63..aab37f575 100644 --- a/applications/tari_dan_app_utilities/src/template_manager/implementation/manager.rs +++ b/applications/tari_dan_app_utilities/src/template_manager/implementation/manager.rs @@ -26,7 +26,12 @@ use chrono::Utc; use log::*; use tari_common_types::types::{FixedHash, PublicKey}; use tari_crypto::tari_utilities::ByteArray; -use tari_dan_common_types::{optional::Optional, services::template_provider::TemplateProvider, NodeAddressable}; +use tari_dan_common_types::{ + optional::Optional, + services::template_provider::TemplateProvider, + Epoch, + NodeAddressable, +}; use tari_dan_engine::{ flow::FlowFactory, function_definitions::FlowFunctionDefinition, @@ -192,6 +197,7 @@ impl TemplateManager { template: TemplateExecutable, template_name: Option, template_status: Option, + epoch: Epoch, ) -> Result<(), TemplateManagerError> { enum TemplateHash { Hash(Hash), @@ -244,6 +250,7 @@ impl TemplateManager { flow_json, manifest, url: template_url, + epoch, }; let mut tx = self.global_db.create_transaction()?; @@ -326,21 +333,6 @@ impl TemplateProvider for Templa Ok(Some(loaded)) } - - fn add_wasm_template( - &self, - author_public_key: PublicKey, - template_address: tari_engine_types::TemplateAddress, - template: &[u8], - ) -> Result<(), Self::Error> { - self.add_template( - author_public_key, - template_address, - TemplateExecutable::CompiledWasm(template.to_vec()), - None, - Some(TemplateStatus::Active), - ) - } } impl Clone for TemplateManager { diff --git a/applications/tari_dan_app_utilities/src/template_manager/implementation/service.rs b/applications/tari_dan_app_utilities/src/template_manager/implementation/service.rs index b588968cb..aa20b249b 100644 --- a/applications/tari_dan_app_utilities/src/template_manager/implementation/service.rs +++ b/applications/tari_dan_app_utilities/src/template_manager/implementation/service.rs @@ -25,7 +25,7 @@ use log::*; use tari_common_types::types::PublicKey; -use tari_dan_common_types::{services::template_provider::TemplateProvider, NodeAddressable}; +use tari_dan_common_types::{services::template_provider::TemplateProvider, Epoch, NodeAddressable}; use tari_dan_engine::function_definitions::FlowFunctionDefinition; use tari_dan_storage::global::{DbTemplateType, DbTemplateUpdate, TemplateStatus}; use tari_engine_types::calculate_template_binary_hash; @@ -124,11 +124,12 @@ impl TemplateManagerService { template_address, template, template_name, + epoch, reply, } => { handle( reply, - self.handle_add_template(author_public_key, template_address, template, template_name) + self.handle_add_template(author_public_key, template_address, template, template_name, epoch) .await, ); }, @@ -243,6 +244,7 @@ impl TemplateManagerService { template_address: tari_engine_types::TemplateAddress, template: TemplateExecutable, template_name: Option, + epoch: Epoch, ) -> Result<(), TemplateManagerError> { let template_status = if matches!(template, TemplateExecutable::DownloadableWasm(_, _)) { TemplateStatus::New @@ -255,6 +257,7 @@ impl TemplateManagerService { template.clone(), template_name, Some(template_status), + epoch, )?; // TODO: remove when we remove support for base layer template registration diff --git a/applications/tari_dan_app_utilities/src/template_manager/interface/handle.rs b/applications/tari_dan_app_utilities/src/template_manager/interface/handle.rs index a62244a56..651cbd161 100644 --- a/applications/tari_dan_app_utilities/src/template_manager/interface/handle.rs +++ b/applications/tari_dan_app_utilities/src/template_manager/interface/handle.rs @@ -21,6 +21,7 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use tari_common_types::types::PublicKey; +use tari_dan_common_types::Epoch; use tari_template_lib::models::TemplateAddress; use tari_validator_node_client::types::TemplateAbi; use tokio::sync::{mpsc, oneshot}; @@ -70,6 +71,7 @@ impl TemplateManagerHandle { template_address: TemplateAddress, template: TemplateExecutable, template_name: Option, + epoch: Epoch, ) -> Result<(), TemplateManagerError> { let (tx, rx) = oneshot::channel(); self.request_tx @@ -78,6 +80,7 @@ impl TemplateManagerHandle { template_address, template, template_name, + epoch, reply: tx, }) .await diff --git a/applications/tari_dan_app_utilities/src/template_manager/interface/types.rs b/applications/tari_dan_app_utilities/src/template_manager/interface/types.rs index 08f3a60dd..13c1b427b 100644 --- a/applications/tari_dan_app_utilities/src/template_manager/interface/types.rs +++ b/applications/tari_dan_app_utilities/src/template_manager/interface/types.rs @@ -22,6 +22,7 @@ use reqwest::Url; use tari_common_types::types::{FixedHash, PublicKey}; +use tari_dan_common_types::Epoch; use tari_dan_storage::global::{DbTemplate, DbTemplateType}; use tari_template_lib::models::TemplateAddress; use tari_validator_node_client::types::TemplateAbi; @@ -91,6 +92,7 @@ pub enum TemplateManagerRequest { template_address: tari_engine_types::TemplateAddress, template: TemplateExecutable, template_name: Option, + epoch: Epoch, reply: oneshot::Sender>, }, GetTemplate { diff --git a/applications/tari_validator_node/src/bootstrap.rs b/applications/tari_validator_node/src/bootstrap.rs index d7c07d807..a9637b174 100644 --- a/applications/tari_validator_node/src/bootstrap.rs +++ b/applications/tari_validator_node/src/bootstrap.rs @@ -336,7 +336,6 @@ pub async fn spawn_services( handles.push(consensus_join_handle); let (mempool, join_handle) = mempool::spawn( - consensus_constants.num_preshards, epoch_manager.clone(), create_mempool_transaction_validator(template_manager.clone()), state_store.clone(), diff --git a/applications/tari_validator_node/src/dan_node.rs b/applications/tari_validator_node/src/dan_node.rs index 1a2d2b631..d451853d4 100644 --- a/applications/tari_validator_node/src/dan_node.rs +++ b/applications/tari_validator_node/src/dan_node.rs @@ -94,7 +94,6 @@ impl DanNode { .services .state_store .with_read_tx(|tx| block.get_committing_transactions(tx))?; - let templates = transactions .into_iter() .filter_map(|record| { @@ -115,6 +114,7 @@ impl DanNode { }) }); + let epoch = self.services.consensus_handle.current_epoch(); // adding templates to template manager let mut template_counter = 0; for (author_pub_key, template_address, template) in templates { @@ -127,7 +127,7 @@ impl DanNode { if let Err(err) = self .services .template_manager - .add_template(author_pub_key, template_address, template, None) + .add_template(author_pub_key, template_address, template, None, epoch) .await { error!(target: LOG_TARGET, "🚨Failed to add template: {}", err); diff --git a/applications/tari_validator_node/src/p2p/services/mempool/gossip.rs b/applications/tari_validator_node/src/p2p/services/mempool/gossip.rs index d106611b0..603ec7948 100644 --- a/applications/tari_validator_node/src/p2p/services/mempool/gossip.rs +++ b/applications/tari_validator_node/src/p2p/services/mempool/gossip.rs @@ -5,7 +5,7 @@ use std::{collections::HashSet, iter}; use libp2p::{gossipsub, PeerId}; use log::*; -use tari_dan_common_types::{Epoch, NumPreshards, PeerAddress, ShardGroup, ToSubstateAddress}; +use tari_dan_common_types::{Epoch, PeerAddress, ShardGroup, ToSubstateAddress}; use tari_dan_p2p::{proto, DanMessage, NewTransactionMessage, TariMessagingSpec}; use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerReader}; use tari_networking::{NetworkingHandle, NetworkingService}; @@ -47,7 +47,6 @@ impl MempoolGossipCodec { #[derive(Debug)] pub(super) struct MempoolGossip { - num_preshards: NumPreshards, epoch_manager: EpochManagerHandle, is_subscribed: Option, networking: NetworkingHandle, @@ -57,13 +56,11 @@ pub(super) struct MempoolGossip { impl MempoolGossip { pub fn new( - num_preshards: NumPreshards, epoch_manager: EpochManagerHandle, networking: NetworkingHandle, rx_gossip: mpsc::UnboundedReceiver<(PeerId, gossipsub::Message)>, ) -> Self { Self { - num_preshards, epoch_manager, is_subscribed: None, networking, @@ -143,28 +140,37 @@ impl MempoolGossip { exclude_shard_group: Option, ) -> Result<(), MempoolError> { let n = self.epoch_manager.get_num_committees(epoch).await?; - let committee_shard = self.epoch_manager.get_local_committee_info(epoch).await?; - let local_shard_group = committee_shard.shard_group(); - let shard_groups = msg - .transaction - .all_inputs_iter() - .map(|s| { - s.or_zero_version() - .to_substate_address() - .to_shard_group(self.num_preshards, n) - }) - .chain(iter::once( - msg.transaction - .id() - .to_substate_address() - .to_shard_group(self.num_preshards, n), - )) - .filter(|sg| exclude_shard_group.as_ref() != Some(sg) && sg != &local_shard_group) - .collect::>(); - // If the only shard group involved is the excluded one. - if shard_groups.is_empty() { - return Ok(()); - } + let committee_info = self.epoch_manager.get_local_committee_info(epoch).await?; + let local_shard_group = committee_info.shard_group(); + let shard_groups = if msg.transaction.is_global() { + Box::new( + committee_info + .all_shard_groups_iter() + .filter(|sg| exclude_shard_group.as_ref() != Some(sg) && sg != &local_shard_group), + ) + } else { + let shard_groups = msg + .transaction + .all_inputs_iter() + .map(|s| { + s.or_zero_version() + .to_substate_address() + .to_shard_group(committee_info.num_preshards(), n) + }) + .chain(iter::once( + msg.transaction + .id() + .to_substate_address() + .to_shard_group(committee_info.num_preshards(), n), + )) + .filter(|sg| exclude_shard_group.as_ref() != Some(sg) && sg != &local_shard_group) + .collect::>(); + // If the only shard group involved is the excluded one. + if shard_groups.is_empty() { + return Ok(()); + } + Box::new(shard_groups.into_iter()) as Box + Send> + }; let msg = self .codec diff --git a/applications/tari_validator_node/src/p2p/services/mempool/initializer.rs b/applications/tari_validator_node/src/p2p/services/mempool/initializer.rs index 2bb4d622e..b28753b9e 100644 --- a/applications/tari_validator_node/src/p2p/services/mempool/initializer.rs +++ b/applications/tari_validator_node/src/p2p/services/mempool/initializer.rs @@ -22,7 +22,7 @@ use libp2p::{gossipsub, PeerId}; use log::*; -use tari_dan_common_types::{NumPreshards, PeerAddress}; +use tari_dan_common_types::PeerAddress; use tari_dan_p2p::TariMessagingSpec; use tari_epoch_manager::base_layer::EpochManagerHandle; use tari_networking::NetworkingHandle; @@ -42,7 +42,6 @@ use crate::{ const LOG_TARGET: &str = "tari::dan::validator_node::mempool"; pub fn spawn( - num_preshards: NumPreshards, epoch_manager: EpochManagerHandle, transaction_validator: TValidator, state_store: SqliteStateStore, @@ -61,7 +60,6 @@ where #[cfg(feature = "metrics")] let metrics = PrometheusMempoolMetrics::new(metrics_registry); let mempool = MempoolService::new( - num_preshards, rx_mempool_request, epoch_manager, transaction_validator, diff --git a/applications/tari_validator_node/src/p2p/services/mempool/service.rs b/applications/tari_validator_node/src/p2p/services/mempool/service.rs index d0a4d3c7a..813145657 100644 --- a/applications/tari_validator_node/src/p2p/services/mempool/service.rs +++ b/applications/tari_validator_node/src/p2p/services/mempool/service.rs @@ -25,7 +25,7 @@ use std::{collections::HashSet, fmt::Display, iter}; use libp2p::{gossipsub, PeerId}; use log::*; use tari_consensus::hotstuff::HotstuffEvent; -use tari_dan_common_types::{optional::Optional, NumPreshards, PeerAddress, ShardGroup, ToSubstateAddress}; +use tari_dan_common_types::{optional::Optional, PeerAddress, ShardGroup, ToSubstateAddress}; use tari_dan_p2p::{DanMessage, NewTransactionMessage, TariMessagingSpec}; use tari_dan_storage::{consensus_models::TransactionRecord, StateStore}; use tari_engine_types::commit_result::RejectReason; @@ -67,7 +67,6 @@ impl MempoolService where TValidator: Validator { pub(super) fn new( - num_preshards: NumPreshards, mempool_requests: mpsc::Receiver, epoch_manager: EpochManagerHandle, before_execute_validator: TValidator, @@ -78,7 +77,7 @@ where TValidator: Validator Self { Self { - gossip: MempoolGossip::new(num_preshards, epoch_manager.clone(), networking, rx_gossip), + gossip: MempoolGossip::new(epoch_manager.clone(), networking, rx_gossip), transactions: Default::default(), mempool_requests, epoch_manager, diff --git a/dan_layer/common_types/src/committee.rs b/dan_layer/common_types/src/committee.rs index 606e03e75..a9a2e7ced 100644 --- a/dan_layer/common_types/src/committee.rs +++ b/dan_layer/common_types/src/committee.rs @@ -232,6 +232,9 @@ impl CommitteeInfo { } pub fn includes_substate_id(&self, substate_id: &SubstateId) -> bool { + if substate_id.is_global() { + return true; + } // version doesnt affect shard let addr = SubstateAddress::from_substate_id(substate_id, 0); let shard = addr.to_shard(self.num_shards); @@ -265,6 +268,10 @@ impl CommitteeInfo { .into_iter() .filter(|substate_address| self.includes_substate_address(substate_address.borrow())) } + + pub fn all_shard_groups_iter(&self) -> impl Iterator { + self.num_shards.all_shard_groups_iter(self.num_committees) + } } #[derive(Debug, Clone, Serialize)] diff --git a/dan_layer/common_types/src/lock_intent.rs b/dan_layer/common_types/src/lock_intent.rs index ad302cb75..e8453b916 100644 --- a/dan_layer/common_types/src/lock_intent.rs +++ b/dan_layer/common_types/src/lock_intent.rs @@ -51,6 +51,10 @@ impl SubstateLockType { pub fn is_output(&self) -> bool { matches!(self, Self::Output) } + + pub fn is_input(&self) -> bool { + !self.is_output() + } } impl fmt::Display for SubstateLockType { diff --git a/dan_layer/common_types/src/num_preshards.rs b/dan_layer/common_types/src/num_preshards.rs index b5835a071..61b2e9eb0 100644 --- a/dan_layer/common_types/src/num_preshards.rs +++ b/dan_layer/common_types/src/num_preshards.rs @@ -5,6 +5,8 @@ use std::{error::Error, fmt::Display}; use serde::{Deserialize, Serialize}; +use crate::ShardGroup; + #[cfg_attr( feature = "ts", derive(ts_rs::TS), @@ -33,6 +35,27 @@ impl NumPreshards { pub fn is_one(self) -> bool { self == Self::P1 } + + pub fn all_shard_groups_iter(&self, num_committees: u32) -> impl Iterator { + let num_shards = self.as_u32(); + let num_shards_per_committee = num_shards / num_committees; + let mut remainder = num_shards % num_committees; + let mut start = 0; + let mut end = num_shards_per_committee - 1; + std::iter::from_fn(move || { + if start >= num_shards { + return None; + } + if remainder > 0 { + end += 1; + remainder -= 1; + } + let group = ShardGroup::new(start, end); + start = end + 1; + end = start + num_shards_per_committee - 1; + Some(group) + }) + } } impl TryFrom for NumPreshards { @@ -76,3 +99,31 @@ impl Display for InvalidNumPreshards { write!(f, "{} is not a valid number of pre-shards", self.0) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn it_calculates_all_shard_groups() { + let num_preshards = NumPreshards::P256; + let num_committees = 3; + let groups: Vec<_> = num_preshards.all_shard_groups_iter(num_committees).collect(); + assert_eq!(groups.len(), 3); + assert_eq!(groups[0], ShardGroup::new(0, 85)); + assert_eq!(groups[0].len(), 86); + assert_eq!(groups[1], ShardGroup::new(86, 170)); + assert_eq!(groups[1].len(), 85); + assert_eq!(groups[2], ShardGroup::new(171, 255)); + assert_eq!(groups[2].len(), 85); + } + + #[test] + fn total_shard_group_lengths_equal_num_preshards() { + let num_preshards = NumPreshards::P256; + let num_committees = 234; + let groups: Vec<_> = num_preshards.all_shard_groups_iter(num_committees).collect(); + let total_length = groups.iter().map(|g| g.len()).sum::(); + assert_eq!(total_length, num_preshards.as_u32() as usize); + } +} diff --git a/dan_layer/common_types/src/services/template_provider.rs b/dan_layer/common_types/src/services/template_provider.rs index 684092e6c..629bdbfab 100644 --- a/dan_layer/common_types/src/services/template_provider.rs +++ b/dan_layer/common_types/src/services/template_provider.rs @@ -1,7 +1,6 @@ // Copyright 2022 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use tari_common_types::types::PublicKey; use tari_engine_types::TemplateAddress; pub trait TemplateProvider: Send + Sync + Clone + 'static { @@ -9,11 +8,4 @@ pub trait TemplateProvider: Send + Sync + Clone + 'static { type Error: std::error::Error + Sync + Send + 'static; fn get_template_module(&self, id: &TemplateAddress) -> Result, Self::Error>; - - fn add_wasm_template( - &self, - author_public_key: PublicKey, - template_address: TemplateAddress, - template: &[u8], - ) -> Result<(), Self::Error>; } diff --git a/dan_layer/common_types/src/substate_address.rs b/dan_layer/common_types/src/substate_address.rs index 6a780b57a..43bc23632 100644 --- a/dan_layer/common_types/src/substate_address.rs +++ b/dan_layer/common_types/src/substate_address.rs @@ -600,6 +600,22 @@ mod tests { } } + #[test] + fn it_matches_num_preshard_all_shard_iter() { + const NUM_COMMITTEES: u32 = 11; + let groups = (0..NUM_COMMITTEES).map(|i| { + address_at(i * (256 / NUM_COMMITTEES + 1), 256).to_shard_group(NumPreshards::P256, NUM_COMMITTEES) + }); + let mut iter = NumPreshards::P256.all_shard_groups_iter(NUM_COMMITTEES); + let mut total_length = 0; + for (i, group) in groups.enumerate() { + assert_eq!(iter.next(), Some(group), "Failed at {group} (i={i})"); + total_length += group.len(); + } + assert_eq!(iter.next(), None); + assert_eq!(total_length, 256); + } + #[test] fn it_returns_the_correct_shard_group_for_odd_num_committees() { // All shard groups except the last have 3 shards each diff --git a/dan_layer/consensus/src/hotstuff/block_change_set.rs b/dan_layer/consensus/src/hotstuff/block_change_set.rs index 1ab74c8a4..5104420da 100644 --- a/dan_layer/consensus/src/hotstuff/block_change_set.rs +++ b/dan_layer/consensus/src/hotstuff/block_change_set.rs @@ -24,7 +24,6 @@ use tari_dan_storage::{ LockedBlock, NoVoteReason, PendingShardStateTreeDiff, - QuorumCertificate, QuorumDecision, SubstateChange, SubstateLock, @@ -59,9 +58,8 @@ const MEM_MAX_PROPOSED_UTXO_MINTS_SIZE: usize = 1000; #[derive(Debug, Clone)] pub struct BlockDecision { pub quorum_decision: Option, - /// Contains newly-locked non-dummy blocks and the QC that justifies each block i.e. typically the parent block's - /// QC - pub locked_blocks: Vec<(Block, QuorumCertificate)>, + /// Contains newly-locked non-dummy blocks + pub locked_blocks: Vec, pub finalized_transactions: Vec>, pub end_of_epoch: Option, pub high_qc: HighQc, diff --git a/dan_layer/consensus/src/hotstuff/foreign_proposal_processor.rs b/dan_layer/consensus/src/hotstuff/foreign_proposal_processor.rs index dc21104d2..88df8fe52 100644 --- a/dan_layer/consensus/src/hotstuff/foreign_proposal_processor.rs +++ b/dan_layer/consensus/src/hotstuff/foreign_proposal_processor.rs @@ -187,7 +187,7 @@ pub fn process_foreign_block( tx_rec.current_stage() ); } - } else if tx_rec.current_stage().is_local_prepared() && tx_rec.evidence().all_inputs_prepared() { + } else if tx_rec.current_stage().is_local_prepared() && tx_rec.evidence().all_shard_groups_prepared() { // If all shards are complete, and we've already received our LocalPrepared, we can set out // LocalPrepared transaction as ready to propose ACCEPT. If we have not received // the local LocalPrepared, the transition will happen when we receive the local @@ -209,7 +209,7 @@ pub fn process_foreign_block( tx_rec.transaction_id(), tx_rec.current_decision(), tx_rec.current_stage(), - tx_rec.evidence().all_inputs_prepared() + tx_rec.evidence().all_shard_groups_prepared() ); // Update the evidence proposed_block_change_set.set_next_transaction_update(tx_rec)?; @@ -363,7 +363,7 @@ pub fn process_foreign_block( tx_rec.transaction_id(), tx_rec.current_decision(), tx_rec.current_stage(), - tx_rec.evidence().all_addresses_accepted() + tx_rec.evidence().all_objects_accepted() ); // Still need to update the evidence proposed_block_change_set.set_next_transaction_update(tx_rec)?; diff --git a/dan_layer/consensus/src/hotstuff/on_propose.rs b/dan_layer/consensus/src/hotstuff/on_propose.rs index e670f6772..7f49c3ddc 100644 --- a/dan_layer/consensus/src/hotstuff/on_propose.rs +++ b/dan_layer/consensus/src/hotstuff/on_propose.rs @@ -247,7 +247,7 @@ where TConsensusSpec: ConsensusSpec if local_committee_info.num_shard_group_members() <= 1 { info!( target: LOG_TARGET, - "🌿 Only member of local committee. No need to multicast proposal {leaf_block}", + "🌿 This node is the only member of the local committee. No need to multicast proposal {leaf_block}", ); } else { let committee = self @@ -816,6 +816,11 @@ where TConsensusSpec: ConsensusSpec tx_rec .evidence_mut() .update(&multishard.to_initial_evidence(local_committee_info)); + if true { + for shard_group in local_committee_info.all_shard_groups_iter() { + tx_rec.evidence_mut().add_shard_group(shard_group); + } + } } }, Decision::Abort(reason) => { @@ -868,9 +873,7 @@ where TConsensusSpec: ConsensusSpec .resulting_outputs() .iter() .filter(|o| { - o.substate_id().is_transaction_receipt() || - o.substate_id().is_published_template() || - local_committee_info.includes_substate_address(&o.to_substate_address()) + o.substate_id().is_transaction_receipt() || local_committee_info.includes_substate_id(o.substate_id()) }) .map(|output| SubstateRequirementLockIntent::from(output.clone())); let lock_status = substate_store.try_lock_all(*tx_rec.transaction_id(), local_outputs, false)?; diff --git a/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs b/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs index 7d326ce1c..6590fb847 100644 --- a/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs +++ b/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs @@ -149,9 +149,9 @@ where TConsensusSpec: ConsensusSpec // Update nodes let high_qc = valid_block.block().update_nodes( tx, - |tx, _prev_locked, block, justify_qc| { + |tx, _prev_locked, block, _justify_qc| { if !block.is_dummy() { - locked_blocks.push((block.clone(), justify_qc.clone())); + locked_blocks.push(block.clone()); } self.on_lock_block(tx, block) }, @@ -987,14 +987,14 @@ where TConsensusSpec: ConsensusSpec })); } - if !tx_rec.evidence().all_inputs_prepared() { + if !tx_rec.evidence().all_shard_groups_prepared() { warn!( target: LOG_TARGET, - "❌ NO VOTE: AllPrepare disagreement for transaction {} in block {}. Leader proposed that all inputs are justified, but not all inputs are justified", + "❌ NO VOTE: AllPrepare disagreement for transaction {} in block {}. Leader proposed that all shard groups have prepared, but this is not the case", tx_rec.transaction_id(), block, ); - return Ok(Some(NoVoteReason::NotAllInputsPrepared)); + return Ok(Some(NoVoteReason::NotAllShardGroupsPrepared)); } let maybe_execution = if tx_rec.current_decision().is_commit() { @@ -1029,7 +1029,7 @@ where TConsensusSpec: ConsensusSpec // Lock all local outputs let local_outputs = execution.resulting_outputs().iter().filter(|o| { o.substate_id().is_transaction_receipt() || - local_committee_info.includes_substate_address(&o.to_substate_address()) + local_committee_info.includes_substate_id(o.substate_id()) }); let lock_status = substate_store.try_lock_all(*tx_rec.transaction_id(), local_outputs, false)?; if let Some(err) = lock_status.failures().first() { diff --git a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs index 5ce3eeae3..04776f4aa 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs @@ -16,16 +16,7 @@ use tari_dan_common_types::{ ShardGroup, }; use tari_dan_storage::{ - consensus_models::{ - Block, - HighQc, - LastSentVote, - QuorumCertificate, - QuorumDecision, - TransactionPool, - ValidBlock, - Vote, - }, + consensus_models::{Block, HighQc, LastSentVote, QuorumDecision, TransactionPool, ValidBlock, Vote}, StateStore, StateStoreWriteTransaction, }; @@ -507,11 +498,7 @@ impl OnReceiveLocalProposalHandler, - ) { + fn propose_newly_locked_blocks(&mut self, local_committee_info: CommitteeInfo, blocks: Vec) { if blocks.is_empty() { return; } @@ -881,7 +868,7 @@ async fn propose_newly_locked_blocks_task( outbound_messaging: TConsensusSpec::OutboundMessaging, num_preshards: NumPreshards, local_committee_info: CommitteeInfo, - blocks: Vec<(Block, QuorumCertificate)>, + blocks: Vec, ) { let _timer = TraceTimer::debug(LOG_TARGET, "propose_newly_locked_blocks_task").with_iterations(blocks.len()); if let Err(err) = propose_newly_locked_blocks_task_inner::( @@ -902,16 +889,15 @@ async fn propose_newly_locked_blocks_task_inner( mut outbound_messaging: TConsensusSpec::OutboundMessaging, num_preshards: NumPreshards, local_committee_info: &CommitteeInfo, - blocks: Vec<(Block, QuorumCertificate)>, + blocks: Vec, ) -> Result<(), HotStuffError> { - for (block, justify_qc) in blocks.into_iter().rev() { + for block in blocks.into_iter().rev() { broadcast_foreign_proposal_if_required::( &mut outbound_messaging, &epoch_manager, num_preshards, local_committee_info, block, - justify_qc, ) .await?; } @@ -924,7 +910,6 @@ async fn broadcast_foreign_proposal_if_required( num_preshards: NumPreshards, local_committee_info: &CommitteeInfo, block: Block, - justify_qc: QuorumCertificate, ) -> Result<(), HotStuffError> { let num_committees = epoch_manager.get_num_committees(block.epoch()).await?; @@ -939,25 +924,28 @@ async fn broadcast_foreign_proposal_if_required( .filter(|atom| !atom.evidence.is_committee_output_only(local_committee_info)) .or_else(|| c.local_accept()) }) - .flat_map(|p| p.evidence.shard_groups_iter().copied()) + .flat_map(|p| { + debug!( + target: LOG_TARGET, + "🌐 FOREIGN PROPOSE: atom {p}", + ); + p.evidence.shard_groups_iter().copied() + }) .filter(|shard_group| local_shard_group != *shard_group) .collect::>(); if non_local_shard_groups.is_empty() { debug!( target: LOG_TARGET, - "🌐 No foreign shards involved for new locked block {}", + "🌐 No foreign shards apply to new locked block {}", block, ); return Ok(()); } info!( target: LOG_TARGET, - "🌐 BROADCASTING new locked block {} to {} foreign shard groups. justify: {} ({}), parent: {}", - block, + "🌐 FOREIGN PROPOSE: new locked block to {} foreign shard groups. {}", non_local_shard_groups.len(), - justify_qc.block_id(), - justify_qc.block_height(), - block.parent() + block, ); for shard_group in non_local_shard_groups { diff --git a/dan_layer/consensus/src/hotstuff/on_receive_new_transaction.rs b/dan_layer/consensus/src/hotstuff/on_receive_new_transaction.rs index fb907147f..0954b16a8 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_new_transaction.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_new_transaction.rs @@ -153,8 +153,13 @@ where TConsensusSpec: ConsensusSpec transaction: &TransactionRecord, is_ready: bool, ) -> Result<(), HotStuffError> { - self.transaction_pool - .insert_new(tx, *transaction.id(), transaction.current_decision(), is_ready)?; + self.transaction_pool.insert_new( + tx, + *transaction.id(), + transaction.current_decision(), + is_ready, + transaction.transaction().is_global(), + )?; Ok(()) } } diff --git a/dan_layer/consensus/src/hotstuff/transaction_manager/manager.rs b/dan_layer/consensus/src/hotstuff/transaction_manager/manager.rs index d7dcf92e7..cd9ca027f 100644 --- a/dan_layer/consensus/src/hotstuff/transaction_manager/manager.rs +++ b/dan_layer/consensus/src/hotstuff/transaction_manager/manager.rs @@ -223,7 +223,7 @@ impl> ))); } - if non_local_inputs.is_empty() { + if !transaction.transaction.is_global() && non_local_inputs.is_empty() { // CASE: All inputs are local and we can execute the transaction. // Outputs may or may not be local let local_inputs = store.get_many(local_versions.iter().map(|(req, v)| (req.clone(), *v)))?; @@ -305,6 +305,7 @@ impl> warn!(target: LOG_TARGET, "⚠️ PREPARE: Hard conflict when locking inputs: {err}"); transaction.set_abort_reason(RejectReason::FailedToLockInputs(err.to_string())); } + // CASE: Multishard transaction, not executed Ok(PreparedTransaction::new_multishard( transaction.into_execution(), diff --git a/dan_layer/consensus/src/hotstuff/transaction_manager/prepared.rs b/dan_layer/consensus/src/hotstuff/transaction_manager/prepared.rs index 609d959bb..94bfb2f2f 100644 --- a/dan_layer/consensus/src/hotstuff/transaction_manager/prepared.rs +++ b/dan_layer/consensus/src/hotstuff/transaction_manager/prepared.rs @@ -132,8 +132,8 @@ impl MultiShardPreparedTransaction { outputs, ); - // Add foreign involved shard groups without adding any substates (because we do not know the pledged version - // yet) + // Add foreign involved shard groups without adding any substates (because we do not know the pledged + // version yet) self.foreign_inputs() .iter() .map(|r| { diff --git a/dan_layer/consensus_tests/fixtures/state.wasm b/dan_layer/consensus_tests/fixtures/state.wasm new file mode 100755 index 000000000..c7d0d4dc9 Binary files /dev/null and b/dan_layer/consensus_tests/fixtures/state.wasm differ diff --git a/dan_layer/consensus_tests/src/consensus.rs b/dan_layer/consensus_tests/src/consensus.rs index f5189bb14..53267ceeb 100644 --- a/dan_layer/consensus_tests/src/consensus.rs +++ b/dan_layer/consensus_tests/src/consensus.rs @@ -12,13 +12,23 @@ use std::time::Duration; use tari_common_types::types::PrivateKey; use tari_consensus::{hotstuff::HotStuffError, messages::HotstuffMessage}; -use tari_dan_common_types::{optional::Optional, Epoch, LockIntent, NodeHeight, SubstateRequirement}; +use tari_dan_common_types::{ + crypto::create_key_pair, + optional::Optional, + Epoch, + LockIntent, + NodeHeight, + SubstateRequirement, + ToSubstateAddress, + VersionedSubstateId, +}; use tari_dan_storage::{ consensus_models::{ AbortReason, BlockId, Command, Decision, + SubstateRecord, SubstateRequirementLockIntent, TransactionRecord, VersionedSubstateIdLockIntent, @@ -26,12 +36,17 @@ use tari_dan_storage::{ StateStore, StateStoreReadTransaction, }; -use tari_engine_types::commit_result::RejectReason; +use tari_engine_types::{ + commit_result::RejectReason, + published_template::PublishedTemplateAddress, + substate::SubstateId, +}; use tari_transaction::Transaction; use crate::support::{ build_transaction_from, helpers, + load_binary_fixture, logging::setup_logger, ExecuteSpec, Test, @@ -287,6 +302,7 @@ async fn node_requests_missing_transaction_from_local_leader() { async fn multi_shard_single_transaction() { setup_logger(); let mut test = Test::builder() + .debug_sql("/tmp/test{}.db") .add_committee(0, vec!["1"]) .add_committee(1, vec!["2"]) .start() @@ -1307,6 +1323,73 @@ async fn leader_failure_node_goes_down_and_gets_evicted() { test.assert_clean_shutdown_except(&[failure_node]).await; } +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn multishard_publish_template() { + setup_logger(); + let mut test = Test::builder() + .add_committee(0, vec!["1", "2"]) + .add_committee(1, vec!["3", "4"]) + .add_committee(2, vec!["5", "6"]) + .add_committee(3, vec!["7", "8"]) + .start() + .await; + // First get transaction in the mempool + let inputs = test.create_substates_on_vns(TestVnDestination::All, 1); + let (sk, pk) = create_key_pair(); + let wasm = load_binary_fixture("state.wasm"); + let tx = Transaction::builder() + .publish_template(wasm.clone()) + .with_inputs(inputs.iter().cloned().map(Into::into)) + .build_and_seal(&sk); + let tx = TransactionRecord::new(tx); + + test.send_transaction_to_destination(TestVnDestination::All, tx.clone()) + .await; + let template_id = PublishedTemplateAddress::from_author_and_code(&pk, &wasm); + test.add_execution_at_destination(TestVnDestination::All, ExecuteSpec { + transaction: tx.transaction().clone(), + decision: Decision::Commit, + fee: 1, + inputs: inputs + .into_iter() + .map(|input| VersionedSubstateIdLockIntent::write(input, true).into()) + .collect(), + new_outputs: vec![SubstateId::Template(template_id)], + }); + + test.start_epoch(Epoch(1)).await; + + loop { + test.on_block_committed().await; + + if test.is_transaction_pool_empty() { + break; + } + let leaf = test.get_validator(&TestAddress::new("1")).get_leaf_block(); + if leaf.height >= NodeHeight(30) { + panic!("Not all transaction committed after {} blocks", leaf.height); + } + } + + test.assert_all_validators_at_same_height().await; + test.assert_all_validators_committed(); + + // Assert all LocalOnly + let template_substate = test + .get_validator(&TestAddress::new("1")) + .state_store + .with_read_tx(|tx| SubstateRecord::get(tx, &VersionedSubstateId::new(template_id, 0).to_substate_address())) + .unwrap(); + let binary = template_substate + .substate_value + .into_template() + .expect("Expected template substate") + .binary; + assert_eq!(binary, wasm, "Template binary does not match"); + + test.assert_clean_shutdown().await; +} + // mod dump_data { // use super::*; // use std::fs::File; diff --git a/dan_layer/consensus_tests/src/dummy_blocks.rs b/dan_layer/consensus_tests/src/dummy_blocks.rs index 041e06b4c..39d7ff981 100644 --- a/dan_layer/consensus_tests/src/dummy_blocks.rs +++ b/dan_layer/consensus_tests/src/dummy_blocks.rs @@ -12,7 +12,7 @@ use tari_crypto::{keys::PublicKey as _, tari_utilities::ByteArray}; use tari_dan_common_types::{committee::Committee, DerivableFromPublicKey, Epoch, NodeHeight, PeerAddress, ShardGroup}; use tari_dan_storage::consensus_models::Block; -use crate::support::{load_fixture, RoundRobinLeaderStrategy}; +use crate::support::{load_json_fixture, RoundRobinLeaderStrategy}; #[test] fn dummy_blocks() { @@ -74,9 +74,9 @@ fn public_key_from_seed(seed: u8) -> PublicKey { #[test] fn last_matches_generated_using_real_data() { - let candidate = load_fixture::("block_with_dummies.json"); + let candidate = load_json_fixture::("block_with_dummies.json"); - let committee = load_fixture::("committee.json"); + let committee = load_json_fixture::("committee.json"); let committee: Vec<(PeerAddress, PublicKey)> = serde_json::from_value(committee["members"].clone()).unwrap(); let committee = Committee::new(committee); diff --git a/dan_layer/consensus_tests/src/eviction_proof.rs b/dan_layer/consensus_tests/src/eviction_proof.rs index ae604dd3e..275707839 100644 --- a/dan_layer/consensus_tests/src/eviction_proof.rs +++ b/dan_layer/consensus_tests/src/eviction_proof.rs @@ -4,11 +4,11 @@ use tari_consensus::hotstuff::eviction_proof::convert_block_to_sidechain_block_header; use tari_dan_storage::consensus_models::Block; -use crate::support::load_fixture; +use crate::support::load_json_fixture; #[test] fn it_produces_a_summarized_header_that_hashes_to_the_original() { - let block = load_fixture::("block.json"); + let block = load_json_fixture::("block.json"); let sidechain_block = convert_block_to_sidechain_block_header(block.header()); assert_eq!(sidechain_block.extra_data_hash, block.header().create_extra_data_hash()); assert_eq!( diff --git a/dan_layer/consensus_tests/src/support/fixtures.rs b/dan_layer/consensus_tests/src/support/fixtures.rs index 408a76994..b10672912 100644 --- a/dan_layer/consensus_tests/src/support/fixtures.rs +++ b/dan_layer/consensus_tests/src/support/fixtures.rs @@ -1,12 +1,24 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause +use std::io::Read; + use serde::de::DeserializeOwned; -pub fn load_fixture(name: &str) -> T { +pub fn load_json_fixture(name: &str) -> T { let path = format!("fixtures/{name}"); let file = std::fs::File::open(&path).unwrap_or_else(|_| { panic!("Could not open fixture file at path: {path}"); }); serde_json::from_reader(file).unwrap() } + +pub fn load_binary_fixture(name: &str) -> Vec { + let path = format!("fixtures/{name}"); + let mut file = std::fs::File::open(&path).unwrap_or_else(|_| { + panic!("Could not open fixture file at path: {path}"); + }); + let mut buffer = Vec::new(); + file.read_to_end(&mut buffer).unwrap(); + buffer +} diff --git a/dan_layer/consensus_tests/src/support/transaction.rs b/dan_layer/consensus_tests/src/support/transaction.rs index 6263af0b3..d22dfad68 100644 --- a/dan_layer/consensus_tests/src/support/transaction.rs +++ b/dan_layer/consensus_tests/src/support/transaction.rs @@ -10,6 +10,7 @@ use tari_engine_types::{ commit_result::{ExecuteResult, FinalizeResult, RejectReason, TransactionResult}, component::{ComponentBody, ComponentHeader}, fees::{FeeBreakdown, FeeReceipt}, + published_template::PublishedTemplate, substate::{Substate, SubstateDiff, SubstateId}, transaction_receipt::{TransactionReceipt, TransactionReceiptAddress}, }; @@ -45,31 +46,48 @@ pub fn create_execution_result_for_transaction( if output.substate_id().is_transaction_receipt() { continue; } - assert!( - output.substate_id().is_component(), - "create_execution_result_for_transaction: Test harness only supports generating component outputs. \ - Got {output}" - ); - // Generate consistent state for the component by simply using the ID - let state = tari_bor::to_value(output.versioned_substate_id()).unwrap(); - diff.up( - output.versioned_substate_id().substate_id.clone(), - Substate::new(output.versioned_substate_id().version, ComponentHeader { - template_address: Default::default(), - module_name: "Test".to_string(), - owner_key: Default::default(), - owner_rule: Default::default(), - access_rules: Default::default(), - entity_id: output - .versioned_substate_id() - .substate_id - .as_component_address() - .unwrap() - .entity_id(), - body: ComponentBody { state }, - }), - ) + match output.substate_id() { + SubstateId::Component(_) => { + // Generate consistent state for the component by simply using the ID + let state = tari_bor::to_value(output.versioned_substate_id()).unwrap(); + diff.up( + output.versioned_substate_id().substate_id.clone(), + Substate::new(output.versioned_substate_id().version, ComponentHeader { + template_address: Default::default(), + module_name: "Test".to_string(), + owner_key: Default::default(), + owner_rule: Default::default(), + access_rules: Default::default(), + entity_id: output + .versioned_substate_id() + .substate_id + .as_component_address() + .unwrap() + .entity_id(), + body: ComponentBody { state }, + }), + ); + }, + SubstateId::Template(_) => { + let binary = transaction + .instructions() + .iter() + .find_map(|i| i.published_template_binary()) + .expect("No publish template instruction found in transaction") + .to_vec(); + diff.up( + output.versioned_substate_id().substate_id.clone(), + Substate::new(output.versioned_substate_id().version, PublishedTemplate { binary }), + ); + }, + _ => { + panic!( + "create_execution_result_for_transaction: Test harness only supports generating component and \ + template outputs. Got {output}" + ); + }, + } } // We MUST create the transaction receipt diff.up( diff --git a/dan_layer/engine_types/src/instruction.rs b/dan_layer/engine_types/src/instruction.rs index e8ed1f79d..af8b3b861 100644 --- a/dan_layer/engine_types/src/instruction.rs +++ b/dan_layer/engine_types/src/instruction.rs @@ -72,6 +72,15 @@ pub enum Instruction { }, } +impl Instruction { + pub fn published_template_binary(&self) -> Option<&[u8]> { + match self { + Self::PublishTemplate { binary } => Some(binary), + _ => None, + } + } +} + impl Display for Instruction { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { diff --git a/dan_layer/engine_types/src/substate.rs b/dan_layer/engine_types/src/substate.rs index 30e96bc72..fa47aed5c 100644 --- a/dan_layer/engine_types/src/substate.rs +++ b/dan_layer/engine_types/src/substate.rs @@ -284,6 +284,10 @@ impl SubstateId { matches!(self, Self::Template(_)) } + pub fn is_global(&self) -> bool { + self.is_published_template() + } + pub fn is_read_only(&self) -> bool { matches!(self, Self::TransactionReceipt(_) | Self::Resource(_)) } diff --git a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql index b3a842441..e927531a6 100644 --- a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql +++ b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql @@ -272,28 +272,28 @@ CREATE INDEX locked_block_idx_epoch ON locked_block (epoch); create table transactions ( - id integer not null primary key AUTOINCREMENT, - network integer not NULL, - transaction_id text not null, - fee_instructions text not NULL, - instructions text not NULL, - inputs text not NULL, - filled_inputs text not NULL, - resolved_inputs text NULL, - resulting_outputs text NULL, - signatures text not NULL, - seal_signature text not NULL, - is_seal_signer_authorized boolean not NULL, - result text NULL, - execution_time_ms bigint NULL, - final_decision text NULL, - finalized_at timestamp NULL, - outcome TEXT NULL, - abort_details text NULL, - min_epoch BIGINT NULL, - max_epoch BIGINT NULL, - schema_version BIGINT NOT NULL, - created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP + id integer not null primary key AUTOINCREMENT, + network integer not NULL, + transaction_id text not null, + fee_instructions text not NULL, + instructions text not NULL, + inputs text not NULL, + filled_inputs text not NULL, + resolved_inputs text NULL, + resulting_outputs text NULL, + signatures text not NULL, + seal_signature text not NULL, + is_seal_signer_authorized boolean not NULL, + result text NULL, + execution_time_ms bigint NULL, + final_decision text NULL, + finalized_at timestamp NULL, + outcome TEXT NULL, + abort_details text NULL, + min_epoch BIGINT NULL, + max_epoch BIGINT NULL, + schema_version BIGINT NOT NULL, + created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ); create unique index transactions_uniq_idx_id on transactions (transaction_id); @@ -343,6 +343,7 @@ create table transaction_pool pending_stage text null, is_ready boolean not null, confirm_stage text null, + is_global boolean not NULL, updated_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, created_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (transaction_id) REFERENCES transactions (transaction_id) diff --git a/dan_layer/state_store_sqlite/src/schema.rs b/dan_layer/state_store_sqlite/src/schema.rs index 56c091f38..f51769f85 100644 --- a/dan_layer/state_store_sqlite/src/schema.rs +++ b/dan_layer/state_store_sqlite/src/schema.rs @@ -452,6 +452,7 @@ diesel::table! { pending_stage -> Nullable, is_ready -> Bool, confirm_stage -> Nullable, + is_global -> Bool, updated_at -> Timestamp, created_at -> Timestamp, } diff --git a/dan_layer/state_store_sqlite/src/sql_models/transaction_pool.rs b/dan_layer/state_store_sqlite/src/sql_models/transaction_pool.rs index a5aa9818f..2fb43a56e 100644 --- a/dan_layer/state_store_sqlite/src/sql_models/transaction_pool.rs +++ b/dan_layer/state_store_sqlite/src/sql_models/transaction_pool.rs @@ -31,6 +31,7 @@ pub struct TransactionPoolRecord { pub is_ready: bool, #[allow(dead_code)] pub confirm_stage: Option, + pub is_global: bool, #[allow(dead_code)] pub updated_at: PrimitiveDateTime, #[allow(dead_code)] @@ -72,6 +73,7 @@ impl TransactionPoolRecord { Ok(consensus_models::TransactionPoolRecord::load( deserialize_hex_try_from(&self.transaction_id)?, evidence, + self.is_global, transaction_fee as u64, leader_fee, parse_from_string(&self.stage)?, diff --git a/dan_layer/state_store_sqlite/src/writer.rs b/dan_layer/state_store_sqlite/src/writer.rs index b37efa5c3..3be0699b1 100644 --- a/dan_layer/state_store_sqlite/src/writer.rs +++ b/dan_layer/state_store_sqlite/src/writer.rs @@ -1047,6 +1047,7 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta tx_id: TransactionId, decision: Decision, is_ready: bool, + is_global: bool, ) -> Result<(), StorageError> { use crate::schema::transaction_pool; @@ -1055,6 +1056,7 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta transaction_pool::original_decision.eq(decision.to_string()), transaction_pool::stage.eq(TransactionPoolStage::New.to_string()), transaction_pool::is_ready.eq(is_ready), + transaction_pool::is_global.eq(is_global), ); diesel::insert_into(transaction_pool::table) diff --git a/dan_layer/state_store_sqlite/tests/tests.rs b/dan_layer/state_store_sqlite/tests/tests.rs index 91e951f3f..6b0330541 100644 --- a/dan_layer/state_store_sqlite/tests/tests.rs +++ b/dan_layer/state_store_sqlite/tests/tests.rs @@ -72,9 +72,12 @@ mod confirm_all_transitions { .unwrap(); block1.insert(&mut tx).unwrap(); - tx.transaction_pool_insert_new(atom1.id, atom1.decision, true).unwrap(); - tx.transaction_pool_insert_new(atom2.id, atom2.decision, true).unwrap(); - tx.transaction_pool_insert_new(atom3.id, atom3.decision, true).unwrap(); + tx.transaction_pool_insert_new(atom1.id, atom1.decision, true, false) + .unwrap(); + tx.transaction_pool_insert_new(atom2.id, atom2.decision, true, false) + .unwrap(); + tx.transaction_pool_insert_new(atom3.id, atom3.decision, true, false) + .unwrap(); let block_id = *block1.id(); let transactions = tx.transaction_pool_get_all().unwrap(); diff --git a/dan_layer/storage/src/consensus_models/block_diff.rs b/dan_layer/storage/src/consensus_models/block_diff.rs index 9f3ddcede..8eb5eebb0 100644 --- a/dan_layer/storage/src/consensus_models/block_diff.rs +++ b/dan_layer/storage/src/consensus_models/block_diff.rs @@ -44,9 +44,8 @@ impl BlockDiff { .into_iter() // Commit all substates included in this shard. Every involved validator commits the transaction receipt. .filter(|change| - change.versioned_substate_id().substate_id.is_published_template() || change.versioned_substate_id().substate_id.is_transaction_receipt() || - info.includes_substate_address(&change.to_substate_address()) + info.includes_substate_id(change.versioned_substate_id().substate_id()) ) .collect(), } diff --git a/dan_layer/storage/src/consensus_models/evidence.rs b/dan_layer/storage/src/consensus_models/evidence.rs index 5a39597eb..c8b5a1994 100644 --- a/dan_layer/storage/src/consensus_models/evidence.rs +++ b/dan_layer/storage/src/consensus_models/evidence.rs @@ -65,23 +65,24 @@ impl Evidence { Evidence { evidence } } - pub fn all_addresses_accepted(&self) -> bool { + pub fn all_objects_accepted(&self) -> bool { // CASE: all inputs and outputs are accept justified. If they have been accept justified, they have implicitly // been prepare justified. This may happen if the local node is only involved in outputs (and therefore // sequences using the LocalAccept foreign proposal) self.evidence.values().all(|e| e.is_accept_justified()) } - pub fn all_inputs_prepared(&self) -> bool { + pub fn all_shard_groups_prepared(&self) -> bool { self.evidence .values() + // .filter(|e| { + // // CASE: we only require input shard groups to prepare + // e.substates().values().any(|e| e.is_input()) + // }) // CASE: we use prepare OR accept because inputs can only be accept justified if they were prepared. Prepared // may be implicit (null) if the local node is only involved in outputs (and therefore sequences using the LocalAccept // foreign proposal) - .all(|e| { - e.is_prepare_justified() || e.is_accept_justified() - - }) + .all(|e| e.is_prepare_justified() || e.is_accept_justified()) } /// Returns true if all substates in the given shard group are output locks. @@ -234,13 +235,11 @@ impl ShardGroupEvidence { } pub fn is_prepare_justified(&self) -> bool { - // No substates means that we have no pledges yet, so we cannot count this as justified - !self.substates.is_empty() && self.prepare_qc.is_some() + self.prepare_qc.is_some() } pub fn is_accept_justified(&self) -> bool { - // No substates means that we have no pledges yet, so we cannot count this as justified - !self.substates.is_empty() && self.accept_qc.is_some() + self.accept_qc.is_some() } pub fn substates(&self) -> &IndexMap { diff --git a/dan_layer/storage/src/consensus_models/no_vote.rs b/dan_layer/storage/src/consensus_models/no_vote.rs index adc81712b..c8909c0da 100644 --- a/dan_layer/storage/src/consensus_models/no_vote.rs +++ b/dan_layer/storage/src/consensus_models/no_vote.rs @@ -28,8 +28,8 @@ pub enum NoVoteReason { LocalOnlyProposedForMultiShard, #[error("Multi shard proposed for local only")] MultiShardProposedForLocalOnly, - #[error("Not all inputs prepared")] - NotAllInputsPrepared, + #[error("Not all shard groups are prepared")] + NotAllShardGroupsPrepared, #[error("Foreign proposal command in block missing")] ForeignProposalCommandInBlockMissing, #[error("Foreign proposal already proposed")] @@ -74,7 +74,7 @@ impl NoVoteReason { Self::NoLeaderFee => "NoLeaderFee", Self::LocalOnlyProposedForMultiShard => "LocalOnlyProposedForMultiShard", Self::MultiShardProposedForLocalOnly => "MultiShardProposedForLocalOnly", - Self::NotAllInputsPrepared => "NotAllInputsPrepared", + Self::NotAllShardGroupsPrepared => "NotAllShardGroupsPrepared", Self::ForeignProposalCommandInBlockMissing => "ForeignProposalCommandInBlockMissing", Self::ForeignProposalAlreadyProposed => "ForeignProposalAlreadyProposed", Self::ForeignProposalNotReceived => "ForeignProposalNotReceived", diff --git a/dan_layer/storage/src/consensus_models/transaction_pool.rs b/dan_layer/storage/src/consensus_models/transaction_pool.rs index cc6630d21..54781f408 100644 --- a/dan_layer/storage/src/consensus_models/transaction_pool.rs +++ b/dan_layer/storage/src/consensus_models/transaction_pool.rs @@ -86,8 +86,9 @@ impl TransactionPool { tx_id: TransactionId, decision: Decision, is_ready: bool, + is_global: bool, ) -> Result<(), TransactionPoolError> { - tx.transaction_pool_insert_new(tx_id, decision, is_ready)?; + tx.transaction_pool_insert_new(tx_id, decision, is_ready, is_global)?; Ok(()) } @@ -98,7 +99,12 @@ impl TransactionPool { ) -> Result<(), TransactionPoolError> { // TODO(perf) for (transaction, is_ready) in transactions { - tx.transaction_pool_insert_new(*transaction.id(), transaction.current_decision(), is_ready)?; + tx.transaction_pool_insert_new( + *transaction.id(), + transaction.current_decision(), + is_ready, + transaction.transaction().is_global(), + )?; } Ok(()) } @@ -332,6 +338,7 @@ pub struct TransactionPoolRecord { #[cfg_attr(feature = "ts", ts(type = "string"))] transaction_id: TransactionId, evidence: Evidence, + is_global: bool, #[cfg_attr(feature = "ts", ts(type = "number"))] transaction_fee: u64, leader_fee: Option, @@ -347,6 +354,7 @@ impl TransactionPoolRecord { pub fn load( id: TransactionId, evidence: Evidence, + is_global: bool, transaction_fee: u64, leader_fee: Option, stage: TransactionPoolStage, @@ -359,6 +367,7 @@ impl TransactionPoolRecord { Self { transaction_id: id, evidence, + is_global, transaction_fee, leader_fee, stage, @@ -381,12 +390,12 @@ impl TransactionPoolRecord { match next_stage { TransactionPoolStage::New => self.is_ready, TransactionPoolStage::Prepared => true, - TransactionPoolStage::LocalPrepared => self.evidence.all_inputs_prepared(), + TransactionPoolStage::LocalPrepared => self.evidence.all_shard_groups_prepared(), TransactionPoolStage::AllPrepared | TransactionPoolStage::SomePrepared => true, TransactionPoolStage::LocalAccepted => match self.current_decision() { - Decision::Commit => self.evidence.all_addresses_accepted(), + Decision::Commit => self.evidence.all_objects_accepted(), // If we have decided to abort, we can continue if all input addresses are justified - Decision::Abort(_) => self.evidence.all_inputs_prepared(), + Decision::Abort(_) => self.evidence.all_shard_groups_prepared(), }, TransactionPoolStage::AllAccepted | TransactionPoolStage::SomeAccepted | @@ -487,6 +496,10 @@ impl TransactionPoolRecord { } } + pub fn is_global(&self) -> bool { + self.is_global + } + pub fn calculate_leader_fee(&self, num_involved_shards: NonZeroU64, exhaust_divisor: u64) -> LeaderFee { let target_burn = self.transaction_fee.checked_div(exhaust_divisor).unwrap_or(0); let block_fee_after_burn = self.transaction_fee - target_burn; @@ -551,11 +564,21 @@ impl TransactionPoolRecord { let involved_locks = execution.resolved_inputs().iter().chain(execution.resulting_outputs()); for lock in involved_locks { - let addr = lock.to_substate_address(); - let shard_group = addr.to_shard_group(num_preshards, num_committees); - self.evidence_mut() - .add_shard_group(shard_group) - .insert(addr, lock.lock_type()); + if lock.versioned_substate_id().substate_id().is_global() { + // If global, all shard groups have this evidence + let addr = lock.to_substate_address(); + for shard_group in num_preshards.all_shard_groups_iter(num_committees) { + self.evidence_mut() + .add_shard_group(shard_group) + .insert(addr, lock.lock_type()); + } + } else { + let addr = lock.to_substate_address(); + let shard_group = addr.to_shard_group(num_preshards, num_committees); + self.evidence_mut() + .add_shard_group(shard_group) + .insert(addr, lock.lock_type()); + } } // Only change the local decision if we haven't already decided to ABORT if self.local_decision().map_or(true, |d| d.is_commit()) { @@ -773,6 +796,7 @@ mod tests { transaction_fee: fee, leader_fee: None, stage: TransactionPoolStage::New, + is_global: false, pending_stage: None, local_decision: None, remote_decision: None, diff --git a/dan_layer/storage/src/global/template_db.rs b/dan_layer/storage/src/global/template_db.rs index 0295fe751..01a886c1c 100644 --- a/dan_layer/storage/src/global/template_db.rs +++ b/dan_layer/storage/src/global/template_db.rs @@ -24,6 +24,7 @@ use std::str::FromStr; use chrono::NaiveDateTime; use tari_common_types::types::FixedHash; +use tari_dan_common_types::Epoch; use tari_engine_types::TemplateAddress; use crate::global::GlobalDbAdapter; @@ -69,6 +70,7 @@ pub struct DbTemplate { pub template_address: TemplateAddress, pub template_name: String, pub expected_hash: FixedHash, + pub epoch: Epoch, pub template_type: DbTemplateType, pub compiled_code: Option>, pub flow_json: Option, diff --git a/dan_layer/storage/src/state_store/mod.rs b/dan_layer/storage/src/state_store/mod.rs index 6e191b136..64af7c6e6 100644 --- a/dan_layer/storage/src/state_store/mod.rs +++ b/dan_layer/storage/src/state_store/mod.rs @@ -463,6 +463,7 @@ pub trait StateStoreWriteTransaction { tx_id: TransactionId, decision: Decision, is_ready: bool, + is_global: bool, ) -> Result<(), StorageError>; fn transaction_pool_add_pending_update( &mut self, diff --git a/dan_layer/storage_sqlite/migrations/2022-06-20-091532_initial/up.sql b/dan_layer/storage_sqlite/migrations/2022-06-20-091532_initial/up.sql index 4c3cb271f..764b911ce 100644 --- a/dan_layer/storage_sqlite/migrations/2022-06-20-091532_initial/up.sql +++ b/dan_layer/storage_sqlite/migrations/2022-06-20-091532_initial/up.sql @@ -52,29 +52,30 @@ CREATE INDEX committees_validator_node_id_epoch_index ON committees (validator_n create table templates ( - id Integer primary key autoincrement not null, + id Integer primary key autoincrement not null, -- template name - template_name text not null, - expected_hash blob not null, + template_name text not null, + expected_hash blob not null, -- the address is the hash of the content - template_address blob not null, + template_address blob not null, -- where to find the template code - url text not null, - -- the block height in which the template was published - height bigint not null, + url text null, + -- the epoch in which the template was published + epoch bigint not null, -- The type of template, used to create an enum in code - template_type text not null, + template_type text not null, + author_public_key blob not null, -- compiled template code as a WASM binary - compiled_code blob null, + compiled_code blob null, -- flow json - flow_json text null, - status VARCHAR(20) NOT NULL DEFAULT 'New', - wasm_path VARCHAR(255) NULL, - manifest text null, - added_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP + flow_json text null, + status VARCHAR(20) NOT NULL DEFAULT 'New', + manifest text null, + added_at timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ); + -- fetching by the template_address will be a very common operation create unique index templates_template_address_index on templates (template_address); diff --git a/dan_layer/storage_sqlite/migrations/2024-12-06-134951_templates_on_chain_updates/down.sql b/dan_layer/storage_sqlite/migrations/2024-12-06-134951_templates_on_chain_updates/down.sql deleted file mode 100644 index e2cd4e22d..000000000 --- a/dan_layer/storage_sqlite/migrations/2024-12-06-134951_templates_on_chain_updates/down.sql +++ /dev/null @@ -1,8 +0,0 @@ -ALTER TABLE templates DROP COLUMN author_public_key; -ALTER TABLE templates DROP COLUMN url; - -ALTER TABLE templates ADD COLUMN wasm_path VARCHAR(255) NULL; -ALTER TABLE templates ADD COLUMN url TEXT NOT NULL; -ALTER TABLE templates ADD COLUMN height bigint NOT NULL; - - diff --git a/dan_layer/storage_sqlite/migrations/2024-12-06-134951_templates_on_chain_updates/up.sql b/dan_layer/storage_sqlite/migrations/2024-12-06-134951_templates_on_chain_updates/up.sql deleted file mode 100644 index 5b5067905..000000000 --- a/dan_layer/storage_sqlite/migrations/2024-12-06-134951_templates_on_chain_updates/up.sql +++ /dev/null @@ -1,6 +0,0 @@ -ALTER TABLE templates DROP COLUMN wasm_path; -ALTER TABLE templates DROP COLUMN url; -ALTER TABLE templates DROP COLUMN height; - -ALTER TABLE templates ADD COLUMN author_public_key BLOB NOT NULL; -ALTER TABLE templates ADD COLUMN url TEXT NULL; diff --git a/dan_layer/storage_sqlite/src/global/backend_adapter.rs b/dan_layer/storage_sqlite/src/global/backend_adapter.rs index e301954ac..353ca7713 100644 --- a/dan_layer/storage_sqlite/src/global/backend_adapter.rs +++ b/dan_layer/storage_sqlite/src/global/backend_adapter.rs @@ -221,6 +221,7 @@ impl GlobalDbAdapter for SqliteGlobalDbAdapter { expected_hash: t.expected_hash.try_into()?, template_address: t.template_address.try_into()?, template_type: t.template_type.parse().expect("DB template type corrupted"), + epoch: Epoch(t.epoch as u64), compiled_code: t.compiled_code, flow_json: t.flow_json, manifest: t.manifest, @@ -264,6 +265,7 @@ impl GlobalDbAdapter for SqliteGlobalDbAdapter { url: t.url, status: t.status.parse().expect("DB status corrupted"), added_at: t.added_at, + epoch: Epoch(t.epoch as u64), }) }) .collect() @@ -299,6 +301,7 @@ impl GlobalDbAdapter for SqliteGlobalDbAdapter { url: t.url, status: t.status.parse().expect("DB status corrupted"), added_at: t.added_at, + epoch: Epoch(t.epoch as u64), }) }) .collect() @@ -312,6 +315,7 @@ impl GlobalDbAdapter for SqliteGlobalDbAdapter { template_address: item.template_address.to_vec(), template_type: item.template_type.as_str().to_string(), compiled_code: item.compiled_code, + epoch: item.epoch.as_u64() as i64, flow_json: item.flow_json, status: item.status.as_str().to_string(), manifest: item.manifest, diff --git a/dan_layer/storage_sqlite/src/global/models/template.rs b/dan_layer/storage_sqlite/src/global/models/template.rs index d60b174dc..28b19e0a7 100644 --- a/dan_layer/storage_sqlite/src/global/models/template.rs +++ b/dan_layer/storage_sqlite/src/global/models/template.rs @@ -28,16 +28,17 @@ use crate::global::schema::*; #[diesel(table_name = templates)] pub struct TemplateModel { pub id: i32, - pub author_public_key: Vec, - pub template_address: Vec, pub template_name: String, pub expected_hash: Vec, + pub template_address: Vec, + pub url: Option, + pub epoch: i64, pub template_type: String, + pub author_public_key: Vec, pub compiled_code: Option>, pub flow_json: Option, - pub manifest: Option, - pub url: Option, pub status: String, + pub manifest: Option, pub added_at: NaiveDateTime, } @@ -50,6 +51,7 @@ pub struct NewTemplateModel { pub expected_hash: Vec, pub template_type: String, pub compiled_code: Option>, + pub epoch: i64, pub flow_json: Option, pub status: String, pub manifest: Option, diff --git a/dan_layer/storage_sqlite/src/global/schema.rs b/dan_layer/storage_sqlite/src/global/schema.rs index 182f34916..4adcec647 100644 --- a/dan_layer/storage_sqlite/src/global/schema.rs +++ b/dan_layer/storage_sqlite/src/global/schema.rs @@ -52,16 +52,17 @@ diesel::table! { diesel::table! { templates (id) { id -> Integer, - author_public_key -> Binary, - template_address -> Binary, template_name -> Text, expected_hash -> Binary, + template_address -> Binary, + url -> Nullable, + epoch -> BigInt, template_type -> Text, + author_public_key -> Binary, compiled_code -> Nullable, flow_json -> Nullable, - manifest -> Nullable, - url -> Nullable, status -> Text, + manifest -> Nullable, added_at -> Timestamp, } } diff --git a/dan_layer/template_test_tooling/src/package_builder.rs b/dan_layer/template_test_tooling/src/package_builder.rs index 4c171df12..8f99bb6c2 100644 --- a/dan_layer/template_test_tooling/src/package_builder.rs +++ b/dan_layer/template_test_tooling/src/package_builder.rs @@ -7,7 +7,6 @@ use std::{ sync::{Arc, Mutex}, }; -use tari_common_types::types::PublicKey; use tari_dan_common_types::services::template_provider::TemplateProvider; use tari_dan_engine::{ abi::TemplateDef, @@ -110,17 +109,4 @@ impl TemplateProvider for Package { ) -> Result, Self::Error> { Ok(self.templates.lock().unwrap().get(id).cloned()) } - - fn add_wasm_template( - &self, - _author_public_key: PublicKey, - template_address: tari_engine_types::TemplateAddress, - template: &[u8], - ) -> Result<(), Self::Error> { - self.templates - .lock() - .unwrap() - .insert(template_address, WasmModule::load_template_from_code(template)?); - Ok(()) - } }