Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: state checkpoint sync, fixes, epoch change #1067

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions applications/tari_dan_app_utilities/src/base_layer_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use tari_crypto::{
ristretto::RistrettoPublicKey,
tari_utilities::{hex::Hex, ByteArray},
};
use tari_dan_common_types::{optional::Optional, Epoch, NodeAddressable, NodeHeight};
use tari_dan_common_types::{optional::Optional, shard::Shard, Epoch, NodeAddressable, NodeHeight};
use tari_dan_storage::{
consensus_models::{Block, SubstateRecord},
global::{GlobalDb, MetadataKey},
Expand Down Expand Up @@ -451,7 +451,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
});
self.state_store
.with_write_tx(|tx| {
let genesis = Block::genesis(self.network);
let genesis = Block::genesis(self.network, Epoch(0), Shard::zero());

// TODO: This should be proposed in a block...
SubstateRecord {
Expand All @@ -463,6 +463,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
created_justify: *genesis.justify().id(),
created_block: *genesis.id(),
created_height: NodeHeight::zero(),
created_by_shard: Shard::zero(),
created_at_epoch: Epoch(0),
destroyed: None,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
time::{Duration, Instant},
};

use indexmap::{IndexMap, IndexSet};
use indexmap::IndexMap;
use log::*;
use tari_common::configuration::Network;
use tari_common_types::types::PublicKey;
Expand Down Expand Up @@ -53,7 +53,7 @@ impl ExecutionOutput {
pub fn resolve_inputs(
&self,
inputs: IndexMap<VersionedSubstateId, Substate>,
) -> IndexSet<VersionedSubstateIdLockIntent> {
) -> Vec<VersionedSubstateIdLockIntent> {
if let Some(diff) = self.result.finalize.accept() {
inputs
.into_iter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use diesel::{
};
use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
use log::*;
use tari_crypto::tari_utilities::hex::to_hex;
use tari_dan_common_types::{shard::Shard, substate_type::SubstateType, Epoch};
use tari_dan_storage::{consensus_models::BlockId, StorageError};
use tari_dan_storage_sqlite::{error::SqliteStorageError, SqliteTransaction};
Expand Down Expand Up @@ -802,9 +803,9 @@ impl SubstateStoreWriteTransaction for SqliteSubstateStoreWriteTransaction<'_> {
reason: format!("save_scanned_block_id error: {}", e),
})?;

info!(
debug!(
target: LOG_TARGET,
"Added new scanned block id {:?} for epoch {} and shard {:?}", new.last_block_id, new.epoch, new.shard
"Added new scanned block id {} for epoch {} and shard {:?}", to_hex(&new.last_block_id), new.epoch, new.shard
);

Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,11 @@ fn forward_logs<R: AsyncRead + Unpin + Send + 'static>(path: PathBuf, reader: R,
task::spawn(async move {
let mut log_file = File::create(path).await.unwrap();
while let Some(output) = lines.next_line().await.unwrap() {
log::debug!("[{target}] {output}");
log::debug!(target: "swarm", "[{target}] {output}");
log_file.write_all(output.as_bytes()).await.unwrap();
log_file.write_all(b"\n").await.unwrap();
log_file.flush().await.unwrap();
}
log::debug!("Process exited ({target})");
log::debug!(target: "swarm", "Process exited ({target})");
});
}
23 changes: 15 additions & 8 deletions applications/tari_swarm_daemon/src/process_manager/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use std::{collections::HashMap, time::Duration};

use anyhow::anyhow;
use anyhow::{anyhow, Context};
use log::info;
use minotari_node_grpc_client::grpc;
use tari_crypto::tari_utilities::ByteArray;
Expand Down Expand Up @@ -56,10 +56,14 @@ impl ProcessManager {

let num_vns = self.instance_manager.num_validator_nodes();
// Mine some initial funds, guessing 10 blocks to allow for coinbase maturity
self.mine(num_vns + 5).await?;
self.wait_for_wallet_funds(num_vns).await?;
self.mine(num_vns + 10).await.context("mining failed")?;
self.wait_for_wallet_funds(num_vns)
.await
.context("waiting for wallet funds")?;

self.register_all_validator_nodes().await?;
self.register_all_validator_nodes()
.await
.context("registering validator node via GRPC")?;

loop {
tokio::select! {
Expand Down Expand Up @@ -180,14 +184,15 @@ impl ProcessManager {
}

async fn register_all_validator_nodes(&mut self) -> anyhow::Result<()> {
let mut skip = vec![];
for vn in self.instance_manager.validator_nodes_mut() {
if !vn.instance_mut().check_running() {
log::error!(
"Skipping registration for validator node {}: {} since it is not running",
vn.instance().id(),
vn.instance().name()
);
continue;
skip.push(vn.instance().id());
}
}

Expand All @@ -203,6 +208,9 @@ impl ProcessManager {
})?;

for vn in self.instance_manager.validator_nodes() {
if skip.contains(&vn.instance().id()) {
continue;
}
info!("🟡 Registering validator node {}", vn.instance().name());
if let Err(err) = vn.wait_for_startup(Duration::from_secs(10)).await {
log::error!(
Expand All @@ -215,12 +223,12 @@ impl ProcessManager {

let reg_info = vn.get_registration_info().await?;
let tx_id = wallet.register_validator_node(reg_info).await?;
info!("🟢 Registered validator node with tx_id: {tx_id}");
info!("🟢 Registered validator node {vn} with tx_id: {tx_id}");
// Just wait a bit :shrug: This could be a bug in the console wallet. If we submit too quickly it uses 0
// inputs for a transaction.
sleep(Duration::from_secs(2)).await;
}
self.mine(20).await?;
self.mine(10).await?;
Ok(())
}

Expand Down Expand Up @@ -257,7 +265,6 @@ impl ProcessManager {

let reg_info = vn.get_registration_info().await?;
wallet.register_validator_node(reg_info).await?;
self.mine(20).await?;
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::time::Duration;
use std::{fmt::Display, time::Duration};

use anyhow::{anyhow, Context};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -89,6 +89,12 @@ impl ValidatorNodeProcess {
}
}

impl Display for ValidatorNodeProcess {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{} ({})", self.instance().name(), self.instance().id())
}
}

#[derive(Serialize, Deserialize)]
pub struct ValidatorRegistrationInfo {
pub signature: ValidatorNodeSignature,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub async fn create(

if req.register {
context.process_manager().register_validator_node(instance_id).await?;
context.process_manager().mine_blocks(10).await?;
}

Ok(ValidatorNodeCreateResponse { instance_id })
Expand Down
13 changes: 9 additions & 4 deletions applications/tari_validator_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use tari_dan_app_utilities::{
template_manager::{implementation::TemplateManager, interface::TemplateManagerHandle},
transaction_executor::TariDanTransactionProcessor,
};
use tari_dan_common_types::{Epoch, NodeAddressable, NodeHeight, PeerAddress, SubstateAddress};
use tari_dan_common_types::{shard::Shard, Epoch, NodeAddressable, NodeHeight, PeerAddress, SubstateAddress};
use tari_dan_engine::fees::FeeTable;
use tari_dan_p2p::TariMessagingSpec;
use tari_dan_storage::{
Expand Down Expand Up @@ -331,6 +331,7 @@ pub async fn spawn_services(
spawn_p2p_rpc(
config,
&mut networking,
epoch_manager.clone(),
state_store.clone(),
mempool.clone(),
virtual_substate_manager,
Expand Down Expand Up @@ -422,6 +423,7 @@ impl Services {
async fn spawn_p2p_rpc(
config: &ApplicationConfig,
networking: &mut NetworkingHandle<TariMessagingSpec>,
epoch_manager: EpochManagerHandle<PeerAddress>,
shard_store_store: SqliteStateStore<PeerAddress>,
mempool: MempoolHandle,
virtual_substate_manager: VirtualSubstateManager<SqliteStateStore<PeerAddress>, EpochManagerHandle<PeerAddress>>,
Expand All @@ -431,6 +433,7 @@ async fn spawn_p2p_rpc(
.with_maximum_sessions_per_client(config.validator_node.rpc.max_sessions_per_client)
.finish()
.add_service(create_tari_validator_node_rpc_service(
epoch_manager,
shard_store_store,
mempool,
virtual_substate_manager,
Expand All @@ -451,7 +454,7 @@ where
TTx::Target: StateStoreReadTransaction,
TTx::Addr: NodeAddressable + Serialize,
{
let genesis_block = Block::genesis(network);
let genesis_block = Block::genesis(network, Epoch(0), Shard::zero());
let substate_id = SubstateId::Resource(PUBLIC_IDENTITY_RESOURCE_ADDRESS);
let substate_address = SubstateAddress::from_substate_id(&substate_id, 0);
let mut metadata: Metadata = Default::default();
Expand All @@ -474,8 +477,9 @@ where
state_hash: Default::default(),
created_by_transaction: Default::default(),
created_justify: *genesis_block.justify().id(),
created_block: BlockId::genesis(),
created_block: BlockId::zero(),
created_height: NodeHeight(0),
created_by_shard: Shard::zero(),
created_at_epoch: Epoch(0),
destroyed: None,
}
Expand Down Expand Up @@ -503,9 +507,10 @@ where
state_hash: Default::default(),
created_by_transaction: Default::default(),
created_justify: *genesis_block.justify().id(),
created_block: BlockId::genesis(),
created_block: BlockId::zero(),
created_height: NodeHeight(0),
created_at_epoch: Epoch(0),
created_by_shard: Shard::zero(),
destroyed: None,
}
.create(tx)?;
Expand Down
13 changes: 12 additions & 1 deletion applications/tari_validator_node/src/consensus/handle.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2023 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use tari_consensus::hotstuff::{ConsensusCurrentState, HotstuffEvent};
use tari_consensus::hotstuff::{ConsensusCurrentState, CurrentView, HotstuffEvent};
use tokio::sync::{broadcast, watch};

use crate::event_subscription::EventSubscription;
Expand All @@ -10,24 +10,35 @@ use crate::event_subscription::EventSubscription;
pub struct ConsensusHandle {
rx_current_state: watch::Receiver<ConsensusCurrentState>,
events_subscription: EventSubscription<HotstuffEvent>,
current_view: CurrentView,
}

impl ConsensusHandle {
pub(super) fn new(
rx_current_state: watch::Receiver<ConsensusCurrentState>,
events_subscription: EventSubscription<HotstuffEvent>,
current_view: CurrentView,
) -> Self {
Self {
rx_current_state,
events_subscription,
current_view,
}
}

pub fn current_view(&self) -> &CurrentView {
&self.current_view
}

pub fn subscribe_to_hotstuff_events(&mut self) -> broadcast::Receiver<HotstuffEvent> {
self.events_subscription.subscribe()
}

pub fn get_current_state(&self) -> ConsensusCurrentState {
*self.rx_current_state.borrow()
}

pub fn is_running(&self) -> bool {
self.get_current_state().is_running()
}
}
9 changes: 7 additions & 2 deletions applications/tari_validator_node/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,20 +96,25 @@ pub async fn spawn(
max_base_layer_blocks_ahead: consensus_constants.max_base_layer_blocks_ahead,
},
);
let current_view = hotstuff_worker.pacemaker().current_view().clone();

let (tx_current_state, rx_current_state) = watch::channel(Default::default());
let context = ConsensusWorkerContext {
epoch_manager: epoch_manager.clone(),
hotstuff: hotstuff_worker,
state_sync: RpcStateSyncManager::new(network, epoch_manager, store, leader_strategy, client_factory),
state_sync: RpcStateSyncManager::new(epoch_manager, store, client_factory),
tx_current_state,
};

let handle = ConsensusWorker::new(shutdown_signal).spawn(context);

(
handle,
ConsensusHandle::new(rx_current_state, EventSubscription::new(tx_hotstuff_events)),
ConsensusHandle::new(
rx_current_state,
EventSubscription::new(tx_hotstuff_events),
current_view,
),
rx_mempool,
)
}
4 changes: 4 additions & 0 deletions applications/tari_validator_node/src/json_rpc/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,10 @@ impl JsonRpcHandlers {

pub async fn get_epoch_manager_stats(&self, value: JsonRpcExtractor) -> JrpcResult {
let answer_id = value.get_answer_id();
self.epoch_manager
.wait_for_initial_scanning_to_complete()
.await
.map_err(internal_error(answer_id))?;
let current_epoch = self.epoch_manager.current_epoch().await.map_err(|e| {
JsonRpcResponse::error(
answer_id,
Expand Down
Loading
Loading