From 0982701bc643591dcbbd422a5493162b0e24b0c5 Mon Sep 17 00:00:00 2001 From: Miguel Naveira <47919901+mrnaveira@users.noreply.github.com> Date: Fri, 24 Nov 2023 09:24:20 +0000 Subject: [PATCH] refactor: simple substate fetching optimizations (#777) Description --- * Refactored the `transaction_autofiller` in the `indexer_lib` to spawn `tokio` tasks in parallel for each substate request to the network. * Created a new `SubstateCache` trait and a `SubstateFileCache` implementation using `cacache-rs` * Refactored the existing `SubstateScanner` to read and write to the new cache accordingly. The decision to access the cache from this level was both to encapsulate as well as maximize the usage of the cache across all applications. * Both the indexer and the validator node application need to build a `SubstateFileCache` on startup (it will use a new `./substate_cache` subfolder in the data root folder) * The indexer needs it for transaction autofilling when submitting transactions, as well as for monitoring substates. * The validator node needs it as a dependency for the dry run transaction processing. Motivation and Context --- Currently we are experiencing poor performance in the Tari testnet. A big part of the problem is due to a lack of optimization around substate fetching, particularly to get the latest version of a substate: * Indexer auto-filler fetches substates sequentially, when we could do it in parallel * There is no caching of previous versions of a substate, so each time we want to know the last version of a substate we start querying again from substate 0 onwards. This PR aims to solve this problems by: * Implementing parallel fetching of substates in the indexer auto-filler * Implementing a cache system for previous versions of substates, so we only need to query for possible newer versions. We want the cache system to be file-based to persist between restarts. How Has This Been Tested? --- Running a local network, doing transactions and inspecting the logs for the caching operations. What process can a PR reviewer use to test or verify this change? --- Start up a new local network, do transactions and inspect the new substate caching logs. Breaking Changes --- - [x] None - [ ] Requires data directory to be deleted - [ ] Other - Please specify --------- Co-authored-by: Stan Bondi --- Cargo.lock | 178 +++++++++++++++++- .../tari_dan_app_utilities/Cargo.toml | 4 + .../tari_dan_app_utilities/src/lib.rs | 1 + .../src/substate_file_cache.rs | 77 ++++++++ .../tari_indexer/src/dry_run/processor.rs | 20 +- .../tari_indexer/src/json_rpc/handlers.rs | 13 +- applications/tari_indexer/src/lib.rs | 7 +- .../tari_indexer/src/substate_manager.rs | 7 +- .../src/transaction_manager/mod.rs | 24 ++- .../tari_validator_node/src/bootstrap.rs | 12 +- .../src/dry_run_transaction_processor.rs | 10 +- .../src/substate_resolver.rs | 18 +- dan_layer/indexer_lib/Cargo.toml | 7 + dan_layer/indexer_lib/src/error.rs | 4 + dan_layer/indexer_lib/src/lib.rs | 1 + dan_layer/indexer_lib/src/substate_cache.rs | 41 ++++ dan_layer/indexer_lib/src/substate_scanner.rs | 67 ++++++- .../indexer_lib/src/transaction_autofiller.rs | 161 ++++++++++------ 18 files changed, 558 insertions(+), 94 deletions(-) create mode 100644 applications/tari_dan_app_utilities/src/substate_file_cache.rs create mode 100644 dan_layer/indexer_lib/src/substate_cache.rs diff --git a/Cargo.lock b/Cargo.lock index 29355c805..f6738465c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1232,6 +1232,32 @@ dependencies = [ "serde", ] +[[package]] +name = "cacache" +version = "12.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "142316461ed3a3dfcba10417317472da5bfd0461e4d276bf7c07b330766d9490" +dependencies = [ + "async-std", + "digest 0.10.7", + "either", + "futures 0.3.29", + "hex", + "libc", + "memmap2", + "miette", + "reflink-copy", + "serde", + "serde_derive", + "serde_json", + "sha1", + "sha2", + "ssri", + "tempfile", + "thiserror", + "walkdir", +] + [[package]] name = "camellia" version = "0.1.0" @@ -3547,7 +3573,7 @@ dependencies = [ "iana-time-zone-haiku", "js-sys", "wasm-bindgen", - "windows-core", + "windows-core 0.51.1", ] [[package]] @@ -4429,6 +4455,29 @@ dependencies = [ "zeroize", ] +[[package]] +name = "miette" +version = "5.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59bb584eaeeab6bd0226ccf3509a69d7936d148cf3d036ad350abe35e8c6856e" +dependencies = [ + "miette-derive", + "once_cell", + "thiserror", + "unicode-width", +] + +[[package]] +name = "miette-derive" +version = "5.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49e7bc1560b95a3c4a25d03de42fe76ca718ab92d1a22a55b9b4cf67b3ae635c" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.39", +] + [[package]] name = "migrations_internals" version = "2.1.0" @@ -6071,6 +6120,17 @@ dependencies = [ "thiserror", ] +[[package]] +name = "reflink-copy" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "248fbed6f59e99c8ef6c6ecadc9b09c6e93713b8f921d73e41d6ed6017bf0624" +dependencies = [ + "cfg-if", + "rustix 0.38.25", + "windows", +] + [[package]] name = "regalloc" version = "0.0.34" @@ -6871,6 +6931,17 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "sha-1" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest 0.10.7", +] + [[package]] name = "sha1" version = "0.10.6" @@ -7119,6 +7190,23 @@ dependencies = [ "der", ] +[[package]] +name = "ssri" +version = "9.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da7a2b3c2bc9693bcb40870c4e9b5bf0d79f9cb46273321bf855ec513e919082" +dependencies = [ + "base64 0.21.5", + "digest 0.10.7", + "hex", + "miette", + "serde", + "sha-1", + "sha2", + "thiserror", + "xxhash-rust", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -7766,7 +7854,9 @@ name = "tari_dan_app_utilities" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "bytes 1.5.0", + "cacache", "chrono", "dashmap", "futures 0.3.29", @@ -7778,6 +7868,7 @@ dependencies = [ "serde_json", "std-semaphore", "tari_base_node_client", + "tari_bor", "tari_common_types", "tari_core", "tari_crypto", @@ -7787,6 +7878,7 @@ dependencies = [ "tari_dan_storage_sqlite", "tari_engine_types", "tari_epoch_manager", + "tari_indexer_lib", "tari_shutdown", "tari_state_store_sqlite", "tari_template_builtin", @@ -8162,6 +8254,7 @@ dependencies = [ name = "tari_indexer_lib" version = "0.50.0-pre.0" dependencies = [ + "async-trait", "log", "rand", "serde", @@ -8172,6 +8265,7 @@ dependencies = [ "tari_transaction", "tari_validator_node_rpc", "thiserror", + "tokio", ] [[package]] @@ -10236,6 +10330,16 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" +dependencies = [ + "windows-core 0.52.0", + "windows-targets 0.52.0", +] + [[package]] name = "windows-core" version = "0.51.1" @@ -10245,6 +10349,15 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "windows-core" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" +dependencies = [ + "windows-targets 0.52.0", +] + [[package]] name = "windows-sys" version = "0.33.0" @@ -10306,6 +10419,21 @@ dependencies = [ "windows_x86_64_msvc 0.48.5", ] +[[package]] +name = "windows-targets" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a18201040b24831fbb9e4eb208f8892e1f50a37feb53cc7ff887feb8f50e7cd" +dependencies = [ + "windows_aarch64_gnullvm 0.52.0", + "windows_aarch64_msvc 0.52.0", + "windows_i686_gnu 0.52.0", + "windows_i686_msvc 0.52.0", + "windows_x86_64_gnu 0.52.0", + "windows_x86_64_gnullvm 0.52.0", + "windows_x86_64_msvc 0.52.0", +] + [[package]] name = "windows_aarch64_gnullvm" version = "0.42.2" @@ -10318,6 +10446,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" + [[package]] name = "windows_aarch64_msvc" version = "0.33.0" @@ -10336,6 +10470,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" + [[package]] name = "windows_i686_gnu" version = "0.33.0" @@ -10354,6 +10494,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" +[[package]] +name = "windows_i686_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" + [[package]] name = "windows_i686_msvc" version = "0.33.0" @@ -10372,6 +10518,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" +[[package]] +name = "windows_i686_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" + [[package]] name = "windows_x86_64_gnu" version = "0.33.0" @@ -10390,6 +10542,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" + [[package]] name = "windows_x86_64_gnullvm" version = "0.42.2" @@ -10402,6 +10560,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" + [[package]] name = "windows_x86_64_msvc" version = "0.33.0" @@ -10420,6 +10584,12 @@ version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dff9641d1cd4be8d1a070daf9e3773c5f67e78b4d9d42263020c057706765c04" + [[package]] name = "winnow" version = "0.5.19" @@ -10478,6 +10648,12 @@ dependencies = [ "time", ] +[[package]] +name = "xxhash-rust" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9828b178da53440fa9c766a3d2f73f7cf5d0ac1fe3980c1e5018d899fd19e07b" + [[package]] name = "yaml-rust" version = "0.4.5" diff --git a/applications/tari_dan_app_utilities/Cargo.toml b/applications/tari_dan_app_utilities/Cargo.toml index fd8f3e448..940fea423 100644 --- a/applications/tari_dan_app_utilities/Cargo.toml +++ b/applications/tari_dan_app_utilities/Cargo.toml @@ -27,9 +27,13 @@ tari_template_builtin = { path = "../../dan_layer/template_builtin" } tari_template_lib = { path = "../../dan_layer/template_lib" } tari_transaction = { path = "../../dan_layer/transaction" } tari_validator_node_client = { path = "../../clients/validator_node_client" } +tari_bor = { path = "../../dan_layer/tari_bor" } +tari_indexer_lib = { path = "../../dan_layer/indexer_lib" } anyhow = "1.0.53" +async-trait = "0.1.74" bytes = "1" +cacache = "12.0.0" chrono = "0.4.22" futures = { version = "^0.3.1" } log = { version = "0.4.8", features = ["std"] } diff --git a/applications/tari_dan_app_utilities/src/lib.rs b/applications/tari_dan_app_utilities/src/lib.rs index 85634fb29..4747c6ae3 100644 --- a/applications/tari_dan_app_utilities/src/lib.rs +++ b/applications/tari_dan_app_utilities/src/lib.rs @@ -22,5 +22,6 @@ pub mod base_layer_scanner; pub mod consensus_constants; +pub mod substate_file_cache; pub mod template_manager; pub mod transaction_executor; diff --git a/applications/tari_dan_app_utilities/src/substate_file_cache.rs b/applications/tari_dan_app_utilities/src/substate_file_cache.rs new file mode 100644 index 000000000..57f145c18 --- /dev/null +++ b/applications/tari_dan_app_utilities/src/substate_file_cache.rs @@ -0,0 +1,77 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::{fs, path::PathBuf}; + +use async_trait::async_trait; +use tari_bor::{decode, encode}; +use tari_indexer_lib::substate_cache::{SubstateCache, SubstateCacheEntry, SubstateCacheError}; + +#[derive(Debug, Clone)] +pub struct SubstateFileCache { + cache_dir_path: String, +} + +impl SubstateFileCache { + pub fn new(path_buf: PathBuf) -> Result { + let cache_dir_path = path_buf + .into_os_string() + .into_string() + .map_err(|_| SubstateCacheError("Invalid substate cache path".to_string()))?; + + fs::create_dir_all(&cache_dir_path) + .map_err(|e| SubstateCacheError(format!("Error creating the cache directory: {}", e)))?; + + Ok(Self { cache_dir_path }) + } +} + +#[async_trait] +impl SubstateCache for SubstateFileCache { + async fn read(&self, address: String) -> Result, SubstateCacheError> { + let res = cacache::read(&self.cache_dir_path, address).await; + match res { + Ok(value) => { + // cache hit + let entry = decode::(&value).map_err(|e| SubstateCacheError(e.to_string()))?; + return Ok(Some(entry)); + }, + Err(e) => { + // cache miss + if let cacache::Error::EntryNotFound(_, _) = e { + return Ok(None); + // cache error + } else { + return Err(SubstateCacheError(format!("{}", e))); + } + }, + } + } + + async fn write(&self, address: String, entry: &SubstateCacheEntry) -> Result<(), SubstateCacheError> { + let encoded_entry = encode(&entry).map_err(|e| SubstateCacheError(e.to_string()))?; + cacache::write(&self.cache_dir_path, address, encoded_entry) + .await + .map_err(|e| SubstateCacheError(format!("{}", e)))?; + Ok(()) + } +} diff --git a/applications/tari_indexer/src/dry_run/processor.rs b/applications/tari_indexer/src/dry_run/processor.rs index a3edd3a3c..c7ea08de9 100644 --- a/applications/tari_indexer/src/dry_run/processor.rs +++ b/applications/tari_indexer/src/dry_run/processor.rs @@ -41,7 +41,11 @@ use tari_engine_types::{ virtual_substate::{VirtualSubstate, VirtualSubstateAddress}, }; use tari_epoch_manager::EpochManagerReader; -use tari_indexer_lib::{substate_scanner::SubstateScanner, transaction_autofiller::TransactionAutofiller}; +use tari_indexer_lib::{ + substate_cache::SubstateCache, + substate_scanner::SubstateScanner, + transaction_autofiller::TransactionAutofiller, +}; use tari_transaction::{SubstateRequirement, Transaction}; use tari_validator_node_rpc::client::{SubstateResult, ValidatorNodeClientFactory, ValidatorNodeRpcClient}; use tokio::task; @@ -50,23 +54,25 @@ use crate::dry_run::error::DryRunTransactionProcessorError; const LOG_TARGET: &str = "tari::indexer::dry_run_transaction_processor"; -pub struct DryRunTransactionProcessor { +pub struct DryRunTransactionProcessor { epoch_manager: TEpochManager, client_provider: TClientFactory, - transaction_autofiller: TransactionAutofiller, + transaction_autofiller: TransactionAutofiller, template_manager: TemplateManager, } -impl DryRunTransactionProcessor +impl + DryRunTransactionProcessor where - TEpochManager: EpochManagerReader, - TClientFactory: ValidatorNodeClientFactory, + TEpochManager: EpochManagerReader + 'static, + TClientFactory: ValidatorNodeClientFactory + 'static, ::Error: IsNotFoundError, + TSubstateCache: SubstateCache + 'static, { pub fn new( epoch_manager: TEpochManager, client_provider: TClientFactory, - substate_scanner: Arc>, + substate_scanner: Arc>, template_manager: TemplateManager, ) -> Self { let transaction_autofiller = TransactionAutofiller::new(substate_scanner); diff --git a/applications/tari_indexer/src/json_rpc/handlers.rs b/applications/tari_indexer/src/json_rpc/handlers.rs index 82be2f4f3..ac06acd5b 100644 --- a/applications/tari_indexer/src/json_rpc/handlers.rs +++ b/applications/tari_indexer/src/json_rpc/handlers.rs @@ -40,6 +40,7 @@ use tari_comms::{ NodeIdentity, }; use tari_crypto::tari_utilities::hex::Hex; +use tari_dan_app_utilities::substate_file_cache::SubstateFileCache; use tari_dan_common_types::{optional::Optional, Epoch}; use tari_dan_storage::consensus_models::Decision; use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerReader}; @@ -103,8 +104,9 @@ pub struct JsonRpcHandlers { base_node_client: GrpcBaseNodeClient, substate_manager: Arc, epoch_manager: EpochManagerHandle, - transaction_manager: TransactionManager, - dry_run_transaction_processor: DryRunTransactionProcessor, + transaction_manager: TransactionManager, + dry_run_transaction_processor: + DryRunTransactionProcessor, } impl JsonRpcHandlers { @@ -113,10 +115,15 @@ impl JsonRpcHandlers { services: &Services, base_node_client: GrpcBaseNodeClient, substate_manager: Arc, - transaction_manager: TransactionManager, + transaction_manager: TransactionManager< + EpochManagerHandle, + TariCommsValidatorNodeClientFactory, + SubstateFileCache, + >, dry_run_transaction_processor: DryRunTransactionProcessor< EpochManagerHandle, TariCommsValidatorNodeClientFactory, + SubstateFileCache, >, ) -> Self { Self { diff --git a/applications/tari_indexer/src/lib.rs b/applications/tari_indexer/src/lib.rs index 8e92492ed..5489d2dcd 100644 --- a/applications/tari_indexer/src/lib.rs +++ b/applications/tari_indexer/src/lib.rs @@ -51,7 +51,7 @@ use tari_common::{ exit_codes::{ExitCode, ExitError}, }; use tari_comms::peer_manager::PeerFeatures; -use tari_dan_app_utilities::consensus_constants::ConsensusConstants; +use tari_dan_app_utilities::{consensus_constants::ConsensusConstants, substate_file_cache::SubstateFileCache}; use tari_dan_storage::global::DbFactory; use tari_dan_storage_sqlite::SqliteDbFactory; use tari_indexer_lib::substate_scanner::SubstateScanner; @@ -96,9 +96,14 @@ pub async fn run_indexer(config: ApplicationConfig, mut shutdown_signal: Shutdow ) .await?; + let substate_cache_dir = config.common.base_path.join("substate_cache"); + let substate_cache = SubstateFileCache::new(substate_cache_dir) + .map_err(|e| ExitError::new(ExitCode::ConfigError, format!("Substate cache error: {}", e)))?; + let dan_layer_scanner = Arc::new(SubstateScanner::new( services.epoch_manager.clone(), services.validator_node_client_factory.clone(), + substate_cache, )); let substate_manager = Arc::new(SubstateManager::new( diff --git a/applications/tari_indexer/src/substate_manager.rs b/applications/tari_indexer/src/substate_manager.rs index 9c6f00a0e..71692a3bb 100644 --- a/applications/tari_indexer/src/substate_manager.rs +++ b/applications/tari_indexer/src/substate_manager.rs @@ -27,6 +27,7 @@ use log::info; use serde::{Deserialize, Serialize}; use tari_common_types::types::FixedHash; use tari_crypto::tari_utilities::message_format::MessageFormat; +use tari_dan_app_utilities::substate_file_cache::SubstateFileCache; use tari_engine_types::{ events::Event, substate::{Substate, SubstateAddress}, @@ -79,13 +80,15 @@ pub struct EventResponse { } pub struct SubstateManager { - substate_scanner: Arc>, + substate_scanner: Arc>, substate_store: SqliteSubstateStore, } impl SubstateManager { pub fn new( - dan_layer_scanner: Arc>, + dan_layer_scanner: Arc< + SubstateScanner, + >, substate_store: SqliteSubstateStore, ) -> Self { Self { diff --git a/applications/tari_indexer/src/transaction_manager/mod.rs b/applications/tari_indexer/src/transaction_manager/mod.rs index 01282c9e2..787166169 100644 --- a/applications/tari_indexer/src/transaction_manager/mod.rs +++ b/applications/tari_indexer/src/transaction_manager/mod.rs @@ -32,7 +32,11 @@ use tari_dan_common_types::{ }; use tari_engine_types::substate::SubstateAddress; use tari_epoch_manager::EpochManagerReader; -use tari_indexer_lib::{substate_scanner::SubstateScanner, transaction_autofiller::TransactionAutofiller}; +use tari_indexer_lib::{ + substate_cache::SubstateCache, + substate_scanner::SubstateScanner, + transaction_autofiller::TransactionAutofiller, +}; use tari_transaction::{SubstateRequirement, Transaction, TransactionId}; use tari_validator_node_rpc::client::{ SubstateResult, @@ -45,23 +49,25 @@ use crate::transaction_manager::error::TransactionManagerError; const LOG_TARGET: &str = "tari::indexer::transaction_manager"; -pub struct TransactionManager { +pub struct TransactionManager { epoch_manager: TEpochManager, client_provider: TClientFactory, - transaction_autofiller: TransactionAutofiller, + transaction_autofiller: TransactionAutofiller, } -impl TransactionManager +impl + TransactionManager where - TAddr: NodeAddressable, - TEpochManager: EpochManagerReader, - TClientFactory: ValidatorNodeClientFactory, - ::Error: IsNotFoundError, + TAddr: NodeAddressable + 'static, + TEpochManager: EpochManagerReader + 'static, + TClientFactory: ValidatorNodeClientFactory + 'static, + ::Error: IsNotFoundError + 'static, + TSubstateCache: SubstateCache + 'static, { pub fn new( epoch_manager: TEpochManager, client_provider: TClientFactory, - substate_scanner: Arc>, + substate_scanner: Arc>, ) -> Self { Self { epoch_manager, diff --git a/applications/tari_validator_node/src/bootstrap.rs b/applications/tari_validator_node/src/bootstrap.rs index 4d8d239f3..308b19d35 100644 --- a/applications/tari_validator_node/src/bootstrap.rs +++ b/applications/tari_validator_node/src/bootstrap.rs @@ -38,6 +38,7 @@ use tari_core::transactions::transaction_components::ValidatorNodeSignature; use tari_dan_app_utilities::{ base_layer_scanner, consensus_constants::ConsensusConstants, + substate_file_cache::SubstateFileCache, template_manager, template_manager::{implementation::TemplateManager, interface::TemplateManagerHandle}, transaction_executor::TariDanTransactionProcessor, @@ -218,9 +219,18 @@ pub async fn spawn_services( .await; handles.push(consensus_join_handle); + // substate cache + let substate_cache_dir = config.common.base_path.join("substate_cache"); + let substate_cache = SubstateFileCache::new(substate_cache_dir) + .map_err(|e| ExitError::new(ExitCode::ConfigError, format!("Substate cache error: {}", e)))?; + // Mempool let virtual_substate_manager = VirtualSubstateManager::new(state_store.clone(), epoch_manager.clone()); - let scanner = SubstateScanner::new(epoch_manager.clone(), validator_node_client_factory.clone()); + let scanner = SubstateScanner::new( + epoch_manager.clone(), + validator_node_client_factory.clone(), + substate_cache, + ); let substate_resolver = TariSubstateResolver::new( state_store.clone(), scanner, diff --git a/applications/tari_validator_node/src/dry_run_transaction_processor.rs b/applications/tari_validator_node/src/dry_run_transaction_processor.rs index 7db3ce534..e4c302166 100644 --- a/applications/tari_validator_node/src/dry_run_transaction_processor.rs +++ b/applications/tari_validator_node/src/dry_run_transaction_processor.rs @@ -24,6 +24,7 @@ use log::info; use tari_common_types::types::PublicKey; use tari_comms::protocol::rpc::RpcStatus; use tari_dan_app_utilities::{ + substate_file_cache::SubstateFileCache, template_manager::implementation::TemplateManager, transaction_executor::{TariDanTransactionProcessor, TransactionExecutor, TransactionProcessorError}, }; @@ -71,8 +72,12 @@ pub enum DryRunTransactionProcessorError { #[derive(Clone, Debug)] pub struct DryRunTransactionProcessor { - substate_resolver: - TariSubstateResolver, EpochManagerHandle, TariCommsValidatorNodeClientFactory>, + substate_resolver: TariSubstateResolver< + SqliteStateStore, + EpochManagerHandle, + TariCommsValidatorNodeClientFactory, + SubstateFileCache, + >, epoch_manager: EpochManagerHandle, payload_processor: TariDanTransactionProcessor, } @@ -85,6 +90,7 @@ impl DryRunTransactionProcessor { SqliteStateStore, EpochManagerHandle, TariCommsValidatorNodeClientFactory, + SubstateFileCache, >, ) -> Self { Self { diff --git a/applications/tari_validator_node/src/substate_resolver.rs b/applications/tari_validator_node/src/substate_resolver.rs index df16a38e5..ff45427ec 100644 --- a/applications/tari_validator_node/src/substate_resolver.rs +++ b/applications/tari_validator_node/src/substate_resolver.rs @@ -15,7 +15,7 @@ use tari_engine_types::{ virtual_substate::{VirtualSubstate, VirtualSubstateAddress}, }; use tari_epoch_manager::{EpochManagerError, EpochManagerReader}; -use tari_indexer_lib::{error::IndexerError, substate_scanner::SubstateScanner}; +use tari_indexer_lib::{error::IndexerError, substate_cache::SubstateCache, substate_scanner::SubstateScanner}; use tari_transaction::Transaction; use tari_validator_node_rpc::client::{SubstateResult, ValidatorNodeClientFactory}; @@ -27,23 +27,24 @@ use crate::{ const LOG_TARGET: &str = "tari::dan::substate_resolver"; #[derive(Debug, Clone)] -pub struct TariSubstateResolver { +pub struct TariSubstateResolver { store: TStateStore, - scanner: SubstateScanner, + scanner: SubstateScanner, epoch_manager: TEpochManager, virtual_substate_manager: VirtualSubstateManager, } -impl - TariSubstateResolver +impl + TariSubstateResolver where TStateStore: StateStore, TEpochManager: EpochManagerReader, TValidatorNodeClientFactory: ValidatorNodeClientFactory, + TSubstateCache: SubstateCache, { pub fn new( store: TStateStore, - scanner: SubstateScanner, + scanner: SubstateScanner, epoch_manager: TEpochManager, virtual_substate_manager: VirtualSubstateManager, ) -> Self { @@ -147,12 +148,13 @@ where } #[async_trait] -impl SubstateResolver - for TariSubstateResolver +impl SubstateResolver + for TariSubstateResolver where TStateStore: StateStore + Sync + Send, TEpochManager: EpochManagerReader, TValidatorNodeClientFactory: ValidatorNodeClientFactory, + TSubstateCache: SubstateCache, { type Error = SubstateResolverError; diff --git a/dan_layer/indexer_lib/Cargo.toml b/dan_layer/indexer_lib/Cargo.toml index 1eddfab01..fdf818aa3 100644 --- a/dan_layer/indexer_lib/Cargo.toml +++ b/dan_layer/indexer_lib/Cargo.toml @@ -15,7 +15,14 @@ tari_transaction = { path = "../transaction" } tari_template_lib = { path = "../template_lib" } tari_validator_node_rpc = { path = "../validator_node_rpc" } +async-trait = "0.1.74" log = "0.4" serde = "1.0" rand = "0.8.5" thiserror = "1.0" +tokio = { version = "1.10", features = [ + "macros", + "time", + "sync", + "rt-multi-thread", +] } diff --git a/dan_layer/indexer_lib/src/error.rs b/dan_layer/indexer_lib/src/error.rs index 6daf9ee40..fbafb15f0 100644 --- a/dan_layer/indexer_lib/src/error.rs +++ b/dan_layer/indexer_lib/src/error.rs @@ -4,6 +4,8 @@ use tari_engine_types::substate::SubstateAddress; use tari_epoch_manager::EpochManagerError; +use crate::substate_cache::SubstateCacheError; + #[derive(Debug, thiserror::Error)] pub enum IndexerError { #[error("Epoch manager error: {0}")] @@ -22,4 +24,6 @@ pub enum IndexerError { FailedToGetCommitteeSize(String), #[error("Failed to parse transaction hash: {0}")] FailedToParseTransactionHash(String), + #[error("Substate cache operation failed: {0}")] + SubstateCacheError(#[from] SubstateCacheError), } diff --git a/dan_layer/indexer_lib/src/lib.rs b/dan_layer/indexer_lib/src/lib.rs index 2d5ac3aac..329eb7a18 100644 --- a/dan_layer/indexer_lib/src/lib.rs +++ b/dan_layer/indexer_lib/src/lib.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: BSD-3-Clause pub mod error; +pub mod substate_cache; pub mod substate_decoder; pub mod substate_scanner; pub mod transaction_autofiller; diff --git a/dan_layer/indexer_lib/src/substate_cache.rs b/dan_layer/indexer_lib/src/substate_cache.rs new file mode 100644 index 000000000..860c08f45 --- /dev/null +++ b/dan_layer/indexer_lib/src/substate_cache.rs @@ -0,0 +1,41 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use async_trait::async_trait; +use serde::{Deserialize, Serialize}; +use tari_validator_node_rpc::client::SubstateResult; + +#[derive(thiserror::Error, Debug)] +#[error("Failed substate cache operation {0}")] +pub struct SubstateCacheError(pub String); + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct SubstateCacheEntry { + pub version: u32, + pub substate_result: SubstateResult, +} + +#[async_trait] +pub trait SubstateCache: Send + Sync { + async fn read(&self, address: String) -> Result, SubstateCacheError>; + async fn write(&self, address: String, entry: &SubstateCacheEntry) -> Result<(), SubstateCacheError>; +} diff --git a/dan_layer/indexer_lib/src/substate_scanner.rs b/dan_layer/indexer_lib/src/substate_scanner.rs index 2d5d3243a..776fa9dbc 100644 --- a/dan_layer/indexer_lib/src/substate_scanner.rs +++ b/dan_layer/indexer_lib/src/substate_scanner.rs @@ -36,26 +36,37 @@ use tari_template_lib::{ use tari_transaction::TransactionId; use tari_validator_node_rpc::client::{SubstateResult, ValidatorNodeClientFactory, ValidatorNodeRpcClient}; -use crate::{error::IndexerError, NonFungibleSubstate}; +use crate::{ + error::IndexerError, + substate_cache::{SubstateCache, SubstateCacheEntry}, + NonFungibleSubstate, +}; const LOG_TARGET: &str = "tari::indexer::dan_layer_scanner"; #[derive(Debug, Clone)] -pub struct SubstateScanner { +pub struct SubstateScanner { committee_provider: TEpochManager, validator_node_client_factory: TVnClient, + substate_cache: TSubstateCache, } -impl SubstateScanner +impl SubstateScanner where TAddr: NodeAddressable, TEpochManager: EpochManagerReader, TVnClient: ValidatorNodeClientFactory, + TSubstateCache: SubstateCache, { - pub fn new(committee_provider: TEpochManager, validator_node_client_factory: TVnClient) -> Self { + pub fn new( + committee_provider: TEpochManager, + validator_node_client_factory: TVnClient, + substate_cache: TSubstateCache, + ) -> Self { Self { committee_provider, validator_node_client_factory, + substate_cache, } } @@ -133,9 +144,22 @@ where substate_address: &SubstateAddress, lowest_version: u32, ) -> Result { - // we keep asking from version 0 upwards let mut version = lowest_version; let mut last_result = None; + let mut cached_version = None; + + // start from the latest cached version of the substate (if cached previously) + let cache_res = self.substate_cache.read(substate_address.to_address_string()).await?; + if let Some(entry) = cache_res { + if entry.version > version { + info!(target: LOG_TARGET, "Substate cache hit for {} with version {}", entry.version, substate_address.to_address_string()); + cached_version = Some(entry.version); + // we will request newer versions of the cached substate + version = entry.version + 1; + last_result = Some(entry.substate_result); + } + } + loop { let substate_result = self .get_specific_substate_from_committee(substate_address, version) @@ -146,12 +170,41 @@ where last_result = Some(result); version += 1; }, + // stop if the current version does not exist SubstateResult::DoesNotExist => { - return Ok(last_result.unwrap_or(SubstateResult::DoesNotExist)); + break; + }, + // stop and upgrade the last result if the substate is UP, as it's the latest + _ => { + last_result = Some(substate_result); + break; }, - _ => return Ok(substate_result), } } + + if let Some(substate_result) = &last_result { + // update the substate cache if the substate exists and the version is newer than the cached one + if let SubstateResult::Up { substate, .. } = &substate_result { + let should_update_cache = match cached_version { + Some(v) => v < substate.version(), + None => true, + }; + + if should_update_cache { + info!(target: LOG_TARGET, "Updating cached substate {} with version {}", substate_address.to_address_string(), substate.version()); + let entry = SubstateCacheEntry { + version: substate.version(), + substate_result: substate_result.clone(), + }; + self.substate_cache + .write(substate_address.to_address_string(), &entry) + .await?; + }; + } + Ok(substate_result.clone()) + } else { + Ok(SubstateResult::DoesNotExist) + } } /// Returns a specific version. If this is not found an error is returned. diff --git a/dan_layer/indexer_lib/src/transaction_autofiller.rs b/dan_layer/indexer_lib/src/transaction_autofiller.rs index 42d8bf57c..62d9ba20a 100644 --- a/dan_layer/indexer_lib/src/transaction_autofiller.rs +++ b/dan_layer/indexer_lib/src/transaction_autofiller.rs @@ -12,8 +12,14 @@ use tari_engine_types::{ use tari_epoch_manager::EpochManagerReader; use tari_transaction::{SubstateRequirement, Transaction}; use tari_validator_node_rpc::client::{SubstateResult, ValidatorNodeClientFactory}; +use tokio::task::JoinError; -use crate::{error::IndexerError, substate_decoder::find_related_substates, substate_scanner::SubstateScanner}; +use crate::{ + error::IndexerError, + substate_cache::SubstateCache, + substate_decoder::find_related_substates, + substate_scanner::SubstateScanner, +}; const LOG_TARGET: &str = "tari::indexer::transaction_autofiller"; @@ -23,19 +29,22 @@ pub enum TransactionAutofillerError { IndexedValueVisitorError(#[from] IndexedValueError), #[error("Indexer error: {0}")] IndexerError(#[from] IndexerError), + #[error("Tokio join error: {0}")] + JoinError(#[from] JoinError), } -pub struct TransactionAutofiller { - substate_scanner: Arc>, +pub struct TransactionAutofiller { + substate_scanner: Arc>, } -impl TransactionAutofiller +impl TransactionAutofiller where - TEpochManager: EpochManagerReader, - TVnClient: ValidatorNodeClientFactory, - TAddr: NodeAddressable, + TEpochManager: EpochManagerReader + 'static, + TVnClient: ValidatorNodeClientFactory + 'static, + TAddr: NodeAddressable + 'static, + TSubstateCache: SubstateCache + 'static, { - pub fn new(substate_scanner: Arc>) -> Self { + pub fn new(substate_scanner: Arc>) -> Self { Self { substate_scanner } } @@ -48,52 +57,28 @@ where // note that the transaction hash will not change as the "involved_objects" is not part of the hash let mut autofilled_transaction = original_transaction; - // scan the network to fetch all the substates for each required input - // TODO: perform this loop concurrently by spawning a tokio task for each scan + // scan the network in parallel to fetch all the substates for each required input let mut input_shards = vec![]; let mut found_substates = HashMap::new(); - for r in &substate_requirements { - let scan_res = match r.version() { - Some(version) => { - let shard = ShardId::from_address(r.address(), version); - if autofilled_transaction.all_inputs_iter().any(|s| *s == shard) { - // Shard is already an input - continue; - } - - // if the client specified a version, we need to retrieve it - self.substate_scanner - .get_specific_substate_from_committee(r.address(), version) - .await? - }, - None => { - // if the client didn't specify a version, we fetch the latest one - self.substate_scanner.get_substate(r.address(), None).await? - }, - }; - - if let SubstateResult::Up { substate, address, .. } = scan_res { - info!( - target: LOG_TARGET, - "✏️Filling input substate {}:v{}", - address, - substate.version() - ); + let substate_scanner_ref = self.substate_scanner.clone(); + let transaction_ref = Arc::new(autofilled_transaction.clone()); + let mut handles = Vec::new(); + for requirement in &substate_requirements { + let handle = tokio::spawn(get_substate_requirement( + substate_scanner_ref.clone(), + transaction_ref.clone(), + requirement.clone(), + )); + handles.push(handle); + } + for handle in handles { + let res = handle.await??; + if let Some((address, substate)) = res { let shard = ShardId::from_address(&address, substate.version()); - if autofilled_transaction.all_inputs_iter().any(|s| *s == shard) { - // Shard is already an input (TODO: what a waste) - continue; - } input_shards.push(shard); found_substates.insert(address, substate); - } else { - warn!( - target: LOG_TARGET, - "🖋️ The substate for input requirement {} is not in UP status, skipping", r - ); } } - info!(target: LOG_TARGET, "✏️️ Found {} input substates", found_substates.len()); autofilled_transaction.filled_inputs_mut().extend(input_shards); @@ -103,7 +88,6 @@ where for _i in 0..MAX_RECURSION { // add all substates related to the inputs - // TODO: perform this loop concurrently by spawning a tokio task for each scan // TODO: we are going to only check the first level of recursion, for composability we may want to do it // recursively (with a recursion limit) let mut autofilled_inputs = vec![]; @@ -119,12 +103,16 @@ where .flatten() .filter(|s| !substate_requirements.iter().any(|r| r.address() == s)); + // we need to fetch (in parallel) the latest version of all the related substates + let mut handles = HashMap::new(); + let substate_scanner_ref = self.substate_scanner.clone(); for address in related_addresses { - info!(target: LOG_TARGET, "✏️️️ Found {} related substate", address); - - // we need to fetch the latest version of all the related substates - // note that if the version specified is "None", the scanner will fetch the latest version - let scan_res = self.substate_scanner.get_substate(&address, None).await?; + info!(target: LOG_TARGET, "✏️️️ Found {} related substates", address); + let handle = tokio::spawn(get_substate(substate_scanner_ref.clone(), address.clone(), None)); + handles.insert(address.clone(), handle); + } + for (address, handle) in handles { + let scan_res = handle.await??; if let SubstateResult::Up { substate, address, .. } = scan_res { info!( @@ -158,3 +146,70 @@ where Ok((autofilled_transaction, found_substates)) } } + +pub async fn get_substate_requirement( + substate_scanner: Arc>, + transaction: Arc, + req: SubstateRequirement, +) -> Result, IndexerError> +where + TEpochManager: EpochManagerReader, + TVnClient: ValidatorNodeClientFactory, + TAddr: NodeAddressable, + TSubstateCache: SubstateCache, +{ + let scan_res = match req.version() { + Some(version) => { + let shard = ShardId::from_address(req.address(), version); + if transaction.all_inputs_iter().any(|s| *s == shard) { + // Shard is already an input + return Ok(None); + } + + // if the client specified a version, we need to retrieve it + substate_scanner + .get_specific_substate_from_committee(req.address(), version) + .await? + }, + None => { + // if the client didn't specify a version, we fetch the latest one + substate_scanner.get_substate(req.address(), None).await? + }, + }; + + if let SubstateResult::Up { substate, address, .. } = &scan_res { + info!( + target: LOG_TARGET, + "Filling input substate {}:v{}", + address, + substate.version() + ); + let shard = ShardId::from_address(address, substate.version()); + if transaction.all_inputs_iter().any(|s| *s == shard) { + // Shard is already an input (TODO: what a waste) + return Ok(None); + } + + Ok(Some((address.clone(), substate.clone()))) + } else { + warn!( + target: LOG_TARGET, + "🖋️ The substate for input requirement {} is not in UP status, skipping", req + ); + Ok(None) + } +} + +pub async fn get_substate( + substate_scanner: Arc>, + substate_address: SubstateAddress, + version_hint: Option, +) -> Result +where + TEpochManager: EpochManagerReader, + TVnClient: ValidatorNodeClientFactory, + TAddr: NodeAddressable, + TSubstateCache: SubstateCache, +{ + substate_scanner.get_substate(&substate_address, version_hint).await +}