Skip to content

Commit

Permalink
fix(watcher): general fixes while testing
Browse files Browse the repository at this point in the history
  • Loading branch information
sdbondi committed Sep 3, 2024
1 parent b3f1507 commit a5fae62
Show file tree
Hide file tree
Showing 12 changed files with 148 additions and 153 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions applications/tari_validator_node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::net::SocketAddr;

use clap::Parser;
use minotari_app_utilities::common_cli_args::CommonCliArgs;
use reqwest::Url;
use tari_common::configuration::{ConfigOverrideProvider, Network};
use tari_dan_app_utilities::p2p_config::ReachabilityMode;

Expand All @@ -43,6 +44,8 @@ pub struct Cli {
pub http_ui_listener_address: Option<SocketAddr>,
#[clap(long, env = "TARI_VN_JSON_RPC_PUBLIC_ADDRESS")]
pub json_rpc_public_address: Option<String>,
#[clap(long, alias = "grpc", env = "TARI_VN_MINOTARI_NODE_GRPC_ADDRESS")]
pub minotari_node_grpc_address: Option<Url>,
#[clap(long, short = 's')]
pub peer_seeds: Vec<String>,
#[clap(long)]
Expand Down Expand Up @@ -102,6 +105,9 @@ impl ConfigOverrideProvider for Cli {
if self.disable_mdns {
overrides.push(("validator_node.p2p.enable_mdns".to_string(), "false".to_string()));
}
if let Some(url) = self.minotari_node_grpc_address.as_ref() {
overrides.push(("validator_node.base_node_grpc_address".to_string(), url.to_string()));
}
overrides
}
}
1 change: 1 addition & 0 deletions applications/tari_watcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ tari_shutdown = { workspace = true }
clap = { workspace = true, features = ["derive"] }
serde = { workspace = true, features = ["derive"] }
anyhow = { workspace = true }
url = { workspace = true, features = ["serde"] }
tokio = { workspace = true, features = [
"rt-multi-thread",
"macros",
Expand Down
26 changes: 6 additions & 20 deletions applications/tari_watcher/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{
};

use tokio::io::{self, AsyncWriteExt};
use url::Url;

use crate::{
cli::Cli,
Expand All @@ -29,10 +30,10 @@ pub struct Config {
pub auto_restart: bool,

/// The Minotari node gRPC address
pub base_node_grpc_address: String,
pub base_node_grpc_address: Url,

/// The Minotari console wallet gRPC address
pub base_wallet_grpc_address: String,
pub base_wallet_grpc_address: Url,

/// The base directory of the watcher with configuration and data files
pub base_dir: PathBuf,
Expand All @@ -41,7 +42,7 @@ pub struct Config {
/// submit a registration transaction on behalf of the node
pub vn_registration_file: PathBuf,

// The path of the validator node base directory, obtained when initializing a new VN on L2
/// The path of the validator node base directory. This directory is automatically created when starting a new VN.
pub vn_base_dir: PathBuf,

/// The sidechain ID to use. If not provided, the default Tari sidechain ID will be used.
Expand All @@ -63,21 +64,6 @@ impl Config {
writer.write_all(toml.as_bytes()).await?;
Ok(())
}

pub fn missing_conf(&self) -> Option<Vec<&str>> {
let mut v: Vec<&str> = Vec::new();
if self.base_node_grpc_address.is_empty() {
v.push("base_node_grpc_address");
}
if self.base_wallet_grpc_address.is_empty() {
v.push("base_wallet_grpc_address");
}
if v.is_empty() {
None
} else {
Some(v)
}
}
}

#[derive(Debug, Clone, Copy, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
Expand Down Expand Up @@ -162,8 +148,8 @@ pub fn get_base_config(cli: &Cli) -> anyhow::Result<Config> {
Ok(Config {
auto_register: true,
auto_restart: true,
base_node_grpc_address: DEFAULT_BASE_NODE_GRPC_ADDRESS.to_string(),
base_wallet_grpc_address: DEFAULT_BASE_WALLET_GRPC_ADDRESS.to_string(),
base_node_grpc_address: DEFAULT_BASE_NODE_GRPC_ADDRESS.parse()?,
base_wallet_grpc_address: DEFAULT_BASE_WALLET_GRPC_ADDRESS.parse()?,
base_dir: base_dir.to_path_buf(),
sidechain_id: None,
vn_registration_file,
Expand Down
3 changes: 0 additions & 3 deletions applications/tari_watcher/src/constants.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use tokio::time::Duration;

pub const DEFAULT_MAIN_PROJECT_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/../../");
pub const DEFAULT_WATCHER_CONFIG_PATH: &str = "data/watcher/config.toml";
pub const DEFAULT_VALIDATOR_PID_PATH: &str = "data/watcher/validator.pid";
Expand All @@ -13,5 +11,4 @@ pub const DEFAULT_MINOTARI_MINER_BINARY_PATH: &str = "target/release/minotari_mi
pub const DEFAULT_BASE_NODE_GRPC_ADDRESS: &str = "http://127.0.0.1:12001"; // note: protocol
pub const DEFAULT_BASE_WALLET_GRPC_ADDRESS: &str = "http://127.0.0.1:12003"; // note: protocol

pub const DEFAULT_PROCESS_MONITORING_INTERVAL: Duration = Duration::from_secs(20); // time to sleep before checking VN process status
pub const DEFAULT_THRESHOLD_WARN_EXPIRATION: u64 = 100; // warn at this many blocks before the registration expires
29 changes: 22 additions & 7 deletions applications/tari_watcher/src/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::path::PathBuf;
use std::{
io,
path::{Path, PathBuf},
};

use minotari_app_grpc::tari_rpc::{ConsensusConstants, GetActiveValidatorNodesResponse};
use tari_common_types::types::PublicKey;
Expand Down Expand Up @@ -31,12 +34,24 @@ pub struct ValidatorNodeRegistration {
pub claim_fees_public_key: PublicKey,
}

pub async fn read_registration_file(vn_registration_file: PathBuf) -> anyhow::Result<ValidatorNodeRegistration> {
log::debug!("Using VN registration file at: {}", vn_registration_file.display());

let info = fs::read_to_string(vn_registration_file).await?;
let reg = json5::from_str(&info)?;
Ok(reg)
pub async fn read_registration_file<P: AsRef<Path>>(
vn_registration_file: P,
) -> anyhow::Result<Option<ValidatorNodeRegistration>> {
log::debug!(
"Using VN registration file at: {}",
vn_registration_file.as_ref().display()
);
match fs::read_to_string(vn_registration_file).await {
Ok(info) => {
let reg = json5::from_str(&info)?;
Ok(Some(reg))
},
Err(e) if e.kind() == io::ErrorKind::NotFound => Ok(None),
Err(e) => {
log::error!("Failed to read VN registration file: {}", e);
Err(e.into())
},
}
}

pub fn to_vn_public_keys(vns: Vec<GetActiveValidatorNodesResponse>) -> Vec<PublicKey> {
Expand Down
11 changes: 4 additions & 7 deletions applications/tari_watcher/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use anyhow::{anyhow, bail, Context};
use anyhow::{anyhow, Context};
use registration::registration_loop;
use tari_shutdown::{Shutdown, ShutdownSignal};
use tokio::{fs, task::JoinHandle};
Expand Down Expand Up @@ -58,9 +58,6 @@ async fn main() -> anyhow::Result<()> {
},
Commands::Start(ref args) => {
let mut cfg = read_config_file(cli.get_config_path()).await?;
if let Some(conf) = cfg.missing_conf() {
bail!("Missing configuration values: {:?}", conf);
}

// optionally override config values
args.apply(&mut cfg);
Expand All @@ -86,9 +83,9 @@ async fn start(config: Config) -> anyhow::Result<()> {
result?;
log::info!("Process manager exited");
},
_ = async {
drop(registration_loop(config, manager_handle).await);
} => {},
Err(err) = registration_loop(config, manager_handle) => {
log::error!("Registration loop exited with error {err}");
},
}

Ok(())
Expand Down
46 changes: 20 additions & 26 deletions applications/tari_watcher/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,19 @@ use tokio::{
};

use crate::{
config::{Channels, Config, ExecutableConfig},
config::{Channels, Config},
constants::DEFAULT_VALIDATOR_NODE_BINARY_PATH,
minotari::{Minotari, TipStatus},
minotari::{MinotariNodes, TipStatus},
monitoring::{process_status_alert, process_status_log, ProcessStatus, Transaction},
process::{start_validator, ChildChannel},
};

pub struct ProcessManager {
pub base_dir: PathBuf,
pub validator_base_dir: PathBuf,
pub validator_config: ExecutableConfig,
pub wallet_config: ExecutableConfig,
pub config: Config,
pub shutdown_signal: ShutdownSignal, // listen for keyboard exit signal
pub trigger_signal: Shutdown, // triggered when validator auto-restart is disabled
pub rx_request: mpsc::Receiver<ManagerRequest>,
pub chain: Minotari,
pub alerting_config: Channels,
pub auto_restart: bool,
pub chain: MinotariNodes,
}

pub struct ChannelReceivers {
Expand All @@ -51,20 +46,15 @@ impl ProcessManager {
pub fn new(config: Config, shutdown_signal: ShutdownSignal, trigger_signal: Shutdown) -> (Self, ManagerHandle) {
let (tx_request, rx_request) = mpsc::channel(1);
let this = Self {
base_dir: config.base_dir.clone(),
validator_base_dir: config.vn_base_dir,
validator_config: config.executable_config[0].clone(),
wallet_config: config.executable_config[1].clone(),
shutdown_signal,
trigger_signal,
rx_request,
chain: Minotari::new(
config.base_node_grpc_address,
config.base_wallet_grpc_address,
config.vn_registration_file,
chain: MinotariNodes::new(
config.base_node_grpc_address.clone(),
config.base_wallet_grpc_address.clone(),
config.vn_registration_file.clone(),
),
alerting_config: config.channel_config,
auto_restart: config.auto_restart,
config,
};
(this, ManagerHandle::new(tx_request))
}
Expand Down Expand Up @@ -167,20 +157,24 @@ impl ProcessManager {

async fn start_child_process(&self) -> ChildChannel {
let vn_binary_path = self
.validator_config
.clone()
.config
.executable_config
.get(0)
.unwrap()
.executable_path
.unwrap_or(PathBuf::from(DEFAULT_VALIDATOR_NODE_BINARY_PATH));
.clone()
.unwrap_or_else(|| PathBuf::from(DEFAULT_VALIDATOR_NODE_BINARY_PATH));

let vn_base_dir = self.base_dir.join(self.validator_base_dir.clone());
let vn_base_dir = self.config.base_dir.join(self.config.vn_base_dir.clone());

// get child channel to communicate with the validator node process
let cc = start_validator(
vn_binary_path,
vn_base_dir,
self.base_dir.clone(),
self.alerting_config.clone(),
self.auto_restart,
// TODO: just pass in config
self.config.base_node_grpc_address.clone(),
self.config.channel_config.clone(),
self.config.auto_restart,
self.trigger_signal.clone(),
)
.await;
Expand Down
28 changes: 18 additions & 10 deletions applications/tari_watcher/src/minotari.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ use tari_common::exit_codes::{ExitCode, ExitError};
use tari_common_types::types::FixedHash;
use tari_crypto::tari_utilities::ByteArray;
use tonic::transport::Channel;
use url::Url;

use crate::helpers::read_registration_file;

#[derive(Clone)]
pub struct Minotari {
pub struct MinotariNodes {
bootstrapped: bool,
node_grpc_address: String,
wallet_grpc_address: String,
node_grpc_address: Url,
wallet_grpc_address: Url,
node_registration_file: PathBuf,
current_height: u64,
node: Option<BaseNodeGrpcClient<Channel>>,
Expand All @@ -42,8 +43,8 @@ impl TipStatus {
}
}

impl Minotari {
pub fn new(node_grpc_address: String, wallet_grpc_address: String, node_registration_file: PathBuf) -> Self {
impl MinotariNodes {
pub fn new(node_grpc_address: Url, wallet_grpc_address: Url, node_registration_file: PathBuf) -> Self {
Self {
bootstrapped: false,
node_grpc_address,
Expand All @@ -67,16 +68,16 @@ impl Minotari {
}

async fn connect_wallet(&mut self) -> anyhow::Result<()> {
log::info!("Connecting to wallet on gRPC {}", self.wallet_grpc_address.clone());
let client = WalletGrpcClient::connect(&self.wallet_grpc_address).await?;
log::info!("Connecting to wallet on gRPC {}", self.wallet_grpc_address);
let client = WalletGrpcClient::connect(self.wallet_grpc_address.as_str()).await?;

self.wallet = Some(client);
Ok(())
}

async fn connect_node(&mut self) -> anyhow::Result<()> {
log::info!("Connecting to base node on gRPC {}", self.node_grpc_address.clone());
let client = BaseNodeGrpcClient::connect(self.node_grpc_address.clone())
log::info!("Connecting to base node on gRPC {}", self.node_grpc_address);
let client = BaseNodeGrpcClient::connect(self.node_grpc_address.to_string())
.await
.map_err(|e| ExitError::new(ExitCode::ConfigError, e))?;

Expand Down Expand Up @@ -156,7 +157,14 @@ impl Minotari {

info!("Preparing to send a VN registration request");

let info = read_registration_file(self.node_registration_file.clone()).await?;
let info = read_registration_file(self.node_registration_file.clone())
.await?
.ok_or_else(|| {
anyhow!(
"No registration data found in file: {}",
self.node_registration_file.display()
)
})?;
let sig = info.signature.signature();
let resp = self
.wallet
Expand Down
Loading

0 comments on commit a5fae62

Please sign in to comment.