Skip to content

Commit

Permalink
impl in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
ksrichard committed Nov 6, 2024
1 parent 5c8d44d commit c8c7cd5
Show file tree
Hide file tree
Showing 15 changed files with 12,910 additions and 182 deletions.
12,843 changes: 12,843 additions & 0 deletions Cargo.lock

Large diffs are not rendered by default.

41 changes: 23 additions & 18 deletions applications/tari_dan_app_utilities/src/base_layer_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use tari_core::transactions::{
ValidatorNodeRegistration,
},
};
use tari_crypto::tari_utilities::ByteArrayError;
use tari_crypto::{
ristretto::RistrettoPublicKey,
tari_utilities::{hex::Hex, ByteArray},
Expand Down Expand Up @@ -218,7 +219,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
.saturating_sub(self.consensus_constants.base_layer_confirmations)
);
self.sync_blockchain().await?;
},
}
BlockchainProgression::Reorged => {
error!(
target: LOG_TARGET,
Expand All @@ -229,15 +230,15 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
self.last_scanned_validator_node_mr = None;
self.last_scanned_height = 0;
self.sync_blockchain().await?;
},
}
BlockchainProgression::NoProgress => {
trace!(target: LOG_TARGET, "No new blocks to scan.");
// If no progress has been made since restarting, we still need to tell the epoch manager that scanning
// is done
if !self.has_attempted_scan {
self.epoch_manager.notify_scanning_complete().await?;
}
},
}
}

self.has_attempted_scan = true;
Expand All @@ -258,7 +259,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
} else {
Ok(BlockchainProgression::Reorged)
}
},
}
None => Ok(BlockchainProgression::Progressed),
}
}
Expand All @@ -278,7 +279,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
"Base layer blockchain is not yet at the required height to start scanning it"
);
return Ok(());
},
}
Some(end_height) => end_height,
};
let mut scan = tip.tip_hash;
Expand All @@ -295,7 +296,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
// This will be processed down below.
break;
}
current_last_validator_nodes_mr = Some(header.validator_node_mr.clone());
current_last_validator_nodes_mr = Some(header.validator_node_mr);
self.epoch_manager.add_block_hash(header.height, scan).await?;
scan = header.prev_hash;
}
Expand All @@ -319,13 +320,15 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
for node_change in node_changes {
match node_change.state() {
ValidatorNodeChangeState::Add => {
// TODO: check on layer 1 if all the details could be fetched and received from `get_validator_node_changes` call
let node_public_key =
PublicKey::from_canonical_bytes(&node_change.public_key).map_err(|error| {
// TODO: convert error
})?;
PublicKey::from_canonical_bytes(&node_change.public_key)
.map_err(BaseLayerScannerError::PublicKeyConversion)?;
validator_nodes_to_register.push(node_public_key);
},
ValidatorNodeChangeState::Remove => {},
}
ValidatorNodeChangeState::Remove => {
// TODO: implement
}
}
}

Expand Down Expand Up @@ -381,9 +384,9 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
reg.clone(),
output.minimum_value_promise,
)
.await?;
.await?;
}
},
}
SideChainFeature::CodeTemplateRegistration(reg) => {
if reg.sidechain_id != self.template_sidechain_id {
warn!(
Expand All @@ -399,8 +402,8 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
reg.clone(),
&block_info,
)
.await?;
},
.await?;
}
SideChainFeature::ConfidentialOutput(data) => {
// Should be checked by the base layer
if !output.is_burned() {
Expand All @@ -427,7 +430,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
output.commitment.as_public_key()
);
self.register_burnt_utxo(output, &block_info).await?;
},
}
}
}

Expand All @@ -441,7 +444,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
match block_info.next_block_hash {
Some(next_hash) => {
current_hash = Some(next_hash);
},
}
None => {
info!(
target: LOG_TARGET,
Expand All @@ -454,7 +457,7 @@ impl<TAddr: NodeAddressable + 'static> BaseLayerScanner<TAddr> {
)));
}
break;
},
}
}
}

Expand Down Expand Up @@ -607,6 +610,8 @@ pub enum BaseLayerScannerError {
commitment: Box<Commitment>,
source: StorageError,
},
#[error("Public key conversion error: {0}")]
PublicKeyConversion(ByteArrayError),
}

