From 771de401bb22f48676f2188107a61cddebf7cc20 Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Thu, 2 May 2024 12:19:39 +0400 Subject: [PATCH] fix(mempool): allow local transactions with unversioned inputs (#1027) Description --- fix(mempool): resolve local unversioned inputs before execution Motivation and Context --- Allow local-only transactions with unversioned inputs to be resolved in the mempool. High-level Notes: - Versioned local and foreign inputs may be safely executed and passed to consensus. If an input conflicts or has been downed since execution, this transaction will (and must) ABORT. - Unversioned inputs are more complicated. - **Local-only unversioned in mempool**: the mempool resolves all inputs as local and assigns the current UP version. Since inputs are not locked yet, another transaction may consume these inputs, causing one of the transactions to ABORT when it shouldn't. This can be resolved by executing the local-only transactions within the context of the object state of the current block. - **Local-only consensus execution**: The version applicable to the current leaf proposal, which accounts for the uncommitted chain (to commit block/3-chain), is selected and proposed as LocalOnly. This implicitly pledges and locks the inputs. - **Local-only with foreign conflicts in consensus**: ... How Has This Been Tested? --- Re-added the Cucumber test that covers unversioned inputs. What process can a PR reviewer use to test or verify this change? --- Submit a transaction with at least one unversioned input local to the shard. Breaking Changes --- - [x] None - [ ] Requires data directory to be deleted - [ ] Other - Please specify NOTE: Added an index to the database; the database must be deleted for the new index to be applied. 
This is non-breaking but does ensure data integrity and improves the SQLite implementation query performance --- .../src/transaction_manager/mod.rs | 8 +- .../src/dry_run_transaction_processor.rs | 20 ++- .../src/p2p/services/mempool/error.rs | 13 +- .../src/p2p/services/mempool/executor.rs | 128 ++++++++-------- .../src/p2p/services/mempool/initializer.rs | 4 + .../src/p2p/services/mempool/metrics.rs | 6 +- .../src/p2p/services/mempool/service.rs | 102 +++++-------- .../src/p2p/services/mempool/traits.rs | 16 +- .../src/substate_resolver.rs | 137 +++++++++++++----- .../src/hotstuff/on_inbound_message.rs | 41 +++--- .../up.sql | 6 +- dan_layer/state_store_sqlite/src/lib.rs | 1 + dan_layer/state_store_sqlite/src/reader.rs | 88 ++++++++++- .../storage/src/consensus_models/substate.rs | 36 ++++- dan_layer/storage/src/state_store/mod.rs | 9 +- dan_layer/transaction/src/transaction.rs | 33 ++++- .../tests/features/concurrency.feature | 81 +++++------ 17 files changed, 473 insertions(+), 256 deletions(-) diff --git a/applications/tari_indexer/src/transaction_manager/mod.rs b/applications/tari_indexer/src/transaction_manager/mod.rs index d66750c27..afcecf19f 100644 --- a/applications/tari_indexer/src/transaction_manager/mod.rs +++ b/applications/tari_indexer/src/transaction_manager/mod.rs @@ -85,14 +85,18 @@ where ); let transaction_substate_address = SubstateAddress::for_transaction_receipt(tx_hash.into_array().into()); - if transaction.involved_shards_iter().count() == 0 { + if transaction.all_inputs_iter().next().is_none() { self.try_with_committee(iter::once(transaction_substate_address), |mut client| { let transaction = transaction.clone(); async move { client.submit_transaction(transaction).await } }) .await } else { - self.try_with_committee(transaction.involved_shards_iter(), |mut client| { + let involved = transaction + .all_inputs_iter() + // If there is no version specified, submit to the validator node with version 0 + .map(|i| 
i.or_zero_version().to_substate_address()); + self.try_with_committee(involved, |mut client| { let transaction = transaction.clone(); async move { client.submit_transaction(transaction).await } }) diff --git a/applications/tari_validator_node/src/dry_run_transaction_processor.rs b/applications/tari_validator_node/src/dry_run_transaction_processor.rs index 4ddaad7f7..08a6febc3 100644 --- a/applications/tari_validator_node/src/dry_run_transaction_processor.rs +++ b/applications/tari_validator_node/src/dry_run_transaction_processor.rs @@ -43,7 +43,7 @@ use thiserror::Error; use tokio::task; use crate::{ - p2p::services::mempool::SubstateResolver, + p2p::services::mempool::{ResolvedSubstates, SubstateResolver}, substate_resolver::{SubstateResolverError, TariSubstateResolver}, virtual_substate::VirtualSubstateError, }; @@ -68,6 +68,8 @@ pub enum DryRunTransactionProcessorError { SubstateResoverError(#[from] SubstateResolverError), #[error("Virtual substate error: {0}")] VirtualSubstateError(#[from] VirtualSubstateError), + #[error("Execution thread failed: {0}")] + ExecutionThreadFailed(#[from] task::JoinError), } #[derive(Clone, Debug)] @@ -118,14 +120,20 @@ impl DryRunTransactionProcessor { .resolve_virtual_substates(&transaction, current_epoch) .await?; - let inputs = self.substate_resolver.resolve(&transaction).await?; + let ResolvedSubstates { + local: inputs, + unresolved_foreign: foreign, + } = self.substate_resolver.try_resolve_local(&transaction)?; temp_state_store.set_many(inputs)?; + // Dry-run we can request the foreign inputs from validator nodes. The execution result may vary if inputs are + // mutated between the dry-run and live execution. 
+ let foreign_inputs = self.substate_resolver.try_resolve_foreign(&foreign).await?; + temp_state_store.set_many(foreign_inputs)?; // execute the payload in the WASM engine and return the result - let executed = task::block_in_place(|| { - self.payload_processor - .execute(transaction, temp_state_store, virtual_substates) - })?; + let processor = self.payload_processor.clone(); + let executed = + task::spawn_blocking(move || processor.execute(transaction, temp_state_store, virtual_substates)).await??; let result = executed.into_result(); let fees = &result.finalize.fee_receipt; diff --git a/applications/tari_validator_node/src/p2p/services/mempool/error.rs b/applications/tari_validator_node/src/p2p/services/mempool/error.rs index 059d93585..9d001c6b6 100644 --- a/applications/tari_validator_node/src/p2p/services/mempool/error.rs +++ b/applications/tari_validator_node/src/p2p/services/mempool/error.rs @@ -1,15 +1,19 @@ // Copyright 2023 The Tari Project // SPDX-License-Identifier: BSD-3-Clause +use std::collections::HashSet; + +use indexmap::IndexMap; use tari_dan_app_utilities::{ template_manager::interface::TemplateManagerError, transaction_executor::TransactionProcessorError, }; use tari_dan_common_types::Epoch; use tari_dan_storage::{consensus_models::TransactionPoolError, StorageError}; +use tari_engine_types::substate::{Substate, SubstateId}; use tari_epoch_manager::EpochManagerError; use tari_networking::NetworkingError; -use tari_transaction::TransactionId; +use tari_transaction::{SubstateRequirement, TransactionId}; use tokio::sync::{mpsc, oneshot}; use crate::{ @@ -30,7 +34,12 @@ pub enum MempoolError { #[error("DryRunTransactionProcessor Error: {0}")] DryRunTransactionProcessorError(#[from] DryRunTransactionProcessorError), #[error("Execution thread failure: {0}")] - ExecutionThreadFailure(String), + ExecutionThreadPanicked(String), + #[error("Requires consensus for local substates: {local_substates:?}")] + MustDeferExecution { + local_substates: IndexMap, 
+ foreign_substates: HashSet, + }, #[error("SubstateResolver Error: {0}")] SubstateResolverError(#[from] SubstateResolverError), #[error("Transaction Execution Error: {0}")] diff --git a/applications/tari_validator_node/src/p2p/services/mempool/executor.rs b/applications/tari_validator_node/src/p2p/services/mempool/executor.rs index 4fcfe474d..9777aaead 100644 --- a/applications/tari_validator_node/src/p2p/services/mempool/executor.rs +++ b/applications/tari_validator_node/src/p2p/services/mempool/executor.rs @@ -14,7 +14,7 @@ use tari_transaction::{Transaction, VersionedSubstateId}; use tokio::task; use crate::{ - p2p::services::mempool::{MempoolError, SubstateResolver}, + p2p::services::mempool::{MempoolError, ResolvedSubstates, SubstateResolver}, substate_resolver::SubstateResolverError, }; @@ -42,71 +42,83 @@ where Err(err) => return Err(err.into()), }; - info!(target: LOG_TARGET, "Transaction {} executing. virtual_substates = [{}]", transaction.id(), virtual_substates.keys().map(|addr| addr.to_string()).collect::>().join(", ")); + info!(target: LOG_TARGET, "🎱 Transaction {} found virtual_substates = [{}]", transaction.id(), virtual_substates.keys().map(|addr| addr.to_string()).collect::>().join(", ")); - match substate_resolver.resolve(&transaction).await { - Ok(inputs) => { - let res = task::spawn_blocking(move || { - let versioned_inputs = inputs - .iter() - .map(|(id, substate)| VersionedSubstateId::new(id.clone(), substate.version())) - .collect::>(); - let state_db = new_state_db(); - state_db.set_many(inputs).expect("memory db is infallible"); - - match executor.execute(transaction, state_db, virtual_substates) { - Ok(mut executed) => { - // TODO: refactor executor so that resolved inputs are set internally - // Update the resolved inputs to set the specific version, as we know it after execution - if let Some(diff) = executed.result().finalize.accept() { - let resolved_inputs = versioned_inputs - .into_iter() - .map(|versioned_id| { - let lock_flag = if 
diff.down_iter().any(|(id, _)| *id == versioned_id.substate_id) { - // Update all inputs that were DOWNed to be write locked - SubstateLockFlag::Write - } else { - // Any input not downed, gets a read lock - SubstateLockFlag::Read - }; - VersionedSubstateIdLockIntent::new(versioned_id, lock_flag) - }) - .collect::>(); - executed.set_resolved_inputs(resolved_inputs); - } else { - let resolved_inputs = versioned_inputs - .into_iter() - .map(|versioned_id| { - // We cannot tell which inputs are written, however since this transaction is a - // reject it does not matter since it will not cause locks. - // We still set resolved inputs because this is used to determine which shards are - // involved. - VersionedSubstateIdLockIntent::new(versioned_id, SubstateLockFlag::Write) - }) - .collect::>(); - executed.set_resolved_inputs(resolved_inputs); - } - - Ok(executed) - }, - Err(err) => Err(err.into()), - } - }) - .await; - - // If this errors, the thread panicked due to a bug - res.map_err(|err| MempoolError::ExecutionThreadFailure(err.to_string())) - }, + let ResolvedSubstates { + local: local_substates, + unresolved_foreign: foreign, + } = match substate_resolver.try_resolve_local(&transaction) { + Ok(pair) => pair, // Substates are downed/dont exist Err(err @ SubstateResolverError::InputSubstateDowned { .. }) | Err(err @ SubstateResolverError::InputSubstateDoesNotExist { .. 
}) => { warn!(target: LOG_TARGET, "One or more invalid input shards for transaction {}: {}", transaction.id(), err); - // Ok(Err(_)) This is not a mempool execution failure, but rather a transaction failure - Ok(Err(err.into())) + // Ok(Err(_)) return that the transaction should be rejected, not an internal mempool execution failure + return Ok(Err(err.into())); }, // Some other issue - network, db, etc - Err(err) => Err(err.into()), + Err(err) => return Err(err.into()), + }; + + if !foreign.is_empty() { + info!(target: LOG_TARGET, "Unable to execute transaction {} in the mempool because it has foreign inputs: {:?}", transaction.id(), foreign); + return Ok(Err(MempoolError::MustDeferExecution { + local_substates, + foreign_substates: foreign, + })); } + + info!(target: LOG_TARGET, "🎱 Transaction {} resolved local inputs = [{}]", transaction.id(), local_substates.keys().map(|addr| addr.to_string()).collect::>().join(", ")); + + let res = task::spawn_blocking(move || { + let versioned_inputs = local_substates + .iter() + .map(|(id, substate)| VersionedSubstateId::new(id.clone(), substate.version())) + .collect::>(); + let state_db = new_state_db(); + state_db.set_many(local_substates).expect("memory db is infallible"); + + match executor.execute(transaction, state_db, virtual_substates) { + Ok(mut executed) => { + // Update the resolved inputs to set the specific version, as we know it after execution + if let Some(diff) = executed.result().finalize.accept() { + let resolved_inputs = versioned_inputs + .into_iter() + .map(|versioned_id| { + let lock_flag = if diff.down_iter().any(|(id, _)| *id == versioned_id.substate_id) { + // Update all inputs that were DOWNed to be write locked + SubstateLockFlag::Write + } else { + // Any input not downed, gets a read lock + SubstateLockFlag::Read + }; + VersionedSubstateIdLockIntent::new(versioned_id, lock_flag) + }) + .collect::>(); + executed.set_resolved_inputs(resolved_inputs); + } else { + let resolved_inputs = 
versioned_inputs + .into_iter() + .map(|versioned_id| { + // We cannot tell which inputs are written, however since this transaction is a + // reject it does not matter since it will not cause locks. + // We still set resolved inputs because this is used to determine which shards are + // involved. + VersionedSubstateIdLockIntent::new(versioned_id, SubstateLockFlag::Write) + }) + .collect::>(); + executed.set_resolved_inputs(resolved_inputs); + } + + Ok(executed) + }, + Err(err) => Err(err.into()), + } + }) + .await; + + // If this errors, the thread panicked due to a bug + res.map_err(|err| MempoolError::ExecutionThreadPanicked(err.to_string())) } fn new_state_db() -> MemoryStateStore { diff --git a/applications/tari_validator_node/src/p2p/services/mempool/initializer.rs b/applications/tari_validator_node/src/p2p/services/mempool/initializer.rs index 06025b4d1..a8c34600e 100644 --- a/applications/tari_validator_node/src/p2p/services/mempool/initializer.rs +++ b/applications/tari_validator_node/src/p2p/services/mempool/initializer.rs @@ -20,6 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+use log::*; use tari_dan_app_utilities::transaction_executor::{TransactionExecutor, TransactionProcessorError}; use tari_dan_common_types::PeerAddress; use tari_dan_storage::consensus_models::ExecutedTransaction; @@ -39,6 +40,8 @@ use crate::{ substate_resolver::SubstateResolverError, }; +const LOG_TARGET: &str = "tari::dan::validator_node::mempool"; + pub fn spawn( gossip: Gossip, tx_executed_transactions: mpsc::Sender<(TransactionId, usize)>, @@ -82,6 +85,7 @@ where let handle = MempoolHandle::new(tx_mempool_request); let join_handle = task::spawn(mempool.run()); + debug!(target: LOG_TARGET, "Spawning mempool service (task: {:?})", join_handle); (handle, join_handle) } diff --git a/applications/tari_validator_node/src/p2p/services/mempool/metrics.rs b/applications/tari_validator_node/src/p2p/services/mempool/metrics.rs index 14164833f..49d9e520b 100644 --- a/applications/tari_validator_node/src/p2p/services/mempool/metrics.rs +++ b/applications/tari_validator_node/src/p2p/services/mempool/metrics.rs @@ -4,8 +4,8 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause use prometheus::{Histogram, HistogramOpts, IntCounter, Registry}; -use tari_dan_storage::consensus_models::{ExecutedTransaction, TransactionRecord}; -use tari_transaction::TransactionId; +use tari_dan_storage::consensus_models::ExecutedTransaction; +use tari_transaction::{Transaction, TransactionId}; use crate::{metrics::CollectorRegister, p2p::services::mempool::MempoolError}; @@ -49,7 +49,7 @@ impl PrometheusMempoolMetrics { } } - pub fn on_transaction_received(&mut self, _transaction: &TransactionRecord) { + pub fn on_transaction_received(&mut self, _transaction: &Transaction) { self.transactions_received.inc(); } diff --git a/applications/tari_validator_node/src/p2p/services/mempool/service.rs b/applications/tari_validator_node/src/p2p/services/mempool/service.rs index 4e268f51a..43c27128f 100644 --- a/applications/tari_validator_node/src/p2p/services/mempool/service.rs 
+++ b/applications/tari_validator_node/src/p2p/services/mempool/service.rs @@ -307,26 +307,22 @@ where should_propagate: bool, sender_shard: Option, ) -> Result<(), MempoolError> { - let mut transaction = TransactionRecord::new(transaction); - self.state_store.with_write_tx(|tx| transaction.insert(tx))?; - #[cfg(feature = "metrics")] self.metrics.on_transaction_received(&transaction); - if let Err(e) = self.before_execute_validator.validate(transaction.transaction()).await { + if let Err(e) = self.before_execute_validator.validate(&transaction).await { + let transaction_id = *transaction.id(); self.state_store.with_write_tx(|tx| { - transaction - .set_abort(format!("Mempool validation failed: {}", e)) + TransactionRecord::new(transaction) + .set_abort(format!("Mempool validation failed: {e}")) .update(tx) })?; #[cfg(feature = "metrics")] - self.metrics.on_transaction_validation_error(transaction.id(), &e); + self.metrics.on_transaction_validation_error(&transaction_id, &e); return Err(e); } - let transaction = transaction.into_transaction(); - // Get the shards involved in claim fees. let fee_claims = transaction.fee_claims().collect::>(); @@ -361,36 +357,12 @@ where if is_input_shard || is_output_shard { debug!(target: LOG_TARGET, "🎱 New transaction {} in mempool", transaction.id()); + let transaction = TransactionRecord::new(transaction); + self.state_store.with_write_tx(|tx| transaction.insert(tx))?; + let transaction = transaction.into_transaction(); self.transactions.insert(*transaction.id()); - // The transactions has one or more of its inputs with no version - // This means we skip transaction execution in the mempool, as the execution will happen on consensus - if transaction.has_inputs_without_version() { - info!( - target: LOG_TARGET, - "✅ Transaction {} has inputs without versions. 
Execution is deferred to consensus to allow input substates to be resolved", - transaction.id(), - ); - - let result = MempoolTransactionExecution { - transaction_id: *transaction.id(), - execution: TransactionExecution::Deferred { - transaction: transaction.clone(), - }, - should_propagate, - sender_shard, - }; - - self.handle_execution_task_complete(result).await?; - } else { - // All the inputs in the transaction have specific versions, so we execute immediately - self.queue_transaction_for_execution( - transaction.clone(), - current_epoch, - should_propagate, - sender_shard, - ); - } + self.queue_transaction_for_execution(transaction.clone(), current_epoch, should_propagate, sender_shard); if should_propagate { // This validator is involved, we to send the transaction to local replicas @@ -407,7 +379,7 @@ where ) .await { - error!( + warn!( target: LOG_TARGET, "Unable to propagate transaction among peers: {}", e @@ -419,7 +391,10 @@ where // Forward to foreign replicas. // We assume that at least f other local replicas receive this transaction and also forward to their // matching replica(s) - let substate_addresses = transaction.involved_shards_iter().collect(); + let substate_addresses = transaction + .all_inputs_iter() + .map(|i| i.or_zero_version().to_substate_address()) + .collect(); if let Err(e) = self .gossip .forward_to_foreign_replicas( @@ -433,7 +408,7 @@ where ) .await { - error!( + warn!( target: LOG_TARGET, "Unable to propagate transaction among peers: {}", e @@ -460,7 +435,7 @@ where }) .await { - error!( + warn!( target: LOG_TARGET, "Unable to propagate transaction among peers: {}", e @@ -494,6 +469,12 @@ where should_propagate, sender_shard, }, + Err(MempoolError::MustDeferExecution { .. 
}) => MempoolTransactionExecution { + transaction_id, + execution: TransactionExecution::Deferred { transaction }, + should_propagate, + sender_shard, + }, Err(error) => MempoolTransactionExecution { transaction_id, // IO, Database, etc errors @@ -535,28 +516,21 @@ where async fn handle_deferred_execution(&mut self, transaction: Transaction) -> Result<(), MempoolError> { let transaction_id = *transaction.id(); - // TODO: allow deferred transactions - this just rejects the transaction immediately - self.state_store.with_write_tx(|tx| { - TransactionRecord::get(tx.deref_mut(), &transaction_id)? - .set_abort("Transaction requires deferred execution but that is not currently supported") - .update(tx) - })?; - - // let is_consensus_running = self.consensus_handle.get_current_state().is_running(); - // - // let pending_exec_size = self.pending_executions.len(); - // if is_consensus_running && - // // Notify consensus about the transaction - // self.tx_executed_transactions - // .send((transaction_id, pending_exec_size)) - // .await - // .is_err() - // { - // debug!( - // target: LOG_TARGET, - // "Executed transaction channel closed before executed transaction could be sent" - // ); - // } + let is_consensus_running = self.consensus_handle.get_current_state().is_running(); + + let pending_exec_size = self.pending_executions.len(); + if is_consensus_running && + // Notify consensus about the transaction + self.tx_executed_transactions + .send((transaction_id, pending_exec_size)) + .await + .is_err() + { + debug!( + target: LOG_TARGET, + "Executed transaction channel closed before executed transaction could be sent" + ); + } self.transactions.remove(&transaction_id); Ok(()) @@ -628,8 +602,6 @@ where return Ok::<_, MempoolError>(()); } - log::error!(target: LOG_TARGET, "🐞 resolved inputs: {:?}", executed.resolved_inputs()); - assert!(executed.resolved_inputs().is_some(), "Resolved inputs must be set"); executed.update(tx)?; Ok::<_, MempoolError>(()) })?; diff --git 
a/applications/tari_validator_node/src/p2p/services/mempool/traits.rs b/applications/tari_validator_node/src/p2p/services/mempool/traits.rs index 94f382288..a9bb7bae6 100644 --- a/applications/tari_validator_node/src/p2p/services/mempool/traits.rs +++ b/applications/tari_validator_node/src/p2p/services/mempool/traits.rs @@ -1,6 +1,8 @@ // Copyright 2023 The Tari Project // SPDX-License-Identifier: BSD-3-Clause +use std::collections::HashSet; + use async_trait::async_trait; use indexmap::IndexMap; use tari_dan_common_types::Epoch; @@ -8,13 +10,23 @@ use tari_engine_types::{ substate::{Substate, SubstateId}, virtual_substate::VirtualSubstates, }; -use tari_transaction::Transaction; +use tari_transaction::{SubstateRequirement, Transaction}; + +pub struct ResolvedSubstates { + pub local: IndexMap, + pub unresolved_foreign: HashSet, +} #[async_trait] pub trait SubstateResolver { type Error: Send + Sync + 'static; - async fn resolve(&self, transaction: &Transaction) -> Result, Self::Error>; + fn try_resolve_local(&self, transaction: &Transaction) -> Result; + + async fn try_resolve_foreign( + &self, + requested_substates: &HashSet, + ) -> Result, Self::Error>; async fn resolve_virtual_substates( &self, diff --git a/applications/tari_validator_node/src/substate_resolver.rs b/applications/tari_validator_node/src/substate_resolver.rs index bdb8e4d8d..4e1c6f10c 100644 --- a/applications/tari_validator_node/src/substate_resolver.rs +++ b/applications/tari_validator_node/src/substate_resolver.rs @@ -17,11 +17,11 @@ use tari_engine_types::{ }; use tari_epoch_manager::{EpochManagerError, EpochManagerReader}; use tari_indexer_lib::{error::IndexerError, substate_cache::SubstateCache, substate_scanner::SubstateScanner}; -use tari_transaction::Transaction; +use tari_transaction::{SubstateRequirement, Transaction}; use tari_validator_node_rpc::client::{SubstateResult, ValidatorNodeClientFactory}; use crate::{ - p2p::services::mempool::SubstateResolver, + 
p2p::services::mempool::{ResolvedSubstates, SubstateResolver}, virtual_substate::{VirtualSubstateError, VirtualSubstateManager}, }; @@ -57,40 +57,107 @@ where } } - fn resolve_local_substates( - &self, - transaction: &Transaction, - out: &mut IndexMap, - ) -> Result, SubstateResolverError> { - let inputs = transaction.all_input_addresses_iter(); - let (local_substates, missing_shards) = self.store.with_read_tx(|tx| SubstateRecord::get_any(tx, inputs))?; + fn resolve_local_substates(&self, transaction: &Transaction) -> Result { + let mut substates = IndexMap::new(); + let inputs = transaction.all_inputs_substate_ids_iter(); + let (mut found_local_substates, missing_substate_ids) = self + .store + .with_read_tx(|tx| SubstateRecord::get_any_max_version(tx, inputs))?; + + // Reconcile requested inputs with found local substates + let mut missing_substates = HashSet::with_capacity(missing_substate_ids.len()); + for requested_input in transaction.all_inputs_iter() { + if missing_substate_ids.contains(requested_input.substate_id()) { + missing_substates.insert(requested_input); + // Not a local substate, so we will need to fetch it remotely + continue; + } + + match requested_input.version() { + // Specific version requested + Some(requested_version) => { + let maybe_match = found_local_substates + .iter() + .find(|s| s.version() == requested_version && s.substate_id() == requested_input.substate_id()); + + match maybe_match { + Some(substate) => { + if substate.is_destroyed() { + return Err(SubstateResolverError::InputSubstateDowned { + id: requested_input.into_substate_id(), + version: requested_version, + }); + } + // OK + }, + // Requested substate or version not found. 
We know that the requested substate is not foreign + // because we checked missing_substate_ids + None => { + return Err(SubstateResolverError::InputSubstateDoesNotExist { + substate_requirement: requested_input, + }); + }, + } + }, + // No version specified, so we will use the latest version + None => { + let (pos, substate) = found_local_substates + .iter() + .enumerate() + .find(|(_, s)| s.substate_id() == requested_input.substate_id()) + // This is not possible + .ok_or_else(|| { + error!( + target: LOG_TARGET, + "🐞 BUG: Requested substate {} was not missing but was also not found", + requested_input.substate_id() + ); + SubstateResolverError::InputSubstateDoesNotExist { substate_requirement: requested_input.clone()} + })?; + + if substate.is_destroyed() { + // The requested substate is downed locally, it may be available in a foreign shard so we add it + // to missing + let _substate = found_local_substates.remove(pos); + missing_substates.insert(requested_input); + continue; + } + + // User did not specify the version, so we will use the latest version + // Ok + }, + } + } info!( target: LOG_TARGET, - "Found {} local substates and {} missing shards", - local_substates.len(), - missing_shards.len()); + "Found {} local substates and {} missing substates", + found_local_substates.len(), + missing_substate_ids.len(), + ); - out.extend( - local_substates + substates.extend( + found_local_substates .into_iter() .map(|s| (s.substate_id.clone(), s.into_substate())), ); - Ok(missing_shards) + Ok(ResolvedSubstates { + local: substates, + unresolved_foreign: missing_substates, + }) } async fn resolve_remote_substates( &self, - substate_addresses: HashSet, - out: &mut IndexMap, - ) -> Result<(), SubstateResolverError> { - out.reserve(substate_addresses.len()); - for substate_address in substate_addresses { + requested_substates: &HashSet, + ) -> Result, SubstateResolverError> { + let mut substates = IndexMap::with_capacity(requested_substates.len()); + for substate_req 
in requested_substates { let timer = Instant::now(); let substate_result = self .scanner - .get_specific_substate_from_committee_by_shard(substate_address) + .get_substate(substate_req.substate_id(), substate_req.version()) .await?; match substate_result { @@ -101,20 +168,20 @@ where id, timer.elapsed().as_millis() ); - out.insert(id, substate); + substates.insert(id, substate); }, SubstateResult::Down { id, version, .. } => { return Err(SubstateResolverError::InputSubstateDowned { id, version }); }, SubstateResult::DoesNotExist => { return Err(SubstateResolverError::InputSubstateDoesNotExist { - address: substate_address, + substate_requirement: substate_req.clone(), }); }, } } - Ok(()) + Ok(substates) } async fn resolve_remote_virtual_substates( @@ -158,17 +225,15 @@ where { type Error = SubstateResolverError; - async fn resolve(&self, transaction: &Transaction) -> Result, Self::Error> { - let mut substates = IndexMap::new(); - - let missing_shards = self.resolve_local_substates(transaction, &mut substates)?; - - // TODO: If any of the missing shards are local we should error early here rather than asking the local - // committee - - self.resolve_remote_substates(missing_shards, &mut substates).await?; + fn try_resolve_local(&self, transaction: &Transaction) -> Result { + self.resolve_local_substates(transaction) + } - Ok(substates) + async fn try_resolve_foreign( + &self, + requested_substates: &HashSet, + ) -> Result, Self::Error> { + self.resolve_remote_substates(requested_substates).await } async fn resolve_virtual_substates( @@ -247,8 +312,8 @@ pub enum SubstateResolverError { StorageError(#[from] StorageError), #[error("Indexer error: {0}")] IndexerError(#[from] IndexerError), - #[error("Input substate does not exist: {address}")] - InputSubstateDoesNotExist { address: SubstateAddress }, + #[error("Input substate does not exist: {substate_requirement}")] + InputSubstateDoesNotExist { substate_requirement: SubstateRequirement }, #[error("Input substate is 
downed: {id} (version: {version})")] InputSubstateDowned { id: SubstateId, version: u32 }, #[error("Virtual substate error: {0}")] diff --git a/dan_layer/consensus/src/hotstuff/on_inbound_message.rs b/dan_layer/consensus/src/hotstuff/on_inbound_message.rs index b8204943a..994828356 100644 --- a/dan_layer/consensus/src/hotstuff/on_inbound_message.rs +++ b/dan_layer/consensus/src/hotstuff/on_inbound_message.rs @@ -86,7 +86,7 @@ where TConsensusSpec: ConsensusSpec self.process_local_proposal(current_height, msg).await?; }, HotstuffMessage::ForeignProposal(ref proposal) => { - self.check_proposal(proposal.block.clone()).await?; + self.check_proposal(&proposal.block).await?; self.report_message_ready(from, msg)?; }, msg => { @@ -111,18 +111,18 @@ where TConsensusSpec: ConsensusSpec self.message_buffer.clear_buffer(); } - async fn check_proposal(&mut self, block: Block) -> Result, HotStuffError> { - check_base_layer_block_hash::(&block, &self.epoch_manager, &self.config).await?; - check_network(&block, self.network)?; - check_hash_and_height(&block)?; + async fn check_proposal(&self, block: &Block) -> Result<(), HotStuffError> { + check_base_layer_block_hash::(block, &self.epoch_manager, &self.config).await?; + check_network(block, self.network)?; + check_hash_and_height(block)?; let committee_for_block = self .epoch_manager .get_committee_by_validator_public_key(block.epoch(), block.proposed_by()) .await?; - check_proposed_by_leader(&self.leader_strategy, &committee_for_block, &block)?; - check_signature(&block)?; - check_quorum_certificate::(&block, &self.vote_signing_service, &self.epoch_manager).await?; - self.handle_missing_transactions(block).await + check_proposed_by_leader(&self.leader_strategy, &committee_for_block, block)?; + check_signature(block)?; + check_quorum_certificate::(block, &self.vote_signing_service, &self.epoch_manager).await?; + Ok(()) } async fn process_local_proposal( @@ -140,17 +140,18 @@ where TConsensusSpec: ConsensusSpec current_height, ); 
- // if block.height() < current_height { - // info!( - // target: LOG_TARGET, - // "🔥 Block {} is lower than current height {}. Ignoring.", - // block, - // current_height - // ); - // return Ok(()); - // } - - let Some(ready_block) = self.check_proposal(block).await? else { + if block.height() < current_height { + info!( + target: LOG_TARGET, + "🔥 Block {} is lower than current height {}. Ignoring.", + block, + current_height + ); + return Ok(()); + } + + self.check_proposal(&block).await?; + let Some(ready_block) = self.handle_missing_transactions(block).await? else { // Block not ready return Ok(()); }; diff --git a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql index 934511987..3bef974f3 100644 --- a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql +++ b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql @@ -101,8 +101,10 @@ create table substates destroyed_at timestamp NULL ); --- All shard ids are unique -create unique index substates_uniq_shard_id on substates (address); +-- All addresses are unique +create unique index substates_uniq_address on substates (address); +-- All substate_id, version pairs are unique. 
This is a common query +create unique index substates_uniq_substate_id_and_version on substates (substate_id, version); -- querying for transaction ids that either Upd or Downd a substate create index substates_idx_created_by_transaction on substates (created_by_transaction); create index substates_idx_destroyed_by_transaction on substates (destroyed_by_transaction) where destroyed_by_transaction is not null; diff --git a/dan_layer/state_store_sqlite/src/lib.rs b/dan_layer/state_store_sqlite/src/lib.rs index 58e1009f0..8a8fadaa3 100644 --- a/dan_layer/state_store_sqlite/src/lib.rs +++ b/dan_layer/state_store_sqlite/src/lib.rs @@ -1,5 +1,6 @@ // Copyright 2023 The Tari Project // SPDX-License-Identifier: BSD-3-Clause + mod error; mod reader; mod schema; diff --git a/dan_layer/state_store_sqlite/src/reader.rs b/dan_layer/state_store_sqlite/src/reader.rs index b15afc92c..561907b51 100644 --- a/dan_layer/state_store_sqlite/src/reader.rs +++ b/dan_layer/state_store_sqlite/src/reader.rs @@ -10,6 +10,7 @@ use std::{ use bigdecimal::{BigDecimal, ToPrimitive}; use diesel::{ + dsl, query_builder::SqlQuery, sql_query, sql_types::{BigInt, Text}, @@ -60,8 +61,8 @@ use tari_dan_storage::{ StateStoreReadTransaction, StorageError, }; -use tari_engine_types::lock::LockFlag; -use tari_transaction::{TransactionId, VersionedSubstateId}; +use tari_engine_types::{lock::LockFlag, substate::SubstateId}; +use tari_transaction::{SubstateRequirement, TransactionId, VersionedSubstateId}; use tari_utilities::ByteArray; use crate::{ @@ -192,7 +193,7 @@ impl<'a, TAddr: NodeAddressable + Serialize + DeserializeOwned> SqliteStateStore item_size: usize, ) -> String { let len = values.len(); - let mut sql_frag = String::with_capacity(len * item_size + len * 3 + len - 1); + let mut sql_frag = String::with_capacity((len * item_size + len * 3 + len).saturating_sub(1)); for (i, value) in values.enumerate() { sql_frag.push('"'); sql_frag.push_str(value); @@ -1473,18 +1474,91 @@ impl<'tx, TAddr: 
NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor substate.try_into() } - fn substates_get_any(&mut self, addresses: &HashSet) -> Result, StorageError> { + fn substates_get_any( + &mut self, + substate_ids: &HashSet, + ) -> Result, StorageError> { use crate::schema::substates; - let substates = substates::table - .filter(substates::address.eq_any(addresses.iter().map(serialize_hex))) + let mut query = substates::table.into_boxed(); + + for id in substate_ids { + let id_str = id.substate_id.to_string(); + match id.version() { + Some(v) => { + query = query.or_filter(substates::substate_id.eq(id_str).and(substates::version.eq(v as i32))); + }, + None => { + // Select the max known version + query = query.or_filter(substates::substate_id.eq(id_str.clone()).and(substates::version.eq( + dsl::sql("SELECT MAX(version) FROM substates WHERE substate_id = ?").bind::(id_str), + ))); + }, + } + } + + let results = query .get_results::(self.connection()) .map_err(|e| SqliteStorageError::DieselError { operation: "substates_get_any", source: e, })?; - substates.into_iter().map(TryInto::try_into).collect() + results.into_iter().map(TryInto::try_into).collect() + } + + fn substates_get_any_max_version<'a, I: IntoIterator>( + &mut self, + substate_ids: I, + ) -> Result, StorageError> { + use crate::schema::substates; + #[derive(Debug, QueryableByName)] + struct MaxVersionAndId { + #[allow(dead_code)] + #[diesel(sql_type = diesel::sql_types::Nullable)] + max_version: Option, + #[diesel(sql_type = diesel::sql_types::Integer)] + id: i32, + } + + let substate_ids = substate_ids.into_iter().map(ToString::to_string).collect::>(); + if substate_ids.is_empty() { + return Ok(Vec::new()); + } + let frag = self.sql_frag_for_in_statement(substate_ids.iter().map(|s| s.as_str()), 32); + let max_versions_and_ids = sql_query(format!( + r#" + SELECT MAX(version) as max_version, id + FROM substates + WHERE substate_id in ({}) + GROUP BY substate_id"#, + frag + )) + 
.get_results::(self.connection()) + .map_err(|e| SqliteStorageError::DieselError { + operation: "substates_get_any_max_version", + source: e, + })?; + + let results = substates::table + .filter(substates::id.eq_any(max_versions_and_ids.iter().map(|m| m.id))) + .get_results::(self.connection()) + .map_err(|e| SqliteStorageError::DieselError { + operation: "substates_get_any_max_version", + source: e, + })?; + + // let results = substates::table + // .group_by(substates::substate_id) + // .select((substates::all_columns, dsl::max(substates::version)) + // .filter(substates::substate_id.eq_any(substate_ids.into_iter().map(ToString::to_string))) + // .get_results::<(sql_models::SubstateRecord, Option)>(self.connection()) + // .map_err(|e| SqliteStorageError::DieselError { + // operation: "substates_get_any_max_version", + // source: e, + // })?; + + results.into_iter().map(TryInto::try_into).collect() } fn substates_any_exist, S: Borrow>( diff --git a/dan_layer/storage/src/consensus_models/substate.rs b/dan_layer/storage/src/consensus_models/substate.rs index 9283d9c0b..27b852445 100644 --- a/dan_layer/storage/src/consensus_models/substate.rs +++ b/dan_layer/storage/src/consensus_models/substate.rs @@ -4,6 +4,7 @@ use std::{ borrow::Borrow, collections::HashSet, + hash::Hash, iter, ops::{DerefMut, RangeInclusive}, }; @@ -16,7 +17,7 @@ use tari_engine_types::{ lock::LockFlag, substate::{Substate, SubstateId, SubstateValue}, }; -use tari_transaction::{TransactionId, VersionedSubstateId}; +use tari_transaction::{SubstateRequirement, TransactionId, VersionedSubstateId}; use crate::{ consensus_models::{Block, BlockId, QcId, QuorumCertificate, VersionedSubstateIdLockIntent}, @@ -95,6 +96,10 @@ impl SubstateRecord { SubstateAddress::from_address(&self.substate_id, self.version) } + pub fn to_substate_requirement(&self) -> SubstateRequirement { + SubstateRequirement::with_version(self.substate_id.clone(), self.version) + } + pub fn substate_id(&self) -> &SubstateId { 
&self.substate_id } @@ -142,6 +147,10 @@ impl SubstateRecord { pub fn is_destroyed(&self) -> bool { self.destroyed.is_some() } + + pub fn is_up(&self) -> bool { + !self.is_destroyed() + } } impl SubstateRecord { @@ -241,17 +250,30 @@ impl SubstateRecord { tx.substates_get(shard) } - pub fn get_any>( + pub fn get_any>( tx: &mut TTx, shards: I, - ) -> Result<(Vec, HashSet), StorageError> { - let mut shards = shards.into_iter().collect::>(); - let found = tx.substates_get_any(&shards)?; + ) -> Result<(Vec, HashSet), StorageError> { + let mut substate_ids = shards.into_iter().collect::>(); + let found = tx.substates_get_any(&substate_ids)?; + for f in &found { + substate_ids.remove(&f.to_substate_requirement()); + } + + Ok((found, substate_ids)) + } + + pub fn get_any_max_version<'a, TTx: StateStoreReadTransaction + ?Sized, I: IntoIterator>( + tx: &mut TTx, + substate_ids: I, + ) -> Result<(Vec, HashSet<&'a SubstateId>), StorageError> { + let mut substate_ids = substate_ids.into_iter().collect::>(); + let found = tx.substates_get_any_max_version(substate_ids.iter().copied())?; for f in &found { - shards.remove(&f.to_substate_address()); + substate_ids.remove(&f.substate_id); } - Ok((found, shards)) + Ok((found, substate_ids)) } pub fn get_many_within_range>>( diff --git a/dan_layer/storage/src/state_store/mod.rs b/dan_layer/storage/src/state_store/mod.rs index abcfc5152..39b6b1bde 100644 --- a/dan_layer/storage/src/state_store/mod.rs +++ b/dan_layer/storage/src/state_store/mod.rs @@ -10,8 +10,9 @@ use std::{ use serde::{Deserialize, Serialize}; use tari_common_types::types::{FixedHash, PublicKey}; use tari_dan_common_types::{Epoch, NodeAddressable, NodeHeight, SubstateAddress}; +use tari_engine_types::substate::SubstateId; use tari_state_tree::{TreeStore, TreeStoreReader, Version}; -use tari_transaction::{TransactionId, VersionedSubstateId}; +use tari_transaction::{SubstateRequirement, TransactionId, VersionedSubstateId}; #[cfg(feature = "ts")] use ts_rs::TS; @@ -207,7 
+208,11 @@ pub trait StateStoreReadTransaction { fn substates_get(&mut self, substate_id: &SubstateAddress) -> Result; fn substates_get_any( &mut self, - substate_ids: &HashSet, + substate_ids: &HashSet, + ) -> Result, StorageError>; + fn substates_get_any_max_version<'a, I: IntoIterator>( + &mut self, + substate_ids: I, ) -> Result, StorageError>; fn substates_any_exist(&mut self, substates: I) -> Result where diff --git a/dan_layer/transaction/src/transaction.rs b/dan_layer/transaction/src/transaction.rs index 328b9c106..1c7be690c 100644 --- a/dan_layer/transaction/src/transaction.rs +++ b/dan_layer/transaction/src/transaction.rs @@ -111,7 +111,7 @@ impl Transaction { } pub fn involved_shards_iter(&self) -> impl Iterator + '_ { - self.all_input_addresses_iter() + self.versioned_input_addresses_iter() } pub fn inputs(&self) -> &IndexSet { @@ -151,7 +151,7 @@ impl Transaction { self.all_inputs_substate_ids_iter().count() } - pub fn all_input_addresses_iter(&self) -> impl Iterator + '_ { + pub fn versioned_input_addresses_iter(&self) -> impl Iterator + '_ { self.input_addresses_iter().chain(self.filled_input_addresses_iter()) } @@ -243,7 +243,7 @@ impl Transaction { } } -#[derive(Debug, Clone, Deserialize, Serialize, Eq, PartialEq, Hash)] +#[derive(Debug, Clone, Deserialize, Serialize)] #[cfg_attr( feature = "ts", derive(ts_rs::TS), @@ -274,6 +274,10 @@ impl SubstateRequirement { &self.substate_id } + pub fn into_substate_id(self) -> SubstateId { + self.substate_id + } + pub fn version(&self) -> Option { self.version } @@ -295,6 +299,13 @@ impl SubstateRequirement { version: v, }) } + + pub fn or_zero_version(self) -> VersionedSubstateId { + VersionedSubstateId { + version: self.version.unwrap_or(0), + substate_id: self.substate_id, + } + } } impl FromStr for SubstateRequirement { @@ -346,6 +357,22 @@ impl> From for SubstateRequirement { } } +impl PartialEq for SubstateRequirement { + fn eq(&self, other: &Self) -> bool { + self.substate_id == other.substate_id + } 
+} + +impl Eq for SubstateRequirement {} + +// Only consider the substate id in maps. This means that duplicates are found if the substate id is the same, +// regardless of the version. +impl std::hash::Hash for SubstateRequirement { + fn hash(&self, state: &mut H) { + self.substate_id.hash(state); + } +} + #[derive(Debug, thiserror::Error)] #[error("Failed to parse substate requirement {0}")] pub struct SubstateRequirementParseError(String); diff --git a/integration_tests/tests/features/concurrency.feature b/integration_tests/tests/features/concurrency.feature index 111fb5356..429387a74 100644 --- a/integration_tests/tests/features/concurrency.feature +++ b/integration_tests/tests/features/concurrency.feature @@ -3,44 +3,43 @@ Feature: Concurrency -# TODO: This feature is currently disabled -# @serial -# Scenario: Concurrent calls to the Counter template -# Given fees are disabled -# # Initialize a base node, wallet, miner and VN -# Given a base node BASE -# Given a wallet WALLET connected to base node BASE -# Given a miner MINER connected to base node BASE and wallet WALLET -# -# # Initialize a VN -# Given a validator node VAL_1 connected to base node BASE and wallet daemon WALLET_D -# -# # The wallet must have some funds before the VN sends transactions -# When miner MINER mines 6 new blocks -# When wallet WALLET has at least 20 T -# -# # VN registration -# When validator node VAL_1 sends a registration transaction to base wallet WALLET -# -# # Register the "counter" template -# When base wallet WALLET registers the template "counter" -# When miner MINER mines 13 new blocks -# Then VAL_1 has scanned to height 16 -# Then the validator node VAL_1 is listed as registered -# Then the template "counter" is listed as registered by the validator node VAL_1 -# -# # A file-base CLI account must be created to sign future calls -# When I use an account key named K1 -# -# # Create a new Counter component -# When I create a component COUNTER_1 of template "counter" on VAL_1 using
"new" -# When I print the cucumber world -# -# # Send multiple concurrent transactions to increase the counter -# # TODO: when concurrency is fully working, call it with "2 times" or higher -# When I invoke on VAL_1 on component COUNTER_1/components/Counter the method call "increase" concurrently 1 times -# When I print the cucumber world -# -# # Check that the counter has been increased -# # TODO: uncomment when concurrency is fully working -# # When I invoke on VAL_1 on component TX1/components/Counter the method call "value" the result is "2" \ No newline at end of file + @serial + Scenario: Concurrent calls to the Counter template + Given fees are disabled + # Initialize a base node, wallet, miner and VN + Given a base node BASE + Given a wallet WALLET connected to base node BASE + Given a miner MINER connected to base node BASE and wallet WALLET + + # Initialize a VN + Given a validator node VAL_1 connected to base node BASE and wallet daemon WALLET_D + + # The wallet must have some funds before the VN sends transactions + When miner MINER mines 6 new blocks + When wallet WALLET has at least 20 T + + # VN registration + When validator node VAL_1 sends a registration transaction to base wallet WALLET + + # Register the "counter" template + When base wallet WALLET registers the template "counter" + When miner MINER mines 13 new blocks + Then VAL_1 has scanned to height 16 + Then the validator node VAL_1 is listed as registered + Then the template "counter" is listed as registered by the validator node VAL_1 + + # A file-base CLI account must be created to sign future calls + When I use an account key named K1 + + # Create a new Counter component + When I create a component COUNTER_1 of template "counter" on VAL_1 using "new" + When I print the cucumber world + + # Send multiple concurrent transactions to increase the counter + # TODO: when concurrency is fully working, call it with "2 times" or higher + When I invoke on VAL_1 on component 
COUNTER_1/components/Counter the method call "increase" concurrently 1 times + When I print the cucumber world + + # Check that the counter has been increased + # TODO: uncomment when concurrency is fully working + # When I invoke on VAL_1 on component TX1/components/Counter the method call "value" the result is "2" \ No newline at end of file