From f70287a438bde62f2025007a7be42c2f38296ce8 Mon Sep 17 00:00:00 2001 From: Stan Bondi Date: Mon, 29 Jul 2024 17:49:49 +0200 Subject: [PATCH] wip --- dan_layer/common_types/src/shard_group.rs | 8 ++ .../common_types/src/substate_address.rs | 78 ++++++++++++++----- .../src/hotstuff/block_change_set.rs | 2 +- dan_layer/consensus/src/hotstuff/common.rs | 19 ++--- .../consensus/src/hotstuff/on_propose.rs | 2 - .../hotstuff/on_receive_foreign_proposal.rs | 2 +- .../src/hotstuff/on_receive_local_proposal.rs | 1 + .../substate_store/sharded_state_tree.rs | 40 +++++----- dan_layer/consensus_tests/src/consensus.rs | 9 ++- .../consensus_tests/src/substate_store.rs | 2 +- .../src/support/epoch_manager.rs | 11 +-- .../consensus_tests/src/support/harness.rs | 44 +++++++---- .../src/support/transaction.rs | 2 +- .../src/support/validator/builder.rs | 2 +- .../src/support/validator/instance.rs | 2 +- .../up.sql | 4 +- dan_layer/state_store_sqlite/src/error.rs | 4 - dan_layer/state_store_sqlite/src/reader.rs | 26 +++++-- .../src/sql_models/block.rs | 2 +- .../src/sql_models/bookkeeping.rs | 2 +- .../src/sql_models/pending_state_tree_diff.rs | 3 +- .../src/sql_models/state_transition.rs | 9 +-- .../state_store_sqlite/src/tree_store.rs | 70 ----------------- dan_layer/state_store_sqlite/src/writer.rs | 15 ++-- dan_layer/state_tree/Cargo.toml | 1 + dan_layer/state_tree/src/jellyfish/tree.rs | 2 +- dan_layer/state_tree/src/jellyfish/types.rs | 15 ++-- dan_layer/state_tree/src/tree.rs | 2 +- .../foreign_receive_counters.rs | 14 ++-- .../src/consensus_models/state_tree_diff.rs | 9 ++- 30 files changed, 204 insertions(+), 198 deletions(-) delete mode 100644 dan_layer/state_store_sqlite/src/tree_store.rs diff --git a/dan_layer/common_types/src/shard_group.rs b/dan_layer/common_types/src/shard_group.rs index fc08b550ff..e5e1bbd4de 100644 --- a/dan_layer/common_types/src/shard_group.rs +++ b/dan_layer/common_types/src/shard_group.rs @@ -138,4 +138,12 @@ mod tests { assert_eq!(ShardGroup::decode_from_u32(0xFFFF + 1), None); assert_eq!(ShardGroup::decode_from_u32(u32::MAX), None); } + + #[test] + fn to_substate_address_range() { + let sg = ShardGroup::new(0, 63); + let range = sg.to_substate_address_range(NumPreshards::SixtyFour); + assert_eq!(*range.start(), SubstateAddress::zero()); + assert_eq!(*range.end(), SubstateAddress::max()); + } } diff --git a/dan_layer/common_types/src/substate_address.rs b/dan_layer/common_types/src/substate_address.rs index a94960f3ff..c4b799c241 100644 --- a/dan_layer/common_types/src/substate_address.rs +++ b/dan_layer/common_types/src/substate_address.rs @@ -186,25 +186,38 @@ impl SubstateAddress { pub fn to_shard_group(&self, num_shards: NumPreshards, num_committees: u32) -> ShardGroup { // number of committees can never exceed number of shards let num_committees = num_committees.min(num_shards.as_u32()); - if num_committees == 1 { + if num_committees <= 1 { return ShardGroup::new(Shard::zero(), Shard::from(num_shards.as_u32() - 1)); } - let shards_per_committee = num_shards.as_u32().div_ceil(num_committees); + let shards_per_committee = num_shards.as_u32() / num_committees; + let mut shards_per_committee_rem = num_shards.as_u32() % num_committees; let shard = self.to_shard(num_shards).as_u32(); - // This removes the remainder from the shard to round down to the start of the committee space - let start = (shard / shards_per_committee) * shards_per_committee; - let end = start + shards_per_committee; + let mut start = 0u32; + let mut end = shards_per_committee; + if shards_per_committee_rem > 0 { + end += 1; + } + loop { + if end > shard { + break; + } + start += shards_per_committee; + if shards_per_committee_rem > 0 { + start += 1; + shards_per_committee_rem -= 1; + } - ShardGroup::new(start, end.saturating_sub(1)) - } + end = start + shards_per_committee; + if shards_per_committee_rem > 0 { + end += 1; + } + } - // pub fn to_address_range(&self, num_shards: NumPreshards) -> RangeInclusive { - // let shard = self.to_shard(num_shards); - // shard.to_substate_address_range(num_shards) - // } + ShardGroup::new(start, end - 1) + } } impl From<[u8; 32]> for SubstateAddress { @@ -575,42 +588,67 @@ mod tests { fn it_returns_the_correct_shard_group_for_odd_num_committees() { // All shard groups except the last have 3 shards each + let group = address_at(0, 64).to_shard_group(NumPreshards::SixtyFour, 3); + // First shard group gets an extra shard to cover the remainder + assert_eq!(group.as_range(), Shard::from(0)..=Shard::from(21)); + assert_eq!(group.len(), 22); + let group = address_at(31, 64).to_shard_group(NumPreshards::SixtyFour, 3); + assert_eq!(group.as_range(), Shard::from(22)..=Shard::from(42)); + assert_eq!(group.len(), 21); + let group = address_at(50, 64).to_shard_group(NumPreshards::SixtyFour, 3); + assert_eq!(group.as_range(), Shard::from(43)..=Shard::from(63)); + assert_eq!(group.len(), 21); + + let group = address_at(3, 64).to_shard_group(NumPreshards::SixtyFour, 7); + assert_eq!(group.as_range(), Shard::from(0)..=Shard::from(9)); + assert_eq!(group.len(), 10); + let group = address_at(11, 64).to_shard_group(NumPreshards::SixtyFour, 7); + assert_eq!(group.as_range(), Shard::from(10)..=Shard::from(18)); + assert_eq!(group.len(), 9); + let group = address_at(22, 64).to_shard_group(NumPreshards::SixtyFour, 7); + assert_eq!(group.as_range(), Shard::from(19)..=Shard::from(27)); + assert_eq!(group.len(), 9); + let group = address_at(60, 64).to_shard_group(NumPreshards::SixtyFour, 7); + assert_eq!(group.as_range(), Shard::from(55)..=Shard::from(63)); + assert_eq!(group.len(), 9); + let group = address_at(64, 64).to_shard_group(NumPreshards::SixtyFour, 7); + assert_eq!(group.as_range(), Shard::from(55)..=Shard::from(63)); + assert_eq!(group.len(), 9); let group = SubstateAddress::zero().to_shard_group(NumPreshards::Eight, 3); assert_eq!(group.as_range(), Shard::from(0)..=Shard::from(2)); let group = address_at(1, 8).to_shard_group(NumPreshards::Eight, 3); assert_eq!(group.as_range(), Shard::from(0)..=Shard::from(2)); - let group = address_at(2, 8).to_shard_group(NumPreshards::Eight, 3); + let group = address_at(1, 8).to_shard_group(NumPreshards::Eight, 3); assert_eq!(group.as_range(), Shard::from(0)..=Shard::from(2)); - // + let group = address_at(3, 8).to_shard_group(NumPreshards::Eight, 3); assert_eq!(group.as_range(), Shard::from(3)..=Shard::from(5)); - // + let group = address_at(4, 8).to_shard_group(NumPreshards::Eight, 3); assert_eq!(group.as_range(), Shard::from(3)..=Shard::from(5)); - // let group = address_at(5, 8).to_shard_group(NumPreshards::Eight, 3); assert_eq!(group.as_range(), Shard::from(3)..=Shard::from(5)); // let group = address_at(6, 8).to_shard_group(NumPreshards::Eight, 3); - assert_eq!(group.as_range(), Shard::from(6)..=Shard::from(8)); + assert_eq!(group.as_range(), Shard::from(6)..=Shard::from(7)); let group = address_at(7, 8).to_shard_group(NumPreshards::Eight, 3); - assert_eq!(group.as_range(), Shard::from(6)..=Shard::from(8)); + assert_eq!(group.as_range(), Shard::from(6)..=Shard::from(7)); let group = address_at(8, 8).to_shard_group(NumPreshards::Eight, 3); - assert_eq!(group.as_range(), Shard::from(6)..=Shard::from(8)); + assert_eq!(group.as_range(), Shard::from(6)..=Shard::from(7)); // Committee = 5 let group = address_at(4, 8).to_shard_group(NumPreshards::Eight, 5); assert_eq!(group.as_range(), Shard::from(4)..=Shard::from(5)); let group = address_at(7, 8).to_shard_group(NumPreshards::Eight, 5); - assert_eq!(group.as_range(), Shard::from(6)..=Shard::from(7)); + assert_eq!(group.as_range(), Shard::from(7)..=Shard::from(7)); let group = address_at(8, 8).to_shard_group(NumPreshards::Eight, 5); - assert_eq!(group.as_range(), Shard::from(6)..=Shard::from(7)); + assert_eq!(group.as_range(), Shard::from(7)..=Shard::from(7)); } } diff --git a/dan_layer/consensus/src/hotstuff/block_change_set.rs b/dan_layer/consensus/src/hotstuff/block_change_set.rs index 4d1b9f9d6a..74b48fbb6c 100644 --- a/dan_layer/consensus/src/hotstuff/block_change_set.rs +++ b/dan_layer/consensus/src/hotstuff/block_change_set.rs @@ -1,7 +1,7 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::{collections::HashMap, ops::Deref}; +use std::ops::Deref; use indexmap::IndexMap; use tari_dan_common_types::{shard::Shard, Epoch}; diff --git a/dan_layer/consensus/src/hotstuff/common.rs b/dan_layer/consensus/src/hotstuff/common.rs index 3a8526d98d..bdedead808 100644 --- a/dan_layer/consensus/src/hotstuff/common.rs +++ b/dan_layer/consensus/src/hotstuff/common.rs @@ -7,20 +7,12 @@ use indexmap::IndexMap; use log::*; use tari_common::configuration::Network; use tari_common_types::types::FixedHash; -use tari_dan_common_types::{ - committee::Committee, - shard::Shard, - Epoch, - NodeAddressable, - NodeHeight, - NumPreshards, - ShardGroup, -}; +use tari_dan_common_types::{committee::Committee, shard::Shard, Epoch, NodeAddressable, NodeHeight, ShardGroup}; use tari_dan_storage::{ consensus_models::{Block, LeafBlock, PendingStateTreeDiff, QuorumCertificate, SubstateChange}, StateStoreReadTransaction, }; -use tari_state_tree::{Hash, StateHashTreeDiff, StateTreeError, Version}; +use tari_state_tree::{Hash, StateHashTreeDiff, StateTreeError}; use crate::{hotstuff::substate_store::ShardedStateTree, traits::LeaderStrategy}; @@ -174,11 +166,12 @@ pub fn calculate_state_merkle_root( tx: &TTx, local_shard_group: ShardGroup, pending_tree_diffs: HashMap, - diff: &[SubstateChange], + changes: &[SubstateChange], ) -> Result<(Hash, IndexMap), StateTreeError> { - let mut change_map = IndexMap::with_capacity(diff.len()); + let mut change_map = IndexMap::with_capacity(changes.len()); - diff.iter() + changes + .iter() .filter(|ch| local_shard_group.contains(&ch.shard())) .for_each(|ch| { change_map.entry(ch.shard()).or_insert_with(Vec::new).push(ch.into()); diff --git a/dan_layer/consensus/src/hotstuff/on_propose.rs b/dan_layer/consensus/src/hotstuff/on_propose.rs index 4c9a71ea52..aee616f992 100644 --- a/dan_layer/consensus/src/hotstuff/on_propose.rs +++ b/dan_layer/consensus/src/hotstuff/on_propose.rs @@ -8,7 +8,6 @@ use std::{ use indexmap::IndexMap; use log::*; -use tari_common::configuration::Network; use tari_common_types::types::{FixedHash, PublicKey}; use tari_crypto::tari_utilities::epoch_time::EpochTime; use tari_dan_common_types::{ @@ -462,7 +461,6 @@ where TConsensusSpec: ConsensusSpec } else { self.transaction_pool.get_batch_for_next_block(tx, TARGET_BLOCK_SIZE)? }; - let current_version = high_qc.block_height().as_u64(); let next_height = parent_block.height() + NodeHeight(1); let mut total_leader_fee = 0; diff --git a/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs index 22b2ca1540..6802faeeee 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_foreign_proposal.rs @@ -89,7 +89,7 @@ where TConsensusSpec: ConsensusSpec return Ok(()); } - foreign_receive_counter.increment(committee_shard.shard_group()); + foreign_receive_counter.increment_group(committee_shard.shard_group()); let tx_ids = block .commands() diff --git a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs index 95530c8981..7e9cba6df7 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_local_proposal.rs @@ -215,6 +215,7 @@ impl OnReceiveLocalProposalHandler { tx: TTx, - pending_diffs: HashMap, + pending_diffs: HashMap>, current_tree_diffs: IndexMap, } @@ -44,14 +38,19 @@ impl ShardedStateTree { } } - pub fn with_pending_diffs(self, pending_diffs: HashMap) -> Self { + pub fn with_pending_diffs(self, pending_diffs: HashMap>) -> Self { Self { pending_diffs, ..self } } } impl ShardedStateTree<&TTx> { fn get_current_version(&self, shard: Shard) -> Result, StateTreeError> { - if let Some(version) = self.pending_diffs.get(&shard).map(|diff| diff.version) { + if let Some(version) = self + .pending_diffs + .get(&shard) + .and_then(|diffs| diffs.last()) + .map(|diff| diff.version) + { return Ok(Some(version)); } @@ -86,6 +85,7 @@ impl ShardedStateTree<&TTx> { // Apply state updates to the state tree that is backed by the staged shard-scoped store let mut state_tree = SpreadPrefixStateTree::new(&mut store); + debug!(target: LOG_TARGET, "v{next_version} contains {} tree change(s) for shard {shard}", changes.len()); let state_root = state_tree.put_substate_changes(current_version, next_version, changes)?; state_roots.push(state_root); self.current_tree_diffs.insert(shard, store.into_diff()); @@ -104,16 +104,20 @@ impl ShardedStateTree<&mut TTx> { for (shard, pending_diff) in diffs { let diff = pending_diff.diff; let mut store = ShardScopedTreeStoreWriter::new(self.tx, shard); - for (key, node) in diff.new_nodes { - log::debug!("Inserting node: {}", key); - store.insert_node(key, node)?; - } for stale_tree_node in diff.stale_tree_nodes { - log::debug!("Recording stale tree node: {}", stale_tree_node.as_node_key()); + log::debug!( + "(shard={shard}) Recording stale tree node: {}", + stale_tree_node.as_node_key() + ); store.record_stale_tree_node(stale_tree_node)?; } + for (key, node) in diff.new_nodes { + log::debug!("(shard={shard}) Inserting node: {}", key); + store.insert_node(key, node)?; + } + store.increment_version()?; } diff --git a/dan_layer/consensus_tests/src/consensus.rs b/dan_layer/consensus_tests/src/consensus.rs index 80f3db1420..b061c59b5d 100644 --- a/dan_layer/consensus_tests/src/consensus.rs +++ b/dan_layer/consensus_tests/src/consensus.rs @@ -12,7 +12,7 @@ use std::time::Duration; use tari_common_types::types::PrivateKey; use tari_consensus::hotstuff::HotStuffError; -use tari_dan_common_types::{optional::Optional, shard::Shard, Epoch, NodeHeight}; +use tari_dan_common_types::{optional::Optional, Epoch, NodeHeight}; use tari_dan_storage::{ consensus_models::{BlockId, Command, Decision, TransactionRecord, VersionedSubstateIdLockIntent}, StateStore, @@ -207,11 +207,16 @@ async fn node_requests_missing_transaction_from_local_leader() { async fn multi_validator_propose_blocks_with_new_transactions_until_all_committed() { setup_logger(); let mut test = Test::builder() - .add_committee(0, vec!["1", "2", "3", "4", "5"]) + .debug_sql("/tmp/test{}.db") + .add_committee(0, vec!["1"])//, "2", "3", "4", "5"]) .start() .await; let mut remaining_txs = 10u32; + // for _ in 0..remaining_txs { + // test.send_transaction_to_all(Decision::Commit, 1, 5).await; + // } + test.start_epoch(Epoch(1)).await; loop { if remaining_txs > 0 { diff --git a/dan_layer/consensus_tests/src/substate_store.rs b/dan_layer/consensus_tests/src/substate_store.rs index f0ce29f3c0..e45e44d5ee 100644 --- a/dan_layer/consensus_tests/src/substate_store.rs +++ b/dan_layer/consensus_tests/src/substate_store.rs @@ -5,7 +5,7 @@ use tari_consensus::{ hotstuff::substate_store::{PendingSubstateStore, SubstateStoreError}, traits::{ReadableSubstateStore, WriteableSubstateStore}, }; -use tari_dan_common_types::{shard::Shard, Epoch, NodeAddressable, PeerAddress}; +use tari_dan_common_types::{shard::Shard, NodeAddressable, PeerAddress}; use tari_dan_storage::{ consensus_models::{ BlockId, diff --git a/dan_layer/consensus_tests/src/support/epoch_manager.rs b/dan_layer/consensus_tests/src/support/epoch_manager.rs index 580d0568db..1a0089cb9c 100644 --- a/dan_layer/consensus_tests/src/support/epoch_manager.rs +++ b/dan_layer/consensus_tests/src/support/epoch_manager.rs @@ -1,10 +1,7 @@ // Copyright 2023 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::{ - collections::{HashMap, HashSet}, - sync::Arc, -}; +use std::{collections::HashMap, sync::Arc}; use async_trait::async_trait; use tari_common_types::types::{FixedHash, PublicKey}; @@ -153,8 +150,8 @@ impl EpochManagerReader for TestEpochManager { substate_address: SubstateAddress, ) -> Result, EpochManagerError> { let state = self.state_lock().await; - let shard = substate_address.to_shard_group(TEST_NUM_PRESHARDS, state.committees.len() as u32); - Ok(state.committees[&shard].clone()) + let shard_group = substate_address.to_shard_group(TEST_NUM_PRESHARDS, state.committees.len() as u32); + Ok(state.committees[&shard_group].clone()) } async fn get_our_validator_node(&self, _epoch: Epoch) -> Result, EpochManagerError> { @@ -232,7 +229,7 @@ impl EpochManagerReader for TestEpochManager { &self, _epoch: Epoch, ) -> Result>, EpochManagerError> { - todo!() + Ok(self.inner.lock().await.committees.clone()) } async fn get_committee_info_by_validator_address( diff --git a/dan_layer/consensus_tests/src/support/harness.rs b/dan_layer/consensus_tests/src/support/harness.rs index becfc490f2..3e5212cc56 100644 --- a/dan_layer/consensus_tests/src/support/harness.rs +++ b/dan_layer/consensus_tests/src/support/harness.rs @@ -9,7 +9,7 @@ use std::{ use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; use tari_consensus::hotstuff::HotstuffEvent; -use tari_dan_common_types::{committee::Committee, shard::Shard, Epoch, NodeHeight, ShardGroup}; +use tari_dan_common_types::{committee::Committee, shard::Shard, Epoch, NodeHeight, NumPreshards, ShardGroup}; use tari_dan_storage::{ consensus_models::{BlockId, Decision, QcId, SubstateRecord, TransactionExecution, TransactionRecord}, StateStore, @@ -516,19 +516,37 @@ impl TestBuilder { } } -pub fn committee_number_to_shard_group(num: u32, num_committees: u32) -> ShardGroup { - let group_size = TEST_NUM_PRESHARDS.as_u32() / num_committees; - let size_rem = TEST_NUM_PRESHARDS.as_u32() % num_committees; - let mut start = num * group_size; - let mut end = (num + 1) * group_size; - // Because this is a test we'll just tack on the remaining shards to the first shard group - if num == 0 { - end += size_rem; - } else { - start += size_rem; +/// Converts a test committee number to a shard group. E.g. 0 is shard group 0 to 21, 1 is 22 to 42, etc. +pub fn committee_number_to_shard_group(num_shards: NumPreshards, target_group: u32, num_committees: u32) -> ShardGroup { + // number of committees can never exceed number of shards + assert!(num_committees <= num_shards.as_u32()); + if num_committees <= 1 { + return ShardGroup::new(Shard::zero(), Shard::from(num_shards.as_u32() - 1)); } - ShardGroup::new(start, end) + let shards_per_committee = num_shards.as_u32() / num_committees; + let mut shards_per_committee_rem = num_shards.as_u32() % num_committees; + + let mut start = 0u32; + let mut end = shards_per_committee; + if shards_per_committee_rem > 0 { + end += 1; + } + + for _group in 0..target_group { + start += shards_per_committee; + if shards_per_committee_rem > 0 { + start += 1; + shards_per_committee_rem -= 1; + } + + end = start + shards_per_committee; + if shards_per_committee_rem > 0 { + end += 1; + } + } + + ShardGroup::new(start, end - 1) } fn build_committees(committees: HashMap>) -> HashMap> { @@ -536,7 +554,7 @@ fn build_committees(committees: HashMap>) -> HashMap committees .into_iter() .map(|(num, committee)| { - let shard_group = committee_number_to_shard_group(num, num_committees); + let shard_group = committee_number_to_shard_group(TEST_NUM_PRESHARDS, num, num_committees); (shard_group, committee) }) .collect() diff --git a/dan_layer/consensus_tests/src/support/transaction.rs b/dan_layer/consensus_tests/src/support/transaction.rs index 4dcdc8ceb6..3246301b25 100644 --- a/dan_layer/consensus_tests/src/support/transaction.rs +++ b/dan_layer/consensus_tests/src/support/transaction.rs @@ -114,7 +114,7 @@ pub fn build_transaction( .flat_map(|shard| { iter::repeat_with(move || { random_substate_in_shard( - committee_number_to_shard_group(shard, num_committees), + committee_number_to_shard_group(TEST_NUM_PRESHARDS, shard, num_committees), TEST_NUM_PRESHARDS, ) }) diff --git a/dan_layer/consensus_tests/src/support/validator/builder.rs b/dan_layer/consensus_tests/src/support/validator/builder.rs index fa9045ec7a..fa74765ab1 100644 --- a/dan_layer/consensus_tests/src/support/validator/builder.rs +++ b/dan_layer/consensus_tests/src/support/validator/builder.rs @@ -9,7 +9,7 @@ use tari_consensus::{ traits::hooks::NoopHooks, }; use tari_crypto::keys::PublicKey as _; -use tari_dan_common_types::{shard::Shard, ShardGroup, SubstateAddress}; +use tari_dan_common_types::{ShardGroup, SubstateAddress}; use tari_dan_storage::consensus_models::TransactionPool; use tari_shutdown::ShutdownSignal; use tari_state_store_sqlite::SqliteStateStore; diff --git a/dan_layer/consensus_tests/src/support/validator/instance.rs b/dan_layer/consensus_tests/src/support/validator/instance.rs index 06d635bec7..67614c7981 100644 --- a/dan_layer/consensus_tests/src/support/validator/instance.rs +++ b/dan_layer/consensus_tests/src/support/validator/instance.rs @@ -5,7 +5,7 @@ use tari_consensus::{ hotstuff::{ConsensusCurrentState, HotstuffEvent}, messages::HotstuffMessage, }; -use tari_dan_common_types::{shard::Shard, ShardGroup, SubstateAddress}; +use tari_dan_common_types::{ShardGroup, SubstateAddress}; use tari_dan_storage::{consensus_models::LeafBlock, StateStore, StateStoreReadTransaction}; use tari_state_store_sqlite::SqliteStateStore; use tari_transaction::Transaction; diff --git a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql index eea8b3f4b0..0377725e09 100644 --- a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql +++ b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql @@ -345,9 +345,9 @@ CREATE TABLE state_tree ); -- Scoping by shard -CREATE INDEX state_tree_idx_shard_key on state_tree (shard); +CREATE INDEX state_tree_idx_shard_key on state_tree (shard) WHERE is_stale = false; -- Duplicate keys are not allowed -CREATE UNIQUE INDEX state_tree_uniq_idx_key on state_tree (shard, key); +CREATE UNIQUE INDEX state_tree_uniq_idx_key on state_tree (shard, key) WHERE is_stale = false; -- filtering out or by is_stale is used in every query CREATE INDEX state_tree_idx_is_stale on state_tree (is_stale); diff --git a/dan_layer/state_store_sqlite/src/error.rs b/dan_layer/state_store_sqlite/src/error.rs index 1ff0d22d1e..5dac59b961 100644 --- a/dan_layer/state_store_sqlite/src/error.rs +++ b/dan_layer/state_store_sqlite/src/error.rs @@ -35,10 +35,6 @@ pub enum SqliteStorageError { operation: &'static str, details: String, }, - #[error("[{operation}] One or more substates were are write locked")] - SubstatesWriteLocked { operation: &'static str }, - #[error("[{operation}] lock error: {details}")] - SubstatesUnlock { operation: &'static str, details: String }, } impl From for StorageError { diff --git a/dan_layer/state_store_sqlite/src/reader.rs b/dan_layer/state_store_sqlite/src/reader.rs index c23d0ebb0b..aa4861f3eb 100644 --- a/dan_layer/state_store_sqlite/src/reader.rs +++ b/dan_layer/state_store_sqlite/src/reader.rs @@ -14,7 +14,7 @@ use diesel::{ dsl, query_builder::SqlQuery, sql_query, - sql_types::{BigInt, Integer, Text}, + sql_types::{BigInt, Text}, BoolExpressionMethods, ExpressionMethods, JoinOnDsl, @@ -1970,19 +1970,19 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor fn pending_state_tree_diffs_get_all_up_to_commit_block( &self, block_id: &BlockId, - ) -> Result, StorageError> { + ) -> Result>, StorageError> { use crate::schema::pending_state_tree_diffs; // Get the last committed block let committed_block_id = self.get_commit_block_id()?; - let block_ids = self.get_block_ids_between(&committed_block_id, block_id)?; + let block_ids = self.get_block_ids_that_change_state_between(&committed_block_id, block_id)?; if block_ids.is_empty() { return Ok(HashMap::new()); } - let diffs = pending_state_tree_diffs::table + let diff_recs = pending_state_tree_diffs::table .filter(pending_state_tree_diffs::block_id.eq_any(block_ids)) .order_by(pending_state_tree_diffs::block_height.asc()) .get_results::(self.connection()) @@ -1991,10 +1991,20 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor source: e, })?; - diffs - .into_iter() - .map(|diff| Ok((Shard::from(diff.shard as u32), diff.try_into()?))) - .collect() + let mut diffs = HashMap::new(); + for diff in diff_recs { + let shard = Shard::from(diff.shard as u32); + let diff = PendingStateTreeDiff::try_from(diff)?; + diffs + .entry(shard) + .or_insert_with(Vec::new)//PendingStateTreeDiff::default) + .push(diff); + } + // diffs + // .into_iter() + // .map(|diff| Ok((Shard::from(diff.shard as u32), diff.try_into()?))) + // .collect() + Ok(diffs) } fn state_transitions_get_n_after( diff --git a/dan_layer/state_store_sqlite/src/sql_models/block.rs b/dan_layer/state_store_sqlite/src/sql_models/block.rs index 17994ec9d1..aa66de1305 100644 --- a/dan_layer/state_store_sqlite/src/sql_models/block.rs +++ b/dan_layer/state_store_sqlite/src/sql_models/block.rs @@ -3,7 +3,7 @@ use diesel::{Queryable, QueryableByName}; use tari_common_types::types::PublicKey; -use tari_dan_common_types::{shard::Shard, Epoch, NodeHeight, ShardGroup}; +use tari_dan_common_types::{Epoch, NodeHeight, ShardGroup}; use tari_dan_storage::{consensus_models, StorageError}; use tari_utilities::byte_array::ByteArray; use time::PrimitiveDateTime; diff --git a/dan_layer/state_store_sqlite/src/sql_models/bookkeeping.rs b/dan_layer/state_store_sqlite/src/sql_models/bookkeeping.rs index 94863854c5..03431897a5 100644 --- a/dan_layer/state_store_sqlite/src/sql_models/bookkeeping.rs +++ b/dan_layer/state_store_sqlite/src/sql_models/bookkeeping.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: BSD-3-Clause use diesel::Queryable; -use tari_dan_common_types::{shard::Shard, Epoch, NodeHeight, ShardGroup}; +use tari_dan_common_types::{Epoch, NodeHeight, ShardGroup}; use tari_dan_storage::{ consensus_models::{self, QuorumDecision}, StorageError, diff --git a/dan_layer/state_store_sqlite/src/sql_models/pending_state_tree_diff.rs b/dan_layer/state_store_sqlite/src/sql_models/pending_state_tree_diff.rs index 8b6a7bf737..ab5afafa03 100644 --- a/dan_layer/state_store_sqlite/src/sql_models/pending_state_tree_diff.rs +++ b/dan_layer/state_store_sqlite/src/sql_models/pending_state_tree_diff.rs @@ -2,11 +2,10 @@ // SPDX-License-Identifier: BSD-3-Clause use diesel::Queryable; -use tari_dan_common_types::shard::Shard; use tari_dan_storage::{consensus_models, StorageError}; use time::PrimitiveDateTime; -use crate::serialization::{deserialize_hex_try_from, deserialize_json}; +use crate::serialization::deserialize_json; #[derive(Debug, Clone, Queryable)] pub struct PendingStateTreeDiff { diff --git a/dan_layer/state_store_sqlite/src/sql_models/state_transition.rs b/dan_layer/state_store_sqlite/src/sql_models/state_transition.rs index 3340227ef5..734dcf502f 100644 --- a/dan_layer/state_store_sqlite/src/sql_models/state_transition.rs +++ b/dan_layer/state_store_sqlite/src/sql_models/state_transition.rs @@ -5,14 +5,7 @@ use diesel::Queryable; use tari_dan_common_types::{shard::Shard, Epoch}; use tari_dan_storage::{ consensus_models, - consensus_models::{ - QuorumCertificate, - StateTransitionId, - SubstateCreatedProof, - SubstateData, - SubstateDestroyedProof, - SubstateUpdate, - }, + consensus_models::{StateTransitionId, SubstateCreatedProof, SubstateData, SubstateDestroyedProof, SubstateUpdate}, StorageError, }; use time::PrimitiveDateTime; diff --git a/dan_layer/state_store_sqlite/src/tree_store.rs b/dan_layer/state_store_sqlite/src/tree_store.rs deleted file mode 100644 index d728d7f854..0000000000 --- a/dan_layer/state_store_sqlite/src/tree_store.rs +++ /dev/null @@ -1,70 +0,0 @@ -// Copyright 2024 The Tari Project -// SPDX-License-Identifier: BSD-3-Clause - -use std::ops::Deref; - -use diesel::{ExpressionMethods, OptionalExtension, QueryDsl, RunQueryDsl}; -use tari_state_tree::{Node, NodeKey, StaleTreeNode, TreeNode, TreeStoreReader, TreeStoreWriter, Version}; - -use crate::{reader::SqliteStateStoreReadTransaction, writer::SqliteStateStoreWriteTransaction}; - -impl<'a, TAddr> TreeStoreReader for SqliteStateStoreReadTransaction<'a, TAddr> { - fn get_node(&self, key: &NodeKey) -> Result, tari_state_tree::JmtStorageError> { - use crate::schema::state_tree; - - let node = state_tree::table - .select(state_tree::node) - .filter(state_tree::key.eq(key.to_string())) - .filter(state_tree::is_stale.eq(false)) - .first::(self.connection()) - .optional() - .map_err(|e| tari_state_tree::JmtStorageError::UnexpectedError(e.to_string()))? - .ok_or_else(|| tari_state_tree::JmtStorageError::NotFound(key.clone()))?; - - let node = serde_json::from_str::(&node) - .map_err(|e| tari_state_tree::JmtStorageError::UnexpectedError(e.to_string()))?; - - Ok(node.into_node()) - } -} - -impl<'a, TAddr> TreeStoreReader for SqliteStateStoreWriteTransaction<'a, TAddr> { - fn get_node(&self, key: &NodeKey) -> Result, tari_state_tree::JmtStorageError> { - self.deref().get_node(key) - } -} - -impl<'a, TAddr> TreeStoreWriter for SqliteStateStoreWriteTransaction<'a, TAddr> { - fn insert_node(&mut self, key: NodeKey, node: Node) -> Result<(), tari_state_tree::JmtStorageError> { - use crate::schema::state_tree; - - let node = TreeNode::new_latest(node); - let node = serde_json::to_string(&node) - .map_err(|e| tari_state_tree::JmtStorageError::UnexpectedError(e.to_string()))?; - - let values = (state_tree::key.eq(key.to_string()), state_tree::node.eq(node)); - diesel::insert_into(state_tree::table) - .values(&values) - .on_conflict(state_tree::key) - .do_update() - .set(values.clone()) - .execute(self.connection()) - .map_err(|e| tari_state_tree::JmtStorageError::UnexpectedError(e.to_string()))?; - - Ok(()) - } - - fn record_stale_tree_node(&mut self, node: StaleTreeNode) -> Result<(), tari_state_tree::JmtStorageError> { - use crate::schema::state_tree; - let key = node.as_node_key(); - diesel::update(state_tree::table) - .filter(state_tree::key.eq(key.to_string())) - .set(state_tree::is_stale.eq(true)) - .execute(self.connection()) - .optional() - .map_err(|e| tari_state_tree::JmtStorageError::UnexpectedError(e.to_string()))? - .ok_or_else(|| tari_state_tree::JmtStorageError::NotFound(key.clone()))?; - - Ok(()) - } -} diff --git a/dan_layer/state_store_sqlite/src/writer.rs b/dan_layer/state_store_sqlite/src/writer.rs index 8d711f4632..cde228450b 100644 --- a/dan_layer/state_store_sqlite/src/writer.rs +++ b/dan_layer/state_store_sqlite/src/writer.rs @@ -5,7 +5,7 @@ use std::ops::Deref; use diesel::{ dsl, - dsl::{count, sql}, + dsl::count, sql_types::Text, AsChangeset, ExpressionMethods, @@ -60,7 +60,6 @@ use time::{OffsetDateTime, PrimitiveDateTime}; use crate::{ error::SqliteStorageError, reader::SqliteStateStoreReadTransaction, - schema::pending_state_tree_diffs::dsl::pending_state_tree_diffs, serialization::{serialize_hex, serialize_json}, sql_models, sqlite_transaction::SqliteTransaction, @@ -1549,9 +1548,15 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta diesel::insert_into(state_tree::table) .values(&values) .execute(self.connection()) - .map_err(|e| SqliteStorageError::DieselError { - operation: "state_tree_nodes_insert", - source: e, + .map_err(|e| { + SqliteStorageError::DbInconsistency { + operation: "state_tree_nodes_insert", + details: format!("Failed to insert node for key: {shard} - {key} {e}"), + } + // SqliteStorageError::DieselError { + // operation: "state_tree_nodes_insert", + // source: e, + // } })?; Ok(()) diff --git a/dan_layer/state_tree/Cargo.toml b/dan_layer/state_tree/Cargo.toml index 2be4945dfd..73b695bda0 100644 --- a/dan_layer/state_tree/Cargo.toml +++ b/dan_layer/state_tree/Cargo.toml @@ -17,6 +17,7 @@ hex = { workspace = true } thiserror = { workspace = true } serde = { workspace = true, features = ["derive"] } log = { workspace = true } +indexmap = { workspace = true } [dev-dependencies] indexmap = { workspace = true } diff --git a/dan_layer/state_tree/src/jellyfish/tree.rs b/dan_layer/state_tree/src/jellyfish/tree.rs index f3484baede..cefe684f6a 100644 --- a/dan_layer/state_tree/src/jellyfish/tree.rs +++ b/dan_layer/state_tree/src/jellyfish/tree.rs @@ -274,7 +274,7 @@ impl<'a, R: 'a + TreeStoreReader

