fix(consensus)!: implements power of two and half shard boundaries (#1031)

Description
---
fix(consensus)!: implements power of two with "half shard" boundaries

Motivation and Context
---
This PR implements fixed shard split boundaries for a given number of
target shards (based on the number of registered VNs for the current
epoch).

~~For splits to occur at fixed addresses in the `U256::MAX` sized shard
space, we need to account somehow for the remainders. For example, if a
shard space is 4 bits, the max address is 15 (0b1111); if we wish to
divide the space into 8 (0b1000) equal pieces, there is a remainder of 7
(0b0111), and dividing into 4 (0b0100) there is a remainder of 3 (0b0011),
etc. Any remainder would lead to shard splits at different addresses
depending on num_shards.~~

~~To account for the remainder, we divide a shard space of `S' =
U256::MAX - u16::MAX` and limit `num_shards` to `u16::MAX / 2`. These
values are selected because they will give no remainder for any power of
two `num_shards` up to `u16::MAX / 2`. The final shard always has an
extra `u16::MAX` addresses for num_shards > 1.~~

The remainder of a power-of-two division of the shard space will always
be `num_shards.next_power_of_two() - 1`, e.g. `0b11..11 (U256::MAX) %
0b0100 = 0b00..0011`. To account for this, each shard has an extra 1
added to it.
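
As a rough illustration of the remainder claim, here is a minimal sketch using `u8` in place of `U256`; these helpers are illustrative only and are not the functions from `shard.rs`:

```rust
// Minimal sketch (u8 standing in for U256) of the remainder behaviour above.
fn main() {
    let max = u8::MAX; // stands in for U256::MAX
    for num_shards in [2u8, 4, 8, 16] {
        // The remainder of a power-of-two division of the inclusive address
        // space is always num_shards - 1.
        assert_eq!(max % num_shards, num_shards - 1);

        // Base shard size from the bit-shift division used in the new code.
        let shard_size = max >> num_shards.trailing_zeros();

        // Shifting each boundary up by the shard index spreads that remainder;
        // the last shard's start still fits within the address space.
        let last = num_shards - 1;
        let last_start = last * shard_size + last;
        assert!(last_start <= max);
        println!("num_shards={num_shards}: shard_size={shard_size}, last start={last_start}");
    }
}
```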

"Half shards" are used to break the shard space at the same address as
the next power of two for any non-power-of-two num_shards.

For example, say num_shards is 6: we can divide the shard space into 4
equal parts (4 being the previous power of two below 6), but the target
number of shards requires 2 extra shards. To achieve this, we divide the
first 2 whole shards in half. Put another way, we divide the first two
whole shards at the addresses at which the next power of two would divide
them; in this example, at num_shards = 8 (sketched in code below the figure).


![image](https://github.com/tari-project/tari-dan/assets/1057902/f14527b5-627d-4fbb-9ee2-bed5ebaa2e40)
num_shards = 6. Four half shards 0, 1, 2 and 3
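
A minimal sketch of the half-shard boundary calculation for this example, scaled down to a `u8` address space. The names loosely mirror the new `to_substate_address_range`, but this is not that code; the per-shard remainder adjustment (adding the shard index to each boundary) is omitted for brevity.

```rust
// Hypothetical miniature of the half-shard split, with u8 in place of U256.
fn shard_start(shard: u8, num_shards: u8, max: u8) -> u8 {
    let next_pow2 = num_shards.next_power_of_two();           // 8 when num_shards = 6
    let prev_pow2 = next_pow2 >> 1;                           // 4
    let half_shard_size = max >> next_pow2.trailing_zeros();  // divide by 8 via shift
    let full_shard_size = max >> prev_pow2.trailing_zeros();  // divide by 4 via shift
    let num_half_shards = num_shards % prev_pow2;             // 2 whole shards get split

    // The first 2 * num_half_shards shards are half-sized; the rest are full-sized.
    shard.min(num_half_shards * 2) * half_shard_size
        + shard.saturating_sub(num_half_shards * 2) * full_shard_size
}

fn main() {
    // With num_shards = 6, the first four (half) shards start at the same
    // addresses an 8-way split would use, after which the space is quartered.
    for shard in 0..6u8 {
        println!("shard {shard} starts at {}", shard_start(shard, 6, u8::MAX));
    }
}
```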

PROS: 
- Always splitting on the same shard boundaries makes chain joins/splits
and state sync easier.
- The target number of shards, based on the number of registered validator
nodes, is maintained. Splitting shards only at powers of two would require
an exponential jump in VN numbers (doubling for each shard split), e.g.
with a committee size of 40, splitting from 8 shards (>= 320 nodes) to 16
shards (>= 640 nodes) to 32 shards (>= 1280 nodes). Before each of these
splits, all committee sizes would grow large, impacting communication
complexity and performance (see the sketch after this list).
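
A quick arithmetic check of the node-count thresholds quoted above (assuming the example committee size of 40 and at least one full committee per shard):

```rust
// Each shard split doubles the minimum number of registered VNs required.
fn main() {
    let committee_size: u64 = 40; // example committee size from the text above
    for num_shards in [8u64, 16, 32] {
        let min_vns = committee_size * num_shards; // one full committee per shard
        println!("{num_shards} shards require >= {min_vns} VNs");
    }
}
```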

CONS:
- ~~VNs with a lower shard address will on average earn less fees due to
the increased likelihood of being in half shards~~
- Unequal shard sizes: whole shards may have a large number of members
before they split.

How Has This Been Tested?
---
New unit test cases; manually tested a single shard.

What process can a PR reviewer use to test or verify this change?
---
Run a multishard network.

Breaking Changes
---

- [ ] None
- [ ] Requires data directory to be deleted
- [x] Other - Please specify

BREAKING CHANGE: shard boundaries have changed, therefore nodes in a
multishard network will not agree on the shard state
sdbondi authored May 17, 2024
1 parent 33d8367 commit 198e6ec
Showing 14 changed files with 547 additions and 394 deletions.
350 changes: 47 additions & 303 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -167,6 +167,7 @@ diesel_migrations = "2"
digest = "0.10"
dirs = "4.0.0"
env_logger = "0.10.0"
ethnum = "1.5.0"
fern = "0.6.2"
futures = "0.3.30"
futures-bounded = "0.2.3"
@@ -212,7 +213,6 @@ quote = "1.0.7"
rand = "0.8.5"
rayon = "1.7.0"
reqwest = "0.11.16"
ruint = "1.8.0"
semver = "1.0"
serde = { version = "1.0", default-features = false }
serde_json = "1.0"
6 changes: 5 additions & 1 deletion applications/tari_indexer/src/bootstrap.rs
@@ -22,6 +22,7 @@

use std::{fs, io, str::FromStr};

use anyhow::Context;
use libp2p::identity;
use minotari_app_utilities::identity_management;
use tari_base_node_client::grpc::GrpcBaseNodeClient;
@@ -117,7 +118,10 @@ pub async fn spawn_services(
let (epoch_manager, _) = tari_epoch_manager::base_layer::spawn_service(
EpochManagerConfig {
base_layer_confirmations: consensus_constants.base_layer_confirmations,
committee_size: consensus_constants.committee_size,
committee_size: consensus_constants
.committee_size
.try_into()
.context("committee_size must be non-zero")?,
validator_node_sidechain_id: config.indexer.sidechain_id.clone(),
},
global_db.clone(),
7 changes: 5 additions & 2 deletions applications/tari_validator_node/src/bootstrap.rs
@@ -22,7 +22,7 @@

use std::{fs, io, ops::DerefMut, str::FromStr};

use anyhow::anyhow;
use anyhow::{anyhow, Context};
use futures::{future, FutureExt};
use libp2p::identity;
use log::info;
@@ -192,7 +192,10 @@ pub async fn spawn_services(
// which depends on epoch_manager, so would be a circular dependency.
EpochManagerConfig {
base_layer_confirmations: consensus_constants.base_layer_confirmations,
committee_size: consensus_constants.committee_size,
committee_size: consensus_constants
.committee_size
.try_into()
.context("committee size must be non-zero")?,
validator_node_sidechain_id: config.validator_node.validator_node_sidechain_id.clone(),
},
global_db.clone(),
5 changes: 4 additions & 1 deletion dan_layer/common_types/Cargo.toml
@@ -23,14 +23,17 @@ libp2p-identity = { workspace = true, features = [
] }

blake2 = { workspace = true }
ethnum = { workspace = true }
newtype-ops = { workspace = true }
rand = { workspace = true }
prost = { workspace = true }
prost-types = { workspace = true }
serde = { workspace = true, default-features = true }
ruint = { workspace = true }
ts-rs = { workspace = true, optional = true }

[dev-dependencies]
indexmap = { workspace = true }

[package.metadata.cargo-machete]
ignored = ["prost", "prost-types"] # false positive, used in OUT_DIR structs

161 changes: 125 additions & 36 deletions dan_layer/common_types/src/shard.rs
@@ -7,10 +7,7 @@ use serde::{Deserialize, Serialize};
#[cfg(feature = "ts")]
use ts_rs::TS;

use crate::{
uint::{U256, U256_ONE},
SubstateAddress,
};
use crate::{uint::U256, SubstateAddress};

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(transparent)]
@@ -22,21 +19,69 @@ impl Shard {
self.0
}

pub fn to_substate_address_range(self, num_committees: u32) -> RangeInclusive<SubstateAddress> {
if num_committees == 0 {
return RangeInclusive::new(SubstateAddress::zero(), SubstateAddress::from_u256(U256::MAX));
pub fn to_substate_address_range(self, num_shards: u32) -> RangeInclusive<SubstateAddress> {
if num_shards <= 1 {
return RangeInclusive::new(SubstateAddress::zero(), SubstateAddress::max());
}
let bucket = U256::from(self.0);
let num_committees = U256::from(num_committees);
let bucket_size = U256::MAX / num_committees;
let bucket_remainder = U256::MAX % num_committees;
let next_bucket = bucket + U256_ONE;
let start = bucket_size * bucket + bucket_remainder.min(bucket);
let mut end = start + bucket_size;
if next_bucket != num_committees && bucket_remainder <= bucket {
end -= U256_ONE;

// There will never be close to 2^31-1 committees but the calculation below will overflow/panic if
// num_shards.leading_zeros() == 0.
let num_shards = num_shards.min(crate::substate_address::MAX_NUM_SHARDS);

let shard_u256 = U256::from(self.0);

if num_shards.is_power_of_two() {
let shard_size = U256::MAX >> num_shards.trailing_zeros();
if self.0 == 0 {
return RangeInclusive::new(SubstateAddress::zero(), SubstateAddress::from_u256(shard_size));
}

// Add one to each start to account for remainder
let start = shard_u256 * shard_size;

if self.0 == num_shards - 1 {
return RangeInclusive::new(SubstateAddress::from_u256(start + shard_u256), SubstateAddress::max());
}

let next_shard = shard_u256 + 1;
let end = next_shard * shard_size;
return RangeInclusive::new(
SubstateAddress::from_u256(start + shard_u256),
SubstateAddress::from_u256(end + shard_u256),
);
}

let num_shards_next_pow2 = num_shards.next_power_of_two();
// Half the next power of two i.e. num_shards rounded down to previous power of two
let num_shards_prev_pow2 = num_shards_next_pow2 >> 1;
let num_shards_next_pow2 = U256::from(num_shards_next_pow2);
// Power of two division using bit shifts
let half_shard_size = U256::MAX >> num_shards_next_pow2.trailing_zeros();

if self.0 == 0 {
return RangeInclusive::new(SubstateAddress::zero(), SubstateAddress::from_u256(half_shard_size));
}

// Calculate size of shard at previous power of two
let full_shard_size = U256::MAX >> num_shards_prev_pow2.trailing_zeros();
// The "extra" half shards in the space
let num_half_shards = num_shards % num_shards_prev_pow2;

let start = U256::from(self.0.min(num_half_shards * 2)) * half_shard_size +
U256::from(self.0.saturating_sub(num_half_shards * 2)) * full_shard_size;

if self.0 == num_shards - 1 {
return RangeInclusive::new(SubstateAddress::from_u256(start + shard_u256), SubstateAddress::max());
}
RangeInclusive::new(SubstateAddress::from_u256(start), SubstateAddress::from_u256(end))

let next_shard = self.0 + 1;
let end = U256::from(next_shard.min(num_half_shards * 2)) * half_shard_size +
U256::from(next_shard.saturating_sub(num_half_shards * 2)) * full_shard_size;

RangeInclusive::new(
SubstateAddress::from_u256(start + shard_u256),
SubstateAddress::from_u256(end + shard_u256),
)
}
}

@@ -65,33 +110,77 @@ impl Display for Shard {

#[cfg(test)]
mod test {
use crate::uint::{U256, U256_ONE};
use std::iter;

use indexmap::IndexMap;

use super::*;

#[test]
fn committee_is_properly_computed() {
for num_of_committees in 1..100 {
let mut previous_end = U256::ZERO;
let mut min_committee_size = U256::MAX;
let mut max_committee_size = U256::ZERO;
for bucket_index in 0..num_of_committees {
let bucket = super::Shard::from(bucket_index);
let range = bucket.to_substate_address_range(num_of_committees);
if bucket_index > 0 {
// TODO: clean this up a bit, I wrote this very hastily
let power_of_twos = iter::successors(Some(1), |x| Some(x * 2)).take(10);
let mut split_map = IndexMap::<_, Vec<_>>::new();
for num_of_shards in power_of_twos {
let mut last_end = U256::ZERO;
for shard_index in 0..num_of_shards {
let shard = Shard::from(shard_index);
let range = shard.to_substate_address_range(num_of_shards);
if shard_index == 0 {
assert_eq!(range.start().to_u256(), U256::ZERO, "First shard should start at 0");
} else {
assert_eq!(
range.start().to_u256(),
previous_end + U256_ONE,
"Bucket should start where the previous one ended+1"
last_end + 1,
"Shard should start where the previous one ended+1"
);
}
min_committee_size = min_committee_size.min(range.end().to_u256() - range.start().to_u256());
max_committee_size = max_committee_size.max(range.end().to_u256() - range.start().to_u256());
previous_end = range.end().to_u256();
last_end = range.end().to_u256();
split_map.entry(num_of_shards).or_default().push(range);
}
assert!(
num_of_committees <= 1 || max_committee_size <= min_committee_size + U256_ONE,
"Committee sizes should be balanced {min_committee_size} {max_committee_size}"
);
assert_eq!(previous_end, U256::MAX, "Last bucket should end at U256::MAX");
assert_eq!(last_end, U256::MAX, "Last shard should end at U256::MAX");
}

let mut i = 0usize;
for (num_of_shards, splits) in &split_map {
// Each split in the next num_of_shards should match the previous shard splits
let Some(next_splits) = split_map.get(&(num_of_shards << 1)) else {
break;
};

i += 1;

for (split, next_split) in splits.iter().zip(
next_splits
.iter()
.enumerate()
// Every 2nd boundary matches
.filter(|(i, _)| i % 2 == 1)
.map(|(_, s)| s),
) {
assert_eq!(
split.end().to_u256(),
next_split.end().to_u256(),
"Bucket should end where the next one starts-1"
);
}

if splits.len() >= 2 {
let mut size = None;
for split in splits.iter().skip(1).take(splits.len() - 2) {
if let Some(size) = size {
assert_eq!(
split.end().to_u256() - split.start().to_u256(),
size,
"Shard size should be consistent"
);
}
size = Some(split.end().to_u256() - split.start().to_u256());
}
}
}

// Check that we didnt break early
assert_eq!(i, 9);
}
}
