Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(watcher): start validator node process #1116

Merged
Merged
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion applications/tari_watcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@ license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
#tari_common = { workspace = true }

clap = { workspace = true, features = ["derive"] }
serde = { workspace = true, features = ["derive"] }
anyhow = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread", "macros", "signal", "process", "time", "fs", "io-util"] }
log = { workspace = true }
fern = { workspace = true, features = ["colored"] }

toml = "0.8.12"
humantime = "2.1.0"
31 changes: 29 additions & 2 deletions applications/tari_watcher/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::path::PathBuf;

use clap::Parser;

use crate::config::Config;
use crate::config::{Config, InstanceType};

#[derive(Clone, Debug, Parser)]
pub struct Cli {
Expand Down Expand Up @@ -43,7 +43,7 @@ pub struct CommonCli {
#[derive(Clone, Debug, clap::Subcommand)]
pub enum Commands {
Init(InitArgs),
Start,
Start(Overrides),
}

#[derive(Clone, Debug, clap::Args)]
Expand All @@ -58,3 +58,30 @@ impl InitArgs {
config.auto_register = !self.no_auto_register;
}
}

#[derive(Clone, Debug, clap::Args)]
pub struct Overrides {
#[clap(long)]
// The path to the validator node binary (optional)
pub vn_node_path: Option<PathBuf>,
}

impl Overrides {
pub fn apply(&self, config: &mut Config) {
if self.vn_node_path.is_none() {
return;
}

if let Some(exec_config) = config
.executable_config
.iter_mut()
.find(|c| c.instance_type == InstanceType::TariValidatorNode)
{
exec_config.executable_path = self.vn_node_path.clone();
}
log::info!(
"Overriding validator node binary path to {:?}",
self.vn_node_path.as_ref().unwrap()
);
}
}
26 changes: 6 additions & 20 deletions applications/tari_watcher/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub struct Config {
/// The Minotari console wallet gRPC address
pub base_wallet_grpc_address: String,

/// The base directory of the watcher with configuration and data files
pub base_dir: PathBuf,

/// The path of the validator node registration file, containing signed information required to
/// submit a registration transaction on behalf of the node
pub vn_registration_file: PathBuf,
Expand Down Expand Up @@ -71,17 +74,9 @@ pub struct ChannelConfig {
pub struct ExecutableConfig {
pub instance_type: InstanceType,
pub executable_path: Option<PathBuf>,
pub compile: Option<CompileConfig>,
pub env: Vec<(String, String)>,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct CompileConfig {
pub working_dir: Option<PathBuf>,
pub package_name: String,
pub target_dir: Option<PathBuf>,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct InstanceConfig {
pub name: String,
Expand Down Expand Up @@ -116,22 +111,12 @@ pub fn get_base_config(cli: &Cli) -> anyhow::Result<Config> {
let executables = vec![
ExecutableConfig {
instance_type: InstanceType::TariValidatorNode,
executable_path: Some("target/release/minotari_node".into()),
compile: Some(CompileConfig {
working_dir: Some("../tari".into()),
package_name: "minotari_node".to_string(),
target_dir: None,
}),
executable_path: Some("target/release/tari_validator_node".into()),
env: vec![],
},
ExecutableConfig {
instance_type: InstanceType::MinoTariConsoleWallet,
executable_path: Some("target/release/minotari_wallet".into()),
compile: Some(CompileConfig {
working_dir: Some("../tari".into()),
package_name: "minotari_wallet".to_string(),
target_dir: None,
}),
env: vec![],
},
];
Expand Down Expand Up @@ -161,7 +146,8 @@ pub fn get_base_config(cli: &Cli) -> anyhow::Result<Config> {
base_node_grpc_address: "localhost:18142".to_string(),
base_wallet_grpc_address: "localhost:18143".to_string(),
sidechain_id: None,
vn_registration_file: base_dir.join("vn_registration.toml"),
base_dir: base_dir.clone(),
vn_registration_file: base_dir.join("registration.json"),
instance_config: instances.to_vec(),
executable_config: executables,
channel_config: vec![
Expand Down
190 changes: 190 additions & 0 deletions applications/tari_watcher/src/forker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use std::{
io,
net::{IpAddr, Ipv4Addr},
path::PathBuf,
process::Stdio,
};

use anyhow::bail;
use tokio::{
fs,
process::{Child, Command},
};

use crate::{
config::{ExecutableConfig, InstanceType},
port::PortAllocator,
};

const TARI_L2_VN: InstanceType = InstanceType::TariValidatorNode;
const MINO_L1_WALLET: InstanceType = InstanceType::MinoTariConsoleWallet;
sdbondi marked this conversation as resolved.
Show resolved Hide resolved

#[allow(dead_code)]
pub struct Forker {
// Used for the validator to connect to the base (L1) node
base_node_grpc_address: String,
// The base directory of calling the application
base_dir: PathBuf,
// The Tari L2 validator instance
validator: Option<Instance>,
// The Minotari L1 wallet instance
wallet: Option<Instance>,
}

impl Forker {
pub fn new(base_node_grpc_address: String, base_dir: PathBuf) -> Self {
Self {
validator: None,
wallet: None,
base_node_grpc_address,
base_dir,
}
}

pub async fn start_validator(
&mut self,
config: ExecutableConfig,
base_node_grpc_address: String,
) -> anyhow::Result<Child> {
let mut instance = Instance::new(TARI_L2_VN, config.clone());
instance.bootstrap().await?;

// verify everything is up and running
instance.validator_ready()?;

self.validator = Some(instance.clone());

let json_rpc_public_address = instance.port.validator.jrpc_pub_address(instance.listen_ip.unwrap());
let web_ui_address = instance.port.validator.web_ui_address(instance.listen_ip.unwrap());

let mut command = self
.get_command(
config.executable_path.unwrap(),
"esmeralda".to_string(), // TODO: add network to cfg
base_node_grpc_address,
json_rpc_public_address.unwrap(),
web_ui_address.unwrap(),
)
.await?;

// TODO: stdout logs
// let process_dir = self.base_dir.join("processes").join("TariValidatorNode");
// let stdout_log_path = process_dir.join("stdout.log");
// let stderr_log_path = process_dir.join("stderr.log");
command
.kill_on_drop(true)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.stdin(Stdio::null());

let child = command.spawn()?;

Ok(child)
}

async fn get_command(
&self,
target_binary: PathBuf,
network: String,
base_node_grpc_address: String,
json_rpc_public_address: String,
web_ui_address: String,
) -> anyhow::Result<Command> {
log::debug!("Creating validator command from base directory: {:?}", self.base_dir);

// create directory for the validator process
let process_dir = self.base_dir.join("processes").join("TariValidatorNode");
fs::create_dir_all(&process_dir).await?;

log::debug!("Creating validator process to run from: {:?}", process_dir);

let json_rpc_address = json_rpc_public_address.clone();
let mut command = Command::new(target_binary);
let empty: Vec<(&str, &str)> = Vec::new();
command
.envs(empty)
.arg("-b")
.arg(process_dir)
.arg("--network")
.arg(network)
.arg(format!("--json-rpc-public-address={json_rpc_public_address}"))
.arg(format!(
"-pvalidator_node.base_node_grpc_address={base_node_grpc_address}"
))
.arg(format!("-pvalidator_node.json_rpc_listener_address={json_rpc_address}"))
.arg(format!("-pvalidator_node.http_ui_listener_address={web_ui_address}"))
.arg("-pvalidator_node.base_layer_scanning_interval=1");
Ok(command)
}
}

#[allow(dead_code)]
#[derive(Clone)]
struct Instance {
app: InstanceType,
config: ExecutableConfig,
listen_ip: Option<IpAddr>,
port: PortAllocator,
}

impl Instance {
pub fn new(app: InstanceType, config: ExecutableConfig) -> Self {
Self {
app,
config,
listen_ip: None,
port: PortAllocator::new(),
}
}

pub async fn bootstrap(&mut self) -> io::Result<()> {
if self.listen_ip.is_some() {
log::warn!("Instance {} already bootstrapped, ignore", self.app.to_string());
return Ok(());
}

let listen = IpAddr::V4(Ipv4Addr::from([127, 0, 0, 1]));
self.listen_ip = Some(listen);

match self.app {
TARI_L2_VN => {
self.port.open_at(TARI_L2_VN, "jrpc").await?;
self.port.open_at(TARI_L2_VN, "web").await?;
},
MINO_L1_WALLET => {
sdbondi marked this conversation as resolved.
Show resolved Hide resolved
self.port.open_at(MINO_L1_WALLET, "p2p").await?;
self.port.open_at(MINO_L1_WALLET, "grpc").await?;
},
}

Ok(())
}

pub fn validator_ready(&self) -> anyhow::Result<()> {
if self.listen_ip.is_none() {
bail!("Validator listener not initialized, this should not happen");
} else if self.port.validator.jrpc.is_none() {
bail!("Validator JSON-RPC address not initialized, this should not happen");
} else if self.port.validator.web.is_none() {
bail!("Validator Web UI address not initialized, this should not happen");
} else {
Ok(())
}
}

#[allow(dead_code)]
pub fn wallet_ready(&self) -> anyhow::Result<()> {
if self.listen_ip.is_none() {
bail!("Wallet listener not initialized, this should not happen");
} else if self.port.wallet.p2p.is_none() {
bail!("Wallet P2P address not initialized, this should not happen");
} else if self.port.wallet.grpc.is_none() {
bail!("Wallet gRPC address not initialized, this should not happen");
} else {
Ok(())
}
}
}
Loading
Loading