Skip to content

Commit

Permalink
Merge branch 'development' into build_dockers_test1
Browse files Browse the repository at this point in the history
  • Loading branch information
stringhandler authored May 2, 2024
2 parents 5c8024c + 771de40 commit e378ebf
Show file tree
Hide file tree
Showing 17 changed files with 473 additions and 256 deletions.
8 changes: 6 additions & 2 deletions applications/tari_indexer/src/transaction_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,18 @@ where
);
let transaction_substate_address = SubstateAddress::for_transaction_receipt(tx_hash.into_array().into());

if transaction.involved_shards_iter().count() == 0 {
if transaction.all_inputs_iter().next().is_none() {
self.try_with_committee(iter::once(transaction_substate_address), |mut client| {
let transaction = transaction.clone();
async move { client.submit_transaction(transaction).await }
})
.await
} else {
self.try_with_committee(transaction.involved_shards_iter(), |mut client| {
let involved = transaction
.all_inputs_iter()
// If there is no version specified, submit to the validator node with version 0
.map(|i| i.or_zero_version().to_substate_address());
self.try_with_committee(involved, |mut client| {
let transaction = transaction.clone();
async move { client.submit_transaction(transaction).await }
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use thiserror::Error;
use tokio::task;

use crate::{
p2p::services::mempool::SubstateResolver,
p2p::services::mempool::{ResolvedSubstates, SubstateResolver},
substate_resolver::{SubstateResolverError, TariSubstateResolver},
virtual_substate::VirtualSubstateError,
};
Expand All @@ -68,6 +68,8 @@ pub enum DryRunTransactionProcessorError {
SubstateResoverError(#[from] SubstateResolverError),
#[error("Virtual substate error: {0}")]
VirtualSubstateError(#[from] VirtualSubstateError),
#[error("Execution thread failed: {0}")]
ExecutionThreadFailed(#[from] task::JoinError),
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -118,14 +120,20 @@ impl DryRunTransactionProcessor {
.resolve_virtual_substates(&transaction, current_epoch)
.await?;

let inputs = self.substate_resolver.resolve(&transaction).await?;
let ResolvedSubstates {
local: inputs,
unresolved_foreign: foreign,
} = self.substate_resolver.try_resolve_local(&transaction)?;
temp_state_store.set_many(inputs)?;
// Dry-run we can request the foreign inputs from validator nodes. The execution result may vary if inputs are
// mutated between the dry-run and live execution.
let foreign_inputs = self.substate_resolver.try_resolve_foreign(&foreign).await?;
temp_state_store.set_many(foreign_inputs)?;

// execute the payload in the WASM engine and return the result
let executed = task::block_in_place(|| {
self.payload_processor
.execute(transaction, temp_state_store, virtual_substates)
})?;
let processor = self.payload_processor.clone();
let executed =
task::spawn_blocking(move || processor.execute(transaction, temp_state_store, virtual_substates)).await??;
let result = executed.into_result();

let fees = &result.finalize.fee_receipt;
Expand Down
13 changes: 11 additions & 2 deletions applications/tari_validator_node/src/p2p/services/mempool/error.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::collections::HashSet;

use indexmap::IndexMap;
use tari_dan_app_utilities::{
template_manager::interface::TemplateManagerError,
transaction_executor::TransactionProcessorError,
};
use tari_dan_common_types::Epoch;
use tari_dan_storage::{consensus_models::TransactionPoolError, StorageError};
use tari_engine_types::substate::{Substate, SubstateId};
use tari_epoch_manager::EpochManagerError;
use tari_networking::NetworkingError;
use tari_transaction::TransactionId;
use tari_transaction::{SubstateRequirement, TransactionId};
use tokio::sync::{mpsc, oneshot};

use crate::{
Expand All @@ -30,7 +34,12 @@ pub enum MempoolError {
#[error("DryRunTransactionProcessor Error: {0}")]
DryRunTransactionProcessorError(#[from] DryRunTransactionProcessorError),
#[error("Execution thread failure: {0}")]
ExecutionThreadFailure(String),
ExecutionThreadPanicked(String),
#[error("Requires consensus for local substates: {local_substates:?}")]
MustDeferExecution {
local_substates: IndexMap<SubstateId, Substate>,
foreign_substates: HashSet<SubstateRequirement>,
},
#[error("SubstateResolver Error: {0}")]
SubstateResolverError(#[from] SubstateResolverError),
#[error("Transaction Execution Error: {0}")]
Expand Down
128 changes: 70 additions & 58 deletions applications/tari_validator_node/src/p2p/services/mempool/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tari_transaction::{Transaction, VersionedSubstateId};
use tokio::task;

use crate::{
p2p::services::mempool::{MempoolError, SubstateResolver},
p2p::services::mempool::{MempoolError, ResolvedSubstates, SubstateResolver},
substate_resolver::SubstateResolverError,
};

Expand Down Expand Up @@ -42,71 +42,83 @@ where
Err(err) => return Err(err.into()),
};

info!(target: LOG_TARGET, "Transaction {} executing. virtual_substates = [{}]", transaction.id(), virtual_substates.keys().map(|addr| addr.to_string()).collect::<Vec<_>>().join(", "));
info!(target: LOG_TARGET, "🎱 Transaction {} found virtual_substates = [{}]", transaction.id(), virtual_substates.keys().map(|addr| addr.to_string()).collect::<Vec<_>>().join(", "));

match substate_resolver.resolve(&transaction).await {
Ok(inputs) => {
let res = task::spawn_blocking(move || {
let versioned_inputs = inputs
.iter()
.map(|(id, substate)| VersionedSubstateId::new(id.clone(), substate.version()))
.collect::<IndexSet<_>>();
let state_db = new_state_db();
state_db.set_many(inputs).expect("memory db is infallible");

match executor.execute(transaction, state_db, virtual_substates) {
Ok(mut executed) => {
// TODO: refactor executor so that resolved inputs are set internally
// Update the resolved inputs to set the specific version, as we know it after execution
if let Some(diff) = executed.result().finalize.accept() {
let resolved_inputs = versioned_inputs
.into_iter()
.map(|versioned_id| {
let lock_flag = if diff.down_iter().any(|(id, _)| *id == versioned_id.substate_id) {
// Update all inputs that were DOWNed to be write locked
SubstateLockFlag::Write
} else {
// Any input not downed, gets a read lock
SubstateLockFlag::Read
};
VersionedSubstateIdLockIntent::new(versioned_id, lock_flag)
})
.collect::<IndexSet<_>>();
executed.set_resolved_inputs(resolved_inputs);
} else {
let resolved_inputs = versioned_inputs
.into_iter()
.map(|versioned_id| {
// We cannot tell which inputs are written, however since this transaction is a
// reject it does not matter since it will not cause locks.
// We still set resolved inputs because this is used to determine which shards are
// involved.
VersionedSubstateIdLockIntent::new(versioned_id, SubstateLockFlag::Write)
})
.collect::<IndexSet<_>>();
executed.set_resolved_inputs(resolved_inputs);
}

Ok(executed)
},
Err(err) => Err(err.into()),
}
})
.await;

// If this errors, the thread panicked due to a bug
res.map_err(|err| MempoolError::ExecutionThreadFailure(err.to_string()))
},
let ResolvedSubstates {
local: local_substates,
unresolved_foreign: foreign,
} = match substate_resolver.try_resolve_local(&transaction) {
Ok(pair) => pair,
// Substates are downed/dont exist
Err(err @ SubstateResolverError::InputSubstateDowned { .. }) |
Err(err @ SubstateResolverError::InputSubstateDoesNotExist { .. }) => {
warn!(target: LOG_TARGET, "One or more invalid input shards for transaction {}: {}", transaction.id(), err);
// Ok(Err(_)) This is not a mempool execution failure, but rather a transaction failure
Ok(Err(err.into()))
// Ok(Err(_)) return that the transaction should be rejected, not an internal mempool execution failure
return Ok(Err(err.into()));
},
// Some other issue - network, db, etc
Err(err) => Err(err.into()),
Err(err) => return Err(err.into()),
};

if !foreign.is_empty() {
info!(target: LOG_TARGET, "Unable to execute transaction {} in the mempool because it has foreign inputs: {:?}", transaction.id(), foreign);
return Ok(Err(MempoolError::MustDeferExecution {
local_substates,
foreign_substates: foreign,
}));
}

info!(target: LOG_TARGET, "🎱 Transaction {} resolved local inputs = [{}]", transaction.id(), local_substates.keys().map(|addr| addr.to_string()).collect::<Vec<_>>().join(", "));

let res = task::spawn_blocking(move || {
let versioned_inputs = local_substates
.iter()
.map(|(id, substate)| VersionedSubstateId::new(id.clone(), substate.version()))
.collect::<IndexSet<_>>();
let state_db = new_state_db();
state_db.set_many(local_substates).expect("memory db is infallible");

match executor.execute(transaction, state_db, virtual_substates) {
Ok(mut executed) => {
// Update the resolved inputs to set the specific version, as we know it after execution
if let Some(diff) = executed.result().finalize.accept() {
let resolved_inputs = versioned_inputs
.into_iter()
.map(|versioned_id| {
let lock_flag = if diff.down_iter().any(|(id, _)| *id == versioned_id.substate_id) {
// Update all inputs that were DOWNed to be write locked
SubstateLockFlag::Write
} else {
// Any input not downed, gets a read lock
SubstateLockFlag::Read
};
VersionedSubstateIdLockIntent::new(versioned_id, lock_flag)
})
.collect::<IndexSet<_>>();
executed.set_resolved_inputs(resolved_inputs);
} else {
let resolved_inputs = versioned_inputs
.into_iter()
.map(|versioned_id| {
// We cannot tell which inputs are written, however since this transaction is a
// reject it does not matter since it will not cause locks.
// We still set resolved inputs because this is used to determine which shards are
// involved.
VersionedSubstateIdLockIntent::new(versioned_id, SubstateLockFlag::Write)
})
.collect::<IndexSet<_>>();
executed.set_resolved_inputs(resolved_inputs);
}

Ok(executed)
},
Err(err) => Err(err.into()),
}
})
.await;

// If this errors, the thread panicked due to a bug
res.map_err(|err| MempoolError::ExecutionThreadPanicked(err.to_string()))
}

fn new_state_db() -> MemoryStateStore {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use log::*;
use tari_dan_app_utilities::transaction_executor::{TransactionExecutor, TransactionProcessorError};
use tari_dan_common_types::PeerAddress;
use tari_dan_storage::consensus_models::ExecutedTransaction;
Expand All @@ -39,6 +40,8 @@ use crate::{
substate_resolver::SubstateResolverError,
};

const LOG_TARGET: &str = "tari::dan::validator_node::mempool";

pub fn spawn<TExecutor, TValidator, TExecutedValidator, TSubstateResolver>(
gossip: Gossip,
tx_executed_transactions: mpsc::Sender<(TransactionId, usize)>,
Expand Down Expand Up @@ -82,6 +85,7 @@ where
let handle = MempoolHandle::new(tx_mempool_request);

let join_handle = task::spawn(mempool.run());
debug!(target: LOG_TARGET, "Spawning mempool service (task: {:?})", join_handle);

(handle, join_handle)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause
use prometheus::{Histogram, HistogramOpts, IntCounter, Registry};
use tari_dan_storage::consensus_models::{ExecutedTransaction, TransactionRecord};
use tari_transaction::TransactionId;
use tari_dan_storage::consensus_models::ExecutedTransaction;
use tari_transaction::{Transaction, TransactionId};

use crate::{metrics::CollectorRegister, p2p::services::mempool::MempoolError};

Expand Down Expand Up @@ -49,7 +49,7 @@ impl PrometheusMempoolMetrics {
}
}

pub fn on_transaction_received(&mut self, _transaction: &TransactionRecord) {
pub fn on_transaction_received(&mut self, _transaction: &Transaction) {
self.transactions_received.inc();
}

Expand Down
Loading

0 comments on commit e378ebf

Please sign in to comment.