Skip to content

Commit

Permalink
refactor!: fix epoch database structure (#1034)
Browse files Browse the repository at this point in the history
Description
---
Changes the epoch, validator and committee structure in the global
database.


Motivation and Context
---
Previously, validator nodes did not deregister and this resulted in a
bug that caused the committees for epochs to not be correct.

How Has This Been Tested?
---
Existing tests, and manual testing

Breaking Changes
---

- [ ] None
- [x] Requires data directory to be deleted
- [ ] Other - Please specify

BREAKING CHANGE: Data directory must be cleaned
  • Loading branch information
stringhandler authored May 15, 2024
1 parent 533d82e commit c2aee34
Show file tree
Hide file tree
Showing 73 changed files with 995 additions and 1,160 deletions.
3 changes: 2 additions & 1 deletion .prettierrc
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
"useTabs": false,
"semi": true,
"singleQuote": false,
"quoteProps": "consistent",
"trailingComma": "all",
"bracketSpacing": true,
"bracketSameLine": false,
"arrowParens": "always",
"endOfLine": "auto"
"endOfLine": "lf"
}
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 23 additions & 8 deletions applications/tari_dan_app_utilities/src/base_layer_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ use tari_base_node_client::{
};
use tari_common::configuration::Network;
use tari_common_types::types::{Commitment, FixedHash, FixedHashSizeError, PublicKey};
use tari_core::transactions::transaction_components::{
CodeTemplateRegistration,
SideChainFeature,
TransactionOutput,
ValidatorNodeRegistration,
use tari_core::transactions::{
tari_amount::MicroMinotari,
transaction_components::{
CodeTemplateRegistration,
SideChainFeature,
TransactionOutput,
ValidatorNodeRegistration,
},
};
use tari_crypto::{
ristretto::RistrettoPublicKey,
Expand Down Expand Up @@ -334,9 +337,20 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
};
match sidechain_feature {
SideChainFeature::ValidatorNodeRegistration(reg) => {
info!(
target: LOG_TARGET,
"⛓️ Validator node registration UTXO for {} sidechain {} found at height {}",
reg.public_key(),
reg.sidechain_id().map(|v| v.to_hex()).unwrap_or("None".to_string()),
current_height,
);
if reg.sidechain_id() == self.validator_node_sidechain_id.as_ref() {
self.register_validator_node_registration(current_height, reg.clone())
.await?;
self.register_validator_node_registration(
current_height,
reg.clone(),
output.minimum_value_promise,
)
.await?;
} else {
warn!(
target: LOG_TARGET,
Expand Down Expand Up @@ -465,6 +479,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
&mut self,
height: u64,
registration: ValidatorNodeRegistration,
minimum_value_promise: MicroMinotari,
) -> Result<(), BaseLayerScannerError> {
info!(
target: LOG_TARGET,
Expand All @@ -474,7 +489,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
);

self.epoch_manager
.add_validator_node_registration(height, registration)
.add_validator_node_registration(height, registration, minimum_value_promise)
.await?;

Ok(())
Expand Down
1 change: 1 addition & 0 deletions applications/tari_indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ log4rs = { workspace = true, features = [
"fixed_window_roller",
] }
mime_guess = { workspace = true }
rand = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true, features = ["default", "derive"] }
serde_json = { workspace = true }
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_indexer/src/dry_run/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ where TSubstateCache: SubstateCache + 'static
address: SubstateAddress,
epoch: Epoch,
) -> Result<(SubstateId, Substate), DryRunTransactionProcessorError> {
let mut committee = self.epoch_manager.get_committee(epoch, address).await?;
let mut committee = self.epoch_manager.get_committee_for_substate(epoch, address).await?;
committee.shuffle();

let mut nexist_count = 0;
Expand Down
48 changes: 20 additions & 28 deletions applications/tari_indexer/src/event_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,16 @@

use std::{
collections::{BTreeMap, HashSet},
ops::RangeInclusive,
str::FromStr,
};

use futures::StreamExt;
use log::*;
use rand::{prelude::SliceRandom, rngs::OsRng};
use tari_bor::decode;
use tari_common::configuration::Network;
use tari_crypto::tari_utilities::message_format::MessageFormat;
use tari_dan_common_types::{
committee::{Committee, CommitteeShardInfo},
shard::Shard,
Epoch,
PeerAddress,
SubstateAddress,
};
use tari_dan_common_types::{committee::Committee, shard::Shard, Epoch, PeerAddress};
use tari_dan_p2p::proto::rpc::{GetTransactionResultRequest, PayloadResultStatus, SyncBlocksRequest};
use tari_dan_storage::consensus_models::{Block, BlockId, Command, Decision, TransactionRecord};
use tari_engine_types::{commit_result::ExecuteResult, events::Event, substate::SubstateId};
Expand Down Expand Up @@ -88,18 +82,20 @@ impl EventManager {

let mut event_count = 0;

let network_committee_info = self.epoch_manager.get_network_committees().await?;
let epoch = network_committee_info.epoch;
for committee in network_committee_info.committees {
let current_epoch = self.epoch_manager.current_epoch().await?;
// let network_committee_info = self.epoch_manager.get_network_committees().await?;
// let epoch = network_committee_info.epoch;
let current_committees = self.epoch_manager.get_committees(current_epoch).await?;
for (shard, committee) in current_committees {
info!(
target: LOG_TARGET,
"Scanning committee epoch={}, shard={}",
epoch,
committee.shard
current_epoch,
shard
);
// TODO: use the latest block id that we scanned for each committee
let new_blocks = self
.get_new_blocks_from_committee(&mut committee.clone(), epoch)
.get_new_blocks_from_committee(shard, &mut committee.clone(), current_epoch)
.await?;
info!(
target: LOG_TARGET,
Expand Down Expand Up @@ -185,7 +181,7 @@ impl EventManager {
async fn get_events_for_transaction(&self, transaction_id: TransactionId) -> Result<Vec<Event>, anyhow::Error> {
let committee = self.get_all_vns().await?;

for member in committee.addresses() {
for member in &committee {
let resp = self.get_execute_result_from_vn(member, &transaction_id).await;

match resp {
Expand Down Expand Up @@ -274,18 +270,18 @@ impl EventManager {
#[allow(unused_assignments)]
async fn get_new_blocks_from_committee(
&self,
committee: &mut CommitteeShardInfo<PeerAddress>,
shard: Shard,
committee: &mut Committee<PeerAddress>,
epoch: Epoch,
) -> Result<Vec<Block>, anyhow::Error> {
// We start scanning from the last scanned block for this commitee
let shard = committee.shard;
let start_block_id = {
let mut tx = self.substate_store.create_read_tx()?;
tx.get_last_scanned_block_id(epoch, shard)?
};
let start_block_id = start_block_id.unwrap_or(self.build_genesis_block_id());

committee.validators.shuffle();
committee.members.shuffle(&mut OsRng);
let mut last_block_id = start_block_id;

info!(
Expand All @@ -296,7 +292,7 @@ impl EventManager {
shard
);

for member in committee.validators.addresses() {
for (member, _) in &committee.members {
let resp = self.get_blocks_from_vn(member, start_block_id).await;

match resp {
Expand Down Expand Up @@ -353,18 +349,14 @@ impl EventManager {
Ok(())
}

async fn get_all_vns(&self) -> Result<Committee<PeerAddress>, anyhow::Error> {
async fn get_all_vns(&self) -> Result<Vec<PeerAddress>, anyhow::Error> {
// get all the committees
// TODO: optimize by getting all individual CommiteeShards instead of all the VNs
let epoch = self.epoch_manager.current_epoch().await?;
let full_range = RangeInclusive::new(SubstateAddress::zero(), SubstateAddress::max());
let mut committee = self
Ok(self
.epoch_manager
.get_committee_within_shard_range(epoch, full_range)
.await?;
committee.shuffle();

Ok(committee)
.get_all_validator_nodes(epoch)
.await
.map(|v| v.iter().map(|m| m.address).collect())?)
}

async fn get_blocks_from_vn(
Expand Down
9 changes: 2 additions & 7 deletions applications/tari_indexer/src/json_rpc/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use axum_jrpc::{
JsonRpcResponse,
};
use libp2p::swarm::dial_opts::{DialOpts, PeerCondition};
use log::{error, warn};
use log::warn;
use serde_json::{self as json, json, Value};
use tari_base_node_client::{grpc::GrpcBaseNodeClient, types::BaseLayerConsensusConstants, BaseNodeClient};
use tari_crypto::tari_utilities::hex::to_hex;
Expand Down Expand Up @@ -780,12 +780,7 @@ impl JsonRpcHandlers {
}

fn internal_error<T: Display>(answer_id: i64, error: T) -> JsonRpcResponse {
let msg = if cfg!(debug_assertions) || option_env!("CI").is_some() {
error.to_string()
} else {
error!(target: LOG_TARGET, "Internal error: {}", error);
"Something went wrong".to_string()
};
let msg = error.to_string();
Self::error_response(answer_id, JsonRpcErrorReason::InternalError, msg)
}
}
5 changes: 4 additions & 1 deletion applications/tari_indexer/src/transaction_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,10 @@ where
// Get all unique members. The hashset already "shuffles" items owing to the random hash function.
let mut all_members = HashSet::new();
for substate_address in substate_addresses {
let committee = self.epoch_manager.get_committee(epoch, substate_address).await?;
let committee = self
.epoch_manager
.get_committee_for_substate(epoch, substate_address)
.await?;
all_members.extend(committee.into_addresses());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,7 @@ impl ExecutableManager {
exec.instance_type,
compile.working_dir().display()
);
let mut child = cargo_build(
compile
.working_dir()
.canonicalize()
.context("working_dir does not exist")?,
&compile.package_name,
)?;
let mut child = cargo_build(compile.working_dir(), &compile.package_name)?;
tasks.push(async move {
let status = child.wait().await?;
Ok::<_, anyhow::Error>((status, exec))
Expand All @@ -93,9 +87,7 @@ impl ExecutableManager {

self.prepared.push(Executable {
instance_type: exec.instance_type,
path: add_ext(&bin_path)
.canonicalize()
.with_context(|| anyhow!("The compiled binary at path '{}' does not exist.", bin_path.display()))?,
path: add_ext(&bin_path),
env: exec.env.clone(),
})
}
Expand Down Expand Up @@ -195,7 +187,7 @@ fn add_ext<P: AsRef<Path>>(path: P) -> PathBuf {
let path = path.as_ref().to_path_buf();

if cfg!(windows) {
path.with_extension(".exe")
path.with_extension("exe")
} else {
path
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: BSD-3-Clause

use tari_consensus::traits::StateManager;
use tari_dan_common_types::{committee::CommitteeShard, SubstateAddress};
use tari_dan_common_types::{committee::CommitteeInfo, SubstateAddress};
use tari_dan_storage::{
consensus_models::{Block, ExecutedTransaction, SubstateRecord},
StateStore,
Expand All @@ -25,7 +25,7 @@ impl<TStateStore: StateStore> StateManager<TStateStore> for TariStateManager {
tx: &mut TStateStore::WriteTransaction<'_>,
block: &Block,
transaction: &ExecutedTransaction,
local_committee_shard: &CommitteeShard,
local_committee_info: &CommitteeInfo,
) -> Result<(), Self::Error> {
let Some(diff) = transaction.result().finalize.result.accept() else {
// We should only commit accepted transactions, might want to change this API to reflect that
Expand All @@ -35,7 +35,7 @@ impl<TStateStore: StateStore> StateManager<TStateStore> for TariStateManager {
let down_shards = diff
.down_iter()
.map(|(addr, version)| SubstateAddress::from_address(addr, *version))
.filter(|shard| local_committee_shard.includes_substate_address(shard));
.filter(|shard| local_committee_info.includes_substate_address(shard));
SubstateRecord::destroy_many(
tx,
down_shards,
Expand All @@ -49,7 +49,7 @@ impl<TStateStore: StateStore> StateManager<TStateStore> for TariStateManager {
let to_up = diff.up_iter().filter_map(|(addr, substate)| {
let address = SubstateAddress::from_address(addr, substate.version());
// Commit all substates included in this shard. Every involved validator commits the transaction receipt.
if local_committee_shard.includes_substate_address(&address) || addr.is_transaction_receipt() {
if local_committee_info.includes_substate_address(&address) || addr.is_transaction_receipt() {
Some(SubstateRecord::new(
addr.clone(),
substate.version(),
Expand Down
Loading

0 comments on commit c2aee34

Please sign in to comment.