diff --git a/applications/tari_validator_node/src/substate_resolver.rs b/applications/tari_validator_node/src/substate_resolver.rs index 856be9fb2e..db71469947 100644 --- a/applications/tari_validator_node/src/substate_resolver.rs +++ b/applications/tari_validator_node/src/substate_resolver.rs @@ -70,7 +70,7 @@ where if missing_substate_ids.contains(requested_input.substate_id()) { // TODO/NOTE: This assumes that consensus is up to date (i.e. doesnt need to sync, or catch up). We need // to check the if the substate is in our shard range. The best action then may be to - // let consensus handle it (deferred) which is what happens currently anyway. + // let consensus handle it which is what happens currently anyway. missing_substates.insert(requested_input); // Not a local substate, so we will need to fetch it remotely continue; diff --git a/dan_layer/consensus/src/hotstuff/block_change_set.rs b/dan_layer/consensus/src/hotstuff/block_change_set.rs index 74b48fbb6c..fe51a3d3d3 100644 --- a/dan_layer/consensus/src/hotstuff/block_change_set.rs +++ b/dan_layer/consensus/src/hotstuff/block_change_set.rs @@ -20,6 +20,7 @@ use tari_dan_storage::{ TransactionPoolRecord, TransactionPoolStage, TransactionPoolStatusUpdate, + VersionedStateHashTreeDiff, }, StateStoreReadTransaction, StateStoreWriteTransaction, @@ -42,7 +43,7 @@ pub struct ProposedBlockChangeSet { block: LeafBlock, quorum_decision: Option, block_diff: Vec, - state_tree_diffs: IndexMap, + state_tree_diffs: IndexMap, substate_locks: IndexMap>, transaction_changes: IndexMap, } @@ -66,7 +67,7 @@ impl ProposedBlockChangeSet { self } - pub fn set_state_tree_diffs(&mut self, diffs: IndexMap) -> &mut Self { + pub fn set_state_tree_diffs(&mut self, diffs: IndexMap) -> &mut Self { self.state_tree_diffs = diffs; self } diff --git a/dan_layer/consensus/src/hotstuff/common.rs b/dan_layer/consensus/src/hotstuff/common.rs index bdedead808..ac153e2fd7 100644 --- a/dan_layer/consensus/src/hotstuff/common.rs +++ b/dan_layer/consensus/src/hotstuff/common.rs @@ -9,7 +9,14 @@ use tari_common::configuration::Network; use tari_common_types::types::FixedHash; use tari_dan_common_types::{committee::Committee, shard::Shard, Epoch, NodeAddressable, NodeHeight, ShardGroup}; use tari_dan_storage::{ - consensus_models::{Block, LeafBlock, PendingStateTreeDiff, QuorumCertificate, SubstateChange}, + consensus_models::{ + Block, + LeafBlock, + PendingStateTreeDiff, + QuorumCertificate, + SubstateChange, + VersionedStateHashTreeDiff, + }, StateStoreReadTransaction, }; use tari_state_tree::{Hash, StateHashTreeDiff, StateTreeError}; @@ -165,9 +172,9 @@ fn with_dummy_blocks( pub fn calculate_state_merkle_root( tx: &TTx, local_shard_group: ShardGroup, - pending_tree_diffs: HashMap, + pending_tree_diffs: HashMap>, changes: &[SubstateChange], -) -> Result<(Hash, IndexMap), StateTreeError> { +) -> Result<(Hash, IndexMap), StateTreeError> { let mut change_map = IndexMap::with_capacity(changes.len()); changes @@ -179,5 +186,5 @@ pub fn calculate_state_merkle_root( let mut sharded_tree = ShardedStateTree::new(tx).with_pending_diffs(pending_tree_diffs); let state_root = sharded_tree.put_substate_tree_changes(change_map)?; - Ok((state_root, sharded_tree.into_tree_diffs())) + Ok((state_root, sharded_tree.into_versioned_tree_diffs())) } diff --git a/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs b/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs index 3b6a92ca24..b46aa6d536 100644 --- a/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs +++ b/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs @@ -267,7 +267,7 @@ where TConsensusSpec: ConsensusSpec Command::LocalOnly(t) => { info!( target: LOG_TARGET, - "👨‍🔧 LOCAL-ONLY: Executing deferred transaction {} in block {}", + "👨‍🔧 LOCAL-ONLY: Executing transaction {} in block {}", tx_rec.transaction_id(), block, ); @@ -433,7 +433,7 @@ where TConsensusSpec: ConsensusSpec Command::Prepare(t) => { info!( target: LOG_TARGET, - "👨‍🔧 PREPARE: Executing deferred transaction {} in block {}", + "👨‍🔧 PREPARE: Executing transaction {} in block {}", tx_rec.transaction_id(), block, ); diff --git a/dan_layer/consensus/src/hotstuff/pacemaker.rs b/dan_layer/consensus/src/hotstuff/pacemaker.rs index b3c6b24714..693d4830cc 100644 --- a/dan_layer/consensus/src/hotstuff/pacemaker.rs +++ b/dan_layer/consensus/src/hotstuff/pacemaker.rs @@ -141,8 +141,8 @@ impl PaceMaker { let delta = self.delta_time(); leader_timeout.as_mut().reset(tokio::time::Instant::now() + delta); info!(target: LOG_TARGET, "⚠️ Leader timeout! Current view: {}, Delta: {:.2?}", self.current_view, delta); - self.current_view.set_next_height(); - on_leader_timeout.leader_timed_out(self.current_view.get_height()); + // self.current_view.set_next_height(); + // on_leader_timeout.leader_timed_out(self.current_view.get_height()); }, } diff --git a/dan_layer/consensus/src/hotstuff/substate_store/sharded_state_tree.rs b/dan_layer/consensus/src/hotstuff/substate_store/sharded_state_tree.rs index d4e641d420..2fe59d2ae7 100644 --- a/dan_layer/consensus/src/hotstuff/substate_store/sharded_state_tree.rs +++ b/dan_layer/consensus/src/hotstuff/substate_store/sharded_state_tree.rs @@ -6,7 +6,11 @@ use std::collections::HashMap; use indexmap::IndexMap; use log::debug; use tari_dan_common_types::shard::Shard; -use tari_dan_storage::{consensus_models::PendingStateTreeDiff, StateStoreReadTransaction, StateStoreWriteTransaction}; +use tari_dan_storage::{ + consensus_models::{PendingStateTreeDiff, VersionedStateHashTreeDiff}, + StateStoreReadTransaction, + StateStoreWriteTransaction, +}; use tari_state_tree::{ Hash, JmtStorageError, @@ -26,7 +30,7 @@ const LOG_TARGET: &str = "tari::dan::consensus::sharded_state_tree"; pub struct ShardedStateTree { tx: TTx, pending_diffs: HashMap>, - current_tree_diffs: IndexMap, + current_tree_diffs: IndexMap, } impl ShardedStateTree { @@ -61,7 +65,7 @@ impl ShardedStateTree<&TTx> { Ok(maybe_version) } - pub fn into_tree_diffs(self) -> IndexMap { + pub fn into_versioned_tree_diffs(self) -> IndexMap { self.current_tree_diffs } @@ -72,15 +76,18 @@ impl ShardedStateTree<&TTx> { let mut state_roots = Vec::with_capacity(changes.len()); for (shard, changes) in changes { let current_version = self.get_current_version(shard)?; - let next_version = current_version.map(|v| v + 1).unwrap_or(0); + let next_version = current_version.unwrap_or(0) + 1; // Read only state store that is scoped to the shard let scoped_store = ShardScopedTreeStoreReader::new(self.tx, shard); // Staged store that tracks changes to the state tree let mut store = StagedTreeStore::new(&scoped_store); // Apply pending (not yet committed) diffs to the staged store - if let Some(diff) = self.pending_diffs.remove(&shard) { - store.apply_pending_diff(diff.diff); + if let Some(diffs) = self.pending_diffs.remove(&shard) { + debug!(target: LOG_TARGET, "Applying {num_diffs} pending diff(s) to shard {shard} (version={version})", num_diffs = diffs.len(), version = diffs.last().map(|d| d.version).unwrap_or(0)); + for diff in diffs { + store.apply_pending_diff(diff.diff); + } } // Apply state updates to the state tree that is backed by the staged shard-scoped store @@ -88,7 +95,8 @@ impl ShardedStateTree<&TTx> { debug!(target: LOG_TARGET, "v{next_version} contains {} tree change(s) for shard {shard}", changes.len()); let state_root = state_tree.put_substate_changes(current_version, next_version, changes)?; state_roots.push(state_root); - self.current_tree_diffs.insert(shard, store.into_diff()); + self.current_tree_diffs + .insert(shard, VersionedStateHashTreeDiff::new(next_version, store.into_diff())); } // TODO: @@ -100,25 +108,27 @@ impl ShardedStateTree<&TTx> { } impl ShardedStateTree<&mut TTx> { - pub fn commit_diff(&mut self, diffs: IndexMap) -> Result<(), StateTreeError> { - for (shard, pending_diff) in diffs { - let diff = pending_diff.diff; - let mut store = ShardScopedTreeStoreWriter::new(self.tx, shard); - - for stale_tree_node in diff.stale_tree_nodes { - log::debug!( - "(shard={shard}) Recording stale tree node: {}", - stale_tree_node.as_node_key() - ); - store.record_stale_tree_node(stale_tree_node)?; - } - - for (key, node) in diff.new_nodes { - log::debug!("(shard={shard}) Inserting node: {}", key); - store.insert_node(key, node)?; + pub fn commit_diff(&mut self, diffs: IndexMap>) -> Result<(), StateTreeError> { + for (shard, pending_diffs) in diffs { + for pending_diff in pending_diffs { + let diff = pending_diff.diff; + let mut store = ShardScopedTreeStoreWriter::new(self.tx, shard); + + for stale_tree_node in diff.stale_tree_nodes { + debug!( + "(shard={shard}) Recording stale tree node: {}", + stale_tree_node.as_node_key() + ); + store.record_stale_tree_node(stale_tree_node)?; + } + + for (key, node) in diff.new_nodes { + debug!("(shard={shard}) Inserting node: {}", key); + store.insert_node(key, node)?; + } + + store.increment_version()?; } - - store.increment_version()?; } Ok(()) diff --git a/dan_layer/consensus_tests/src/consensus.rs b/dan_layer/consensus_tests/src/consensus.rs index b061c59b5d..535eb65f8f 100644 --- a/dan_layer/consensus_tests/src/consensus.rs +++ b/dan_layer/consensus_tests/src/consensus.rs @@ -207,8 +207,9 @@ async fn node_requests_missing_transaction_from_local_leader() { async fn multi_validator_propose_blocks_with_new_transactions_until_all_committed() { setup_logger(); let mut test = Test::builder() + .disable_timeout() .debug_sql("/tmp/test{}.db") - .add_committee(0, vec!["1"])//, "2", "3", "4", "5"]) + .add_committee(0, vec!["1", "2", "3", "4", "5"]) .start() .await; let mut remaining_txs = 10u32; @@ -761,7 +762,7 @@ async fn single_shard_unversioned_inputs() { let mut test = Test::builder().add_committee(0, vec!["1", "2"]).start().await; // First get transaction in the mempool let inputs = test.create_substates_on_all_vns(1); - // Remove versions from inputs to allow deferred transactions + // Remove versions from inputs to test substate version resolution let unversioned_inputs = inputs .iter() .map(|i| SubstateRequirement::new(i.substate_id.clone(), None)); diff --git a/dan_layer/consensus_tests/src/support/epoch_manager.rs b/dan_layer/consensus_tests/src/support/epoch_manager.rs index 1a0089cb9c..7495a2b25e 100644 --- a/dan_layer/consensus_tests/src/support/epoch_manager.rs +++ b/dan_layer/consensus_tests/src/support/epoch_manager.rs @@ -16,7 +16,7 @@ use tari_dan_storage::global::models::ValidatorNode; use tari_epoch_manager::{EpochManagerError, EpochManagerEvent, EpochManagerReader}; use tokio::sync::{broadcast, Mutex, MutexGuard}; -use crate::support::{address::TestAddress, helpers::random_substate_in_shard, TEST_NUM_PRESHARDS}; +use crate::support::{address::TestAddress, helpers::random_substate_in_shard_group, TEST_NUM_PRESHARDS}; #[derive(Debug, Clone)] pub struct TestEpochManager { @@ -84,7 +84,7 @@ impl TestEpochManager { let mut state = self.state_lock().await; for (shard_group, committee) in committees { for (address, pk) in &committee.members { - let substate_address = random_substate_in_shard(shard_group, TEST_NUM_PRESHARDS); + let substate_address = random_substate_in_shard_group(shard_group, TEST_NUM_PRESHARDS); state.validator_shards.insert( address.clone(), ( diff --git a/dan_layer/consensus_tests/src/support/helpers.rs b/dan_layer/consensus_tests/src/support/helpers.rs index 91c3af675d..574e3c7f9e 100644 --- a/dan_layer/consensus_tests/src/support/helpers.rs +++ b/dan_layer/consensus_tests/src/support/helpers.rs @@ -11,9 +11,10 @@ use tari_transaction::VersionedSubstateId; use crate::support::TestAddress; -pub(crate) fn random_substate_in_shard(shard: ShardGroup, num_shards: NumPreshards) -> VersionedSubstateId { - let range = shard.to_substate_address_range(num_shards); +pub(crate) fn random_substate_in_shard_group(shard_group: ShardGroup, num_shards: NumPreshards) -> VersionedSubstateId { + let range = shard_group.to_substate_address_range(num_shards); let size = range.end().to_u256() - range.start().to_u256(); + let a = OsRng.gen_range(*range.start().array()..*range.end().array()); let middlish = range.start().to_u256() + size / 2; let entity_id = EntityId::new(copy_fixed(&middlish.to_be_bytes()[0..EntityId::LENGTH])); let rand_bytes = OsRng.gen::<[u8; ComponentKey::LENGTH]>(); diff --git a/dan_layer/consensus_tests/src/support/transaction.rs b/dan_layer/consensus_tests/src/support/transaction.rs index 3246301b25..7cddc7d2fe 100644 --- a/dan_layer/consensus_tests/src/support/transaction.rs +++ b/dan_layer/consensus_tests/src/support/transaction.rs @@ -21,7 +21,7 @@ use tari_engine_types::{ }; use tari_transaction::{Transaction, TransactionId, VersionedSubstateId}; -use crate::support::{committee_number_to_shard_group, helpers::random_substate_in_shard, TEST_NUM_PRESHARDS}; +use crate::support::{committee_number_to_shard_group, helpers::random_substate_in_shard_group, TEST_NUM_PRESHARDS}; pub fn build_transaction_from( tx: Transaction, @@ -111,10 +111,10 @@ pub fn build_transaction( // We create these outputs so that the test VNs dont have to have any UP substates // Equal potion of shards to each committee let outputs = (0..num_committees) - .flat_map(|shard| { + .flat_map(|group_no| { iter::repeat_with(move || { - random_substate_in_shard( - committee_number_to_shard_group(TEST_NUM_PRESHARDS, shard, num_committees), + random_substate_in_shard_group( + committee_number_to_shard_group(TEST_NUM_PRESHARDS, group_no, num_committees), TEST_NUM_PRESHARDS, ) }) diff --git a/dan_layer/state_store_sqlite/src/writer.rs b/dan_layer/state_store_sqlite/src/writer.rs index cde228450b..7f3c1107b9 100644 --- a/dan_layer/state_store_sqlite/src/writer.rs +++ b/dan_layer/state_store_sqlite/src/writer.rs @@ -45,6 +45,7 @@ use tari_dan_storage::{ TransactionPoolStage, TransactionPoolStatusUpdate, TransactionRecord, + VersionedStateHashTreeDiff, Vote, }, StateStoreReadTransaction, @@ -1451,11 +1452,12 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta fn pending_state_tree_diffs_remove_by_block( &mut self, block_id: &BlockId, - ) -> Result, StorageError> { + ) -> Result>, StorageError> { use crate::schema::pending_state_tree_diffs; let diff_recs = pending_state_tree_diffs::table .filter(pending_state_tree_diffs::block_id.eq(serialize_hex(block_id))) + .order_by(pending_state_tree_diffs::block_height.asc()) .get_results::(self.connection()) .map_err(|e| SqliteStorageError::DieselError { operation: "pending_state_tree_diffs_remove_by_block", @@ -1474,10 +1476,7 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta for diff in diff_recs { let shard = Shard::from(diff.shard as u32); let diff = PendingStateTreeDiff::try_from(diff)?; - diffs - .entry(shard) - .or_insert_with(PendingStateTreeDiff::default) - .merge(diff); + diffs.entry(shard).or_insert_with(Vec::new).push(diff); } Ok(diffs) @@ -1487,28 +1486,10 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta &mut self, block_id: BlockId, shard: Shard, - diff: StateHashTreeDiff, + diff: VersionedStateHashTreeDiff, ) -> Result<(), StorageError> { use crate::schema::{blocks, pending_state_tree_diffs}; - let version = self.state_tree_versions_get_latest(shard)?; - - // Fetch the number of pending between this block and the current commit block - let commit_block = self.get_commit_block_id()?; - let block_ids = self.get_block_ids_that_change_state_between(&commit_block, &block_id)?; - - let num_pending = pending_state_tree_diffs::table - .select(count(pending_state_tree_diffs::id)) - .filter(pending_state_tree_diffs::block_id.eq_any(block_ids)) - .filter(pending_state_tree_diffs::shard.eq(shard.as_u32() as i32)) - .first::(self.connection()) - .map_err(|e| SqliteStorageError::DieselError { - operation: "pending_state_tree_diffs_insert (count)", - source: e, - })?; - - let next_version = version.unwrap_or(0) + (num_pending as u64); - let insert = ( pending_state_tree_diffs::block_id.eq(serialize_hex(block_id)), pending_state_tree_diffs::shard.eq(shard.as_u32() as i32), @@ -1517,8 +1498,8 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta .filter(blocks::block_id.eq(serialize_hex(block_id))) .single_value() .assume_not_null()), - pending_state_tree_diffs::version.eq(next_version as i64), - pending_state_tree_diffs::diff_json.eq(serialize_json(&diff)?), + pending_state_tree_diffs::version.eq(diff.version as i64), + pending_state_tree_diffs::diff_json.eq(serialize_json(&diff.diff)?), ); diesel::insert_into(pending_state_tree_diffs::table) @@ -1566,7 +1547,7 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta use crate::schema::state_tree; let key = node.as_node_key(); - diesel::update(state_tree::table) + let num_effected = diesel::update(state_tree::table) .filter(state_tree::shard.eq(shard.as_u32() as i32)) .filter(state_tree::key.eq(key.to_string())) .set(state_tree::is_stale.eq(true)) @@ -1576,6 +1557,13 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta source: e, })?; + if num_effected == 0 { + return Err(StorageError::NotFound { + item: "state_tree_node".to_string(), + key: key.to_string(), + }); + } + Ok(()) } @@ -1584,7 +1572,7 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta let values = ( state_tree_shard_versions::shard.eq(shard.as_u32() as i32), - state_tree_shard_versions::version.eq(0), + state_tree_shard_versions::version.eq(1), ); diesel::insert_into(state_tree_shard_versions::table) diff --git a/dan_layer/state_tree/src/jellyfish/types.rs b/dan_layer/state_tree/src/jellyfish/types.rs index b51284281a..7ebd6914f8 100644 --- a/dan_layer/state_tree/src/jellyfish/types.rs +++ b/dan_layer/state_tree/src/jellyfish/types.rs @@ -1243,6 +1243,9 @@ pub enum JmtStorageError { #[error("Unexpected error: {0}")] UnexpectedError(String), + + #[error("Attempted to insert node {0} that already exists")] + Conflict(NodeKey), } impl IsNotFoundError for JmtStorageError { diff --git a/dan_layer/state_tree/src/staged_store.rs b/dan_layer/state_tree/src/staged_store.rs index 867603b68c..084bce300d 100644 --- a/dan_layer/state_tree/src/staged_store.rs +++ b/dan_layer/state_tree/src/staged_store.rs @@ -1,13 +1,10 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -// Copyright 2024 The Tari Project -// SPDX-License-Identifier: BSD-3-Clause -// Copyright 2024 The Tari Project -// SPDX-License-Identifier: BSD-3-Clause - use std::collections::{HashMap, VecDeque}; +use log::debug; + use crate::{ JmtStorageError, Node, @@ -19,6 +16,8 @@ use crate::{ Version, }; +const LOG_TARGET: &str = "tari::dan::consensus::sharded_state_tree"; + pub struct StagedTreeStore<'s, S> { readable_store: &'s S, preceding_pending_state: HashMap>, @@ -37,11 +36,17 @@ impl<'s, S: TreeStoreReader> StagedTreeStore<'s, S> { } pub fn apply_pending_diff(&mut self, diff: StateHashTreeDiff) { + self.preceding_pending_state.reserve(diff.new_nodes.len()); for (key, node) in diff.new_nodes { + debug!(target: LOG_TARGET, "PENDING INSERT: node {}", key); self.preceding_pending_state.insert(key, node); } + for stale in diff.stale_tree_nodes { - self.preceding_pending_state.remove(stale.as_node_key()); + debug!(target: LOG_TARGET, "PENDING DELETE: node {}", stale.as_node_key()); + if self.preceding_pending_state.remove(stale.as_node_key()).is_some() { + debug!(target: LOG_TARGET, "PENDING DELETE: node {} removed", stale.as_node_key()); + } } } @@ -68,7 +73,9 @@ impl<'s, S: TreeStoreReader> TreeStoreReader for StagedTreeSto impl<'s, S> TreeStoreWriter for StagedTreeStore<'s, S> { fn insert_node(&mut self, key: NodeKey, node: Node) -> Result<(), JmtStorageError> { - self.new_tree_nodes.insert(key, node); + if self.new_tree_nodes.insert(key.clone(), node).is_some() { + return Err(JmtStorageError::Conflict(key)); + } Ok(()) } diff --git a/dan_layer/state_tree/src/tree.rs b/dan_layer/state_tree/src/tree.rs index 6beba3268d..623513ef48 100644 --- a/dan_layer/state_tree/src/tree.rs +++ b/dan_layer/state_tree/src/tree.rs @@ -130,11 +130,6 @@ impl StateHashTreeDiff { stale_tree_nodes: Vec::new(), } } - - pub fn merge(&mut self, other: Self) { - self.new_nodes.extend(other.new_nodes); - self.stale_tree_nodes.extend(other.stale_tree_nodes); - } } impl From> for StateHashTreeDiff { diff --git a/dan_layer/storage/src/consensus_models/state_tree_diff.rs b/dan_layer/storage/src/consensus_models/state_tree_diff.rs index 4c06825cac..59d55dc849 100644 --- a/dan_layer/storage/src/consensus_models/state_tree_diff.rs +++ b/dan_layer/storage/src/consensus_models/state_tree_diff.rs @@ -22,10 +22,6 @@ impl PendingStateTreeDiff { pub fn load(version: Version, diff: StateHashTreeDiff) -> Self { Self { version, diff } } - - pub fn merge(&mut self, other: Self) { - self.diff.merge(other.diff); - } } impl PendingStateTreeDiff { @@ -40,7 +36,7 @@ impl PendingStateTreeDiff { tx.pending_state_tree_diffs_get_all_up_to_commit_block(block_id) } - pub fn remove_by_block(tx: &mut TTx, block_id: &BlockId) -> Result, StorageError> + pub fn remove_by_block(tx: &mut TTx, block_id: &BlockId) -> Result>, StorageError> where TTx: Deref + StateStoreWriteTransaction, TTx::Target: StateStoreReadTransaction, @@ -52,7 +48,7 @@ impl PendingStateTreeDiff { tx: &mut TTx, block_id: BlockId, shard: Shard, - diff: StateHashTreeDiff, + diff: VersionedStateHashTreeDiff, ) -> Result<(), StorageError> where TTx: Deref + StateStoreWriteTransaction, @@ -67,3 +63,15 @@ impl PendingStateTreeDiff { Ok(()) } } + +#[derive(Debug, Clone)] +pub struct VersionedStateHashTreeDiff { + pub version: Version, + pub diff: StateHashTreeDiff, +} + +impl VersionedStateHashTreeDiff { + pub fn new(version: Version, diff: StateHashTreeDiff) -> Self { + Self { version, diff } + } +} diff --git a/dan_layer/storage/src/state_store/mod.rs b/dan_layer/storage/src/state_store/mod.rs index 52e1524c3c..fc3fb20be5 100644 --- a/dan_layer/storage/src/state_store/mod.rs +++ b/dan_layer/storage/src/state_store/mod.rs @@ -48,6 +48,7 @@ use crate::{ TransactionPoolStage, TransactionPoolStatusUpdate, TransactionRecord, + VersionedStateHashTreeDiff, Vote, }, StorageError, @@ -280,7 +281,7 @@ pub trait StateStoreReadTransaction: Sized { fn pending_state_tree_diffs_get_all_up_to_commit_block( &self, block_id: &BlockId, - ) -> Result, StorageError>; + ) -> Result>, StorageError>; fn state_transitions_get_n_after( &self, @@ -433,12 +434,12 @@ pub trait StateStoreWriteTransaction { &mut self, block_id: BlockId, shard: Shard, - diff: StateHashTreeDiff, + diff: VersionedStateHashTreeDiff, ) -> Result<(), StorageError>; fn pending_state_tree_diffs_remove_by_block( &mut self, block_id: &BlockId, - ) -> Result, StorageError>; + ) -> Result>, StorageError>; //---------------------------------- State tree --------------------------------------------// fn state_tree_nodes_insert(&mut self, shard: Shard, key: NodeKey, node: Node) -> Result<(), StorageError>;