Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Jul 30, 2024
1 parent 4a309f9 commit f70287a
Show file tree
Hide file tree
Showing 30 changed files with 204 additions and 198 deletions.
8 changes: 8 additions & 0 deletions dan_layer/common_types/src/shard_group.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,12 @@ mod tests {
assert_eq!(ShardGroup::decode_from_u32(0xFFFF + 1), None);
assert_eq!(ShardGroup::decode_from_u32(u32::MAX), None);
}

#[test]
fn to_substate_address_range() {
let sg = ShardGroup::new(0, 63);
let range = sg.to_substate_address_range(NumPreshards::SixtyFour);
assert_eq!(*range.start(), SubstateAddress::zero());
assert_eq!(*range.end(), SubstateAddress::max());
}
}
78 changes: 58 additions & 20 deletions dan_layer/common_types/src/substate_address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,25 +186,38 @@ impl SubstateAddress {
pub fn to_shard_group(&self, num_shards: NumPreshards, num_committees: u32) -> ShardGroup {
// number of committees can never exceed number of shards
let num_committees = num_committees.min(num_shards.as_u32());
if num_committees == 1 {
if num_committees <= 1 {
return ShardGroup::new(Shard::zero(), Shard::from(num_shards.as_u32() - 1));
}

let shards_per_committee = num_shards.as_u32().div_ceil(num_committees);
let shards_per_committee = num_shards.as_u32() / num_committees;
let mut shards_per_committee_rem = num_shards.as_u32() % num_committees;

let shard = self.to_shard(num_shards).as_u32();
// This removes the remainder from the shard to round down to the start of the committee space
let start = (shard / shards_per_committee) * shards_per_committee;

let end = start + shards_per_committee;
let mut start = 0u32;
let mut end = shards_per_committee;
if shards_per_committee_rem > 0 {
end += 1;
}
loop {
if end > shard {
break;
}
start += shards_per_committee;
if shards_per_committee_rem > 0 {
start += 1;
shards_per_committee_rem -= 1;
}

ShardGroup::new(start, end.saturating_sub(1))
}
end = start + shards_per_committee;
if shards_per_committee_rem > 0 {
end += 1;
}
}

// pub fn to_address_range(&self, num_shards: NumPreshards) -> RangeInclusive<SubstateAddress> {
// let shard = self.to_shard(num_shards);
// shard.to_substate_address_range(num_shards)
// }
ShardGroup::new(start, end - 1)
}
}

