Skip to content

Commit

Permalink
feat!: state checkpoint sync, fixes, epoch change (#1067)
Browse files Browse the repository at this point in the history
Description
---

feat: adds state sync
fix: remove block sync
feat: start new chain at each epoch
fix: fixes bug that drops consensus messages (inbound messaging not
cancel safe)
fix: scope JMT to epoch/shard
fix: only emit epoch changed event once scanning to tip has completed
fix: swarm mines more initial blocks when registering many vns
fix: bug that caused block signature to disappear after unparking the
block
feat: adds state checkpoint call
test: fix epoch change and state sync cucumbers

Motivation and Context
---

State sync fetches state transitions and applies them to the local state
db. Syncing historical blocks is not feasible across changing
shards/epochs and this is removed in this PR.

Closes #1070
Closes #1037 

Outstanding items:
- verify checkpoint on sync
- construct a "tiered" state merkle root consisting of presharded JMT
merkle roots applicable to the current shard space.
- leader failure due to race condition when proposal for end of epoch is
received before all validators have scanned to the base layer tip

How Has This Been Tested?
---
Manually: 
1. 2 validators, mine to next epoch
2. 2 validators, send transactions, delete data of one vn and resync
3. 4 validators, delete data from one validator, leader failures, epoch
change, resync validator
4. [TODO] new validator joining network
5. [TODO] multishard test 

What process can a PR reviewer use to test or verify this change?
---
Swarm, mine to next epoch

Breaking Changes
---

- [ ] None
- [x] Requires data directory to be deleted
- [x] Other - Please specify
  • Loading branch information
sdbondi authored Jul 16, 2024
1 parent 4731032 commit de14ad9
Show file tree
Hide file tree
Showing 139 changed files with 4,714 additions and 2,552 deletions.
5 changes: 3 additions & 2 deletions applications/tari_dan_app_utilities/src/base_layer_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use tari_crypto::{
ristretto::RistrettoPublicKey,
tari_utilities::{hex::Hex, ByteArray},
};
use tari_dan_common_types::{optional::Optional, Epoch, NodeAddressable, NodeHeight};
use tari_dan_common_types::{optional::Optional, shard::Shard, Epoch, NodeAddressable, NodeHeight};
use tari_dan_storage::{
consensus_models::{Block, SubstateRecord},
global::{GlobalDb, MetadataKey},
Expand Down Expand Up @@ -451,7 +451,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
});
self.state_store
.with_write_tx(|tx| {
let genesis = Block::genesis(self.network);
let genesis = Block::genesis(self.network, Epoch(0), Shard::zero());

// TODO: This should be proposed in a block...
SubstateRecord {
Expand All @@ -463,6 +463,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
created_justify: *genesis.justify().id(),
created_block: *genesis.id(),
created_height: NodeHeight::zero(),
created_by_shard: Shard::zero(),
created_at_epoch: Epoch(0),
destroyed: None,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
time::{Duration, Instant},
};

use indexmap::{IndexMap, IndexSet};
use indexmap::IndexMap;
use log::*;
use tari_common::configuration::Network;
use tari_common_types::types::PublicKey;
Expand Down Expand Up @@ -53,7 +53,7 @@ impl ExecutionOutput {
pub fn resolve_inputs(
&self,
inputs: IndexMap<VersionedSubstateId, Substate>,
) -> IndexSet<VersionedSubstateIdLockIntent> {
) -> Vec<VersionedSubstateIdLockIntent> {
if let Some(diff) = self.result.finalize.accept() {
inputs
.into_iter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use diesel::{
};
use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
use log::*;
use tari_crypto::tari_utilities::hex::to_hex;
use tari_dan_common_types::{shard::Shard, substate_type::SubstateType, Epoch};
use tari_dan_storage::{consensus_models::BlockId, StorageError};
use tari_dan_storage_sqlite::{error::SqliteStorageError, SqliteTransaction};
Expand Down Expand Up @@ -802,9 +803,9 @@ impl SubstateStoreWriteTransaction for SqliteSubstateStoreWriteTransaction<'_> {
reason: format!("save_scanned_block_id error: {}", e),
})?;

info!(
debug!(
target: LOG_TARGET,
"Added new scanned block id {:?} for epoch {} and shard {:?}", new.last_block_id, new.epoch, new.shard
"Added new scanned block id {} for epoch {} and shard {:?}", to_hex(&new.last_block_id), new.epoch, new.shard
);

Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,11 @@ fn forward_logs<R: AsyncRead + Unpin + Send + 'static>(path: PathBuf, reader: R,
task::spawn(async move {
let mut log_file = File::create(path).await.unwrap();
while let Some(output) = lines.next_line().await.unwrap() {
log::debug!("[{target}] {output}");
log::debug!(target: "swarm", "[{target}] {output}");
log_file.write_all(output.as_bytes()).await.unwrap();
log_file.write_all(b"\n").await.unwrap();
log_file.flush().await.unwrap();
}
log::debug!("Process exited ({target})");
log::debug!(target: "swarm", "Process exited ({target})");
});
}
23 changes: 15 additions & 8 deletions applications/tari_swarm_daemon/src/process_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use std::{collections::HashMap, time::Duration};

use anyhow::anyhow;
use anyhow::{anyhow, Context};
use log::info;
use minotari_node_grpc_client::grpc;
use tari_crypto::tari_utilities::ByteArray;
Expand Down Expand Up @@ -56,10 +56,14 @@ impl ProcessManager {

let num_vns = self.instance_manager.num_validator_nodes();
// Mine some initial funds, guessing 10 blocks to allow for coinbase maturity
self.mine(num_vns + 5).await?;
self.wait_for_wallet_funds(num_vns).await?;
self.mine(num_vns + 10).await.context("mining failed")?;
self.wait_for_wallet_funds(num_vns)
.await
.context("waiting for wallet funds")?;

self.register_all_validator_nodes().await?;
self.register_all_validator_nodes()
.await
.context("registering validator node via GRPC")?;

loop {
tokio::select! {
Expand Down Expand Up @@ -180,14 +184,15 @@ impl ProcessManager {
}

async fn register_all_validator_nodes(&mut self) -> anyhow::Result<()> {
let mut skip = vec![];
for vn in self.instance_manager.validator_nodes_mut() {
if !vn.instance_mut().check_running() {
log::error!(
"Skipping registration for validator node {}: {} since it is not running",
vn.instance().id(),
vn.instance().name()
);
continue;
skip.push(vn.instance().id());
}
}

Expand All @@ -203,6 +208,9 @@ impl ProcessManager {
})?;

for vn in self.instance_manager.validator_nodes() {
if skip.contains(&vn.instance().id()) {
continue;
}
info!("🟡 Registering validator node {}", vn.instance().name());
if let Err(err) = vn.wait_for_startup(Duration::from_secs(10)).await {
log::error!(
Expand All @@ -215,12 +223,12 @@ impl ProcessManager {

let reg_info = vn.get_registration_info().await?;
let tx_id = wallet.register_validator_node(reg_info).await?;
info!("🟢 Registered validator node with tx_id: {tx_id}");
info!("🟢 Registered validator node {vn} with tx_id: {tx_id}");
// Just wait a bit :shrug: This could be a bug in the console wallet. If we submit too quickly it uses 0
// inputs for a transaction.
sleep(Duration::from_secs(2)).await;
}
self.mine(20).await?;
self.mine(10).await?;
Ok(())
}

Expand Down Expand Up @@ -257,7 +265,6 @@ impl ProcessManager {

let reg_info = vn.get_registration_info().await?;
wallet.register_validator_node(reg_info).await?;
self.mine(20).await?;
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::time::Duration;
use std::{fmt::Display, time::Duration};

use anyhow::{anyhow, Context};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -89,6 +89,12 @@ impl ValidatorNodeProcess {
}
}

impl Display for ValidatorNodeProcess {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} ({})", self.instance().name(), self.instance().id())
}
}

#[derive(Serialize, Deserialize)]
pub struct ValidatorRegistrationInfo {
pub signature: ValidatorNodeSignature,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub async fn create(

if req.register {
context.process_manager().register_validator_node(instance_id).await?;
context.process_manager().mine_blocks(10).await?;
}

Ok(ValidatorNodeCreateResponse { instance_id })
Expand Down
13 changes: 9 additions & 4 deletions applications/tari_validator_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use tari_dan_app_utilities::{
template_manager::{implementation::TemplateManager, interface::TemplateManagerHandle},
transaction_executor::TariDanTransactionProcessor,
};
use tari_dan_common_types::{Epoch, NodeAddressable, NodeHeight, PeerAddress, SubstateAddress};
use tari_dan_common_types::{shard::Shard, Epoch, NodeAddressable, NodeHeight, PeerAddress, SubstateAddress};
use tari_dan_engine::fees::FeeTable;
use tari_dan_p2p::TariMessagingSpec;
use tari_dan_storage::{
Expand Down Expand Up @@ -331,6 +331,7 @@ pub async fn spawn_services(
spawn_p2p_rpc(
config,
&mut networking,
epoch_manager.clone(),
state_store.clone(),
mempool.clone(),
virtual_substate_manager,
Expand Down Expand Up @@ -422,6 +423,7 @@ impl Services {
async fn spawn_p2p_rpc(
config: &ApplicationConfig,
networking: &mut NetworkingHandle<TariMessagingSpec>,
epoch_manager: EpochManagerHandle<PeerAddress>,
shard_store_store: SqliteStateStore<PeerAddress>,
mempool: MempoolHandle,
virtual_substate_manager: VirtualSubstateManager<SqliteStateStore<PeerAddress>, EpochManagerHandle<PeerAddress>>,
Expand All @@ -431,6 +433,7 @@ async fn spawn_p2p_rpc(
.with_maximum_sessions_per_client(config.validator_node.rpc.max_sessions_per_client)
.finish()
.add_service(create_tari_validator_node_rpc_service(
epoch_manager,
shard_store_store,
mempool,
virtual_substate_manager,
Expand All @@ -451,7 +454,7 @@ where
TTx::Target: StateStoreReadTransaction,
TTx::Addr: NodeAddressable + Serialize,
{
let genesis_block = Block::genesis(network);
let genesis_block = Block::genesis(network, Epoch(0), Shard::zero());
let substate_id = SubstateId::Resource(PUBLIC_IDENTITY_RESOURCE_ADDRESS);
let substate_address = SubstateAddress::from_substate_id(&substate_id, 0);
let mut metadata: Metadata = Default::default();
Expand All @@ -474,8 +477,9 @@ where
state_hash: Default::default(),
created_by_transaction: Default::default(),
created_justify: *genesis_block.justify().id(),
created_block: BlockId::genesis(),
created_block: BlockId::zero(),
created_height: NodeHeight(0),
created_by_shard: Shard::zero(),
created_at_epoch: Epoch(0),
destroyed: None,
}
Expand Down Expand Up @@ -503,9 +507,10 @@ where
state_hash: Default::default(),
created_by_transaction: Default::default(),
created_justify: *genesis_block.justify().id(),
created_block: BlockId::genesis(),
created_block: BlockId::zero(),
created_height: NodeHeight(0),
created_at_epoch: Epoch(0),
created_by_shard: Shard::zero(),
destroyed: None,
}
.create(tx)?;
Expand Down
13 changes: 12 additions & 1 deletion applications/tari_validator_node/src/consensus/handle.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use tari_consensus::hotstuff::{ConsensusCurrentState, HotstuffEvent};
use tari_consensus::hotstuff::{ConsensusCurrentState, CurrentView, HotstuffEvent};
use tokio::sync::{broadcast, watch};

use crate::event_subscription::EventSubscription;
Expand All @@ -10,24 +10,35 @@ use crate::event_subscription::EventSubscription;
pub struct ConsensusHandle {
rx_current_state: watch::Receiver<ConsensusCurrentState>,
events_subscription: EventSubscription<HotstuffEvent>,
current_view: CurrentView,
}

impl ConsensusHandle {
pub(super) fn new(
rx_current_state: watch::Receiver<ConsensusCurrentState>,
events_subscription: EventSubscription<HotstuffEvent>,
current_view: CurrentView,
) -> Self {
Self {
rx_current_state,
events_subscription,
current_view,
}
}

pub fn current_view(&self) -> &CurrentView {
&self.current_view
}

pub fn subscribe_to_hotstuff_events(&mut self) -> broadcast::Receiver<HotstuffEvent> {
self.events_subscription.subscribe()
}

pub fn get_current_state(&self) -> ConsensusCurrentState {
*self.rx_current_state.borrow()
}

pub fn is_running(&self) -> bool {
self.get_current_state().is_running()
}
}
9 changes: 7 additions & 2 deletions applications/tari_validator_node/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,20 +96,25 @@ pub async fn spawn(
max_base_layer_blocks_ahead: consensus_constants.max_base_layer_blocks_ahead,
},
);
let current_view = hotstuff_worker.pacemaker().current_view().clone();

let (tx_current_state, rx_current_state) = watch::channel(Default::default());
let context = ConsensusWorkerContext {
epoch_manager: epoch_manager.clone(),
hotstuff: hotstuff_worker,
state_sync: RpcStateSyncManager::new(network, epoch_manager, store, leader_strategy, client_factory),
state_sync: RpcStateSyncManager::new(epoch_manager, store, client_factory),
tx_current_state,
};

let handle = ConsensusWorker::new(shutdown_signal).spawn(context);

(
handle,
ConsensusHandle::new(rx_current_state, EventSubscription::new(tx_hotstuff_events)),
ConsensusHandle::new(
rx_current_state,
EventSubscription::new(tx_hotstuff_events),
current_view,
),
rx_mempool,
)
}
4 changes: 4 additions & 0 deletions applications/tari_validator_node/src/json_rpc/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,10 @@ impl JsonRpcHandlers {

pub async fn get_epoch_manager_stats(&self, value: JsonRpcExtractor) -> JrpcResult {
let answer_id = value.get_answer_id();
self.epoch_manager
.wait_for_initial_scanning_to_complete()
.await
.map_err(internal_error(answer_id))?;
let current_epoch = self.epoch_manager.current_epoch().await.map_err(|e| {
JsonRpcResponse::error(
answer_id,
Expand Down
Loading

0 comments on commit de14ad9

Please sign in to comment.