Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(validators): trickle in validators #1182

Merged
1 change: 1 addition & 0 deletions applications/tari_indexer/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ pub async fn spawn_services(
.try_into()
.context("committee_size must be non-zero")?,
validator_node_sidechain_id: config.indexer.sidechain_id.clone(),
max_vns_per_epoch_activated: consensus_constants.max_vns_per_epoch_activated,
},
global_db.clone(),
base_node_client.clone(),
Expand Down
1 change: 1 addition & 0 deletions applications/tari_validator_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ pub async fn spawn_services(
.context("committee size must be non-zero")?,
validator_node_sidechain_id: config.validator_node.validator_node_sidechain_id.clone(),
num_preshards: consensus_constants.num_preshards,
max_vns_per_epoch_activated: consensus_constants.max_vns_per_epoch_activated,
};
// Epoch manager
let (epoch_manager, join_handle) = tari_epoch_manager::base_layer::spawn_service(
Expand Down
4 changes: 4 additions & 0 deletions dan_layer/consensus/src/consensus_constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ pub struct ConsensusConstants {
pub pacemaker_max_base_time: Duration,
/// The value that fees are divided by to determine the amount of fees to burn. 0 means no fees are burned.
pub fee_exhaust_divisor: u64,
/// Maximum number of validator nodes to be activated in an epoch.
/// This gives the network enough time to catch up with newly activated validator nodes and to sync.
pub max_vns_per_epoch_activated: u64,
}

impl ConsensusConstants {
Expand All @@ -47,6 +50,7 @@ impl ConsensusConstants {
num_preshards: NumPreshards::P256,
pacemaker_max_base_time: Duration::from_secs(10),
fee_exhaust_divisor: 20, // 5%
max_vns_per_epoch_activated: 50,
}
}
}
Expand Down
70 changes: 58 additions & 12 deletions dan_layer/epoch_manager/src/base_layer/base_layer_epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,25 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{cmp, collections::HashMap, mem, num::NonZeroU32};
use std::{
cell::{Cell, RefCell},
cmp,
collections::HashMap,
mem,
num::NonZeroU32,
ops::DerefMut,
rc::Rc,
sync::Arc,
};

