Skip to content

Commit

Permalink
fix(engine): simplify and improve state store performance (#1143)
Browse files Browse the repository at this point in the history
Description
---
Removes unnecessary locks in MemoryStateStore
Remove de/encoding from MemoryStateStore
Removed unused lmdb implementation and lmdb_zero dep

Motivation and Context
---
MemoryStateStore now uses a hashmap internally without locks. The engine
only requires read access to the provided inputs. Key and value types
are always SubstateId and Substate respectively, the generic en/decoding
to/from CBOR no longer necessary.

How Has This Been Tested?
---
Existing tests and manually

What process can a PR reviewer use to test or verify this change?
---
Execution time should be slightly reduced, exact gain is not
deteremined.

Breaking Changes
---

- [x] None
- [ ] Requires data directory to be deleted
- [ ] Other - Please specify
  • Loading branch information
sdbondi authored Sep 18, 2024
1 parent 1bbd7d5 commit 70fcc69
Show file tree
Hide file tree
Showing 21 changed files with 168 additions and 620 deletions.
14 changes: 0 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ members = [
"dan_layer/rpc_state_sync",
"dan_layer/state_store_sqlite",
"dan_layer/state_tree",
"dan_layer/storage_lmdb",
"dan_layer/storage_sqlite",
"dan_layer/storage",
"dan_layer/tari_bor",
Expand Down Expand Up @@ -193,7 +192,6 @@ libp2p = { git = "https://github.com/tari-project/rust-libp2p.git", rev = "0dccc
libsqlite3-sys = "0.25"
liquid = "0.26.4"
liquid-core = "0.26.4"
lmdb-zero = "0.4.4"
log = "0.4.20"
log4rs = "1.3"
mime_guess = "2.0.4"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use tari_dan_common_types::{
use tari_dan_engine::{
fees::{FeeModule, FeeTable},
runtime::{AuthParams, RuntimeModule},
state_store::{memory::MemoryStateStore, StateStoreError},
state_store::{memory::ReadOnlyMemoryStateStore, StateStoreError},
template::LoadedTemplate,
transaction::{TransactionError, TransactionProcessor},
};
Expand All @@ -33,7 +33,7 @@ pub trait TransactionExecutor {
fn execute(
&self,
transaction: Transaction,
state_store: MemoryStateStore,
state_store: ReadOnlyMemoryStateStore,
virtual_substates: VirtualSubstates,
) -> Result<ExecutionOutput, Self::Error>;
}
Expand Down Expand Up @@ -110,7 +110,7 @@ where TTemplateProvider: TemplateProvider<Template = LoadedTemplate>
fn execute(
&self,
transaction: Transaction,
state_store: MemoryStateStore,
state_store: ReadOnlyMemoryStateStore,
virtual_substates: VirtualSubstates,
) -> Result<ExecutionOutput, Self::Error> {
// Include signature public key badges for all transaction signers in the initial auth scope
Expand All @@ -129,7 +129,7 @@ where TTemplateProvider: TemplateProvider<Template = LoadedTemplate>

let processor = TransactionProcessor::new(
self.template_provider.clone(),
state_store.clone(),
state_store,
auth_params,
virtual_substates,
modules,
Expand Down
7 changes: 4 additions & 3 deletions applications/tari_indexer/src/dry_run/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,13 @@ where TSubstateCache: SubstateCache + 'static

let virtual_substates = self.get_virtual_substates(&transaction, epoch).await?;

let state_store = new_memory_store();
let mut state_store = new_memory_store();
state_store.set_many(found_substates)?;

// execute the payload in the WASM engine and return the result
let exec_output =
task::block_in_place(|| payload_processor.execute(transaction, state_store, virtual_substates))?;
let exec_output = task::block_in_place(|| {
payload_processor.execute(transaction, state_store.into_read_only(), virtual_substates)
})?;

Ok(exec_output.result)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use log::info;
use tari_consensus::traits::{BlockTransactionExecutor, BlockTransactionExecutorError};
use tari_dan_app_utilities::transaction_executor::TransactionExecutor;
use tari_dan_common_types::{Epoch, SubstateRequirement};
use tari_dan_engine::state_store::{memory::MemoryStateStore, new_memory_store, AtomicDb, StateWriter};
use tari_dan_engine::state_store::{memory::MemoryStateStore, new_memory_store, StateWriter};
use tari_dan_storage::{consensus_models::ExecutedTransaction, StateStore};
use tari_engine_types::{
substate::Substate,
Expand Down Expand Up @@ -36,22 +36,14 @@ where TExecutor: TransactionExecutor
}

fn add_substates_to_memory_db<'a, I: IntoIterator<Item = (&'a SubstateRequirement, &'a Substate)>>(
&self,
inputs: I,
out: &MemoryStateStore,
out: &mut MemoryStateStore,
) -> Result<(), BlockTransactionExecutorError> {
// TODO: pass the impl SubstateStore directly into the engine
let mut access = out
.write_access()
.map_err(|e| BlockTransactionExecutorError::StateStoreError(e.to_string()))?;
// TODO: pass the SubstateStore directly into the engine
for (id, substate) in inputs {
access
.set_state(id.substate_id(), substate)
out.set_state(id.substate_id().clone(), substate.clone())
.map_err(|e| BlockTransactionExecutorError::StateStoreError(e.to_string()))?;
}
access
.commit()
.map_err(|e| BlockTransactionExecutorError::StateStoreError(e.to_string()))?;

Ok(())
}
Expand Down Expand Up @@ -87,8 +79,8 @@ where
info!(target: LOG_TARGET, "Transaction {} executing. {} input(s)", id, resolved_inputs.len());

// Create a memory db with all the input substates, needed for the transaction execution
let state_db = new_memory_store();
self.add_substates_to_memory_db(resolved_inputs, &state_db)?;
let mut state_db = new_memory_store();
Self::add_substates_to_memory_db(resolved_inputs, &mut state_db)?;

let mut virtual_substates = VirtualSubstates::new();
virtual_substates.insert(
Expand All @@ -99,7 +91,7 @@ where
// Execute the transaction and get the result
let exec_output = self
.executor
.execute(transaction, state_db, virtual_substates)
.execute(transaction, state_db.into_read_only(), virtual_substates)
.map_err(|e| BlockTransactionExecutorError::ExecutionThreadFailure(e.to_string()))?;

// Generate the resolved inputs to set the specific version and required lock flag, as we know it after
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl DryRunTransactionProcessor {
transaction: Transaction,
) -> Result<ExecuteResult, DryRunTransactionProcessorError> {
// Resolve all local and foreign substates
let temp_state_store = new_memory_store();
let mut temp_state_store = new_memory_store();

let current_epoch = self.epoch_manager.current_epoch().await?;
let virtual_substates = self
Expand All @@ -124,8 +124,10 @@ impl DryRunTransactionProcessor {

// execute the payload in the WASM engine and return the result
let processor = self.payload_processor.clone();
let exec_output =
task::spawn_blocking(move || processor.execute(transaction, temp_state_store, virtual_substates)).await??;
let exec_output = task::spawn_blocking(move || {
processor.execute(transaction, temp_state_store.into_read_only(), virtual_substates)
})
.await??;
let result = exec_output.result;

let fees = &result.finalize.fee_receipt;
Expand Down
18 changes: 5 additions & 13 deletions dan_layer/consensus_tests/src/support/transaction_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{collections::HashMap, iter};

use tari_consensus::traits::{BlockTransactionExecutor, BlockTransactionExecutorError};
use tari_dan_common_types::{Epoch, LockIntent, SubstateRequirement, VersionedSubstateId};
use tari_dan_engine::state_store::{memory::MemoryStateStore, new_memory_store, AtomicDb, StateWriter};
use tari_dan_engine::state_store::{memory::MemoryStateStore, new_memory_store, StateWriter};
use tari_dan_storage::{
consensus_models::{ExecutedTransaction, VersionedSubstateIdLockIntent},
StateStore,
Expand All @@ -30,22 +30,14 @@ impl TestBlockTransactionProcessor {
}

fn add_substates_to_memory_db<'a, I: IntoIterator<Item = (&'a SubstateRequirement, &'a Substate)>>(
&self,
inputs: I,
out: &MemoryStateStore,
out: &mut MemoryStateStore,
) -> Result<(), BlockTransactionExecutorError> {
// TODO: pass the impl SubstateStore directly into the engine
let mut access = out
.write_access()
.map_err(|e| BlockTransactionExecutorError::StateStoreError(e.to_string()))?;
for (id, substate) in inputs {
access
.set_state(id.substate_id(), substate)
out.set_state(id.substate_id().clone(), substate.clone())
.map_err(|e| BlockTransactionExecutorError::StateStoreError(e.to_string()))?;
}
access
.commit()
.map_err(|e| BlockTransactionExecutorError::StateStoreError(e.to_string()))?;

Ok(())
}
Expand All @@ -72,8 +64,8 @@ impl<TStateStore: StateStore> BlockTransactionExecutor<TStateStore> for TestBloc
log::info!("Transaction {} executing. {} input(s)", id, resolved_inputs.len());

// Create a memory db with all the input substates, needed for the transaction execution
let state_db = new_memory_store();
self.add_substates_to_memory_db(resolved_inputs, &state_db)?;
let mut state_db = new_memory_store();
Self::add_substates_to_memory_db(resolved_inputs, &mut state_db)?;

let mut virtual_substates = VirtualSubstates::new();
virtual_substates.insert(
Expand Down
4 changes: 2 additions & 2 deletions dan_layer/engine/src/runtime/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ pub enum RuntimeError {
StateStoreError(#[from] StateStoreError),
#[error("Workspace error: {0}")]
WorkspaceError(#[from] WorkspaceError),
#[error("Substate not found with address '{address}'")]
SubstateNotFound { address: SubstateId },
#[error("Substate '{id}' not found")]
SubstateNotFound { id: SubstateId },
#[error("Root substate not found with address '{address}'")]
RootSubstateNotFound { address: SubstateId },
#[error("Referenced substate not found with address '{address}'")]
Expand Down
61 changes: 27 additions & 34 deletions dan_layer/engine/src/runtime/state_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
locking::{LockError, LockedSubstates},
RuntimeError,
},
state_store::{memory::MemoryStateStore, AtomicDb, StateReader},
state_store::{memory::ReadOnlyMemoryStateStore, StateReader},
};

#[derive(Debug, Clone)]
Expand All @@ -29,11 +29,11 @@ pub struct WorkingStateStore {
loaded_substates: HashMap<SubstateId, SubstateValue>,
locked_substates: LockedSubstates,

state_store: MemoryStateStore,
state_store: ReadOnlyMemoryStateStore,
}

impl WorkingStateStore {
pub fn new(state_store: MemoryStateStore) -> Self {
pub fn new(state_store: ReadOnlyMemoryStateStore) -> Self {
Self {
new_substates: IndexMap::new(),
loaded_substates: HashMap::new(),
Expand All @@ -44,9 +44,7 @@ impl WorkingStateStore {

pub fn try_lock(&mut self, address: &SubstateId, lock_flag: LockFlag) -> Result<LockId, RuntimeError> {
if !self.exists(address)? {
return Err(RuntimeError::SubstateNotFound {
address: address.clone(),
});
return Err(RuntimeError::SubstateNotFound { id: address.clone() });
}
let lock_id = self.locked_substates.try_lock(address, lock_flag)?;
self.load(address)?;
Expand Down Expand Up @@ -131,38 +129,35 @@ impl WorkingStateStore {
})
}

pub fn exists(&self, address: &SubstateId) -> Result<bool, RuntimeError> {
let exists = self.new_substates.contains_key(address) || self.loaded_substates.contains_key(address) || {
let tx = self.state_store.read_access()?;
tx.exists(address)?
};
pub fn exists(&self, id: &SubstateId) -> Result<bool, RuntimeError> {
let exists = self.new_substates.contains_key(id) ||
self.loaded_substates.contains_key(id) ||
self.state_store.exists(id)?;
Ok(exists)
}

pub fn insert(&mut self, address: SubstateId, value: SubstateValue) -> Result<(), RuntimeError> {
if self.exists(&address)? {
return Err(RuntimeError::DuplicateSubstate { address });
pub fn insert(&mut self, id: SubstateId, value: SubstateValue) -> Result<(), RuntimeError> {
if self.exists(&id)? {
return Err(RuntimeError::DuplicateSubstate { address: id });
}
self.new_substates.insert(address, value);
self.new_substates.insert(id, value);
Ok(())
}

fn load(&mut self, address: &SubstateId) -> Result<(), RuntimeError> {
if self.new_substates.contains_key(address) {
fn load(&mut self, id: &SubstateId) -> Result<(), RuntimeError> {
if self.new_substates.contains_key(id) {
return Ok(());
}
if self.loaded_substates.contains_key(address) {
if self.loaded_substates.contains_key(id) {
return Ok(());
}
let tx = self.state_store.read_access()?;
let substate =
tx.get_state::<_, Substate>(address)
.optional()?
.ok_or_else(|| RuntimeError::SubstateNotFound {
address: address.clone(),
})?;
let substate = substate.into_substate_value();
self.loaded_substates.insert(address.clone(), substate);
let substate = self
.state_store
.get_state(id)
.optional()?
.ok_or_else(|| RuntimeError::SubstateNotFound { id: id.clone() })?;
let substate = substate.substate_value().clone();
self.loaded_substates.insert(id.clone(), substate);
Ok(())
}

Expand All @@ -181,7 +176,7 @@ impl WorkingStateStore {
.map(|(addr, vault)| (addr.as_vault_id().unwrap(), vault.as_vault().unwrap()))
}

pub(super) fn state_store(&self) -> &MemoryStateStore {
pub(super) fn state_store(&self) -> &ReadOnlyMemoryStateStore {
&self.state_store
}

Expand All @@ -196,12 +191,10 @@ impl WorkingStateStore {
})
}

pub(super) fn get_unmodified_substate(&self, address: &SubstateId) -> Result<Substate, RuntimeError> {
let tx = self.state_store.read_access()?;
tx.get_state(address)
pub(super) fn get_unmodified_substate(&self, address: &SubstateId) -> Result<&Substate, RuntimeError> {
self.state_store
.get_state(address)
.optional()?
.ok_or_else(|| RuntimeError::SubstateNotFound {
address: address.clone(),
})
.ok_or_else(|| RuntimeError::SubstateNotFound { id: address.clone() })
}
}
4 changes: 2 additions & 2 deletions dan_layer/engine/src/runtime/tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ use crate::{
workspace::Workspace,
RuntimeError,
},
state_store::memory::MemoryStateStore,
state_store::memory::ReadOnlyMemoryStateStore,
};

const LOG_TARGET: &str = "tari::dan::engine::runtime::state_tracker";
Expand All @@ -69,7 +69,7 @@ pub struct StateTracker {

impl StateTracker {
pub fn new(
state_store: MemoryStateStore,
state_store: ReadOnlyMemoryStateStore,
virtual_substates: VirtualSubstates,
initial_call_scope: CallScope,
transaction_hash: Hash,
Expand Down
4 changes: 2 additions & 2 deletions dan_layer/engine/src/runtime/working_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ use crate::{
RuntimeError,
TransactionCommitError,
},
state_store::memory::MemoryStateStore,
state_store::memory::ReadOnlyMemoryStateStore,
};

const LOG_TARGET: &str = "dan::engine::runtime::working_state";
Expand Down Expand Up @@ -93,7 +93,7 @@ pub(super) struct WorkingState {

impl WorkingState {
pub fn new(
state_store: MemoryStateStore,
state_store: ReadOnlyMemoryStateStore,
virtual_substates: VirtualSubstates,
initial_call_scope: CallScope,
transaction_hash: Hash,
Expand Down
Loading

0 comments on commit 70fcc69

Please sign in to comment.