Skip to content

Commit

Permalink
fix(mempool): allow local transactions with unversioned inputs (#1027)
Browse files Browse the repository at this point in the history
Description
---
fix(mempool): resolve local unversioned inputs before execution

Motivation and Context
---
Allow local only transactions with unversioned inputs to be resolved in
the mempool

High-level Notes:
- Versioned local and foreign inputs may be safely executed and passed
to consensus. If an input conflicts or has been downed since execution
this transaction will (and must) ABORT.
- Unversioned inputs are more complicated. 
- **Local-only unversioned in mempool**: the mempool resolves all inputs
as local and assigns the current UP version. Since inputs are not locked
yet, another transaction may consume these inputs rendering one of the
transactions to ABORT when it shouldn't. This can be resolved by
executing the local-only transactions within the context of the object
state of the current block.
- **Local-only consensus execution**: The version applicable to the
current leaf proposal which accounts for the uncommitted chain (to
commit block/3-chain) is selected and proposed as LocalOnly. This
implicitly pledges and locks the inputs.
- **Local-only with foreign conflicts in consensus**: ...



How Has This Been Tested?
---
Cucumber test re-added that tests unversioned inputs.

What process can a PR reviewer use to test or verify this change?
---
Submit a transaction with at least one unversioned input local to the
shard

Breaking Changes
---

- [x] None
- [ ] Requires data directory to be deleted
- [ ] Other - Please specify

NOTE: Added an index to the database, that requires the database to be
deleted to be applied. This is non-breaking but does ensure data
integrity and improves the SQLite implementation query performance
  • Loading branch information
sdbondi authored May 2, 2024
1 parent 4ae0774 commit 771de40
Show file tree
Hide file tree
Showing 17 changed files with 473 additions and 256 deletions.
8 changes: 6 additions & 2 deletions applications/tari_indexer/src/transaction_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,18 @@ where
);
let transaction_substate_address = SubstateAddress::for_transaction_receipt(tx_hash.into_array().into());