enum BlockchainProgression {
Expand Down
1 change: 0 additions & 1 deletion applications/tari_indexer/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ pub async fn spawn_services(
.try_into()
.context("committee_size must be non-zero")?,
validator_node_sidechain_id: config.indexer.sidechain_id.clone(),
max_vns_per_epoch_activated: consensus_constants.max_vns_per_epoch_activated,
},
global_db.clone(),
base_node_client.clone(),
Expand Down
11 changes: 5 additions & 6 deletions applications/tari_validator_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,6 @@ pub async fn spawn_services(
.context("committee size must be non-zero")?,
validator_node_sidechain_id: config.validator_node.validator_node_sidechain_id.clone(),
num_preshards: consensus_constants.num_preshards,
max_vns_per_epoch_activated: consensus_constants.max_vns_per_epoch_activated,
};
// Epoch manager
let (epoch_manager, join_handle) = tari_epoch_manager::base_layer::spawn_service(
Expand Down Expand Up @@ -307,7 +306,7 @@ pub async fn spawn_services(
transaction_executor,
consensus_constants.clone(),
)
.await;
.await;
handles.push(consensus_join_handle);

let (mempool, join_handle) = mempool::spawn(
Expand Down Expand Up @@ -368,7 +367,7 @@ pub async fn spawn_services(
virtual_substate_manager,
consensus_handle.clone(),
)
.await?;
.await?;
// Save final node identity after comms has initialized. This is required because the public_address can be
// changed by comms during initialization when using tor.
save_identities(config, &keypair)?;
Expand Down Expand Up @@ -414,7 +413,7 @@ async fn create_registration_file(
config.common.base_path.join("registration.json"),
serde_json::to_string(&registration)?,
)
.context("failed to write registration file")?;
.context("failed to write registration file")?;
Ok(())
}

Expand Down Expand Up @@ -620,13 +619,13 @@ where
created_at_epoch: Epoch(0),
destroyed: None,
}
.create(tx)?;
.create(tx)?;
Ok(())
}

fn create_mempool_transaction_validator(
template_manager: TemplateManager<PeerAddress>,
) -> impl Validator<Transaction, Context = (), Error = TransactionValidationError> {
) -> impl Validator<Transaction, Context=(), Error=TransactionValidationError> {
HasInputs::new()
.and_then(TemplateExistsValidator::new(template_manager))
.and_then(FeeTransactionValidator)
Expand Down
2 changes: 0 additions & 2 deletions applications/tari_watcher/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,3 @@ pub const DEFAULT_VALIDATOR_KEY_PATH: &str = "data/vn1/esmeralda/registration.js
pub const DEFAULT_VALIDATOR_NODE_BINARY_PATH: &str = "target/release/tari_validator_node";
pub const DEFAULT_BASE_NODE_GRPC_URL: &str = "http://127.0.0.1:12001"; // note: protocol
pub const DEFAULT_BASE_WALLET_GRPC_URL: &str = "http://127.0.0.1:12003"; // note: protocol

pub const DEFAULT_THRESHOLD_WARN_EXPIRATION: u64 = 100; // warn at this many blocks before the registration expires
34 changes: 4 additions & 30 deletions applications/tari_watcher/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ use std::{
path::{Path, PathBuf},
};

use minotari_app_grpc::tari_rpc::{ConsensusConstants, GetActiveValidatorNodesResponse};
use minotari_app_grpc::tari_rpc::GetActiveValidatorNodesResponse;
use tari_common_types::types::PublicKey;
use tari_core::transactions::transaction_components::ValidatorNodeSignature;
use tari_crypto::{ristretto::RistrettoPublicKey, tari_utilities::ByteArray};
use tokio::fs;

use crate::{config::Config, constants::DEFAULT_THRESHOLD_WARN_EXPIRATION};
use crate::config::Config;

pub async fn read_config_file(path: PathBuf) -> anyhow::Result<Config> {
let content = fs::read_to_string(&path).await.map_err(|_| {
Expand Down Expand Up @@ -45,12 +45,12 @@ pub async fn read_registration_file<P: AsRef<Path>>(
Ok(info) => {
let reg = json5::from_str(&info)?;
Ok(Some(reg))
},
}
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
Err(e) => {
log::error!("Failed to read VN registration file: {}", e);
Err(e.into())
},
}
}
}

Expand All @@ -63,29 +63,3 @@ pub fn to_vn_public_keys(vns: Vec<GetActiveValidatorNodesResponse>) -> Vec<Publi
/// Checks whether `needle` appears among the given validator node public keys.
pub fn contains_key(vns: Vec<RistrettoPublicKey>, needle: PublicKey) -> bool {
    vns.into_iter().any(|key| key == needle)
}

/// Returns `true` when the validator node registration is one epoch or less
/// away from expiring.
///
/// `last_registered_block` is the base-layer height at which the node last
/// registered; if it is `None` (no registration yet in this session) the
/// registration cannot be close to expiry, so this returns `false`.
pub fn is_close_to_expiry(
    constants: ConsensusConstants,
    current_block: u64,
    last_registered_block: Option<u64>,
) -> bool {
    // If we haven't registered yet in this session, nothing can expire.
    let Some(registered_at) = last_registered_block else {
        return false;
    };
    // A registration stays valid for `validator_node_validity_period` epochs,
    // each `epoch_length` blocks long.
    let registration_duration = constants.validator_node_validity_period * constants.epoch_length;
    // Check if the current block is an epoch or less away from the expiry height.
    current_block + constants.epoch_length >= registered_at + registration_duration
}

/// Returns `true` when `current_block` is within the warning threshold
/// (`DEFAULT_THRESHOLD_WARN_EXPIRATION` blocks) of the registration expiry.
///
/// `last_registered_block` is the base-layer height of the most recent
/// registration; the registration is valid for
/// `validator_node_validity_period` epochs of `epoch_length` blocks each.
pub fn is_warning_close_to_expiry(
    constants: ConsensusConstants,
    current_block: u64,
    last_registered_block: u64,
) -> bool {
    // Height at which the current registration lapses.
    let expiry_block = last_registered_block + constants.epoch_length * constants.validator_node_validity_period;
    // We are inside the warning window once the expiry height is no more than
    // the threshold ahead of the current height.
    expiry_block <= current_block + DEFAULT_THRESHOLD_WARN_EXPIRATION
}
13 changes: 5 additions & 8 deletions applications/tari_watcher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,15 @@ async fn main() -> anyhow::Result<()> {
config.write(file).await.context("Writing config failed")?;

log::info!("Config file created at {}", config_path.display());
},
}
Commands::Start(ref args) => {
log::info!("Starting watcher using config {}", config_path.display());
let mut cfg = read_config_file(config_path).await.context("read config file")?;

// optionally override config values
args.apply(&mut cfg);
start(cfg).await?;
},
}
}

