diff --git a/Cargo.lock b/Cargo.lock index b351b6039..ad1c8e85a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9382,6 +9382,7 @@ dependencies = [ "blake2", "digest", "hex", + "indexmap 2.2.6", "lazy_static", "rand", "serde", diff --git a/applications/tari_dan_app_utilities/src/transaction_executor.rs b/applications/tari_dan_app_utilities/src/transaction_executor.rs index 3396ac21b..c06b11faf 100644 --- a/applications/tari_dan_app_utilities/src/transaction_executor.rs +++ b/applications/tari_dan_app_utilities/src/transaction_executor.rs @@ -54,6 +54,7 @@ impl ExecutionOutput { inputs .iter() .map(|(substate_req, substate)| { + let requested_specific_version = substate_req.version().is_some(); let lock_flag = if diff.down_iter().any(|(id, _)| id == substate_req.substate_id()) { // Update all inputs that were DOWNed to be write locked SubstateLockType::Write @@ -64,6 +65,7 @@ impl ExecutionOutput { VersionedSubstateIdLockIntent::new( VersionedSubstateId::new(substate_req.substate_id().clone(), substate.version()), lock_flag, + requested_specific_version, ) }) .collect() @@ -76,6 +78,7 @@ impl ExecutionOutput { VersionedSubstateIdLockIntent::new( VersionedSubstateId::new(substate_req.substate_id().clone(), substate.version()), SubstateLockType::Read, + true, ) }) .collect() diff --git a/applications/tari_swarm_daemon/src/logger.rs b/applications/tari_swarm_daemon/src/logger.rs index f18bb0ac1..47132c48c 100644 --- a/applications/tari_swarm_daemon/src/logger.rs +++ b/applications/tari_swarm_daemon/src/logger.rs @@ -6,7 +6,7 @@ use fern::FormatCallback; pub fn init_logger() -> Result<(), log::SetLoggerError> { fn should_skip(target: &str) -> bool { const SKIP: [&str; 3] = ["hyper::", "h2::", "tower::"]; - SKIP.iter().any(|s| target.starts_with(s)) + target.is_empty() || SKIP.iter().any(|s| target.starts_with(s)) } let colors = fern::colors::ColoredLevelConfig::new().info(fern::colors::Color::Green); diff --git a/applications/tari_validator_node/src/p2p/services/mempool/service.rs b/applications/tari_validator_node/src/p2p/services/mempool/service.rs index 314e4f909..e0799eb74 100644 --- a/applications/tari_validator_node/src/p2p/services/mempool/service.rs +++ b/applications/tari_validator_node/src/p2p/services/mempool/service.rs @@ -56,7 +56,6 @@ const LOG_TARGET: &str = "tari::validator_node::mempool::service"; #[derive(Debug)] pub struct MempoolService { - num_preshards: NumPreshards, transactions: HashSet, mempool_requests: mpsc::Receiver, epoch_manager: EpochManagerHandle, @@ -82,7 +81,6 @@ where TValidator: Validator Self { Self { - num_preshards, gossip: MempoolGossip::new(num_preshards, epoch_manager.clone(), gossip), transactions: Default::default(), mempool_requests, @@ -158,9 +156,7 @@ where TValidator: Validator; } diff --git a/bindings/dist/types/FeeCostBreakdown.d.ts b/bindings/dist/types/FeeCostBreakdown.d.ts index 6a48d0f63..81a71884b 100644 --- a/bindings/dist/types/FeeCostBreakdown.d.ts +++ b/bindings/dist/types/FeeCostBreakdown.d.ts @@ -2,5 +2,5 @@ import type { Amount } from "./Amount"; import type { FeeBreakdown } from "./FeeBreakdown"; export interface FeeCostBreakdown { total_fees_charged: Amount; - breakdown: Array; + breakdown: FeeBreakdown; } diff --git a/bindings/dist/types/FeeReceipt.d.ts b/bindings/dist/types/FeeReceipt.d.ts index 4af221f96..0ed2edea8 100644 --- a/bindings/dist/types/FeeReceipt.d.ts +++ b/bindings/dist/types/FeeReceipt.d.ts @@ -3,5 +3,5 @@ import type { FeeBreakdown } from "./FeeBreakdown"; export interface FeeReceipt { total_fee_payment: Amount; 
    total_fees_paid: Amount;
-    cost_breakdown: Array<FeeBreakdown>;
+    cost_breakdown: FeeBreakdown;
 }
diff --git a/bindings/dist/types/VersionedSubstateIdLockIntent.d.ts b/bindings/dist/types/VersionedSubstateIdLockIntent.d.ts
index 808f69223..bbd14311d 100644
--- a/bindings/dist/types/VersionedSubstateIdLockIntent.d.ts
+++ b/bindings/dist/types/VersionedSubstateIdLockIntent.d.ts
@@ -3,4 +3,5 @@ import type { VersionedSubstateId } from "./VersionedSubstateId";
 export interface VersionedSubstateIdLockIntent {
     versioned_substate_id: VersionedSubstateId;
     lock_type: SubstateLockType;
+    require_version: boolean;
 }
diff --git a/bindings/src/types/FeeBreakdown.ts b/bindings/src/types/FeeBreakdown.ts
index f4b3941e6..24327e287 100644
--- a/bindings/src/types/FeeBreakdown.ts
+++ b/bindings/src/types/FeeBreakdown.ts
@@ -2,6 +2,5 @@
 import type { FeeSource } from "./FeeSource";
 
 export interface FeeBreakdown {
-  source: FeeSource;
-  amount: number;
+  breakdown: Record<FeeSource, number>;
 }
diff --git a/bindings/src/types/FeeCostBreakdown.ts b/bindings/src/types/FeeCostBreakdown.ts
index 863caf7ed..61ecdddd4 100644
--- a/bindings/src/types/FeeCostBreakdown.ts
+++ b/bindings/src/types/FeeCostBreakdown.ts
@@ -4,5 +4,5 @@ import type { FeeBreakdown } from "./FeeBreakdown";
 export interface FeeCostBreakdown {
   total_fees_charged: Amount;
-  breakdown: Array<FeeBreakdown>;
+  breakdown: FeeBreakdown;
 }
diff --git a/bindings/src/types/FeeReceipt.ts b/bindings/src/types/FeeReceipt.ts
index a4ac5366d..61690500c 100644
--- a/bindings/src/types/FeeReceipt.ts
+++ b/bindings/src/types/FeeReceipt.ts
@@ -5,5 +5,5 @@ import type { FeeBreakdown } from "./FeeBreakdown";
 export interface FeeReceipt {
   total_fee_payment: Amount;
   total_fees_paid: Amount;
-  cost_breakdown: Array<FeeBreakdown>;
+  cost_breakdown: FeeBreakdown;
 }
diff --git a/bindings/src/types/VersionedSubstateIdLockIntent.ts b/bindings/src/types/VersionedSubstateIdLockIntent.ts
index d6029330c..6eab119e8 100644
--- a/bindings/src/types/VersionedSubstateIdLockIntent.ts
+++ b/bindings/src/types/VersionedSubstateIdLockIntent.ts
@@ -5,4 +5,5 @@ import type { VersionedSubstateId } from "./VersionedSubstateId";
 export interface VersionedSubstateIdLockIntent {
   versioned_substate_id: VersionedSubstateId;
   lock_type: SubstateLockType;
+  require_version: boolean;
 }
diff --git a/dan_layer/consensus/src/hotstuff/common.rs b/dan_layer/consensus/src/hotstuff/common.rs
index 274318102..89b76ca24 100644
--- a/dan_layer/consensus/src/hotstuff/common.rs
+++ b/dan_layer/consensus/src/hotstuff/common.rs
@@ -96,7 +96,7 @@ pub fn calculate_dummy_blocks(
     let mut parent_block = high_qc.as_leaf_block();
     let mut current_height = high_qc.block_height() + NodeHeight(1);
     if current_height > new_height {
-        warn!(
+        error!(
            target: LOG_TARGET,
            "BUG: 🍼 no dummy blocks to calculate.
current height {} is greater than new height {}", current_height, @@ -152,7 +152,8 @@ fn with_dummy_blocks( debug!( target: LOG_TARGET, - "🍼 calculating dummy blocks from {} to {}", + "🍼 calculating dummy blocks in epoch {} from {} to {}", + epoch, current_height, new_height, ); diff --git a/dan_layer/consensus/src/hotstuff/foreign_proposal_processor.rs b/dan_layer/consensus/src/hotstuff/foreign_proposal_processor.rs index e73176337..b59d99422 100644 --- a/dan_layer/consensus/src/hotstuff/foreign_proposal_processor.rs +++ b/dan_layer/consensus/src/hotstuff/foreign_proposal_processor.rs @@ -14,9 +14,11 @@ use tari_dan_storage::{ TransactionAtom, TransactionPoolRecord, TransactionPoolStage, + TransactionRecord, }, StateStoreReadTransaction, }; +use tari_engine_types::commit_result::RejectReason; use tari_transaction::TransactionId; use crate::hotstuff::{block_change_set::ProposedBlockChangeSet, error::HotStuffError, ProposalValidationError}; @@ -112,6 +114,15 @@ pub fn process_foreign_block( "⚠️ Foreign committee ABORT transaction {}. Update overall decision to ABORT. Local stage: {}, Leaf: {}", tx_rec.transaction_id(), tx_rec.current_stage(), local_leaf ); + + // Add an abort execution since we previously decided to commit + let mut transaction = TransactionRecord::get(tx, tx_rec.transaction_id())?; + transaction.set_abort_reason(RejectReason::ForeignShardGroupDecidedToAbort(format!( + "Foreign shard group {} decided to abort the transaction", + foreign_committee_info.shard_group() + ))); + let exec = transaction.into_execution().expect("ABORT set above"); + proposed_block_change_set.add_transaction_execution(exec)?; } // We need to add the justify QC to the evidence because the all prepare block could not include it diff --git a/dan_layer/consensus/src/hotstuff/on_propose.rs b/dan_layer/consensus/src/hotstuff/on_propose.rs index 81b581160..025b928f8 100644 --- a/dan_layer/consensus/src/hotstuff/on_propose.rs +++ b/dan_layer/consensus/src/hotstuff/on_propose.rs @@ -49,6 +49,7 @@ use tari_dan_storage::{ use tari_engine_types::{commit_result::RejectReason, substate::Substate}; use tari_epoch_manager::EpochManagerReader; use tari_transaction::TransactionId; +use tokio::task; use crate::{ hotstuff::{ @@ -78,6 +79,7 @@ type NextBlock = ( HashMap, ); +#[derive(Debug, Clone)] pub struct OnPropose { config: HotstuffConfig, store: TConsensusSpec::StateStore, @@ -119,7 +121,7 @@ where TConsensusSpec: ConsensusSpec &mut self, epoch: Epoch, local_committee: &Committee, - local_committee_info: &CommitteeInfo, + local_committee_info: CommitteeInfo, leaf_block: LeafBlock, is_newview_propose: bool, propose_epoch_end: bool, @@ -168,41 +170,45 @@ where TConsensusSpec: ConsensusSpec let base_layer_block_hash = current_base_layer_block_hash; let base_layer_block_height = current_base_layer_block_height; - let (next_block, foreign_proposals) = self.store.with_write_tx(|tx| { - let high_qc = HighQc::get(&**tx, epoch)?; - let high_qc_cert = high_qc.get_quorum_certificate(&**tx)?; + let on_propose = self.clone(); + let (next_block, foreign_proposals) = task::spawn_blocking(move || { + on_propose.store.with_write_tx(|tx| { + let high_qc = HighQc::get(&**tx, epoch)?; + let high_qc_cert = high_qc.get_quorum_certificate(&**tx)?; - let (next_block, foreign_proposals, executed_transactions) = self.build_next_block( - tx, - epoch, - &leaf_block, - high_qc_cert, - validator.public_key, - local_committee_info, - // TODO: This just avoids issues with proposed transactions causing leader failures. 
Not sure if this - // is a good idea. - is_newview_propose, - base_layer_block_height, - base_layer_block_hash, - propose_epoch_end, - )?; + let (next_block, foreign_proposals, executed_transactions) = on_propose.build_next_block( + tx, + epoch, + &leaf_block, + high_qc_cert, + validator.public_key, + &local_committee_info, + // TODO: This just avoids issues with proposed transactions causing leader failures. Not sure if + // this is a good idea. + is_newview_propose, + base_layer_block_height, + base_layer_block_hash, + propose_epoch_end, + )?; - // Add executions for this block - if !executed_transactions.is_empty() { - debug!( - target: LOG_TARGET, - "Saving {} executed transaction(s) for block {}", - executed_transactions.len(), - next_block.id() - ); - } - for executed in executed_transactions.into_values() { - executed.for_block(*next_block.id()).insert_if_required(tx)?; - } + // Add executions for this block + if !executed_transactions.is_empty() { + debug!( + target: LOG_TARGET, + "Saving {} executed transaction(s) for block {}", + executed_transactions.len(), + next_block.id() + ); + } + for executed in executed_transactions.into_values() { + executed.for_block(*next_block.id()).insert_if_required(tx)?; + } - next_block.as_last_proposed().set(tx)?; - Ok::<_, HotStuffError>((next_block, foreign_proposals)) - })?; + next_block.as_last_proposed().set(tx)?; + Ok::<_, HotStuffError>((next_block, foreign_proposals)) + }) + }) + .await??; info!( target: LOG_TARGET, diff --git a/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs b/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs index 406ecdeaf..280b22e0b 100644 --- a/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs +++ b/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs @@ -894,6 +894,8 @@ where TConsensusSpec: ConsensusSpec let execution = self.execute_transaction(tx, block.id(), block.epoch(), tx_rec.transaction_id())?; let mut execution = execution.into_transaction_execution(); + // TODO: check the diff is valid against the provided input evidence (correct locks etc). + // TODO: can we modify the locks at this point? For multi-shard input transactions, we locked all inputs // as Write due to lack of information. We now know what locks are necessary, and this // block has the correct evidence (TODO: verify the atom) so this should be fine. diff --git a/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs index f74ef6ef9..84e58b5d9 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs @@ -121,402 +121,6 @@ where TConsensusSpec: ConsensusSpec Ok(()) } - // #[allow(clippy::too_many_lines)] - // fn process_foreign_block( - // &self, - // tx: &mut ::WriteTransaction<'_>, - // foreign_proposal: ForeignProposal, - // foreign_committee_info: &CommitteeInfo, - // local_committee_info: &CommitteeInfo, - // ) -> Result<(), HotStuffError> { - // let ForeignProposal { - // block, - // justify_qc, - // mut block_pledge, - // .. 
- // } = foreign_proposal; - // let local_leaf = LeafBlock::get(&**tx)?; - // // We only want to save the QC once if applicable - // let mut command_count = 0usize; - // - // for cmd in block.commands() { - // match cmd { - // Command::LocalPrepare(atom) => { - // if !local_committee_info.includes_any_address(atom.evidence.substate_addresses_iter()) { - // continue; - // } - // - // debug!( - // target: LOG_TARGET, - // "🧩 FOREIGN PROPOSAL: Command: LocalPrepare({}, {}), block: {}", - // atom.id,atom.decision, block.id(), - // ); - // - // let Some(mut tx_rec) = self.transaction_pool.get(tx, local_leaf, &atom.id).optional()? else { - // // If this happens, it could be a bug in the foreign missing transaction handling - // warn!( - // target: LOG_TARGET, - // "⚠️ NEVER HAPPEN: Foreign proposal received for transaction {} but this transaction is - // not in the pool.", atom.id - // ); - // continue; - // }; - // - // if tx_rec.current_stage() > TransactionPoolStage::LocalPrepared { - // // TODO: This can happen if the foreign shard group is only responsible for outputs (the - // input // SGs have already progressed to LocalAccept) in which case it is safe to ignore - // this command. // However we should not send the proposal in the first place (assuming it - // does not involve any // other shard-applicable transactions). - // warn!( - // target: LOG_TARGET, - // "⚠️ Foreign LocalPrepare proposal ({}) received LOCAL_PREPARE for transaction {} but - // current transaction stage is {}. Ignoring.", block, - // tx_rec.transaction_id(), tx_rec.current_stage() - // ); - // continue; - // } - // - // command_count += 1; - // - // let remote_decision = atom.decision; - // let local_decision = tx_rec.current_decision(); - // if remote_decision.is_abort() && local_decision.is_commit() { - // info!( - // target: LOG_TARGET, - // "⚠️ Foreign committee ABORT transaction {}. Update overall decision to ABORT. Local - // stage: {}, Leaf: {}", tx_rec.transaction_id(), tx_rec.current_stage(), local_leaf - // ); - // } - // - // // We need to add the justify QC to the evidence because the all prepare block could not include - // it // yet - // let mut foreign_evidence = atom.evidence.clone(); - // foreign_evidence.add_qc_evidence(foreign_committee_info, *justify_qc.id()); - // - // // Update the transaction record with any new information provided by this foreign block - // tx_rec.update_remote_data( - // tx, - // remote_decision, - // *justify_qc.id(), - // foreign_committee_info, - // foreign_evidence, - // )?; - // - // self.validate_and_add_pledges( - // tx, - // &tx_rec, - // block.id(), - // atom, - // &mut block_pledge, - // foreign_committee_info, - // )?; - // - // if tx_rec.current_stage().is_new() { - // info!( - // target: LOG_TARGET, - // "🧩 FOREIGN PROPOSAL: (Initial sequence from LocalPrepare) Transaction is ready for - // Prepare({}, {}) Local Stage: {}", tx_rec.transaction_id(), - // tx_rec.current_decision(), - // tx_rec.current_stage() - // ); - // // If the transaction is New, we're waiting for all foreign pledges. Propose transaction once - // we // have them. 
- // - // // CASE: One foreign SG is involved in all inputs and executed the transaction, local SG is - // // involved in the outputs - // let transaction = tx_rec.get_transaction(&**tx)?; - // let is_ready = local_committee_info.includes_substate_id(&transaction.to_receipt_id().into()) - // || transaction.has_any_local_inputs(local_committee_info) || - // transaction.has_all_foreign_input_pledges(&**tx, local_committee_info)?; - // - // if is_ready { - // info!( - // target: LOG_TARGET, - // "🧩 FOREIGN PROPOSAL: (Initial sequence from LocalPrepare) Transaction is ready for - // Prepare({}, {}) Local Stage: {}", tx_rec.transaction_id(), - // tx_rec.current_decision(), - // tx_rec.current_stage() - // ); - // tx_rec.add_pending_status_update(tx, local_leaf, TransactionPoolStage::New, true)?; - // } else { - // info!( - // target: LOG_TARGET, - // "🧩 FOREIGN PROPOSAL: (Initial sequence from LocalPrepare) Transaction is NOT ready - // for Prepare({}, {}) Local Stage: {}", tx_rec.transaction_id(), - // tx_rec.current_decision(), - // tx_rec.current_stage() - // ); - // } - // } else if tx_rec.current_stage().is_local_prepared() && - // tx_rec.evidence().all_input_addresses_justified() - // { - // // If all shards are complete, and we've already received our LocalPrepared, we can set out - // // LocalPrepared transaction as ready to propose ACCEPT. If we have not received - // // the local LocalPrepared, the transition will happen when we receive the local - // // block. - // info!( - // target: LOG_TARGET, - // "🧩 FOREIGN PROPOSAL: Transaction is ready for propose AllPrepared({}, {}) Local Stage: - // {}", tx_rec.transaction_id(), - // tx_rec.current_decision(), - // tx_rec.current_stage() - // ); - // - // tx_rec.add_pending_status_update(tx, local_leaf, TransactionPoolStage::LocalPrepared, true)?; - // // TODO: there is a race condition between the local node receiving the foreign LocalPrepare - // and // the leader proposing AllPrepare. If the latter comes first, this node - // // will not vote on this block which leads inevitably to erroneous - // // leader failures. Currently we simply vote ACCEPT on the block, with is ready == false, so - // we // need to handle this here. When we confirm foreign proposals correctly, we can - // remove this. } else { - // info!( - // target: LOG_TARGET, - // "🧩 FOREIGN PROPOSAL: Transaction is NOT ready for AllPrepared({}, {}) Local Stage: {}, - // All Justified: {}. Waiting for local proposal.", tx_rec.transaction_id(), - // tx_rec.current_decision(), - // tx_rec.current_stage(), - // tx_rec.evidence().all_input_addresses_justified() - // ); - // tx_rec.add_pending_status_update(tx, local_leaf, tx_rec.current_stage(), tx_rec.is_ready())?; - // } - // }, - // Command::LocalAccept(atom) => { - // if !local_committee_info.includes_any_address(atom.evidence.substate_addresses_iter()) { - // continue; - // } - // - // debug!( - // target: LOG_TARGET, - // "🧩 FOREIGN PROPOSAL: Command: LocalAccept({}, {}), block: {}", - // atom.id, atom.decision, block.id(), - // ); - // - // let Some(mut tx_rec) = self.transaction_pool.get(tx, local_leaf, &atom.id).optional()? 
else { - // warn!( - // target: LOG_TARGET, - // "⚠️ NEVER HAPPEN: Foreign proposal received for transaction {} but this transaction is - // not in the pool.", atom.id - // ); - // continue; - // }; - // - // if tx_rec.current_stage() > TransactionPoolStage::LocalAccepted { - // warn!( - // target: LOG_TARGET, - // "⚠️ Foreign proposal {} received LOCAL_ACCEPT for transaction {} but current transaction - // stage is {}. Ignoring.", block, - // tx_rec.transaction_id(), - // tx_rec.current_stage(), - // ); - // continue; - // } - // - // command_count += 1; - // - // let remote_decision = atom.decision; - // let local_decision = tx_rec.current_local_decision(); - // if remote_decision.is_abort() && local_decision.is_commit() { - // info!( - // target: LOG_TARGET, - // "⚠️ Foreign ABORT {}. Update overall decision to ABORT. Local stage: {}, Leaf: {}", - // tx_rec.transaction_id(), tx_rec.current_stage(), local_leaf - // ); - // } - // - // // We need to add the justify QC to the evidence because the all prepare block could not include - // it // yet - // let mut foreign_evidence = atom.evidence.clone(); - // foreign_evidence.add_qc_evidence(foreign_committee_info, *justify_qc.id()); - // - // // Update the transaction record with any new information provided by this foreign block - // tx_rec.update_remote_data( - // tx, - // remote_decision, - // *justify_qc.id(), - // foreign_committee_info, - // foreign_evidence, - // )?; - // - // self.validate_and_add_pledges( - // tx, - // &tx_rec, - // block.id(), - // atom, - // &mut block_pledge, - // foreign_committee_info, - // )?; - // - // // Good debug info - // // tx_rec.evidence().iter().for_each(|(addr, ev)| { - // // let includes_local = local_committee_info.includes_substate_address(addr); - // // log::error!( - // // target: LOG_TARGET, - // // "🐞 LOCALACCEPT EVIDENCE (l={}, f={}) {}: {}", includes_local, !includes_local, addr, - // ev // ); - // // }); - // - // if tx_rec.current_stage().is_new() { - // // If the transaction is New, we're waiting for all foreign pledges. Propose transaction once - // we // have them. 
- // // CASE: Foreign SGs have pledged all inputs and executed the transaction, local SG is - // involved // in the outputs - // let transaction = tx_rec.get_transaction(&**tx)?; - // let is_ready = local_committee_info.includes_substate_id(&transaction.to_receipt_id().into()) - // || transaction.has_any_local_inputs(local_committee_info) || - // transaction.has_all_foreign_input_pledges(&**tx, local_committee_info)?; - // if is_ready { - // info!( - // target: LOG_TARGET, - // "🧩 FOREIGN PROPOSAL: (Initial sequence from LocalAccept) Transaction is ready for - // Prepare({}, {}) Local Stage: {}", tx_rec.transaction_id(), - // tx_rec.current_decision(), - // tx_rec.current_stage() - // ); - // tx_rec.add_pending_status_update(tx, local_leaf, TransactionPoolStage::New, true)?; - // } else { - // info!( - // target: LOG_TARGET, - // "🧩 FOREIGN PROPOSAL: (Initial sequence from LocalAccept) Transaction is NOT ready - // for Prepare({}, {}) Local Stage: {}", tx_rec.transaction_id(), - // tx_rec.current_decision(), - // tx_rec.current_stage() - // ); - // } - // } else if tx_rec.current_stage().is_local_accepted() && - // tx_rec.evidence().all_addresses_justified() { - // info!( - // target: LOG_TARGET, - // "🧩 FOREIGN PROPOSAL: Transaction is ready for propose ALL_ACCEPT({}, {}) Local Stage: - // {}", tx_rec.transaction_id(), - // tx_rec.current_decision(), - // tx_rec.current_stage() - // ); - // - // tx_rec.add_pending_status_update(tx, local_leaf, TransactionPoolStage::LocalAccepted, true)?; - // } else { - // info!( - // target: LOG_TARGET, - // "🧩 FOREIGN PROPOSAL: Transaction is NOT ready for ALL_ACCEPT({}, {}) Local Stage: {}, - // All Justified: {}. Waiting for local proposal.", tx_rec.transaction_id(), - // tx_rec.current_decision(), - // tx_rec.current_stage(), - // tx_rec.evidence().all_addresses_justified() - // ); - // // Still need to update the evidence - // tx_rec.add_pending_status_update(tx, local_leaf, tx_rec.current_stage(), tx_rec.is_ready())?; - // } - // }, - // // Should never receive this - // Command::EndEpoch => { - // warn!( - // target: LOG_TARGET, - // "❓️ NEVER HAPPEN: Foreign proposal received for block {} contains an EndEpoch command. This - // is invalid behaviour.", block.id() - // ); - // continue; - // }, - // // TODO(perf): Can we find a way to exclude these unused commands to reduce message size? - // Command::AllAccept(_) | - // Command::SomeAccept(_) | - // Command::AllPrepare(_) | - // Command::SomePrepare(_) | - // Command::Prepare(_) | - // Command::LocalOnly(_) | - // Command::ForeignProposal(_) | - // Command::MintConfidentialOutput(_) => { - // // Disregard - // continue; - // }, - // } - // } - // - // info!( - // target: LOG_TARGET, - // "🧩 FOREIGN PROPOSAL: Processed {} commands from foreign block {}", - // command_count, - // block.id() - // ); - // if command_count == 0 { - // warn!( - // target: LOG_TARGET, - // "⚠️ FOREIGN PROPOSAL: No commands were applicable for foreign block {}. 
Ignoring.", - // block.id() - // ); - // } - // - // Ok(()) - // } - // - // fn validate_and_add_pledges( - // &self, - // tx: &mut ::WriteTransaction<'_>, - // tx_rec: &TransactionPoolRecord, - // block_id: &BlockId, - // atom: &TransactionAtom, - // block_pledge: &mut BlockPledge, - // foreign_committee_info: &CommitteeInfo, - // ) -> Result<(), HotStuffError> { - // #[allow(clippy::mutable_key_type)] - // let maybe_pledges = if atom.decision.is_commit() { - // let pledges = block_pledge.remove_transaction_pledges(&atom.id).ok_or_else(|| { - // HotStuffError::ForeignNodeOmittedTransactionPledges { - // foreign_block_id: *block_id, - // transaction_id: atom.id, - // } - // })?; - // - // // Validate that provided evidence is correct - // // TODO: there are a lot of validations to be done on evidence and the foreign block in general, - // // this is here as a sanity check and should change to not be a fatal error in consensus - // for pledge in &pledges { - // let address = pledge.versioned_substate_id().to_substate_address(); - // let evidence = - // atom.evidence - // .get(&address) - // .ok_or_else(|| ProposalValidationError::ForeignInvalidPledge { - // block_id: *block_id, - // transaction_id: atom.id, - // details: format!("Pledge {pledge} for address {address} not found in evidence"), - // })?; - // if evidence.lock.is_output() && pledge.is_input() { - // return Err(ProposalValidationError::ForeignInvalidPledge { - // block_id: *block_id, - // transaction_id: atom.id, - // details: format!("Pledge {pledge} is an input but evidence is an output for address - // {address}"), } - // .into()); - // } - // if !evidence.lock.is_output() && pledge.is_output() { - // return Err(ProposalValidationError::ForeignInvalidPledge { - // block_id: *block_id, - // transaction_id: atom.id, - // details: format!("Pledge {pledge} is an output but evidence is an input for address - // {address}"), } - // .into()); - // } - // } - // Some(pledges) - // } else { - // if block_pledge.remove_transaction_pledges(&atom.id).is_some() { - // return Err(ProposalValidationError::ForeignInvalidPledge { - // block_id: *block_id, - // transaction_id: atom.id, - // details: "Remote decided ABORT but provided pledges".to_string(), - // } - // .into()); - // } - // None - // }; - // - // if let Some(pledges) = maybe_pledges { - // // If the foreign shard has committed the transaction, we can add the pledges to the transaction - // // record - // tx_rec.add_foreign_pledges(tx, foreign_committee_info.shard_group(), pledges)?; - // } - // - // Ok(()) - // } - fn validate_proposed_block( &self, candidate_block: &Block, diff --git a/dan_layer/consensus/src/hotstuff/substate_store/error.rs b/dan_layer/consensus/src/hotstuff/substate_store/error.rs index 223dcc4a9..18604ed0e 100644 --- a/dan_layer/consensus/src/hotstuff/substate_store/error.rs +++ b/dan_layer/consensus/src/hotstuff/substate_store/error.rs @@ -6,7 +6,7 @@ use tari_dan_storage::StorageError; #[derive(Debug, thiserror::Error)] pub enum SubstateStoreError { - #[error(transparent)] + #[error("Lock failure: {0}")] LockFailed(#[from] LockFailedError), #[error("Substate {id} not found")] SubstateNotFound { id: VersionedSubstateId }, @@ -47,7 +47,8 @@ impl SubstateStoreError { pub enum LockFailedError { #[error("Substate {id} not found")] SubstateNotFound { id: VersionedSubstateId }, - + #[error("Substate {id} is DOWN")] + SubstateIsDown { id: VersionedSubstateId }, #[error( "Failed to {requested_lock} lock substate {substate_id} due to conflict with existing 
{existing_lock} lock" )] diff --git a/dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs b/dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs index d40dfedac..3253d6c68 100644 --- a/dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs +++ b/dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs @@ -79,6 +79,9 @@ impl<'a, 'tx, TStore: StateStore + 'a + 'tx> ReadableSubstateStore for PendingSu let Some(substate) = SubstateRecord::get(self.read_transaction(), &id.to_substate_address()).optional()? else { return Err(SubstateStoreError::SubstateNotFound { id: id.clone() }); }; + if substate.is_destroyed() { + return Err(SubstateStoreError::SubstateIsDown { id: id.clone() }); + } Ok(substate.into_substate()) } } @@ -151,6 +154,11 @@ impl<'a, 'tx, TStore: StateStore + 'a + 'tx> PendingSubstateStore<'a, 'tx, TStor } let substate = SubstateRecord::get_latest(self.read_transaction(), id)?; + if substate.is_destroyed() { + return Err(SubstateStoreError::SubstateIsDown { + id: substate.to_versioned_substate_id(), + }); + } Ok(substate.into_substate()) } @@ -171,8 +179,10 @@ impl<'a, 'tx, TStore: StateStore + 'a + 'tx> PendingSubstateStore<'a, 'tx, TStor Err(err) => { let error = err.ok_lock_failed()?; match error { + err @ LockFailedError::SubstateIsDown { .. } | err @ LockFailedError::SubstateNotFound { .. } => { - // If the substate does not exist, the transaction is invalid + // If the substate does not exist or is not UP (unversioned: previously DOWNed and never + // UPed), the transaction is invalid let index = lock_status.add_failed(err); lock_status.hard_conflict_idx = Some(index); }, @@ -215,7 +225,7 @@ impl<'a, 'tx, TStore: StateStore + 'a + 'tx> PendingSubstateStore<'a, 'tx, TStor if requested_lock_type.is_output() { self.assert_not_exist(&versioned_substate_id)?; } else { - self.assert_is_up(&versioned_substate_id)?; + self.lock_assert_is_up(&versioned_substate_id)?; } let version = versioned_substate_id.version(); @@ -422,6 +432,16 @@ impl<'a, 'tx, TStore: StateStore + 'a + 'tx> PendingSubstateStore<'a, 'tx, TStor } } + fn lock_assert_is_up(&self, id: &VersionedSubstateId) -> Result<(), SubstateStoreError> { + match self.assert_is_up(id) { + Ok(_) => Ok(()), + // Converts a substate store error to a LockFailedError (TODO: improve) + Err(SubstateStoreError::SubstateIsDown { id }) => Err(LockFailedError::SubstateIsDown { id }.into()), + Err(SubstateStoreError::SubstateNotFound { id }) => Err(LockFailedError::SubstateNotFound { id }.into()), + Err(err) => Err(err), + } + } + fn assert_is_down(&self, id: &VersionedSubstateId) -> Result<(), SubstateStoreError> { if let Some(change) = self.get_pending(&id.to_substate_address()) { if change.is_up() { diff --git a/dan_layer/consensus/src/hotstuff/transaction_manager/manager.rs b/dan_layer/consensus/src/hotstuff/transaction_manager/manager.rs index 36dcd6c83..239130008 100644 --- a/dan_layer/consensus/src/hotstuff/transaction_manager/manager.rs +++ b/dan_layer/consensus/src/hotstuff/transaction_manager/manager.rs @@ -134,8 +134,8 @@ impl> Ok(inputs) => inputs, Err(err) => { warn!(target: LOG_TARGET, "⚠️ PREPARE: failed to resolve local inputs: {err}"); - // We only expect not found errors here. If we get any other error, this is fatal. - if !err.is_not_found_error() { + // We only expect not found or down errors here. If we get any other error, this is fatal. 
+ if !err.is_not_found_error() && !err.is_substate_down_error() { return Err(err); } let is_local_only = local_committee_info.includes_all_substate_addresses( @@ -235,7 +235,11 @@ impl> // specify this or we can correct the locks after execution. Currently, this limitation // prevents concurrent multi-shard read locks. let requested_locks = multishard.local_inputs().iter().map(|(substate_id, substate)| { - SubstateRequirementLockIntent::write(substate_id.clone(), substate.version()) + if substate_id.substate_id.is_read_only() { + SubstateRequirementLockIntent::read(substate_id.clone(), substate.version()) + } else { + SubstateRequirementLockIntent::write(substate_id.clone(), substate.version()) + } }); store.try_lock_all(transaction_id, requested_locks, false)? } else { @@ -246,6 +250,7 @@ impl> }; if let Some(err) = lock_summary.hard_conflict() { + warn!(target: LOG_TARGET, "⚠️ PREPARE: Hard conflict when locking inputs: {err}"); prepared.set_abort_reason(RejectReason::FailedToLockInputs(err.to_string())); } Ok((prepared, lock_summary)) diff --git a/dan_layer/consensus/src/hotstuff/transaction_manager/prepared.rs b/dan_layer/consensus/src/hotstuff/transaction_manager/prepared.rs index 06c7aed79..2f800cb91 100644 --- a/dan_layer/consensus/src/hotstuff/transaction_manager/prepared.rs +++ b/dan_layer/consensus/src/hotstuff/transaction_manager/prepared.rs @@ -128,11 +128,10 @@ impl MultiShardPreparedTransaction { self.transaction .transaction() .all_inputs_iter() - .map(|input| input.or_zero_version()) - .map(|id| VersionedSubstateIdLockIntent::new(id, SubstateLockType::Read)), + .map(|input| VersionedSubstateIdLockIntent::from_requirement(input, SubstateLockType::Read)), self.outputs .iter() - .map(|id| VersionedSubstateIdLockIntent::new(id.clone(), SubstateLockType::Output)), + .map(|id| VersionedSubstateIdLockIntent::output(id.clone())), ); } @@ -146,13 +145,13 @@ impl MultiShardPreparedTransaction { // TODO(correctness): to_zero_version is error prone when used in evidence and the correctness depends how it is used. // e.g. using it to determining which shard is involved is fine, but loading substate by the address is incorrect (v0 may or may not be the actual pledged substate) .chain(self.foreign_inputs().iter().map(|r| r.clone().or_zero_version())) - .map(|id| VersionedSubstateIdLockIntent::new(id, SubstateLockType::Write)); + .map(|id| VersionedSubstateIdLockIntent::write(id, true)); let outputs = self .outputs() .iter() .cloned() - .map(|id| VersionedSubstateIdLockIntent::new(id, SubstateLockType::Output)); + .map(VersionedSubstateIdLockIntent::output); Evidence::from_inputs_and_outputs(inputs, outputs) } diff --git a/dan_layer/consensus/src/hotstuff/worker.rs b/dan_layer/consensus/src/hotstuff/worker.rs index 28c0a2fd0..1e8a629f8 100644 --- a/dan_layer/consensus/src/hotstuff/worker.rs +++ b/dan_layer/consensus/src/hotstuff/worker.rs @@ -267,13 +267,24 @@ impl HotstuffWorker { ); tokio::select! 
{ - Some(result) = self.on_inbound_message.next_message(current_epoch, current_height) => { - if let Err(err) = self.on_unvalidated_message(current_epoch, current_height, result, &local_committee_info).await { - self.hooks.on_error(&err); - error!(target: LOG_TARGET, "🚨Error handling new message: {}", err); + // BIASED: + biased; + + // Epoch manager events are rare, but have priority if they happen + Ok(event) = epoch_manager_events.recv() => { + self.on_epoch_manager_event(event).await?; + }, + + // Proposing is highest priority + maybe_leaf_block = on_force_beat.wait() => { + self.hooks.on_beat(); + if let Err(e) = self.propose_if_leader(current_epoch, maybe_leaf_block, &local_committee_info).await { + self.on_failure("propose_if_leader", &e).await; + return Err(e); } }, + // Sequencing transactions is next highest priority Some((tx_id, pending)) = self.rx_new_transactions.recv() => { if let Err(err) = self.on_new_transaction(tx_id, pending, current_epoch, current_height, &local_committee_info).await { self.hooks.on_error(&err); @@ -281,11 +292,14 @@ impl HotstuffWorker { } }, - Ok(event) = epoch_manager_events.recv() => { - self.on_epoch_manager_event(event).await?; + Some(result) = self.on_inbound_message.next_message(current_epoch, current_height) => { + if let Err(err) = self.on_unvalidated_message(current_epoch, current_height, result, &local_committee_info).await { + self.hooks.on_error(&err); + error!(target: LOG_TARGET, "🚨Error handling new message: {}", err); + } }, - // TODO: This channel is used to work around some design-flaws in missing transactions handling. + // TODO: This channel is used to work around some design-flaws in missing transactions handling. // We cannot simply call check_if_block_can_be_unparked in dispatch_hotstuff_message as that creates a cycle. // One suggestion is to refactor consensus to emit events (kinda like libp2p does) and handle those events. // This should be easy to reason about and avoid a large depth of async calls and "callback channels". 
@@ -303,14 +317,6 @@ impl HotstuffWorker { } }, - maybe_leaf_block = on_force_beat.wait() => { - self.hooks.on_beat(); - if let Err(e) = self.propose_if_leader(current_epoch, maybe_leaf_block, &local_committee_info).await { - self.on_failure("propose_if_leader", &e).await; - return Err(e); - } - }, - new_height = on_leader_timeout.wait() => { if let Err(e) = self.on_leader_timeout(current_epoch, new_height).await { self.on_failure("on_leader_timeout", &e).await; @@ -642,14 +648,14 @@ impl HotstuffWorker { .handle( epoch, &local_committee, - local_committee_info, + *local_committee_info, leaf_block, is_newview_propose, propose_epoch_end, ) .await?; } else { - // We can make this a warm/error in future, but for now I want to be sure this never happens + // We can make this a warn/error in future, but for now I want to be sure this never happens debug_assert!( !is_newview_propose, "propose_if_leader called with is_newview_propose=true but we're not the leader" diff --git a/dan_layer/consensus/src/traits/transaction_executor.rs b/dan_layer/consensus/src/traits/transaction_executor.rs index 364e5b943..bb624b644 100644 --- a/dan_layer/consensus/src/traits/transaction_executor.rs +++ b/dan_layer/consensus/src/traits/transaction_executor.rs @@ -24,6 +24,14 @@ pub enum BlockTransactionExecutorError { #[error("BUG: Invariant error: {0}")] InvariantError(String), } +impl BlockTransactionExecutorError { + pub fn is_substate_down_error(&self) -> bool { + matches!( + self, + BlockTransactionExecutorError::SubstateStoreError(SubstateStoreError::SubstateIsDown { .. }) + ) + } +} impl IsNotFoundError for BlockTransactionExecutorError { fn is_not_found_error(&self) -> bool { diff --git a/dan_layer/consensus_tests/src/consensus.rs b/dan_layer/consensus_tests/src/consensus.rs index ee6a665c6..7c0c4d3d5 100644 --- a/dan_layer/consensus_tests/src/consensus.rs +++ b/dan_layer/consensus_tests/src/consensus.rs @@ -12,7 +12,7 @@ use std::time::Duration; use tari_common_types::types::PrivateKey; use tari_consensus::hotstuff::HotStuffError; -use tari_dan_common_types::{optional::Optional, Epoch, NodeHeight, SubstateLockType, SubstateRequirement}; +use tari_dan_common_types::{optional::Optional, Epoch, NodeHeight, SubstateRequirement}; use tari_dan_storage::{ consensus_models::{BlockId, Command, Decision, TransactionRecord, VersionedSubstateIdLockIntent}, StateStore, @@ -397,7 +397,10 @@ async fn multishard_local_inputs_foreign_outputs() { .build(), Decision::Commit, 1, - inputs.into_iter().map(VersionedSubstateIdLockIntent::write).collect(), + inputs + .into_iter() + .map(|input| VersionedSubstateIdLockIntent::write(input, true)) + .collect(), outputs, ); test.send_transaction_to_destination(TestVnDestination::All, tx1.clone()) @@ -456,7 +459,7 @@ async fn multishard_local_inputs_and_outputs_foreign_outputs() { inputs_0 .into_iter() .chain(inputs_1) - .map(VersionedSubstateIdLockIntent::write) + .map(|input| VersionedSubstateIdLockIntent::write(input, true)) .collect(), outputs_0.into_iter().chain(outputs_2).collect(), ); @@ -526,7 +529,10 @@ async fn multishard_output_conflict_abort() { tx, Decision::Commit, 1, - inputs.into_iter().map(VersionedSubstateIdLockIntent::write).collect(), + inputs + .into_iter() + .map(|input| VersionedSubstateIdLockIntent::write(input, true)) + .collect(), resulting_outputs, ); assert_ne!(tx1.id(), tx2.id()); @@ -578,9 +584,7 @@ async fn single_shard_inputs_from_previous_outputs() { .resulting_outputs() .unwrap() .iter() - .map(|output| { - 
VersionedSubstateIdLockIntent::new(output.versioned_substate_id().clone(), SubstateLockType::Write) - }) + .map(|output| VersionedSubstateIdLockIntent::write(output.versioned_substate_id().clone(), true)) .collect::>(); let tx2 = Transaction::builder() @@ -658,9 +662,7 @@ async fn multishard_inputs_from_previous_outputs() { 1, resulting_outputs .into_iter() - .map(|output| { - VersionedSubstateIdLockIntent::new(output.into_versioned_substate_id(), SubstateLockType::Write) - }) + .map(|output| VersionedSubstateIdLockIntent::write(output.into_versioned_substate_id(), true)) .collect(), vec![], ); @@ -723,7 +725,7 @@ async fn single_shard_input_conflict() { *tx1.id(), Decision::Commit, 0, - vec![VersionedSubstateIdLockIntent::read(substate_id.clone())], + vec![VersionedSubstateIdLockIntent::read(substate_id.clone(), true)], vec![], ), ) @@ -733,7 +735,7 @@ async fn single_shard_input_conflict() { *tx2.id(), Decision::Commit, 0, - vec![VersionedSubstateIdLockIntent::write(substate_id)], + vec![VersionedSubstateIdLockIntent::write(substate_id, true)], vec![], ), ); @@ -960,7 +962,10 @@ async fn single_shard_unversioned_inputs() { *tx.id(), Decision::Commit, 0, - inputs.into_iter().map(VersionedSubstateIdLockIntent::write).collect(), + inputs + .into_iter() + .map(|input| VersionedSubstateIdLockIntent::write(input, true)) + .collect(), vec![], ), ); diff --git a/dan_layer/consensus_tests/src/support/harness.rs b/dan_layer/consensus_tests/src/support/harness.rs index 86c48e1c3..555c5823d 100644 --- a/dan_layer/consensus_tests/src/support/harness.rs +++ b/dan_layer/consensus_tests/src/support/harness.rs @@ -17,7 +17,6 @@ use tari_dan_common_types::{ NodeHeight, NumPreshards, ShardGroup, - SubstateLockType, VersionedSubstateId, }; use tari_dan_storage::{ @@ -98,7 +97,7 @@ impl Test { fee, all_inputs .into_iter() - .map(|i| VersionedSubstateIdLockIntent::new(i, SubstateLockType::Write)) + .map(|i| VersionedSubstateIdLockIntent::write(i, true)) .collect(), vec![], ); @@ -173,7 +172,7 @@ impl Test { fee, all_inputs .into_iter() - .map(|i| VersionedSubstateIdLockIntent::new(i, SubstateLockType::Write)) + .map(|i| VersionedSubstateIdLockIntent::write(i, true)) .collect(), outputs, ) diff --git a/dan_layer/consensus_tests/src/support/transaction.rs b/dan_layer/consensus_tests/src/support/transaction.rs index c9d485816..91bbe7a6f 100644 --- a/dan_layer/consensus_tests/src/support/transaction.rs +++ b/dan_layer/consensus_tests/src/support/transaction.rs @@ -5,7 +5,7 @@ use std::{iter, time::Duration}; use rand::{distributions::Alphanumeric, rngs::OsRng, Rng}; use tari_common_types::types::PrivateKey; -use tari_dan_common_types::{SubstateLockType, VersionedSubstateId}; +use tari_dan_common_types::VersionedSubstateId; use tari_dan_storage::consensus_models::{ Decision, ExecutedTransaction, @@ -16,7 +16,7 @@ use tari_dan_storage::consensus_models::{ use tari_engine_types::{ commit_result::{ExecuteResult, FinalizeResult, RejectReason, TransactionResult}, component::{ComponentBody, ComponentHeader}, - fees::FeeReceipt, + fees::{FeeBreakdown, FeeReceipt}, substate::{Substate, SubstateDiff, SubstateId}, transaction_receipt::{TransactionReceipt, TransactionReceiptAddress}, }; @@ -94,7 +94,7 @@ pub fn create_execution_result_for_transaction( fee_receipt: FeeReceipt { total_fee_payment: fee.try_into().unwrap(), total_fees_paid: fee.try_into().unwrap(), - cost_breakdown: vec![], + cost_breakdown: FeeBreakdown::default(), }, }), ); @@ -105,13 +105,10 @@ pub fn create_execution_result_for_transaction( )) }; - 
resulting_outputs.push(VersionedSubstateIdLockIntent::new( - VersionedSubstateId::new( - SubstateId::TransactionReceipt(TransactionReceiptAddress::from(tx_id)), - 0, - ), - SubstateLockType::Output, - )); + resulting_outputs.push(VersionedSubstateIdLockIntent::output(VersionedSubstateId::new( + SubstateId::TransactionReceipt(TransactionReceiptAddress::from(tx_id)), + 0, + ))); TransactionExecution::new( tx_id, @@ -119,7 +116,7 @@ pub fn create_execution_result_for_transaction( finalize: FinalizeResult::new(tx_id.into_array().into(), vec![], vec![], result, FeeReceipt { total_fee_payment: fee.try_into().unwrap(), total_fees_paid: fee.try_into().unwrap(), - cost_breakdown: vec![], + cost_breakdown: FeeBreakdown::default(), }), execution_time: Duration::from_secs(0), }, diff --git a/dan_layer/engine/src/runtime/fee_state.rs b/dan_layer/engine/src/runtime/fee_state.rs index 853bc8c42..deba5c782 100644 --- a/dan_layer/engine/src/runtime/fee_state.rs +++ b/dan_layer/engine/src/runtime/fee_state.rs @@ -7,7 +7,7 @@ use tari_template_lib::models::{Amount, VaultId}; #[derive(Debug, Clone, Default)] pub struct FeeState { pub fee_payments: Vec<(ResourceContainer, VaultId)>, - pub fee_charges: Vec, + pub fee_charges: FeeBreakdown, } impl FeeState { @@ -16,7 +16,7 @@ impl FeeState { } pub fn total_charges(&self) -> u64 { - self.fee_charges.iter().map(|breakdown| breakdown.amount).sum() + self.fee_charges.get_total() } pub fn total_payments(&self) -> Amount { diff --git a/dan_layer/engine/src/runtime/tracker.rs b/dan_layer/engine/src/runtime/tracker.rs index e535ba886..05b411310 100644 --- a/dan_layer/engine/src/runtime/tracker.rs +++ b/dan_layer/engine/src/runtime/tracker.rs @@ -33,7 +33,7 @@ use tari_engine_types::{ component::{ComponentBody, ComponentHeader}, confidential::UnclaimedConfidentialOutput, events::Event, - fees::{FeeBreakdown, FeeSource}, + fees::FeeSource, indexed_value::{IndexedValue, IndexedWellKnownTypes}, lock::LockFlag, logs::LogEntry, @@ -269,7 +269,7 @@ impl StateTracker { self.write_with(|state| { debug!(target: LOG_TARGET, "Add fee: source: {:?}, amount: {}", source, amount); - state.fee_state_mut().fee_charges.push(FeeBreakdown { source, amount }); + state.fee_state_mut().fee_charges.insert(source, amount); }) } diff --git a/dan_layer/engine/src/runtime/working_state.rs b/dan_layer/engine/src/runtime/working_state.rs index 7e25dc08f..90d1e9fb4 100644 --- a/dan_layer/engine/src/runtime/working_state.rs +++ b/dan_layer/engine/src/runtime/working_state.rs @@ -23,7 +23,6 @@ use tari_engine_types::{ lock::LockFlag, logs::LogEntry, non_fungible::NonFungibleContainer, - non_fungible_index::NonFungibleIndex, proof::{ContainerRef, LockedResource, Proof}, resource::Resource, resource_container::{ResourceContainer, ResourceError}, @@ -42,7 +41,6 @@ use tari_template_lib::{ BucketId, ComponentAddress, NonFungibleAddress, - NonFungibleIndexAddress, ProofId, UnclaimedConfidentialOutputAddress, VaultId, @@ -505,35 +503,32 @@ impl WorkingState { ); let mut token_ids = BTreeSet::new(); - let resource = self.get_resource(locked_resource)?; + // let resource = self.get_resource(locked_resource)?; // TODO: This isn't correct (assumes tokens are never burnt), we'll need to rethink this - let mut index = resource - .total_supply() - .as_u64_checked() - .ok_or(RuntimeError::InvalidAmount { - amount: resource.total_supply(), - reason: "Could not convert to u64".to_owned(), - })?; + // let mut index = resource + // .total_supply() + // .as_u64_checked() + // .ok_or(RuntimeError::InvalidAmount { + // 
amount: resource.total_supply(), + // reason: "Could not convert to u64".to_owned(), + // })?; for (id, (data, mut_data)) in tokens { - let nft_address = NonFungibleAddress::new(resource_address, id.clone()); - let addr = SubstateId::NonFungible(nft_address.clone()); + let nft_address = NonFungibleAddress::new(resource_address, id); + let token_id = nft_address.id().clone(); + let addr = SubstateId::NonFungible(nft_address); if self.substate_exists(&addr)? { - return Err(RuntimeError::DuplicateNonFungibleId { - token_id: nft_address.id().clone(), - }); - } - if !token_ids.insert(id.clone()) { - // Can't happen - return Err(RuntimeError::DuplicateNonFungibleId { token_id: id }); + return Err(RuntimeError::DuplicateNonFungibleId { token_id }); + } else { + token_ids.insert(token_id); + self.new_substate(addr.clone(), NonFungibleContainer::new(data, mut_data))?; } - self.new_substate(addr.clone(), NonFungibleContainer::new(data, mut_data))?; // for each new nft we also create an index to be allow resource scanning - let index_address = NonFungibleIndexAddress::new(resource_address, index); - index += 1; - let nft_index = NonFungibleIndex::new(nft_address); - self.new_substate(index_address, nft_index)?; + // let index_address = NonFungibleIndexAddress::new(resource_address, index); + // index += 1; + // let nft_index = NonFungibleIndex::new(nft_address); + // self.new_substate(index_address, nft_index)?; } ResourceContainer::non_fungible(resource_address, token_ids) @@ -900,7 +895,7 @@ impl WorkingState { fee_receipt: FeeReceipt { total_fee_payment, total_fees_paid: fee_resource.amount(), - cost_breakdown: self.fee_state.fee_charges.drain(..).collect(), + cost_breakdown: mem::take(&mut self.fee_state.fee_charges), }, }) } diff --git a/dan_layer/engine/tests/test.rs b/dan_layer/engine/tests/test.rs index dffc786de..b15923308 100644 --- a/dan_layer/engine/tests/test.rs +++ b/dan_layer/engine/tests/test.rs @@ -1246,112 +1246,112 @@ mod tickets { } } -mod nft_indexes { - use super::*; - - fn setup() -> ( - TemplateTest, - (ComponentAddress, NonFungibleAddress), - ComponentAddress, - SubstateId, - ) { - let mut template_test = TemplateTest::new(vec!["tests/templates/nft/nft_list"]); - - let (account_address, owner_token, _) = template_test.create_funded_account(); - let nft_component: ComponentAddress = template_test.call_function("SparkleNft", "new", args![], vec![]); - - let nft_resx = template_test.get_previous_output_address(SubstateType::Resource); - - // TODO: cleanup - (template_test, (account_address, owner_token), nft_component, nft_resx) - } - - #[test] - #[allow(clippy::too_many_lines)] - fn new_nft_index() { - let (mut template_test, (account_address, owner_proof), nft_component, nft_resx) = setup(); - - let vars = vec![ - ("account", account_address.into()), - ("nft", nft_component.into()), - ("nft_resx", nft_resx.clone().into()), - ]; - - let total_supply: Amount = - template_test.call_method(nft_component, "total_supply", args![], vec![owner_proof.clone()]); - assert_eq!(total_supply, Amount(0)); - - let result = template_test - .execute_and_commit_manifest( - r#" - let account = var!["account"]; - let sparkle_nft = var!["nft"]; - - let nft_bucket = sparkle_nft.mint(); - account.deposit(nft_bucket); - "#, - vars.clone(), - vec![owner_proof.clone()], - ) - .unwrap(); - - let diff = result.finalize.result.expect("execution failed"); - - // Resource is changed - assert_eq!(diff.down_iter().filter(|(addr, _)| addr.is_resource()).count(), 1); - assert_eq!(diff.up_iter().filter(|(addr, 
_)| addr.is_resource()).count(), 1); - - // NFT component changed - assert_eq!(diff.down_iter().filter(|(addr, _)| addr.is_component()).count(), 1); - assert_eq!(diff.up_iter().filter(|(addr, _)| addr.is_component()).count(), 1); - - // One new vault created - assert_eq!(diff.down_iter().filter(|(addr, _)| addr.is_vault()).count(), 0); - assert_eq!(diff.up_iter().filter(|(addr, _)| addr.is_vault()).count(), 1); - - // One new NFT minted - assert_eq!(diff.down_iter().filter(|(addr, _)| addr.is_non_fungible()).count(), 0); - assert_eq!(diff.up_iter().filter(|(addr, _)| addr.is_non_fungible()).count(), 1); - - // One new NFT minted - assert_eq!(diff.down_iter().filter(|(addr, _)| addr.is_non_fungible()).count(), 0); - assert_eq!(diff.up_iter().filter(|(addr, _)| addr.is_non_fungible()).count(), 1); - let (nft_addr, _) = diff.up_iter().find(|(addr, _)| addr.is_non_fungible()).unwrap(); - - // One new NFT index - assert_eq!( - diff.down_iter() - .filter(|(addr, _)| addr.is_non_fungible_index()) - .count(), - 0 - ); - assert_eq!( - diff.up_iter().filter(|(addr, _)| addr.is_non_fungible_index()).count(), - 1 - ); - let (index_addr, index) = diff.up_iter().find(|(addr, _)| addr.is_non_fungible_index()).unwrap(); - // The nft index address is composed of the resource address - assert_eq!( - nft_resx.as_resource_address().unwrap(), - index_addr - .as_non_fungible_index_address() - .unwrap() - .resource_address() - .to_owned(), - ); - // The index references the newly minted nft - let referenced_address = index - .substate_value() - .non_fungible_index() - .unwrap() - .referenced_address(); - assert_eq!(nft_addr.to_address_string(), referenced_address.to_string()); - - // The total supply of the resource is increased - let total_supply: Amount = template_test.call_method(nft_component, "total_supply", args![], vec![owner_proof]); - assert_eq!(total_supply, Amount(1)); - } -} +// mod nft_indexes { +// use super::*; +// +// fn setup() -> ( +// TemplateTest, +// (ComponentAddress, NonFungibleAddress), +// ComponentAddress, +// SubstateId, +// ) { +// let mut template_test = TemplateTest::new(vec!["tests/templates/nft/nft_list"]); +// +// let (account_address, owner_token, _) = template_test.create_funded_account(); +// let nft_component: ComponentAddress = template_test.call_function("SparkleNft", "new", args![], vec![]); +// +// let nft_resx = template_test.get_previous_output_address(SubstateType::Resource); +// +// // TODO: cleanup +// (template_test, (account_address, owner_token), nft_component, nft_resx) +// } +// +// #[test] +// #[allow(clippy::too_many_lines)] +// fn new_nft_index() { +// let (mut template_test, (account_address, owner_proof), nft_component, nft_resx) = setup(); +// +// let vars = vec![ +// ("account", account_address.into()), +// ("nft", nft_component.into()), +// ("nft_resx", nft_resx.clone().into()), +// ]; +// +// let total_supply: Amount = +// template_test.call_method(nft_component, "total_supply", args![], vec![owner_proof.clone()]); +// assert_eq!(total_supply, Amount(0)); +// +// let result = template_test +// .execute_and_commit_manifest( +// r#" +// let account = var!["account"]; +// let sparkle_nft = var!["nft"]; +// +// let nft_bucket = sparkle_nft.mint(); +// account.deposit(nft_bucket); +// "#, +// vars.clone(), +// vec![owner_proof.clone()], +// ) +// .unwrap(); +// +// let diff = result.finalize.result.expect("execution failed"); +// +// // Resource is changed +// assert_eq!(diff.down_iter().filter(|(addr, _)| addr.is_resource()).count(), 1); +// 
assert_eq!(diff.up_iter().filter(|(addr, _)| addr.is_resource()).count(), 1); +// +// // NFT component changed +// assert_eq!(diff.down_iter().filter(|(addr, _)| addr.is_component()).count(), 1); +// assert_eq!(diff.up_iter().filter(|(addr, _)| addr.is_component()).count(), 1); +// +// // One new vault created +// assert_eq!(diff.down_iter().filter(|(addr, _)| addr.is_vault()).count(), 0); +// assert_eq!(diff.up_iter().filter(|(addr, _)| addr.is_vault()).count(), 1); +// +// // One new NFT minted +// assert_eq!(diff.down_iter().filter(|(addr, _)| addr.is_non_fungible()).count(), 0); +// assert_eq!(diff.up_iter().filter(|(addr, _)| addr.is_non_fungible()).count(), 1); +// +// // One new NFT minted +// assert_eq!(diff.down_iter().filter(|(addr, _)| addr.is_non_fungible()).count(), 0); +// assert_eq!(diff.up_iter().filter(|(addr, _)| addr.is_non_fungible()).count(), 1); +// let (nft_addr, _) = diff.up_iter().find(|(addr, _)| addr.is_non_fungible()).unwrap(); +// +// // One new NFT index +// assert_eq!( +// diff.down_iter() +// .filter(|(addr, _)| addr.is_non_fungible_index()) +// .count(), +// 0 +// ); +// assert_eq!( +// diff.up_iter().filter(|(addr, _)| addr.is_non_fungible_index()).count(), +// 1 +// ); +// let (index_addr, index) = diff.up_iter().find(|(addr, _)| addr.is_non_fungible_index()).unwrap(); +// // The nft index address is composed of the resource address +// assert_eq!( +// nft_resx.as_resource_address().unwrap(), +// index_addr +// .as_non_fungible_index_address() +// .unwrap() +// .resource_address() +// .to_owned(), +// ); +// // The index references the newly minted nft +// let referenced_address = index +// .substate_value() +// .non_fungible_index() +// .unwrap() +// .referenced_address(); +// assert_eq!(nft_addr.to_address_string(), referenced_address.to_string()); +// +// // The total supply of the resource is increased +// let total_supply: Amount = template_test.call_method(nft_component, "total_supply", args![], +// vec![owner_proof]); assert_eq!(total_supply, Amount(1)); +// } +// } #[test] fn test_builtin_templates() { diff --git a/dan_layer/engine_types/Cargo.toml b/dan_layer/engine_types/Cargo.toml index 4fb239773..b5fdc0944 100644 --- a/dan_layer/engine_types/Cargo.toml +++ b/dan_layer/engine_types/Cargo.toml @@ -23,6 +23,7 @@ blake2 = { workspace = true } rand = { workspace = true } digest = { workspace = true } hex = { workspace = true, features = ["serde"] } +indexmap = { workspace = true } lazy_static = { workspace = true } serde = { workspace = true, default-features = true } serde_json = { workspace = true } diff --git a/dan_layer/engine_types/src/fees.rs b/dan_layer/engine_types/src/fees.rs index 7c50322d2..4f472916a 100644 --- a/dan_layer/engine_types/src/fees.rs +++ b/dan_layer/engine_types/src/fees.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; +use indexmap::{map::Entry, IndexMap}; use serde::{Deserialize, Serialize}; use tari_template_lib::models::{Amount, VaultId}; #[cfg(feature = "ts")] @@ -18,7 +19,7 @@ pub struct FeeReceipt { /// Total fees paid after refunds pub total_fees_paid: Amount, /// Breakdown of fee costs - pub cost_breakdown: Vec, + pub cost_breakdown: FeeBreakdown, } impl FeeReceipt { @@ -31,13 +32,7 @@ impl FeeReceipt { /// The total amount of fees charged. This may be more than total_fees_paid if the user paid an insufficient amount. 
pub fn total_fees_charged(&self) -> Amount { - Amount::try_from( - self.cost_breakdown - .iter() - .map(|breakdown| breakdown.amount) - .sum::(), - ) - .unwrap() + Amount::try_from(self.cost_breakdown.get_total()).unwrap() } pub fn total_refunded(&self) -> Amount { @@ -69,7 +64,7 @@ impl FeeReceipt { } } -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, Hash, Eq, PartialEq, PartialOrd, Ord)] #[cfg_attr(feature = "ts", derive(TS), ts(export, export_to = "../../bindings/src/types/"))] pub enum FeeSource { Initial, @@ -79,19 +74,40 @@ pub enum FeeSource { Logs, } -#[derive(Debug, Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize, Default)] #[cfg_attr(feature = "ts", derive(TS), ts(export, export_to = "../../bindings/src/types/"))] pub struct FeeBreakdown { - pub source: FeeSource, - #[cfg_attr(feature = "ts", ts(type = "number"))] - pub amount: u64, + breakdown: IndexMap, +} + +impl FeeBreakdown { + pub fn insert(&mut self, source: FeeSource, amount: u64) { + match self.breakdown.entry(source) { + Entry::Occupied(entry) => { + *entry.into_mut() += amount; + }, + Entry::Vacant(entry) => { + entry.insert(amount); + self.breakdown.sort_keys(); + }, + } + } + + /// Returns an iterator over the fee breakdown in a canonical order. + pub fn iter(&self) -> impl Iterator { + self.breakdown.iter() + } + + pub fn get_total(&self) -> u64 { + self.breakdown.values().sum() + } } #[derive(Debug, Clone, Serialize, Deserialize)] #[cfg_attr(feature = "ts", derive(TS), ts(export, export_to = "../../bindings/src/types/"))] pub struct FeeCostBreakdown { pub total_fees_charged: Amount, - pub breakdown: Vec, + pub breakdown: FeeBreakdown, } #[derive(Debug)] diff --git a/dan_layer/engine_types/src/indexed_value.rs b/dan_layer/engine_types/src/indexed_value.rs index 7843def77..cc0698222 100644 --- a/dan_layer/engine_types/src/indexed_value.rs +++ b/dan_layer/engine_types/src/indexed_value.rs @@ -111,10 +111,7 @@ impl IndexedValue { pub fn get_value(&self, path: &str) -> Result, IndexedValueError> where for<'a> T: serde::Deserialize<'a> { - get_value_by_path(&self.value, path) - .map(tari_bor::from_value) - .transpose() - .map_err(Into::into) + decode_value_at_path(&self.value, path) } } @@ -459,6 +456,14 @@ impl From<&str> for IndexedValueError { } } +pub fn decode_value_at_path(value: &tari_bor::Value, path: &str) -> Result, IndexedValueError> +where for<'a> T: serde::Deserialize<'a> { + get_value_by_path(value, path) + .map(tari_bor::from_value) + .transpose() + .map_err(Into::into) +} + fn get_value_by_path<'a>(value: &'a tari_bor::Value, path: &str) -> Option<&'a tari_bor::Value> { let mut value = value; for part in path.split('.') { diff --git a/dan_layer/rpc_state_sync/src/manager.rs b/dan_layer/rpc_state_sync/src/manager.rs index bc48f8c87..e55b5f163 100644 --- a/dan_layer/rpc_state_sync/src/manager.rs +++ b/dan_layer/rpc_state_sync/src/manager.rs @@ -1,8 +1,6 @@ // Copyright 2023 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::cmp; - use anyhow::anyhow; use async_trait::async_trait; use futures::StreamExt; @@ -138,13 +136,6 @@ where TConsensusSpec: ConsensusSpec let mut current_version = persisted_version; - // Minimum epoch we should request is 1 since Epoch(0) is the genesis epoch. 
- let last_state_transition_id = StateTransitionId::new( - cmp::max(last_state_transition_id.epoch(), Epoch(1)), - last_state_transition_id.shard(), - last_state_transition_id.seq(), - ); - info!( target: LOG_TARGET, "🛜Syncing from state transition {last_state_transition_id}" diff --git a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql index c03eafb83..80b2f7656 100644 --- a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql +++ b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql @@ -413,19 +413,16 @@ CREATE TABLE burnt_utxos CREATE TABLE state_tree ( - id integer not NULL primary key AUTOINCREMENT, - shard int not NULL, - key text not NULL, - node text not NULL, - is_stale boolean not null default '0' + id integer not NULL primary key AUTOINCREMENT, + shard int not NULL, + key text not NULL, + node text not NULL ); -- Scoping by shard -CREATE INDEX state_tree_idx_shard_key on state_tree (shard) WHERE is_stale = false; +CREATE INDEX state_tree_idx_shard_key on state_tree (shard); -- Duplicate keys are not allowed -CREATE UNIQUE INDEX state_tree_uniq_idx_key on state_tree (shard, key) WHERE is_stale = false; --- filtering out or by is_stale is used in every query -CREATE INDEX state_tree_idx_is_stale on state_tree (is_stale); +CREATE UNIQUE INDEX state_tree_uniq_idx_key on state_tree (shard, key); create table state_tree_shard_versions ( diff --git a/dan_layer/state_store_sqlite/src/reader.rs b/dan_layer/state_store_sqlite/src/reader.rs index 89b3f7621..fb515b37e 100644 --- a/dan_layer/state_store_sqlite/src/reader.rs +++ b/dan_layer/state_store_sqlite/src/reader.rs @@ -777,6 +777,7 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor let execution = transaction_executions::table .filter(transaction_executions::transaction_id.eq(serialize_hex(tx_id))) .filter(transaction_executions::block_id.eq_any(block_ids)) + // Get last execution .order_by(transaction_executions::id.desc()) .first::(self.connection()) .map_err(|e| SqliteStorageError::DieselError { @@ -2088,15 +2089,14 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor ) -> Result, StorageError> { use crate::schema::{state_transitions, substates}; - // Never return epoch 0 state transitions - let min_epoch = Some(id.epoch().as_u64()).filter(|e| *e > 0).unwrap_or(1) as i64; + debug!(target: LOG_TARGET, "state_transitions_get_n_after: {id}, end_epoch:{end_epoch}"); + let transitions = state_transitions::table .left_join(substates::table.on(state_transitions::substate_address.eq(substates::address))) .select((state_transitions::all_columns, substates::all_columns.nullable())) - .filter(state_transitions::seq.ge(id.seq() as i64)) - .filter(state_transitions::epoch.ge(min_epoch)) - .filter(state_transitions::epoch.lt(end_epoch.as_u64() as i64)) + .filter(state_transitions::seq.gt(id.seq() as i64)) .filter(state_transitions::shard.eq(id.shard().as_u32() as i32)) + .filter(state_transitions::epoch.lt(end_epoch.as_u64() as i64)) .limit(n as i64) .get_results::<(sql_models::StateTransition, Option)>(self.connection()) .map_err(|e| SqliteStorageError::DieselError { @@ -2143,7 +2143,6 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor .select(state_tree::node) .filter(state_tree::shard.eq(shard.as_u32() as i32)) .filter(state_tree::key.eq(key.to_string())) - 
.filter(state_tree::is_stale.eq(false)) .first::(self.connection()) .map_err(|e| SqliteStorageError::DieselError { operation: "state_tree_nodes_get", @@ -2231,7 +2230,7 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor let version = pledge.version as u32; let id = VersionedSubstateId::new(substate_id, version); let lock_type = parse_from_string(&pledge.lock_type)?; - let lock_intent = VersionedSubstateIdLockIntent::new(id, lock_type); + let lock_intent = VersionedSubstateIdLockIntent::new(id, lock_type, true); let substate_value = pledge.substate_value.as_deref().map(deserialize_json).transpose()?; let pledge = SubstatePledge::try_create(lock_intent.clone(), substate_value).ok_or_else(|| { StorageError::DataInconsistency { diff --git a/dan_layer/state_store_sqlite/src/schema.rs b/dan_layer/state_store_sqlite/src/schema.rs index 41c94f6b0..b2e9b7293 100644 --- a/dan_layer/state_store_sqlite/src/schema.rs +++ b/dan_layer/state_store_sqlite/src/schema.rs @@ -302,7 +302,6 @@ diesel::table! { shard -> Integer, key -> Text, node -> Text, - is_stale -> Bool, } } diff --git a/dan_layer/state_store_sqlite/src/writer.rs b/dan_layer/state_store_sqlite/src/writer.rs index 77195f19e..6e1cd7a22 100644 --- a/dan_layer/state_store_sqlite/src/writer.rs +++ b/dan_layer/state_store_sqlite/src/writer.rs @@ -1457,7 +1457,7 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta operation: "substates_create", source: e, })?; - let next_seq = seq.map(|s| s + 1).unwrap_or(0); + let next_seq = seq.map(|s| s + 1).unwrap_or(1); // This means that we MUST do the state tree updates before inserting substates let version = self.state_tree_versions_get_latest(substate.created_by_shard)?; @@ -1511,7 +1511,7 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta .set(changes) .execute(self.connection()) .map_err(|e| SqliteStorageError::DieselError { - operation: "substates_down", + operation: "substates_down (update substates)", source: e, })?; @@ -1520,10 +1520,10 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta .filter(state_transitions::shard.eq(shard.as_u32() as i32)) .first::>(self.connection()) .map_err(|e| SqliteStorageError::DieselError { - operation: "substates_create", + operation: "substates_down (get max seq)", source: e, })?; - let next_seq = seq.map(|s| s + 1).unwrap_or(0); + let next_seq = seq.map(|s| s + 1).unwrap_or(1); let version = self.state_tree_versions_get_latest(shard)?; let values = ( @@ -1541,7 +1541,7 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta .values(values) .execute(self.connection()) .map_err(|e| SqliteStorageError::DieselError { - operation: "substates_down", + operation: "substates_down(insert into state_transitions)", source: e, })?; @@ -1701,7 +1701,7 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta let values = ( state_tree::shard.eq(shard.as_u32() as i32), state_tree::key.eq(key.to_string()), - state_tree::node.eq(node), + state_tree::node.eq(&node), ); diesel::insert_into(state_tree::table) .values(&values) @@ -1721,11 +1721,20 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta ) -> Result<(), StorageError> { use crate::schema::state_tree; + // let num_effected = diesel::update(state_tree::table) + // .filter(state_tree::shard.eq(shard.as_u32() as i32)) + // .filter(state_tree::key.eq(key.to_string())) + // .set(state_tree::is_stale.eq(true)) + // 
.execute(self.connection()) + // .map_err(|e| SqliteStorageError::DieselError { + // operation: "state_tree_nodes_mark_stale_tree_node", + // source: e, + // })?; + let key = node.as_node_key(); - let num_effected = diesel::update(state_tree::table) + let num_effected = diesel::delete(state_tree::table) .filter(state_tree::shard.eq(shard.as_u32() as i32)) .filter(state_tree::key.eq(key.to_string())) - .set(state_tree::is_stale.eq(true)) .execute(self.connection()) .map_err(|e| SqliteStorageError::DieselError { operation: "state_tree_nodes_mark_stale_tree_node", diff --git a/dan_layer/storage/src/consensus_models/executed_transaction.rs b/dan_layer/storage/src/consensus_models/executed_transaction.rs index 31ab688cf..4e3628743 100644 --- a/dan_layer/storage/src/consensus_models/executed_transaction.rs +++ b/dan_layer/storage/src/consensus_models/executed_transaction.rs @@ -9,13 +9,7 @@ use std::{ }; use serde::{Deserialize, Serialize}; -use tari_dan_common_types::{ - optional::Optional, - SubstateAddress, - SubstateLockType, - ToSubstateAddress, - VersionedSubstateId, -}; +use tari_dan_common_types::{optional::Optional, SubstateAddress, ToSubstateAddress, VersionedSubstateId}; use tari_engine_types::commit_result::{ExecuteResult, RejectReason}; use tari_transaction::{Transaction, TransactionId}; @@ -65,10 +59,10 @@ impl ExecutedTransaction { .map(|diff| { diff.up_iter() .map(|(addr, substate)| { - VersionedSubstateIdLockIntent::new( - VersionedSubstateId::new(addr.clone(), substate.version()), - SubstateLockType::Output, - ) + VersionedSubstateIdLockIntent::output(VersionedSubstateId::new( + addr.clone(), + substate.version(), + )) }) .collect::>() }) diff --git a/dan_layer/storage/src/consensus_models/lock_intent.rs b/dan_layer/storage/src/consensus_models/lock_intent.rs index 789b0852b..49680460d 100644 --- a/dan_layer/storage/src/consensus_models/lock_intent.rs +++ b/dan_layer/storage/src/consensus_models/lock_intent.rs @@ -16,26 +16,38 @@ use tari_engine_types::substate::SubstateId; pub struct VersionedSubstateIdLockIntent { versioned_substate_id: VersionedSubstateId, lock_type: SubstateLockType, + require_version: bool, } impl VersionedSubstateIdLockIntent { - pub fn new(versioned_substate_id: VersionedSubstateId, lock: SubstateLockType) -> Self { + pub fn new(versioned_substate_id: VersionedSubstateId, lock: SubstateLockType, require_version: bool) -> Self { Self { versioned_substate_id, lock_type: lock, + require_version, } } - pub fn read(versioned_substate_id: VersionedSubstateId) -> Self { - Self::new(versioned_substate_id, SubstateLockType::Read) + pub fn from_requirement(substate_requirement: SubstateRequirement, lock: SubstateLockType) -> Self { + let version = substate_requirement.version(); + Self::new( + VersionedSubstateId::new(substate_requirement.into_substate_id(), version.unwrap_or(0)), + lock, + version.is_some(), + ) + } + + pub fn read(versioned_substate_id: VersionedSubstateId, require_version: bool) -> Self { + Self::new(versioned_substate_id, SubstateLockType::Read, require_version) } - pub fn write(versioned_substate_id: VersionedSubstateId) -> Self { - Self::new(versioned_substate_id, SubstateLockType::Write) + pub fn write(versioned_substate_id: VersionedSubstateId, require_version: bool) -> Self { + Self::new(versioned_substate_id, SubstateLockType::Write, require_version) } pub fn output(versioned_substate_id: VersionedSubstateId) -> Self { - Self::new(versioned_substate_id, SubstateLockType::Output) + // Once we lock outputs we always require the provided 
version + Self::new(versioned_substate_id, SubstateLockType::Output, true) } pub fn versioned_substate_id(&self) -> &VersionedSubstateId { @@ -57,6 +69,15 @@ impl VersionedSubstateIdLockIntent { pub fn lock_type(&self) -> SubstateLockType { self.lock_type } + + pub fn to_substate_requirement(&self) -> SubstateRequirement { + let version = if self.require_version { + Some(self.version()) + } else { + None + }; + SubstateRequirement::new(self.substate_id().clone(), version) + } } impl Borrow for VersionedSubstateIdLockIntent { @@ -88,11 +109,15 @@ impl<'a> LockIntent for &'a VersionedSubstateIdLockIntent { } fn version_to_lock(&self) -> u32 { - self.versioned_substate_id.version() + self.version() } fn requested_version(&self) -> Option { - Some(self.versioned_substate_id.version()) + if self.require_version { + Some(self.version()) + } else { + None + } } } @@ -152,6 +177,7 @@ impl SubstateRequirementLockIntent { VersionedSubstateIdLockIntent::new( VersionedSubstateId::new(self.substate_id().clone(), self.version_to_lock), self.lock_type, + self.substate_requirement.version().is_some(), ) } } @@ -195,7 +221,7 @@ impl LockIntent for SubstateRequirementLockIntent { impl From for SubstateRequirementLockIntent { fn from(intent: VersionedSubstateIdLockIntent) -> Self { let version = intent.versioned_substate_id.version(); - Self::new(intent.versioned_substate_id, version, intent.lock_type) + Self::new(intent.to_substate_requirement(), version, intent.lock_type) } } diff --git a/dan_layer/storage/src/consensus_models/substate_lock.rs b/dan_layer/storage/src/consensus_models/substate_lock.rs index 083c97a3b..c2531bb98 100644 --- a/dan_layer/storage/src/consensus_models/substate_lock.rs +++ b/dan_layer/storage/src/consensus_models/substate_lock.rs @@ -80,6 +80,7 @@ impl LockedSubstateValue { VersionedSubstateIdLockIntent::new( VersionedSubstateId::new(self.substate_id.clone(), self.lock.version()), self.lock.substate_lock(), + true, ) } diff --git a/dan_layer/storage/src/consensus_models/transaction.rs b/dan_layer/storage/src/consensus_models/transaction.rs index 2a58f4846..c208debb1 100644 --- a/dan_layer/storage/src/consensus_models/transaction.rs +++ b/dan_layer/storage/src/consensus_models/transaction.rs @@ -171,7 +171,7 @@ impl TransactionRecord { let resolved_inputs = self.resolved_inputs.take().unwrap_or_else(|| { self.transaction .all_inputs_iter() - .map(|i| VersionedSubstateIdLockIntent::new(i.or_zero_version(), SubstateLockType::Write)) + .map(|i| VersionedSubstateIdLockIntent::from_requirement(i, SubstateLockType::Write)) .collect() }); let resulting_outputs = self.resulting_outputs.take().unwrap_or_default(); diff --git a/dan_layer/template_test_tooling/src/template_test.rs b/dan_layer/template_test_tooling/src/template_test.rs index 8e8d62c3e..c1f5098ba 100644 --- a/dan_layer/template_test_tooling/src/template_test.rs +++ b/dan_layer/template_test_tooling/src/template_test.rs @@ -35,7 +35,6 @@ use tari_dan_engine::{ use tari_engine_types::{ commit_result::{ExecuteResult, RejectReason}, component::{ComponentBody, ComponentHeader}, - fees::FeeBreakdown, id_provider::{IdProvider, ObjectIds}, instruction::Instruction, resource_container::ResourceContainer, @@ -517,7 +516,7 @@ impl TemplateTest { eprintln!("Paid: {}", fee.total_fees_paid()); eprintln!("Refund: {}", fee.total_refunded()); eprintln!("Unpaid: {}", fee.unpaid_debt()); - for FeeBreakdown { source, amount } in &fee.cost_breakdown { + for (source, amount) in fee.cost_breakdown.iter() { eprintln!("- {:?} {}", source, amount); } } 
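
Note (not part of the patch): a minimal sketch of how the reworked FeeBreakdown from dan_layer/engine_types/src/fees.rs above is meant to be used. Amounts are accumulated per FeeSource (repeated charges from the same source are merged and keys kept in a canonical order), iterated as (source, amount) pairs as template_test.rs now does, and summed with get_total(). The helper function name and the specific amounts are illustrative only.

    use tari_engine_types::fees::{FeeBreakdown, FeeSource};

    fn print_breakdown() {
        let mut breakdown = FeeBreakdown::default();
        breakdown.insert(FeeSource::Initial, 100);
        breakdown.insert(FeeSource::Logs, 2);
        breakdown.insert(FeeSource::Logs, 3); // merged into the existing Logs entry

        // Iterates in the canonical (sorted) key order.
        for (source, amount) in breakdown.iter() {
            eprintln!("- {:?} {}", source, amount);
        }
        assert_eq!(breakdown.get_total(), 105);
    }
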
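Note (not part of the patch): a minimal sketch of the new require_version semantics on VersionedSubstateIdLockIntent, based on from_requirement and to_substate_requirement in the lock_intent.rs hunk above. The crate path for the consensus model (tari_dan_storage::consensus_models) and the example function are assumptions for illustration; the intent is that only lock intents built from a versioned SubstateRequirement report a pinned version back out.

    use tari_dan_common_types::{SubstateLockType, SubstateRequirement};
    use tari_dan_storage::consensus_models::VersionedSubstateIdLockIntent; // path assumed
    use tari_engine_types::substate::SubstateId;

    fn lock_intents(substate_id: SubstateId) {
        // Unversioned requirement: locks at version 0 but does not pin the version.
        let unpinned = VersionedSubstateIdLockIntent::from_requirement(
            SubstateRequirement::new(substate_id.clone(), None),
            SubstateLockType::Write,
        );
        assert_eq!(unpinned.to_substate_requirement().version(), None);

        // Versioned requirement: the requested version survives the round trip.
        let pinned = VersionedSubstateIdLockIntent::from_requirement(
            SubstateRequirement::new(substate_id, Some(2)),
            SubstateLockType::Read,
        );
        assert_eq!(pinned.to_substate_requirement().version(), Some(2));
    }
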
diff --git a/dan_layer/transaction/src/transaction.rs b/dan_layer/transaction/src/transaction.rs index a674530df..38ce5912b 100644 --- a/dan_layer/transaction/src/transaction.rs +++ b/dan_layer/transaction/src/transaction.rs @@ -1,13 +1,13 @@ // Copyright 2023 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::collections::HashSet; +use std::{collections::HashSet, fmt::Display}; use indexmap::IndexSet; use serde::{Deserialize, Serialize}; use tari_common_types::types::PublicKey; use tari_crypto::ristretto::RistrettoSecretKey; -use tari_dan_common_types::{Epoch, SubstateRequirement, VersionedSubstateId}; +use tari_dan_common_types::{committee::CommitteeInfo, Epoch, SubstateRequirement, VersionedSubstateId}; use tari_engine_types::{ hashing::{hasher32, EngineHashDomainLabel}, indexed_value::{IndexedValue, IndexedValueError}, @@ -144,6 +144,12 @@ impl Transaction { .chain(self.filled_inputs().iter().map(|fi| fi.substate_id())) } + /// Returns true if the provided committee is involved in at least one input of this transaction. + pub fn is_involved_inputs(&self, committee_info: &CommitteeInfo) -> bool { + self.all_inputs_iter() + .any(|id| committee_info.includes_substate_id(id.substate_id())) + } + pub fn num_unique_inputs(&self) -> usize { self.all_inputs_substate_ids_iter().count() } @@ -231,3 +237,18 @@ impl Transaction { self.inputs().iter().any(|i| i.version().is_none()) } } + +impl Display for Transaction { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Transaction[{}, Inputs: {}, Fee Instructions: {}, Instructions: {}, Signatures: {}, Filled Inputs: {}]", + self.id, + self.transaction.inputs.len(), + self.transaction.fee_instructions.len(), + self.transaction.instructions.len(), + self.signatures.len(), + self.filled_inputs.len(), + ) + } +} diff --git a/dan_layer/wallet/sdk/src/apis/transaction.rs b/dan_layer/wallet/sdk/src/apis/transaction.rs index a93319c8b..74ff6c2a0 100644 --- a/dan_layer/wallet/sdk/src/apis/transaction.rs +++ b/dan_layer/wallet/sdk/src/apis/transaction.rs @@ -286,14 +286,14 @@ where diff: &SubstateDiff, ) -> Result<(), TransactionApiError> { let mut downed_substates_with_parents = HashMap::with_capacity(diff.down_len()); - for (addr, _) in diff.down_iter() { - if addr.is_layer1_commitment() { - info!(target: LOG_TARGET, "Layer 1 commitment {} downed", addr); + for (id, _) in diff.down_iter() { + if id.is_layer1_commitment() { + info!(target: LOG_TARGET, "Layer 1 commitment {} downed", id); continue; } - let Some(downed) = tx.substates_remove(addr).optional()? else { - warn!(target: LOG_TARGET, "Downed substate {} not found", addr); + let Some(downed) = tx.substates_remove(id).optional()? 
else { + warn!(target: LOG_TARGET, "Downed substate {} not found", id); continue; }; diff --git a/integration_tests/tests/steps/validator_node.rs b/integration_tests/tests/steps/validator_node.rs index 1ab7370dd..dbb13df2b 100644 --- a/integration_tests/tests/steps/validator_node.rs +++ b/integration_tests/tests/steps/validator_node.rs @@ -468,7 +468,12 @@ async fn when_i_wait_for_validator_leaf_block_at_least(world: &mut TariWorld, na }) .await .unwrap(); - let actual_height = resp.blocks.first().unwrap().height().as_u64(); + let actual_height = resp + .blocks + .first() + .unwrap_or_else(|| panic!("Validator {name} has no blocks")) + .height() + .as_u64(); if actual_height < height { panic!( "Validator {} leaf block height {} is less than {}", diff --git a/utilities/tariswap_test_bench/src/accounts.rs b/utilities/tariswap_test_bench/src/accounts.rs index d75415ede..4da66806b 100644 --- a/utilities/tariswap_test_bench/src/accounts.rs +++ b/utilities/tariswap_test_bench/src/accounts.rs @@ -6,7 +6,7 @@ use std::ops::RangeInclusive; use tari_crypto::{keys::PublicKey as _, ristretto::RistrettoPublicKey}; use tari_dan_common_types::SubstateRequirement; use tari_dan_wallet_sdk::{apis::key_manager::TRANSACTION_BRANCH, models::Account}; -use tari_engine_types::component::new_component_address_from_public_key; +use tari_engine_types::{component::new_component_address_from_public_key, indexed_value::IndexedWellKnownTypes}; use tari_template_builtin::ACCOUNT_TEMPLATE_ADDRESS; use tari_template_lib::{ args, @@ -43,7 +43,10 @@ impl Runner { let finalize = self.submit_transaction_and_wait(transaction).await?; let diff = finalize.result.accept().unwrap(); let (account, _) = diff.up_iter().find(|(addr, _)| addr.is_component()).unwrap(); - let (vault, _) = diff.up_iter().find(|(addr, _)| addr.is_vault()).unwrap(); + let (vault, _) = diff + .up_iter() + .find(|(addr, _)| *addr != XTR_FAUCET_VAULT_ADDRESS && addr.is_vault()) + .unwrap(); self.sdk.accounts_api().add_account(None, account, 0, true)?; self.sdk.accounts_api().add_vault( @@ -80,8 +83,18 @@ impl Runner { for owner in &owners { builder = builder.create_account(RistrettoPublicKey::from_secret_key(&owner.key)); } + + let pay_fee_vault = self + .sdk + .accounts_api() + .get_vault_by_resource(&pay_fee_account.address, &XTR)?; + let transaction = builder - .with_inputs([SubstateRequirement::unversioned(pay_fee_account.address.clone())]) + .with_inputs([ + SubstateRequirement::unversioned(pay_fee_account.address.clone()), + SubstateRequirement::unversioned(pay_fee_vault.address), + SubstateRequirement::unversioned(pay_fee_vault.resource_address), + ]) .sign(&key.key) .build(); @@ -90,7 +103,7 @@ impl Runner { let mut accounts = Vec::with_capacity(num_accounts); for owner in owners { - let account = diff + let account_addr = diff .up_iter() .map(|(addr, _)| addr) .filter(|addr| addr.is_component()) @@ -105,8 +118,8 @@ impl Runner { self.sdk .accounts_api() - .add_account(None, account, owner.key_index, false)?; - let account = self.sdk.accounts_api().get_account_by_address(account)?; + .add_account(None, account_addr, owner.key_index, false)?; + let account = self.sdk.accounts_api().get_account_by_address(account_addr)?; accounts.push(account); } @@ -120,6 +133,10 @@ impl Runner { accounts: &[Account], ) -> anyhow::Result<()> { let key = self.sdk.key_manager_api().derive_key(TRANSACTION_BRANCH, 0)?; + let fee_vault = self + .sdk + .accounts_api() + .get_vault_by_resource(&fee_account.address, &XTR)?; let mut builder = 
Transaction::builder().fee_transaction_pay_from_component( fee_account.address.as_component_address().unwrap(), Amount(1000 * accounts.len() as i64), @@ -135,18 +152,51 @@ impl Runner { .put_last_instruction_output_on_workspace("xtr") .call_method(account.address.as_component_address().unwrap(), "deposit", args![ Workspace("xtr") - ]); + ]) + .add_input(SubstateRequirement::unversioned(account.address.clone())); } let transaction = builder .with_inputs([ + SubstateRequirement::unversioned(XTR), + SubstateRequirement::unversioned(XTR_FAUCET_COMPONENT_ADDRESS), + SubstateRequirement::unversioned(XTR_FAUCET_VAULT_ADDRESS), SubstateRequirement::unversioned(faucet.component_address), SubstateRequirement::unversioned(faucet.resource_address), + SubstateRequirement::unversioned(faucet.vault_address), + SubstateRequirement::unversioned(fee_vault.account_address), + SubstateRequirement::unversioned(fee_vault.address), ]) .sign(&key.key) .build(); - self.submit_transaction_and_wait(transaction).await?; + let result = self.submit_transaction_and_wait(transaction).await?; + + let accounts_and_state = result + .result + .accept() + .unwrap() + .up_iter() + .filter(|(addr, _)| { + *addr != XTR_FAUCET_COMPONENT_ADDRESS && + *addr != faucet.component_address && + *addr != fee_account.address + }) + .filter_map(|(addr, substate)| Some((addr, substate.substate_value().component()?))) + .map(|(addr, component)| (addr, IndexedWellKnownTypes::from_value(&component.body.state).unwrap())); + + for (account, indexed) in accounts_and_state { + for vault_id in indexed.vault_ids() { + log::info!("Adding vault {} to account {}", vault_id, account); + self.sdk.accounts_api().add_vault( + account.clone(), + (*vault_id).into(), + faucet.resource_address, + ResourceType::Fungible, + Some("FAUCET".to_string()), + )?; + } + } Ok(()) } diff --git a/utilities/tariswap_test_bench/src/faucet.rs b/utilities/tariswap_test_bench/src/faucet.rs index 78fdcf58a..a3c53690e 100644 --- a/utilities/tariswap_test_bench/src/faucet.rs +++ b/utilities/tariswap_test_bench/src/faucet.rs @@ -1,10 +1,13 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause +use log::info; +use tari_dan_common_types::SubstateRequirement; use tari_dan_wallet_sdk::{apis::key_manager::TRANSACTION_BRANCH, models::Account}; use tari_template_lib::{ args, - models::{Amount, ComponentAddress, ResourceAddress}, + constants::XTR, + models::{Amount, ComponentAddress, ResourceAddress, VaultId}, }; use tari_transaction::Transaction; @@ -13,15 +16,26 @@ use crate::runner::Runner; pub struct Faucet { pub component_address: ComponentAddress, pub resource_address: ResourceAddress, + pub vault_address: VaultId, } impl Runner { pub async fn create_faucet(&mut self, in_account: &Account) -> anyhow::Result { let key = self.sdk.key_manager_api().derive_key(TRANSACTION_BRANCH, 0)?; + let fee_vault = self + .sdk + .accounts_api() + .get_vault_by_resource(&in_account.address, &XTR)?; + let transaction = Transaction::builder() .fee_transaction_pay_from_component(in_account.address.as_component_address().unwrap(), Amount(1000)) - .call_function(self._faucet_template.address, "mint", args![Amount(1_000_000_000)]) + .call_function(self.faucet_template.address, "mint", args![Amount(1_000_000_000)]) + .with_inputs([ + SubstateRequirement::unversioned(in_account.address.clone()), + SubstateRequirement::unversioned(fee_vault.address.clone()), + SubstateRequirement::unversioned(fee_vault.resource_address), + ]) .sign(&key.key) .build(); @@ -31,18 +45,28 @@ impl Runner { 
let component_address = diff .up_iter() .find_map(|(addr, s)| { - addr.as_component_address() - .filter(|_| s.substate_value().component().unwrap().module_name == "TestFaucet") + addr.as_component_address().filter(|_| { + s.substate_value().component().unwrap().template_address == self.faucet_template.address + }) }) .ok_or_else(|| anyhow::anyhow!("Faucet Component address not found"))?; let resource_address = diff .up_iter() + .filter(|(addr, _)| *addr != XTR) .find_map(|(addr, _)| addr.as_resource_address()) .ok_or_else(|| anyhow::anyhow!("Faucet Resource address not found"))?; + let vault_address = diff + .up_iter() + .filter_map(|(addr, _)| addr.as_vault_id()) + .find(|addr| *addr != fee_vault.address) + .ok_or_else(|| anyhow::anyhow!("Faucet Resource address not found"))?; + + info!("✅ Faucet {component_address} created with {resource_address} and {vault_address}"); Ok(Faucet { component_address, resource_address, + vault_address, }) } } diff --git a/utilities/tariswap_test_bench/src/main.rs b/utilities/tariswap_test_bench/src/main.rs index 9bbe9a81f..146addb2f 100644 --- a/utilities/tariswap_test_bench/src/main.rs +++ b/utilities/tariswap_test_bench/src/main.rs @@ -58,9 +58,9 @@ async fn run(cli: cli::CommonArgs, _args: cli::RunArgs) -> anyhow::Result<()> { info!("✅ Created faucet {}", faucet.component_address); info!("⏳️ Funding accounts ..."); - for batch in accounts.chunks(500) { + for batch in accounts.chunks(250) { runner.fund_accounts(&faucet, &primary_account, batch).await?; - info!("✅ Funded 500 accounts"); + info!("✅ Funded 250 accounts"); } info!("⏳️ Creating 1000 tariswap components..."); diff --git a/utilities/tariswap_test_bench/src/runner.rs b/utilities/tariswap_test_bench/src/runner.rs index 3a4ceffe0..c05fe760d 100644 --- a/utilities/tariswap_test_bench/src/runner.rs +++ b/utilities/tariswap_test_bench/src/runner.rs @@ -4,7 +4,6 @@ use std::{path::Path, time::Duration}; use log::info; -use tari_dan_common_types::SubstateRequirement; use tari_dan_wallet_daemon::indexer_jrpc_impl::IndexerJsonRpcNetworkInterface; use tari_dan_wallet_sdk::{DanWalletSdk, WalletSdkConfig}; use tari_dan_wallet_storage_sqlite::SqliteWalletStore; @@ -20,7 +19,7 @@ type WalletSdk = DanWalletSdk pub struct Runner { pub(crate) sdk: WalletSdk, pub(crate) _cli: CommonArgs, - pub(crate) _faucet_template: TemplateMetadata, + pub(crate) faucet_template: TemplateMetadata, pub(crate) tariswap_template: TemplateMetadata, pub(crate) stats: Stats, } @@ -32,7 +31,7 @@ impl Runner { Ok(Self { sdk, _cli: cli, - _faucet_template: faucet_template, + faucet_template, tariswap_template, stats: Stats::default(), }) @@ -46,16 +45,16 @@ impl Runner { pub async fn submit_transaction(&mut self, transaction: Transaction) -> anyhow::Result { // TODO: remove the filled inputs here and allow consensus to figure out input versions - let inputs = transaction - .to_referenced_substates()? - .into_iter() - .map(|s| SubstateRequirement::new(s, None)) - .collect(); + // let inputs = transaction + // .to_referenced_substates()? 
+ // .into_iter() + // .map(|s| SubstateRequirement::new(s, None)) + // .collect(); let tx_id = self .sdk .transaction_api() - .insert_new_transaction(transaction, inputs, None, false) + .insert_new_transaction(transaction, vec![], None, false) .await?; self.sdk.transaction_api().submit_transaction(tx_id).await?; diff --git a/utilities/tariswap_test_bench/src/tariswap.rs b/utilities/tariswap_test_bench/src/tariswap.rs index 5a657ba5c..27529c181 100644 --- a/utilities/tariswap_test_bench/src/tariswap.rs +++ b/utilities/tariswap_test_bench/src/tariswap.rs @@ -1,13 +1,16 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause +use std::collections::HashMap; + use log::info; -use tari_dan_common_types::SubstateRequirement; +use tari_dan_common_types::{optional::Optional, SubstateRequirement}; use tari_dan_wallet_sdk::{apis::key_manager::TRANSACTION_BRANCH, models::Account}; +use tari_engine_types::indexed_value::decode_value_at_path; use tari_template_lib::{ args, - models::{Amount, ComponentAddress}, - prelude::XTR, + models::{Amount, ComponentAddress, VaultId}, + prelude::{ResourceAddress, ResourceType, XTR}, }; use tari_transaction::Transaction; @@ -15,6 +18,8 @@ use crate::{faucet::Faucet, runner::Runner}; pub struct TariSwap { pub component_address: ComponentAddress, + pub vaults: HashMap, + pub lp_resource_address: ResourceAddress, } impl Runner { @@ -36,17 +41,43 @@ impl Runner { Amount(1) ]); } - let transaction = builder.sign(&key.key).build(); + + let fee_vault = self + .sdk + .accounts_api() + .get_vault_by_resource(&in_account.address, &XTR)?; + + let transaction = builder + .with_inputs([ + SubstateRequirement::unversioned(in_account.address.clone()), + SubstateRequirement::unversioned(fee_vault.address.clone()), + SubstateRequirement::unversioned(XTR), + SubstateRequirement::unversioned(faucet.resource_address), + ]) + .sign(&key.key) + .build(); let finalize = self.submit_transaction_and_wait(transaction).await?; let diff = finalize.result.accept().unwrap(); Ok(diff .up_iter() .filter_map(|(addr, value)| { - addr.as_component_address() - .filter(|_| value.substate_value().component().unwrap().module_name == "TariSwapPool") + let addr = addr + .as_component_address() + .filter(|_| value.substate_value().component().unwrap().module_name == "TariSwapPool")?; + let vaults = decode_value_at_path(value.substate_value().component().unwrap().state(), "$.pools") + .unwrap() + .unwrap(); + let lp_resource_address = + decode_value_at_path(value.substate_value().component().unwrap().state(), "$.lp_resource") + .unwrap() + .unwrap(); + Some(TariSwap { + component_address: addr, + vaults, + lp_resource_address, + }) }) - .map(|component_address| TariSwap { component_address }) .collect()) } @@ -63,8 +94,8 @@ impl Runner { .sdk .key_manager_api() .derive_key(TRANSACTION_BRANCH, primary_account.key_index)?; - let mut tx_ids = vec![]; - // Batch these otherwise we can break consensus (proposed with locked object) + let mut tx_ids = Vec::with_capacity(200); + for i in 0..5 { for (i, tariswap) in tariswaps.iter().enumerate().skip(i * 200).take(200) { let account = &accounts[i % accounts.len()]; @@ -72,11 +103,28 @@ impl Runner { .sdk .key_manager_api() .derive_key(TRANSACTION_BRANCH, account.key_index)?; + let xtr_vault = self.sdk.accounts_api().get_vault_by_resource(&account.address, &XTR)?; + let faucet_vault = self + .sdk + .accounts_api() + .get_vault_by_resource(&account.address, &faucet.resource_address)?; + let maybe_lp_vault = self + .sdk + .accounts_api() + 
.get_vault_by_resource(&account.address, &tariswap.lp_resource_address) + .optional()?; + let transaction = Transaction::builder() - .with_inputs(vec![ - SubstateRequirement::new(faucet.resource_address.into(), Some(0)), - SubstateRequirement::new(XTR.into(), Some(0)), + .with_inputs(maybe_lp_vault.map(|v| SubstateRequirement::unversioned(v.address))) + .with_inputs([ + SubstateRequirement::unversioned(account.address.clone()), + SubstateRequirement::unversioned(xtr_vault.address), + SubstateRequirement::unversioned(faucet_vault.address), + SubstateRequirement::unversioned(tariswap.component_address), + SubstateRequirement::unversioned(faucet.resource_address), + SubstateRequirement::unversioned(XTR), ]) + .with_inputs(tariswap.vaults.values().map(|v| SubstateRequirement::unversioned(*v))) .fee_transaction_pay_from_component(account.address.as_component_address().unwrap(), Amount(1000)) .call_method(account.address.as_component_address().unwrap(), "withdraw", args![ XTR, amount_a @@ -99,11 +147,30 @@ impl Runner { .sign(&key.key) .build(); - tx_ids.push(self.submit_transaction(transaction).await?); + tx_ids.push((account.address.clone(), self.submit_transaction(transaction).await?)); } - for tx_id in tx_ids.drain(..) { - self.wait_for_transaction(tx_id).await?; + for (account, tx_id) in tx_ids.drain(..) { + let result = self.wait_for_transaction(tx_id).await?; + let diff = result.result.accept().unwrap(); + let lp_vault = diff + .up_iter() + .find_map(|(addr, s)| { + let addr = addr.as_vault_id()?; + if *s.substate_value().vault().unwrap().resource_address() == tariswaps[0].lp_resource_address { + Some(addr) + } else { + None + } + }) + .ok_or_else(|| anyhow::anyhow!("LP Vault not found"))?; + self.sdk.accounts_api().add_vault( + account, + lp_vault.into(), + tariswaps[0].lp_resource_address, + ResourceType::NonFungible, + Some("LP".to_string()), + )?; } info!("⏳️ Added liquidity to pools {}-{}", i * 200, (i + 1) * 200); } @@ -128,37 +195,58 @@ impl Runner { let mut tx_ids = vec![]; // Swap XTR for faucet - // Batch these otherwise we can break consensus (proposed with locked object) for i in 0..5 { for (i, account) in accounts.iter().enumerate().skip(i * 200).take(200) { + let tariswap = &tariswaps[i % tariswaps.len()]; let key = self .sdk .key_manager_api() .derive_key(TRANSACTION_BRANCH, account.key_index)?; - let tariswap = &tariswaps[i % tariswaps.len()]; + let xtr_vault = self.sdk.accounts_api().get_vault_by_resource(&account.address, &XTR)?; + let faucet_vault = self + .sdk + .accounts_api() + .get_vault_by_resource(&account.address, &faucet.resource_address)?; + let maybe_lp_vault = self + .sdk + .accounts_api() + .get_vault_by_resource(&account.address, &tariswap.lp_resource_address) + .optional()?; let transaction = Transaction::builder() - // Use resources as input refs to allow concurrent access. 
- .with_inputs(vec![ - SubstateRequirement::new(faucet.resource_address.into(), Some(0)), - SubstateRequirement::new(XTR.into(), Some(0)), + .with_inputs(maybe_lp_vault.map(|v| SubstateRequirement::unversioned(v.address))) + .with_inputs([ + SubstateRequirement::unversioned(account.address.clone()), + SubstateRequirement::unversioned(xtr_vault.address), + SubstateRequirement::unversioned(faucet_vault.address), + SubstateRequirement::unversioned(tariswap.component_address), + SubstateRequirement::unversioned(faucet.resource_address), + SubstateRequirement::unversioned(XTR), + SubstateRequirement::unversioned(tariswap.lp_resource_address), ]) + .with_inputs(tariswap.vaults.values().map(|v| SubstateRequirement::unversioned(*v))) .fee_transaction_pay_from_component(account.address.as_component_address().unwrap(), Amount(1000)) - .call_method(tariswap.component_address, "get_pool_balance", args![ XTR, ]) - .call_method(tariswap.component_address, "get_pool_balance", args![ faucet.resource_address, ]) + .call_method(tariswap.component_address, "get_pool_balance", args![XTR,]) + .call_method(tariswap.component_address, "get_pool_balance", args![ + faucet.resource_address, + ]) .call_method(tariswap.component_address, "get_pool_ratio", args![XTR, Amount(1000)]) - .call_method(tariswap.component_address, "get_pool_ratio", args![faucet.resource_address, Amount(1000)]) + .call_method(tariswap.component_address, "get_pool_ratio", args![ + faucet.resource_address, + Amount(1000) + ]) .call_method(account.address.as_component_address().unwrap(), "withdraw", args![ - XTR, amount_a_for_b - ]) + XTR, + amount_a_for_b + ]) .put_last_instruction_output_on_workspace("a") .call_method(tariswap.component_address, "swap", args![ - Workspace("a"), - faucet.resource_address, - ]) + Workspace("a"), + faucet.resource_address, + ]) .put_last_instruction_output_on_workspace("swapped") .call_method(account.address.as_component_address().unwrap(), "deposit", args![ - Workspace("swapped") - ]) + Workspace("swapped") + ]) .sign(&primary_account_key.key) .sign(&key.key) .build(); @@ -188,26 +276,39 @@ impl Runner { .sdk .key_manager_api() .derive_key(TRANSACTION_BRANCH, account.key_index)?; + let xtr_vault = self.sdk.accounts_api().get_vault_by_resource(&account.address, &XTR)?; + let faucet_vault = self + .sdk + .accounts_api() + .get_vault_by_resource(&account.address, &faucet.resource_address)?; let tariswap = &tariswaps[i % tariswaps.len()]; let transaction = Transaction::builder() - // Use resources as input refs to allow concurrent access. 
- .with_inputs(vec![ - SubstateRequirement::new(faucet.resource_address.into(), Some(0)), - SubstateRequirement::new(XTR.into(), Some(0)), + .with_inputs([ + SubstateRequirement::unversioned(account.address.clone()), + SubstateRequirement::unversioned(xtr_vault.address), + SubstateRequirement::unversioned(faucet_vault.address), + SubstateRequirement::unversioned(tariswap.component_address), + SubstateRequirement::unversioned(faucet.resource_address), + SubstateRequirement::unversioned(XTR), + SubstateRequirement::unversioned(tariswap.lp_resource_address), ]) + .with_inputs(tariswap.vaults.values().map(|v| SubstateRequirement::unversioned(*v))) .fee_transaction_pay_from_component(account.address.as_component_address().unwrap(), Amount(1000)) .call_method(tariswap.component_address, "get_pool_balance", args![XTR]) - .call_method(tariswap.component_address, "get_pool_balance", args![faucet.resource_address]) + .call_method(tariswap.component_address, "get_pool_balance", args![ + faucet.resource_address + ]) .call_method(tariswap.component_address, "get_pool_ratio", args![XTR, Amount(1000)]) - .call_method(tariswap.component_address, "get_pool_ratio", args![faucet.resource_address, Amount(1000)]) + .call_method(tariswap.component_address, "get_pool_ratio", args![ + faucet.resource_address, + Amount(1000) + ]) .call_method(account.address.as_component_address().unwrap(), "withdraw", args![ - faucet.resource_address, amount_b_for_a + faucet.resource_address, + amount_b_for_a ]) .put_last_instruction_output_on_workspace("b") - .call_method(tariswap.component_address, "swap", args![ - Workspace("b"), - XTR, - ]) + .call_method(tariswap.component_address, "swap", args![Workspace("b"), XTR,]) .put_last_instruction_output_on_workspace("swapped") .call_method(account.address.as_component_address().unwrap(), "deposit", args![ Workspace("swapped")