Skip to content

Commit

Permalink
refactor(consensus)!: block diffs, exec on propose and locks improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed May 20, 2024
1 parent 198e6ec commit 6181528
Show file tree
Hide file tree
Showing 111 changed files with 4,363 additions and 2,987 deletions.
8 changes: 6 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ convert_case = "0.6.0"
cucumber = "0.21.0"
d3ne = { git = "https://github.com/stringhandler/d3ne-rs.git", tag = "v0.8.0-pre.3" }
dashmap = "5.5.0"
diesel = { version = "2", default-features = false }
diesel_migrations = "2"
diesel = { version = "2.1", default-features = false }
diesel_migrations = "2.1"
digest = "0.10"
dirs = "4.0.0"
env_logger = "0.10.0"
Expand Down
1 change: 1 addition & 0 deletions applications/tari_dan_app_utilities/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ libp2p-identity = { workspace = true }
log = { workspace = true, features = ["std"] }
mini-moka = { workspace = true }
multiaddr = { workspace = true }
indexmap = { workspace = true }
std-semaphore = { workspace = true }
prost = { workspace = true }
rand = { workspace = true }
Expand Down
53 changes: 48 additions & 5 deletions applications/tari_dan_app_utilities/src/transaction_executor.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::{sync::Arc, time::Instant};
use std::{
sync::Arc,
time::{Duration, Instant},
};

use indexmap::{IndexMap, IndexSet};
use log::*;
use tari_common::configuration::Network;
use tari_common_types::types::PublicKey;
Expand All @@ -15,9 +19,10 @@ use tari_dan_engine::{
template::LoadedTemplate,
transaction::{TransactionError, TransactionProcessor},
};
use tari_dan_storage::consensus_models::ExecutedTransaction;
use tari_dan_storage::consensus_models::{SubstateLockFlag, VersionedSubstateIdLockIntent};
use tari_engine_types::{
commit_result::{ExecuteResult, FinalizeResult, RejectReason},
substate::Substate,
virtual_substate::VirtualSubstates,
};
use tari_template_lib::{crypto::RistrettoPublicKeyBytes, prelude::NonFungibleAddress};
Expand All @@ -33,7 +38,40 @@ pub trait TransactionExecutor {
transaction: Transaction,
state_store: MemoryStateStore,
virtual_substates: VirtualSubstates,
) -> Result<ExecutedTransaction, Self::Error>;
) -> Result<ExecutionOutput, Self::Error>;
}

#[derive(Debug, Clone)]
pub struct ExecutionOutput {
pub transaction: Transaction,
pub result: ExecuteResult,
pub outputs: Vec<VersionedSubstateId>,
pub execution_time: Duration,
}

impl ExecutionOutput {
pub fn resolve_inputs(
&self,
inputs: IndexMap<VersionedSubstateId, Substate>,
) -> IndexSet<VersionedSubstateIdLockIntent> {
let mut resolved_inputs = IndexSet::new();
if let Some(diff) = self.result.finalize.accept() {
resolved_inputs = inputs
.into_iter()
.map(|(versioned_id, _)| {
let lock_flag = if diff.down_iter().any(|(id, _)| *id == versioned_id.substate_id) {
// Update all inputs that were DOWNed to be write locked
SubstateLockFlag::Write
} else {
// Any input not downed, gets a read lock
SubstateLockFlag::Read
};
VersionedSubstateIdLockIntent::new(versioned_id, lock_flag)
})
.collect::<IndexSet<_>>();
}
resolved_inputs
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -63,7 +101,7 @@ where TTemplateProvider: TemplateProvider<Template = LoadedTemplate>
transaction: Transaction,
state_store: MemoryStateStore,
virtual_substates: VirtualSubstates,
) -> Result<ExecutedTransaction, Self::Error> {
) -> Result<ExecutionOutput, Self::Error> {
let timer = Instant::now();
// Include ownership token for the signers of this in the auth scope
let owner_token = get_auth_token(transaction.signer_public_key());
Expand Down Expand Up @@ -100,7 +138,12 @@ where TTemplateProvider: TemplateProvider<Template = LoadedTemplate>
.collect::<Vec<_>>()
})
.unwrap_or_default();
Ok(ExecutedTransaction::new(transaction, result, outputs, timer.elapsed()))
Ok(ExecutionOutput {
transaction,
result,
outputs,
execution_time: timer.elapsed(),
})
}
}