impl From<[u8; 32]> for SubstateAddress {
Expand Down Expand Up @@ -575,42 +588,67 @@ mod tests {
fn it_returns_the_correct_shard_group_for_odd_num_committees() {
// All shard groups except the last have 3 shards each

let group = address_at(0, 64).to_shard_group(NumPreshards::SixtyFour, 3);
// First shard group gets an extra shard to cover the remainder
assert_eq!(group.as_range(), Shard::from(0)..=Shard::from(21));
assert_eq!(group.len(), 22);
let group = address_at(31, 64).to_shard_group(NumPreshards::SixtyFour, 3);
assert_eq!(group.as_range(), Shard::from(22)..=Shard::from(42));
assert_eq!(group.len(), 21);
let group = address_at(50, 64).to_shard_group(NumPreshards::SixtyFour, 3);
assert_eq!(group.as_range(), Shard::from(43)..=Shard::from(63));
assert_eq!(group.len(), 21);

let group = address_at(3, 64).to_shard_group(NumPreshards::SixtyFour, 7);
assert_eq!(group.as_range(), Shard::from(0)..=Shard::from(9));
assert_eq!(group.len(), 10);
let group = address_at(11, 64).to_shard_group(NumPreshards::SixtyFour, 7);
assert_eq!(group.as_range(), Shard::from(10)..=Shard::from(18));
assert_eq!(group.len(), 9);
let group = address_at(22, 64).to_shard_group(NumPreshards::SixtyFour, 7);
assert_eq!(group.as_range(), Shard::from(19)..=Shard::from(27));
assert_eq!(group.len(), 9);
let group = address_at(60, 64).to_shard_group(NumPreshards::SixtyFour, 7);
assert_eq!(group.as_range(), Shard::from(55)..=Shard::from(63));
assert_eq!(group.len(), 9);
let group = address_at(64, 64).to_shard_group(NumPreshards::SixtyFour, 7);
assert_eq!(group.as_range(), Shard::from(55)..=Shard::from(63));
assert_eq!(group.len(), 9);
let group = SubstateAddress::zero().to_shard_group(NumPreshards::Eight, 3);
assert_eq!(group.as_range(), Shard::from(0)..=Shard::from(2));

let group = address_at(1, 8).to_shard_group(NumPreshards::Eight, 3);
assert_eq!(group.as_range(), Shard::from(0)..=Shard::from(2));

let group = address_at(2, 8).to_shard_group(NumPreshards::Eight, 3);
let group = address_at(1, 8).to_shard_group(NumPreshards::Eight, 3);
assert_eq!(group.as_range(), Shard::from(0)..=Shard::from(2));
//

let group = address_at(3, 8).to_shard_group(NumPreshards::Eight, 3);
assert_eq!(group.as_range(), Shard::from(3)..=Shard::from(5));
//

let group = address_at(4, 8).to_shard_group(NumPreshards::Eight, 3);
assert_eq!(group.as_range(), Shard::from(3)..=Shard::from(5));

//
let group = address_at(5, 8).to_shard_group(NumPreshards::Eight, 3);
assert_eq!(group.as_range(), Shard::from(3)..=Shard::from(5));
//
let group = address_at(6, 8).to_shard_group(NumPreshards::Eight, 3);
assert_eq!(group.as_range(), Shard::from(6)..=Shard::from(8));
assert_eq!(group.as_range(), Shard::from(6)..=Shard::from(7));

let group = address_at(7, 8).to_shard_group(NumPreshards::Eight, 3);
assert_eq!(group.as_range(), Shard::from(6)..=Shard::from(8));
assert_eq!(group.as_range(), Shard::from(6)..=Shard::from(7));
let group = address_at(8, 8).to_shard_group(NumPreshards::Eight, 3);
assert_eq!(group.as_range(), Shard::from(6)..=Shard::from(8));
assert_eq!(group.as_range(), Shard::from(6)..=Shard::from(7));

// Committee = 5
let group = address_at(4, 8).to_shard_group(NumPreshards::Eight, 5);
assert_eq!(group.as_range(), Shard::from(4)..=Shard::from(5));

let group = address_at(7, 8).to_shard_group(NumPreshards::Eight, 5);
assert_eq!(group.as_range(), Shard::from(6)..=Shard::from(7));
assert_eq!(group.as_range(), Shard::from(7)..=Shard::from(7));

let group = address_at(8, 8).to_shard_group(NumPreshards::Eight, 5);
assert_eq!(group.as_range(), Shard::from(6)..=Shard::from(7));
assert_eq!(group.as_range(), Shard::from(7)..=Shard::from(7));
}
}

Expand Down
2 changes: 1 addition & 1 deletion dan_layer/consensus/src/hotstuff/block_change_set.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::{collections::HashMap, ops::Deref};
use std::ops::Deref;

use indexmap::IndexMap;
use tari_dan_common_types::{shard::Shard, Epoch};
Expand Down
19 changes: 6 additions & 13 deletions dan_layer/consensus/src/hotstuff/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,12 @@ use indexmap::IndexMap;
use log::*;
use tari_common::configuration::Network;
use tari_common_types::types::FixedHash;
use tari_dan_common_types::{
committee::Committee,
shard::Shard,
Epoch,
NodeAddressable,
NodeHeight,
NumPreshards,
ShardGroup,
};
use tari_dan_common_types::{committee::Committee, shard::Shard, Epoch, NodeAddressable, NodeHeight, ShardGroup};
use tari_dan_storage::{
consensus_models::{Block, LeafBlock, PendingStateTreeDiff, QuorumCertificate, SubstateChange},
StateStoreReadTransaction,
};
use tari_state_tree::{Hash, StateHashTreeDiff, StateTreeError, Version};
use tari_state_tree::{Hash, StateHashTreeDiff, StateTreeError};

use crate::{hotstuff::substate_store::ShardedStateTree, traits::LeaderStrategy};

Expand Down Expand Up @@ -174,11 +166,12 @@ pub fn calculate_state_merkle_root<TTx: StateStoreReadTransaction>(
tx: &TTx,
local_shard_group: ShardGroup,
pending_tree_diffs: HashMap<Shard, PendingStateTreeDiff>,
diff: &[SubstateChange],
changes: &[SubstateChange],
) -> Result<(Hash, IndexMap<Shard, StateHashTreeDiff>), StateTreeError> {
let mut change_map = IndexMap::with_capacity(diff.len());
let mut change_map = IndexMap::with_capacity(changes.len());

diff.iter()
changes
.iter()
.filter(|ch| local_shard_group.contains(&ch.shard()))
.for_each(|ch| {
change_map.entry(ch.shard()).or_insert_with(Vec::new).push(ch.into());
Expand Down
2 changes: 0 additions & 2 deletions dan_layer/consensus/src/hotstuff/on_propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::{

use indexmap::IndexMap;
use log::*;
use tari_common::configuration::Network;
use tari_common_types::types::{FixedHash, PublicKey};
use tari_crypto::tari_utilities::epoch_time::EpochTime;
use tari_dan_common_types::{
Expand Down Expand Up @@ -462,7 +461,6 @@ where TConsensusSpec: ConsensusSpec
} else {
self.transaction_pool.get_batch_for_next_block(tx, TARGET_BLOCK_SIZE)?
};
let current_version = high_qc.block_height().as_u64();
let next_height = parent_block.height() + NodeHeight(1);

let mut total_leader_fee = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ where TConsensusSpec: ConsensusSpec
return Ok(());
}

foreign_receive_counter.increment(committee_shard.shard_group());
foreign_receive_counter.increment_group(committee_shard.shard_group());

let tx_ids = block
.commands()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ impl<TConsensusSpec: ConsensusSpec> OnReceiveLocalProposalHandler<TConsensusSpec
self.store.with_write_tx(|tx| {
let genesis = Block::genesis(self.config.network, next_epoch, next_shard_group);
info!(target: LOG_TARGET, "⭐️ Creating new genesis block {genesis}");
genesis.justify().insert(tx)?;
genesis.insert(tx)?;
// We'll propose using the new genesis as parent
genesis.as_locked_block().set(tx)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,34 +4,28 @@
use std::collections::HashMap;

use indexmap::IndexMap;
use log::debug;
use tari_dan_common_types::shard::Shard;
use tari_dan_storage::{
consensus_models::{PendingStateTreeDiff, SubstateChange},
StateStoreReadTransaction,
StateStoreWriteTransaction,
};
use tari_engine_types::substate::hash_substate;
use tari_dan_storage::{consensus_models::PendingStateTreeDiff, StateStoreReadTransaction, StateStoreWriteTransaction};
use tari_state_tree::{
Hash,
JmtStorageError,
Node,
NodeKey,
SpreadPrefixStateTree,
StagedTreeStore,
StateHashTreeDiff,
StateTree,
StateTreeError,
SubstateTreeChange,
TreeStoreReader,
TreeStoreWriter,
Version,
};

use crate::hotstuff::substate_store::sharded_store::{ShardScopedTreeStoreReader, ShardScopedTreeStoreWriter};

const LOG_TARGET: &str = "tari::dan::consensus::sharded_state_tree";

pub struct ShardedStateTree<TTx> {
tx: TTx,
pending_diffs: HashMap<Shard, PendingStateTreeDiff>,
pending_diffs: HashMap<Shard, Vec<PendingStateTreeDiff>>,
current_tree_diffs: IndexMap<Shard, StateHashTreeDiff>,
}

Expand All @@ -44,14 +38,19 @@ impl<TTx> ShardedStateTree<TTx> {
}
}

pub fn with_pending_diffs(self, pending_diffs: HashMap<Shard, PendingStateTreeDiff>) -> Self {
pub fn with_pending_diffs(self, pending_diffs: HashMap<Shard, Vec<PendingStateTreeDiff>>) -> Self {
Self { pending_diffs, ..self }
}
}

impl<TTx: StateStoreReadTransaction> ShardedStateTree<&TTx> {
fn get_current_version(&self, shard: Shard) -> Result<Option<Version>, StateTreeError> {
if let Some(version) = self.pending_diffs.get(&shard).map(|diff| diff.version) {
if let Some(version) = self
.pending_diffs
.get(&shard)
.and_then(|diffs| diffs.last())
.map(|diff| diff.version)
{
return Ok(Some(version));
}

Expand Down Expand Up @@ -86,6 +85,7 @@ impl<TTx: StateStoreReadTransaction> ShardedStateTree<&TTx> {

// Apply state updates to the state tree that is backed by the staged shard-scoped store
let mut state_tree = SpreadPrefixStateTree::new(&mut store);
debug!(target: LOG_TARGET, "v{next_version} contains {} tree change(s) for shard {shard}", changes.len());
let state_root = state_tree.put_substate_changes(current_version, next_version, changes)?;
state_roots.push(state_root);
self.current_tree_diffs.insert(shard, store.into_diff());
Expand All @@ -104,16 +104,20 @@ impl<TTx: StateStoreWriteTransaction> ShardedStateTree<&mut TTx> {
for (shard, pending_diff) in diffs {
let diff = pending_diff.diff;
let mut store = ShardScopedTreeStoreWriter::new(self.tx, shard);
for (key, node) in diff.new_nodes {
log::debug!("Inserting node: {}", key);
store.insert_node(key, node)?;
}

for stale_tree_node in diff.stale_tree_nodes {
log::debug!("Recording stale tree node: {}", stale_tree_node.as_node_key());
log::debug!(
"(shard={shard}) Recording stale tree node: {}",
stale_tree_node.as_node_key()
);
store.record_stale_tree_node(stale_tree_node)?;
}

for (key, node) in diff.new_nodes {
log::debug!("(shard={shard}) Inserting node: {}", key);
store.insert_node(key, node)?;
}

store.increment_version()?;
}

Expand Down
9 changes: 7 additions & 2 deletions dan_layer/consensus_tests/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::time::Duration;

use tari_common_types::types::PrivateKey;
use tari_consensus::hotstuff::HotStuffError;
use tari_dan_common_types::{optional::Optional, shard::Shard, Epoch, NodeHeight};
use tari_dan_common_types::{optional::Optional, Epoch, NodeHeight};
use tari_dan_storage::{
consensus_models::{BlockId, Command, Decision, TransactionRecord, VersionedSubstateIdLockIntent},
StateStore,
Expand Down Expand Up @@ -207,11 +207,16 @@ async fn node_requests_missing_transaction_from_local_leader() {
async fn multi_validator_propose_blocks_with_new_transactions_until_all_committed() {
setup_logger();
let mut test = Test::builder()
.add_committee(0, vec!["1", "2", "3", "4", "5"])
.debug_sql("/tmp/test{}.db")
.add_committee(0, vec!["1"])//, "2", "3", "4", "5"])
.start()
.await;
let mut remaining_txs = 10u32;

// for _ in 0..remaining_txs {
// test.send_transaction_to_all(Decision::Commit, 1, 5).await;
// }

test.start_epoch(Epoch(1)).await;
loop {
if remaining_txs > 0 {
Expand Down
2 changes: 1 addition & 1 deletion dan_layer/consensus_tests/src/substate_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use tari_consensus::{
hotstuff::substate_store::{PendingSubstateStore, SubstateStoreError},
traits::{ReadableSubstateStore, WriteableSubstateStore},
};
use tari_dan_common_types::{shard::Shard, Epoch, NodeAddressable, PeerAddress};
use tari_dan_common_types::{shard::Shard, NodeAddressable, PeerAddress};
use tari_dan_storage::{
consensus_models::{
BlockId,
Expand Down
11 changes: 4 additions & 7 deletions dan_layer/consensus_tests/src/support/epoch_manager.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use std::{collections::HashMap, sync::Arc};

use async_trait::async_trait;
use tari_common_types::types::{FixedHash, PublicKey};
Expand Down Expand Up @@ -153,8 +150,8 @@ impl EpochManagerReader for TestEpochManager {
substate_address: SubstateAddress,
) -> Result<Committee<Self::Addr>, EpochManagerError> {
let state = self.state_lock().await;
let shard = substate_address.to_shard_group(TEST_NUM_PRESHARDS, state.committees.len() as u32);
Ok(state.committees[&shard].clone())
let shard_group = substate_address.to_shard_group(TEST_NUM_PRESHARDS, state.committees.len() as u32);
Ok(state.committees[&shard_group].clone())
}

async fn get_our_validator_node(&self, _epoch: Epoch) -> Result<ValidatorNode<TestAddress>, EpochManagerError> {
Expand Down Expand Up @@ -232,7 +229,7 @@ impl EpochManagerReader for TestEpochManager {
&self,
_epoch: Epoch,
) -> Result<HashMap<ShardGroup, Committee<Self::Addr>>, EpochManagerError> {
todo!()
Ok(self.inner.lock().await.committees.clone())
}

async fn get_committee_info_by_validator_address(
Expand Down
Loading

0 comments on commit f70287a

Please sign in to comment.