From 11931c1714b4c76099702fb2b7f520e43c084680 Mon Sep 17 00:00:00 2001 From: dkf Date: Thu, 15 Aug 2024 08:39:31 +0100 Subject: [PATCH] feat(watcher): registration management prototype (#1118) Description --- Checks the active set of keys on the network before sending a [RegisterValidatorNode](https://github.com/tari-project/tari/blob/feature-dan2/applications/minotari_app_grpc/proto/wallet.proto#L82) request, and backs off until expected registration expires, to register the node again. Breaking Changes --- - [x] None - [ ] Requires data directory to be deleted - [ ] Other - Please specify --- Cargo.lock | 4 + .../config_presets/c_validator_node.toml | 9 +- applications/tari_watcher/Cargo.toml | 4 + applications/tari_watcher/src/config.rs | 8 +- applications/tari_watcher/src/helpers.rs | 57 ++++++++++++ applications/tari_watcher/src/main.rs | 90 ++++++++++++++----- applications/tari_watcher/src/manager.rs | 52 +++++++++-- applications/tari_watcher/src/minotari.rs | 75 ++++++++++++++-- applications/tari_watcher/src/port.rs | 2 - 9 files changed, 254 insertions(+), 47 deletions(-) create mode 100644 applications/tari_watcher/src/helpers.rs delete mode 100644 applications/tari_watcher/src/port.rs diff --git a/Cargo.lock b/Cargo.lock index c0e890538..62e9cd3c9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10142,12 +10142,16 @@ dependencies = [ "clap 3.2.25", "fern", "humantime 2.1.0", + "json5", "log", "minotari_app_grpc", "minotari_node_grpc_client", "minotari_wallet_grpc_client", "serde", "tari_common", + "tari_common_types", + "tari_core", + "tari_crypto", "tari_shutdown", "tokio", "toml 0.8.15", diff --git a/applications/tari_dan_app_utilities/config_presets/c_validator_node.toml b/applications/tari_dan_app_utilities/config_presets/c_validator_node.toml index e0d5973f8..038875880 100644 --- a/applications/tari_dan_app_utilities/config_presets/c_validator_node.toml +++ b/applications/tari_dan_app_utilities/config_presets/c_validator_node.toml @@ -20,11 +20,8 @@ # automatically configured (default = ) #public_address = -# The Minotari base node's GRPC address. (default = "127.0.0.1/" the value is based on network) -#base_node_grpc_address = "127.0.0.1/tcp/18142" - -# The Minotari console wallet's GRPC address. (default = "127.0.0.1/" the value is based on network) -#wallet_grpc_address = "127.0.0.1/tcp/18143" +# The Minotari base node's GRPC address. (default = "127.0.0.1:" the value is based on network) +#base_node_grpc_address = "127.0.0.1:18142" # How often do we want to scan the base layer for changes. (default = 10) #base_layer_scanning_interval = 10 @@ -44,4 +41,4 @@ [validator_node.p2p] #enable_mdns = true #listener_port = 0 -#reachability_mode = "auto" \ No newline at end of file +#reachability_mode = "auto" diff --git a/applications/tari_watcher/Cargo.toml b/applications/tari_watcher/Cargo.toml index f08d2acee..0e6c035cc 100644 --- a/applications/tari_watcher/Cargo.toml +++ b/applications/tari_watcher/Cargo.toml @@ -13,7 +13,10 @@ license.workspace = true minotari_wallet_grpc_client = { workspace = true } minotari_node_grpc_client = { workspace = true } minotari_app_grpc = { workspace = true } +tari_core = { workspace = true } # Used for VN registration signature +tari_crypto = { workspace = true } # Used for `.to_vec()` in registration request tari_common = { workspace = true } +tari_common_types = { workspace = true } tari_shutdown = { workspace = true } clap = { workspace = true, features = ["derive"] } serde = { workspace = true, features = ["derive"] } @@ -30,6 +33,7 @@ tokio = { workspace = true, features = [ log = { workspace = true } fern = { workspace = true, features = ["colored"] } tonic = { workspace = true } +json5 = { workspace = true } toml = "0.8.12" humantime = "2.1.0" diff --git a/applications/tari_watcher/src/config.rs b/applications/tari_watcher/src/config.rs index bf65a2c3a..7304680e7 100644 --- a/applications/tari_watcher/src/config.rs +++ b/applications/tari_watcher/src/config.rs @@ -156,13 +156,19 @@ pub fn get_base_config(cli: &Cli) -> anyhow::Result { }) .unwrap_or_else(|| std::env::current_dir().unwrap()); + let vn_registration_file = base_dir + .join("data") + .join("vn1") + .join("esmeralda") + .join("registration.json"); + Ok(Config { auto_register: true, base_node_grpc_address: "".to_string(), base_wallet_grpc_address: "".to_string(), base_dir: base_dir.clone(), sidechain_id: None, - vn_registration_file: base_dir.join("registration.json"), + vn_registration_file, instance_config: instances.to_vec(), executable_config: executables, channel_config: vec![ diff --git a/applications/tari_watcher/src/helpers.rs b/applications/tari_watcher/src/helpers.rs new file mode 100644 index 000000000..a60a39732 --- /dev/null +++ b/applications/tari_watcher/src/helpers.rs @@ -0,0 +1,57 @@ +// Copyright 2024 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +use std::path::PathBuf; + +use minotari_app_grpc::tari_rpc::{GetActiveValidatorNodesResponse, TipInfoResponse}; +use tari_common_types::types::PublicKey; +use tari_core::transactions::transaction_components::ValidatorNodeSignature; +use tari_crypto::{ristretto::RistrettoPublicKey, tari_utilities::ByteArray}; +use tokio::fs; + +use crate::config::Config; + +pub async fn read_config_file(path: PathBuf) -> anyhow::Result { + let content = fs::read_to_string(&path).await.map_err(|_| { + format!( + "Failed to read config file at {}", + path.into_os_string().into_string().unwrap() + ) + }); + + let config = toml::from_str(&content.unwrap())?; + + Ok(config) +} + +#[derive(serde::Serialize, serde::Deserialize)] +pub struct ValidatorNodeRegistration { + pub signature: ValidatorNodeSignature, + pub public_key: PublicKey, + pub claim_fees_public_key: PublicKey, +} + +pub async fn read_registration_file(vn_registration_file: PathBuf) -> anyhow::Result { + log::debug!( + "Using VN registration file at: {}", + vn_registration_file.clone().into_os_string().into_string().unwrap() + ); + + let info = fs::read_to_string(vn_registration_file).await?; + let reg = json5::from_str(&info)?; + Ok(reg) +} + +pub fn to_vn_public_keys(vns: Vec) -> Vec { + vns.into_iter() + .map(|vn| PublicKey::from_vec(&vn.public_key).expect("Invalid public key, should not happen")) + .collect() +} + +pub fn to_block_height(tip_info: TipInfoResponse) -> u64 { + tip_info.metadata.unwrap().best_block_height +} + +pub fn contains_key(vns: Vec, needle: PublicKey) -> bool { + vns.iter().any(|vn| vn.eq(&needle)) +} diff --git a/applications/tari_watcher/src/main.rs b/applications/tari_watcher/src/main.rs index 6a15d4baf..b79a33a1a 100644 --- a/applications/tari_watcher/src/main.rs +++ b/applications/tari_watcher/src/main.rs @@ -1,19 +1,22 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::{ - path::{Path, PathBuf}, - time::SystemTime, -}; +use std::time::SystemTime; use anyhow::{anyhow, bail, Context}; +use helpers::read_registration_file; use log::*; use tari_shutdown::{Shutdown, ShutdownSignal}; -use tokio::{fs, task}; +use tokio::{ + fs, + task, + time::{self, Duration}, +}; use crate::{ cli::{Cli, Commands}, config::{get_base_config, Config}, + helpers::{contains_key, read_config_file, to_block_height, to_vn_public_keys}, manager::{ManagerHandle, ProcessManager}, shutdown::exit_signal, }; @@ -21,6 +24,7 @@ use crate::{ mod cli; mod config; mod forker; +mod helpers; mod manager; mod minotari; mod shutdown; @@ -54,7 +58,7 @@ async fn main() -> anyhow::Result<()> { log::info!("Config file created at {}", config_path.display()); }, Commands::Start(ref args) => { - let mut cfg = read_file(cli.get_config_path()).await?; + let mut cfg = read_config_file(cli.get_config_path()).await?; if let Some(conf) = cfg.missing_conf() { bail!("Missing configuration values: {:?}", conf); } @@ -68,26 +72,22 @@ async fn main() -> anyhow::Result<()> { Ok(()) } -async fn read_file(path: PathBuf) -> anyhow::Result { - let p = Path::new(path.to_str().unwrap()); - let content: String = fs::read_to_string(p).await.unwrap(); - let config: Config = toml::from_str(&content)?; - - Ok(config) -} - async fn start(config: Config) -> anyhow::Result { let shutdown = Shutdown::new(); let signal = shutdown.to_signal().select(exit_signal()?); let (task_handle, mut manager_handle) = spawn(config.clone(), shutdown.to_signal()).await; - // Test ping #1 to base node - let tip = manager_handle.get_tip_info().await; - info!("[TEST] Tip status: {:?}", tip); - - // Test ping #2 to base node - let vn_status = manager_handle.get_active_validator_nodes().await; - info!("[TEST] Active validators: {:?}", vn_status); + let mut interval = time::interval(Duration::from_secs(10)); + let constants = manager_handle.get_consensus_constants(0).await; + let validity_period = constants.as_ref().unwrap().validator_node_validity_period; + let epoch_length = constants.unwrap().epoch_length; + debug!("Registrations are currently valid for {} epochs", validity_period); + debug!("Every epoch has {} blocks", epoch_length); + let registration_valid_for = validity_period * epoch_length; + let mut registered_at_block = 0; + let local_node = read_registration_file(config.vn_registration_file).await?; + let local_key = local_node.public_key; // 76fd45c0816f7bd78d33e1b9358a48e8c68b97bfd20d9c80f3934afbde848343 + debug!("Local public key: {}", local_key.clone()); tokio::select! { _ = signal => { @@ -96,7 +96,53 @@ async fn start(config: Config) -> anyhow::Result { result = task_handle => { result??; log::info!("Process manager exited"); - } + }, + _ = async { + loop { + interval.tick().await; + + let tip_info = manager_handle.get_tip_info().await; + if let Err(e) = tip_info { + error!("Failed to get tip info: {}", e); + continue; + } + let curr_height = to_block_height(tip_info.unwrap()); + debug!("Current block height: {}", curr_height); + + let vn_status = manager_handle.get_active_validator_nodes().await; + if let Err(e) = vn_status { + error!("Failed to get active validators: {}", e); + continue; + } + let active_keys = to_vn_public_keys(vn_status.unwrap()); + info!("Amount of active validator node keys: {}", active_keys.len()); + for key in &active_keys { + info!("{}", key); + } + + // if the node is already registered and still valid, skip registration + if contains_key(active_keys.clone(), local_key.clone()) { + info!("Local node is active and still before expiration, skipping registration"); + continue; + } + + // need to be more refined but proves the concept + if curr_height < registered_at_block + registration_valid_for { + info!("Local node still within registration validity period, skipping registration"); + continue; + } + + info!("Local node not active, attempting to register.."); + let tx = manager_handle.register_validator_node().await.unwrap(); + if !tx.is_success { + error!("Failed to register node: {}", tx.failure_message); + continue; + } + info!("Registered node at height {} with transaction id: {}", curr_height, tx.transaction_id); + registered_at_block = curr_height; + + } + } => {}, } Ok(manager_handle) diff --git a/applications/tari_watcher/src/manager.rs b/applications/tari_watcher/src/manager.rs index dd488cf56..7f384396a 100644 --- a/applications/tari_watcher/src/manager.rs +++ b/applications/tari_watcher/src/manager.rs @@ -2,7 +2,13 @@ // SPDX-License-Identifier: BSD-3-Clause use log::*; -use minotari_app_grpc::tari_rpc::{GetActiveValidatorNodesResponse, TipInfoResponse}; +use minotari_app_grpc::tari_rpc::{ + self as grpc, + ConsensusConstants, + GetActiveValidatorNodesResponse, + RegisterValidatorNodeResponse, + TipInfoResponse, +}; use tari_shutdown::ShutdownSignal; use tokio::sync::{mpsc, oneshot}; @@ -30,7 +36,11 @@ impl ProcessManager { forker: Forker::new(), shutdown_signal, rx_request, - chain: Minotari::new(config.base_node_grpc_address, config.base_wallet_grpc_address), + chain: Minotari::new( + config.base_node_grpc_address, + config.base_wallet_grpc_address, + config.vn_registration_file, + ), }; (this, ManagerHandle::new(tx_request)) } @@ -53,8 +63,13 @@ impl ProcessManager { let response = self.chain.get_active_validator_nodes().await?; drop(reply.send(Ok(response))); } - ManagerRequest::RegisterValidatorNode => { - unimplemented!(); + ManagerRequest::RegisterValidatorNode { reply } => { + let response = self.chain.register_validator_node().await?; + drop(reply.send(Ok(response))); + }, + ManagerRequest::GetConsensusConstants { reply, block_height } => { + let response = self.chain.get_consensus_constants(block_height).await?; + drop(reply.send(Ok(response))); } } } @@ -79,9 +94,13 @@ pub enum ManagerRequest { GetActiveValidatorNodes { reply: Reply>, }, - - #[allow(dead_code)] - RegisterValidatorNode, // TODO: populate types + GetConsensusConstants { + block_height: u64, + reply: Reply, + }, + RegisterValidatorNode { + reply: Reply, + }, } pub struct ManagerHandle { @@ -101,6 +120,25 @@ impl ManagerHandle { rx.await? } + pub async fn get_consensus_constants(&mut self, block_height: u64) -> anyhow::Result { + let (tx, rx) = oneshot::channel(); + self.tx_request + .send(ManagerRequest::GetConsensusConstants { + block_height, + reply: tx, + }) + .await?; + rx.await? + } + + pub async fn register_validator_node(&mut self) -> anyhow::Result { + let (tx, rx) = oneshot::channel(); + self.tx_request + .send(ManagerRequest::RegisterValidatorNode { reply: tx }) + .await?; + rx.await? + } + pub async fn get_tip_info(&mut self) -> anyhow::Result { let (tx, rx) = oneshot::channel(); self.tx_request.send(ManagerRequest::GetTipInfo { reply: tx }).await?; diff --git a/applications/tari_watcher/src/minotari.rs b/applications/tari_watcher/src/minotari.rs index 49e058395..e984dae2e 100644 --- a/applications/tari_watcher/src/minotari.rs +++ b/applications/tari_watcher/src/minotari.rs @@ -1,28 +1,40 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause +use std::path::PathBuf; + use anyhow::bail; -use minotari_app_grpc::tari_rpc::{self as grpc, GetActiveValidatorNodesResponse, TipInfoResponse}; +use minotari_app_grpc::tari_rpc::{ + self as grpc, + GetActiveValidatorNodesResponse, + RegisterValidatorNodeResponse, + TipInfoResponse, +}; use minotari_node_grpc_client::BaseNodeGrpcClient; use minotari_wallet_grpc_client::WalletGrpcClient; use tari_common::exit_codes::{ExitCode, ExitError}; +use tari_crypto::tari_utilities::ByteArray; use tonic::transport::Channel; +use crate::helpers::{read_registration_file, to_block_height}; + #[derive(Clone)] pub struct Minotari { bootstrapped: bool, node_grpc_address: String, wallet_grpc_address: String, + node_registration_file: PathBuf, node: Option>, wallet: Option>, } impl Minotari { - pub fn new(node_grpc_address: String, wallet_grpc_address: String) -> Self { + pub fn new(node_grpc_address: String, wallet_grpc_address: String, node_registration_file: PathBuf) -> Self { Self { bootstrapped: false, node_grpc_address, wallet_grpc_address, + node_registration_file, node: None, wallet: None, } @@ -73,20 +85,18 @@ impl Minotari { } pub async fn get_active_validator_nodes(&self) -> anyhow::Result> { - if self.node.is_none() { + if !self.bootstrapped { bail!("Node client not connected"); } - // could be a good idea to cache this or similar in the future, if perf suffers - let info = self.node.clone().unwrap().get_tip_info(grpc::Empty {}).await?; - let block_height = info.into_inner().metadata.unwrap().best_block_height; - + let tip_info = self.get_tip_status().await?; + let height = to_block_height(tip_info); let mut stream = self .node .clone() .unwrap() .get_active_validator_nodes(grpc::GetActiveValidatorNodesRequest { - height: block_height, + height, sidechain_id: vec![], }) .await? @@ -108,9 +118,56 @@ impl Minotari { } if vns.is_empty() { - log::debug!("No active validator nodes found at height: {}", block_height); + log::debug!("No active validator nodes found at height: {}", height); } Ok(vns) } + + pub async fn register_validator_node(&self) -> anyhow::Result { + if !self.bootstrapped { + bail!("Node client not connected"); + } + + let info = read_registration_file(self.node_registration_file.clone()).await?; + let sig = info.signature.signature(); + let resp = self + .wallet + .clone() + .unwrap() + .register_validator_node(grpc::RegisterValidatorNodeRequest { + validator_node_public_key: info.public_key.to_vec(), + validator_node_signature: Some(grpc::Signature { + public_nonce: sig.get_public_nonce().to_vec(), + signature: sig.get_signature().to_vec(), + }), + validator_node_claim_public_key: info.claim_fees_public_key.to_vec(), + fee_per_gram: 10, + message: format!("Validator node registration: {}", info.public_key), + sidechain_deployment_key: vec![], + }) + .await? + .into_inner(); + if !resp.is_success { + bail!("Failed to register validator node: {}", resp.failure_message); + } + + Ok(resp) + } + + pub async fn get_consensus_constants(&self, block_height: u64) -> anyhow::Result { + if !self.bootstrapped { + bail!("Node client not connected"); + } + + let constants = self + .node + .clone() + .unwrap() + .get_constants(grpc::BlockHeight { block_height }) + .await? + .into_inner(); + + Ok(constants) + } } diff --git a/applications/tari_watcher/src/port.rs b/applications/tari_watcher/src/port.rs deleted file mode 100644 index 6917b32ae..000000000 --- a/applications/tari_watcher/src/port.rs +++ /dev/null @@ -1,2 +0,0 @@ -// Copyright 2024 The Tari Project -// SPDX-License-Identifier: BSD-3-Clause