diff --git a/applications/tari_watcher/README.md b/applications/tari_watcher/README.md index 667f6a799..6e2d38e35 100644 --- a/applications/tari_watcher/README.md +++ b/applications/tari_watcher/README.md @@ -8,7 +8,7 @@ ### Quickstart -Initialize the project with `tari_watcher init` and start it with `tari_watcher run`. Edit the newly generated `config.toml` to enable notifications on channels such as Mattermost and Telegram.Make sure to have started up `tari_validator_node` once previously to have a node directory set up, default is `tari_validator_node -- -b data/vn1`. +Initialize the project with `tari_watcher init` and start it with `tari_watcher run`. Edit the newly generated `config.toml` to enable notifications on Mattermost and Telegram. Make sure to have started up `tari_validator_node` once previously to have a node directory set up, default is `tari_validator_node -- -b data/vn1`. ### Setup @@ -29,7 +29,7 @@ The default values used (see `constants.rs`) when running the project without an ``` ├── alerting.rs # channel notifier implementations ├── cli.rs # cli and flags passed during bootup -├── config.rs # main config file creation +├── config.rs # main config file creation ├── constants.rs # various constants used as default values ├── helpers.rs # common helper functions ├── logger.rs diff --git a/applications/tari_watcher/src/alerting.rs b/applications/tari_watcher/src/alerting.rs index 89c00f3f4..122d2837a 100644 --- a/applications/tari_watcher/src/alerting.rs +++ b/applications/tari_watcher/src/alerting.rs @@ -6,8 +6,6 @@ use reqwest::StatusCode; use serde_json::json; pub trait Alerting { - fn new(url: String, channel_id: String, credentials: String) -> Self; - // Sends an alert message to the service async fn alert(&mut self, message: &str) -> Result<()>; @@ -21,31 +19,31 @@ pub trait Alerting { pub struct MatterMostNotifier { // Mattermost server URL - server_url: String, + pub server_url: String, // Mattermost channel ID used for alerts - channel_id: String, + pub channel_id: String, // User token (retrieved after login) - credentials: String, + pub credentials: String, // Alerts sent since last reset - alerts_sent: u64, + pub alerts_sent: u64, // HTTP client - client: reqwest::Client, + pub client: reqwest::Client, } impl Alerting for MatterMostNotifier { - fn new(server_url: String, channel_id: String, credentials: String) -> Self { - Self { - server_url, - channel_id, - credentials, - alerts_sent: 0, - client: reqwest::Client::new(), + async fn alert(&mut self, message: &str) -> Result<()> { + if self.server_url.is_empty() { + bail!("Server URL field is empty"); + } + if self.credentials.is_empty() { + bail!("Credentials field is empty"); + } + if self.channel_id.is_empty() { + bail!("Channel ID is empty"); } - } - async fn alert(&mut self, message: &str) -> Result<()> { - const LOGIN_ENDPOINT: &str = "/api/v4/posts"; - let url = format!("{}{}", self.server_url, LOGIN_ENDPOINT); + const POST_ENDPOINT: &str = "/api/v4/posts"; + let url = format!("{}{}", self.server_url, POST_ENDPOINT); let req = json!({ "channel_id": self.channel_id, "message": message, @@ -73,7 +71,7 @@ impl Alerting for MatterMostNotifier { bail!("Server URL is empty"); } if self.credentials.is_empty() { - bail!("Credentials are empty"); + bail!("Credentials field is empty"); } let url = format!("{}{}", self.server_url, PING_ENDPOINT); @@ -95,3 +93,61 @@ impl Alerting for MatterMostNotifier { Ok(self.alerts_sent) } } + +pub struct TelegramNotifier { + // Telegram bot token + pub bot_token: String, + // Telegram chat ID (either in @channel or number id format) + pub chat_id: String, + // Alerts sent since last reset + pub alerts_sent: u64, + // HTTP client + pub client: reqwest::Client, +} + +impl Alerting for TelegramNotifier { + async fn alert(&mut self, message: &str) -> Result<()> { + if self.bot_token.is_empty() { + bail!("Bot token is empty"); + } + if self.chat_id.is_empty() { + bail!("Chat ID is empty"); + } + + let post_endpoint: &str = &format!("/bot{}/sendMessage", self.bot_token); + let url = format!("https://api.telegram.org{}", post_endpoint); + let req = json!({ + "chat_id": self.chat_id, + "text": message, + }); + let resp = self.client.post(&url).json(&req).send().await?; + + if resp.status() != StatusCode::OK { + bail!("Failed to send alert, got response: {}", resp.status()); + } + + self.alerts_sent += 1; + + Ok(()) + } + + async fn ping(&self) -> Result<()> { + if self.bot_token.is_empty() { + bail!("Bot token is empty"); + } + + let ping_endpoint: &str = &format!("/bot{}/getMe", self.bot_token); + let url = format!("https://api.telegram.org{}", ping_endpoint); + let resp = self.client.get(url.clone()).send().await?; + + if resp.status() != StatusCode::OK { + bail!("Failed to ping, got response: {}", resp.status()); + } + + Ok(()) + } + + fn stats(&self) -> Result { + Ok(self.alerts_sent) + } +} diff --git a/applications/tari_watcher/src/cli.rs b/applications/tari_watcher/src/cli.rs index 0534e08cd..a08baf217 100644 --- a/applications/tari_watcher/src/cli.rs +++ b/applications/tari_watcher/src/cli.rs @@ -56,11 +56,16 @@ pub struct InitArgs { #[clap(long)] /// Disable initial and auto registration of the validator node pub no_auto_register: bool, + + #[clap(long)] + /// Disable auto restart of the validator node + pub no_auto_restart: bool, } impl InitArgs { pub fn apply(&self, config: &mut Config) { config.auto_register = !self.no_auto_register; + config.auto_restart = !self.no_auto_restart; } } diff --git a/applications/tari_watcher/src/config.rs b/applications/tari_watcher/src/config.rs index 71bdc522f..d6d10bd23 100644 --- a/applications/tari_watcher/src/config.rs +++ b/applications/tari_watcher/src/config.rs @@ -25,6 +25,9 @@ pub struct Config { /// the current registration expires pub auto_register: bool, + /// Allow watcher to restart the validator node if it crashes and stops running + pub auto_restart: bool, + /// The Minotari node gRPC address pub base_node_grpc_address: String, @@ -158,6 +161,7 @@ pub fn get_base_config(cli: &Cli) -> anyhow::Result { Ok(Config { auto_register: true, + auto_restart: true, base_node_grpc_address: DEFAULT_BASE_NODE_GRPC_ADDRESS.to_string(), base_wallet_grpc_address: DEFAULT_BASE_WALLET_GRPC_ADDRESS.to_string(), base_dir: base_dir.to_path_buf(), @@ -177,7 +181,7 @@ pub fn get_base_config(cli: &Cli) -> anyhow::Result { telegram: ChannelConfig { name: "telegram".to_string(), enabled: false, - server_url: "".to_string(), + server_url: "https://api.telegram.org".to_string(), channel_id: "".to_string(), credentials: "".to_string(), }, diff --git a/applications/tari_watcher/src/main.rs b/applications/tari_watcher/src/main.rs index ff92e9c2a..296c4acd8 100644 --- a/applications/tari_watcher/src/main.rs +++ b/applications/tari_watcher/src/main.rs @@ -74,7 +74,7 @@ async fn main() -> anyhow::Result<()> { async fn start(config: Config) -> anyhow::Result<()> { let shutdown = Shutdown::new(); let signal = shutdown.to_signal().select(exit_signal()?); - let (task_handle, manager_handle) = spawn(config.clone(), shutdown.to_signal()).await; + let (task_handle, manager_handle) = spawn(config.clone(), shutdown.to_signal(), shutdown).await; tokio::select! { _ = signal => { @@ -92,8 +92,12 @@ async fn start(config: Config) -> anyhow::Result<()> { Ok(()) } -async fn spawn(config: Config, shutdown: ShutdownSignal) -> (task::JoinHandle>, ManagerHandle) { - let (manager, manager_handle) = ProcessManager::new(config, shutdown); +async fn spawn( + config: Config, + shutdown: ShutdownSignal, + trigger: Shutdown, +) -> (task::JoinHandle>, ManagerHandle) { + let (manager, manager_handle) = ProcessManager::new(config, shutdown, trigger); let task_handle = tokio::spawn(manager.start()); (task_handle, manager_handle) } diff --git a/applications/tari_watcher/src/manager.rs b/applications/tari_watcher/src/manager.rs index 96dd4fb6c..a3070c01e 100644 --- a/applications/tari_watcher/src/manager.rs +++ b/applications/tari_watcher/src/manager.rs @@ -10,7 +10,7 @@ use minotari_app_grpc::tari_rpc::{ GetActiveValidatorNodesResponse, RegisterValidatorNodeResponse, }; -use tari_shutdown::ShutdownSignal; +use tari_shutdown::{Shutdown, ShutdownSignal}; use tokio::sync::{mpsc, oneshot}; use crate::{ @@ -18,7 +18,7 @@ use crate::{ constants::DEFAULT_VALIDATOR_NODE_BINARY_PATH, minotari::{Minotari, TipStatus}, monitoring::{process_status_alert, process_status_log, ProcessStatus, Transaction}, - process::Process, + process::start_validator, }; pub struct ProcessManager { @@ -26,23 +26,24 @@ pub struct ProcessManager { pub validator_base_dir: PathBuf, pub validator_config: ExecutableConfig, pub wallet_config: ExecutableConfig, - pub process: Process, - pub shutdown_signal: ShutdownSignal, + pub shutdown_signal: ShutdownSignal, // listen for keyboard exit signal + pub trigger_signal: Shutdown, // triggered when validator auto-restart is disabled pub rx_request: mpsc::Receiver, pub chain: Minotari, pub alerting_config: Channels, + pub auto_restart: bool, } impl ProcessManager { - pub fn new(config: Config, shutdown_signal: ShutdownSignal) -> (Self, ManagerHandle) { + pub fn new(config: Config, shutdown_signal: ShutdownSignal, trigger_signal: Shutdown) -> (Self, ManagerHandle) { let (tx_request, rx_request) = mpsc::channel(1); let this = Self { base_dir: config.base_dir.clone(), validator_base_dir: config.vn_base_dir, validator_config: config.executable_config[0].clone(), wallet_config: config.executable_config[1].clone(), - process: Process::new(), shutdown_signal, + trigger_signal, rx_request, chain: Minotari::new( config.base_node_grpc_address, @@ -50,6 +51,7 @@ impl ProcessManager { config.vn_registration_file, ), alerting_config: config.channel_config, + auto_restart: config.auto_restart, }; (this, ManagerHandle::new(tx_request)) } @@ -68,10 +70,15 @@ impl ProcessManager { let vn_base_dir = self.base_dir.join(self.validator_base_dir); // get child channel to communicate with the validator node process - let cc = self - .process - .start_validator(vn_binary_path, vn_base_dir, self.base_dir, self.alerting_config) - .await; + let cc = start_validator( + vn_binary_path, + vn_base_dir, + self.base_dir, + self.alerting_config, + self.auto_restart, + self.trigger_signal.clone(), + ) + .await; if cc.is_none() { todo!("Create new validator node process event listener for fetched existing PID from OS"); } @@ -80,9 +87,11 @@ impl ProcessManager { // spawn logging and alerting tasks to process status updates tokio::spawn(async move { process_status_log(cc.rx_log).await; + warn!("Logging task has exited"); }); tokio::spawn(async move { process_status_alert(cc.rx_alert, cc.cfg_alert).await; + warn!("Alerting task has exited"); }); self.chain.bootstrap().await?; diff --git a/applications/tari_watcher/src/monitoring.rs b/applications/tari_watcher/src/monitoring.rs index 6cd12ad1c..33b6562a3 100644 --- a/applications/tari_watcher/src/monitoring.rs +++ b/applications/tari_watcher/src/monitoring.rs @@ -3,10 +3,14 @@ use log::*; use minotari_app_grpc::tari_rpc::RegisterValidatorNodeResponse; -use tokio::{process::Child, sync::mpsc, time::sleep}; +use tokio::{ + process::Child, + sync::mpsc, + time::{sleep, Duration}, +}; use crate::{ - alerting::{Alerting, MatterMostNotifier}, + alerting::{Alerting, MatterMostNotifier, TelegramNotifier}, config::Channels, constants::{ CONSENSUS_CONSTANT_REGISTRATION_DURATION, @@ -15,7 +19,7 @@ use crate::{ }, }; -#[derive(Debug)] +#[derive(Copy, Clone, Debug)] pub struct Transaction { id: u64, block: u64, @@ -30,7 +34,7 @@ impl Transaction { } } -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum ProcessStatus { Running, Exited(i32), // status code @@ -44,6 +48,7 @@ pub async fn monitor_child( mut child: Child, tx_logging: mpsc::Sender, tx_alerting: mpsc::Sender, + tx_restart: mpsc::Sender<()>, ) { loop { sleep(DEFAULT_PROCESS_MONITORING_INTERVAL).await; @@ -60,6 +65,7 @@ pub async fn monitor_child( .send(ProcessStatus::InternalError(err_msg)) .await .expect("Failed to send internal error status to alerting"); + tx_restart.send(()).await.expect("Failed to send restart node signal"); break; } // process has finished, intentional or not, if it has some status @@ -73,6 +79,7 @@ pub async fn monitor_child( .send(ProcessStatus::Crashed) .await .expect("Failed to send status to alerting"); + tx_restart.send(()).await.expect("Failed to send restart node signal"); break; } tx_logging @@ -83,6 +90,7 @@ pub async fn monitor_child( .send(ProcessStatus::Exited(status.code().unwrap_or(0))) .await .expect("Failed to send process exit status to alerting"); + tx_restart.send(()).await.expect("Failed to send restart node signal"); break; } // process is still running @@ -104,106 +112,175 @@ pub fn is_registration_near_expiration(curr_block: u64, last_registered_block: u } pub async fn process_status_log(mut rx: mpsc::Receiver) { - while let Some(status) = rx.recv().await { - match status { - ProcessStatus::Exited(code) => { - error!("Validator node process exited with code {}", code); - break; - }, - ProcessStatus::InternalError(err) => { - error!("Validator node process exited with error: {}", err); - break; - }, - ProcessStatus::Crashed => { - error!("Validator node process crashed"); - break; - }, - ProcessStatus::Running => { - // all good, process is still running - }, - ProcessStatus::Submitted(tx) => { - info!( - "Validator node registration submitted (tx: {}, block: {})", - tx.id, tx.block - ); - }, - ProcessStatus::WarnExpiration(block, last_reg_block) => { - if is_registration_near_expiration(block, last_reg_block) { - let expiration_block = last_reg_block + CONSENSUS_CONSTANT_REGISTRATION_DURATION; - warn!( - "Validator node registration expires at block {}, current block: {}", - expiration_block, block + loop { + if let Some(status) = rx.recv().await { + match status { + ProcessStatus::Exited(code) => { + error!("Validator node process exited with code {}", code); + info!("Pauses process logging for 5 seconds to allow the validator node to restart"); + sleep(Duration::from_secs(5)).await; + }, + ProcessStatus::InternalError(err) => { + error!("Validator node process exited with error: {}", err); + info!("Pausing process logging 5 seconds to allow the validator node to restart"); + sleep(Duration::from_secs(5)).await; + }, + ProcessStatus::Crashed => { + error!("Validator node process crashed"); + info!("Pausing process logging for 5 seconds to allow the validator node to restart"); + sleep(Duration::from_secs(5)).await; + }, + ProcessStatus::Running => { + // all good, process is still running + }, + ProcessStatus::Submitted(tx) => { + info!( + "Validator node registration submitted (tx: {}, block: {})", + tx.id, tx.block ); - } - }, + }, + ProcessStatus::WarnExpiration(block, last_reg_block) => { + if is_registration_near_expiration(block, last_reg_block) { + let expiration_block = last_reg_block + CONSENSUS_CONSTANT_REGISTRATION_DURATION; + warn!( + "Validator node registration expires at block {}, current block: {}", + expiration_block, block + ); + } + }, + } } } } -pub async fn process_status_alert(mut rx: mpsc::Receiver, cfg: Channels) { +fn setup_alerting_clients(cfg: Channels) -> (Option, Option) { let mut mattermost: Option = None; if cfg.mattermost.enabled { let cfg = cfg.mattermost.clone(); info!("MatterMost alerting enabled"); - mattermost = Some(MatterMostNotifier::new(cfg.server_url, cfg.channel_id, cfg.credentials)); + mattermost = Some(MatterMostNotifier { + server_url: cfg.server_url, + channel_id: cfg.channel_id, + credentials: cfg.credentials, + alerts_sent: 0, + client: reqwest::Client::new(), + }); } else { info!("MatterMost alerting disabled"); } - while let Some(status) = rx.recv().await { - match status { - ProcessStatus::Exited(code) => { - if let Some(mm) = &mut mattermost { - mm.alert(&format!("Validator node process exited with code {}", code)) - .await - .expect("Failed to send alert to MatterMost"); - } - }, - ProcessStatus::InternalError(err) => { - if let Some(mm) = &mut mattermost { - mm.alert(&format!("Validator node process internal error: {}", err)) - .await - .expect("Failed to send alert to MatterMost"); - } - }, - ProcessStatus::Crashed => { - if let Some(mm) = &mut mattermost { - mm.alert("Validator node process crashed") - .await - .expect("Failed to send alert to MatterMost"); - } - }, - ProcessStatus::Running => { - // all good, process is still running, send heartbeat to channel(s) - if let Some(mm) = &mut mattermost { - if mm.ping().await.is_err() { - warn!("Failed to send heartbeat to MatterMost"); + let mut telegram: Option = None; + if cfg.telegram.enabled { + let cfg = cfg.telegram.clone(); + info!("Telegram alerting enabled"); + telegram = Some(TelegramNotifier { + bot_token: cfg.credentials, + chat_id: cfg.channel_id, + alerts_sent: 0, + client: reqwest::Client::new(), + }); + } else { + info!("Telegram alerting disabled"); + } + + (mattermost, telegram) +} + +pub async fn process_status_alert(mut rx: mpsc::Receiver, cfg: Channels) { + let (mut mattermost, mut telegram) = setup_alerting_clients(cfg); + + loop { + while let Some(status) = rx.recv().await { + match status { + ProcessStatus::Exited(code) => { + if let Some(mm) = &mut mattermost { + mm.alert(&format!("Validator node process exited with code {}", code)) + .await + .expect("Failed to send alert to MatterMost"); } - } - }, - ProcessStatus::Submitted(tx) => { - if let Some(mm) = &mut mattermost { - mm.alert(&format!( - "Validator node registration submitted (tx: {}, block: {})", - tx.id, tx.block - )) - .await - .expect("Failed to send alert to MatterMost"); - } - }, - ProcessStatus::WarnExpiration(block, last_reg_block) => { - if is_registration_near_expiration(block, last_reg_block) { + if let Some(tg) = &mut telegram { + tg.alert(&format!("Validator node process exited with code {}", code)) + .await + .expect("Failed to send alert to Telegram"); + } + }, + ProcessStatus::InternalError(err) => { + if let Some(mm) = &mut mattermost { + mm.alert(&format!("Validator node process internal error: {}", err)) + .await + .expect("Failed to send alert to MatterMost"); + } + if let Some(tg) = &mut telegram { + tg.alert(&format!("Validator node process internal error: {}", err)) + .await + .expect("Failed to send alert to Telegram"); + } + }, + ProcessStatus::Crashed => { + if let Some(mm) = &mut mattermost { + mm.alert("Validator node process crashed") + .await + .expect("Failed to send alert to MatterMost"); + } + if let Some(tg) = &mut telegram { + tg.alert("Validator node process crashed") + .await + .expect("Failed to send alert to Telegram"); + } + }, + ProcessStatus::Running => { + // all good, process is still running, send heartbeat to channel(s) + if let Some(mm) = &mut mattermost { + if mm.ping().await.is_err() { + warn!("Failed to send heartbeat to MatterMost"); + } + } + if let Some(tg) = &mut telegram { + if tg.ping().await.is_err() { + warn!("Failed to send heartbeat to Telegram"); + } + } + }, + ProcessStatus::Submitted(tx) => { if let Some(mm) = &mut mattermost { - let expiration_block = last_reg_block + CONSENSUS_CONSTANT_REGISTRATION_DURATION; mm.alert(&format!( - "Validator node registration expires at block {}, current block: {}", - expiration_block, block, + "Validator node registration submitted (tx: {}, block: {})", + tx.id, tx.block )) .await .expect("Failed to send alert to MatterMost"); } - } - }, + if let Some(tg) = &mut telegram { + tg.alert(&format!( + "Validator node registration submitted (tx: {}, block: {})", + tx.id, tx.block + )) + .await + .expect("Failed to send alert to Telegram"); + } + }, + ProcessStatus::WarnExpiration(block, last_reg_block) => { + if is_registration_near_expiration(block, last_reg_block) { + let expiration_block = last_reg_block + CONSENSUS_CONSTANT_REGISTRATION_DURATION; + if let Some(mm) = &mut mattermost { + mm.alert(&format!( + "Validator node registration expires at block {}, current block: {}", + expiration_block, block, + )) + .await + .expect("Failed to send alert to MatterMost"); + } + if let Some(tg) = &mut telegram { + tg.alert(&format!( + "Validator node registration expires at block {}, current block: {}", + expiration_block, block, + )) + .await + .expect("Failed to send alert to Telegram"); + } + } + }, + } } } } diff --git a/applications/tari_watcher/src/process.rs b/applications/tari_watcher/src/process.rs index 2e7695aa3..704e86aa4 100644 --- a/applications/tari_watcher/src/process.rs +++ b/applications/tari_watcher/src/process.rs @@ -5,10 +5,11 @@ use std::{path::PathBuf, process::Stdio}; use anyhow::bail; use log::*; +use tari_shutdown::Shutdown; use tokio::{ fs::{self, OpenOptions}, io::AsyncWriteExt, - process::Command as TokioCommand, + process::{Child, Command as TokioCommand}, sync::mpsc::{self}, }; @@ -58,7 +59,6 @@ async fn create_pid_file(path: PathBuf) -> anyhow::Result<()> { } pub struct ChildChannel { - pub pid: u32, pub rx_log: mpsc::Receiver, pub tx_log: mpsc::Sender, pub rx_alert: mpsc::Receiver, @@ -66,17 +66,15 @@ pub struct ChildChannel { pub cfg_alert: Channels, } -pub async fn spawn_validator_node_os( +async fn spawn_child( validator_node_path: PathBuf, validator_config_path: PathBuf, base_dir: PathBuf, - cfg_alert: Channels, -) -> anyhow::Result { +) -> anyhow::Result { let node_binary_path = base_dir.join(validator_node_path); - let mut vn_cfg_path = base_dir.join(validator_config_path); - let vn_cfg_str = vn_cfg_path.as_mut_os_str().to_str(); + let vn_cfg_path = base_dir.join(validator_config_path); debug!("Using VN binary at: {}", node_binary_path.display()); - debug!("Using VN config in directory: {}", vn_cfg_str.unwrap_or_default()); + debug!("Using VN config in directory: {}", vn_cfg_path.display()); let child = TokioCommand::new(node_binary_path.clone().into_os_string()) .arg("-b") @@ -90,16 +88,11 @@ pub async fn spawn_validator_node_os( let pid = child.id().expect("Failed to get PID for child process"); info!("Spawned validator child process with id {}", pid); - if let Err(e) = create_pid_file(PathBuf::from(DEFAULT_VALIDATOR_PID_PATH)).await { + let path = base_dir.join(DEFAULT_VALIDATOR_PID_PATH); + if let Err(e) = create_pid_file(path.clone()).await { log::error!("Failed to create PID file when spawning node: {}", e); } - let path = base_dir.join(DEFAULT_VALIDATOR_PID_PATH); - debug!( - "Spawning validator node with process persisted at file: {}", - path.display() - ); - create_pid_file(path.clone()).await?; let mut file = OpenOptions::new() @@ -110,12 +103,68 @@ pub async fn spawn_validator_node_os( .await?; file.write_all(pid.to_string().as_bytes()).await?; + Ok(child) +} + +pub async fn spawn_validator_node_os( + validator_node_path: PathBuf, + validator_config_path: PathBuf, + base_dir: PathBuf, + cfg_alert: Channels, + auto_restart: bool, + mut trigger_signal: Shutdown, +) -> anyhow::Result { let (tx_log, rx_log) = mpsc::channel(16); let (tx_alert, rx_alert) = mpsc::channel(16); - tokio::spawn(monitor_child(child, tx_log.clone(), tx_alert.clone())); + let (tx_restart, mut rx_restart) = mpsc::channel(1); + + let tx_log_clone_main = tx_log.clone(); + let tx_alert_clone_main = tx_alert.clone(); + let tx_restart_clone_main = tx_restart.clone(); + tokio::spawn(async move { + loop { + let child_res = spawn_child( + validator_node_path.clone(), + validator_config_path.clone(), + base_dir.clone(), + ) + .await; + + match child_res { + Ok(child) => { + let tx_log_monitor = tx_log_clone_main.clone(); + let tx_alert_monitor = tx_alert_clone_main.clone(); + let tx_restart_monitor = tx_restart_clone_main.clone(); + // spawn monitoring and handle logs and alerts + tokio::spawn(async move { + monitor_child(child, tx_log_monitor, tx_alert_monitor, tx_restart_monitor).await; + }); + }, + Err(e) => { + error!("Failed to spawn child process: {:?}", e); + }, + } + + // block channel until we receive a restart signal + match rx_restart.recv().await { + Some(_) => { + if !auto_restart { + info!("Received restart signal, but auto restart is disabled, exiting"); + trigger_signal.trigger(); + break; + } + + info!("Received signal, preparing to restart validator node"); + }, + None => { + error!("Failed to receive restart signal, exiting"); + break; + }, + } + } + }); Ok(ChildChannel { - pid, rx_log, tx_log, tx_alert, @@ -148,39 +197,33 @@ async fn check_existing_node_os(base_dir: PathBuf) -> Option { None } -pub struct Process { - // Child process ID of the forked validator instance. - pid: Option, -} - -impl Process { - pub fn new() -> Self { - Self { pid: None } +pub async fn start_validator( + validator_path: PathBuf, + validator_config_path: PathBuf, + base_dir: PathBuf, + alerting_config: Channels, + auto_restart: bool, + trigger_signal: Shutdown, +) -> Option { + let opt = check_existing_node_os(base_dir.clone()).await; + if let Some(pid) = opt { + info!("Picking up existing validator node process with id: {}", pid); + // todo: create new process status channel for picked up process + return None; + } else { + debug!("No existing validator node process found, spawn new one"); } - pub async fn start_validator( - &mut self, - validator_path: PathBuf, - validator_config_path: PathBuf, - base_dir: PathBuf, - alerting_config: Channels, - ) -> Option { - let opt = check_existing_node_os(base_dir.clone()).await; - if let Some(pid) = opt { - info!("Picking up existing validator node process with id: {}", pid); - - self.pid = Some(pid); - // todo: create new process status channel for picked up process - return None; - } else { - debug!("No existing validator node process found, spawn new one"); - } - - let cc = spawn_validator_node_os(validator_path, validator_config_path, base_dir, alerting_config) - .await - .ok()?; - self.pid = Some(cc.pid); - - Some(cc) - } + let cc = spawn_validator_node_os( + validator_path, + validator_config_path, + base_dir, + alerting_config, + auto_restart, + trigger_signal, + ) + .await + .ok()?; + + Some(cc) }