Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(watcher): start validator node process #1116

Merged
8 changes: 8 additions & 0 deletions applications/tari_watcher/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ pub struct Config {
/// The Minotari console wallet gRPC address
pub base_wallet_grpc_address: String,

// The L2 validator node JSON RPC address
pub vn_public_json_rpc_address: String,

// The L2 validator node web GUI server address
pub vn_gui_http_address: String,

sdbondi marked this conversation as resolved.
Show resolved Hide resolved
/// The base directory of the watcher with configuration and data files
pub base_dir: PathBuf,

Expand Down Expand Up @@ -145,6 +151,8 @@ pub fn get_base_config(cli: &Cli) -> anyhow::Result<Config> {
auto_register: true,
base_node_grpc_address: "localhost:18142".to_string(),
base_wallet_grpc_address: "localhost:18143".to_string(),
vn_public_json_rpc_address: "".to_string(),
vn_gui_http_address: "".to_string(),
sidechain_id: None,
base_dir: base_dir.clone(),
vn_registration_file: base_dir.join("registration.json"),
Expand Down
71 changes: 7 additions & 64 deletions applications/tari_watcher/src/forker.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::{
net::{IpAddr, Ipv4Addr},
path::PathBuf,
process::Stdio,
};
use std::{net::IpAddr, path::PathBuf, process::Stdio};

use anyhow::bail;
use tokio::{
fs,
process::{Child, Command},
Expand Down Expand Up @@ -44,25 +39,19 @@ impl Forker {
&mut self,
config: ExecutableConfig,
base_node_grpc_address: String,
vn_public_json_rpc_address: String,
vn_gui_http_address: String,
) -> anyhow::Result<Child> {
let mut instance = Instance::new(InstanceType::TariValidatorNode, config.clone());
instance.bootstrap().await?;

// verify everything is up and running
instance.validator_ready()?;

let instance = Instance::new(InstanceType::TariValidatorNode, config.clone());
self.validator = Some(instance.clone());

let json_rpc_public_address = instance.port.validator.jrpc_pub_address(instance.listen_ip.unwrap());
let web_ui_address = instance.port.validator.web_ui_address(instance.listen_ip.unwrap());

let mut command = self
.get_command(
config.executable_path.unwrap(),
"esmeralda".to_string(), // TODO: add network to cfg
base_node_grpc_address,
json_rpc_public_address.unwrap(),
web_ui_address.unwrap(),
vn_public_json_rpc_address,
sdbondi marked this conversation as resolved.
Show resolved Hide resolved
vn_gui_http_address,
)
.await?;

Expand Down Expand Up @@ -132,53 +121,7 @@ impl Instance {
app,
config: config.clone(),
listen_ip: None,
port: PortAllocator::new(config.env),
}
}

pub async fn bootstrap(&mut self) -> anyhow::Result<()> {
if self.listen_ip.is_some() {
log::warn!("Instance {} already bootstrapped, ignore", self.app.to_string());
return Ok(());
}

let listen = IpAddr::V4(Ipv4Addr::from([127, 0, 0, 1]));
self.listen_ip = Some(listen);

if self.app != InstanceType::TariValidatorNode {
bail!(
"Attempted to bootstrap unrecognized instance type: {}",
self.app.to_string()
);
}

self.port.open_vn_ports(InstanceType::TariValidatorNode).await?;

Ok(())
}

pub fn validator_ready(&self) -> anyhow::Result<()> {
if self.listen_ip.is_none() {
bail!("Validator listener not initialized, this should not happen");
} else if self.port.validator.jrpc.is_none() {
bail!("Validator JSON-RPC address not initialized, this should not happen");
} else if self.port.validator.web.is_none() {
bail!("Validator Web UI address not initialized, this should not happen");
} else {
Ok(())
}
}

#[allow(dead_code)]
pub fn wallet_ready(&self) -> anyhow::Result<()> {
if self.listen_ip.is_none() {
bail!("Wallet listener not initialized, this should not happen");
} else if self.port.wallet.p2p.is_none() {
bail!("Wallet P2P address not initialized, this should not happen");
} else if self.port.wallet.grpc.is_none() {
bail!("Wallet gRPC address not initialized, this should not happen");
} else {
Ok(())
port: PortAllocator::new(),
}
}
}
7 changes: 6 additions & 1 deletion applications/tari_watcher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ async fn start(config: Config) -> anyhow::Result<()> {
let mut manager = ProcessManager::new(config.clone());
manager
.forker
.start_validator(manager.validator_config, config.base_node_grpc_address)
.start_validator(
manager.validator_config,
config.base_node_grpc_address,
config.vn_public_json_rpc_address,
config.vn_gui_http_address,
)
.await?;

Ok(())
Expand Down
81 changes: 1 addition & 80 deletions applications/tari_watcher/src/port.rs
Original file line number Diff line number Diff line change
@@ -1,96 +1,17 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause
use std::net::{IpAddr, SocketAddr};

use anyhow::bail;
use tokio::net::TcpListener;

use crate::config::InstanceType;

#[derive(Clone)]
pub struct PortAllocator {
pub validator: ValidatorPorts,
pub env: Vec<(String, String)>,
pub wallet: MinotariPorts,
}

impl PortAllocator {
pub fn new(env: Vec<(String, String)>) -> Self {
pub fn new() -> Self {
Self {
validator: ValidatorPorts::new(),
env,
wallet: MinotariPorts::new(),
}
}

fn vn_json_rpc_port(&self) -> Option<u32> {
self.env
.iter()
.find(|(k, _)| k == "VN_JSON_RPC_PORT")
.map(|(_, v)| v.parse().unwrap())
}

fn vn_http_port(&self) -> Option<u32> {
self.env
.iter()
.find(|(k, _)| k == "VN_HTTP_PORT")
.map(|(_, v)| v.parse().unwrap())
}

pub async fn open_vn_ports(&mut self, instance: InstanceType) -> anyhow::Result<()> {
if instance != InstanceType::TariValidatorNode {
log::error!("Unrecognized instance type {}", instance);
bail!("Unrecognized instance type {}", instance.to_string());
}

if let Some(port) = self.vn_json_rpc_port() {
log::info!("VN json rpc port started at {}", port);
self.validator.jrpc = Some(port as u16);
} else {
// in case we are missing a port from config, allocate a random one
let fallback = random_port().await?;
self.validator.jrpc = Some(fallback);
log::warn!("Missing validator node json rpc port from config, using: {}", fallback);
}

if let Some(port) = self.vn_http_port() {
log::info!("VN http port started at {}", port);
self.validator.web = Some(port as u16);
} else {
let fallback = random_port().await?;
self.validator.web = Some(fallback);
log::warn!("Missing validator node http port from config, using: {}", fallback);
}

Ok(())
}
}

async fn random_port() -> anyhow::Result<u16> {
let addr = TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))).await?;
Ok(addr.local_addr()?.port())
}

#[derive(Clone)]
pub struct ValidatorPorts {
pub jrpc: Option<u16>,
pub web: Option<u16>,
}

impl ValidatorPorts {
fn new() -> Self {
Self { jrpc: None, web: None }
}

pub fn jrpc_pub_address(&self, listen_ip: IpAddr) -> Option<String> {
self.jrpc?;
Some(format!("{}: {}", listen_ip, self.jrpc.unwrap()))
}

pub fn web_ui_address(&self, listen_ip: IpAddr) -> Option<String> {
self.web?;
Some(format!("{}:{}", listen_ip, self.web.unwrap()))
}
}

#[derive(Clone)]
Expand Down
Loading