Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(watcher): telegram alerting and automatically persist val node #1127

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions applications/tari_watcher/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

### Quickstart

Initialize the project with `tari_watcher init` and start it with `tari_watcher run`. Edit the newly generated `config.toml` to enable notifications on channels such as Mattermost and Telegram.Make sure to have started up `tari_validator_node` once previously to have a node directory set up, default is `tari_validator_node -- -b data/vn1`.
Initialize the project with `tari_watcher init` and start it with `tari_watcher run`. Edit the newly generated `config.toml` to enable notifications on Mattermost and Telegram. Make sure to have started up `tari_validator_node` once previously to have a node directory set up, default is `tari_validator_node -- -b data/vn1`.

### Setup

Expand All @@ -29,7 +29,7 @@ The default values used (see `constants.rs`) when running the project without an
```
├── alerting.rs # channel notifier implementations
├── cli.rs # cli and flags passed during bootup
├── config.rs # main config file creation
├── config.rs # main config file creation
├── constants.rs # various constants used as default values
├── helpers.rs # common helper functions
├── logger.rs
Expand Down
94 changes: 75 additions & 19 deletions applications/tari_watcher/src/alerting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ use reqwest::StatusCode;
use serde_json::json;

pub trait Alerting {
fn new(url: String, channel_id: String, credentials: String) -> Self;

// Sends an alert message to the service
async fn alert(&mut self, message: &str) -> Result<()>;

Expand All @@ -21,31 +19,31 @@ pub trait Alerting {

pub struct MatterMostNotifier {
// Mattermost server URL
server_url: String,
pub server_url: String,
// Mattermost channel ID used for alerts
channel_id: String,
pub channel_id: String,
// User token (retrieved after login)
credentials: String,
pub credentials: String,
// Alerts sent since last reset
alerts_sent: u64,
pub alerts_sent: u64,
// HTTP client
client: reqwest::Client,
pub client: reqwest::Client,
}

impl Alerting for MatterMostNotifier {
fn new(server_url: String, channel_id: String, credentials: String) -> Self {
Self {
server_url,
channel_id,
credentials,
alerts_sent: 0,
client: reqwest::Client::new(),
async fn alert(&mut self, message: &str) -> Result<()> {
if self.server_url.is_empty() {
bail!("Server URL field is empty");
}
if self.credentials.is_empty() {
bail!("Credentials field is empty");
}
if self.channel_id.is_empty() {
bail!("Channel ID is empty");
}
}

async fn alert(&mut self, message: &str) -> Result<()> {
const LOGIN_ENDPOINT: &str = "/api/v4/posts";
let url = format!("{}{}", self.server_url, LOGIN_ENDPOINT);
const POST_ENDPOINT: &str = "/api/v4/posts";
let url = format!("{}{}", self.server_url, POST_ENDPOINT);
let req = json!({
"channel_id": self.channel_id,
"message": message,
Expand Down Expand Up @@ -73,7 +71,7 @@ impl Alerting for MatterMostNotifier {
bail!("Server URL is empty");
}
if self.credentials.is_empty() {
bail!("Credentials are empty");
bail!("Credentials field is empty");
}

let url = format!("{}{}", self.server_url, PING_ENDPOINT);
Expand All @@ -95,3 +93,61 @@ impl Alerting for MatterMostNotifier {
Ok(self.alerts_sent)
}
}

pub struct TelegramNotifier {
// Telegram bot token
pub bot_token: String,
// Telegram chat ID (either in @channel or number id format)
pub chat_id: String,
// Alerts sent since last reset
pub alerts_sent: u64,
// HTTP client
pub client: reqwest::Client,
}

impl Alerting for TelegramNotifier {
async fn alert(&mut self, message: &str) -> Result<()> {
if self.bot_token.is_empty() {
bail!("Bot token is empty");
}
if self.chat_id.is_empty() {
bail!("Chat ID is empty");
}

let post_endpoint: &str = &format!("/bot{}/sendMessage", self.bot_token);
let url = format!("https://api.telegram.org{}", post_endpoint);
let req = json!({
"chat_id": self.chat_id,
"text": message,
});
let resp = self.client.post(&url).json(&req).send().await?;

if resp.status() != StatusCode::OK {
bail!("Failed to send alert, got response: {}", resp.status());
}

self.alerts_sent += 1;

Ok(())
}

async fn ping(&self) -> Result<()> {
if self.bot_token.is_empty() {
bail!("Bot token is empty");
}

let ping_endpoint: &str = &format!("/bot{}/getMe", self.bot_token);
let url = format!("https://api.telegram.org{}", ping_endpoint);
let resp = self.client.get(url.clone()).send().await?;

if resp.status() != StatusCode::OK {
bail!("Failed to ping, got response: {}", resp.status());
}

Ok(())
}

fn stats(&self) -> Result<u64> {
Ok(self.alerts_sent)
}
}
5 changes: 5 additions & 0 deletions applications/tari_watcher/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,16 @@ pub struct InitArgs {
#[clap(long)]
/// Disable initial and auto registration of the validator node
pub no_auto_register: bool,

#[clap(long)]
/// Disable auto restart of the validator node
pub no_auto_restart: bool,
}

impl InitArgs {
pub fn apply(&self, config: &mut Config) {
config.auto_register = !self.no_auto_register;
config.auto_restart = !self.no_auto_restart;
}
}

Expand Down
6 changes: 5 additions & 1 deletion applications/tari_watcher/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ pub struct Config {
/// the current registration expires
pub auto_register: bool,

/// Allow watcher to restart the validator node if it crashes and stops running
pub auto_restart: bool,

/// The Minotari node gRPC address
pub base_node_grpc_address: String,

Expand Down Expand Up @@ -158,6 +161,7 @@ pub fn get_base_config(cli: &Cli) -> anyhow::Result<Config> {

Ok(Config {
auto_register: true,
auto_restart: true,
base_node_grpc_address: DEFAULT_BASE_NODE_GRPC_ADDRESS.to_string(),
base_wallet_grpc_address: DEFAULT_BASE_WALLET_GRPC_ADDRESS.to_string(),
base_dir: base_dir.to_path_buf(),
Expand All @@ -177,7 +181,7 @@ pub fn get_base_config(cli: &Cli) -> anyhow::Result<Config> {
telegram: ChannelConfig {
name: "telegram".to_string(),
enabled: false,
server_url: "".to_string(),
server_url: "https://api.telegram.org".to_string(),
channel_id: "".to_string(),
credentials: "".to_string(),
},
Expand Down
10 changes: 7 additions & 3 deletions applications/tari_watcher/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ async fn main() -> anyhow::Result<()> {
async fn start(config: Config) -> anyhow::Result<()> {
let shutdown = Shutdown::new();
let signal = shutdown.to_signal().select(exit_signal()?);
let (task_handle, manager_handle) = spawn(config.clone(), shutdown.to_signal()).await;
let (task_handle, manager_handle) = spawn(config.clone(), shutdown.to_signal(), shutdown).await;

tokio::select! {
_ = signal => {
Expand All @@ -92,8 +92,12 @@ async fn start(config: Config) -> anyhow::Result<()> {
Ok(())
}

async fn spawn(config: Config, shutdown: ShutdownSignal) -> (task::JoinHandle<anyhow::Result<()>>, ManagerHandle) {
let (manager, manager_handle) = ProcessManager::new(config, shutdown);
async fn spawn(
config: Config,
shutdown: ShutdownSignal,
trigger: Shutdown,
) -> (task::JoinHandle<anyhow::Result<()>>, ManagerHandle) {
let (manager, manager_handle) = ProcessManager::new(config, shutdown, trigger);
let task_handle = tokio::spawn(manager.start());
(task_handle, manager_handle)
}
29 changes: 19 additions & 10 deletions applications/tari_watcher/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,46 +10,48 @@ use minotari_app_grpc::tari_rpc::{
GetActiveValidatorNodesResponse,
RegisterValidatorNodeResponse,
};
use tari_shutdown::ShutdownSignal;
use tari_shutdown::{Shutdown, ShutdownSignal};
use tokio::sync::{mpsc, oneshot};

use crate::{
config::{Channels, Config, ExecutableConfig},
constants::DEFAULT_VALIDATOR_NODE_BINARY_PATH,
minotari::{Minotari, TipStatus},
monitoring::{process_status_alert, process_status_log, ProcessStatus, Transaction},
process::Process,
process::start_validator,
};

pub struct ProcessManager {
pub base_dir: PathBuf,
pub validator_base_dir: PathBuf,
pub validator_config: ExecutableConfig,
pub wallet_config: ExecutableConfig,
pub process: Process,
pub shutdown_signal: ShutdownSignal,
pub shutdown_signal: ShutdownSignal, // listen for keyboard exit signal
pub trigger_signal: Shutdown, // triggered when validator auto-restart is disabled
pub rx_request: mpsc::Receiver<ManagerRequest>,
pub chain: Minotari,
pub alerting_config: Channels,
pub auto_restart: bool,
}

impl ProcessManager {
pub fn new(config: Config, shutdown_signal: ShutdownSignal) -> (Self, ManagerHandle) {
pub fn new(config: Config, shutdown_signal: ShutdownSignal, trigger_signal: Shutdown) -> (Self, ManagerHandle) {
let (tx_request, rx_request) = mpsc::channel(1);
let this = Self {
base_dir: config.base_dir.clone(),
validator_base_dir: config.vn_base_dir,
validator_config: config.executable_config[0].clone(),
wallet_config: config.executable_config[1].clone(),
process: Process::new(),
shutdown_signal,
trigger_signal,
rx_request,
chain: Minotari::new(
config.base_node_grpc_address,
config.base_wallet_grpc_address,
config.vn_registration_file,
),
alerting_config: config.channel_config,
auto_restart: config.auto_restart,
};
(this, ManagerHandle::new(tx_request))
}
Expand All @@ -68,10 +70,15 @@ impl ProcessManager {
let vn_base_dir = self.base_dir.join(self.validator_base_dir);

// get child channel to communicate with the validator node process
let cc = self
.process
.start_validator(vn_binary_path, vn_base_dir, self.base_dir, self.alerting_config)
.await;
let cc = start_validator(
vn_binary_path,
vn_base_dir,
self.base_dir,
self.alerting_config,
self.auto_restart,
self.trigger_signal.clone(),
)
.await;
if cc.is_none() {
todo!("Create new validator node process event listener for fetched existing PID from OS");
}
Expand All @@ -80,9 +87,11 @@ impl ProcessManager {
// spawn logging and alerting tasks to process status updates
tokio::spawn(async move {
process_status_log(cc.rx_log).await;
warn!("Logging task has exited");
});
tokio::spawn(async move {
process_status_alert(cc.rx_alert, cc.cfg_alert).await;
warn!("Alerting task has exited");
});

self.chain.bootstrap().await?;
Expand Down
Loading
Loading