Expand Down
4 changes: 2 additions & 2 deletions applications/tari_dan_wallet_cli/src/command/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ pub fn print_substate_diff(diff: &SubstateDiff) {
println!("️🌲 UP substate {} (v{})", address, substate.version(),);
println!(
" 🧩 Substate address: {}",
SubstateAddress::from_address(address, substate.version())
SubstateAddress::from_substate_id(address, substate.version())
);
match substate.substate_value() {
SubstateValue::Component(component) => {
Expand Down Expand Up @@ -522,7 +522,7 @@ pub fn print_substate_diff(diff: &SubstateDiff) {
println!("🗑️ DOWN substate {} v{}", address, version,);
println!(
" 🧩 Substate address: {}",
SubstateAddress::from_address(address, *version)
SubstateAddress::from_substate_id(address, *version)
);
println!();
}
Expand Down
5 changes: 3 additions & 2 deletions applications/tari_indexer/src/dry_run/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,10 @@ where TSubstateCache: SubstateCache + 'static
state_store.set_many(found_substates)?;

// execute the payload in the WASM engine and return the result
let result = task::block_in_place(|| payload_processor.execute(transaction, state_store, virtual_substates))?;
let exec_output =
task::block_in_place(|| payload_processor.execute(transaction, state_store, virtual_substates))?;

Ok(result.into_result())
Ok(exec_output.result)
}

fn build_payload_processor(
Expand Down
4 changes: 2 additions & 2 deletions applications/tari_indexer/src/transaction_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ where
substate_address: SubstateId,
version: u32,
) -> Result<SubstateResult, TransactionManagerError> {
let shard = SubstateAddress::from_address(&substate_address, version);
let shard = SubstateAddress::from_substate_id(&substate_address, version);

self.try_with_committee(iter::once(shard), |mut client| {
// This double clone looks strange, but it's needed because this function is called in a loop
Expand All @@ -145,7 +145,7 @@ where
async move {
let substate_address = substate_address.clone();
client
.get_substate(SubstateAddress::from_address(&substate_address, version))
.get_substate(SubstateAddress::from_substate_id(&substate_address, version))
.await
}
})
Expand Down
19 changes: 9 additions & 10 deletions applications/tari_validator_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{fs, io, ops::DerefMut, str::FromStr};
use std::{fs, io, ops::Deref, str::FromStr};

use anyhow::{anyhow, Context};
use futures::{future, FutureExt};
Expand Down Expand Up @@ -81,7 +81,7 @@ use tokio::{sync::mpsc, task::JoinHandle};
#[cfg(feature = "metrics")]
use crate::consensus::metrics::PrometheusConsensusMetrics;
use crate::{
consensus::{self, ConsensusHandle, TariDanBlockTransactionExecutorBuilder},
consensus::{self, ConsensusHandle, TariDanBlockTransactionExecutor},
dry_run_transaction_processor::DryRunTransactionProcessor,
p2p::{
create_tari_validator_node_rpc_service,
Expand Down Expand Up @@ -245,8 +245,7 @@ pub async fn spawn_services(
let outbound_messaging =
ConsensusOutboundMessaging::new(loopback_sender, networking.clone(), message_logger.clone());

let transaction_executor_builder =
TariDanBlockTransactionExecutorBuilder::new(epoch_manager.clone(), payload_processor.clone());
let transaction_executor = TariDanBlockTransactionExecutor::new(epoch_manager.clone(), payload_processor.clone());

#[cfg(feature = "metrics")]
let metrics = PrometheusConsensusMetrics::new(state_store.clone(), metrics_registry);
Expand All @@ -264,7 +263,7 @@ pub async fn spawn_services(
validator_node_client_factory.clone(),
metrics,
shutdown.clone(),
transaction_executor_builder,
transaction_executor,
consensus_constants.clone(),
)
.await;
Expand Down Expand Up @@ -448,16 +447,16 @@ async fn spawn_p2p_rpc(
// TODO: Figure out the best way to have the engine shard store mirror these bootstrapped states.
fn bootstrap_state<TTx>(tx: &mut TTx, network: Network) -> Result<(), StorageError>
where
TTx: StateStoreWriteTransaction + DerefMut,
TTx: StateStoreWriteTransaction + Deref,
TTx::Target: StateStoreReadTransaction,
TTx::Addr: NodeAddressable + Serialize,
{
let genesis_block = Block::genesis(network);
let substate_id = SubstateId::Resource(PUBLIC_IDENTITY_RESOURCE_ADDRESS);
let substate_address = SubstateAddress::from_address(&substate_id, 0);
let substate_address = SubstateAddress::from_substate_id(&substate_id, 0);
let mut metadata: Metadata = Default::default();
metadata.insert(TOKEN_SYMBOL, "ID".to_string());
if !SubstateRecord::exists(tx.deref_mut(), &substate_address)? {
if !SubstateRecord::exists(&**tx, &substate_address)? {
// Create the resource for public identity
SubstateRecord {
substate_id,
Expand All @@ -484,10 +483,10 @@ where
}

let substate_id = SubstateId::Resource(CONFIDENTIAL_TARI_RESOURCE_ADDRESS);
let substate_address = SubstateAddress::from_address(&substate_id, 0);
let substate_address = SubstateAddress::from_substate_id(&substate_id, 0);
let mut metadata = Metadata::new();
metadata.insert(TOKEN_SYMBOL, "tXTR2".to_string());
if !SubstateRecord::exists(tx.deref_mut(), &substate_address)? {
if !SubstateRecord::exists(&**tx, &substate_address)? {
SubstateRecord {
substate_id,
version: 0,
Expand Down
Loading

0 comments on commit 6181528

Please sign in to comment.