Commit

feat(validators): trickle in validators (#1182)
Description
---
Allow only a configurable number of validator nodes to become active in an epoch; once the limit is reached, any further new validator nodes are simply deferred to a later epoch.

Example:
Let's say the maximum number of VNs that can be registered in an epoch is `2`, and `10` new validator nodes try to join at the same time.

Validator nodes table on other nodes (epochs are relative):
1. new node 1, start epoch: 1, end epoch: 101
2. new node 2, start epoch: 1, end epoch: 101
3. new node 3, start epoch: 2, end epoch: 102
4. new node 4, start epoch: 2, end epoch: 102
5. new node 5, start epoch: 3, end epoch: 103
etc...
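
To make the example concrete, here is a minimal, self-contained Rust sketch of this spill-over assignment. The function, its parameters, and the 100-epoch validity period are illustrative assumptions for this example only; the actual change enforces the limit via the `max_vns_per_epoch_activated` consensus constant in `add_validator_node_registration` (see the diff below).

```rust
use std::collections::HashMap;

/// Illustrative only: assign each newly registering VN the first epoch that still has room,
/// mirroring the "max `2` per epoch, `10` nodes joining" example above.
fn assign_start_epochs(num_new_vns: u64, max_vns_per_epoch_activated: u64) -> Vec<(u64, u64)> {
    // In the example a registration is valid for 100 epochs (start epoch N, end epoch N + 100).
    const VALIDITY_EPOCHS: u64 = 100;
    let mut per_epoch_counts: HashMap<u64, u64> = HashMap::new();
    let mut assignments = Vec::new();

    for _ in 0..num_new_vns {
        // Start at the next epoch (relative epoch 1) and move forward while that epoch is full.
        let mut next_epoch = 1u64;
        while per_epoch_counts.get(&next_epoch).copied().unwrap_or(0) >= max_vns_per_epoch_activated {
            next_epoch += 1;
        }
        *per_epoch_counts.entry(next_epoch).or_insert(0) += 1;
        assignments.push((next_epoch, next_epoch + VALIDITY_EPOCHS));
    }
    assignments
}

fn main() {
    // Prints: new node 1: start epoch 1, end epoch 101 ... new node 10: start epoch 5, end epoch 105
    for (i, (start, end)) in assign_start_epochs(10, 2).iter().enumerate() {
        println!("new node {}: start epoch {}, end epoch {}", i + 1, start, end);
    }
}
```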

Motivation and Context
---
Previously, validators were all registered at once: as soon as a registration from the base layer was processed, the new validators became available from the next epoch. The problem is that if thousands of validators join at the same time, consensus can get stuck for a long period before it is running healthily again.

How Has This Been Tested?
---
The current limit for developer networks is `50`, so the number of validator nodes must be set higher than `50`. Start a fresh swarm with this new configuration (delete the processes directory) and, after the first 50 nodes are up and running, open the local database at
`data/swarm/processes/validator-node-00/localnet/data/validator_node/global_storage.sqlite`
and check the contents of the `validator_nodes` table.
It will show that the first 50 nodes are assigned to the next epoch, and the rest to the epoch after that.
Then start periodic mining to advance to a new epoch and verify that everything still runs well with the new validator nodes.
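
As a quick sanity check, a query along these lines can be run against that sqlite database (suggested for verification only, it is not part of the change; it uses the `start_epoch` and `public_key` columns that the new count query in the diff below also relies on):

```sql
-- Count how many distinct validator nodes are scheduled to become active in each epoch.
-- With the developer-network limit of 50, no start_epoch should have more than 50 entries.
SELECT start_epoch, COUNT(DISTINCT public_key) AS vn_count
FROM validator_nodes
GROUP BY start_epoch
ORDER BY start_epoch;
```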

What process can a PR reviewer use to test or verify this change?
---


Breaking Changes
---

- [x] None
- [x] Requires data directory to be deleted
- [ ] Other - Please specify
ksrichard authored Oct 21, 2024
1 parent 50e7b8c commit c423408
Showing 9 changed files with 98 additions and 16 deletions.
1 change: 1 addition & 0 deletions applications/tari_indexer/src/bootstrap.rs
@@ -125,6 +125,7 @@ pub async fn spawn_services(
.try_into()
.context("committee_size must be non-zero")?,
validator_node_sidechain_id: config.indexer.sidechain_id.clone(),
max_vns_per_epoch_activated: consensus_constants.max_vns_per_epoch_activated,
},
global_db.clone(),
base_node_client.clone(),
1 change: 1 addition & 0 deletions applications/tari_validator_node/src/bootstrap.rs
@@ -218,6 +218,7 @@ pub async fn spawn_services(
.context("committee size must be non-zero")?,
validator_node_sidechain_id: config.validator_node.validator_node_sidechain_id.clone(),
num_preshards: consensus_constants.num_preshards,
max_vns_per_epoch_activated: consensus_constants.max_vns_per_epoch_activated,
};
// Epoch manager
let (epoch_manager, join_handle) = tari_epoch_manager::base_layer::spawn_service(
4 changes: 4 additions & 0 deletions dan_layer/consensus/src/consensus_constants.rs
@@ -35,6 +35,9 @@ pub struct ConsensusConstants {
pub pacemaker_max_base_time: Duration,
/// The value that fees are divided by to determine the amount of fees to burn. 0 means no fees are burned.
pub fee_exhaust_divisor: u64,
/// Maximum number of validator nodes to be activated in an epoch.
/// This gives the network enough time to catch up with new validator nodes and complete syncing.
pub max_vns_per_epoch_activated: u64,
}

impl ConsensusConstants {
@@ -47,6 +50,7 @@ impl ConsensusConstants {
num_preshards: NumPreshards::P256,
pacemaker_max_base_time: Duration::from_secs(10),
fee_exhaust_divisor: 20, // 5%
max_vns_per_epoch_activated: 50,
}
}
}
1 change: 1 addition & 0 deletions dan_layer/consensus_tests/src/support/validator/builder.rs
@@ -146,6 +146,7 @@ impl ValidatorBuilder {
num_preshards: TEST_NUM_PRESHARDS,
pacemaker_max_base_time: self.block_time,
fee_exhaust_divisor: 0,
max_vns_per_epoch_activated: 5,
},
},
self.address.clone(),
43 changes: 38 additions & 5 deletions dan_layer/epoch_manager/src/base_layer/base_layer_epoch_manager.rs
@@ -130,14 +130,15 @@ impl<TAddr: NodeAddressable + DerivableFromPublicKey>
Ok(())
}

/// Assigns validators (makes them active) for the given epoch from the database.
/// The number of validators that can become active in an epoch is capped by the configured
/// `max_vns_per_epoch_activated`, which is enforced when registrations are added.
fn assign_validators_for_epoch(&mut self, epoch: Epoch) -> Result<(), EpochManagerError> {
let mut tx = self.global_db.create_transaction()?;
let mut validator_nodes = self.global_db.validator_nodes(&mut tx);

let vns = validator_nodes.get_all_within_epoch(epoch, self.config.validator_node_sidechain_id.as_ref())?;

let num_committees = calculate_num_committees(vns.len() as u64, self.config.committee_size);

for vn in &vns {
validator_nodes.set_committee_shard(
vn.shard_key,
@@ -146,6 +147,7 @@
epoch,
)?;
}

tx.commit()?;
if let Some(vn) = vns.iter().find(|vn| vn.public_key == self.node_public_key) {
self.publish_event(EpochManagerEvent::ThisValidatorIsRegistered {
@@ -157,6 +159,13 @@
Ok(())
}

pub async fn base_layer_consensus_constants(&self) -> Result<&BaseLayerConsensusConstants, EpochManagerError> {
Ok(self
.base_layer_consensus_constants
.as_ref()
.expect("update_base_layer_consensus_constants did not set constants"))
}

pub async fn get_base_layer_consensus_constants(
&mut self,
) -> Result<&BaseLayerConsensusConstants, EpochManagerError> {
@@ -183,6 +192,20 @@
Ok(())
}

fn validator_nodes_count(
&self,
next_epoch: Epoch,
sidechain_id: Option<&PublicKey>,
) -> Result<u64, EpochManagerError> {
let mut tx = self.global_db.create_transaction()?;
let result = self
.global_db
.validator_nodes(&mut tx)
.count_by_epoch(next_epoch, sidechain_id)?;
tx.commit()?;
Ok(result)
}

pub async fn add_validator_node_registration(
&mut self,
block_height: u64,
@@ -194,11 +217,20 @@
actual: registration.sidechain_id().map(|v| v.to_hex()),
});
}
let constants = self.get_base_layer_consensus_constants().await?;
let next_epoch = constants.height_to_epoch(block_height) + Epoch(1);
let next_epoch_height = constants.epoch_to_height(next_epoch);

let constants = self.base_layer_consensus_constants().await?;
let mut next_epoch = constants.height_to_epoch(block_height) + Epoch(1);
let validator_node_expiry = constants.validator_node_registration_expiry;

// find the next available epoch
let mut next_epoch_vn_count = self.validator_nodes_count(next_epoch, registration.sidechain_id())?;
while next_epoch_vn_count >= self.config.max_vns_per_epoch_activated {
next_epoch += Epoch(1);
next_epoch_vn_count = self.validator_nodes_count(next_epoch, registration.sidechain_id())?;
}

let next_epoch_height = constants.epoch_to_height(next_epoch);

let shard_key = self
.base_node_client
.get_shard_key(next_epoch_height, registration.public_key())
@@ -208,8 +240,9 @@
block_height,
})?;

let mut tx = self.global_db.create_transaction()?;
info!(target: LOG_TARGET, "Registering validator node for epoch {}", next_epoch);

let mut tx = self.global_db.create_transaction()?;
self.global_db.validator_nodes(&mut tx).insert_validator_node(
TAddr::derive_from_public_key(registration.public_key()),
registration.public_key().clone(),
3 changes: 3 additions & 0 deletions dan_layer/epoch_manager/src/base_layer/config.rs
@@ -12,4 +12,7 @@ pub struct EpochManagerConfig {
pub committee_size: NonZeroU32,
pub validator_node_sidechain_id: Option<PublicKey>,
pub num_preshards: NumPreshards,
/// Maximum number of validator nodes to be activated in an epoch.
/// This gives the network enough time to catch up with new validator nodes and complete syncing.
pub max_vns_per_epoch_activated: u64,
}
6 changes: 6 additions & 0 deletions dan_layer/storage/src/global/backend_adapter.rs
@@ -116,6 +116,12 @@ pub trait GlobalDbAdapter: AtomicDb + Send + Sync + Clone {
epoch: Epoch,
sidechain_id: Option<&PublicKey>,
) -> Result<u64, Self::Error>;
fn validator_nodes_count_by_start_epoch(
&self,
tx: &mut Self::DbTransaction<'_>,
epoch: Epoch,
sidechain_id: Option<&PublicKey>,
) -> Result<u64, Self::Error>;
fn validator_nodes_count_for_shard_group(
&self,
tx: &mut Self::DbTransaction<'_>,
10 changes: 10 additions & 0 deletions dan_layer/storage/src/global/validator_node_db.rs
@@ -69,6 +69,16 @@ impl<'a, 'tx, TGlobalDbAdapter: GlobalDbAdapter> ValidatorNodeDb<'a, 'tx, TGloba
.map_err(TGlobalDbAdapter::Error::into)
}

pub fn count_by_epoch(
&mut self,
epoch: Epoch,
sidechain_id: Option<&PublicKey>,
) -> Result<u64, TGlobalDbAdapter::Error> {
self.backend
.validator_nodes_count_by_start_epoch(self.tx, epoch, sidechain_id)
.map_err(TGlobalDbAdapter::Error::into)
}

pub fn count_in_shard_group(
&mut self,
epoch: Epoch,
45 changes: 34 additions & 11 deletions dan_layer/storage_sqlite/src/global/backend_adapter.rs
@@ -1,24 +1,25 @@
// Copyright 2022. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that
// the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the
// following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
// WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
// PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY
// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
// OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
// DAMAGE.
use std::{
collections::{HashMap, HashSet},
convert::{TryFrom, TryInto},
@@ -479,6 +480,28 @@ impl<TAddr: NodeAddressable> GlobalDbAdapter for SqliteGlobalDbAdapter<TAddr> {
Ok(count.cnt as u64)
}

fn validator_nodes_count_by_start_epoch(
&self,
tx: &mut Self::DbTransaction<'_>,
epoch: Epoch,
sidechain_id: Option<&PublicKey>,
) -> Result<u64, Self::Error> {
let db_sidechain_id = sidechain_id.map(|id| id.as_bytes()).unwrap_or(&[0u8; 32]);

let count = sql_query(
"SELECT COUNT(distinct public_key) as cnt FROM validator_nodes WHERE start_epoch = ? AND sidechain_id = ?",
)
.bind::<BigInt, _>(epoch.as_u64() as i64)
.bind::<diesel::sql_types::Binary, _>(db_sidechain_id)
.get_result::<Count>(tx.connection())
.map_err(|source| SqliteStorageError::DieselError {
source,
operation: "count_validator_nodes_by_start_epoch".to_string(),
})?;

Ok(count.cnt as u64)
}

fn validator_nodes_count_for_shard_group(
&self,
tx: &mut Self::DbTransaction<'_>,
