Skip to content

Commit

Permalink
fix(watcher): general fixes while testing (#1131)
Browse files Browse the repository at this point in the history
Description
---
Some teething fixes for watcher
Conseistently use URL format for minotari node GRPC address


How Has This Been Tested?
---

Manually

What process can a PR reviewer use to test or verify this change?
---


Breaking Changes
---

- [x] None
- [ ] Requires data directory to be deleted
- [ ] Other - Please specify
  • Loading branch information
sdbondi authored Sep 4, 2024
1 parent 3c2c009 commit d6bae3b
Show file tree
Hide file tree
Showing 31 changed files with 289 additions and 342 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

########################################################################################################################
# #
# Validator Node Configuration Options (ValidatorNodeConfig) #
Expand All @@ -20,8 +19,8 @@
# automatically configured (default = )
#public_address =

# The Minotari base node's GRPC address. (default = "127.0.0.1:<port>" the <port> value is based on network)
#base_node_grpc_address = "127.0.0.1:18142"
# The Minotari base node's GRPC url. (default = "http://127.0.0.1:<port>" the <port> value is based on network)
#base_node_grpc_url = "http://127.0.0.1:18142"

# How often do we want to scan the base layer for changes. (default = 10)
#base_layer_scanning_interval = 10
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

########################################################################################################################
# #
# Indexer Configuration Options (IndexerConfig) #
Expand All @@ -20,8 +19,8 @@
# automatically configured (default = )
#public_address =

# The Minotari base node's GRPC address. (default = "127.0.0.1/<port>" the <port> value is based on network)
#base_node_grpc_address = "127.0.0.1/tcp/18142"
# The Minotari base node's GRPC URL. (default = "http://127.0.0.1/<port>" the <port> value is based on network)
#base_node_grpc_url = "http://127.0.0.1:18142"

# How often do we want to scan the base layer for changes. (default = 10)
#base_layer_scanning_interval = 10
Expand Down
2 changes: 1 addition & 1 deletion applications/tari_indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ tokio = { workspace = true, features = [
"rt-multi-thread",
] }
tower-http = { workspace = true, features = ["default", "cors"] }

url = { workspace = true, features = ["serde"] }

[package.metadata.cargo-machete]
ignored = [
Expand Down
11 changes: 6 additions & 5 deletions applications/tari_indexer/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,12 @@ pub async fn spawn_services(
ensure_directories_exist(config)?;

// GRPC client connection to base node
let base_node_client =
GrpcBaseNodeClient::new(config.indexer.base_node_grpc_address.clone().unwrap_or_else(|| {
let port = grpc_default_port(ApplicationType::BaseNode, config.network);
format!("127.0.0.1:{port}")
}));
let base_node_client = GrpcBaseNodeClient::new(config.indexer.base_node_grpc_url.clone().unwrap_or_else(|| {
let port = grpc_default_port(ApplicationType::BaseNode, config.network);
format!("http://127.0.0.1:{port}")
.parse()
.expect("Default base node GRPC URL is malformed")
}));

// Initialize networking
let identity = identity::Keypair::sr25519_from_bytes(keypair.secret_key().as_bytes().to_vec()).map_err(|e| {
Expand Down
7 changes: 4 additions & 3 deletions applications/tari_indexer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use tari_dan_app_utilities::{
p2p_config::{P2pConfig, PeerSeedsConfig},
template_manager::implementation::TemplateConfig,
};
use url::Url;

#[derive(Debug, Clone)]
pub struct ApplicationConfig {
Expand Down Expand Up @@ -70,8 +71,8 @@ pub struct IndexerConfig {
pub identity_file: PathBuf,
/// A path to the file that stores the tor hidden service private key, if using the tor transport
pub tor_identity_file: PathBuf,
/// The Tari base node's GRPC address
pub base_node_grpc_address: Option<String>,
/// The Tari base node's GRPC URL (e.g. http://localhost:18142)
pub base_node_grpc_url: Option<Url>,
/// How often do we want to scan the base layer for changes
#[serde(with = "serializers::seconds")]
pub base_layer_scanning_interval: Duration,
Expand Down Expand Up @@ -127,7 +128,7 @@ impl Default for IndexerConfig {
override_from: None,
identity_file: PathBuf::from("indexer_id.json"),
tor_identity_file: PathBuf::from("indexer_tor_id.json"),
base_node_grpc_address: None,
base_node_grpc_url: None,
base_layer_scanning_interval: Duration::from_secs(10),
data_dir: PathBuf::from("data/indexer"),
p2p: P2pConfig::default(),
Expand Down
13 changes: 8 additions & 5 deletions applications/tari_indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,13 @@ async fn handle_epoch_manager_event(services: &Services, event: EpochManagerEven
}

async fn create_base_layer_clients(config: &ApplicationConfig) -> Result<GrpcBaseNodeClient, ExitError> {
GrpcBaseNodeClient::connect(config.indexer.base_node_grpc_address.clone().unwrap_or_else(|| {
let url = config.indexer.base_node_grpc_url.clone().unwrap_or_else(|| {
let port = grpc_default_port(ApplicationType::BaseNode, config.network);
format!("127.0.0.1:{port}")
}))
.await
.map_err(|err| ExitError::new(ExitCode::ConfigError, format!("Could not connect to base node {}", err)))
format!("http://127.0.0.1:{port}")
.parse()
.expect("Default base node GRPC URL is malformed")
});
GrpcBaseNodeClient::connect(url)
.await
.map_err(|err| ExitError::new(ExitCode::ConfigError, format!("Could not connect to base node {}", err)))
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ impl ProcessDefinition for Indexer {
.next()
.ok_or_else(|| anyhow!("Base nodes should be started before validator nodes"))?;

let base_node_grpc_address = base_node
let base_node_grpc_url = base_node
.instance()
.allocated_ports()
.get("grpc")
.map(|port| format!("{listen_ip}:{port}"))
.map(|port| format!("http://{listen_ip}:{port}"))
.ok_or_else(|| anyhow!("grpc port not found for base node"))?;

command
Expand All @@ -48,7 +48,7 @@ impl ProcessDefinition for Indexer {
.arg(context.base_path())
.arg("--network")
.arg(context.network().to_string())
.arg(format!("-pindexer.base_node_grpc_address={base_node_grpc_address}"))
.arg(format!("-pindexer.base_node_grpc_url={base_node_grpc_url}"))
.arg(format!("-pindexer.json_rpc_address={json_rpc_address}"))
.arg(format!("-pindexer.http_ui_address={web_ui_address}"))
.arg(format!("-pindexer.ui_connect_address={json_rpc_public_address}"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@ impl ProcessDefinition for ValidatorNode {
.next()
.ok_or_else(|| anyhow!("Base nodes should be started before validator nodes"))?;

let base_node_grpc_address = base_node
let base_node_grpc_url = base_node
.instance()
.allocated_ports()
.get("grpc")
.map(|port| format!("{listen_ip}:{port}"))
.map(|port| format!("http://{listen_ip}:{port}"))
.ok_or_else(|| anyhow!("grpc port not found for base node"))?;

debug!(
"Starting validator node #{} with base node grpc address: {}",
context.instance_id(),
base_node_grpc_address
base_node_grpc_url
);

command
Expand All @@ -56,9 +56,7 @@ impl ProcessDefinition for ValidatorNode {
.arg("--network")
.arg(context.network().to_string())
.arg(format!("--json-rpc-public-address={json_rpc_public_address}"))
.arg(format!(
"-pvalidator_node.base_node_grpc_address={base_node_grpc_address}"
))
.arg(format!("-pvalidator_node.base_node_grpc_url={base_node_grpc_url}"))
.arg(format!("-pvalidator_node.json_rpc_listener_address={json_rpc_address}"))
.arg(format!("-pvalidator_node.http_ui_listener_address={web_ui_address}"))
.arg("-pvalidator_node.base_layer_scanning_interval=1");
Expand Down
1 change: 1 addition & 0 deletions applications/tari_validator_node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ tokio = { workspace = true, features = [
"rt-multi-thread",
] }
tower-http = { workspace = true, features = ["default", "cors"] }
url = { workspace = true, features = ["serde"] }

[build-dependencies]
tari_common = { workspace = true, features = ["build"] }
Expand Down
5 changes: 3 additions & 2 deletions applications/tari_validator_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,8 @@ async fn create_registration_file(
let fee_claim_public_key = config.validator_node.fee_claim_public_key.clone();
epoch_manager
.set_fee_claim_public_key(fee_claim_public_key.clone())
.await?;
.await
.context("set_fee_claim_public_key failed when creating registration file")?;

let signature = ValidatorNodeSignature::sign(keypair.secret_key(), &fee_claim_public_key, b"");

Expand All @@ -382,7 +383,7 @@ async fn create_registration_file(
config.common.base_path.join("registration.json"),
serde_json::to_string(&registration)?,
)
.map_err(|e| ExitError::new(ExitCode::UnknownError, e))?;
.context("failed to write registration file")?;
Ok(())
}

Expand Down
6 changes: 6 additions & 0 deletions applications/tari_validator_node/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::net::SocketAddr;

use clap::Parser;
use minotari_app_utilities::common_cli_args::CommonCliArgs;
use reqwest::Url;
use tari_common::configuration::{ConfigOverrideProvider, Network};
use tari_dan_app_utilities::p2p_config::ReachabilityMode;

Expand All @@ -43,6 +44,8 @@ pub struct Cli {
pub http_ui_listener_address: Option<SocketAddr>,
#[clap(long, env = "TARI_VN_JSON_RPC_PUBLIC_ADDRESS")]
pub json_rpc_public_address: Option<String>,
#[clap(long, alias = "node-grpc", short = 'g', env = "TARI_VN_MINOTARI_NODE_GRPC_URL")]
pub minotari_node_grpc_url: Option<Url>,
#[clap(long, short = 's')]
pub peer_seeds: Vec<String>,
#[clap(long)]
Expand Down Expand Up @@ -102,6 +105,9 @@ impl ConfigOverrideProvider for Cli {
if self.disable_mdns {
overrides.push(("validator_node.p2p.enable_mdns".to_string(), "false".to_string()));
}
if let Some(url) = self.minotari_node_grpc_url.as_ref() {
overrides.push(("validator_node.base_node_grpc_url".to_string(), url.to_string()));
}
overrides
}
}
7 changes: 4 additions & 3 deletions applications/tari_validator_node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use tari_dan_app_utilities::{
p2p_config::{P2pConfig, PeerSeedsConfig, RpcConfig},
template_manager::implementation::TemplateConfig,
};
use url::Url;

#[derive(Debug, Clone)]
pub struct ApplicationConfig {
Expand Down Expand Up @@ -72,8 +73,8 @@ pub struct ValidatorNodeConfig {
pub identity_file: PathBuf,
//// The node's publicly-accessible hostname
// pub public_address: Option<Multiaddr>,
/// The Tari base node's GRPC address
pub base_node_grpc_address: Option<String>,
/// The Tari base node's GRPC URL
pub base_node_grpc_url: Option<Url>,
/// If set to false, there will be no base layer scanning at all
pub scan_base_layer: bool,
/// How often do we want to scan the base layer for changes
Expand Down Expand Up @@ -134,7 +135,7 @@ impl Default for ValidatorNodeConfig {
override_from: None,
shard_key_file: PathBuf::from("shard_key.json"),
identity_file: PathBuf::from("validator_node_id.json"),
base_node_grpc_address: None,
base_node_grpc_url: None,
scan_base_layer: true,
base_layer_scanning_interval: Duration::from_secs(10),
data_dir: PathBuf::from("data/validator_node"),
Expand Down
18 changes: 14 additions & 4 deletions applications/tari_validator_node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,24 @@ pub async fn run_validator_node(
}

async fn create_base_layer_client(config: &ApplicationConfig) -> Result<GrpcBaseNodeClient, ExitError> {
let base_node_address = config.validator_node.base_node_grpc_address.clone().unwrap_or_else(|| {
let base_node_address = config.validator_node.base_node_grpc_url.clone().unwrap_or_else(|| {
let port = grpc_default_port(ApplicationType::BaseNode, config.network);
format!("127.0.0.1:{port}")
format!("http://127.0.0.1:{port}")
.parse()
.expect("Default base node GRPC URL is malformed")
});
info!(target: LOG_TARGET, "Connecting to base node on GRPC at {}", base_node_address);
let base_node_client = GrpcBaseNodeClient::connect(base_node_address)
let base_node_client = GrpcBaseNodeClient::connect(base_node_address.clone())
.await
.map_err(|error| ExitError::new(ExitCode::ConfigError, error))?;
.map_err(|error| {
ExitError::new(
ExitCode::ConfigError,
format!(
"Could not connect to the Minotari node at address {base_node_address}: {error}. Please ensure \
that the Minotari node is running and configured for GRPC."
),
)
})?;

Ok(base_node_client)
}
Expand Down
12 changes: 9 additions & 3 deletions applications/tari_validator_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,15 @@ async fn main_inner() -> Result<(), ExitError> {
info!(target: LOG_TARGET, "Starting validator node on network {}", config.network);
match run_validator_node(&config, shutdown.to_signal()).await {
Ok(_) => info!(target: LOG_TARGET, "Validator node shutdown successfully"),
Err(e) => {
error!(target: LOG_TARGET, "Validator node shutdown with an error: {:?}", e);
return Err(ExitError::new(ExitCode::UnknownError, e));
Err(e) => match e.downcast() {
Ok(exit_error) => {
error!(target: LOG_TARGET, "Validator node shutdown with an error: {:?}", exit_error);
return Err(exit_error);
},
Err(e) => {
error!(target: LOG_TARGET, "Validator node shutdown with an error: {:?}", e);
return Err(ExitError::new(ExitCode::UnknownError, e));
},
},
}

Expand Down
1 change: 1 addition & 0 deletions applications/tari_watcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ tari_shutdown = { workspace = true }
clap = { workspace = true, features = ["derive"] }
serde = { workspace = true, features = ["derive"] }
anyhow = { workspace = true }
url = { workspace = true, features = ["serde"] }
tokio = { workspace = true, features = [
"rt-multi-thread",
"macros",
Expand Down
25 changes: 7 additions & 18 deletions applications/tari_watcher/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ use std::path::PathBuf;
use clap::Parser;

use crate::{
config::{Config, InstanceType},
config::Config,
constants::{
DEFAULT_MAIN_PROJECT_PATH,
DEFAULT_VALIDATOR_DIR,
DEFAULT_VALIDATOR_KEY_PATH,
DEFAULT_WATCHER_BASE_PATH,
DEFAULT_WATCHER_CONFIG_PATH,
},
};
Expand All @@ -35,7 +35,7 @@ impl Cli {

#[derive(Debug, Clone, clap::Args)]
pub struct CommonCli {
#[clap(short = 'b', long, parse(from_os_str), default_value = DEFAULT_MAIN_PROJECT_PATH)]
#[clap(short = 'b', long, parse(from_os_str), default_value = DEFAULT_WATCHER_BASE_PATH)]
pub base_dir: PathBuf,
#[clap(short = 'c', long, parse(from_os_str), default_value = DEFAULT_WATCHER_CONFIG_PATH)]
pub config_path: PathBuf,
Expand Down Expand Up @@ -71,27 +71,16 @@ impl InitArgs {

#[derive(Clone, Debug, clap::Args)]
pub struct Overrides {
/// The path to the validator node binary (optional)
#[clap(long)]
// The path to the validator node binary (optional)
pub vn_node_path: Option<PathBuf>,
}

impl Overrides {
pub fn apply(&self, config: &mut Config) {
if self.vn_node_path.is_none() {
return;
if let Some(path) = self.vn_node_path.clone() {
log::info!("Overriding validator node binary path to {:?}", path);
config.validator_node_executable_path = path;
}

if let Some(exec_config) = config
.executable_config
.iter_mut()
.find(|c| c.instance_type == InstanceType::TariValidatorNode)
{
exec_config.executable_path = self.vn_node_path.clone();
}
log::info!(
"Overriding validator node binary path to {:?}",
self.vn_node_path.as_ref().unwrap()
);
}
}
Loading

0 comments on commit d6bae3b

Please sign in to comment.