Ok(())
Expand All @@ -81,7 +81,7 @@ async fn start(config: Config) -> anyhow::Result<()> {
config.base_dir.join(DEFAULT_WATCHER_BASE_PATH).join("watcher.pid"),
std::process::id(),
)
.await?;
.await?;
let handlers = spawn_manager(config.clone(), shutdown.to_signal(), shutdown).await?;
let manager_handle = handlers.manager;
let task_handle = handlers.task;
Expand All @@ -108,12 +108,9 @@ struct Handlers {
}

async fn spawn_manager(config: Config, shutdown: ShutdownSignal, trigger: Shutdown) -> anyhow::Result<Handlers> {
let (manager, mut manager_handle) = ProcessManager::new(config, shutdown, trigger);
let (manager, manager_handle) = ProcessManager::new(config, shutdown, trigger);
let cr = manager.start_request_handler().await?;
let status = manager_handle.get_tip_info().await?;
// in the case the consensus constants have changed since the genesis block, use the latest ones
let constants = manager_handle.get_consensus_constants(status.height()).await?;
start_receivers(cr.rx_log, cr.rx_alert, cr.cfg_alert, constants).await;
start_receivers(cr.rx_log, cr.rx_alert, cr.cfg_alert).await;

Ok(Handlers {
manager: manager_handle,
Expand Down
19 changes: 3 additions & 16 deletions applications/tari_watcher/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ impl ProcessManager {

let cc = self.start_child_process().await;

let mut last_registered_at_block = 0;
info!("Setup completed: connected to base node and wallet, ready to receive requests");
let task_handle = tokio::spawn(async move {
loop {
Expand All @@ -81,15 +80,6 @@ impl ProcessManager {
}
};

// send latest block height to logging
if let Err(e) = cc.tx_log.send(ProcessStatus::WarnExpiration(response.height(), last_registered_at_block)).await {
error!("Failed to send tip status update to monitoring: {}", e);
}
// send latest block height to alerting
if let Err(e) = cc.tx_alert.send(ProcessStatus::WarnExpiration(response.height(), last_registered_at_block)).await {
error!("Failed to send tip status update to alerting: {}", e);
}

drop(reply.send(Ok(response)));
}
ManagerRequest::GetActiveValidatorNodes { reply } => {
Expand All @@ -110,7 +100,6 @@ impl ProcessManager {
continue;
}
};
last_registered_at_block = block;

// send registration response to logger
if let Err(e) = cc.tx_log.send(ProcessStatus::Submitted(Transaction::new(response.clone(), block))).await {
Expand Down Expand Up @@ -166,7 +155,7 @@ impl ProcessManager {
self.config.auto_restart,
self.trigger_signal.clone(),
)
.await;
.await;
if cc.is_none() {
todo!("Create new validator node process event listener for fetched existing PID from OS");
}
Expand All @@ -179,16 +168,14 @@ pub async fn start_receivers(
rx_log: mpsc::Receiver<ProcessStatus>,
rx_alert: mpsc::Receiver<ProcessStatus>,
cfg_alert: Channels,
constants: ConsensusConstants,
) {
let const_copy = constants.clone();
// spawn logging and alerting tasks to process status updates
tokio::spawn(async move {
process_status_log(rx_log, const_copy).await;
process_status_log(rx_log).await;
warn!("Logging task has exited");
});
tokio::spawn(async move {
process_status_alert(rx_alert, cfg_alert, constants).await;
process_status_alert(rx_alert, cfg_alert).await;
warn!("Alerting task has exited");
});
}
Expand Down
Loading

0 comments on commit c8c7cd5

Please sign in to comment.