, P: Clone> JellyfishMerkleTree<'a, R, P> { if let Some(child) = child_option { new_created_children.insert(child_nibble, child); } else { - old_children.remove(&child_nibble); + old_children.swap_remove(&child_nibble); } } diff --git a/dan_layer/state_tree/src/jellyfish/types.rs b/dan_layer/state_tree/src/jellyfish/types.rs index d18ad6d598..b51284281a 100644 --- a/dan_layer/state_tree/src/jellyfish/types.rs +++ b/dan_layer/state_tree/src/jellyfish/types.rs @@ -81,8 +81,9 @@ // Copyright (c) Aptos // SPDX-License-Identifier: Apache-2.0 -use std::{collections::HashMap, fmt, ops::Range}; +use std::{fmt, ops::Range}; +use indexmap::IndexMap; use serde::{Deserialize, Serialize}; use tari_crypto::{hash_domain, tari_utilities::ByteArray}; use tari_dan_common_types::{ @@ -842,7 +843,7 @@ impl Child { /// [`Children`] is just a collection of children belonging to a [`InternalNode`], indexed from 0 to /// 15, inclusive. -pub(crate) type Children = HashMap; +pub(crate) type Children = IndexMap; /// Represents a 4-level subtree with 16 children at the bottom level. Theoretically, this reduces /// IOPS to query a tree by 4x since we compress 4 levels in a standard Merkle tree into 1 node. @@ -858,7 +859,8 @@ pub struct InternalNode { impl InternalNode { /// Creates a new Internal node. - pub fn new(children: Children) -> Self { + pub fn new(mut children: Children) -> Self { + children.sort_keys(); let leaf_count = children.values().map(Child::leaf_count).sum(); Self { children, leaf_count } } @@ -882,9 +884,10 @@ impl InternalNode { } pub fn children_sorted(&self) -> impl Iterator { - let mut tmp = self.children.iter().collect::>(); - tmp.sort_by_key(|(nibble, _)| **nibble); - tmp.into_iter() + // let mut tmp = self.children.iter().collect::>(); + // tmp.sort_by_key(|(nibble, _)| **nibble); + // tmp.into_iter() + self.children.iter() } pub fn into_children(self) -> Children { diff --git a/dan_layer/state_tree/src/tree.rs b/dan_layer/state_tree/src/tree.rs index caac64cdce..6beba3268d 100644 --- a/dan_layer/state_tree/src/tree.rs +++ b/dan_layer/state_tree/src/tree.rs @@ -8,7 +8,7 @@ use tari_engine_types::substate::SubstateId; use crate::{ error::StateTreeError, - jellyfish::{Hash, JellyfishMerkleTree, LeafKey, SparseMerkleProofExt, TreeStore, Version}, + jellyfish::{Hash, JellyfishMerkleTree, SparseMerkleProofExt, TreeStore, Version}, key_mapper::{DbKeyMapper, SpreadPrefixKeyMapper}, Node, NodeKey, diff --git a/dan_layer/storage/src/consensus_models/foreign_receive_counters.rs b/dan_layer/storage/src/consensus_models/foreign_receive_counters.rs index 94c11f5bd7..b2c5021336 100644 --- a/dan_layer/storage/src/consensus_models/foreign_receive_counters.rs +++ b/dan_layer/storage/src/consensus_models/foreign_receive_counters.rs @@ -3,13 +3,13 @@ use std::collections::HashMap; -use tari_dan_common_types::{optional::Optional, ShardGroup}; +use tari_dan_common_types::{optional::Optional, shard::Shard, ShardGroup}; use crate::{StateStoreReadTransaction, StateStoreWriteTransaction, StorageError}; #[derive(Debug, Clone)] pub struct ForeignReceiveCounters { - pub counters: HashMap, + pub counters: HashMap, } impl Default for ForeignReceiveCounters { @@ -25,13 +25,15 @@ impl ForeignReceiveCounters { } } - pub fn increment(&mut self, shard_group: ShardGroup) { - *self.counters.entry(shard_group).or_default() += 1; + pub fn increment_group(&mut self, shard_group: ShardGroup) { + for shard in shard_group.shard_iter() { + *self.counters.entry(shard).or_default() += 1; + } } /// Returns the counter for the provided shard. If the count does not exist, 0 is returned. - pub fn get_count(&self, shard_group: &ShardGroup) -> u64 { - self.counters.get(shard_group).copied().unwrap_or_default() + pub fn get_count(&self, shard: &Shard) -> u64 { + self.counters.get(shard).copied().unwrap_or_default() } } diff --git a/dan_layer/storage/src/consensus_models/state_tree_diff.rs b/dan_layer/storage/src/consensus_models/state_tree_diff.rs index 78217b2aa8..4c06825cac 100644 --- a/dan_layer/storage/src/consensus_models/state_tree_diff.rs +++ b/dan_layer/storage/src/consensus_models/state_tree_diff.rs @@ -30,8 +30,13 @@ impl PendingStateTreeDiff { impl PendingStateTreeDiff { /// Returns all pending state tree diffs from the last committed block (exclusive) to the given block (inclusive). - pub fn get_all_up_to_commit_block(tx: &TTx, block_id: &BlockId) -> Result, StorageError> - where TTx: StateStoreReadTransaction { + pub fn get_all_up_to_commit_block( + tx: &TTx, + block_id: &BlockId, + ) -> Result>, StorageError> + where + TTx: StateStoreReadTransaction, + { tx.pending_state_tree_diffs_get_all_up_to_commit_block(block_id) }