use log::*;
use log::{__private_api::loc, *};
sdbondi marked this conversation as resolved.
Show resolved Hide resolved
use tari_base_node_client::{grpc::GrpcBaseNodeClient, types::BaseLayerConsensusConstants, BaseNodeClient};
use tari_common_types::types::{FixedHash, PublicKey};
use tari_core::{blocks::BlockHeader, transactions::transaction_components::ValidatorNodeRegistration};
use tari_core::{
blocks::BlockHeader,
consensus::ConsensusConstants,
transactions::transaction_components::ValidatorNodeRegistration,
};
use tari_dan_common_types::{
committee::{Committee, CommitteeInfo},
optional::Optional,
Expand All @@ -36,16 +49,16 @@ use tari_dan_common_types::{
SubstateAddress,
};
use tari_dan_storage::global::{models::ValidatorNode, DbBaseLayerBlockInfo, DbEpoch, GlobalDb, MetadataKey};
use tari_dan_storage_sqlite::global::SqliteGlobalDbAdapter;
use tari_dan_storage_sqlite::{error::SqliteStorageError, global::SqliteGlobalDbAdapter};
use tari_utilities::{byte_array::ByteArray, hex::Hex};
use tokio::sync::{broadcast, oneshot};
use tokio::sync::{broadcast, oneshot, Mutex};

use crate::{base_layer::config::EpochManagerConfig, error::EpochManagerError, EpochManagerEvent};

const LOG_TARGET: &str = "tari::dan::epoch_manager::base_layer";

pub struct BaseLayerEpochManager<TGlobalStore, TBaseNodeClient> {
global_db: GlobalDb<TGlobalStore>,
global_db: Arc<GlobalDb<TGlobalStore>>,
sdbondi marked this conversation as resolved.
Show resolved Hide resolved
base_node_client: TBaseNodeClient,
config: EpochManagerConfig,
current_epoch: Epoch,
Expand All @@ -70,7 +83,7 @@ impl<TAddr: NodeAddressable + DerivableFromPublicKey>
node_public_key: PublicKey,
) -> Self {
Self {
global_db,
global_db: Arc::new(global_db),
base_node_client,
config,
current_epoch: Epoch(0),
Expand Down Expand Up @@ -130,14 +143,15 @@ impl<TAddr: NodeAddressable + DerivableFromPublicKey>
Ok(())
}

/// Assigns validators for the given epoch (makes them active) from the database.
/// Max number of validators must be passed to limit the number of validators to make active in the given epoch.
fn assign_validators_for_epoch(&mut self, epoch: Epoch) -> Result<(), EpochManagerError> {
let mut tx = self.global_db.create_transaction()?;
let mut validator_nodes = self.global_db.validator_nodes(&mut tx);

let vns = validator_nodes.get_all_within_epoch(epoch, self.config.validator_node_sidechain_id.as_ref())?;

let num_committees = calculate_num_committees(vns.len() as u64, self.config.committee_size);

for vn in &vns {
validator_nodes.set_committee_shard(
vn.shard_key,
Expand All @@ -146,6 +160,7 @@ impl<TAddr: NodeAddressable + DerivableFromPublicKey>
epoch,
)?;
}

tx.commit()?;
if let Some(vn) = vns.iter().find(|vn| vn.public_key == self.node_public_key) {
self.publish_event(EpochManagerEvent::ThisValidatorIsRegistered {
Expand All @@ -157,6 +172,13 @@ impl<TAddr: NodeAddressable + DerivableFromPublicKey>
Ok(())
}

/// Returns the cached base-layer consensus constants.
///
/// # Panics
/// Panics if the cache has not yet been populated by
/// `update_base_layer_consensus_constants`.
pub async fn base_layer_consensus_constants(&self) -> Result<&BaseLayerConsensusConstants, EpochManagerError> {
    match self.base_layer_consensus_constants.as_ref() {
        Some(constants) => Ok(constants),
        None => panic!("update_base_layer_consensus_constants did not set constants"),
    }
}

pub async fn get_base_layer_consensus_constants(
&mut self,
) -> Result<&BaseLayerConsensusConstants, EpochManagerError> {
Expand All @@ -183,6 +205,20 @@ impl<TAddr: NodeAddressable + DerivableFromPublicKey>
Ok(())
}

/// Counts the validator nodes recorded for the given epoch, optionally filtered by
/// side-chain id, using a short-lived transaction on the global database.
fn validator_nodes_count(
    &self,
    next_epoch: Epoch,
    sidechain_id: Option<&PublicKey>,
) -> Result<u64, EpochManagerError> {
    let mut tx = self.global_db.create_transaction()?;
    let mut vn_db = self.global_db.validator_nodes(&mut tx);
    let count = vn_db.count_by_epoch(next_epoch, sidechain_id)?;
    // Commit before returning so the transaction is not left dangling.
    tx.commit()?;
    Ok(count)
}

pub async fn add_validator_node_registration(
&mut self,
block_height: u64,
Expand All @@ -194,11 +230,20 @@ impl<TAddr: NodeAddressable + DerivableFromPublicKey>
actual: registration.sidechain_id().map(|v| v.to_hex()),
});
}
let constants = self.get_base_layer_consensus_constants().await?;
let next_epoch = constants.height_to_epoch(block_height) + Epoch(1);
let next_epoch_height = constants.epoch_to_height(next_epoch);

let constants = self.base_layer_consensus_constants().await?;
let mut next_epoch = constants.height_to_epoch(block_height) + Epoch(1);
let validator_node_expiry = constants.validator_node_registration_expiry;

// find the next available epoch
let mut next_epoch_vn_count = self.validator_nodes_count(next_epoch, registration.sidechain_id())?;
while next_epoch_vn_count == self.config.max_vns_per_epoch_activated {
sdbondi marked this conversation as resolved.
Show resolved Hide resolved
next_epoch += Epoch(1);
next_epoch_vn_count = self.validator_nodes_count(next_epoch, registration.sidechain_id())?;
}

let next_epoch_height = constants.epoch_to_height(next_epoch);

let shard_key = self
.base_node_client
.get_shard_key(next_epoch_height, registration.public_key())
Expand All @@ -208,8 +253,9 @@ impl<TAddr: NodeAddressable + DerivableFromPublicKey>
block_height,
})?;

let mut tx = self.global_db.create_transaction()?;
info!(target: LOG_TARGET, "Registering validator node for epoch {}", next_epoch);

let mut tx = self.global_db.create_transaction()?;
self.global_db.validator_nodes(&mut tx).insert_validator_node(
TAddr::derive_from_public_key(registration.public_key()),
registration.public_key().clone(),
Expand Down
3 changes: 3 additions & 0 deletions dan_layer/epoch_manager/src/base_layer/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,7 @@ pub struct EpochManagerConfig {
pub committee_size: NonZeroU32,
pub validator_node_sidechain_id: Option<PublicKey>,
pub num_preshards: NumPreshards,
/// Maximum number of validator nodes to be activated in an epoch.
/// This gives the network enough time to catch up with newly activated validator nodes and to sync.
pub max_vns_per_epoch_activated: u64,
}
6 changes: 6 additions & 0 deletions dan_layer/storage/src/global/backend_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ pub trait GlobalDbAdapter: AtomicDb + Send + Sync + Clone {
epoch: Epoch,
sidechain_id: Option<&PublicKey>,
) -> Result<u64, Self::Error>;
fn validator_nodes_count_by_start_epoch(
&self,
tx: &mut Self::DbTransaction<'_>,
epoch: Epoch,
sidechain_id: Option<&PublicKey>,
) -> Result<u64, Self::Error>;
fn validator_nodes_count_for_shard_group(
&self,
tx: &mut Self::DbTransaction<'_>,
Expand Down
10 changes: 10 additions & 0 deletions dan_layer/storage/src/global/validator_node_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,16 @@ impl<'a, 'tx, TGlobalDbAdapter: GlobalDbAdapter> ValidatorNodeDb<'a, 'tx, TGloba
.map_err(TGlobalDbAdapter::Error::into)
}

pub fn count_by_epoch(
&mut self,
epoch: Epoch,
sidechain_id: Option<&PublicKey>,
) -> Result<u64, TGlobalDbAdapter::Error> {
self.backend
.validator_nodes_count_by_start_epoch(self.tx, epoch, sidechain_id)
.map_err(TGlobalDbAdapter::Error::into)
}

pub fn count_in_shard_group(
&mut self,
epoch: Epoch,
Expand Down
48 changes: 37 additions & 11 deletions dan_layer/storage_sqlite/src/global/backend_adapter.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
use std::ops::Add;
sdbondi marked this conversation as resolved.
Show resolved Hide resolved
// Copyright 2022. The Tari Project
//
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the
// following conditions are met:
// Redistribution and use in source and binary forms, with or without modification, are permitted provided that
// the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following
// disclaimer.
// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the
// following disclaimer.
//
// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the
// following disclaimer in the documentation and/or other materials provided with the distribution.
//
// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote
// products derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
// WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
// PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY
// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
// CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
// OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
// DAMAGE.
use std::{
collections::{HashMap, HashSet},
convert::{TryFrom, TryInto},
Expand All @@ -31,13 +33,15 @@ use std::{
use diesel::{
sql_query,
sql_types::{BigInt, Bigint},
BoolExpressionMethods,
ExpressionMethods,
JoinOnDsl,
NullableExpressionMethods,
OptionalExtension,
QueryDsl,
RunQueryDsl,
SqliteConnection,
SqliteExpressionMethods,
};
use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
use serde::{de::DeserializeOwned, Serialize};
Expand Down Expand Up @@ -479,6 +483,28 @@ impl<TAddr: NodeAddressable> GlobalDbAdapter for SqliteGlobalDbAdapter<TAddr> {
Ok(count.cnt as u64)
}

/// Counts distinct validator-node public keys whose `start_epoch` equals the given
/// epoch, filtered by side-chain id.
///
/// NOTE(review): when no side-chain id is supplied the query binds 32 zero bytes —
/// presumably matching how a missing side-chain id is stored on insert; confirm
/// against the insert path.
fn validator_nodes_count_by_start_epoch(
    &self,
    tx: &mut Self::DbTransaction<'_>,
    epoch: Epoch,
    sidechain_id: Option<&PublicKey>,
) -> Result<u64, Self::Error> {
    // Fall back to the all-zero sentinel when no side-chain id is given.
    let sidechain_bytes = sidechain_id.map(|id| id.as_bytes()).unwrap_or(&[0u8; 32]);

    let query = sql_query(
        "SELECT COUNT(distinct public_key) as cnt FROM validator_nodes WHERE start_epoch = ? AND sidechain_id = ?",
    )
    .bind::<BigInt, _>(epoch.as_u64() as i64)
    .bind::<diesel::sql_types::Binary, _>(sidechain_bytes);

    let row = query
        .get_result::<Count>(tx.connection())
        .map_err(|source| SqliteStorageError::DieselError {
            source,
            operation: "count_validator_nodes_by_start_epoch".to_string(),
        })?;

    Ok(row.cnt as u64)
}

fn validator_nodes_count_for_shard_group(
&self,
tx: &mut Self::DbTransaction<'_>,
Expand Down
Loading