From 4ba5551d60b43b409c2e8a98796a272284983c22 Mon Sep 17 00:00:00 2001 From: dkf Date: Thu, 8 Aug 2024 08:37:16 +0100 Subject: [PATCH 1/8] fix(watcher): config structure and naming --- applications/tari_watcher/src/config.rs | 22 ++-------------------- 1 file changed, 2 insertions(+), 20 deletions(-) diff --git a/applications/tari_watcher/src/config.rs b/applications/tari_watcher/src/config.rs index ee6c3250b..aada807dc 100644 --- a/applications/tari_watcher/src/config.rs +++ b/applications/tari_watcher/src/config.rs @@ -71,17 +71,9 @@ pub struct ChannelConfig { pub struct ExecutableConfig { pub instance_type: InstanceType, pub executable_path: Option, - pub compile: Option, pub env: Vec<(String, String)>, } -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct CompileConfig { - pub working_dir: Option, - pub package_name: String, - pub target_dir: Option, -} - #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct InstanceConfig { pub name: String, @@ -116,22 +108,12 @@ pub fn get_base_config(cli: &Cli) -> anyhow::Result { let executables = vec![ ExecutableConfig { instance_type: InstanceType::TariValidatorNode, - executable_path: Some("target/release/minotari_node".into()), - compile: Some(CompileConfig { - working_dir: Some("../tari".into()), - package_name: "minotari_node".to_string(), - target_dir: None, - }), + executable_path: Some("target/release/tari_validator_node".into()), env: vec![], }, ExecutableConfig { instance_type: InstanceType::MinoTariConsoleWallet, executable_path: Some("target/release/minotari_wallet".into()), - compile: Some(CompileConfig { - working_dir: Some("../tari".into()), - package_name: "minotari_wallet".to_string(), - target_dir: None, - }), env: vec![], }, ]; @@ -161,7 +143,7 @@ pub fn get_base_config(cli: &Cli) -> anyhow::Result { base_node_grpc_address: "localhost:18142".to_string(), base_wallet_grpc_address: "localhost:18143".to_string(), sidechain_id: None, - vn_registration_file: base_dir.join("vn_registration.toml"), + vn_registration_file: base_dir.join("registration.json"), instance_config: instances.to_vec(), executable_config: executables, channel_config: vec![ From f71b6fc1fe57330426b1d631ace8b65bf38dcec9 Mon Sep 17 00:00:00 2001 From: dkf Date: Thu, 8 Aug 2024 09:11:50 +0100 Subject: [PATCH 2/8] feat(watcher): add override vn node binary path --- applications/tari_watcher/src/cli.rs | 30 +++++++++++++++++++++++++-- applications/tari_watcher/src/main.rs | 6 +++++- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/applications/tari_watcher/src/cli.rs b/applications/tari_watcher/src/cli.rs index ae1c1954d..53b5a1eea 100644 --- a/applications/tari_watcher/src/cli.rs +++ b/applications/tari_watcher/src/cli.rs @@ -5,7 +5,7 @@ use std::path::PathBuf; use clap::Parser; -use crate::config::Config; +use crate::config::{Config, InstanceType}; #[derive(Clone, Debug, Parser)] pub struct Cli { @@ -43,7 +43,7 @@ pub struct CommonCli { #[derive(Clone, Debug, clap::Subcommand)] pub enum Commands { Init(InitArgs), - Start, + Start(Overrides), } #[derive(Clone, Debug, clap::Args)] @@ -58,3 +58,29 @@ impl InitArgs { config.auto_register = !self.no_auto_register; } } + +#[derive(Clone, Debug, clap::Args)] +pub struct Overrides { + #[clap(long)] + pub vn_node_binary_path: Option, +} + +impl Overrides { + pub fn apply(&self, config: &mut Config) { + if self.vn_node_binary_path.is_none() { + return; + } + + if let Some(exec_config) = config + .executable_config + .iter_mut() + .find(|c| c.instance_type == InstanceType::TariValidatorNode) + { + exec_config.executable_path = self.vn_node_binary_path.clone(); + } + log::info!( + "Overriding validator node binary path to {:?}", + self.vn_node_binary_path.as_ref().unwrap() + ); + } +} diff --git a/applications/tari_watcher/src/main.rs b/applications/tari_watcher/src/main.rs index cb7c923b9..5ba9c674e 100644 --- a/applications/tari_watcher/src/main.rs +++ b/applications/tari_watcher/src/main.rs @@ -40,7 +40,11 @@ async fn main() -> anyhow::Result<()> { // if let Err(e) = initialize_logging(..) log::info!("Config file created at {}", config_path.display()); }, - Commands::Start => { + Commands::Start(ref args) => { + let mut config = get_base_config(&cli)?; + // optionally override config values + args.apply(&mut config); + unimplemented!("Start command not implemented"); }, } From aa1892581287afeefb3a1e236adb1b0ac64e9139 Mon Sep 17 00:00:00 2001 From: dkf Date: Thu, 8 Aug 2024 09:35:29 +0100 Subject: [PATCH 3/8] feat(watcher): add basic logger --- Cargo.lock | 2 ++ applications/tari_watcher/Cargo.toml | 2 ++ applications/tari_watcher/src/cli.rs | 9 ++++---- applications/tari_watcher/src/main.rs | 30 +++++++++++++++++++++++++-- 4 files changed, 37 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ea0100dcf..67156c6ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10140,6 +10140,8 @@ version = "0.7.0" dependencies = [ "anyhow", "clap 3.2.25", + "fern", + "humantime 2.1.0", "log", "serde", "tokio", diff --git a/applications/tari_watcher/Cargo.toml b/applications/tari_watcher/Cargo.toml index 0367c410d..b9db54e34 100644 --- a/applications/tari_watcher/Cargo.toml +++ b/applications/tari_watcher/Cargo.toml @@ -16,5 +16,7 @@ serde = { workspace = true, features = ["derive"] } anyhow = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal", "process", "time", "fs", "io-util"] } log = { workspace = true } +fern = { workspace = true, features = ["colored"] } toml = "0.8.12" +humantime = "2.1.0" diff --git a/applications/tari_watcher/src/cli.rs b/applications/tari_watcher/src/cli.rs index 53b5a1eea..485be8cc1 100644 --- a/applications/tari_watcher/src/cli.rs +++ b/applications/tari_watcher/src/cli.rs @@ -62,12 +62,13 @@ impl InitArgs { #[derive(Clone, Debug, clap::Args)] pub struct Overrides { #[clap(long)] - pub vn_node_binary_path: Option, + // The path to the validator node binary (optional) + pub vn_node_path: Option, } impl Overrides { pub fn apply(&self, config: &mut Config) { - if self.vn_node_binary_path.is_none() { + if self.vn_node_path.is_none() { return; } @@ -76,11 +77,11 @@ impl Overrides { .iter_mut() .find(|c| c.instance_type == InstanceType::TariValidatorNode) { - exec_config.executable_path = self.vn_node_binary_path.clone(); + exec_config.executable_path = self.vn_node_path.clone(); } log::info!( "Overriding validator node binary path to {:?}", - self.vn_node_binary_path.as_ref().unwrap() + self.vn_node_path.as_ref().unwrap() ); } } diff --git a/applications/tari_watcher/src/main.rs b/applications/tari_watcher/src/main.rs index 5ba9c674e..a1b54a336 100644 --- a/applications/tari_watcher/src/main.rs +++ b/applications/tari_watcher/src/main.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: BSD-3-Clause use anyhow::{anyhow, Context}; +use std::time::SystemTime; use tokio::fs; use crate::{ @@ -17,6 +18,8 @@ async fn main() -> anyhow::Result<()> { let cli = Cli::init(); let config_path = cli.get_config_path(); + setup_logger()?; + match cli.command { Commands::Init(ref args) => { // set by default in CommonCli @@ -36,8 +39,6 @@ async fn main() -> anyhow::Result<()> { .canonicalize() .context("Failed to canonicalize config path")?; - // TODO: use standardised logging - // if let Err(e) = initialize_logging(..) log::info!("Config file created at {}", config_path.display()); }, Commands::Start(ref args) => { @@ -51,3 +52,28 @@ async fn main() -> anyhow::Result<()> { Ok(()) } + +fn setup_logger() -> Result<(), fern::InitError> { + let colors = fern::colors::ColoredLevelConfig::new() + .info(fern::colors::Color::Green) + .debug(fern::colors::Color::Cyan) + .warn(fern::colors::Color::Yellow) + .error(fern::colors::Color::Red); + + fern::Dispatch::new() + .format(move |out, message, record| { + out.finish(format_args!( + "[{} {} {}] {}", + humantime::format_rfc3339_seconds(SystemTime::now()), + colors.color(record.level()), + record.target(), + message + )) + }) + .level(log::LevelFilter::Debug) + .chain(std::io::stdout()) + .chain(fern::log_file("output.log")?) + .apply()?; + + Ok(()) +} From b378c3ed870f5dca1c9fdcf10f096d78e280a650 Mon Sep 17 00:00:00 2001 From: dkf Date: Fri, 9 Aug 2024 17:23:14 +0100 Subject: [PATCH 4/8] feat(watcher): start validator node process --- applications/tari_watcher/Cargo.toml | 1 - applications/tari_watcher/src/config.rs | 4 + applications/tari_watcher/src/forker.rs | 189 +++++++++++++++++++++++ applications/tari_watcher/src/main.rs | 20 ++- applications/tari_watcher/src/manager.rs | 21 +++ applications/tari_watcher/src/port.rs | 108 +++++++++++++ 6 files changed, 339 insertions(+), 4 deletions(-) create mode 100644 applications/tari_watcher/src/forker.rs create mode 100644 applications/tari_watcher/src/manager.rs create mode 100644 applications/tari_watcher/src/port.rs diff --git a/applications/tari_watcher/Cargo.toml b/applications/tari_watcher/Cargo.toml index b9db54e34..ae8cd5a4d 100644 --- a/applications/tari_watcher/Cargo.toml +++ b/applications/tari_watcher/Cargo.toml @@ -9,7 +9,6 @@ license.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -#tari_common = { workspace = true } clap = { workspace = true, features = ["derive"] } serde = { workspace = true, features = ["derive"] } diff --git a/applications/tari_watcher/src/config.rs b/applications/tari_watcher/src/config.rs index aada807dc..230f972a0 100644 --- a/applications/tari_watcher/src/config.rs +++ b/applications/tari_watcher/src/config.rs @@ -23,6 +23,9 @@ pub struct Config { /// The Minotari console wallet gRPC address pub base_wallet_grpc_address: String, + /// The base directory of the watcher with configuration and data files + pub base_dir: PathBuf, + /// The path of the validator node registration file, containing signed information required to /// submit a registration transaction on behalf of the node pub vn_registration_file: PathBuf, @@ -143,6 +146,7 @@ pub fn get_base_config(cli: &Cli) -> anyhow::Result { base_node_grpc_address: "localhost:18142".to_string(), base_wallet_grpc_address: "localhost:18143".to_string(), sidechain_id: None, + base_dir: base_dir.clone(), vn_registration_file: base_dir.join("registration.json"), instance_config: instances.to_vec(), executable_config: executables, diff --git a/applications/tari_watcher/src/forker.rs b/applications/tari_watcher/src/forker.rs new file mode 100644 index 000000000..40fd6975d --- /dev/null +++ b/applications/tari_watcher/src/forker.rs @@ -0,0 +1,189 @@ +// Copyright 2024 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +use crate::{ + config::{ExecutableConfig, InstanceType}, + port::PortAllocator, +}; + +use tokio::fs; +use tokio::process::{Child, Command}; + +use std::{io, path::PathBuf}; +use std::{ + net::{IpAddr, Ipv4Addr}, + process::Stdio, +}; + +use anyhow::bail; + +const TARI_L2_VN: InstanceType = InstanceType::TariValidatorNode; +const MINO_L1_WALLET: InstanceType = InstanceType::MinoTariConsoleWallet; + +#[allow(dead_code)] +pub struct Forker { + // Used for the validator to connect to the base (L1) node + base_node_grpc_address: String, + // The base directory of calling the application + base_dir: PathBuf, + // The Tari L2 validator instance + validator: Option, + // The Minotari L1 wallet instance + wallet: Option, +} + +impl Forker { + pub fn new(base_node_grpc_address: String, base_dir: PathBuf) -> Self { + Self { + validator: None, + wallet: None, + base_node_grpc_address, + base_dir, + } + } + + pub async fn start_validator( + &mut self, + config: ExecutableConfig, + base_node_grpc_address: String, + ) -> anyhow::Result { + let mut instance = Instance::new(TARI_L2_VN, config.clone()); + instance.bootstrap().await?; + + // verify everything is up and running + if let Err(e) = instance.validator_ready() { + return Err(e); + } + + self.validator = Some(instance.clone()); + + let json_rpc_public_address = instance.port.validator.jrpc_pub_address(instance.listen_ip.unwrap()); + let web_ui_address = instance.port.validator.web_ui_address(instance.listen_ip.unwrap()); + + let mut command = self + .get_command( + config.executable_path.unwrap(), + "esmeralda".to_string(), /* TODO: add network to cfg */ + base_node_grpc_address, + json_rpc_public_address.unwrap(), + web_ui_address.unwrap(), + ) + .await?; + + //TODO: stdout logs + // let process_dir = self.base_dir.join("processes").join("TariValidatorNode"); + // let stdout_log_path = process_dir.join("stdout.log"); + // let stderr_log_path = process_dir.join("stderr.log"); + command + .kill_on_drop(true) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .stdin(Stdio::null()); + + let child = command.spawn()?; + + Ok(child) + } + + async fn get_command( + &self, + target_binary: PathBuf, + network: String, + base_node_grpc_address: String, + json_rpc_public_address: String, + web_ui_address: String, + ) -> anyhow::Result { + log::debug!("Creating validator command from base directory: {:?}", self.base_dir); + + // create directory for the validator process + let process_dir = self.base_dir.join("processes").join("TariValidatorNode"); + fs::create_dir_all(&process_dir).await?; + + log::debug!("Creating validator process to run from: {:?}", process_dir); + + let json_rpc_address = json_rpc_public_address.clone(); + let mut command = Command::new(target_binary); + let empty: Vec<(&str, &str)> = Vec::new(); + command + .envs(empty) + .arg("-b") + .arg(process_dir) + .arg("--network") + .arg(network) + .arg(format!("--json-rpc-public-address={json_rpc_public_address}")) + .arg(format!( + "-pvalidator_node.base_node_grpc_address={base_node_grpc_address}" + )) + .arg(format!("-pvalidator_node.json_rpc_listener_address={json_rpc_address}")) + .arg(format!("-pvalidator_node.http_ui_listener_address={web_ui_address}")) + .arg("-pvalidator_node.base_layer_scanning_interval=1"); + Ok(command) + } +} + +#[allow(dead_code)] +#[derive(Clone)] +struct Instance { + app: InstanceType, + config: ExecutableConfig, + listen_ip: Option, + port: PortAllocator, +} + +impl Instance { + pub fn new(app: InstanceType, config: ExecutableConfig) -> Self { + Self { + app, + config, + listen_ip: None, + port: PortAllocator::new(), + } + } + + pub async fn bootstrap(&mut self) -> io::Result<()> { + if self.listen_ip.is_some() { + log::warn!("Instance {} already bootstrapped, ignore", self.app.to_string()); + return Ok(()); + } + + let listen = IpAddr::V4(Ipv4Addr::from([127, 0, 0, 1])); + self.listen_ip = Some(listen); + + match self.app { + TARI_L2_VN => { + self.port.open_at(TARI_L2_VN, "jrpc").await?; + self.port.open_at(TARI_L2_VN, "web").await?; + }, + MINO_L1_WALLET => { + self.port.open_at(MINO_L1_WALLET, "p2p").await?; + self.port.open_at(MINO_L1_WALLET, "grpc").await?; + }, + } + + Ok(()) + } + + pub fn validator_ready(&self) -> anyhow::Result<()> { + if self.listen_ip.is_none() { + bail!("Validator listener not initialized, this should not happen"); + } else if self.port.validator.jrpc.is_none() { + bail!("Validator JSON-RPC address not initialized, this should not happen"); + } else if self.port.validator.web.is_none() { + bail!("Validator Web UI address not initialized, this should not happen"); + } + + Ok(()) + } + + #[allow(dead_code)] + pub fn wallet_ready(&self) -> anyhow::Result<()> { + if self.listen_ip.is_none() { + bail!("Wallet listener not initialized, this should not happen"); + } else if self.port.wallet.p2p.is_none() { + bail!("Wallet P2P address not initialized, this should not happen"); + } else if self.port.wallet.grpc.is_none() { + bail!("Wallet gRPC address not initialized, this should not happen"); + } + Ok(()) + } +} diff --git a/applications/tari_watcher/src/main.rs b/applications/tari_watcher/src/main.rs index a1b54a336..a9a7d57be 100644 --- a/applications/tari_watcher/src/main.rs +++ b/applications/tari_watcher/src/main.rs @@ -7,11 +7,16 @@ use tokio::fs; use crate::{ cli::{Cli, Commands}, - config::get_base_config, + config::{get_base_config, Config}, }; +use crate::manager::ProcessManager; + mod cli; mod config; +mod forker; +mod manager; +mod port; #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -45,14 +50,23 @@ async fn main() -> anyhow::Result<()> { let mut config = get_base_config(&cli)?; // optionally override config values args.apply(&mut config); - - unimplemented!("Start command not implemented"); + start(config).await?; }, } Ok(()) } +async fn start(config: Config) -> anyhow::Result<()> { + let mut manager = ProcessManager::new(config.clone()); + manager + .forker + .start_validator(manager.validator_config, config.base_node_grpc_address) + .await?; + + Ok(()) +} + fn setup_logger() -> Result<(), fern::InitError> { let colors = fern::colors::ColoredLevelConfig::new() .info(fern::colors::Color::Green) diff --git a/applications/tari_watcher/src/manager.rs b/applications/tari_watcher/src/manager.rs new file mode 100644 index 000000000..f30f75f6d --- /dev/null +++ b/applications/tari_watcher/src/manager.rs @@ -0,0 +1,21 @@ +// Copyright 2024 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause + +use crate::config::{Config, ExecutableConfig}; +use crate::forker::Forker; + +pub struct ProcessManager { + pub validator_config: ExecutableConfig, + pub wallet_config: ExecutableConfig, + pub forker: Forker, +} + +impl ProcessManager { + pub fn new(config: Config) -> Self { + Self { + validator_config: config.executable_config[0].clone(), + wallet_config: config.executable_config[1].clone(), + forker: Forker::new(config.base_node_grpc_address, config.base_dir), + } + } +} diff --git a/applications/tari_watcher/src/port.rs b/applications/tari_watcher/src/port.rs new file mode 100644 index 000000000..433eb0e0e --- /dev/null +++ b/applications/tari_watcher/src/port.rs @@ -0,0 +1,108 @@ +// Copyright 2024 The Tari Project +// SPDX-License-Identifier: BSD-3-Clause +use std::io; +use std::net::{IpAddr, SocketAddr}; +use tokio::net::TcpListener; + +use crate::config::InstanceType; + +#[derive(Clone)] +pub struct PortAllocator { + pub validator: ValidatorPorts, + pub wallet: MinotariPorts, +} + +impl PortAllocator { + pub fn new() -> Self { + Self { + validator: ValidatorPorts::new(), + wallet: MinotariPorts::new(), + } + } + + pub async fn open_at(&mut self, instance: InstanceType, name: &'static str) -> io::Result { + let addr = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))).await?; + let port = addr.local_addr()?.port(); + match instance { + InstanceType::TariValidatorNode => { + if name == "jrpc" { + self.validator.jrpc = Some(port) + } else if name == "web" { + self.validator.web = Some(port) + } else { + log::error!("Invalid port name for {} instance: {}", instance, port); + } + }, + InstanceType::MinoTariConsoleWallet => { + if name == "p2p" { + self.wallet.p2p = Some(port) + } else if name == "grpc" { + self.wallet.grpc = Some(port) + } else { + log::error!("Invalid port name for {} instance: {}", instance, port); + } + }, + } + + log::info!("Started a {}-{} port on {}", instance.to_string(), name, port); + + Ok(port) + } +} + +#[derive(Clone)] +pub struct ValidatorPorts { + pub jrpc: Option, + pub web: Option, +} + +impl ValidatorPorts { + fn new() -> Self { + Self { jrpc: None, web: None } + } + + pub fn jrpc_pub_address(&self, listen_ip: IpAddr) -> Option { + if self.jrpc.is_none() { + return None; + } + + Some(format!("{}: {}", listen_ip, self.jrpc.unwrap())) + } + + pub fn web_ui_address(&self, listen_ip: IpAddr) -> Option { + if self.web.is_none() { + return None; + } + + Some(format!("{}:{}", listen_ip, self.web.unwrap())) + } +} + +#[derive(Clone)] +pub struct MinotariPorts { + pub p2p: Option, + pub grpc: Option, +} + +#[allow(dead_code)] +impl MinotariPorts { + fn new() -> Self { + Self { p2p: None, grpc: None } + } + + pub fn p2p_port_as_string(&self) -> Option { + if self.p2p.is_none() { + return None; + } + + Some(format!("{}", self.p2p.unwrap())) + } + + pub fn grpc_port_as_string(&self) -> Option { + if self.grpc.is_none() { + return None; + } + + Some(format!("{}", self.grpc.unwrap())) + } +} From 1cb3a95a1368ab0ccc3a735355e8ca1801ea392e Mon Sep 17 00:00:00 2001 From: dkf Date: Fri, 9 Aug 2024 17:28:06 +0100 Subject: [PATCH 5/8] chore: make linters happy --- applications/tari_watcher/src/forker.rs | 35 ++++++++++++------------ applications/tari_watcher/src/main.rs | 6 ++-- applications/tari_watcher/src/manager.rs | 6 ++-- applications/tari_watcher/src/port.rs | 27 ++++++------------ 4 files changed, 34 insertions(+), 40 deletions(-) diff --git a/applications/tari_watcher/src/forker.rs b/applications/tari_watcher/src/forker.rs index 40fd6975d..1e9961b54 100644 --- a/applications/tari_watcher/src/forker.rs +++ b/applications/tari_watcher/src/forker.rs @@ -1,21 +1,23 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use crate::{ - config::{ExecutableConfig, InstanceType}, - port::PortAllocator, -}; - -use tokio::fs; -use tokio::process::{Child, Command}; - -use std::{io, path::PathBuf}; use std::{ + io, net::{IpAddr, Ipv4Addr}, + path::PathBuf, process::Stdio, }; use anyhow::bail; +use tokio::{ + fs, + process::{Child, Command}, +}; + +use crate::{ + config::{ExecutableConfig, InstanceType}, + port::PortAllocator, +}; const TARI_L2_VN: InstanceType = InstanceType::TariValidatorNode; const MINO_L1_WALLET: InstanceType = InstanceType::MinoTariConsoleWallet; @@ -51,9 +53,7 @@ impl Forker { instance.bootstrap().await?; // verify everything is up and running - if let Err(e) = instance.validator_ready() { - return Err(e); - } + instance.validator_ready()?; self.validator = Some(instance.clone()); @@ -63,14 +63,14 @@ impl Forker { let mut command = self .get_command( config.executable_path.unwrap(), - "esmeralda".to_string(), /* TODO: add network to cfg */ + "esmeralda".to_string(), // TODO: add network to cfg base_node_grpc_address, json_rpc_public_address.unwrap(), web_ui_address.unwrap(), ) .await?; - //TODO: stdout logs + // TODO: stdout logs // let process_dir = self.base_dir.join("processes").join("TariValidatorNode"); // let stdout_log_path = process_dir.join("stdout.log"); // let stderr_log_path = process_dir.join("stderr.log"); @@ -170,9 +170,9 @@ impl Instance { bail!("Validator JSON-RPC address not initialized, this should not happen"); } else if self.port.validator.web.is_none() { bail!("Validator Web UI address not initialized, this should not happen"); + } else { + Ok(()) } - - Ok(()) } #[allow(dead_code)] @@ -183,7 +183,8 @@ impl Instance { bail!("Wallet P2P address not initialized, this should not happen"); } else if self.port.wallet.grpc.is_none() { bail!("Wallet gRPC address not initialized, this should not happen"); + } else { + Ok(()) } - Ok(()) } } diff --git a/applications/tari_watcher/src/main.rs b/applications/tari_watcher/src/main.rs index a9a7d57be..fb2358e3e 100644 --- a/applications/tari_watcher/src/main.rs +++ b/applications/tari_watcher/src/main.rs @@ -1,17 +1,17 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use anyhow::{anyhow, Context}; use std::time::SystemTime; + +use anyhow::{anyhow, Context}; use tokio::fs; use crate::{ cli::{Cli, Commands}, config::{get_base_config, Config}, + manager::ProcessManager, }; -use crate::manager::ProcessManager; - mod cli; mod config; mod forker; diff --git a/applications/tari_watcher/src/manager.rs b/applications/tari_watcher/src/manager.rs index f30f75f6d..1ca7af08d 100644 --- a/applications/tari_watcher/src/manager.rs +++ b/applications/tari_watcher/src/manager.rs @@ -1,8 +1,10 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use crate::config::{Config, ExecutableConfig}; -use crate::forker::Forker; +use crate::{ + config::{Config, ExecutableConfig}, + forker::Forker, +}; pub struct ProcessManager { pub validator_config: ExecutableConfig, diff --git a/applications/tari_watcher/src/port.rs b/applications/tari_watcher/src/port.rs index 433eb0e0e..eeb3fec2c 100644 --- a/applications/tari_watcher/src/port.rs +++ b/applications/tari_watcher/src/port.rs @@ -1,7 +1,10 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::io; -use std::net::{IpAddr, SocketAddr}; +use std::{ + io, + net::{IpAddr, SocketAddr}, +}; + use tokio::net::TcpListener; use crate::config::InstanceType; @@ -62,18 +65,12 @@ impl ValidatorPorts { } pub fn jrpc_pub_address(&self, listen_ip: IpAddr) -> Option { - if self.jrpc.is_none() { - return None; - } - + self.jrpc?; Some(format!("{}: {}", listen_ip, self.jrpc.unwrap())) } pub fn web_ui_address(&self, listen_ip: IpAddr) -> Option { - if self.web.is_none() { - return None; - } - + self.web?; Some(format!("{}:{}", listen_ip, self.web.unwrap())) } } @@ -91,18 +88,12 @@ impl MinotariPorts { } pub fn p2p_port_as_string(&self) -> Option { - if self.p2p.is_none() { - return None; - } - + self.p2p?; Some(format!("{}", self.p2p.unwrap())) } pub fn grpc_port_as_string(&self) -> Option { - if self.grpc.is_none() { - return None; - } - + self.grpc?; Some(format!("{}", self.grpc.unwrap())) } } From bc2f401bd44b0da2b939aaa12984182d7e3ddec1 Mon Sep 17 00:00:00 2001 From: dkf Date: Mon, 12 Aug 2024 10:09:23 +0100 Subject: [PATCH 6/8] fix(watcher): read vn ports from cfg and renaming --- applications/tari_watcher/src/forker.rs | 28 ++++----- applications/tari_watcher/src/port.rs | 76 +++++++++++++++---------- 2 files changed, 58 insertions(+), 46 deletions(-) diff --git a/applications/tari_watcher/src/forker.rs b/applications/tari_watcher/src/forker.rs index 1e9961b54..920825fa9 100644 --- a/applications/tari_watcher/src/forker.rs +++ b/applications/tari_watcher/src/forker.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: BSD-3-Clause use std::{ - io, net::{IpAddr, Ipv4Addr}, path::PathBuf, process::Stdio, @@ -19,9 +18,6 @@ use crate::{ port::PortAllocator, }; -const TARI_L2_VN: InstanceType = InstanceType::TariValidatorNode; -const MINO_L1_WALLET: InstanceType = InstanceType::MinoTariConsoleWallet; - #[allow(dead_code)] pub struct Forker { // Used for the validator to connect to the base (L1) node @@ -49,7 +45,7 @@ impl Forker { config: ExecutableConfig, base_node_grpc_address: String, ) -> anyhow::Result { - let mut instance = Instance::new(TARI_L2_VN, config.clone()); + let mut instance = Instance::new(InstanceType::TariValidatorNode, config.clone()); instance.bootstrap().await?; // verify everything is up and running @@ -134,13 +130,13 @@ impl Instance { pub fn new(app: InstanceType, config: ExecutableConfig) -> Self { Self { app, - config, + config: config.clone(), listen_ip: None, - port: PortAllocator::new(), + port: PortAllocator::new(config.env), } } - pub async fn bootstrap(&mut self) -> io::Result<()> { + pub async fn bootstrap(&mut self) -> anyhow::Result<()> { if self.listen_ip.is_some() { log::warn!("Instance {} already bootstrapped, ignore", self.app.to_string()); return Ok(()); @@ -149,17 +145,15 @@ impl Instance { let listen = IpAddr::V4(Ipv4Addr::from([127, 0, 0, 1])); self.listen_ip = Some(listen); - match self.app { - TARI_L2_VN => { - self.port.open_at(TARI_L2_VN, "jrpc").await?; - self.port.open_at(TARI_L2_VN, "web").await?; - }, - MINO_L1_WALLET => { - self.port.open_at(MINO_L1_WALLET, "p2p").await?; - self.port.open_at(MINO_L1_WALLET, "grpc").await?; - }, + if self.app != InstanceType::TariValidatorNode { + bail!( + "Attempted to bootstrap unrecognized instance type: {}", + self.app.to_string() + ); } + self.port.open_vn_ports(InstanceType::TariValidatorNode).await?; + Ok(()) } diff --git a/applications/tari_watcher/src/port.rs b/applications/tari_watcher/src/port.rs index eeb3fec2c..f7570d7ca 100644 --- a/applications/tari_watcher/src/port.rs +++ b/applications/tari_watcher/src/port.rs @@ -1,10 +1,8 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::{ - io, - net::{IpAddr, SocketAddr}, -}; +use std::net::{IpAddr, SocketAddr}; +use anyhow::bail; use tokio::net::TcpListener; use crate::config::InstanceType; @@ -12,47 +10,67 @@ use crate::config::InstanceType; #[derive(Clone)] pub struct PortAllocator { pub validator: ValidatorPorts, + pub env: Vec<(String, String)>, pub wallet: MinotariPorts, } impl PortAllocator { - pub fn new() -> Self { + pub fn new(env: Vec<(String, String)>) -> Self { Self { validator: ValidatorPorts::new(), + env, wallet: MinotariPorts::new(), } } - pub async fn open_at(&mut self, instance: InstanceType, name: &'static str) -> io::Result { - let addr = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))).await?; - let port = addr.local_addr()?.port(); - match instance { - InstanceType::TariValidatorNode => { - if name == "jrpc" { - self.validator.jrpc = Some(port) - } else if name == "web" { - self.validator.web = Some(port) - } else { - log::error!("Invalid port name for {} instance: {}", instance, port); - } - }, - InstanceType::MinoTariConsoleWallet => { - if name == "p2p" { - self.wallet.p2p = Some(port) - } else if name == "grpc" { - self.wallet.grpc = Some(port) - } else { - log::error!("Invalid port name for {} instance: {}", instance, port); - } - }, + fn vn_json_rpc_port(&self) -> Option { + self.env + .iter() + .find(|(k, _)| k == "VN_JSON_RPC_PORT") + .map(|(_, v)| v.parse().unwrap()) + } + + fn vn_http_port(&self) -> Option { + self.env + .iter() + .find(|(k, _)| k == "VN_HTTP_PORT") + .map(|(_, v)| v.parse().unwrap()) + } + + pub async fn open_vn_ports(&mut self, instance: InstanceType) -> anyhow::Result<()> { + if instance != InstanceType::TariValidatorNode { + log::error!("Unrecognized instance type {}", instance); + bail!("Unrecognized instance type {}", instance.to_string()); } - log::info!("Started a {}-{} port on {}", instance.to_string(), name, port); + if let Some(port) = self.vn_json_rpc_port() { + log::info!("VN json rpc port started at {}", port); + self.validator.jrpc = Some(port as u16); + } else { + // in case we are missing a port from config, allocate a random one + let fallback = random_port().await?; + self.validator.jrpc = Some(fallback); + log::warn!("Missing validator node json rpc port from config, using: {}", fallback); + } - Ok(port) + if let Some(port) = self.vn_http_port() { + log::info!("VN http port started at {}", port); + self.validator.web = Some(port as u16); + } else { + let fallback = random_port().await?; + self.validator.web = Some(fallback); + log::warn!("Missing validator node http port from config, using: {}", fallback); + } + + Ok(()) } } +async fn random_port() -> anyhow::Result { + let addr = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))).await?; + Ok(addr.local_addr()?.port()) +} + #[derive(Clone)] pub struct ValidatorPorts { pub jrpc: Option, From 25908157b62e3c4676f3ad709851564ced075306 Mon Sep 17 00:00:00 2001 From: dkf Date: Mon, 12 Aug 2024 11:47:54 +0100 Subject: [PATCH 7/8] fix(watcher): vn addresses and cleanup --- applications/tari_watcher/src/config.rs | 8 +++ applications/tari_watcher/src/forker.rs | 71 +++------------------- applications/tari_watcher/src/main.rs | 7 ++- applications/tari_watcher/src/port.rs | 81 +------------------------ 4 files changed, 22 insertions(+), 145 deletions(-) diff --git a/applications/tari_watcher/src/config.rs b/applications/tari_watcher/src/config.rs index 230f972a0..8e75c29b1 100644 --- a/applications/tari_watcher/src/config.rs +++ b/applications/tari_watcher/src/config.rs @@ -23,6 +23,12 @@ pub struct Config { /// The Minotari console wallet gRPC address pub base_wallet_grpc_address: String, + // The L2 validator node JSON RPC address + pub vn_public_json_rpc_address: String, + + // The L2 validator node web GUI server address + pub vn_gui_http_address: String, + /// The base directory of the watcher with configuration and data files pub base_dir: PathBuf, @@ -145,6 +151,8 @@ pub fn get_base_config(cli: &Cli) -> anyhow::Result { auto_register: true, base_node_grpc_address: "localhost:18142".to_string(), base_wallet_grpc_address: "localhost:18143".to_string(), + vn_public_json_rpc_address: "".to_string(), + vn_gui_http_address: "".to_string(), sidechain_id: None, base_dir: base_dir.clone(), vn_registration_file: base_dir.join("registration.json"), diff --git a/applications/tari_watcher/src/forker.rs b/applications/tari_watcher/src/forker.rs index 920825fa9..1e6b1c144 100644 --- a/applications/tari_watcher/src/forker.rs +++ b/applications/tari_watcher/src/forker.rs @@ -1,13 +1,8 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::{ - net::{IpAddr, Ipv4Addr}, - path::PathBuf, - process::Stdio, -}; +use std::{net::IpAddr, path::PathBuf, process::Stdio}; -use anyhow::bail; use tokio::{ fs, process::{Child, Command}, @@ -44,25 +39,19 @@ impl Forker { &mut self, config: ExecutableConfig, base_node_grpc_address: String, + vn_public_json_rpc_address: String, + vn_gui_http_address: String, ) -> anyhow::Result { - let mut instance = Instance::new(InstanceType::TariValidatorNode, config.clone()); - instance.bootstrap().await?; - - // verify everything is up and running - instance.validator_ready()?; - + let instance = Instance::new(InstanceType::TariValidatorNode, config.clone()); self.validator = Some(instance.clone()); - let json_rpc_public_address = instance.port.validator.jrpc_pub_address(instance.listen_ip.unwrap()); - let web_ui_address = instance.port.validator.web_ui_address(instance.listen_ip.unwrap()); - let mut command = self .get_command( config.executable_path.unwrap(), "esmeralda".to_string(), // TODO: add network to cfg base_node_grpc_address, - json_rpc_public_address.unwrap(), - web_ui_address.unwrap(), + vn_public_json_rpc_address, + vn_gui_http_address, ) .await?; @@ -132,53 +121,7 @@ impl Instance { app, config: config.clone(), listen_ip: None, - port: PortAllocator::new(config.env), - } - } - - pub async fn bootstrap(&mut self) -> anyhow::Result<()> { - if self.listen_ip.is_some() { - log::warn!("Instance {} already bootstrapped, ignore", self.app.to_string()); - return Ok(()); - } - - let listen = IpAddr::V4(Ipv4Addr::from([127, 0, 0, 1])); - self.listen_ip = Some(listen); - - if self.app != InstanceType::TariValidatorNode { - bail!( - "Attempted to bootstrap unrecognized instance type: {}", - self.app.to_string() - ); - } - - self.port.open_vn_ports(InstanceType::TariValidatorNode).await?; - - Ok(()) - } - - pub fn validator_ready(&self) -> anyhow::Result<()> { - if self.listen_ip.is_none() { - bail!("Validator listener not initialized, this should not happen"); - } else if self.port.validator.jrpc.is_none() { - bail!("Validator JSON-RPC address not initialized, this should not happen"); - } else if self.port.validator.web.is_none() { - bail!("Validator Web UI address not initialized, this should not happen"); - } else { - Ok(()) - } - } - - #[allow(dead_code)] - pub fn wallet_ready(&self) -> anyhow::Result<()> { - if self.listen_ip.is_none() { - bail!("Wallet listener not initialized, this should not happen"); - } else if self.port.wallet.p2p.is_none() { - bail!("Wallet P2P address not initialized, this should not happen"); - } else if self.port.wallet.grpc.is_none() { - bail!("Wallet gRPC address not initialized, this should not happen"); - } else { - Ok(()) + port: PortAllocator::new(), } } } diff --git a/applications/tari_watcher/src/main.rs b/applications/tari_watcher/src/main.rs index fb2358e3e..d9c3be82f 100644 --- a/applications/tari_watcher/src/main.rs +++ b/applications/tari_watcher/src/main.rs @@ -61,7 +61,12 @@ async fn start(config: Config) -> anyhow::Result<()> { let mut manager = ProcessManager::new(config.clone()); manager .forker - .start_validator(manager.validator_config, config.base_node_grpc_address) + .start_validator( + manager.validator_config, + config.base_node_grpc_address, + config.vn_public_json_rpc_address, + config.vn_gui_http_address, + ) .await?; Ok(()) diff --git a/applications/tari_watcher/src/port.rs b/applications/tari_watcher/src/port.rs index f7570d7ca..0938d0b4c 100644 --- a/applications/tari_watcher/src/port.rs +++ b/applications/tari_watcher/src/port.rs @@ -1,96 +1,17 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::net::{IpAddr, SocketAddr}; - -use anyhow::bail; -use tokio::net::TcpListener; - -use crate::config::InstanceType; #[derive(Clone)] pub struct PortAllocator { - pub validator: ValidatorPorts, - pub env: Vec<(String, String)>, pub wallet: MinotariPorts, } impl PortAllocator { - pub fn new(env: Vec<(String, String)>) -> Self { + pub fn new() -> Self { Self { - validator: ValidatorPorts::new(), - env, wallet: MinotariPorts::new(), } } - - fn vn_json_rpc_port(&self) -> Option { - self.env - .iter() - .find(|(k, _)| k == "VN_JSON_RPC_PORT") - .map(|(_, v)| v.parse().unwrap()) - } - - fn vn_http_port(&self) -> Option { - self.env - .iter() - .find(|(k, _)| k == "VN_HTTP_PORT") - .map(|(_, v)| v.parse().unwrap()) - } - - pub async fn open_vn_ports(&mut self, instance: InstanceType) -> anyhow::Result<()> { - if instance != InstanceType::TariValidatorNode { - log::error!("Unrecognized instance type {}", instance); - bail!("Unrecognized instance type {}", instance.to_string()); - } - - if let Some(port) = self.vn_json_rpc_port() { - log::info!("VN json rpc port started at {}", port); - self.validator.jrpc = Some(port as u16); - } else { - // in case we are missing a port from config, allocate a random one - let fallback = random_port().await?; - self.validator.jrpc = Some(fallback); - log::warn!("Missing validator node json rpc port from config, using: {}", fallback); - } - - if let Some(port) = self.vn_http_port() { - log::info!("VN http port started at {}", port); - self.validator.web = Some(port as u16); - } else { - let fallback = random_port().await?; - self.validator.web = Some(fallback); - log::warn!("Missing validator node http port from config, using: {}", fallback); - } - - Ok(()) - } -} - -async fn random_port() -> anyhow::Result { - let addr = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))).await?; - Ok(addr.local_addr()?.port()) -} - -#[derive(Clone)] -pub struct ValidatorPorts { - pub jrpc: Option, - pub web: Option, -} - -impl ValidatorPorts { - fn new() -> Self { - Self { jrpc: None, web: None } - } - - pub fn jrpc_pub_address(&self, listen_ip: IpAddr) -> Option { - self.jrpc?; - Some(format!("{}: {}", listen_ip, self.jrpc.unwrap())) - } - - pub fn web_ui_address(&self, listen_ip: IpAddr) -> Option { - self.web?; - Some(format!("{}:{}", listen_ip, self.web.unwrap())) - } } #[derive(Clone)] From a5f5420bcd61a78afbda0e2ee19682b18cb01ecf Mon Sep 17 00:00:00 2001 From: dkf Date: Mon, 12 Aug 2024 13:18:42 +0100 Subject: [PATCH 8/8] fix(watcher): remove vn json and http address from cfg --- applications/tari_watcher/src/config.rs | 8 --- applications/tari_watcher/src/forker.rs | 76 ++++++------------------- applications/tari_watcher/src/main.rs | 10 +--- 3 files changed, 18 insertions(+), 76 deletions(-) diff --git a/applications/tari_watcher/src/config.rs b/applications/tari_watcher/src/config.rs index 8e75c29b1..230f972a0 100644 --- a/applications/tari_watcher/src/config.rs +++ b/applications/tari_watcher/src/config.rs @@ -23,12 +23,6 @@ pub struct Config { /// The Minotari console wallet gRPC address pub base_wallet_grpc_address: String, - // The L2 validator node JSON RPC address - pub vn_public_json_rpc_address: String, - - // The L2 validator node web GUI server address - pub vn_gui_http_address: String, - /// The base directory of the watcher with configuration and data files pub base_dir: PathBuf, @@ -151,8 +145,6 @@ pub fn get_base_config(cli: &Cli) -> anyhow::Result { auto_register: true, base_node_grpc_address: "localhost:18142".to_string(), base_wallet_grpc_address: "localhost:18143".to_string(), - vn_public_json_rpc_address: "".to_string(), - vn_gui_http_address: "".to_string(), sidechain_id: None, base_dir: base_dir.clone(), vn_registration_file: base_dir.join("registration.json"), diff --git a/applications/tari_watcher/src/forker.rs b/applications/tari_watcher/src/forker.rs index 1e6b1c144..d49033e33 100644 --- a/applications/tari_watcher/src/forker.rs +++ b/applications/tari_watcher/src/forker.rs @@ -1,13 +1,15 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use std::{net::IpAddr, path::PathBuf, process::Stdio}; - -use tokio::{ - fs, - process::{Child, Command}, +use std::{ + env, + net::IpAddr, + path::{Path, PathBuf}, + process::Stdio, }; +use tokio::process::{Child, Command}; + use crate::{ config::{ExecutableConfig, InstanceType}, port::PortAllocator, @@ -35,75 +37,31 @@ impl Forker { } } - pub async fn start_validator( - &mut self, - config: ExecutableConfig, - base_node_grpc_address: String, - vn_public_json_rpc_address: String, - vn_gui_http_address: String, - ) -> anyhow::Result { + pub async fn start_validator(&mut self, config: ExecutableConfig) -> anyhow::Result { let instance = Instance::new(InstanceType::TariValidatorNode, config.clone()); self.validator = Some(instance.clone()); - let mut command = self - .get_command( - config.executable_path.unwrap(), - "esmeralda".to_string(), // TODO: add network to cfg - base_node_grpc_address, - vn_public_json_rpc_address, - vn_gui_http_address, - ) - .await?; + let mut cmd = Command::new( + config + .executable_path + .unwrap_or_else(|| Path::new("tari_validator_node").to_path_buf()), + ); // TODO: stdout logs // let process_dir = self.base_dir.join("processes").join("TariValidatorNode"); // let stdout_log_path = process_dir.join("stdout.log"); // let stderr_log_path = process_dir.join("stderr.log"); - command + cmd.envs(env::vars()) + //.arg(format!("--config={validator_node_config_path}")) .kill_on_drop(true) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .stdin(Stdio::null()); - let child = command.spawn()?; + let child = cmd.spawn()?; Ok(child) } - - async fn get_command( - &self, - target_binary: PathBuf, - network: String, - base_node_grpc_address: String, - json_rpc_public_address: String, - web_ui_address: String, - ) -> anyhow::Result { - log::debug!("Creating validator command from base directory: {:?}", self.base_dir); - - // create directory for the validator process - let process_dir = self.base_dir.join("processes").join("TariValidatorNode"); - fs::create_dir_all(&process_dir).await?; - - log::debug!("Creating validator process to run from: {:?}", process_dir); - - let json_rpc_address = json_rpc_public_address.clone(); - let mut command = Command::new(target_binary); - let empty: Vec<(&str, &str)> = Vec::new(); - command - .envs(empty) - .arg("-b") - .arg(process_dir) - .arg("--network") - .arg(network) - .arg(format!("--json-rpc-public-address={json_rpc_public_address}")) - .arg(format!( - "-pvalidator_node.base_node_grpc_address={base_node_grpc_address}" - )) - .arg(format!("-pvalidator_node.json_rpc_listener_address={json_rpc_address}")) - .arg(format!("-pvalidator_node.http_ui_listener_address={web_ui_address}")) - .arg("-pvalidator_node.base_layer_scanning_interval=1"); - Ok(command) - } } #[allow(dead_code)] @@ -119,7 +77,7 @@ impl Instance { pub fn new(app: InstanceType, config: ExecutableConfig) -> Self { Self { app, - config: config.clone(), + config, listen_ip: None, port: PortAllocator::new(), } diff --git a/applications/tari_watcher/src/main.rs b/applications/tari_watcher/src/main.rs index d9c3be82f..9c7ed437e 100644 --- a/applications/tari_watcher/src/main.rs +++ b/applications/tari_watcher/src/main.rs @@ -59,15 +59,7 @@ async fn main() -> anyhow::Result<()> { async fn start(config: Config) -> anyhow::Result<()> { let mut manager = ProcessManager::new(config.clone()); - manager - .forker - .start_validator( - manager.validator_config, - config.base_node_grpc_address, - config.vn_public_json_rpc_address, - config.vn_gui_http_address, - ) - .await?; + manager.forker.start_validator(manager.validator_config).await?; Ok(()) }