diff --git a/applications/tari_dan_app_utilities/src/transaction_executor.rs b/applications/tari_dan_app_utilities/src/transaction_executor.rs index 3396ac21b9..c06b11fafd 100644 --- a/applications/tari_dan_app_utilities/src/transaction_executor.rs +++ b/applications/tari_dan_app_utilities/src/transaction_executor.rs @@ -54,6 +54,7 @@ impl ExecutionOutput { inputs .iter() .map(|(substate_req, substate)| { + let requested_specific_version = substate_req.version().is_some(); let lock_flag = if diff.down_iter().any(|(id, _)| id == substate_req.substate_id()) { // Update all inputs that were DOWNed to be write locked SubstateLockType::Write @@ -64,6 +65,7 @@ impl ExecutionOutput { VersionedSubstateIdLockIntent::new( VersionedSubstateId::new(substate_req.substate_id().clone(), substate.version()), lock_flag, + requested_specific_version, ) }) .collect() @@ -76,6 +78,7 @@ impl ExecutionOutput { VersionedSubstateIdLockIntent::new( VersionedSubstateId::new(substate_req.substate_id().clone(), substate.version()), SubstateLockType::Read, + true, ) }) .collect() diff --git a/applications/tari_validator_node/src/p2p/services/mempool/service.rs b/applications/tari_validator_node/src/p2p/services/mempool/service.rs index 01bb9ac4b7..e0799eb746 100644 --- a/applications/tari_validator_node/src/p2p/services/mempool/service.rs +++ b/applications/tari_validator_node/src/p2p/services/mempool/service.rs @@ -156,9 +156,7 @@ where TValidator: Validator( "⚠️ Foreign committee ABORT transaction {}. Update overall decision to ABORT. 
Local stage: {}, Leaf: {}", tx_rec.transaction_id(), tx_rec.current_stage(), local_leaf ); + + // Add an abort execution since we previously decided to commit + let mut transaction = TransactionRecord::get(tx, tx_rec.transaction_id())?; + transaction.set_abort_reason(RejectReason::ForeignShardGroupDecidedToAbort(format!( + "Foreign shard group {} decided to abort the transaction", + foreign_committee_info.shard_group() + ))); + let exec = transaction.into_execution().expect("ABORT set above"); + proposed_block_change_set.add_transaction_execution(exec)?; } // We need to add the justify QC to the evidence because the all prepare block could not include it diff --git a/dan_layer/consensus/src/hotstuff/on_propose.rs b/dan_layer/consensus/src/hotstuff/on_propose.rs index 0bd6e32c91..2c6d5099b4 100644 --- a/dan_layer/consensus/src/hotstuff/on_propose.rs +++ b/dan_layer/consensus/src/hotstuff/on_propose.rs @@ -49,6 +49,7 @@ use tari_dan_storage::{ use tari_engine_types::{commit_result::RejectReason, substate::Substate}; use tari_epoch_manager::EpochManagerReader; use tari_transaction::TransactionId; +use tokio::task; use crate::{ hotstuff::{ @@ -78,6 +79,7 @@ type NextBlock = ( HashMap, ); +#[derive(Debug, Clone)] pub struct OnPropose { config: HotstuffConfig, store: TConsensusSpec::StateStore, @@ -119,7 +121,7 @@ where TConsensusSpec: ConsensusSpec &mut self, epoch: Epoch, local_committee: &Committee, - local_committee_info: &CommitteeInfo, + local_committee_info: CommitteeInfo, leaf_block: LeafBlock, is_newview_propose: bool, propose_epoch_end: bool, @@ -168,41 +170,45 @@ where TConsensusSpec: ConsensusSpec let base_layer_block_hash = current_base_layer_block_hash; let base_layer_block_height = current_base_layer_block_height; - let (next_block, foreign_proposals) = self.store.with_write_tx(|tx| { - let high_qc = HighQc::get(&**tx, epoch)?; - let high_qc_cert = high_qc.get_quorum_certificate(&**tx)?; + let on_propose = self.clone(); + let (next_block, 
foreign_proposals) = task::spawn_blocking(move || { + on_propose.store.with_write_tx(|tx| { + let high_qc = HighQc::get(&**tx, epoch)?; + let high_qc_cert = high_qc.get_quorum_certificate(&**tx)?; - let (next_block, foreign_proposals, executed_transactions) = self.build_next_block( - tx, - epoch, - &leaf_block, - high_qc_cert, - validator.public_key, - local_committee_info, - // TODO: This just avoids issues with proposed transactions causing leader failures. Not sure if this - // is a good idea. - is_newview_propose, - base_layer_block_height, - base_layer_block_hash, - propose_epoch_end, - )?; + let (next_block, foreign_proposals, executed_transactions) = on_propose.build_next_block( + tx, + epoch, + &leaf_block, + high_qc_cert, + validator.public_key, + &local_committee_info, + // TODO: This just avoids issues with proposed transactions causing leader failures. Not sure if + // this is a good idea. + is_newview_propose, + base_layer_block_height, + base_layer_block_hash, + propose_epoch_end, + )?; - // Add executions for this block - if !executed_transactions.is_empty() { - debug!( - target: LOG_TARGET, - "Saving {} executed transaction(s) for block {}", - executed_transactions.len(), - next_block.id() - ); - } - for executed in executed_transactions.into_values() { - executed.for_block(*next_block.id()).insert_if_required(tx)?; - } + // Add executions for this block + if !executed_transactions.is_empty() { + debug!( + target: LOG_TARGET, + "Saving {} executed transaction(s) for block {}", + executed_transactions.len(), + next_block.id() + ); + } + for executed in executed_transactions.into_values() { + executed.for_block(*next_block.id()).insert_if_required(tx)?; + } - next_block.as_last_proposed().set(tx)?; - Ok::<_, HotStuffError>((next_block, foreign_proposals)) - })?; + next_block.as_last_proposed().set(tx)?; + Ok::<_, HotStuffError>((next_block, foreign_proposals)) + }) + }) + .await??; info!( target: LOG_TARGET, diff --git 
a/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs index f74ef6ef99..84e58b5d96 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs @@ -121,402 +121,6 @@ where TConsensusSpec: ConsensusSpec Ok(()) } - // #[allow(clippy::too_many_lines)] - // fn process_foreign_block( - // &self, - // tx: &mut ::WriteTransaction<'_>, - // foreign_proposal: ForeignProposal, - // foreign_committee_info: &CommitteeInfo, - // local_committee_info: &CommitteeInfo, - // ) -> Result<(), HotStuffError> { - // let ForeignProposal { - // block, - // justify_qc, - // mut block_pledge, - // .. - // } = foreign_proposal; - // let local_leaf = LeafBlock::get(&**tx)?; - // // We only want to save the QC once if applicable - // let mut command_count = 0usize; - // - // for cmd in block.commands() { - // match cmd { - // Command::LocalPrepare(atom) => { - // if !local_committee_info.includes_any_address(atom.evidence.substate_addresses_iter()) { - // continue; - // } - // - // debug!( - // target: LOG_TARGET, - // "🧩 FOREIGN PROPOSAL: Command: LocalPrepare({}, {}), block: {}", - // atom.id,atom.decision, block.id(), - // ); - // - // let Some(mut tx_rec) = self.transaction_pool.get(tx, local_leaf, &atom.id).optional()? else { - // // If this happens, it could be a bug in the foreign missing transaction handling - // warn!( - // target: LOG_TARGET, - // "⚠️ NEVER HAPPEN: Foreign proposal received for transaction {} but this transaction is - // not in the pool.", atom.id - // ); - // continue; - // }; - // - // if tx_rec.current_stage() > TransactionPoolStage::LocalPrepared { - // // TODO: This can happen if the foreign shard group is only responsible for outputs (the - // input // SGs have already progressed to LocalAccept) in which case it is safe to ignore - // this command. 
// However we should not send the proposal in the first place (assuming it - // does not involve any // other shard-applicable transactions). - // warn!( - // target: LOG_TARGET, - // "⚠️ Foreign LocalPrepare proposal ({}) received LOCAL_PREPARE for transaction {} but - // current transaction stage is {}. Ignoring.", block, - // tx_rec.transaction_id(), tx_rec.current_stage() - // ); - // continue; - // } - // - // command_count += 1; - // - // let remote_decision = atom.decision; - // let local_decision = tx_rec.current_decision(); - // if remote_decision.is_abort() && local_decision.is_commit() { - // info!( - // target: LOG_TARGET, - // "⚠️ Foreign committee ABORT transaction {}. Update overall decision to ABORT. Local - // stage: {}, Leaf: {}", tx_rec.transaction_id(), tx_rec.current_stage(), local_leaf - // ); - // } - // - // // We need to add the justify QC to the evidence because the all prepare block could not include - // it // yet - // let mut foreign_evidence = atom.evidence.clone(); - // foreign_evidence.add_qc_evidence(foreign_committee_info, *justify_qc.id()); - // - // // Update the transaction record with any new information provided by this foreign block - // tx_rec.update_remote_data( - // tx, - // remote_decision, - // *justify_qc.id(), - // foreign_committee_info, - // foreign_evidence, - // )?; - // - // self.validate_and_add_pledges( - // tx, - // &tx_rec, - // block.id(), - // atom, - // &mut block_pledge, - // foreign_committee_info, - // )?; - // - // if tx_rec.current_stage().is_new() { - // info!( - // target: LOG_TARGET, - // "🧩 FOREIGN PROPOSAL: (Initial sequence from LocalPrepare) Transaction is ready for - // Prepare({}, {}) Local Stage: {}", tx_rec.transaction_id(), - // tx_rec.current_decision(), - // tx_rec.current_stage() - // ); - // // If the transaction is New, we're waiting for all foreign pledges. Propose transaction once - // we // have them. 
- // - // // CASE: One foreign SG is involved in all inputs and executed the transaction, local SG is - // // involved in the outputs - // let transaction = tx_rec.get_transaction(&**tx)?; - // let is_ready = local_committee_info.includes_substate_id(&transaction.to_receipt_id().into()) - // || transaction.has_any_local_inputs(local_committee_info) || - // transaction.has_all_foreign_input_pledges(&**tx, local_committee_info)?; - // - // if is_ready { - // info!( - // target: LOG_TARGET, - // "🧩 FOREIGN PROPOSAL: (Initial sequence from LocalPrepare) Transaction is ready for - // Prepare({}, {}) Local Stage: {}", tx_rec.transaction_id(), - // tx_rec.current_decision(), - // tx_rec.current_stage() - // ); - // tx_rec.add_pending_status_update(tx, local_leaf, TransactionPoolStage::New, true)?; - // } else { - // info!( - // target: LOG_TARGET, - // "🧩 FOREIGN PROPOSAL: (Initial sequence from LocalPrepare) Transaction is NOT ready - // for Prepare({}, {}) Local Stage: {}", tx_rec.transaction_id(), - // tx_rec.current_decision(), - // tx_rec.current_stage() - // ); - // } - // } else if tx_rec.current_stage().is_local_prepared() && - // tx_rec.evidence().all_input_addresses_justified() - // { - // // If all shards are complete, and we've already received our LocalPrepared, we can set out - // // LocalPrepared transaction as ready to propose ACCEPT. If we have not received - // // the local LocalPrepared, the transition will happen when we receive the local - // // block. - // info!( - // target: LOG_TARGET, - // "🧩 FOREIGN PROPOSAL: Transaction is ready for propose AllPrepared({}, {}) Local Stage: - // {}", tx_rec.transaction_id(), - // tx_rec.current_decision(), - // tx_rec.current_stage() - // ); - // - // tx_rec.add_pending_status_update(tx, local_leaf, TransactionPoolStage::LocalPrepared, true)?; - // // TODO: there is a race condition between the local node receiving the foreign LocalPrepare - // and // the leader proposing AllPrepare. 
If the latter comes first, this node - // // will not vote on this block which leads inevitably to erroneous - // // leader failures. Currently we simply vote ACCEPT on the block, with is ready == false, so - // we // need to handle this here. When we confirm foreign proposals correctly, we can - // remove this. } else { - // info!( - // target: LOG_TARGET, - // "🧩 FOREIGN PROPOSAL: Transaction is NOT ready for AllPrepared({}, {}) Local Stage: {}, - // All Justified: {}. Waiting for local proposal.", tx_rec.transaction_id(), - // tx_rec.current_decision(), - // tx_rec.current_stage(), - // tx_rec.evidence().all_input_addresses_justified() - // ); - // tx_rec.add_pending_status_update(tx, local_leaf, tx_rec.current_stage(), tx_rec.is_ready())?; - // } - // }, - // Command::LocalAccept(atom) => { - // if !local_committee_info.includes_any_address(atom.evidence.substate_addresses_iter()) { - // continue; - // } - // - // debug!( - // target: LOG_TARGET, - // "🧩 FOREIGN PROPOSAL: Command: LocalAccept({}, {}), block: {}", - // atom.id, atom.decision, block.id(), - // ); - // - // let Some(mut tx_rec) = self.transaction_pool.get(tx, local_leaf, &atom.id).optional()? else { - // warn!( - // target: LOG_TARGET, - // "⚠️ NEVER HAPPEN: Foreign proposal received for transaction {} but this transaction is - // not in the pool.", atom.id - // ); - // continue; - // }; - // - // if tx_rec.current_stage() > TransactionPoolStage::LocalAccepted { - // warn!( - // target: LOG_TARGET, - // "⚠️ Foreign proposal {} received LOCAL_ACCEPT for transaction {} but current transaction - // stage is {}. Ignoring.", block, - // tx_rec.transaction_id(), - // tx_rec.current_stage(), - // ); - // continue; - // } - // - // command_count += 1; - // - // let remote_decision = atom.decision; - // let local_decision = tx_rec.current_local_decision(); - // if remote_decision.is_abort() && local_decision.is_commit() { - // info!( - // target: LOG_TARGET, - // "⚠️ Foreign ABORT {}. 
Update overall decision to ABORT. Local stage: {}, Leaf: {}", - // tx_rec.transaction_id(), tx_rec.current_stage(), local_leaf - // ); - // } - // - // // We need to add the justify QC to the evidence because the all prepare block could not include - // it // yet - // let mut foreign_evidence = atom.evidence.clone(); - // foreign_evidence.add_qc_evidence(foreign_committee_info, *justify_qc.id()); - // - // // Update the transaction record with any new information provided by this foreign block - // tx_rec.update_remote_data( - // tx, - // remote_decision, - // *justify_qc.id(), - // foreign_committee_info, - // foreign_evidence, - // )?; - // - // self.validate_and_add_pledges( - // tx, - // &tx_rec, - // block.id(), - // atom, - // &mut block_pledge, - // foreign_committee_info, - // )?; - // - // // Good debug info - // // tx_rec.evidence().iter().for_each(|(addr, ev)| { - // // let includes_local = local_committee_info.includes_substate_address(addr); - // // log::error!( - // // target: LOG_TARGET, - // // "🐞 LOCALACCEPT EVIDENCE (l={}, f={}) {}: {}", includes_local, !includes_local, addr, - // ev // ); - // // }); - // - // if tx_rec.current_stage().is_new() { - // // If the transaction is New, we're waiting for all foreign pledges. Propose transaction once - // we // have them. 
- // // CASE: Foreign SGs have pledged all inputs and executed the transaction, local SG is - // involved // in the outputs - // let transaction = tx_rec.get_transaction(&**tx)?; - // let is_ready = local_committee_info.includes_substate_id(&transaction.to_receipt_id().into()) - // || transaction.has_any_local_inputs(local_committee_info) || - // transaction.has_all_foreign_input_pledges(&**tx, local_committee_info)?; - // if is_ready { - // info!( - // target: LOG_TARGET, - // "🧩 FOREIGN PROPOSAL: (Initial sequence from LocalAccept) Transaction is ready for - // Prepare({}, {}) Local Stage: {}", tx_rec.transaction_id(), - // tx_rec.current_decision(), - // tx_rec.current_stage() - // ); - // tx_rec.add_pending_status_update(tx, local_leaf, TransactionPoolStage::New, true)?; - // } else { - // info!( - // target: LOG_TARGET, - // "🧩 FOREIGN PROPOSAL: (Initial sequence from LocalAccept) Transaction is NOT ready - // for Prepare({}, {}) Local Stage: {}", tx_rec.transaction_id(), - // tx_rec.current_decision(), - // tx_rec.current_stage() - // ); - // } - // } else if tx_rec.current_stage().is_local_accepted() && - // tx_rec.evidence().all_addresses_justified() { - // info!( - // target: LOG_TARGET, - // "🧩 FOREIGN PROPOSAL: Transaction is ready for propose ALL_ACCEPT({}, {}) Local Stage: - // {}", tx_rec.transaction_id(), - // tx_rec.current_decision(), - // tx_rec.current_stage() - // ); - // - // tx_rec.add_pending_status_update(tx, local_leaf, TransactionPoolStage::LocalAccepted, true)?; - // } else { - // info!( - // target: LOG_TARGET, - // "🧩 FOREIGN PROPOSAL: Transaction is NOT ready for ALL_ACCEPT({}, {}) Local Stage: {}, - // All Justified: {}. 
Waiting for local proposal.", tx_rec.transaction_id(), - // tx_rec.current_decision(), - // tx_rec.current_stage(), - // tx_rec.evidence().all_addresses_justified() - // ); - // // Still need to update the evidence - // tx_rec.add_pending_status_update(tx, local_leaf, tx_rec.current_stage(), tx_rec.is_ready())?; - // } - // }, - // // Should never receive this - // Command::EndEpoch => { - // warn!( - // target: LOG_TARGET, - // "❓️ NEVER HAPPEN: Foreign proposal received for block {} contains an EndEpoch command. This - // is invalid behaviour.", block.id() - // ); - // continue; - // }, - // // TODO(perf): Can we find a way to exclude these unused commands to reduce message size? - // Command::AllAccept(_) | - // Command::SomeAccept(_) | - // Command::AllPrepare(_) | - // Command::SomePrepare(_) | - // Command::Prepare(_) | - // Command::LocalOnly(_) | - // Command::ForeignProposal(_) | - // Command::MintConfidentialOutput(_) => { - // // Disregard - // continue; - // }, - // } - // } - // - // info!( - // target: LOG_TARGET, - // "🧩 FOREIGN PROPOSAL: Processed {} commands from foreign block {}", - // command_count, - // block.id() - // ); - // if command_count == 0 { - // warn!( - // target: LOG_TARGET, - // "⚠️ FOREIGN PROPOSAL: No commands were applicable for foreign block {}. 
Ignoring.", - // block.id() - // ); - // } - // - // Ok(()) - // } - // - // fn validate_and_add_pledges( - // &self, - // tx: &mut ::WriteTransaction<'_>, - // tx_rec: &TransactionPoolRecord, - // block_id: &BlockId, - // atom: &TransactionAtom, - // block_pledge: &mut BlockPledge, - // foreign_committee_info: &CommitteeInfo, - // ) -> Result<(), HotStuffError> { - // #[allow(clippy::mutable_key_type)] - // let maybe_pledges = if atom.decision.is_commit() { - // let pledges = block_pledge.remove_transaction_pledges(&atom.id).ok_or_else(|| { - // HotStuffError::ForeignNodeOmittedTransactionPledges { - // foreign_block_id: *block_id, - // transaction_id: atom.id, - // } - // })?; - // - // // Validate that provided evidence is correct - // // TODO: there are a lot of validations to be done on evidence and the foreign block in general, - // // this is here as a sanity check and should change to not be a fatal error in consensus - // for pledge in &pledges { - // let address = pledge.versioned_substate_id().to_substate_address(); - // let evidence = - // atom.evidence - // .get(&address) - // .ok_or_else(|| ProposalValidationError::ForeignInvalidPledge { - // block_id: *block_id, - // transaction_id: atom.id, - // details: format!("Pledge {pledge} for address {address} not found in evidence"), - // })?; - // if evidence.lock.is_output() && pledge.is_input() { - // return Err(ProposalValidationError::ForeignInvalidPledge { - // block_id: *block_id, - // transaction_id: atom.id, - // details: format!("Pledge {pledge} is an input but evidence is an output for address - // {address}"), } - // .into()); - // } - // if !evidence.lock.is_output() && pledge.is_output() { - // return Err(ProposalValidationError::ForeignInvalidPledge { - // block_id: *block_id, - // transaction_id: atom.id, - // details: format!("Pledge {pledge} is an output but evidence is an input for address - // {address}"), } - // .into()); - // } - // } - // Some(pledges) - // } else { - // if 
block_pledge.remove_transaction_pledges(&atom.id).is_some() { - // return Err(ProposalValidationError::ForeignInvalidPledge { - // block_id: *block_id, - // transaction_id: atom.id, - // details: "Remote decided ABORT but provided pledges".to_string(), - // } - // .into()); - // } - // None - // }; - // - // if let Some(pledges) = maybe_pledges { - // // If the foreign shard has committed the transaction, we can add the pledges to the transaction - // // record - // tx_rec.add_foreign_pledges(tx, foreign_committee_info.shard_group(), pledges)?; - // } - // - // Ok(()) - // } - fn validate_proposed_block( &self, candidate_block: &Block, diff --git a/dan_layer/consensus/src/hotstuff/transaction_manager/manager.rs b/dan_layer/consensus/src/hotstuff/transaction_manager/manager.rs index 6d7d791e59..239130008d 100644 --- a/dan_layer/consensus/src/hotstuff/transaction_manager/manager.rs +++ b/dan_layer/consensus/src/hotstuff/transaction_manager/manager.rs @@ -250,6 +250,7 @@ impl> }; if let Some(err) = lock_summary.hard_conflict() { + warn!(target: LOG_TARGET, "⚠️ PREPARE: Hard conflict when locking inputs: {err}"); prepared.set_abort_reason(RejectReason::FailedToLockInputs(err.to_string())); } Ok((prepared, lock_summary)) diff --git a/dan_layer/consensus/src/hotstuff/transaction_manager/prepared.rs b/dan_layer/consensus/src/hotstuff/transaction_manager/prepared.rs index 06c7aed791..2f800cb914 100644 --- a/dan_layer/consensus/src/hotstuff/transaction_manager/prepared.rs +++ b/dan_layer/consensus/src/hotstuff/transaction_manager/prepared.rs @@ -128,11 +128,10 @@ impl MultiShardPreparedTransaction { self.transaction .transaction() .all_inputs_iter() - .map(|input| input.or_zero_version()) - .map(|id| VersionedSubstateIdLockIntent::new(id, SubstateLockType::Read)), + .map(|input| VersionedSubstateIdLockIntent::from_requirement(input, SubstateLockType::Read)), self.outputs .iter() - .map(|id| VersionedSubstateIdLockIntent::new(id.clone(), SubstateLockType::Output)), + 
.map(|id| VersionedSubstateIdLockIntent::output(id.clone())), ); } @@ -146,13 +145,13 @@ impl MultiShardPreparedTransaction { // TODO(correctness): to_zero_version is error prone when used in evidence and the correctness depends how it is used. // e.g. using it to determining which shard is involved is fine, but loading substate by the address is incorrect (v0 may or may not be the actual pledged substate) .chain(self.foreign_inputs().iter().map(|r| r.clone().or_zero_version())) - .map(|id| VersionedSubstateIdLockIntent::new(id, SubstateLockType::Write)); + .map(|id| VersionedSubstateIdLockIntent::write(id, true)); let outputs = self .outputs() .iter() .cloned() - .map(|id| VersionedSubstateIdLockIntent::new(id, SubstateLockType::Output)); + .map(VersionedSubstateIdLockIntent::output); Evidence::from_inputs_and_outputs(inputs, outputs) } diff --git a/dan_layer/consensus/src/hotstuff/worker.rs b/dan_layer/consensus/src/hotstuff/worker.rs index c13ce436df..0353d5531e 100644 --- a/dan_layer/consensus/src/hotstuff/worker.rs +++ b/dan_layer/consensus/src/hotstuff/worker.rs @@ -275,7 +275,6 @@ impl HotstuffWorker { self.on_epoch_manager_event(event).await?; }, - // Proposing is highest priority maybe_leaf_block = on_force_beat.wait() => { self.hooks.on_beat(); @@ -649,14 +648,14 @@ impl HotstuffWorker { .handle( epoch, &local_committee, - local_committee_info, + *local_committee_info, leaf_block, is_newview_propose, propose_epoch_end, ) .await?; } else { - // We can make this a warm/error in future, but for now I want to be sure this never happens + // We can make this a warn/error in future, but for now I want to be sure this never happens debug_assert!( !is_newview_propose, "propose_if_leader called with is_newview_propose=true but we're not the leader" diff --git a/dan_layer/consensus_tests/src/consensus.rs b/dan_layer/consensus_tests/src/consensus.rs index ee6a665c69..7c0c4d3d5e 100644 --- a/dan_layer/consensus_tests/src/consensus.rs +++ 
b/dan_layer/consensus_tests/src/consensus.rs @@ -12,7 +12,7 @@ use std::time::Duration; use tari_common_types::types::PrivateKey; use tari_consensus::hotstuff::HotStuffError; -use tari_dan_common_types::{optional::Optional, Epoch, NodeHeight, SubstateLockType, SubstateRequirement}; +use tari_dan_common_types::{optional::Optional, Epoch, NodeHeight, SubstateRequirement}; use tari_dan_storage::{ consensus_models::{BlockId, Command, Decision, TransactionRecord, VersionedSubstateIdLockIntent}, StateStore, @@ -397,7 +397,10 @@ async fn multishard_local_inputs_foreign_outputs() { .build(), Decision::Commit, 1, - inputs.into_iter().map(VersionedSubstateIdLockIntent::write).collect(), + inputs + .into_iter() + .map(|input| VersionedSubstateIdLockIntent::write(input, true)) + .collect(), outputs, ); test.send_transaction_to_destination(TestVnDestination::All, tx1.clone()) @@ -456,7 +459,7 @@ async fn multishard_local_inputs_and_outputs_foreign_outputs() { inputs_0 .into_iter() .chain(inputs_1) - .map(VersionedSubstateIdLockIntent::write) + .map(|input| VersionedSubstateIdLockIntent::write(input, true)) .collect(), outputs_0.into_iter().chain(outputs_2).collect(), ); @@ -526,7 +529,10 @@ async fn multishard_output_conflict_abort() { tx, Decision::Commit, 1, - inputs.into_iter().map(VersionedSubstateIdLockIntent::write).collect(), + inputs + .into_iter() + .map(|input| VersionedSubstateIdLockIntent::write(input, true)) + .collect(), resulting_outputs, ); assert_ne!(tx1.id(), tx2.id()); @@ -578,9 +584,7 @@ async fn single_shard_inputs_from_previous_outputs() { .resulting_outputs() .unwrap() .iter() - .map(|output| { - VersionedSubstateIdLockIntent::new(output.versioned_substate_id().clone(), SubstateLockType::Write) - }) + .map(|output| VersionedSubstateIdLockIntent::write(output.versioned_substate_id().clone(), true)) .collect::>(); let tx2 = Transaction::builder() @@ -658,9 +662,7 @@ async fn multishard_inputs_from_previous_outputs() { 1, resulting_outputs .into_iter() - 
.map(|output| { - VersionedSubstateIdLockIntent::new(output.into_versioned_substate_id(), SubstateLockType::Write) - }) + .map(|output| VersionedSubstateIdLockIntent::write(output.into_versioned_substate_id(), true)) .collect(), vec![], ); @@ -723,7 +725,7 @@ async fn single_shard_input_conflict() { *tx1.id(), Decision::Commit, 0, - vec![VersionedSubstateIdLockIntent::read(substate_id.clone())], + vec![VersionedSubstateIdLockIntent::read(substate_id.clone(), true)], vec![], ), ) @@ -733,7 +735,7 @@ async fn single_shard_input_conflict() { *tx2.id(), Decision::Commit, 0, - vec![VersionedSubstateIdLockIntent::write(substate_id)], + vec![VersionedSubstateIdLockIntent::write(substate_id, true)], vec![], ), ); @@ -960,7 +962,10 @@ async fn single_shard_unversioned_inputs() { *tx.id(), Decision::Commit, 0, - inputs.into_iter().map(VersionedSubstateIdLockIntent::write).collect(), + inputs + .into_iter() + .map(|input| VersionedSubstateIdLockIntent::write(input, true)) + .collect(), vec![], ), ); diff --git a/dan_layer/consensus_tests/src/support/harness.rs b/dan_layer/consensus_tests/src/support/harness.rs index 86c48e1c37..555c5823d4 100644 --- a/dan_layer/consensus_tests/src/support/harness.rs +++ b/dan_layer/consensus_tests/src/support/harness.rs @@ -17,7 +17,6 @@ use tari_dan_common_types::{ NodeHeight, NumPreshards, ShardGroup, - SubstateLockType, VersionedSubstateId, }; use tari_dan_storage::{ @@ -98,7 +97,7 @@ impl Test { fee, all_inputs .into_iter() - .map(|i| VersionedSubstateIdLockIntent::new(i, SubstateLockType::Write)) + .map(|i| VersionedSubstateIdLockIntent::write(i, true)) .collect(), vec![], ); @@ -173,7 +172,7 @@ impl Test { fee, all_inputs .into_iter() - .map(|i| VersionedSubstateIdLockIntent::new(i, SubstateLockType::Write)) + .map(|i| VersionedSubstateIdLockIntent::write(i, true)) .collect(), outputs, ) diff --git a/dan_layer/consensus_tests/src/support/transaction.rs b/dan_layer/consensus_tests/src/support/transaction.rs index 78accc73c0..91bbe7a6f5 
100644 --- a/dan_layer/consensus_tests/src/support/transaction.rs +++ b/dan_layer/consensus_tests/src/support/transaction.rs @@ -5,7 +5,7 @@ use std::{iter, time::Duration}; use rand::{distributions::Alphanumeric, rngs::OsRng, Rng}; use tari_common_types::types::PrivateKey; -use tari_dan_common_types::{SubstateLockType, VersionedSubstateId}; +use tari_dan_common_types::VersionedSubstateId; use tari_dan_storage::consensus_models::{ Decision, ExecutedTransaction, @@ -105,13 +105,10 @@ pub fn create_execution_result_for_transaction( )) }; - resulting_outputs.push(VersionedSubstateIdLockIntent::new( - VersionedSubstateId::new( - SubstateId::TransactionReceipt(TransactionReceiptAddress::from(tx_id)), - 0, - ), - SubstateLockType::Output, - )); + resulting_outputs.push(VersionedSubstateIdLockIntent::output(VersionedSubstateId::new( + SubstateId::TransactionReceipt(TransactionReceiptAddress::from(tx_id)), + 0, + ))); TransactionExecution::new( tx_id, diff --git a/dan_layer/engine/src/runtime/working_state.rs b/dan_layer/engine/src/runtime/working_state.rs index 041f57a881..90d1e9fb45 100644 --- a/dan_layer/engine/src/runtime/working_state.rs +++ b/dan_layer/engine/src/runtime/working_state.rs @@ -23,7 +23,6 @@ use tari_engine_types::{ lock::LockFlag, logs::LogEntry, non_fungible::NonFungibleContainer, - non_fungible_index::NonFungibleIndex, proof::{ContainerRef, LockedResource, Proof}, resource::Resource, resource_container::{ResourceContainer, ResourceError}, @@ -42,7 +41,6 @@ use tari_template_lib::{ BucketId, ComponentAddress, NonFungibleAddress, - NonFungibleIndexAddress, ProofId, UnclaimedConfidentialOutputAddress, VaultId, @@ -505,35 +503,32 @@ impl WorkingState { ); let mut token_ids = BTreeSet::new(); - let resource = self.get_resource(locked_resource)?; + // let resource = self.get_resource(locked_resource)?; // TODO: This isn't correct (assumes tokens are never burnt), we'll need to rethink this - let mut index = resource - .total_supply() - .as_u64_checked() - 
.ok_or(RuntimeError::InvalidAmount { - amount: resource.total_supply(), - reason: "Could not convert to u64".to_owned(), - })?; + // let mut index = resource + // .total_supply() + // .as_u64_checked() + // .ok_or(RuntimeError::InvalidAmount { + // amount: resource.total_supply(), + // reason: "Could not convert to u64".to_owned(), + // })?; for (id, (data, mut_data)) in tokens { - let nft_address = NonFungibleAddress::new(resource_address, id.clone()); - let addr = SubstateId::NonFungible(nft_address.clone()); + let nft_address = NonFungibleAddress::new(resource_address, id); + let token_id = nft_address.id().clone(); + let addr = SubstateId::NonFungible(nft_address); if self.substate_exists(&addr)? { - return Err(RuntimeError::DuplicateNonFungibleId { - token_id: nft_address.id().clone(), - }); - } - if !token_ids.insert(id.clone()) { - // Can't happen - return Err(RuntimeError::DuplicateNonFungibleId { token_id: id }); + return Err(RuntimeError::DuplicateNonFungibleId { token_id }); + } else { + token_ids.insert(token_id); + self.new_substate(addr.clone(), NonFungibleContainer::new(data, mut_data))?; } - self.new_substate(addr.clone(), NonFungibleContainer::new(data, mut_data))?; // for each new nft we also create an index to be allow resource scanning - let index_address = NonFungibleIndexAddress::new(resource_address, index); - index += 1; - let nft_index = NonFungibleIndex::new(nft_address); - self.new_substate(index_address, nft_index)?; + // let index_address = NonFungibleIndexAddress::new(resource_address, index); + // index += 1; + // let nft_index = NonFungibleIndex::new(nft_address); + // self.new_substate(index_address, nft_index)?; } ResourceContainer::non_fungible(resource_address, token_ids) diff --git a/dan_layer/engine/tests/test.rs b/dan_layer/engine/tests/test.rs index 01980d3e58..81353fb5c9 100644 --- a/dan_layer/engine/tests/test.rs +++ b/dan_layer/engine/tests/test.rs @@ -1246,112 +1246,112 @@ mod tickets { } } -mod nft_indexes { - use 
super::*; - - fn setup() -> ( - TemplateTest, - (ComponentAddress, NonFungibleAddress), - ComponentAddress, - SubstateId, - ) { - let mut template_test = TemplateTest::new(vec!["tests/templates/nft/nft_list"]); - - let (account_address, owner_token, _) = template_test.create_funded_account(); - let nft_component: ComponentAddress = template_test.call_function("SparkleNft", "new", args![], vec![]); - - let nft_resx = template_test.get_previous_output_address(SubstateType::Resource); - - // TODO: cleanup - (template_test, (account_address, owner_token), nft_component, nft_resx) - } - - #[test] - #[allow(clippy::too_many_lines)] - fn new_nft_index() { - let (mut template_test, (account_address, owner_proof), nft_component, nft_resx) = setup(); - - let vars = vec![ - ("account", account_address.into()), - ("nft", nft_component.into()), - ("nft_resx", nft_resx.clone().into()), - ]; - - let total_supply: Amount = - template_test.call_method(nft_component, "total_supply", args![], vec![owner_proof.clone()]); - assert_eq!(total_supply, Amount(0)); - - let result = template_test - .execute_and_commit_manifest( - r#" - let account = var!["account"]; - let sparkle_nft = var!["nft"]; - - let nft_bucket = sparkle_nft.mint(); - account.deposit(nft_bucket); - "#, - vars.clone(), - vec![owner_proof.clone()], - ) - .unwrap(); - - let diff = result.finalize.result.expect("execution failed"); - - // Resource is changed - assert_eq!(diff.down_iter().filter(|(addr, _)| addr.is_resource()).count(), 1); - assert_eq!(diff.up_iter().filter(|(addr, _)| addr.is_resource()).count(), 1); - - // NFT component changed - assert_eq!(diff.down_iter().filter(|(addr, _)| addr.is_component()).count(), 1); - assert_eq!(diff.up_iter().filter(|(addr, _)| addr.is_component()).count(), 1); - - // One new vault created - assert_eq!(diff.down_iter().filter(|(addr, _)| addr.is_vault()).count(), 0); - assert_eq!(diff.up_iter().filter(|(addr, _)| addr.is_vault()).count(), 1); - - // One new NFT minted - 
assert_eq!(diff.down_iter().filter(|(addr, _)| addr.is_non_fungible()).count(), 0); - assert_eq!(diff.up_iter().filter(|(addr, _)| addr.is_non_fungible()).count(), 1); - - // One new NFT minted - assert_eq!(diff.down_iter().filter(|(addr, _)| addr.is_non_fungible()).count(), 0); - assert_eq!(diff.up_iter().filter(|(addr, _)| addr.is_non_fungible()).count(), 1); - let (nft_addr, _) = diff.up_iter().find(|(addr, _)| addr.is_non_fungible()).unwrap(); - - // One new NFT index - assert_eq!( - diff.down_iter() - .filter(|(addr, _)| addr.is_non_fungible_index()) - .count(), - 0 - ); - assert_eq!( - diff.up_iter().filter(|(addr, _)| addr.is_non_fungible_index()).count(), - 1 - ); - let (index_addr, index) = diff.up_iter().find(|(addr, _)| addr.is_non_fungible_index()).unwrap(); - // The nft index address is composed of the resource address - assert_eq!( - nft_resx.as_resource_address().unwrap(), - index_addr - .as_non_fungible_index_address() - .unwrap() - .resource_address() - .to_owned(), - ); - // The index references the newly minted nft - let referenced_address = index - .substate_value() - .non_fungible_index() - .unwrap() - .referenced_address(); - assert_eq!(nft_addr.to_address_string(), referenced_address.to_string()); - - // The total supply of the resource is increased - let total_supply: Amount = template_test.call_method(nft_component, "total_supply", args![], vec![owner_proof]); - assert_eq!(total_supply, Amount(1)); - } -} +// mod nft_indexes { +// use super::*; +// +// fn setup() -> ( +// TemplateTest, +// (ComponentAddress, NonFungibleAddress), +// ComponentAddress, +// SubstateId, +// ) { +// let mut template_test = TemplateTest::new(vec!["tests/templates/nft/nft_list"]); +// +// let (account_address, owner_token, _) = template_test.create_funded_account(); +// let nft_component: ComponentAddress = template_test.call_function("SparkleNft", "new", args![], vec![]); +// +// let nft_resx = template_test.get_previous_output_address(SubstateType::Resource); 
+// +// // TODO: cleanup +// (template_test, (account_address, owner_token), nft_component, nft_resx) +// } +// +// #[test] +// #[allow(clippy::too_many_lines)] +// fn new_nft_index() { +// let (mut template_test, (account_address, owner_proof), nft_component, nft_resx) = setup(); +// +// let vars = vec![ +// ("account", account_address.into()), +// ("nft", nft_component.into()), +// ("nft_resx", nft_resx.clone().into()), +// ]; +// +// let total_supply: Amount = +// template_test.call_method(nft_component, "total_supply", args![], vec![owner_proof.clone()]); +// assert_eq!(total_supply, Amount(0)); +// +// let result = template_test +// .execute_and_commit_manifest( +// r#" +// let account = var!["account"]; +// let sparkle_nft = var!["nft"]; +// +// let nft_bucket = sparkle_nft.mint(); +// account.deposit(nft_bucket); +// "#, +// vars.clone(), +// vec![owner_proof.clone()], +// ) +// .unwrap(); +// +// let diff = result.finalize.result.expect("execution failed"); +// +// // Resource is changed +// assert_eq!(diff.down_iter().filter(|(addr, _)| addr.is_resource()).count(), 1); +// assert_eq!(diff.up_iter().filter(|(addr, _)| addr.is_resource()).count(), 1); +// +// // NFT component changed +// assert_eq!(diff.down_iter().filter(|(addr, _)| addr.is_component()).count(), 1); +// assert_eq!(diff.up_iter().filter(|(addr, _)| addr.is_component()).count(), 1); +// +// // One new vault created +// assert_eq!(diff.down_iter().filter(|(addr, _)| addr.is_vault()).count(), 0); +// assert_eq!(diff.up_iter().filter(|(addr, _)| addr.is_vault()).count(), 1); +// +// // One new NFT minted +// assert_eq!(diff.down_iter().filter(|(addr, _)| addr.is_non_fungible()).count(), 0); +// assert_eq!(diff.up_iter().filter(|(addr, _)| addr.is_non_fungible()).count(), 1); +// +// // One new NFT minted +// assert_eq!(diff.down_iter().filter(|(addr, _)| addr.is_non_fungible()).count(), 0); +// assert_eq!(diff.up_iter().filter(|(addr, _)| addr.is_non_fungible()).count(), 1); +// let (nft_addr, 
_) = diff.up_iter().find(|(addr, _)| addr.is_non_fungible()).unwrap(); +// +// // One new NFT index +// assert_eq!( +// diff.down_iter() +// .filter(|(addr, _)| addr.is_non_fungible_index()) +// .count(), +// 0 +// ); +// assert_eq!( +// diff.up_iter().filter(|(addr, _)| addr.is_non_fungible_index()).count(), +// 1 +// ); +// let (index_addr, index) = diff.up_iter().find(|(addr, _)| addr.is_non_fungible_index()).unwrap(); +// // The nft index address is composed of the resource address +// assert_eq!( +// nft_resx.as_resource_address().unwrap(), +// index_addr +// .as_non_fungible_index_address() +// .unwrap() +// .resource_address() +// .to_owned(), +// ); +// // The index references the newly minted nft +// let referenced_address = index +// .substate_value() +// .non_fungible_index() +// .unwrap() +// .referenced_address(); +// assert_eq!(nft_addr.to_address_string(), referenced_address.to_string()); +// +// // The total supply of the resource is increased +// let total_supply: Amount = template_test.call_method(nft_component, "total_supply", args![], +// vec![owner_proof]); assert_eq!(total_supply, Amount(1)); +// } +// } #[test] fn test_builtin_templates() { diff --git a/dan_layer/engine_types/src/indexed_value.rs b/dan_layer/engine_types/src/indexed_value.rs index 7843def771..cc06982224 100644 --- a/dan_layer/engine_types/src/indexed_value.rs +++ b/dan_layer/engine_types/src/indexed_value.rs @@ -111,10 +111,7 @@ impl IndexedValue { pub fn get_value(&self, path: &str) -> Result, IndexedValueError> where for<'a> T: serde::Deserialize<'a> { - get_value_by_path(&self.value, path) - .map(tari_bor::from_value) - .transpose() - .map_err(Into::into) + decode_value_at_path(&self.value, path) } } @@ -459,6 +456,14 @@ impl From<&str> for IndexedValueError { } } +pub fn decode_value_at_path(value: &tari_bor::Value, path: &str) -> Result, IndexedValueError> +where for<'a> T: serde::Deserialize<'a> { + get_value_by_path(value, path) + .map(tari_bor::from_value) + 
.transpose() + .map_err(Into::into) +} + fn get_value_by_path<'a>(value: &'a tari_bor::Value, path: &str) -> Option<&'a tari_bor::Value> { let mut value = value; for part in path.split('.') { diff --git a/dan_layer/rpc_state_sync/src/manager.rs b/dan_layer/rpc_state_sync/src/manager.rs index bc48f8c871..e55b5f1636 100644 --- a/dan_layer/rpc_state_sync/src/manager.rs +++ b/dan_layer/rpc_state_sync/src/manager.rs @@ -1,8 +1,6 @@ // Copyright 2023 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::cmp; - use anyhow::anyhow; use async_trait::async_trait; use futures::StreamExt; @@ -138,13 +136,6 @@ where TConsensusSpec: ConsensusSpec let mut current_version = persisted_version; - // Minimum epoch we should request is 1 since Epoch(0) is the genesis epoch. - let last_state_transition_id = StateTransitionId::new( - cmp::max(last_state_transition_id.epoch(), Epoch(1)), - last_state_transition_id.shard(), - last_state_transition_id.seq(), - ); - info!( target: LOG_TARGET, "🛜Syncing from state transition {last_state_transition_id}" diff --git a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql index d905ef783d..79b38daaca 100644 --- a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql +++ b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql @@ -410,19 +410,16 @@ CREATE TABLE burnt_utxos CREATE TABLE state_tree ( - id integer not NULL primary key AUTOINCREMENT, - shard int not NULL, - key text not NULL, - node text not NULL, - is_stale boolean not null default '0' + id integer not NULL primary key AUTOINCREMENT, + shard int not NULL, + key text not NULL, + node text not NULL ); -- Scoping by shard -CREATE INDEX state_tree_idx_shard_key on state_tree (shard) WHERE is_stale = false; +CREATE INDEX state_tree_idx_shard_key on state_tree (shard); -- Duplicate keys are not allowed 
-CREATE UNIQUE INDEX state_tree_uniq_idx_key on state_tree (shard, key) WHERE is_stale = false; --- filtering out or by is_stale is used in every query -CREATE INDEX state_tree_idx_is_stale on state_tree (is_stale); +CREATE UNIQUE INDEX state_tree_uniq_idx_key on state_tree (shard, key); create table state_tree_shard_versions ( diff --git a/dan_layer/state_store_sqlite/src/reader.rs b/dan_layer/state_store_sqlite/src/reader.rs index c4c366623a..fb515b37e8 100644 --- a/dan_layer/state_store_sqlite/src/reader.rs +++ b/dan_layer/state_store_sqlite/src/reader.rs @@ -777,6 +777,7 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor let execution = transaction_executions::table .filter(transaction_executions::transaction_id.eq(serialize_hex(tx_id))) .filter(transaction_executions::block_id.eq_any(block_ids)) + // Get last execution .order_by(transaction_executions::id.desc()) .first::(self.connection()) .map_err(|e| SqliteStorageError::DieselError { @@ -2088,31 +2089,14 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor ) -> Result, StorageError> { use crate::schema::{state_transitions, substates}; - let (transition_id, epoch) = state_transitions::table - .select((state_transitions::id, state_transitions::epoch)) - // seq is monotonic for each shard, it does not reset for the epoch - .filter(state_transitions::seq.eq(id.seq() as i64)) - .filter(state_transitions::shard.eq(id.shard().as_u32() as i32)) - .get_result::<(i32, i64)>(self.connection()) - .map_err(|e| SqliteStorageError::DieselError { - operation: "state_transitions_get_n_after", - source: e, - })?; - - let next_transition_id = if epoch > id.epoch().as_u64() as i64 { - // If the provided id is from a previous epoch, include the first (seq=0) transition - transition_id - } else { - // Fetch from the next transition - transition_id + 1 - }; + debug!(target: LOG_TARGET, "state_transitions_get_n_after: {id}, end_epoch:{end_epoch}"); let transitions = 
state_transitions::table .left_join(substates::table.on(state_transitions::substate_address.eq(substates::address))) .select((state_transitions::all_columns, substates::all_columns.nullable())) - .filter(state_transitions::id.ge(next_transition_id)) - .filter(state_transitions::epoch.lt(end_epoch.as_u64() as i64)) + .filter(state_transitions::seq.gt(id.seq() as i64)) .filter(state_transitions::shard.eq(id.shard().as_u32() as i32)) + .filter(state_transitions::epoch.lt(end_epoch.as_u64() as i64)) .limit(n as i64) .get_results::<(sql_models::StateTransition, Option)>(self.connection()) .map_err(|e| SqliteStorageError::DieselError { @@ -2159,7 +2143,6 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor .select(state_tree::node) .filter(state_tree::shard.eq(shard.as_u32() as i32)) .filter(state_tree::key.eq(key.to_string())) - .filter(state_tree::is_stale.eq(false)) .first::(self.connection()) .map_err(|e| SqliteStorageError::DieselError { operation: "state_tree_nodes_get", @@ -2247,7 +2230,7 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor let version = pledge.version as u32; let id = VersionedSubstateId::new(substate_id, version); let lock_type = parse_from_string(&pledge.lock_type)?; - let lock_intent = VersionedSubstateIdLockIntent::new(id, lock_type); + let lock_intent = VersionedSubstateIdLockIntent::new(id, lock_type, true); let substate_value = pledge.substate_value.as_deref().map(deserialize_json).transpose()?; let pledge = SubstatePledge::try_create(lock_intent.clone(), substate_value).ok_or_else(|| { StorageError::DataInconsistency { diff --git a/dan_layer/state_store_sqlite/src/schema.rs b/dan_layer/state_store_sqlite/src/schema.rs index d800e9eaf0..8f4d72e044 100644 --- a/dan_layer/state_store_sqlite/src/schema.rs +++ b/dan_layer/state_store_sqlite/src/schema.rs @@ -299,7 +299,6 @@ diesel::table! 
{ shard -> Integer, key -> Text, node -> Text, - is_stale -> Bool, } } diff --git a/dan_layer/state_store_sqlite/src/writer.rs b/dan_layer/state_store_sqlite/src/writer.rs index 0629d103ef..18d8f0dec8 100644 --- a/dan_layer/state_store_sqlite/src/writer.rs +++ b/dan_layer/state_store_sqlite/src/writer.rs @@ -1454,7 +1454,7 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta operation: "substates_create", source: e, })?; - let next_seq = seq.map(|s| s + 1).unwrap_or(0); + let next_seq = seq.map(|s| s + 1).unwrap_or(1); // This means that we MUST do the state tree updates before inserting substates let version = self.state_tree_versions_get_latest(substate.created_by_shard)?; @@ -1520,7 +1520,7 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta operation: "substates_down (get max seq)", source: e, })?; - let next_seq = seq.map(|s| s + 1).unwrap_or(0); + let next_seq = seq.map(|s| s + 1).unwrap_or(1); let version = self.state_tree_versions_get_latest(shard)?; let values = ( @@ -1698,7 +1698,7 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta let values = ( state_tree::shard.eq(shard.as_u32() as i32), state_tree::key.eq(key.to_string()), - state_tree::node.eq(node), + state_tree::node.eq(&node), ); diesel::insert_into(state_tree::table) .values(&values) @@ -1718,11 +1718,20 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta ) -> Result<(), StorageError> { use crate::schema::state_tree; + // let num_effected = diesel::update(state_tree::table) + // .filter(state_tree::shard.eq(shard.as_u32() as i32)) + // .filter(state_tree::key.eq(key.to_string())) + // .set(state_tree::is_stale.eq(true)) + // .execute(self.connection()) + // .map_err(|e| SqliteStorageError::DieselError { + // operation: "state_tree_nodes_mark_stale_tree_node", + // source: e, + // })?; + let key = node.as_node_key(); - let num_effected = diesel::update(state_tree::table) 
+ let num_effected = diesel::delete(state_tree::table) .filter(state_tree::shard.eq(shard.as_u32() as i32)) .filter(state_tree::key.eq(key.to_string())) - .set(state_tree::is_stale.eq(true)) .execute(self.connection()) .map_err(|e| SqliteStorageError::DieselError { operation: "state_tree_nodes_mark_stale_tree_node", diff --git a/dan_layer/storage/src/consensus_models/executed_transaction.rs b/dan_layer/storage/src/consensus_models/executed_transaction.rs index 31ab688cfb..4e36287435 100644 --- a/dan_layer/storage/src/consensus_models/executed_transaction.rs +++ b/dan_layer/storage/src/consensus_models/executed_transaction.rs @@ -9,13 +9,7 @@ use std::{ }; use serde::{Deserialize, Serialize}; -use tari_dan_common_types::{ - optional::Optional, - SubstateAddress, - SubstateLockType, - ToSubstateAddress, - VersionedSubstateId, -}; +use tari_dan_common_types::{optional::Optional, SubstateAddress, ToSubstateAddress, VersionedSubstateId}; use tari_engine_types::commit_result::{ExecuteResult, RejectReason}; use tari_transaction::{Transaction, TransactionId}; @@ -65,10 +59,10 @@ impl ExecutedTransaction { .map(|diff| { diff.up_iter() .map(|(addr, substate)| { - VersionedSubstateIdLockIntent::new( - VersionedSubstateId::new(addr.clone(), substate.version()), - SubstateLockType::Output, - ) + VersionedSubstateIdLockIntent::output(VersionedSubstateId::new( + addr.clone(), + substate.version(), + )) }) .collect::>() }) diff --git a/dan_layer/storage/src/consensus_models/lock_intent.rs b/dan_layer/storage/src/consensus_models/lock_intent.rs index 789b0852b2..49680460d2 100644 --- a/dan_layer/storage/src/consensus_models/lock_intent.rs +++ b/dan_layer/storage/src/consensus_models/lock_intent.rs @@ -16,26 +16,38 @@ use tari_engine_types::substate::SubstateId; pub struct VersionedSubstateIdLockIntent { versioned_substate_id: VersionedSubstateId, lock_type: SubstateLockType, + require_version: bool, } impl VersionedSubstateIdLockIntent { - pub fn new(versioned_substate_id: 
VersionedSubstateId, lock: SubstateLockType) -> Self { + pub fn new(versioned_substate_id: VersionedSubstateId, lock: SubstateLockType, require_version: bool) -> Self { Self { versioned_substate_id, lock_type: lock, + require_version, } } - pub fn read(versioned_substate_id: VersionedSubstateId) -> Self { - Self::new(versioned_substate_id, SubstateLockType::Read) + pub fn from_requirement(substate_requirement: SubstateRequirement, lock: SubstateLockType) -> Self { + let version = substate_requirement.version(); + Self::new( + VersionedSubstateId::new(substate_requirement.into_substate_id(), version.unwrap_or(0)), + lock, + version.is_some(), + ) + } + + pub fn read(versioned_substate_id: VersionedSubstateId, require_version: bool) -> Self { + Self::new(versioned_substate_id, SubstateLockType::Read, require_version) } - pub fn write(versioned_substate_id: VersionedSubstateId) -> Self { - Self::new(versioned_substate_id, SubstateLockType::Write) + pub fn write(versioned_substate_id: VersionedSubstateId, require_version: bool) -> Self { + Self::new(versioned_substate_id, SubstateLockType::Write, require_version) } pub fn output(versioned_substate_id: VersionedSubstateId) -> Self { - Self::new(versioned_substate_id, SubstateLockType::Output) + // Once we lock outputs we always require the provided version + Self::new(versioned_substate_id, SubstateLockType::Output, true) } pub fn versioned_substate_id(&self) -> &VersionedSubstateId { @@ -57,6 +69,15 @@ impl VersionedSubstateIdLockIntent { pub fn lock_type(&self) -> SubstateLockType { self.lock_type } + + pub fn to_substate_requirement(&self) -> SubstateRequirement { + let version = if self.require_version { + Some(self.version()) + } else { + None + }; + SubstateRequirement::new(self.substate_id().clone(), version) + } } impl Borrow for VersionedSubstateIdLockIntent { @@ -88,11 +109,15 @@ impl<'a> LockIntent for &'a VersionedSubstateIdLockIntent { } fn version_to_lock(&self) -> u32 { - 
self.versioned_substate_id.version() + self.version() } fn requested_version(&self) -> Option { - Some(self.versioned_substate_id.version()) + if self.require_version { + Some(self.version()) + } else { + None + } } } @@ -152,6 +177,7 @@ impl SubstateRequirementLockIntent { VersionedSubstateIdLockIntent::new( VersionedSubstateId::new(self.substate_id().clone(), self.version_to_lock), self.lock_type, + self.substate_requirement.version().is_some(), ) } } @@ -195,7 +221,7 @@ impl LockIntent for SubstateRequirementLockIntent { impl From for SubstateRequirementLockIntent { fn from(intent: VersionedSubstateIdLockIntent) -> Self { let version = intent.versioned_substate_id.version(); - Self::new(intent.versioned_substate_id, version, intent.lock_type) + Self::new(intent.to_substate_requirement(), version, intent.lock_type) } } diff --git a/dan_layer/storage/src/consensus_models/substate_lock.rs b/dan_layer/storage/src/consensus_models/substate_lock.rs index 083c97a3b2..c2531bb98a 100644 --- a/dan_layer/storage/src/consensus_models/substate_lock.rs +++ b/dan_layer/storage/src/consensus_models/substate_lock.rs @@ -80,6 +80,7 @@ impl LockedSubstateValue { VersionedSubstateIdLockIntent::new( VersionedSubstateId::new(self.substate_id.clone(), self.lock.version()), self.lock.substate_lock(), + true, ) } diff --git a/dan_layer/storage/src/consensus_models/transaction.rs b/dan_layer/storage/src/consensus_models/transaction.rs index 2a58f48468..c208debb13 100644 --- a/dan_layer/storage/src/consensus_models/transaction.rs +++ b/dan_layer/storage/src/consensus_models/transaction.rs @@ -171,7 +171,7 @@ impl TransactionRecord { let resolved_inputs = self.resolved_inputs.take().unwrap_or_else(|| { self.transaction .all_inputs_iter() - .map(|i| VersionedSubstateIdLockIntent::new(i.or_zero_version(), SubstateLockType::Write)) + .map(|i| VersionedSubstateIdLockIntent::from_requirement(i, SubstateLockType::Write)) .collect() }); let resulting_outputs = 
self.resulting_outputs.take().unwrap_or_default(); diff --git a/dan_layer/transaction/src/transaction.rs b/dan_layer/transaction/src/transaction.rs index 35ee255fc9..38ce5912b2 100644 --- a/dan_layer/transaction/src/transaction.rs +++ b/dan_layer/transaction/src/transaction.rs @@ -1,7 +1,7 @@ // Copyright 2023 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::collections::HashSet; +use std::{collections::HashSet, fmt::Display}; use indexmap::IndexSet; use serde::{Deserialize, Serialize}; @@ -237,3 +237,18 @@ impl Transaction { self.inputs().iter().any(|i| i.version().is_none()) } } + +impl Display for Transaction { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "Transaction[{}, Inputs: {}, Fee Instructions: {}, Instructions: {}, Signatures: {}, Filled Inputs: {}]", + self.id, + self.transaction.inputs.len(), + self.transaction.fee_instructions.len(), + self.transaction.instructions.len(), + self.signatures.len(), + self.filled_inputs.len(), + ) + } +} diff --git a/dan_layer/wallet/sdk/src/apis/transaction.rs b/dan_layer/wallet/sdk/src/apis/transaction.rs index a93319c8be..74ff6c2a06 100644 --- a/dan_layer/wallet/sdk/src/apis/transaction.rs +++ b/dan_layer/wallet/sdk/src/apis/transaction.rs @@ -286,14 +286,14 @@ where diff: &SubstateDiff, ) -> Result<(), TransactionApiError> { let mut downed_substates_with_parents = HashMap::with_capacity(diff.down_len()); - for (addr, _) in diff.down_iter() { - if addr.is_layer1_commitment() { - info!(target: LOG_TARGET, "Layer 1 commitment {} downed", addr); + for (id, _) in diff.down_iter() { + if id.is_layer1_commitment() { + info!(target: LOG_TARGET, "Layer 1 commitment {} downed", id); continue; } - let Some(downed) = tx.substates_remove(addr).optional()? else { - warn!(target: LOG_TARGET, "Downed substate {} not found", addr); + let Some(downed) = tx.substates_remove(id).optional()? 
else { + warn!(target: LOG_TARGET, "Downed substate {} not found", id); continue; }; diff --git a/integration_tests/tests/features/state_sync.feature b/integration_tests/tests/features/state_sync.feature index 63a00187e6..46492ff703 100644 --- a/integration_tests/tests/features/state_sync.feature +++ b/integration_tests/tests/features/state_sync.feature @@ -3,7 +3,7 @@ @state_sync Feature: State Sync - @serial @fixed + @serial @fixed @doit Scenario: New validator node registers and syncs # Initialize a base node, wallet, miner and VN Given a base node BASE diff --git a/integration_tests/tests/steps/validator_node.rs b/integration_tests/tests/steps/validator_node.rs index 1ab7370dd8..dbb13df2be 100644 --- a/integration_tests/tests/steps/validator_node.rs +++ b/integration_tests/tests/steps/validator_node.rs @@ -468,7 +468,12 @@ async fn when_i_wait_for_validator_leaf_block_at_least(world: &mut TariWorld, na }) .await .unwrap(); - let actual_height = resp.blocks.first().unwrap().height().as_u64(); + let actual_height = resp + .blocks + .first() + .unwrap_or_else(|| panic!("Validator {name} has no blocks")) + .height() + .as_u64(); if actual_height < height { panic!( "Validator {} leaf block height {} is less than {}", diff --git a/utilities/tariswap_test_bench/src/accounts.rs b/utilities/tariswap_test_bench/src/accounts.rs index d75415ede0..4da66806b0 100644 --- a/utilities/tariswap_test_bench/src/accounts.rs +++ b/utilities/tariswap_test_bench/src/accounts.rs @@ -6,7 +6,7 @@ use std::ops::RangeInclusive; use tari_crypto::{keys::PublicKey as _, ristretto::RistrettoPublicKey}; use tari_dan_common_types::SubstateRequirement; use tari_dan_wallet_sdk::{apis::key_manager::TRANSACTION_BRANCH, models::Account}; -use tari_engine_types::component::new_component_address_from_public_key; +use tari_engine_types::{component::new_component_address_from_public_key, indexed_value::IndexedWellKnownTypes}; use tari_template_builtin::ACCOUNT_TEMPLATE_ADDRESS; use tari_template_lib::{ 
args, @@ -43,7 +43,10 @@ impl Runner { let finalize = self.submit_transaction_and_wait(transaction).await?; let diff = finalize.result.accept().unwrap(); let (account, _) = diff.up_iter().find(|(addr, _)| addr.is_component()).unwrap(); - let (vault, _) = diff.up_iter().find(|(addr, _)| addr.is_vault()).unwrap(); + let (vault, _) = diff + .up_iter() + .find(|(addr, _)| *addr != XTR_FAUCET_VAULT_ADDRESS && addr.is_vault()) + .unwrap(); self.sdk.accounts_api().add_account(None, account, 0, true)?; self.sdk.accounts_api().add_vault( @@ -80,8 +83,18 @@ impl Runner { for owner in &owners { builder = builder.create_account(RistrettoPublicKey::from_secret_key(&owner.key)); } + + let pay_fee_vault = self + .sdk + .accounts_api() + .get_vault_by_resource(&pay_fee_account.address, &XTR)?; + let transaction = builder - .with_inputs([SubstateRequirement::unversioned(pay_fee_account.address.clone())]) + .with_inputs([ + SubstateRequirement::unversioned(pay_fee_account.address.clone()), + SubstateRequirement::unversioned(pay_fee_vault.address), + SubstateRequirement::unversioned(pay_fee_vault.resource_address), + ]) .sign(&key.key) .build(); @@ -90,7 +103,7 @@ impl Runner { let mut accounts = Vec::with_capacity(num_accounts); for owner in owners { - let account = diff + let account_addr = diff .up_iter() .map(|(addr, _)| addr) .filter(|addr| addr.is_component()) @@ -105,8 +118,8 @@ impl Runner { self.sdk .accounts_api() - .add_account(None, account, owner.key_index, false)?; - let account = self.sdk.accounts_api().get_account_by_address(account)?; + .add_account(None, account_addr, owner.key_index, false)?; + let account = self.sdk.accounts_api().get_account_by_address(account_addr)?; accounts.push(account); } @@ -120,6 +133,10 @@ impl Runner { accounts: &[Account], ) -> anyhow::Result<()> { let key = self.sdk.key_manager_api().derive_key(TRANSACTION_BRANCH, 0)?; + let fee_vault = self + .sdk + .accounts_api() + .get_vault_by_resource(&fee_account.address, &XTR)?; let mut builder 
= Transaction::builder().fee_transaction_pay_from_component( fee_account.address.as_component_address().unwrap(), Amount(1000 * accounts.len() as i64), @@ -135,18 +152,51 @@ impl Runner { .put_last_instruction_output_on_workspace("xtr") .call_method(account.address.as_component_address().unwrap(), "deposit", args![ Workspace("xtr") - ]); + ]) + .add_input(SubstateRequirement::unversioned(account.address.clone())); } let transaction = builder .with_inputs([ + SubstateRequirement::unversioned(XTR), + SubstateRequirement::unversioned(XTR_FAUCET_COMPONENT_ADDRESS), + SubstateRequirement::unversioned(XTR_FAUCET_VAULT_ADDRESS), SubstateRequirement::unversioned(faucet.component_address), SubstateRequirement::unversioned(faucet.resource_address), + SubstateRequirement::unversioned(faucet.vault_address), + SubstateRequirement::unversioned(fee_vault.account_address), + SubstateRequirement::unversioned(fee_vault.address), ]) .sign(&key.key) .build(); - self.submit_transaction_and_wait(transaction).await?; + let result = self.submit_transaction_and_wait(transaction).await?; + + let accounts_and_state = result + .result + .accept() + .unwrap() + .up_iter() + .filter(|(addr, _)| { + *addr != XTR_FAUCET_COMPONENT_ADDRESS && + *addr != faucet.component_address && + *addr != fee_account.address + }) + .filter_map(|(addr, substate)| Some((addr, substate.substate_value().component()?))) + .map(|(addr, component)| (addr, IndexedWellKnownTypes::from_value(&component.body.state).unwrap())); + + for (account, indexed) in accounts_and_state { + for vault_id in indexed.vault_ids() { + log::info!("Adding vault {} to account {}", vault_id, account); + self.sdk.accounts_api().add_vault( + account.clone(), + (*vault_id).into(), + faucet.resource_address, + ResourceType::Fungible, + Some("FAUCET".to_string()), + )?; + } + } Ok(()) } diff --git a/utilities/tariswap_test_bench/src/faucet.rs b/utilities/tariswap_test_bench/src/faucet.rs index 78fdcf58a7..a3c53690ea 100644 --- 
a/utilities/tariswap_test_bench/src/faucet.rs +++ b/utilities/tariswap_test_bench/src/faucet.rs @@ -1,10 +1,13 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause +use log::info; +use tari_dan_common_types::SubstateRequirement; use tari_dan_wallet_sdk::{apis::key_manager::TRANSACTION_BRANCH, models::Account}; use tari_template_lib::{ args, - models::{Amount, ComponentAddress, ResourceAddress}, + constants::XTR, + models::{Amount, ComponentAddress, ResourceAddress, VaultId}, }; use tari_transaction::Transaction; @@ -13,15 +16,26 @@ use crate::runner::Runner; pub struct Faucet { pub component_address: ComponentAddress, pub resource_address: ResourceAddress, + pub vault_address: VaultId, } impl Runner { pub async fn create_faucet(&mut self, in_account: &Account) -> anyhow::Result { let key = self.sdk.key_manager_api().derive_key(TRANSACTION_BRANCH, 0)?; + let fee_vault = self + .sdk + .accounts_api() + .get_vault_by_resource(&in_account.address, &XTR)?; + let transaction = Transaction::builder() .fee_transaction_pay_from_component(in_account.address.as_component_address().unwrap(), Amount(1000)) - .call_function(self._faucet_template.address, "mint", args![Amount(1_000_000_000)]) + .call_function(self.faucet_template.address, "mint", args![Amount(1_000_000_000)]) + .with_inputs([ + SubstateRequirement::unversioned(in_account.address.clone()), + SubstateRequirement::unversioned(fee_vault.address.clone()), + SubstateRequirement::unversioned(fee_vault.resource_address), + ]) .sign(&key.key) .build(); @@ -31,18 +45,28 @@ impl Runner { let component_address = diff .up_iter() .find_map(|(addr, s)| { - addr.as_component_address() - .filter(|_| s.substate_value().component().unwrap().module_name == "TestFaucet") + addr.as_component_address().filter(|_| { + s.substate_value().component().unwrap().template_address == self.faucet_template.address + }) }) .ok_or_else(|| anyhow::anyhow!("Faucet Component address not found"))?; let resource_address = diff 
.up_iter() + .filter(|(addr, _)| *addr != XTR) .find_map(|(addr, _)| addr.as_resource_address()) .ok_or_else(|| anyhow::anyhow!("Faucet Resource address not found"))?; + let vault_address = diff + .up_iter() + .filter_map(|(addr, _)| addr.as_vault_id()) + .find(|addr| *addr != fee_vault.address) + .ok_or_else(|| anyhow::anyhow!("Faucet Vault address not found"))?; + + info!("✅ Faucet {component_address} created with {resource_address} and {vault_address}"); Ok(Faucet { component_address, resource_address, + vault_address, }) } } diff --git a/utilities/tariswap_test_bench/src/main.rs b/utilities/tariswap_test_bench/src/main.rs index 9bbe9a81fb..146addb2fa 100644 --- a/utilities/tariswap_test_bench/src/main.rs +++ b/utilities/tariswap_test_bench/src/main.rs @@ -58,9 +58,9 @@ async fn run(cli: cli::CommonArgs, _args: cli::RunArgs) -> anyhow::Result<()> { info!("✅ Created faucet {}", faucet.component_address); info!("⏳️ Funding accounts ..."); - for batch in accounts.chunks(500) { + for batch in accounts.chunks(250) { runner.fund_accounts(&faucet, &primary_account, batch).await?; - info!("✅ Funded 500 accounts"); + info!("✅ Funded 250 accounts"); } info!("⏳️ Creating 1000 tariswap components..."); diff --git a/utilities/tariswap_test_bench/src/runner.rs b/utilities/tariswap_test_bench/src/runner.rs index 3a4ceffe06..c05fe760d6 100644 --- a/utilities/tariswap_test_bench/src/runner.rs +++ b/utilities/tariswap_test_bench/src/runner.rs @@ -4,7 +4,6 @@ use std::{path::Path, time::Duration}; use log::info; -use tari_dan_common_types::SubstateRequirement; use tari_dan_wallet_daemon::indexer_jrpc_impl::IndexerJsonRpcNetworkInterface; use tari_dan_wallet_sdk::{DanWalletSdk, WalletSdkConfig}; use tari_dan_wallet_storage_sqlite::SqliteWalletStore; @@ -20,7 +19,7 @@ type WalletSdk = DanWalletSdk pub struct Runner { pub(crate) sdk: WalletSdk, pub(crate) _cli: CommonArgs, - pub(crate) _faucet_template: TemplateMetadata, + pub(crate) faucet_template: TemplateMetadata, pub(crate)
tariswap_template: TemplateMetadata, pub(crate) stats: Stats, } @@ -32,7 +31,7 @@ impl Runner { Ok(Self { sdk, _cli: cli, - _faucet_template: faucet_template, + faucet_template, tariswap_template, stats: Stats::default(), }) @@ -46,16 +45,16 @@ impl Runner { pub async fn submit_transaction(&mut self, transaction: Transaction) -> anyhow::Result { // TODO: remove the filled inputs here and allow consensus to figure out input versions - let inputs = transaction - .to_referenced_substates()? - .into_iter() - .map(|s| SubstateRequirement::new(s, None)) - .collect(); + // let inputs = transaction + // .to_referenced_substates()? + // .into_iter() + // .map(|s| SubstateRequirement::new(s, None)) + // .collect(); let tx_id = self .sdk .transaction_api() - .insert_new_transaction(transaction, inputs, None, false) + .insert_new_transaction(transaction, vec![], None, false) .await?; self.sdk.transaction_api().submit_transaction(tx_id).await?; diff --git a/utilities/tariswap_test_bench/src/tariswap.rs b/utilities/tariswap_test_bench/src/tariswap.rs index 5a657ba5ca..27529c181f 100644 --- a/utilities/tariswap_test_bench/src/tariswap.rs +++ b/utilities/tariswap_test_bench/src/tariswap.rs @@ -1,13 +1,16 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause +use std::collections::HashMap; + use log::info; -use tari_dan_common_types::SubstateRequirement; +use tari_dan_common_types::{optional::Optional, SubstateRequirement}; use tari_dan_wallet_sdk::{apis::key_manager::TRANSACTION_BRANCH, models::Account}; +use tari_engine_types::indexed_value::decode_value_at_path; use tari_template_lib::{ args, - models::{Amount, ComponentAddress}, - prelude::XTR, + models::{Amount, ComponentAddress, VaultId}, + prelude::{ResourceAddress, ResourceType, XTR}, }; use tari_transaction::Transaction; @@ -15,6 +18,8 @@ use crate::{faucet::Faucet, runner::Runner}; pub struct TariSwap { pub component_address: ComponentAddress, + pub vaults: HashMap, + pub lp_resource_address: 
ResourceAddress, } impl Runner { @@ -36,17 +41,43 @@ impl Runner { Amount(1) ]); } - let transaction = builder.sign(&key.key).build(); + + let fee_vault = self + .sdk + .accounts_api() + .get_vault_by_resource(&in_account.address, &XTR)?; + + let transaction = builder + .with_inputs([ + SubstateRequirement::unversioned(in_account.address.clone()), + SubstateRequirement::unversioned(fee_vault.address.clone()), + SubstateRequirement::unversioned(XTR), + SubstateRequirement::unversioned(faucet.resource_address), + ]) + .sign(&key.key) + .build(); let finalize = self.submit_transaction_and_wait(transaction).await?; let diff = finalize.result.accept().unwrap(); Ok(diff .up_iter() .filter_map(|(addr, value)| { - addr.as_component_address() - .filter(|_| value.substate_value().component().unwrap().module_name == "TariSwapPool") + let addr = addr + .as_component_address() + .filter(|_| value.substate_value().component().unwrap().module_name == "TariSwapPool")?; + let vaults = decode_value_at_path(value.substate_value().component().unwrap().state(), "$.pools") + .unwrap() + .unwrap(); + let lp_resource_address = + decode_value_at_path(value.substate_value().component().unwrap().state(), "$.lp_resource") + .unwrap() + .unwrap(); + Some(TariSwap { + component_address: addr, + vaults, + lp_resource_address, + }) }) - .map(|component_address| TariSwap { component_address }) .collect()) } @@ -63,8 +94,8 @@ impl Runner { .sdk .key_manager_api() .derive_key(TRANSACTION_BRANCH, primary_account.key_index)?; - let mut tx_ids = vec![]; - // Batch these otherwise we can break consensus (proposed with locked object) + let mut tx_ids = Vec::with_capacity(200); + for i in 0..5 { for (i, tariswap) in tariswaps.iter().enumerate().skip(i * 200).take(200) { let account = &accounts[i % accounts.len()]; @@ -72,11 +103,28 @@ impl Runner { .sdk .key_manager_api() .derive_key(TRANSACTION_BRANCH, account.key_index)?; + let xtr_vault = self.sdk.accounts_api().get_vault_by_resource(&account.address, 
&XTR)?; + let faucet_vault = self + .sdk + .accounts_api() + .get_vault_by_resource(&account.address, &faucet.resource_address)?; + let maybe_lp_vault = self + .sdk + .accounts_api() + .get_vault_by_resource(&account.address, &tariswap.lp_resource_address) + .optional()?; + let transaction = Transaction::builder() - .with_inputs(vec![ - SubstateRequirement::new(faucet.resource_address.into(), Some(0)), - SubstateRequirement::new(XTR.into(), Some(0)), + .with_inputs(maybe_lp_vault.map(|v| SubstateRequirement::unversioned(v.address))) + .with_inputs([ + SubstateRequirement::unversioned(account.address.clone()), + SubstateRequirement::unversioned(xtr_vault.address), + SubstateRequirement::unversioned(faucet_vault.address), + SubstateRequirement::unversioned(tariswap.component_address), + SubstateRequirement::unversioned(faucet.resource_address), + SubstateRequirement::unversioned(XTR), ]) + .with_inputs(tariswap.vaults.values().map(|v| SubstateRequirement::unversioned(*v))) .fee_transaction_pay_from_component(account.address.as_component_address().unwrap(), Amount(1000)) .call_method(account.address.as_component_address().unwrap(), "withdraw", args![ XTR, amount_a @@ -99,11 +147,30 @@ impl Runner { .sign(&key.key) .build(); - tx_ids.push(self.submit_transaction(transaction).await?); + tx_ids.push((account.address.clone(), self.submit_transaction(transaction).await?)); } - for tx_id in tx_ids.drain(..) { - self.wait_for_transaction(tx_id).await?; + for (account, tx_id) in tx_ids.drain(..) 
{ + let result = self.wait_for_transaction(tx_id).await?; + let diff = result.result.accept().unwrap(); + let lp_vault = diff + .up_iter() + .find_map(|(addr, s)| { + let addr = addr.as_vault_id()?; + if *s.substate_value().vault().unwrap().resource_address() == tariswaps[0].lp_resource_address { + Some(addr) + } else { + None + } + }) + .ok_or_else(|| anyhow::anyhow!("LP Vault not found"))?; + self.sdk.accounts_api().add_vault( + account, + lp_vault.into(), + tariswaps[0].lp_resource_address, + ResourceType::NonFungible, + Some("LP".to_string()), + )?; } info!("⏳️ Added liquidity to pools {}-{}", i * 200, (i + 1) * 200); } @@ -128,37 +195,58 @@ impl Runner { let mut tx_ids = vec![]; // Swap XTR for faucet - // Batch these otherwise we can break consensus (proposed with locked object) for i in 0..5 { for (i, account) in accounts.iter().enumerate().skip(i * 200).take(200) { + let tariswap = &tariswaps[i % tariswaps.len()]; let key = self .sdk .key_manager_api() .derive_key(TRANSACTION_BRANCH, account.key_index)?; - let tariswap = &tariswaps[i % tariswaps.len()]; + let xtr_vault = self.sdk.accounts_api().get_vault_by_resource(&account.address, &XTR)?; + let faucet_vault = self + .sdk + .accounts_api() + .get_vault_by_resource(&account.address, &faucet.resource_address)?; + let maybe_lp_vault = self + .sdk + .accounts_api() + .get_vault_by_resource(&account.address, &tariswap.lp_resource_address) + .optional()?; let transaction = Transaction::builder() - // Use resources as input refs to allow concurrent access. 
- .with_inputs(vec![ - SubstateRequirement::new(faucet.resource_address.into(), Some(0)), - SubstateRequirement::new(XTR.into(), Some(0)), + .with_inputs(maybe_lp_vault.map(|v| SubstateRequirement::unversioned(v.address))) + .with_inputs([ + SubstateRequirement::unversioned(account.address.clone()), + SubstateRequirement::unversioned(xtr_vault.address), + SubstateRequirement::unversioned(faucet_vault.address), + SubstateRequirement::unversioned(tariswap.component_address), + SubstateRequirement::unversioned(faucet.resource_address), + SubstateRequirement::unversioned(XTR), + SubstateRequirement::unversioned(tariswap.lp_resource_address), ]) + .with_inputs(tariswap.vaults.values().map(|v| SubstateRequirement::unversioned(*v))) .fee_transaction_pay_from_component(account.address.as_component_address().unwrap(), Amount(1000)) - .call_method(tariswap.component_address, "get_pool_balance", args![ XTR, ]) - .call_method(tariswap.component_address, "get_pool_balance", args![ faucet.resource_address, ]) + .call_method(tariswap.component_address, "get_pool_balance", args![XTR,]) + .call_method(tariswap.component_address, "get_pool_balance", args![ + faucet.resource_address, + ]) .call_method(tariswap.component_address, "get_pool_ratio", args![XTR, Amount(1000)]) - .call_method(tariswap.component_address, "get_pool_ratio", args![faucet.resource_address, Amount(1000)]) + .call_method(tariswap.component_address, "get_pool_ratio", args![ + faucet.resource_address, + Amount(1000) + ]) .call_method(account.address.as_component_address().unwrap(), "withdraw", args![ - XTR, amount_a_for_b - ]) + XTR, + amount_a_for_b + ]) .put_last_instruction_output_on_workspace("a") .call_method(tariswap.component_address, "swap", args![ - Workspace("a"), - faucet.resource_address, - ]) + Workspace("a"), + faucet.resource_address, + ]) .put_last_instruction_output_on_workspace("swapped") .call_method(account.address.as_component_address().unwrap(), "deposit", args![ - Workspace("swapped") - ]) + 
Workspace("swapped") + ]) .sign(&primary_account_key.key) .sign(&key.key) .build(); @@ -188,26 +276,39 @@ impl Runner { .sdk .key_manager_api() .derive_key(TRANSACTION_BRANCH, account.key_index)?; + let xtr_vault = self.sdk.accounts_api().get_vault_by_resource(&account.address, &XTR)?; + let faucet_vault = self + .sdk + .accounts_api() + .get_vault_by_resource(&account.address, &faucet.resource_address)?; let tariswap = &tariswaps[i % tariswaps.len()]; let transaction = Transaction::builder() - // Use resources as input refs to allow concurrent access. - .with_inputs(vec![ - SubstateRequirement::new(faucet.resource_address.into(), Some(0)), - SubstateRequirement::new(XTR.into(), Some(0)), + .with_inputs([ + SubstateRequirement::unversioned(account.address.clone()), + SubstateRequirement::unversioned(xtr_vault.address), + SubstateRequirement::unversioned(faucet_vault.address), + SubstateRequirement::unversioned(tariswap.component_address), + SubstateRequirement::unversioned(faucet.resource_address), + SubstateRequirement::unversioned(XTR), + SubstateRequirement::unversioned(tariswap.lp_resource_address), ]) + .with_inputs(tariswap.vaults.values().map(|v| SubstateRequirement::unversioned(*v))) .fee_transaction_pay_from_component(account.address.as_component_address().unwrap(), Amount(1000)) .call_method(tariswap.component_address, "get_pool_balance", args![XTR]) - .call_method(tariswap.component_address, "get_pool_balance", args![faucet.resource_address]) + .call_method(tariswap.component_address, "get_pool_balance", args![ + faucet.resource_address + ]) .call_method(tariswap.component_address, "get_pool_ratio", args![XTR, Amount(1000)]) - .call_method(tariswap.component_address, "get_pool_ratio", args![faucet.resource_address, Amount(1000)]) + .call_method(tariswap.component_address, "get_pool_ratio", args![ + faucet.resource_address, + Amount(1000) + ]) .call_method(account.address.as_component_address().unwrap(), "withdraw", args![ - faucet.resource_address, 
amount_b_for_a + faucet.resource_address, + amount_b_for_a ]) .put_last_instruction_output_on_workspace("b") - .call_method(tariswap.component_address, "swap", args![ - Workspace("b"), - XTR, - ]) + .call_method(tariswap.component_address, "swap", args![Workspace("b"), XTR,]) .put_last_instruction_output_on_workspace("swapped") .call_method(account.address.as_component_address().unwrap(), "deposit", args![ Workspace("swapped")