if transaction.involved_shards_iter().count() == 0 {
if transaction.all_inputs_iter().next().is_none() {
self.try_with_committee(iter::once(transaction_substate_address), |mut client| {
let transaction = transaction.clone();
async move { client.submit_transaction(transaction).await }
})
.await
} else {
self.try_with_committee(transaction.involved_shards_iter(), |mut client| {
let involved = transaction
.all_inputs_iter()
// If there is no version specified, submit to the validator node with version 0
.map(|i| i.or_zero_version().to_substate_address());
self.try_with_committee(involved, |mut client| {
let transaction = transaction.clone();
async move { client.submit_transaction(transaction).await }
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use thiserror::Error;
use tokio::task;

use crate::{
p2p::services::mempool::SubstateResolver,
p2p::services::mempool::{ResolvedSubstates, SubstateResolver},
substate_resolver::{SubstateResolverError, TariSubstateResolver},
virtual_substate::VirtualSubstateError,
};
Expand All @@ -68,6 +68,8 @@ pub enum DryRunTransactionProcessorError {
SubstateResoverError(#[from] SubstateResolverError),
#[error("Virtual substate error: {0}")]
VirtualSubstateError(#[from] VirtualSubstateError),
#[error("Execution thread failed: {0}")]
ExecutionThreadFailed(#[from] task::JoinError),
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -118,14 +120,20 @@ impl DryRunTransactionProcessor {
.resolve_virtual_substates(&transaction, current_epoch)
.await?;

let inputs = self.substate_resolver.resolve(&transaction).await?;
let ResolvedSubstates {
local: inputs,
unresolved_foreign: foreign,
} = self.substate_resolver.try_resolve_local(&transaction)?;
temp_state_store.set_many(inputs)?;
// Dry-run we can request the foreign inputs from validator nodes. The execution result may vary if inputs are
// mutated between the dry-run and live execution.
let foreign_inputs = self.substate_resolver.try_resolve_foreign(&foreign).await?;
temp_state_store.set_many(foreign_inputs)?;

// execute the payload in the WASM engine and return the result
let executed = task::block_in_place(|| {
self.payload_processor
.execute(transaction, temp_state_store, virtual_substates)
})?;
let processor = self.payload_processor.clone();
let executed =
task::spawn_blocking(move || processor.execute(transaction, temp_state_store, virtual_substates)).await??;
let result = executed.into_result();

let fees = &result.finalize.fee_receipt;
Expand Down
13 changes: 11 additions & 2 deletions applications/tari_validator_node/src/p2p/services/mempool/error.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::collections::HashSet;

use indexmap::IndexMap;
use tari_dan_app_utilities::{
template_manager::interface::TemplateManagerError,
transaction_executor::TransactionProcessorError,
};
use tari_dan_common_types::Epoch;
use tari_dan_storage::{consensus_models::TransactionPoolError, StorageError};
use tari_engine_types::substate::{Substate, SubstateId};
use tari_epoch_manager::EpochManagerError;
use tari_networking::NetworkingError;
use tari_transaction::TransactionId;
use tari_transaction::{SubstateRequirement, TransactionId};
use tokio::sync::{mpsc, oneshot};

use crate::{
Expand All @@ -30,7 +34,12 @@ pub enum MempoolError {
#[error("DryRunTransactionProcessor Error: {0}")]
DryRunTransactionProcessorError(#[from] DryRunTransactionProcessorError),
#[error("Execution thread failure: {0}")]
ExecutionThreadFailure(String),
ExecutionThreadPanicked(String),
#[error("Requires consensus for local substates: {local_substates:?}")]
MustDeferExecution {
local_substates: IndexMap<SubstateId, Substate>,
foreign_substates: HashSet<SubstateRequirement>,
},
#[error("SubstateResolver Error: {0}")]
SubstateResolverError(#[from] SubstateResolverError),
#[error("Transaction Execution Error: {0}")]
Expand Down
128 changes: 70 additions & 58 deletions applications/tari_validator_node/src/p2p/services/mempool/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tari_transaction::{Transaction, VersionedSubstateId};
use tokio::task;

use crate::{
p2p::services::mempool::{MempoolError, SubstateResolver},
p2p::services::mempool::{MempoolError, ResolvedSubstates, SubstateResolver},
substate_resolver::SubstateResolverError,
};

Expand Down Expand Up @@ -42,71 +42,83 @@ where
Err(err) => return Err(err.into()),
};

info!(target: LOG_TARGET, "Transaction {} executing. virtual_substates = [{}]", transaction.id(), virtual_substates.keys().map(|addr| addr.to_string()).collect::<Vec<_>>().join(", "));
info!(target: LOG_TARGET, "🎱 Transaction {} found virtual_substates = [{}]", transaction.id(), virtual_substates.keys().map(|addr| addr.to_string()).collect::<Vec<_>>().join(", "));

match substate_resolver.resolve(&transaction).await {
Ok(inputs) => {
let res = task::spawn_blocking(move || {
let versioned_inputs = inputs
.iter()
.map(|(id, substate)| VersionedSubstateId::new(id.clone(), substate.version()))
.collect::<IndexSet<_>>();
let state_db = new_state_db();
state_db.set_many(inputs).expect("memory db is infallible");

match executor.execute(transaction, state_db, virtual_substates) {
Ok(mut executed) => {
// TODO: refactor executor so that resolved inputs are set internally
// Update the resolved inputs to set the specific version, as we know it after execution
if let Some(diff) = executed.result().finalize.accept() {
let resolved_inputs = versioned_inputs
.into_iter()
.map(|versioned_id| {
let lock_flag = if diff.down_iter().any(|(id, _)| *id == versioned_id.substate_id) {
// Update all inputs that were DOWNed to be write locked
SubstateLockFlag::Write
} else {
// Any input not downed, gets a read lock
SubstateLockFlag::Read
};
VersionedSubstateIdLockIntent::new(versioned_id, lock_flag)
})
.collect::<IndexSet<_>>();
executed.set_resolved_inputs(resolved_inputs);
} else {
let resolved_inputs = versioned_inputs
.into_iter()
.map(|versioned_id| {
// We cannot tell which inputs are written, however since this transaction is a
// reject it does not matter since it will not cause locks.
// We still set resolved inputs because this is used to determine which shards are
// involved.
VersionedSubstateIdLockIntent::new(versioned_id, SubstateLockFlag::Write)
})
.collect::<IndexSet<_>>();
executed.set_resolved_inputs(resolved_inputs);
}

Ok(executed)
},
Err(err) => Err(err.into()),
}
})
.await;

// If this errors, the thread panicked due to a bug
res.map_err(|err| MempoolError::ExecutionThreadFailure(err.to_string()))
},
let ResolvedSubstates {
local: local_substates,
unresolved_foreign: foreign,
} = match substate_resolver.try_resolve_local(&transaction) {
Ok(pair) => pair,
// Substates are downed/dont exist
Err(err @ SubstateResolverError::InputSubstateDowned { .. }) |
Err(err @ SubstateResolverError::InputSubstateDoesNotExist { .. }) => {
warn!(target: LOG_TARGET, "One or more invalid input shards for transaction {}: {}", transaction.id(), err);
// Ok(Err(_)) This is not a mempool execution failure, but rather a transaction failure
Ok(Err(err.into()))
// Ok(Err(_)) return that the transaction should be rejected, not an internal mempool execution failure
return Ok(Err(err.into()));
},
// Some other issue - network, db, etc
Err(err) => Err(err.into()),
Err(err) => return Err(err.into()),
};

if !foreign.is_empty() {
info!(target: LOG_TARGET, "Unable to execute transaction {} in the mempool because it has foreign inputs: {:?}", transaction.id(), foreign);
return Ok(Err(MempoolError::MustDeferExecution {
local_substates,
foreign_substates: foreign,
}));
}

info!(target: LOG_TARGET, "🎱 Transaction {} resolved local inputs = [{}]", transaction.id(), local_substates.keys().map(|addr| addr.to_string()).collect::<Vec<_>>().join(", "));

let res = task::spawn_blocking(move || {
let versioned_inputs = local_substates
.iter()
.map(|(id, substate)| VersionedSubstateId::new(id.clone(), substate.version()))
.collect::<IndexSet<_>>();
let state_db = new_state_db();
state_db.set_many(local_substates).expect("memory db is infallible");

match executor.execute(transaction, state_db, virtual_substates) {
Ok(mut executed) => {
// Update the resolved inputs to set the specific version, as we know it after execution
if let Some(diff) = executed.result().finalize.accept() {
let resolved_inputs = versioned_inputs
.into_iter()
.map(|versioned_id| {
let lock_flag = if diff.down_iter().any(|(id, _)| *id == versioned_id.substate_id) {
// Update all inputs that were DOWNed to be write locked
SubstateLockFlag::Write
} else {
// Any input not downed, gets a read lock
SubstateLockFlag::Read
};
VersionedSubstateIdLockIntent::new(versioned_id, lock_flag)
})
.collect::<IndexSet<_>>();
executed.set_resolved_inputs(resolved_inputs);
} else {
let resolved_inputs = versioned_inputs
.into_iter()
.map(|versioned_id| {
// We cannot tell which inputs are written, however since this transaction is a
// reject it does not matter since it will not cause locks.
// We still set resolved inputs because this is used to determine which shards are
// involved.
VersionedSubstateIdLockIntent::new(versioned_id, SubstateLockFlag::Write)
})
.collect::<IndexSet<_>>();
executed.set_resolved_inputs(resolved_inputs);
}

Ok(executed)
},
Err(err) => Err(err.into()),
}
})
.await;

// If this errors, the thread panicked due to a bug
res.map_err(|err| MempoolError::ExecutionThreadPanicked(err.to_string()))
}

fn new_state_db() -> MemoryStateStore {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use log::*;
use tari_dan_app_utilities::transaction_executor::{TransactionExecutor, TransactionProcessorError};
use tari_dan_common_types::PeerAddress;
use tari_dan_storage::consensus_models::ExecutedTransaction;
Expand All @@ -39,6 +40,8 @@ use crate::{
substate_resolver::SubstateResolverError,
};

const LOG_TARGET: &str = "tari::dan::validator_node::mempool";

pub fn spawn<TExecutor, TValidator, TExecutedValidator, TSubstateResolver>(
gossip: Gossip,
tx_executed_transactions: mpsc::Sender<(TransactionId, usize)>,
Expand Down Expand Up @@ -82,6 +85,7 @@ where
let handle = MempoolHandle::new(tx_mempool_request);

let join_handle = task::spawn(mempool.run());
debug!(target: LOG_TARGET, "Spawning mempool service (task: {:?})", join_handle);

(handle, join_handle)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause
use prometheus::{Histogram, HistogramOpts, IntCounter, Registry};
use tari_dan_storage::consensus_models::{ExecutedTransaction, TransactionRecord};
use tari_transaction::TransactionId;
use tari_dan_storage::consensus_models::ExecutedTransaction;
use tari_transaction::{Transaction, TransactionId};

use crate::{metrics::CollectorRegister, p2p::services::mempool::MempoolError};

Expand Down Expand Up @@ -49,7 +49,7 @@ impl PrometheusMempoolMetrics {
}
}

pub fn on_transaction_received(&mut self, _transaction: &TransactionRecord) {
pub fn on_transaction_received(&mut self, _transaction: &Transaction) {
self.transactions_received.inc();
}

Expand Down
Loading

0 comments on commit 771de40

Please sign in to comment.