Skip to content

Commit

Permalink
feat(watcher): add monitored events to be used by notifier (#1122)
Browse files Browse the repository at this point in the history
Description
---
Creates the following events, to be used by the notifier subsequently:
- `Exited`: manually cancelling of the child process, returning its
status code
- `Crashed`: the child process finished for some reason but was not
cancelled manually
- `InternalError`: the watcher received an error when attempting to
check the child process
- `Submitted`: the watcher has submitted a registration transaction for
the node (tracks `tx_id` and `block`)
- `WarnExpiration`: the watcher emits a warning if the current block is
near the expiration block of a registration
- `Running`: the child process is running as expected, do nothing

Picks up the events from the backend and logs them in `monitoring.rs`.

In addition, whenever it spawns a new child process it also saved its
process id to a file `validator.pid`. This is updated automatically and
can be read from when we want to check whether a node process already
exists. Currently, the lifetime of a child is dependent on the parent
process.

How Has This Been Tested?
---
Running the `tari_watcher` together with `tari_swarm_daemon`. It then
displayed (through logs) the node being registered and then warned when
it was near expiration (less than 100 blocks from the expiration block,
defined as `registered_at_block + 1000`).
  • Loading branch information
therealdannzor authored Aug 26, 2024
1 parent 6dc89c8 commit f5f3860
Show file tree
Hide file tree
Showing 13 changed files with 581 additions and 234 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 8 additions & 12 deletions applications/tari_watcher/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use std::path::PathBuf;

use clap::Parser;

use crate::config::{Config, InstanceType};
use crate::{
config::{Config, InstanceType},
constants::{DEFAULT_PROJECT_ROOT, DEFAULT_WATCHER_CONFIG_PATH},
};

#[derive(Clone, Debug, Parser)]
pub struct Cli {
Expand All @@ -21,22 +24,15 @@ impl Cli {
}

pub fn get_config_path(&self) -> PathBuf {
let Some(ref base_dir) = self.common.base_dir else {
return self.common.config_path.clone();
};
if self.common.config_path.is_relative() {
base_dir.join(&self.common.config_path)
} else {
self.common.config_path.clone()
}
self.common.config_path.clone()
}
}

#[derive(Debug, Clone, clap::Args)]
pub struct CommonCli {
#[clap(short = 'b', long, parse(from_os_str))]
pub base_dir: Option<PathBuf>,
#[clap(short = 'c', long, parse(from_os_str), default_value = "./data/watcher/config.toml")]
#[clap(short = 'b', long, parse(from_os_str), default_value = DEFAULT_PROJECT_ROOT)]
pub base_dir: PathBuf,
#[clap(short = 'c', long, parse(from_os_str), default_value = DEFAULT_WATCHER_CONFIG_PATH)]
pub config_path: PathBuf,
}

Expand Down
32 changes: 15 additions & 17 deletions applications/tari_watcher/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,15 @@ use std::{

use tokio::io::{self, AsyncWriteExt};

use crate::Cli;
use crate::{
constants::{
DEFAULT_BASE_NODE_GRPC_ADDRESS,
DEFAULT_BASE_WALLET_GRPC_ADDRESS,
DEFAULT_MINOTARI_MINER_BINARY_PATH,
DEFAULT_VALIDATOR_NODE_BINARY_PATH,
},
Cli,
};

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Config {
Expand Down Expand Up @@ -126,12 +134,12 @@ pub fn get_base_config(cli: &Cli) -> anyhow::Result<Config> {
let executables = vec![
ExecutableConfig {
instance_type: InstanceType::TariValidatorNode,
executable_path: Some("target/release/tari_validator_node".into()),
executable_path: Some(DEFAULT_VALIDATOR_NODE_BINARY_PATH.into()),
env: vec![],
},
ExecutableConfig {
instance_type: InstanceType::MinoTariConsoleWallet,
executable_path: Some("target/release/minotari_wallet".into()),
executable_path: Some(DEFAULT_MINOTARI_MINER_BINARY_PATH.into()),
env: vec![],
},
];
Expand All @@ -144,18 +152,7 @@ pub fn get_base_config(cli: &Cli) -> anyhow::Result<Config> {
.with_num_instances(1),
];

let base_dir = cli
.common
.base_dir
.clone()
.or_else(|| {
cli.get_config_path()
.canonicalize()
.ok()
.and_then(|p| p.parent().map(|p| p.to_path_buf()))
})
.unwrap_or_else(|| std::env::current_dir().unwrap());

let base_dir = cli.common.base_dir.clone();
let vn_registration_file = base_dir
.join("data")
.join("vn1")
Expand All @@ -164,8 +161,9 @@ pub fn get_base_config(cli: &Cli) -> anyhow::Result<Config> {

Ok(Config {
auto_register: true,
base_node_grpc_address: "".to_string(),
base_wallet_grpc_address: "".to_string(),
// must contain protocol and port
base_node_grpc_address: DEFAULT_BASE_NODE_GRPC_ADDRESS.to_string(),
base_wallet_grpc_address: DEFAULT_BASE_WALLET_GRPC_ADDRESS.to_string(),
base_dir: base_dir.clone(),
sidechain_id: None,
vn_registration_file,
Expand Down
17 changes: 17 additions & 0 deletions applications/tari_watcher/src/constants.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use tokio::time::Duration;

pub const CONSENSUS_CONSTANT_REGISTRATION_DURATION: u64 = 1000; // in blocks: 100 epochs * 10 blocks/epoch

pub const DEFAULT_PROJECT_ROOT: &str = env!("CARGO_MANIFEST_DIR");
pub const DEFAULT_WATCHER_CONFIG_PATH: &str = "data/watcher/config.toml";
pub const DEFAULT_VALIDATOR_PID_PATH: &str = "data/watcher/validator.pid";
pub const DEFAULT_VALIDATOR_NODE_BINARY_PATH: &str = "target/release/tari_validator_node";
pub const DEFAULT_MINOTARI_MINER_BINARY_PATH: &str = "target/release/minotari_miner";
pub const DEFAULT_BASE_NODE_GRPC_ADDRESS: &str = "http://127.0.0.1:12001"; // note: protocol
pub const DEFAULT_BASE_WALLET_GRPC_ADDRESS: &str = "http://127.0.0.1:12003"; // note: protocol

pub const DEFAULT_PROCESS_MONITORING_INTERVAL: Duration = Duration::from_secs(20); // time to sleep before checking VN process status
pub const DEFAULT_THRESHOLD_WARN_EXPIRATION: u64 = 100; // warn at this many blocks before the registration expires
70 changes: 0 additions & 70 deletions applications/tari_watcher/src/forker.rs

This file was deleted.

6 changes: 1 addition & 5 deletions applications/tari_watcher/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use std::path::PathBuf;

use minotari_app_grpc::tari_rpc::{GetActiveValidatorNodesResponse, TipInfoResponse};
use minotari_app_grpc::tari_rpc::GetActiveValidatorNodesResponse;
use tari_common_types::types::PublicKey;
use tari_core::transactions::transaction_components::ValidatorNodeSignature;
use tari_crypto::{ristretto::RistrettoPublicKey, tari_utilities::ByteArray};
Expand Down Expand Up @@ -48,10 +48,6 @@ pub fn to_vn_public_keys(vns: Vec<GetActiveValidatorNodesResponse>) -> Vec<Publi
.collect()
}

pub fn to_block_height(tip_info: TipInfoResponse) -> u64 {
tip_info.metadata.unwrap().best_block_height
}

pub fn contains_key(vns: Vec<RistrettoPublicKey>, needle: PublicKey) -> bool {
vns.iter().any(|vn| vn.eq(&needle))
}
76 changes: 76 additions & 0 deletions applications/tari_watcher/src/logger.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2024 The Tari Project
// SPDX-License-Identifier: BSD-3-Clause

use fern::FormatCallback;

pub fn init_logger() -> Result<(), log::SetLoggerError> {
fn should_skip(target: &str) -> bool {
const SKIP: [&str; 3] = ["hyper::", "h2::", "tower::"];
if SKIP.iter().any(|s| target.starts_with(s)) {
return true;
}

false
}

let colors = fern::colors::ColoredLevelConfig::new()
.info(fern::colors::Color::Green)
.debug(fern::colors::Color::Yellow)
.error(fern::colors::Color::Red);
fern::Dispatch::new()
.format(move |out, message, record| {
if should_skip(record.target()) {
return;
}

let fallback = |out: FormatCallback<'_>| out.finish(format_args!(
"[{} {} {}] {}",
humantime::format_rfc3339(std::time::SystemTime::now()),
record.metadata().target(),
colors.color(record.level()),
message
));

// Example: [Validator node-#1] 12:55 INFO Received vote for block #NodeHeight(88) d9abc7b1bb66fd912848f5bc4e5a69376571237e3243dc7f6a91db02bb5cf37c from a08cf5038e8e3cda8e3716c79f769cd42fad05f7110628efb5be6a40e28bc94c (4 of 3)
// Implement a naive parsing of the log message to extract the target, level and the log message from each running process
let message_str = message.to_string();
let Some((target, rest)) = message_str.split_once( ']') else {
fallback(out);
return;
};

let mut parts = rest.trim().splitn(3, ' ');

// Skip the time
if parts.next().is_none() {
fallback(out);
return;
}

let Some(level) = parts.next()
.and_then(|s| s.parse().ok())
.map(|l| colors.color(l)) else {
fallback(out);
return;
};

let Some(log) = parts.next() else {
fallback(out);
return;
};

out.finish(format_args!(
"{} {} {}] {} {}",
humantime::format_rfc3339(std::time::SystemTime::now()),
record.metadata().target(),
target,
level,
log
))
})
.filter(|record_metadata| record_metadata.target().starts_with("tari_watcher")) // skip tokio frame prints
.level(log::LevelFilter::Debug)
.chain(std::io::stdout())
// .chain(fern::log_file("output.log").unwrap())
.apply()
}
Loading

0 comments on commit f5f3860

Please sign in to comment.