diff --git a/Cargo.lock b/Cargo.lock index 29355c805..f6738465c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1232,6 +1232,32 @@ dependencies = [ "serde", ] +[[package]] +name = "cacache" +version = "12.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "142316461ed3a3dfcba10417317472da5bfd0461e4d276bf7c07b330766d9490" +dependencies = [ + "async-std", + "digest 0.10.7", + "either", + "futures 0.3.29", + "hex", + "libc", + "memmap2", + "miette", + "reflink-copy", + "serde", + "serde_derive", + "serde_json", + "sha1", + "sha2", + "ssri", + "tempfile", + "thiserror", + "walkdir", +] + [[package]] name = "camellia" version = "0.1.0" @@ -3547,7 +3573,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows-core", + "windows-core 0.51.1", ] [[package]] @@ -4429,6 +4455,29 @@ dependencies = [ "zeroize", ] +[[package]] +name = "miette" +version = "5.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59bb584eaeeab6bd0226ccf3509a69d7936d148cf3d036ad350abe35e8c6856e" +dependencies = [ + "miette-derive", + "once_cell", + "thiserror", + "unicode-width", +] + +[[package]] +name = "miette-derive" +version = "5.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49e7bc1560b95a3c4a25d03de42fe76ca718ab92d1a22a55b9b4cf67b3ae635c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.39", +] + [[package]] name = "migrations_internals" version = "2.1.0" @@ -6071,6 +6120,17 @@ dependencies = [ "thiserror", ] +[[package]] +name = "reflink-copy" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "248fbed6f59e99c8ef6c6ecadc9b09c6e93713b8f921d73e41d6ed6017bf0624" +dependencies = [ + "cfg-if", + "rustix 0.38.25", + "windows", +] + [[package]] name = "regalloc" version = "0.0.34" @@ -6871,6 +6931,17 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "sha-1" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest 0.10.7", +] + [[package]] name = "sha1" version = "0.10.6" @@ -7119,6 +7190,23 @@ dependencies = [ "der", ] +[[package]] +name = "ssri" +version = "9.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da7a2b3c2bc9693bcb40870c4e9b5bf0d79f9cb46273321bf855ec513e919082" +dependencies = [ + "base64 0.21.5", + "digest 0.10.7", + "hex", + "miette", + "serde", + "sha-1", + "sha2", + "thiserror", + "xxhash-rust", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -7766,7 +7854,9 @@ name = "tari_dan_app_utilities" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "bytes 1.5.0", + "cacache", "chrono", "dashmap", "futures 0.3.29", @@ -7778,6 +7868,7 @@ dependencies = [ "serde_json", "std-semaphore", "tari_base_node_client", + "tari_bor", "tari_common_types", "tari_core", "tari_crypto", @@ -7787,6 +7878,7 @@ dependencies = [ "tari_dan_storage_sqlite", "tari_engine_types", "tari_epoch_manager", + "tari_indexer_lib", "tari_shutdown", "tari_state_store_sqlite", "tari_template_builtin", @@ -8162,6 +8254,7 @@ dependencies = [ name = "tari_indexer_lib" version = "0.50.0-pre.0" dependencies = [ + "async-trait", "log", "rand", "serde", @@ -8172,6 +8265,7 @@ dependencies = [ "tari_transaction", "tari_validator_node_rpc", "thiserror", + "tokio", ] [[package]] @@ -10236,6 +10330,16 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" +dependencies = [ + "windows-core 0.52.0", + "windows-targets 0.52.0", +] + [[package]] name = "windows-core" version = "0.51.1" @@ -10245,6 +10349,15 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.0", +] + [[package]] name = "windows-sys" version = "0.33.0" @@ -10306,6 +10419,21 @@ dependencies = [ "windows_x86_64_msvc 0.48.5", ] +[[package]] +name = "windows-targets" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" +dependencies = [ + "windows_aarch64_gnullvm 0.52.0", + "windows_aarch64_msvc 0.52.0", + "windows_i686_gnu 0.52.0", + "windows_i686_msvc 0.52.0", + "windows_x86_64_gnu 0.52.0", + "windows_x86_64_gnullvm 0.52.0", + "windows_x86_64_msvc 0.52.0", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.42.2" @@ -10318,6 +10446,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" + [[package]] name = "windows_aarch64_msvc" version = "0.33.0" @@ -10336,6 +10470,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" + [[package]] name = "windows_i686_gnu" version = "0.33.0" @@ -10354,6 +10494,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" +[[package]] +name = "windows_i686_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" + [[package]] name = "windows_i686_msvc" version = "0.33.0" @@ -10372,6 +10518,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" +[[package]] +name = "windows_i686_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" + [[package]] name = "windows_x86_64_gnu" version = "0.33.0" @@ -10390,6 +10542,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" + [[package]] name = "windows_x86_64_gnullvm" version = "0.42.2" @@ -10402,6 +10560,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" + [[package]] name = "windows_x86_64_msvc" version = "0.33.0" @@ -10420,6 +10584,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" + [[package]] name = "winnow" version = "0.5.19" @@ -10478,6 +10648,12 @@ dependencies = [ "time", ] +[[package]] +name = "xxhash-rust" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9828b178da53440fa9c766a3d2f73f7cf5d0ac1fe3980c1e5018d899fd19e07b" + [[package]] name = "yaml-rust" version = "0.4.5" diff --git a/applications/tari_dan_app_utilities/Cargo.toml b/applications/tari_dan_app_utilities/Cargo.toml index fd8f3e448..940fea423 100644 --- a/applications/tari_dan_app_utilities/Cargo.toml +++ b/applications/tari_dan_app_utilities/Cargo.toml @@ -27,9 +27,13 @@ tari_template_builtin = { path = "../../dan_layer/template_builtin" } tari_template_lib = { path = "../../dan_layer/template_lib" } tari_transaction = { path = "../../dan_layer/transaction" } tari_validator_node_client = { path = "../../clients/validator_node_client" } +tari_bor = { path = "../../dan_layer/tari_bor" } +tari_indexer_lib = { path = "../../dan_layer/indexer_lib" } anyhow = "1.0.53" +async-trait = "0.1.74" bytes = "1" +cacache = "12.0.0" chrono = "0.4.22" futures = { version = "^0.3.1" } log = { version = "0.4.8", features = ["std"] } diff --git a/applications/tari_dan_app_utilities/src/lib.rs b/applications/tari_dan_app_utilities/src/lib.rs index 85634fb29..4747c6ae3 100644 --- a/applications/tari_dan_app_utilities/src/lib.rs +++ b/applications/tari_dan_app_utilities/src/lib.rs @@ -22,5 +22,6 @@ pub mod base_layer_scanner; pub mod consensus_constants; +pub mod substate_file_cache; pub mod template_manager; pub mod transaction_executor; diff --git a/applications/tari_dan_app_utilities/src/substate_file_cache.rs b/applications/tari_dan_app_utilities/src/substate_file_cache.rs new file mode 100644 index 000000000..57f145c18 --- /dev/null +++ b/applications/tari_dan_app_utilities/src/substate_file_cache.rs @@ -0,0 +1,77 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::{fs, path::PathBuf}; + +use async_trait::async_trait; +use tari_bor::{decode, encode}; +use tari_indexer_lib::substate_cache::{SubstateCache, SubstateCacheEntry, SubstateCacheError}; + +#[derive(Debug, Clone)] +pub struct SubstateFileCache { + cache_dir_path: String, +} + +impl SubstateFileCache { + pub fn new(path_buf: PathBuf) -> Result { + let cache_dir_path = path_buf + .into_os_string() + .into_string() + .map_err(|_| SubstateCacheError("Invalid substate cache path".to_string()))?; + + fs::create_dir_all(&cache_dir_path) + .map_err(|e| SubstateCacheError(format!("Error creating the cache directory: {}", e)))?; + + Ok(Self { cache_dir_path }) + } +} + +#[async_trait] +impl SubstateCache for SubstateFileCache { + async fn read(&self, address: String) -> Result, SubstateCacheError> { + let res = cacache::read(&self.cache_dir_path, address).await; + match res { + Ok(value) => { + // cache hit + let entry = decode::(&value).map_err(|e| SubstateCacheError(e.to_string()))?; + return Ok(Some(entry)); + }, + Err(e) => { + // cache miss + if let cacache::Error::EntryNotFound(_, _) = e { + return Ok(None); + // cache error + } else { + return Err(SubstateCacheError(format!("{}", e))); + } + }, + } + } + + async fn write(&self, address: String, entry: &SubstateCacheEntry) -> Result<(), SubstateCacheError> { + let encoded_entry = encode(&entry).map_err(|e| SubstateCacheError(e.to_string()))?; + cacache::write(&self.cache_dir_path, address, encoded_entry) + .await + .map_err(|e| SubstateCacheError(format!("{}", e)))?; + Ok(()) + } +} diff --git a/applications/tari_indexer/src/dry_run/processor.rs b/applications/tari_indexer/src/dry_run/processor.rs index a3edd3a3c..c7ea08de9 100644 --- a/applications/tari_indexer/src/dry_run/processor.rs +++ b/applications/tari_indexer/src/dry_run/processor.rs @@ -41,7 +41,11 @@ use tari_engine_types::{ virtual_substate::{VirtualSubstate, VirtualSubstateAddress}, }; use tari_epoch_manager::EpochManagerReader; -use tari_indexer_lib::{substate_scanner::SubstateScanner, transaction_autofiller::TransactionAutofiller}; +use tari_indexer_lib::{ + substate_cache::SubstateCache, + substate_scanner::SubstateScanner, + transaction_autofiller::TransactionAutofiller, +}; use tari_transaction::{SubstateRequirement, Transaction}; use tari_validator_node_rpc::client::{SubstateResult, ValidatorNodeClientFactory, ValidatorNodeRpcClient}; use tokio::task; @@ -50,23 +54,25 @@ use crate::dry_run::error::DryRunTransactionProcessorError; const LOG_TARGET: &str = "tari::indexer::dry_run_transaction_processor"; -pub struct DryRunTransactionProcessor { +pub struct DryRunTransactionProcessor { epoch_manager: TEpochManager, client_provider: TClientFactory, - transaction_autofiller: TransactionAutofiller, + transaction_autofiller: TransactionAutofiller, template_manager: TemplateManager, } -impl DryRunTransactionProcessor +impl + DryRunTransactionProcessor where - TEpochManager: EpochManagerReader, - TClientFactory: ValidatorNodeClientFactory, + TEpochManager: EpochManagerReader + 'static, + TClientFactory: ValidatorNodeClientFactory + 'static, ::Error: IsNotFoundError, + TSubstateCache: SubstateCache + 'static, { pub fn new( epoch_manager: TEpochManager, client_provider: TClientFactory, - substate_scanner: Arc>, + substate_scanner: Arc>, template_manager: TemplateManager, ) -> Self { let transaction_autofiller = TransactionAutofiller::new(substate_scanner); diff --git a/applications/tari_indexer/src/json_rpc/handlers.rs b/applications/tari_indexer/src/json_rpc/handlers.rs index 82be2f4f3..ac06acd5b 100644 --- a/applications/tari_indexer/src/json_rpc/handlers.rs +++ b/applications/tari_indexer/src/json_rpc/handlers.rs @@ -40,6 +40,7 @@ use tari_comms::{ NodeIdentity, }; use tari_crypto::tari_utilities::hex::Hex; +use tari_dan_app_utilities::substate_file_cache::SubstateFileCache; use tari_dan_common_types::{optional::Optional, Epoch}; use tari_dan_storage::consensus_models::Decision; use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerReader}; @@ -103,8 +104,9 @@ pub struct JsonRpcHandlers { base_node_client: GrpcBaseNodeClient, substate_manager: Arc, epoch_manager: EpochManagerHandle, - transaction_manager: TransactionManager, - dry_run_transaction_processor: DryRunTransactionProcessor, + transaction_manager: TransactionManager, + dry_run_transaction_processor: + DryRunTransactionProcessor, } impl JsonRpcHandlers { @@ -113,10 +115,15 @@ impl JsonRpcHandlers { services: &Services, base_node_client: GrpcBaseNodeClient, substate_manager: Arc, - transaction_manager: TransactionManager, + transaction_manager: TransactionManager< + EpochManagerHandle, + TariCommsValidatorNodeClientFactory, + SubstateFileCache, + >, dry_run_transaction_processor: DryRunTransactionProcessor< EpochManagerHandle, TariCommsValidatorNodeClientFactory, + SubstateFileCache, >, ) -> Self { Self { diff --git a/applications/tari_indexer/src/lib.rs b/applications/tari_indexer/src/lib.rs index 8e92492ed..5489d2dcd 100644 --- a/applications/tari_indexer/src/lib.rs +++ b/applications/tari_indexer/src/lib.rs @@ -51,7 +51,7 @@ use tari_common::{ exit_codes::{ExitCode, ExitError}, }; use tari_comms::peer_manager::PeerFeatures; -use tari_dan_app_utilities::consensus_constants::ConsensusConstants; +use tari_dan_app_utilities::{consensus_constants::ConsensusConstants, substate_file_cache::SubstateFileCache}; use tari_dan_storage::global::DbFactory; use tari_dan_storage_sqlite::SqliteDbFactory; use tari_indexer_lib::substate_scanner::SubstateScanner; @@ -96,9 +96,14 @@ pub async fn run_indexer(config: ApplicationConfig, mut shutdown_signal: Shutdow ) .await?; + let substate_cache_dir = config.common.base_path.join("substate_cache"); + let substate_cache = SubstateFileCache::new(substate_cache_dir) + .map_err(|e| ExitError::new(ExitCode::ConfigError, format!("Substate cache error: {}", e)))?; + let dan_layer_scanner = Arc::new(SubstateScanner::new( services.epoch_manager.clone(), services.validator_node_client_factory.clone(), + substate_cache, )); let substate_manager = Arc::new(SubstateManager::new( diff --git a/applications/tari_indexer/src/substate_manager.rs b/applications/tari_indexer/src/substate_manager.rs index 9c6f00a0e..71692a3bb 100644 --- a/applications/tari_indexer/src/substate_manager.rs +++ b/applications/tari_indexer/src/substate_manager.rs @@ -27,6 +27,7 @@ use log::info; use serde::{Deserialize, Serialize}; use tari_common_types::types::FixedHash; use tari_crypto::tari_utilities::message_format::MessageFormat; +use tari_dan_app_utilities::substate_file_cache::SubstateFileCache; use tari_engine_types::{ events::Event, substate::{Substate, SubstateAddress}, @@ -79,13 +80,15 @@ pub struct EventResponse { } pub struct SubstateManager { - substate_scanner: Arc>, + substate_scanner: Arc>, substate_store: SqliteSubstateStore, } impl SubstateManager { pub fn new( - dan_layer_scanner: Arc>, + dan_layer_scanner: Arc< + SubstateScanner, + >, substate_store: SqliteSubstateStore, ) -> Self { Self { diff --git a/applications/tari_indexer/src/transaction_manager/mod.rs b/applications/tari_indexer/src/transaction_manager/mod.rs index 01282c9e2..787166169 100644 --- a/applications/tari_indexer/src/transaction_manager/mod.rs +++ b/applications/tari_indexer/src/transaction_manager/mod.rs @@ -32,7 +32,11 @@ use tari_dan_common_types::{ }; use tari_engine_types::substate::SubstateAddress; use tari_epoch_manager::EpochManagerReader; -use tari_indexer_lib::{substate_scanner::SubstateScanner, transaction_autofiller::TransactionAutofiller}; +use tari_indexer_lib::{ + substate_cache::SubstateCache, + substate_scanner::SubstateScanner, + transaction_autofiller::TransactionAutofiller, +}; use tari_transaction::{SubstateRequirement, Transaction, TransactionId}; use tari_validator_node_rpc::client::{ SubstateResult, @@ -45,23 +49,25 @@ use crate::transaction_manager::error::TransactionManagerError; const LOG_TARGET: &str = "tari::indexer::transaction_manager"; -pub struct TransactionManager { +pub struct TransactionManager { epoch_manager: TEpochManager, client_provider: TClientFactory, - transaction_autofiller: TransactionAutofiller, + transaction_autofiller: TransactionAutofiller, } -impl TransactionManager +impl + TransactionManager where - TAddr: NodeAddressable, - TEpochManager: EpochManagerReader, - TClientFactory: ValidatorNodeClientFactory, - ::Error: IsNotFoundError, + TAddr: NodeAddressable + 'static, + TEpochManager: EpochManagerReader + 'static, + TClientFactory: ValidatorNodeClientFactory + 'static, + ::Error: IsNotFoundError + 'static, + TSubstateCache: SubstateCache + 'static, { pub fn new( epoch_manager: TEpochManager, client_provider: TClientFactory, - substate_scanner: Arc>, + substate_scanner: Arc>, ) -> Self { Self { epoch_manager, diff --git a/applications/tari_validator_node/src/bootstrap.rs b/applications/tari_validator_node/src/bootstrap.rs index 4d8d239f3..308b19d35 100644 --- a/applications/tari_validator_node/src/bootstrap.rs +++ b/applications/tari_validator_node/src/bootstrap.rs @@ -38,6 +38,7 @@ use tari_core::transactions::transaction_components::ValidatorNodeSignature; use tari_dan_app_utilities::{ base_layer_scanner, consensus_constants::ConsensusConstants, + substate_file_cache::SubstateFileCache, template_manager, template_manager::{implementation::TemplateManager, interface::TemplateManagerHandle}, transaction_executor::TariDanTransactionProcessor, @@ -218,9 +219,18 @@ pub async fn spawn_services( .await; handles.push(consensus_join_handle); + // substate cache + let substate_cache_dir = config.common.base_path.join("substate_cache"); + let substate_cache = SubstateFileCache::new(substate_cache_dir) + .map_err(|e| ExitError::new(ExitCode::ConfigError, format!("Substate cache error: {}", e)))?; + // Mempool let virtual_substate_manager = VirtualSubstateManager::new(state_store.clone(), epoch_manager.clone()); - let scanner = SubstateScanner::new(epoch_manager.clone(), validator_node_client_factory.clone()); + let scanner = SubstateScanner::new( + epoch_manager.clone(), + validator_node_client_factory.clone(), + substate_cache, + ); let substate_resolver = TariSubstateResolver::new( state_store.clone(), scanner, diff --git a/applications/tari_validator_node/src/dry_run_transaction_processor.rs b/applications/tari_validator_node/src/dry_run_transaction_processor.rs index 7db3ce534..e4c302166 100644 --- a/applications/tari_validator_node/src/dry_run_transaction_processor.rs +++ b/applications/tari_validator_node/src/dry_run_transaction_processor.rs @@ -24,6 +24,7 @@ use log::info; use tari_common_types::types::PublicKey; use tari_comms::protocol::rpc::RpcStatus; use tari_dan_app_utilities::{ + substate_file_cache::SubstateFileCache, template_manager::implementation::TemplateManager, transaction_executor::{TariDanTransactionProcessor, TransactionExecutor, TransactionProcessorError}, }; @@ -71,8 +72,12 @@ pub enum DryRunTransactionProcessorError { #[derive(Clone, Debug)] pub struct DryRunTransactionProcessor { - substate_resolver: - TariSubstateResolver, EpochManagerHandle, TariCommsValidatorNodeClientFactory>, + substate_resolver: TariSubstateResolver< + SqliteStateStore, + EpochManagerHandle, + TariCommsValidatorNodeClientFactory, + SubstateFileCache, + >, epoch_manager: EpochManagerHandle, payload_processor: TariDanTransactionProcessor, } @@ -85,6 +90,7 @@ impl DryRunTransactionProcessor { SqliteStateStore, EpochManagerHandle, TariCommsValidatorNodeClientFactory, + SubstateFileCache, >, ) -> Self { Self { diff --git a/applications/tari_validator_node/src/substate_resolver.rs b/applications/tari_validator_node/src/substate_resolver.rs index df16a38e5..ff45427ec 100644 --- a/applications/tari_validator_node/src/substate_resolver.rs +++ b/applications/tari_validator_node/src/substate_resolver.rs @@ -15,7 +15,7 @@ use tari_engine_types::{ virtual_substate::{VirtualSubstate, VirtualSubstateAddress}, }; use tari_epoch_manager::{EpochManagerError, EpochManagerReader}; -use tari_indexer_lib::{error::IndexerError, substate_scanner::SubstateScanner}; +use tari_indexer_lib::{error::IndexerError, substate_cache::SubstateCache, substate_scanner::SubstateScanner}; use tari_transaction::Transaction; use tari_validator_node_rpc::client::{SubstateResult, ValidatorNodeClientFactory}; @@ -27,23 +27,24 @@ use crate::{ const LOG_TARGET: &str = "tari::dan::substate_resolver"; #[derive(Debug, Clone)] -pub struct TariSubstateResolver { +pub struct TariSubstateResolver { store: TStateStore, - scanner: SubstateScanner, + scanner: SubstateScanner, epoch_manager: TEpochManager, virtual_substate_manager: VirtualSubstateManager, } -impl - TariSubstateResolver +impl + TariSubstateResolver where TStateStore: StateStore, TEpochManager: EpochManagerReader, TValidatorNodeClientFactory: ValidatorNodeClientFactory, + TSubstateCache: SubstateCache, { pub fn new( store: TStateStore, - scanner: SubstateScanner, + scanner: SubstateScanner, epoch_manager: TEpochManager, virtual_substate_manager: VirtualSubstateManager, ) -> Self { @@ -147,12 +148,13 @@ where } #[async_trait] -impl SubstateResolver - for TariSubstateResolver +impl SubstateResolver + for TariSubstateResolver where TStateStore: StateStore + Sync + Send, TEpochManager: EpochManagerReader, TValidatorNodeClientFactory: ValidatorNodeClientFactory, + TSubstateCache: SubstateCache, { type Error = SubstateResolverError; diff --git a/dan_layer/indexer_lib/Cargo.toml b/dan_layer/indexer_lib/Cargo.toml index 1eddfab01..fdf818aa3 100644 --- a/dan_layer/indexer_lib/Cargo.toml +++ b/dan_layer/indexer_lib/Cargo.toml @@ -15,7 +15,14 @@ tari_transaction = { path = "../transaction" } tari_template_lib = { path = "../template_lib" } tari_validator_node_rpc = { path = "../validator_node_rpc" } +async-trait = "0.1.74" log = "0.4" serde = "1.0" rand = "0.8.5" thiserror = "1.0" +tokio = { version = "1.10", features = [ + "macros", + "time", + "sync", + "rt-multi-thread", +] } diff --git a/dan_layer/indexer_lib/src/error.rs b/dan_layer/indexer_lib/src/error.rs index 6daf9ee40..fbafb15f0 100644 --- a/dan_layer/indexer_lib/src/error.rs +++ b/dan_layer/indexer_lib/src/error.rs @@ -4,6 +4,8 @@ use tari_engine_types::substate::SubstateAddress; use tari_epoch_manager::EpochManagerError; +use crate::substate_cache::SubstateCacheError; + #[derive(Debug, thiserror::Error)] pub enum IndexerError { #[error("Epoch manager error: {0}")] @@ -22,4 +24,6 @@ pub enum IndexerError { FailedToGetCommitteeSize(String), #[error("Failed to parse transaction hash: {0}")] FailedToParseTransactionHash(String), + #[error("Substate cache operation failed: {0}")] + SubstateCacheError(#[from] SubstateCacheError), } diff --git a/dan_layer/indexer_lib/src/lib.rs b/dan_layer/indexer_lib/src/lib.rs index 2d5ac3aac..329eb7a18 100644 --- a/dan_layer/indexer_lib/src/lib.rs +++ b/dan_layer/indexer_lib/src/lib.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: BSD-3-Clause pub mod error; +pub mod substate_cache; pub mod substate_decoder; pub mod substate_scanner; pub mod transaction_autofiller; diff --git a/dan_layer/indexer_lib/src/substate_cache.rs b/dan_layer/indexer_lib/src/substate_cache.rs new file mode 100644 index 000000000..860c08f45 --- /dev/null +++ b/dan_layer/indexer_lib/src/substate_cache.rs @@ -0,0 +1,41 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use tari_validator_node_rpc::client::SubstateResult; + +#[derive(thiserror::Error, Debug)] +#[error("Failed substate cache operation {0}")] +pub struct SubstateCacheError(pub String); + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct SubstateCacheEntry { + pub version: u32, + pub substate_result: SubstateResult, +} + +#[async_trait] +pub trait SubstateCache: Send + Sync { + async fn read(&self, address: String) -> Result, SubstateCacheError>; + async fn write(&self, address: String, entry: &SubstateCacheEntry) -> Result<(), SubstateCacheError>; +} diff --git a/dan_layer/indexer_lib/src/substate_scanner.rs b/dan_layer/indexer_lib/src/substate_scanner.rs index 2d5d3243a..776fa9dbc 100644 --- a/dan_layer/indexer_lib/src/substate_scanner.rs +++ b/dan_layer/indexer_lib/src/substate_scanner.rs @@ -36,26 +36,37 @@ use tari_template_lib::{ use tari_transaction::TransactionId; use tari_validator_node_rpc::client::{SubstateResult, ValidatorNodeClientFactory, ValidatorNodeRpcClient}; -use crate::{error::IndexerError, NonFungibleSubstate}; +use crate::{ + error::IndexerError, + substate_cache::{SubstateCache, SubstateCacheEntry}, + NonFungibleSubstate, +}; const LOG_TARGET: &str = "tari::indexer::dan_layer_scanner"; #[derive(Debug, Clone)] -pub struct SubstateScanner { +pub struct SubstateScanner { committee_provider: TEpochManager, validator_node_client_factory: TVnClient, + substate_cache: TSubstateCache, } -impl SubstateScanner +impl SubstateScanner where TAddr: NodeAddressable, TEpochManager: EpochManagerReader, TVnClient: ValidatorNodeClientFactory, + TSubstateCache: SubstateCache, { - pub fn new(committee_provider: TEpochManager, validator_node_client_factory: TVnClient) -> Self { + pub fn new( + committee_provider: TEpochManager, + validator_node_client_factory: TVnClient, + substate_cache: TSubstateCache, + ) -> Self { Self { committee_provider, validator_node_client_factory, + substate_cache, } } @@ -133,9 +144,22 @@ where substate_address: &SubstateAddress, lowest_version: u32, ) -> Result { - // we keep asking from version 0 upwards let mut version = lowest_version; let mut last_result = None; + let mut cached_version = None; + + // start from the latest cached version of the substate (if cached previously) + let cache_res = self.substate_cache.read(substate_address.to_address_string()).await?; + if let Some(entry) = cache_res { + if entry.version > version { + info!(target: LOG_TARGET, "Substate cache hit for {} with version {}", entry.version, substate_address.to_address_string()); + cached_version = Some(entry.version); + // we will request newer versions of the cached substate + version = entry.version + 1; + last_result = Some(entry.substate_result); + } + } + loop { let substate_result = self .get_specific_substate_from_committee(substate_address, version) @@ -146,12 +170,41 @@ where last_result = Some(result); version += 1; }, + // stop if the current version does not exist SubstateResult::DoesNotExist => { - return Ok(last_result.unwrap_or(SubstateResult::DoesNotExist)); + break; + }, + // stop and upgrade the last result if the substate is UP, as it's the latest + _ => { + last_result = Some(substate_result); + break; }, - _ => return Ok(substate_result), } } + + if let Some(substate_result) = &last_result { + // update the substate cache if the substate exists and the version is newer than the cached one + if let SubstateResult::Up { substate, .. } = &substate_result { + let should_update_cache = match cached_version { + Some(v) => v < substate.version(), + None => true, + }; + + if should_update_cache { + info!(target: LOG_TARGET, "Updating cached substate {} with version {}", substate_address.to_address_string(), substate.version()); + let entry = SubstateCacheEntry { + version: substate.version(), + substate_result: substate_result.clone(), + }; + self.substate_cache + .write(substate_address.to_address_string(), &entry) + .await?; + }; + } + Ok(substate_result.clone()) + } else { + Ok(SubstateResult::DoesNotExist) + } } /// Returns a specific version. If this is not found an error is returned. diff --git a/dan_layer/indexer_lib/src/transaction_autofiller.rs b/dan_layer/indexer_lib/src/transaction_autofiller.rs index 42d8bf57c..62d9ba20a 100644 --- a/dan_layer/indexer_lib/src/transaction_autofiller.rs +++ b/dan_layer/indexer_lib/src/transaction_autofiller.rs @@ -12,8 +12,14 @@ use tari_engine_types::{ use tari_epoch_manager::EpochManagerReader; use tari_transaction::{SubstateRequirement, Transaction}; use tari_validator_node_rpc::client::{SubstateResult, ValidatorNodeClientFactory}; +use tokio::task::JoinError; -use crate::{error::IndexerError, substate_decoder::find_related_substates, substate_scanner::SubstateScanner}; +use crate::{ + error::IndexerError, + substate_cache::SubstateCache, + substate_decoder::find_related_substates, + substate_scanner::SubstateScanner, +}; const LOG_TARGET: &str = "tari::indexer::transaction_autofiller"; @@ -23,19 +29,22 @@ pub enum TransactionAutofillerError { IndexedValueVisitorError(#[from] IndexedValueError), #[error("Indexer error: {0}")] IndexerError(#[from] IndexerError), + #[error("Tokio join error: {0}")] + JoinError(#[from] JoinError), } -pub struct TransactionAutofiller { - substate_scanner: Arc>, +pub struct TransactionAutofiller { + substate_scanner: Arc>, } -impl TransactionAutofiller +impl TransactionAutofiller where - TEpochManager: EpochManagerReader, - TVnClient: ValidatorNodeClientFactory, - TAddr: NodeAddressable, + TEpochManager: EpochManagerReader + 'static, + TVnClient: ValidatorNodeClientFactory + 'static, + TAddr: NodeAddressable + 'static, + TSubstateCache: SubstateCache + 'static, { - pub fn new(substate_scanner: Arc>) -> Self { + pub fn new(substate_scanner: Arc>) -> Self { Self { substate_scanner } } @@ -48,52 +57,28 @@ where // note that the transaction hash will not change as the "involved_objects" is not part of the hash let mut autofilled_transaction = original_transaction; - // scan the network to fetch all the substates for each required input - // TODO: perform this loop concurrently by spawning a tokio task for each scan + // scan the network in parallel to fetch all the substates for each required input let mut input_shards = vec![]; let mut found_substates = HashMap::new(); - for r in &substate_requirements { - let scan_res = match r.version() { - Some(version) => { - let shard = ShardId::from_address(r.address(), version); - if autofilled_transaction.all_inputs_iter().any(|s| *s == shard) { - // Shard is already an input - continue; - } - - // if the client specified a version, we need to retrieve it - self.substate_scanner - .get_specific_substate_from_committee(r.address(), version) - .await? - }, - None => { - // if the client didn't specify a version, we fetch the latest one - self.substate_scanner.get_substate(r.address(), None).await? - }, - }; - - if let SubstateResult::Up { substate, address, .. } = scan_res { - info!( - target: LOG_TARGET, - "✏️Filling input substate {}:v{}", - address, - substate.version() - ); + let substate_scanner_ref = self.substate_scanner.clone(); + let transaction_ref = Arc::new(autofilled_transaction.clone()); + let mut handles = Vec::new(); + for requirement in &substate_requirements { + let handle = tokio::spawn(get_substate_requirement( + substate_scanner_ref.clone(), + transaction_ref.clone(), + requirement.clone(), + )); + handles.push(handle); + } + for handle in handles { + let res = handle.await??; + if let Some((address, substate)) = res { let shard = ShardId::from_address(&address, substate.version()); - if autofilled_transaction.all_inputs_iter().any(|s| *s == shard) { - // Shard is already an input (TODO: what a waste) - continue; - } input_shards.push(shard); found_substates.insert(address, substate); - } else { - warn!( - target: LOG_TARGET, - "🖋️ The substate for input requirement {} is not in UP status, skipping", r - ); } } - info!(target: LOG_TARGET, "✏️️ Found {} input substates", found_substates.len()); autofilled_transaction.filled_inputs_mut().extend(input_shards); @@ -103,7 +88,6 @@ where for _i in 0..MAX_RECURSION { // add all substates related to the inputs - // TODO: perform this loop concurrently by spawning a tokio task for each scan // TODO: we are going to only check the first level of recursion, for composability we may want to do it // recursively (with a recursion limit) let mut autofilled_inputs = vec![]; @@ -119,12 +103,16 @@ where .flatten() .filter(|s| !substate_requirements.iter().any(|r| r.address() == s)); + // we need to fetch (in parallel) the latest version of all the related substates + let mut handles = HashMap::new(); + let substate_scanner_ref = self.substate_scanner.clone(); for address in related_addresses { - info!(target: LOG_TARGET, "✏️️️ Found {} related substate", address); - - // we need to fetch the latest version of all the related substates - // note that if the version specified is "None", the scanner will fetch the latest version - let scan_res = self.substate_scanner.get_substate(&address, None).await?; + info!(target: LOG_TARGET, "✏️️️ Found {} related substates", address); + let handle = tokio::spawn(get_substate(substate_scanner_ref.clone(), address.clone(), None)); + handles.insert(address.clone(), handle); + } + for (address, handle) in handles { + let scan_res = handle.await??; if let SubstateResult::Up { substate, address, .. } = scan_res { info!( @@ -158,3 +146,70 @@ where Ok((autofilled_transaction, found_substates)) } } + +pub async fn get_substate_requirement( + substate_scanner: Arc>, + transaction: Arc, + req: SubstateRequirement, +) -> Result, IndexerError> +where + TEpochManager: EpochManagerReader, + TVnClient: ValidatorNodeClientFactory, + TAddr: NodeAddressable, + TSubstateCache: SubstateCache, +{ + let scan_res = match req.version() { + Some(version) => { + let shard = ShardId::from_address(req.address(), version); + if transaction.all_inputs_iter().any(|s| *s == shard) { + // Shard is already an input + return Ok(None); + } + + // if the client specified a version, we need to retrieve it + substate_scanner + .get_specific_substate_from_committee(req.address(), version) + .await? + }, + None => { + // if the client didn't specify a version, we fetch the latest one + substate_scanner.get_substate(req.address(), None).await? + }, + }; + + if let SubstateResult::Up { substate, address, .. } = &scan_res { + info!( + target: LOG_TARGET, + "Filling input substate {}:v{}", + address, + substate.version() + ); + let shard = ShardId::from_address(address, substate.version()); + if transaction.all_inputs_iter().any(|s| *s == shard) { + // Shard is already an input (TODO: what a waste) + return Ok(None); + } + + Ok(Some((address.clone(), substate.clone()))) + } else { + warn!( + target: LOG_TARGET, + "🖋️ The substate for input requirement {} is not in UP status, skipping", req + ); + Ok(None) + } +} + +pub async fn get_substate( + substate_scanner: Arc>, + substate_address: SubstateAddress, + version_hint: Option, +) -> Result +where + TEpochManager: EpochManagerReader, + TVnClient: ValidatorNodeClientFactory, + TAddr: NodeAddressable, + TSubstateCache: SubstateCache, +{ + substate_scanner.get_substate(&substate_address, version_hint).await +}